"""
Main UBX Message Protocol Class.
Created on 26 Sep 2020
:author: semuadmin
:copyright: SEMU Consulting © 2020
:license: BSD 3-Clause
"""
# pylint: disable=invalid-name
import struct
import pyubx2.exceptions as ube
import pyubx2.ubxtypes_core as ubt
import pyubx2.ubxtypes_get as ubg
import pyubx2.ubxtypes_poll as ubp
import pyubx2.ubxtypes_set as ubs
from pyubx2.ubxhelpers import (
attsiz,
bytes2val,
calc_checksum,
cfgkey2name,
cfgname2key,
escapeall,
gnss2str,
itow2utc,
msgclass2bytes,
msgstr2bytes,
nomval,
val2bytes,
)
[docs]
class UBXMessage:
"""UBX Message Class."""
[docs]
def __init__(
self,
ubxClass,
ubxID,
msgmode: int,
parsebitfield: bool = True,
scaling: bool = True,
**kwargs,
):
"""Constructor.
If no keyword parms are passed, the payload is taken to be empty.
If 'payload' is passed as a keyword parm, this is taken to contain the complete
payload as a sequence of bytes; any other keyword parms are ignored.
Otherwise, any named attributes will be assigned the value given, all others will
be assigned a nominal value according to type.
:param object msgClass: message class as str, int or byte
:param object msgID: message ID as str, int or byte
:param int msgmode: message mode (0=GET, 1=SET, 2=POLL)
:param bool parsebitfield: parse bitfields ('X' type attributes) Y/N
:param bool scaling: apply scale factors Y/N
:param kwargs: optional payload keyword arguments
:raises: UBXMessageError
"""
# object is mutable during initialisation only
super().__setattr__("_immutable", False)
self._mode = msgmode
self._payload = b""
self._length = b""
self._checksum = b""
self._parsebf = parsebitfield # parsing bitfields Y/N?
self._scaling = scaling # apply scale factors Y/N?
if msgmode not in (ubt.GET, ubt.SET, ubt.POLL):
raise ube.UBXMessageError(f"Invalid msgmode {msgmode} - must be 0, 1 or 2")
# accommodate different formats of msgClass and msgID
if isinstance(ubxClass, str) and isinstance(
ubxID, str
): # string e.g. 'CFG', 'CFG-PRT'
(self._ubxClass, self._ubxID) = msgstr2bytes(ubxClass, ubxID)
elif isinstance(ubxClass, int) and isinstance(ubxID, int): # int e.g. 6, 1
(self._ubxClass, self._ubxID) = msgclass2bytes(ubxClass, ubxID)
else: # bytes e.g. b'\x06', b'\x01'
self._ubxClass = ubxClass
self._ubxID = ubxID
self._do_attributes(**kwargs)
self._immutable = True # once initialised, object is immutable
def _do_attributes(self, **kwargs):
"""
Populate UBXMessage from named attribute keywords.
Where a named attribute is absent, set to a nominal value (zeros or blanks).
:param kwargs: optional payload key/value pairs
:raises: UBXTypeError
"""
offset = 0 # payload offset in bytes
index = [] # array of (nested) group indices
try:
if len(kwargs) == 0: # if no kwargs, assume null payload
self._payload = None
else:
self._payload = kwargs.get("payload", b"")
pdict = self._get_dict(**kwargs) # get appropriate payload dict
for anam in pdict: # process each attribute in dict
(offset, index) = self._set_attribute(
anam, pdict, offset, index, **kwargs
)
self._do_len_checksum()
except (
AttributeError,
struct.error,
TypeError,
ValueError,
) as err:
raise ube.UBXTypeError(
(
f"Incorrect type for attribute '{anam}' "
f"in {['GET', 'SET', 'POLL'][self._mode]} message "
f"class {self.identity}"
)
) from err
except (OverflowError,) as err:
raise ube.UBXTypeError(
(
f"Overflow error for attribute '{anam}' "
f"in {['GET', 'SET', 'POLL'][self._mode]} message "
f"class {self.identity}"
)
) from err
def _set_attribute(
self, anam: str, pdict: dict, offset: int, index: list, **kwargs
) -> tuple:
"""
Recursive routine to set individual or grouped payload attributes.
:param str anam: attribute name
:param dict pdict: dict representing payload definition
:param int offset: payload offset in bytes
:param list index: repeating group index array
:param kwargs: optional payload key/value pairs
:return: (offset, index[])
:rtype: tuple
"""
adef = pdict[anam] # get attribute definition
if isinstance(
adef, tuple
): # repeating group of attributes or subdefined bitfield
numr, _ = adef
if numr in (ubt.X1, ubt.X2, ubt.X4, ubt.X6, ubt.X8, ubt.X24): # bitfield
if self._parsebf: # if we're parsing bitfields
(offset, index) = self._set_attribute_bitfield(
adef, offset, index, **kwargs
)
else: # treat bitfield as a single byte array
offset = self._set_attribute_single(
anam, numr, offset, index, **kwargs
)
else: # repeating group of attributes
(offset, index) = self._set_attribute_group(
adef, offset, index, **kwargs
)
else: # single attribute
offset = self._set_attribute_single(anam, adef, offset, index, **kwargs)
return (offset, index)
def _set_attribute_group(
self, adef: tuple, offset: int, index: list, **kwargs
) -> tuple:
"""
Process (nested) group of attributes.
:param tuple adef: attribute definition - tuple of (num repeats, attribute dict)
:param int offset: payload offset in bytes
:param list index: repeating group index array
:param kwargs: optional payload key/value pairs
:return: (offset, index[])
:rtype: tuple
"""
index.append(0) # add a (nested) group index
anam, gdict = adef # attribute signifying group size, group dictionary
# if CFG-VALGET or CFG-VALSET message, use dedicated method to
# parse as configuration key value pairs
if self._ubxClass == b"\x06" and (
(self._ubxID == b"\x8b" and self._mode == ubt.GET)
or (self._ubxID == b"\x8a" and self._mode == ubt.SET)
):
self._set_attribute_cfgval(offset, **kwargs)
else:
# derive or retrieve number of items in group
if isinstance(anam, int): # fixed number of repeats
gsiz = anam
elif anam == "None": # number of repeats 'variable by size'
gsiz = self._calc_num_repeats(gdict, self._payload, offset, 0)
else: # number of repeats is defined in named attribute
gsiz = getattr(self, anam)
# special handling for ESF-MEAS message types
if (
self._ubxClass == b"\x10"
and self._ubxID == b"\x02"
and self._mode == ubt.SET
):
if getattr(self, "calibTtagValid", 0):
gsiz += 1
# recursively process each group attribute,
# incrementing the payload offset and index as we go
for i in range(gsiz):
index[-1] = i + 1
for key1 in gdict:
(offset, index) = self._set_attribute(
key1, gdict, offset, index, **kwargs
)
index.pop() # remove this (nested) group index
return (offset, index)
def _set_attribute_single(
self, anam: str, adef: object, offset: int, index: list, **kwargs
) -> int:
"""
Set individual attribute value, applying scaling where appropriate.
:param str anam: attribute keyword
EITHER
:param str adef: attribute definition string e.g. 'U002'
OR
:param list adef: if scaled, list of [attribute type string, scaling factor float]
:param int offset: payload offset in bytes
:param list index: repeating group index array
:param kwargs: optional payload key/value pairs
:return: offset
:rtype: int
"""
# pylint: disable=no-member
# if attribute is scaled
ares = 1
if isinstance(adef, list) and self._scaling:
ares = adef[1] # attribute resolution (i.e. scaling factor)
adef = adef[0] # attribute definition
# if attribute is part of a (nested) repeating group, suffix name with index
anami = anam
for i in index: # one index for each nested level
if i > 0:
anami += f"_{i:02d}"
# determine attribute size (bytes)
if adef == ubt.CH: # variable length string
asiz = len(self._payload)
else:
asiz = attsiz(adef)
# if payload keyword has been provided,
# use the appropriate offset of the payload
if "payload" in kwargs:
valb = self._payload[offset : offset + asiz]
if ares == 1:
val = bytes2val(valb, adef)
else:
val = round(bytes2val(valb, adef) * ares, ubt.SCALROUND)
else:
# if individual keyword has been provided,
# set to provided value, else set to
# nominal value
val = kwargs.get(anami, nomval(adef))
if ares == 1:
valb = val2bytes(val, adef)
else:
valb = val2bytes(int(val / ares), adef)
self._payload += valb
if anami[0:3] == "_HP": # high precision component of earlier attribute
# add standard and high precision values in a single attribute
setattr(
self, anami[3:], round(getattr(self, anami[3:]) + val, ubt.SCALROUND)
)
else:
setattr(self, anami, val)
offset += asiz
return offset
def _set_attribute_bitfield(
self, atyp: str, offset: int, index: list, **kwargs
) -> tuple:
"""
Parse bitfield attribute (type 'X').
:param str atyp: attribute type e.g. 'X002'
:param int offset: payload offset in bytes
:param list index: repeating group index array
:param kwargs: optional payload key/value pairs
:return: (offset, index[])
:rtype: tuple
"""
# pylint: disable=no-member
btyp, bdict = atyp # type of bitfield, bitfield dictionary
bsiz = attsiz(btyp) # size of bitfield in bytes
bfoffset = 0
# if payload keyword has been provided,
# use the appropriate offset of the payload
if "payload" in kwargs:
bitfield = int.from_bytes(self._payload[offset : offset + bsiz], "little")
else:
bitfield = 0
# process each flag in bitfield
for key, keyt in bdict.items():
(bitfield, bfoffset) = self._set_attribute_bits(
bitfield, bfoffset, key, keyt, index, **kwargs
)
# update payload
offset += bsiz
if "payload" not in kwargs:
self._payload += bitfield.to_bytes(bsiz, "little")
return (offset, index)
def _set_attribute_bits(
self,
bitfield: int,
bfoffset: int,
key: str,
keyt: str,
index: list,
**kwargs,
) -> tuple:
"""
Set individual bit flag from bitfield.
:param int bitfield: bitfield
:param int bfoffset: bitfield offset in bits
:param str key: attribute key name
:param str keyt: key type e.g. 'U001'
:param list index: repeating group index array
:param kwargs: optional payload key/value pairs
:return: (bitfield, bfoffset)
:rtype: tuple
"""
# pylint: disable=no-member
# if attribute is part of a (nested) repeating group, suffix name with index
keyr = key
for i in index: # one index for each nested level
if i > 0:
keyr += f"_{i:02d}"
atts = attsiz(keyt) # determine flag size in bits
if "payload" in kwargs:
mask = pow(2, atts) - 1
val = (bitfield >> bfoffset) & mask
else:
val = kwargs.get(keyr, 0)
bitfield = bitfield | (val << bfoffset)
if key[0:8] != "reserved": # don't bother to set reserved bits
setattr(self, keyr, val)
bfoffset += atts
return (bitfield, bfoffset)
def _set_attribute_cfgval(self, offset: int, **kwargs):
"""
Parse CFG-VALGET / CFG-VALSET payload to set of configuration
key value pairs.
:param int offset: payload offset
:param **kwargs: optional payload key/value pairs
:raises: UBXMessageError
"""
KEYLEN = 4
if "payload" in kwargs:
self._payload = kwargs["payload"]
else:
raise ube.UBXMessageError(
"CFG-VALGET message definitions must include payload keyword"
)
cfglen = len(self._payload[offset:])
i = 0
while offset < cfglen:
if i == KEYLEN:
key = int.from_bytes(
self._payload[offset : offset + KEYLEN], "little", signed=False
)
(keyname, att) = cfgkey2name(key)
atts = attsiz(att)
valb = self._payload[offset + KEYLEN : offset + KEYLEN + atts]
val = bytes2val(valb, att)
setattr(self, keyname, val)
i = 0
offset += KEYLEN + atts
else:
i += 1
def _do_len_checksum(self):
"""
Calculate and format payload length and checksum as bytes."""
if self._payload is None:
self._length = val2bytes(0, ubt.U2)
self._checksum = calc_checksum(self._ubxClass + self._ubxID + self._length)
else:
self._length = val2bytes(len(self._payload), ubt.U2)
self._checksum = calc_checksum(
self._ubxClass + self._ubxID + self._length + self._payload
)
def _get_dict(self, **kwargs) -> dict:
"""
Get payload dictionary corresponding to message mode (GET/SET/POLL)
Certain message types need special handling as alternate payload
definitions exist for the same ubxClass/ubxID.
:param kwargs: optional payload key/value pairs
:return: dictionary representing payload definition
:rtype: dict
"""
try:
if self._mode == ubt.POLL:
# CFG-TP5 POLL
if self._ubxClass == b"\x06" and self._ubxID == b"\x31":
pdict = self._get_cfgtp5_version(**kwargs)
else:
pdict = ubp.UBX_PAYLOADS_POLL[self.identity]
elif self._mode == ubt.SET:
# MGA SET
if self._ubxClass == b"\x13" and self._ubxID != b"\x80":
pdict = self._get_mga_version(ubt.SET, **kwargs)
# RXM-PMP SET
elif self._ubxClass == b"\x02" and self._ubxID == b"\x72":
pdict = self._get_rxmpmp_version(**kwargs)
# RXM-PMREQ SET
elif self._ubxClass == b"\x02" and self._ubxID == b"\x41":
pdict = self._get_rxmpmreq_version(**kwargs)
# TIM-VCOCAL SET
elif self._ubxClass == b"\x0d" and self._ubxID == b"\x15":
pdict = self._get_timvcocal_version(**kwargs)
# CFG-DAT SET
elif self._ubxClass == b"\x06" and self._ubxID == b"\x06":
pdict = self._get_cfgdat_version(**kwargs)
else:
pdict = ubs.UBX_PAYLOADS_SET[self.identity]
else:
# MGA GET
if self._ubxClass == b"\x13" and self._ubxID != b"\x80":
pdict = self._get_mga_version(ubt.GET, **kwargs)
# RXM-PMP GET
elif self._ubxClass == b"\x02" and self._ubxID == b"\x72":
pdict = self._get_rxmpmp_version(**kwargs)
# RXM-RLM GET
elif self._ubxClass == b"\x02" and self._ubxID == b"\x59":
pdict = self._get_rxmrlm_version(**kwargs)
# CFG-NMEA GET
elif self._ubxClass == b"\x06" and self._ubxID == b"\x17":
pdict = self._get_cfgnmea_version(**kwargs)
# NAV-AOPSTATUS GET
elif self._ubxClass == b"\x01" and self._ubxID == b"\x60":
pdict = self._get_aopstatus_version(**kwargs)
# NAV-RELPOSNED GET
elif self._ubxClass == b"\x01" and self._ubxID == b"\x3C":
pdict = self._get_relposned_version(**kwargs)
# Unknown GET message, parsed to nominal definition
elif self.identity[-7:] == "NOMINAL":
pdict = {}
else:
pdict = ubg.UBX_PAYLOADS_GET[self.identity]
return pdict
except KeyError as err:
raise KeyError(
f"{err} - Check 'msgmode' keyword argument is appropriate for data stream"
) from err
def _get_cfgtp5_version(self, **kwargs) -> dict:
"""
Select appropriate CFG-TP5 POLL payload definition by checking
presence of tpIdx or payload argument.
:param kwargs: optional payload key/value pairs
:return: dictionary representing payload definition
:rtype: dict
"""
lp = 0
if "payload" in kwargs:
lp = len(kwargs["payload"])
elif "tpIdx" in kwargs:
lp = 1
if lp == 1:
pdict = ubp.UBX_PAYLOADS_POLL["CFG-TP5-TPX"]
else:
pdict = ubp.UBX_PAYLOADS_POLL["CFG-TP5"]
return pdict
def _get_mga_version(self, mode: int, **kwargs) -> dict:
"""
Select appropriate MGA payload definition by checking
value of 'type' attribute (1st byte of payload).
:param str mode: mode (0=GET, 1=SET, 2=POLL)
:param kwargs: optional payload key/value pairs
:return: dictionary representing payload definition
:rtype: dict
:raises: UBXMessageError
"""
if "type" in kwargs:
typ = val2bytes(kwargs["type"], ubt.U1)
elif "payload" in kwargs:
typ = kwargs["payload"][0:1]
else:
raise ube.UBXMessageError(
"MGA message definitions must include type or payload keyword"
)
identity = ubt.UBX_MSGIDS[self._ubxClass + self._ubxID + typ]
if mode == ubt.SET:
pdict = ubs.UBX_PAYLOADS_SET[identity]
else:
pdict = ubg.UBX_PAYLOADS_GET[identity]
return pdict
def _get_rxmpmreq_version(self, **kwargs) -> dict:
"""
Select appropriate RXM-PMREQ payload definition by checking
the 'version' keyword or payload length.
:param kwargs: optional payload key/value pairs
:return: dictionary representing payload definition
:rtype: dict
:raises: UBXMessageError
"""
lpd = 0
if "version" in kwargs: # assume longer version
lpd = 16
elif "payload" in kwargs:
lpd = len(kwargs["payload"])
else:
raise ube.UBXMessageError(
"RXM-PMREQ message definitions must include version or payload keyword"
)
if lpd == 16:
pdict = ubs.UBX_PAYLOADS_SET["RXM-PMREQ"] # long
else:
pdict = ubs.UBX_PAYLOADS_SET["RXM-PMREQ-S"] # short
return pdict
def _get_rxmpmp_version(self, **kwargs) -> dict:
"""
Select appropriate RXM-PMP payload definition by checking
value of 'version' attribute (1st byte of payload).
:param kwargs: optional payload key/value pairs
:return: dictionary representing payload definition
:rtype: dict
:raises: UBXMessageError
"""
if "version" in kwargs:
ver = val2bytes(kwargs["version"], ubt.U1)
elif "payload" in kwargs:
ver = kwargs["payload"][0:1]
else:
raise ube.UBXMessageError(
"RXM-PMP message definitions must include version or payload keyword"
)
if ver == b"\x00":
pdict = ubs.UBX_PAYLOADS_SET["RXM-PMP-V0"]
else:
pdict = ubs.UBX_PAYLOADS_SET["RXM-PMP-V1"]
return pdict
def _get_rxmrlm_version(self, **kwargs) -> dict:
"""
Select appropriate RXM-RLM payload definition by checking
value of 'type' attribute (2nd byte of payload).
:param kwargs: optional payload key/value pairs
:return: dictionary representing payload definition
:rtype: dict
:raises: UBXMessageError
"""
if "type" in kwargs:
typ = val2bytes(kwargs["type"], ubt.U1)
elif "payload" in kwargs:
typ = kwargs["payload"][1:2]
else:
raise ube.UBXMessageError(
"RXM-RLM message definitions must include type or payload keyword"
)
if typ == b"\x01":
pdict = ubg.UBX_PAYLOADS_GET["RXM-RLM-S"] # short
else:
pdict = ubg.UBX_PAYLOADS_GET["RXM-RLM-L"] # long
return pdict
def _get_cfgnmea_version(self, **kwargs) -> dict:
"""
Select appropriate payload definition version for older
generations of CFG-NMEA message by checking payload length.
:param kwargs: optional payload key/value pairs
:return: dictionary representing payload definition
:rtype: dict
:raises: UBXMessageError
"""
if "payload" in kwargs:
lpd = len(kwargs["payload"])
else:
raise ube.UBXMessageError(
"CFG-NMEA message definitions must include payload keyword"
)
if lpd == 4:
pdict = ubg.UBX_PAYLOADS_GET["CFG-NMEAvX"]
elif lpd == 12:
pdict = ubg.UBX_PAYLOADS_GET["CFG-NMEAv0"]
else:
pdict = ubg.UBX_PAYLOADS_GET["CFG-NMEA"]
return pdict
def _get_aopstatus_version(self, **kwargs) -> dict:
"""
Select appropriate payload definition version for older
generations of NAV-AOPSTATUS message by checking payload length.
:param kwargs: optional payload key/value pairs
:return: dictionary representing payload definition
:rtype: dict
:raises: UBXMessageError
"""
if "payload" in kwargs:
lpd = len(kwargs["payload"])
else:
raise ube.UBXMessageError(
"NAV-AOPSTATUS message definitions must include payload keyword"
)
if lpd == 20:
pdict = ubg.UBX_PAYLOADS_GET["NAV-AOPSTATUS-L"]
else:
pdict = ubg.UBX_PAYLOADS_GET["NAV-AOPSTATUS"]
return pdict
def _get_relposned_version(self, **kwargs) -> dict:
"""
Select appropriate NAV-RELPOSNED payload definition by checking
value of 'version' attribute (1st byte of payload).
:param kwargs: optional payload key/value pairs
:return: dictionary representing payload definition
:rtype: dict
:raises: UBXMessageError
"""
if "version" in kwargs:
ver = val2bytes(kwargs["version"], ubt.U1)
elif "payload" in kwargs:
ver = kwargs["payload"][0:1]
else:
raise ube.UBXMessageError(
"NAV-RELPOSNED message definitions must include version or payload keyword"
)
if ver == b"\x00":
pdict = ubg.UBX_PAYLOADS_GET["NAV-RELPOSNED-V0"]
else:
pdict = ubg.UBX_PAYLOADS_GET["NAV-RELPOSNED"]
return pdict
def _get_timvcocal_version(self, **kwargs) -> dict:
"""
Select appropriate TIM-VCOCAL SET payload definition by checking
the payload length.
:param kwargs: optional payload key/value pairs
:return: dictionary representing payload definition
:rtype: dict
:raises: UBXMessageError
"""
lpd = 1
typ = 0
if "type" in kwargs:
typ = kwargs["type"]
elif "payload" in kwargs:
lpd = len(kwargs["payload"])
else:
raise ube.UBXMessageError(
"TIM-VCOCAL SET message definitions must include type or payload keyword"
)
if lpd == 1 and typ == 0:
pdict = ubs.UBX_PAYLOADS_SET["TIM-VCOCAL-V0"] # stop cal
else:
pdict = ubs.UBX_PAYLOADS_SET["TIM-VCOCAL"] # cal
return pdict
def _get_cfgdat_version(self, **kwargs) -> dict:
"""
Select appropriate CFG-DAT SET payload definition by checking
presence of datumNum keyword or payload length of 2 bytes.
:param kwargs: optional payload key/value pairs
:return: dictionary representing payload definition
:rtype: dict
"""
lpd = 0
if "payload" in kwargs:
lpd = len(kwargs["payload"])
if lpd == 2 or "datumNum" in kwargs:
pdict = ubs.UBX_PAYLOADS_SET["CFG-DAT-NUM"] # datum num set
else:
pdict = ubs.UBX_PAYLOADS_SET["CFG-DAT"] # manual datum set
return pdict
def _calc_num_repeats(
self, attd: dict, payload: bytes, offset: int, offsetend: int = 0
) -> int:
"""
Deduce number of items in 'variable by size' repeating group by
dividing length of remaining payload by length of group.
This is predicated on there being only one such repeating group
per message payload, which is true for all currently supported types.
:param dict attd: grouped attribute dictionary
:param bytes payload : raw payload
:param int offset: number of bytes in payload before repeating group
:param int offsetend: number of bytes in payload after repeating group
:return: number of repeats
:rtype: int
"""
lenpayload = len(payload) - offset - offsetend
lengroup = 0
for _, val in attd.items():
if isinstance(val, tuple):
val, _ = val
lengroup += attsiz(val)
return int(lenpayload / lengroup)
def __str__(self) -> str:
"""
Human readable representation.
:return: human readable representation
:rtype: str
"""
clsid = None
msgid = None
umsg_name = self.identity
if self.payload is None:
return f"<UBX({umsg_name})>"
if self.identity[-7:] == "NOMINAL":
return f"<UBX({umsg_name}, payload={escapeall(self._payload)})>"
stg = f"<UBX({umsg_name}, "
for i, att in enumerate(self.__dict__):
if att[0] != "_": # only show public attributes
val = self.__dict__[att]
# escape all byte chars
if (
isinstance(val, bytes)
and att not in ("datumName",)
and self.identity != "MON-VER"
):
val = escapeall(val)
if att[0:6] == "gnssId": # attribute is a GNSS ID
val = gnss2str(val) # get string representation e.g. 'GPS'
if att == "iTOW": # attribute is a GPS Time of Week
val = itow2utc(val) # show time in UTC format
# if it's an ACK, we show what it's acknowledging in plain text
# if it's a CFG-MSG, we show what message class/id it refers to in plain text
if self._ubxClass == b"\x05" or (
self._ubxClass == b"\x06" and self._ubxID == b"\x01"
):
if att in ["clsID", "msgClass"]:
clsid = val2bytes(val, ubt.U1)
val = ubt.UBX_CLASSES.get(clsid, clsid)
if att == "msgID" and clsid:
msgid = val2bytes(val, ubt.U1)
val = ubt.UBX_MSGIDS.get(clsid + msgid, clsid + msgid)
stg += att + "=" + str(val)
if i < len(self.__dict__) - 1:
stg += ", "
stg += ")>"
return stg
def __repr__(self) -> str:
"""
Machine readable representation.
eval(repr(obj)) = obj
:return: machine readable representation
:rtype: str
"""
if self._payload is None:
return f"UBXMessage({self._ubxClass}, {self._ubxID}, {self._mode})"
return f"UBXMessage({self._ubxClass}, {self._ubxID}, {self._mode}, payload={self._payload})"
def __setattr__(self, name, value):
"""
Override setattr to make object immutable after instantiation.
:param str name: attribute name
:param object value: attribute value
:raises: UBXMessageError
"""
if self._immutable:
raise ube.UBXMessageError(
f"Object is immutable. Updates to {name} not permitted after initialisation."
)
super().__setattr__(name, value)
[docs]
def serialize(self) -> bytes:
"""
Serialize message.
:return: serialized output
:rtype: bytes
"""
output = ubt.UBX_HDR + self._ubxClass + self._ubxID + self._length
output += (
self._checksum if self._payload is None else self._payload + self._checksum
)
return output
@property
def identity(self) -> str:
"""
Returns message identity in plain text form.
If the message is unrecognised, the message is parsed
to a nominal payload definition UBX-NOMINAL and
the term 'NOMINAL' is appended to the identity.
:return: message identity e.g. 'CFG-MSG'
:rtype: str
"""
try:
# all MGA messages except MGA-DBD need to be identified by the
# 'type' attribute - the first byte of the payload
if self._ubxClass == b"\x13" and self._ubxID != b"\x80":
umsg_name = ubt.UBX_MSGIDS[
self._ubxClass + self._ubxID + self._payload[0:1]
]
else:
umsg_name = ubt.UBX_MSGIDS[self._ubxClass + self._ubxID]
except KeyError:
# unrecognised u-blox message, parsed to UBX-NOMINAL definition
if self._ubxClass in ubt.UBX_CLASSES: # known class
cls = ubt.UBX_CLASSES[self._ubxClass]
else: # unknown class
cls = "UNKNOWN"
umsg_name = (
f"{cls}-{int.from_bytes(self._ubxClass, 'little'):02x}"
+ f"{int.from_bytes(self._ubxID, 'little'):02x}-NOMINAL"
)
return umsg_name
@property
def msg_cls(self) -> bytes:
"""
Class id getter.
:return: message class as bytes
:rtype: bytes
"""
return self._ubxClass
@property
def msg_id(self) -> bytes:
"""
Message id getter.
:return: message id as bytes
:rtype: bytes
"""
return self._ubxID
@property
def length(self) -> int:
"""
Payload length getter.
:return: payload length as integer
:rtype: int
"""
return bytes2val(self._length, ubt.U2)
@property
def payload(self) -> bytes:
"""
Payload getter - returns the raw payload bytes.
:return: raw payload as bytes
:rtype: bytes
"""
return self._payload
@property
def msgmode(self) -> int:
"""
Message mode getter.
:return: msgmode as integer
:rtype: int
"""
return self._mode
[docs]
@staticmethod
def config_set(layers: int, transaction: int, cfgData: list) -> object:
"""
Construct CFG-VALSET message from an array of
configuration database (key, value) tuples. Keys
can be in int (keyID) or str (keyname) format.
:param int layers: memory layer(s) (1=RAM, 2=BBR, 4=Flash)
:param int transaction: 0=no txn, 1=start txn, 2=continue txn, 3=apply txn
:param list cfgData: list of up to 64 tuples (key, value)
:return: UBXMessage CFG-VALSET
:rtype: UBXMessage
:raises: UBXMessageError
"""
num = len(cfgData)
if num > 64:
raise ube.UBXMessageError(
f"Number of configuration tuples {num} exceeds maximum of 64"
)
version = val2bytes(0 if transaction == 0 else 1, ubt.U1)
layers = val2bytes(layers, ubt.U1)
transaction = val2bytes(transaction, ubt.U1)
payload = version + layers + transaction + b"\x00"
lis = b""
for cfgItem in cfgData:
att = ""
(key, val) = cfgItem
if isinstance(key, str): # if key is a string (keyname)
(key, att) = cfgname2key(key) # lookup keyID & attribute type
else:
(_, att) = cfgkey2name(key) # lookup attribute type
keyb = val2bytes(key, ubt.U4)
valb = val2bytes(val, att)
lis = lis + keyb + valb
return UBXMessage("CFG", "CFG-VALSET", ubt.SET, payload=payload + lis)
[docs]
@staticmethod
def config_del(layers: int, transaction: int, keys: list) -> object:
"""
Construct CFG-VALDEL message from an array of
configuration database keys, which can be in int (keyID)
or str (keyname) format.
:param int layers: memory layer(s) (2=BBR, 4=Flash)
:param int transaction: 0=no txn, 1=start txn, 2=continue txn, 3=apply txn
:param list keys: array of up to 64 keys as int (keyID) or string (keyname)
:return: UBXMessage CFG-VALDEL
:rtype: UBXMessage
:raises: UBXMessageError
"""
num = len(keys)
if num > 64:
raise ube.UBXMessageError(
f"Number of configuration keys {num} exceeds maximum of 64"
)
version = val2bytes(0 if transaction == 0 else 1, ubt.U1)
layers = val2bytes(layers, ubt.U1)
transaction = val2bytes(transaction, ubt.U1)
payload = version + layers + transaction + b"\x00"
lis = b""
for key in keys:
if isinstance(key, str): # if keyname as a string
(key, _) = cfgname2key(key) # lookup keyID
keyb = val2bytes(key, ubt.U4)
lis = lis + keyb
return UBXMessage("CFG", "CFG-VALDEL", ubt.SET, payload=payload + lis)
[docs]
@staticmethod
def config_poll(layer: int, position: int, keys: list) -> object:
"""
Construct CFG-VALGET message from an array of
configuration database keys, which can be in int (keyID)
or str (keyname) format.
:param int layer: memory layer (0=RAM, 1=BBR, 2=Flash, 7 = Default)
:param int position: number of keys to skip before returning result
:param list keys: array of up to 64 keys as int (keyID) or str (keyname)
:return: UBXMessage CFG-VALGET
:rtype: UBXMessage
:raises: UBXMessageError
"""
num = len(keys)
if num > 64:
raise ube.UBXMessageError(
f"Number of configuration keys {num} exceeds maximum of 64"
)
version = val2bytes(0, ubt.U1)
layer = val2bytes(layer, ubt.U1)
position = val2bytes(position, ubt.U2)
payload = version + layer + position
lis = b""
for key in keys:
if isinstance(key, str): # if keyname as a string
(key, _) = cfgname2key(key) # lookup keyID
keyb = val2bytes(key, ubt.U4)
lis = lis + keyb
return UBXMessage("CFG", "CFG-VALGET", ubt.POLL, payload=payload + lis)