"""
Collection of UBX helper methods which can be used
outside the UBXMessage or UBXReader classes
Created on 15 Dec 2020
:author: semuadmin
:copyright: SEMU Consulting © 2020
:license: BSD 3-Clause
"""
import struct
from datetime import datetime, timedelta
from math import cos, pi, sin, trunc
from pynmeagps.nmeatypes_core import NMEA_HDR
import pyubx2.exceptions as ube
import pyubx2.ubxtypes_configdb as ubcdb
import pyubx2.ubxtypes_core as ubt
from pyubx2.ubxtypes_core import POLL, SET, UBX_HDR
from pyubx2.ubxtypes_decodes import FIXTYPE, GNSSLIST
EPOCH0 = datetime(1980, 1, 6) # EPOCH start date
LEAPOFFSET = 18 # leap year offset in seconds, valid as from 1/1/2017
SIW = 604800 # seconds in week = 3600*24*7
[docs]
def att2idx(att: str) -> int:
"""
Get integer index corresponding to grouped attribute.
e.g. svid_06 -> 6; gnssId_103 -> 103
:param str att: grouped attribute name e.g. svid_01
:return: index as integer, or 0 if not grouped
:rtype: int
"""
try:
return int(att[att.rindex("_") - len(att) + 1 :])
except ValueError:
return 0
[docs]
def att2name(att: str) -> str:
"""
Get name of grouped attribute.
e.g. svid_06 -> svid; gnssId_103 -> gnssId
:param str att: grouped attribute name e.g. svid_01
:return: name without index e.g. DF406
:rtype: str
"""
try:
return att[: att.rindex("_")]
except ValueError:
return att
[docs]
def calc_checksum(content: bytes) -> bytes:
"""
Calculate checksum using 8-bit Fletcher's algorithm.
:param bytes content: message content, excluding header and checksum bytes
:return: checksum
:rtype: bytes
"""
check_a = 0
check_b = 0
for char in content:
check_a += char
check_a &= 0xFF
check_b += check_a
check_b &= 0xFF
return bytes((check_a, check_b))
[docs]
def isvalid_checksum(message: bytes) -> bool:
"""
Validate message checksum.
:param bytes message: message including header and checksum bytes
:return: checksum valid flag
:rtype: bool
"""
lenm = len(message)
ckm = message[lenm - 2 : lenm]
return ckm == calc_checksum(message[2 : lenm - 2])
[docs]
def atttyp(att: str) -> str:
"""
Helper function to return attribute type as string.
:param str: attribute type e.g. 'U002'
:return: type of attribute as string e.g. 'U'
:rtype: str
"""
return att[0:1]
[docs]
def attsiz(att: str) -> int:
"""
Helper function to return attribute size in bytes.
:param str: attribute type e.g. 'U002'
:return: size of attribute in bytes
:rtype: int
"""
return int(att[1:4])
[docs]
def itow2utc(itow: int) -> datetime.time:
"""
Convert GPS Time Of Week to UTC time
:param int itow: GPS Time Of Week in milliseconds
:return: UTC time hh.mm.ss
:rtype: datetime.time
"""
utc = EPOCH0 + timedelta(seconds=(itow / 1000) - LEAPOFFSET)
return utc.time()
[docs]
def utc2itow(utc: datetime) -> tuple:
"""
Convert UTC datetime to GPS Week Number, Time Of Week
:param datetime utc: datetime
:return: GPS Week Number, Time of Week in milliseconds
:rtype: tuple
"""
wno = int((utc - EPOCH0).total_seconds() / SIW)
sow = EPOCH0 + timedelta(seconds=wno * SIW)
itow = int(((utc - sow).total_seconds() + LEAPOFFSET) * 1000)
return wno, itow
[docs]
def gpsfix2str(fix: int) -> str:
"""
Convert GPS fix integer to descriptive string.
:param int fix: GPS fix type (0-5)
:return: GPS fix type as string
:rtype: str
"""
try:
return FIXTYPE[fix]
except KeyError:
return str(fix)
[docs]
def dop2str(dop: float) -> str:
"""
Convert Dilution of Precision float to descriptive string.
:param float dop: dilution of precision as float
:return: dilution of precision as string
:rtype: str
"""
if dop == 1:
dops = "Ideal"
elif dop <= 2:
dops = "Excellent"
elif dop <= 5:
dops = "Good"
elif dop <= 10:
dops = "Moderate"
elif dop <= 20:
dops = "Fair"
else:
dops = "Poor"
return dops
[docs]
def gnss2str(gnss_id: int) -> str:
"""
Convert GNSS ID to descriptive string
('GPS', 'GLONASS', etc.).
:param int gnss_id: GNSS identifier as integer (0-6)
:return: GNSS identifier as string
:rtype: str
"""
try:
return GNSSLIST[gnss_id]
except KeyError:
return str(gnss_id)
[docs]
def key_from_val(dictionary: dict, value) -> str:
"""
Helper method - get dictionary key corresponding to (unique) value.
:param dict dictionary: dictionary
:param object value: unique dictionary value
:return: dictionary key
:rtype: str
:raises: KeyError: if no key found for value
"""
val = None
for key, val in dictionary.items():
if val == value:
return key
raise KeyError(f"No key found for value {value}")
[docs]
def get_bits(bitfield: bytes, bitmask: int) -> int:
"""
Get integer value of specified (masked) bit(s) in a UBX bitfield (attribute type 'X')
e.g. to get value of bits 6,7 in bitfield b'\\\\x89' (binary 0b10001001)::
get_bits(b'\\x89', 0b11000000) = get_bits(b'\\x89', 192) = 2
:param bytes bitfield: bitfield byte(s)
:param int bitmask: bitmask as integer (= Σ(2**n), where n is the number of the bit)
:return: value of masked bit(s)
:rtype: int
"""
i = 0
val = int(bitfield.hex(), 16)
while bitmask & 1 == 0:
bitmask = bitmask >> 1
i += 1
return val >> i & bitmask
[docs]
def val2bytes(val, att: str) -> bytes:
"""
Convert value to bytes for given UBX attribute type.
:param object val: attribute value e.g. 25
:param str att: attribute type e.g. 'U004'
:return: attribute value as bytes
:rtype: bytes
:raises: UBXTypeError
"""
if att == ubt.CH: # single variable-length string (e.g. INF-NOTICE)
return val.encode("utf-8", "backslashreplace")
atts = attsiz(att)
if atttyp(att) in ("C", "X"): # byte or char
valb = val
elif atttyp(att) in ("E", "L", "U"): # unsigned integer
valb = val.to_bytes(atts, byteorder="little", signed=False)
elif atttyp(att) == "A": # array of unsigned integers
atts = attsiz(att)
valb = b""
for i in range(atts):
valb += val[i].to_bytes(1, byteorder="little", signed=False)
elif atttyp(att) == "I": # signed integer
valb = val.to_bytes(atts, byteorder="little", signed=True)
elif att == ubt.R4: # single precision floating point
valb = struct.pack("<f", val)
elif att == ubt.R8: # double precision floating point
valb = struct.pack("<d", val)
else:
raise ube.UBXTypeError(f"Unknown attribute type {att}")
return valb
[docs]
def bytes2val(valb: bytes, att: str) -> object:
"""
Convert bytes to value for given UBX attribute type.
:param bytes valb: attribute value in byte format e.g. b'\\\\x19\\\\x00\\\\x00\\\\x00'
:param str att: attribute type e.g. 'U004'
:return: attribute value as int, float, str or bytes
:rtype: object
:raises: UBXTypeError
"""
if att == ubt.CH: # single variable-length string (e.g. INF-NOTICE)
val = valb.decode("utf-8", "backslashreplace")
elif atttyp(att) in ("X", "C"):
val = valb
elif atttyp(att) in ("E", "L", "U"): # unsigned integer
val = int.from_bytes(valb, "little", signed=False)
elif atttyp(att) == "A": # array of unsigned integers
atts = attsiz(att)
val = []
for i in range(atts):
val.append(valb[i])
elif atttyp(att) == "I": # signed integer
val = int.from_bytes(valb, "little", signed=True)
elif att == ubt.R4: # single precision floating point
val = struct.unpack("<f", valb)[0]
elif att == ubt.R8: # double precision floating point
val = struct.unpack("<d", valb)[0]
else:
raise ube.UBXTypeError(f"Unknown attribute type {att}")
return val
[docs]
def nomval(att: str) -> object:
"""
Get nominal value for given UBX attribute type.
:param str att: attribute type e.g. 'U004'
:return: attribute value as int, float, str or bytes
:rtype: object
:raises: UBXTypeError
"""
if att == "CH":
val = ""
elif atttyp(att) in ("X", "C"):
val = b"\x00" * attsiz(att)
elif atttyp(att) == "R":
val = 0.0
elif atttyp(att) in ("E", "I", "L", "U"):
val = 0
elif atttyp(att) == "A": # array of unsigned integers
val = [0] * attsiz(att)
else:
raise ube.UBXTypeError(f"Unknown attribute type {att}")
return val
[docs]
def msgclass2bytes(msgclass: int, msgid: int) -> bytes:
"""
Convert message class/id integers to bytes.
:param int msgClass: message class as integer e.g. 6
:param int msgID: message ID as integer e.g. 1
:return: message class as bytes e.g. b'/x06/x01'
:rtype: bytes
"""
msgclass = val2bytes(msgclass, ubt.U1)
msgid = val2bytes(msgid, ubt.U1)
return (msgclass, msgid)
[docs]
def msgstr2bytes(msgclass: str, msgid: str) -> bytes:
"""
Convert plain text UBX message class to bytes.
:param str msgClass: message class as str e.g. 'CFG'
:param str msgID: message ID as str e.g. 'CFG-MSG'
:return: message class as bytes e.g. b'/x06/x01'
:rtype: bytes
:raises: UBXMessageError
"""
try:
clsid = key_from_val(ubt.UBX_CLASSES, msgclass)
msgid = key_from_val(ubt.UBX_MSGIDS, msgid)[1:2]
return (clsid, msgid)
except KeyError as err:
raise ube.UBXMessageError(
f"Undefined message, class {msgclass}, id {msgid}"
) from err
[docs]
def cfgname2key(name: str) -> tuple:
"""
Return hexadecimal key and data type for given
configuration database key name.
:param str name: config key as string e.g. "CFG_NMEA_PROTVER"
:return: tuple of (key, type)
:rtype: tuple: (int, str)
:raises: UBXMessageError
"""
try:
return ubcdb.UBX_CONFIG_DATABASE[name]
except KeyError as err:
raise ube.UBXMessageError(
f"Undefined configuration database key {name}"
) from err
[docs]
def cfgkey2name(keyid: int) -> tuple:
"""
Return key name and data type for given
configuration database hexadecimal key.
:param int keyID: config key as integer e.g. 0x20930001
:return: tuple of (keyname, type)
:rtype: tuple: (str, str)
:raises: UBXMessageError
"""
try:
val = None
for key, val in ubcdb.UBX_CONFIG_DATABASE.items():
(kid, typ) = val
if keyid == kid:
return (key, typ)
# undocumented configuration database key
# type is derived from keyID
key = f"CFG_{hex(keyid)}"
typ = f"X{ubcdb.UBX_CONFIG_STORSIZE[int(hex(keyid)[2:3])]:03d}"
return (key, typ)
except KeyError as err:
raise ube.UBXMessageError(
f"Invalid configuration database key {hex(keyid)}"
) from err
[docs]
def protocol(raw: bytes) -> int:
"""
Gets protocol of raw message.
:param bytes raw: raw (binary) message
:return: protocol type (1 = NMEA, 2 = UBX, 4 = RTCM3, 0 = unknown)
:rtype: int
"""
p = raw[0:2]
if p == UBX_HDR:
return 2
if p in NMEA_HDR:
return 1
if p[0] == 0xD3 and (p[1] & ~0x03) == 0:
return 4
return 0
[docs]
def hextable(raw: bytes, cols: int = 8) -> str:
"""
Formats raw (binary) message in tabular hexadecimal format e.g.
000: 2447 4e47 5341 2c41 2c33 2c33 342c 3233 | b'$GNGSA,A,3,34,23' |
:param bytes raw: raw (binary) data
:param int cols: number of columns in hex table (8)
:return: table of hex data
:rtype: str
"""
hextbl = ""
colw = cols * 4
rawh = raw.hex()
for i in range(0, len(rawh), colw):
rawl = rawh[i : i + colw].ljust(colw, " ")
hextbl += f"{int(i/2):03}: "
for col in range(0, colw, 4):
hextbl += f"{rawl[col : col + 4]} "
hextbl += f" | {bytes.fromhex(rawl)} |\n"
return hextbl
[docs]
def cel2cart(elevation: float, azimuth: float) -> tuple:
"""
Convert celestial coordinates (degrees) to Cartesian coordinates.
:param float elevation: elevation
:param float azimuth: azimuth
:return: cartesian x,y coordinates
:rtype: tuple
"""
if not (isinstance(elevation, (float, int)) and isinstance(azimuth, (float, int))):
return (0, 0)
ele, azi = [c * pi / 180 for c in (elevation, azimuth)]
x = cos(azi) * cos(ele)
y = sin(azi) * cos(ele)
return (x, y)
[docs]
def escapeall(val: bytes) -> str:
"""
Escape all byte characters e.g. b'\\\\x73' rather than b`s`
:param bytes val: bytes
:return: string of escaped bytes
:rtype: str
"""
return "b'{}'".format("".join(f"\\x{b:02x}" for b in val))
[docs]
def val2sphp(val: float, scale: float = 1e-7) -> tuple:
"""
Convert a float value into separate standard and high precisions components,
multiplied by a scaling factor to render them as integers, as required by some
CFG and NAV messages.
e.g. 48.123456789 becomes (481234567, 89)
:param float val: value as float
:param float scale: scaling factor e.g. 1e-7
:return: tuple of (standard precision, high precision)
:rtype: tuple
"""
val = val / scale
val_sp = trunc(val)
val_hp = round((val - val_sp) * 100)
return val_sp, val_hp