Source code for pyubx2.ubxhelpers

Collection of UBX helper methods which can be used
outside the UBXMessage or UBXReader classes

Created on 15 Dec 2020

:author: semuadmin
:copyright: SEMU Consulting © 2020
:license: BSD 3-Clause

import struct
from datetime import datetime, timedelta
from math import cos, pi, sin, trunc

from pynmeagps.nmeatypes_core import NMEA_HDR

import pyubx2.exceptions as ube
import pyubx2.ubxtypes_configdb as ubcdb
import pyubx2.ubxtypes_core as ubt
from pyubx2.ubxtypes_core import POLL, SET, UBX_HDR
from pyubx2.ubxtypes_decodes import FIXTYPE, GNSSLIST

EPOCH0 = datetime(1980, 1, 6)  # EPOCH start date
LEAPOFFSET = 18  # leap year offset in seconds, valid as from 1/1/2017
SIW = 604800  # seconds in week = 3600*24*7

[docs] def att2idx(att: str) -> int: """ Get integer index corresponding to grouped attribute. e.g. svid_06 -> 6; gnssId_103 -> 103 :param str att: grouped attribute name e.g. svid_01 :return: index as integer, or 0 if not grouped :rtype: int """ try: return int(att[att.rindex("_") - len(att) + 1 :]) except ValueError: return 0
[docs] def att2name(att: str) -> str: """ Get name of grouped attribute. e.g. svid_06 -> svid; gnssId_103 -> gnssId :param str att: grouped attribute name e.g. svid_01 :return: name without index e.g. DF406 :rtype: str """ try: return att[: att.rindex("_")] except ValueError: return att
[docs] def calc_checksum(content: bytes) -> bytes: """ Calculate checksum using 8-bit Fletcher's algorithm. :param bytes content: message content, excluding header and checksum bytes :return: checksum :rtype: bytes """ check_a = 0 check_b = 0 for char in content: check_a += char check_a &= 0xFF check_b += check_a check_b &= 0xFF return bytes((check_a, check_b))
[docs] def isvalid_checksum(message: bytes) -> bool: """ Validate message checksum. :param bytes message: message including header and checksum bytes :return: checksum valid flag :rtype: bool """ lenm = len(message) ckm = message[lenm - 2 : lenm] return ckm == calc_checksum(message[2 : lenm - 2])
[docs] def atttyp(att: str) -> str: """ Helper function to return attribute type as string. :param str: attribute type e.g. 'U002' :return: type of attribute as string e.g. 'U' :rtype: str """ return att[0:1]
[docs] def attsiz(att: str) -> int: """ Helper function to return attribute size in bytes. :param str: attribute type e.g. 'U002' :return: size of attribute in bytes :rtype: int """ return int(att[1:4])
[docs] def itow2utc(itow: int) -> datetime.time: """ Convert GPS Time Of Week to UTC time :param int itow: GPS Time Of Week in milliseconds :return: UTC time :rtype: datetime.time """ utc = EPOCH0 + timedelta(seconds=(itow / 1000) - LEAPOFFSET) return utc.time()
[docs] def utc2itow(utc: datetime) -> tuple: """ Convert UTC datetime to GPS Week Number, Time Of Week :param datetime utc: datetime :return: GPS Week Number, Time of Week in milliseconds :rtype: tuple """ wno = int((utc - EPOCH0).total_seconds() / SIW) sow = EPOCH0 + timedelta(seconds=wno * SIW) itow = int(((utc - sow).total_seconds() + LEAPOFFSET) * 1000) return wno, itow
[docs] def gpsfix2str(fix: int) -> str: """ Convert GPS fix integer to descriptive string. :param int fix: GPS fix type (0-5) :return: GPS fix type as string :rtype: str """ try: return FIXTYPE[fix] except KeyError: return str(fix)
[docs] def dop2str(dop: float) -> str: """ Convert Dilution of Precision float to descriptive string. :param float dop: dilution of precision as float :return: dilution of precision as string :rtype: str """ if dop == 1: dops = "Ideal" elif dop <= 2: dops = "Excellent" elif dop <= 5: dops = "Good" elif dop <= 10: dops = "Moderate" elif dop <= 20: dops = "Fair" else: dops = "Poor" return dops
[docs] def gnss2str(gnss_id: int) -> str: """ Convert GNSS ID to descriptive string ('GPS', 'GLONASS', etc.). :param int gnss_id: GNSS identifier as integer (0-6) :return: GNSS identifier as string :rtype: str """ try: return GNSSLIST[gnss_id] except KeyError: return str(gnss_id)
[docs] def key_from_val(dictionary: dict, value) -> str: """ Helper method - get dictionary key corresponding to (unique) value. :param dict dictionary: dictionary :param object value: unique dictionary value :return: dictionary key :rtype: str :raises: KeyError: if no key found for value """ val = None for key, val in dictionary.items(): if val == value: return key raise KeyError(f"No key found for value {value}")
[docs] def get_bits(bitfield: bytes, bitmask: int) -> int: """ Get integer value of specified (masked) bit(s) in a UBX bitfield (attribute type 'X') e.g. to get value of bits 6,7 in bitfield b'\\\\x89' (binary 0b10001001):: get_bits(b'\\x89', 0b11000000) = get_bits(b'\\x89', 192) = 2 :param bytes bitfield: bitfield byte(s) :param int bitmask: bitmask as integer (= Σ(2**n), where n is the number of the bit) :return: value of masked bit(s) :rtype: int """ i = 0 val = int(bitfield.hex(), 16) while bitmask & 1 == 0: bitmask = bitmask >> 1 i += 1 return val >> i & bitmask
[docs] def val2bytes(val, att: str) -> bytes: """ Convert value to bytes for given UBX attribute type. :param object val: attribute value e.g. 25 :param str att: attribute type e.g. 'U004' :return: attribute value as bytes :rtype: bytes :raises: UBXTypeError """ if att == ubt.CH: # single variable-length string (e.g. INF-NOTICE) return val.encode("utf-8", "backslashreplace") atts = attsiz(att) if atttyp(att) in ("C", "X"): # byte or char valb = val elif atttyp(att) in ("E", "L", "U"): # unsigned integer valb = val.to_bytes(atts, byteorder="little", signed=False) elif atttyp(att) == "A": # array of unsigned integers atts = attsiz(att) valb = b"" for i in range(atts): valb += val[i].to_bytes(1, byteorder="little", signed=False) elif atttyp(att) == "I": # signed integer valb = val.to_bytes(atts, byteorder="little", signed=True) elif att == ubt.R4: # single precision floating point valb = struct.pack("<f", val) elif att == ubt.R8: # double precision floating point valb = struct.pack("<d", val) else: raise ube.UBXTypeError(f"Unknown attribute type {att}") return valb
[docs] def bytes2val(valb: bytes, att: str) -> object: """ Convert bytes to value for given UBX attribute type. :param bytes valb: attribute value in byte format e.g. b'\\\\x19\\\\x00\\\\x00\\\\x00' :param str att: attribute type e.g. 'U004' :return: attribute value as int, float, str or bytes :rtype: object :raises: UBXTypeError """ if att == ubt.CH: # single variable-length string (e.g. INF-NOTICE) val = valb.decode("utf-8", "backslashreplace") elif atttyp(att) in ("X", "C"): val = valb elif atttyp(att) in ("E", "L", "U"): # unsigned integer val = int.from_bytes(valb, "little", signed=False) elif atttyp(att) == "A": # array of unsigned integers atts = attsiz(att) val = [] for i in range(atts): val.append(valb[i]) elif atttyp(att) == "I": # signed integer val = int.from_bytes(valb, "little", signed=True) elif att == ubt.R4: # single precision floating point val = struct.unpack("<f", valb)[0] elif att == ubt.R8: # double precision floating point val = struct.unpack("<d", valb)[0] else: raise ube.UBXTypeError(f"Unknown attribute type {att}") return val
[docs] def nomval(att: str) -> object: """ Get nominal value for given UBX attribute type. :param str att: attribute type e.g. 'U004' :return: attribute value as int, float, str or bytes :rtype: object :raises: UBXTypeError """ if att == "CH": val = "" elif atttyp(att) in ("X", "C"): val = b"\x00" * attsiz(att) elif atttyp(att) == "R": val = 0.0 elif atttyp(att) in ("E", "I", "L", "U"): val = 0 elif atttyp(att) == "A": # array of unsigned integers val = [0] * attsiz(att) else: raise ube.UBXTypeError(f"Unknown attribute type {att}") return val
[docs] def msgclass2bytes(msgclass: int, msgid: int) -> bytes: """ Convert message class/id integers to bytes. :param int msgClass: message class as integer e.g. 6 :param int msgID: message ID as integer e.g. 1 :return: message class as bytes e.g. b'/x06/x01' :rtype: bytes """ msgclass = val2bytes(msgclass, ubt.U1) msgid = val2bytes(msgid, ubt.U1) return (msgclass, msgid)
[docs] def msgstr2bytes(msgclass: str, msgid: str) -> bytes: """ Convert plain text UBX message class to bytes. :param str msgClass: message class as str e.g. 'CFG' :param str msgID: message ID as str e.g. 'CFG-MSG' :return: message class as bytes e.g. b'/x06/x01' :rtype: bytes :raises: UBXMessageError """ try: clsid = key_from_val(ubt.UBX_CLASSES, msgclass) msgid = key_from_val(ubt.UBX_MSGIDS, msgid)[1:2] return (clsid, msgid) except KeyError as err: raise ube.UBXMessageError( f"Undefined message, class {msgclass}, id {msgid}" ) from err
[docs] def cfgname2key(name: str) -> tuple: """ Return hexadecimal key and data type for given configuration database key name. :param str name: config key as string e.g. "CFG_NMEA_PROTVER" :return: tuple of (key, type) :rtype: tuple: (int, str) :raises: UBXMessageError """ try: return ubcdb.UBX_CONFIG_DATABASE[name] except KeyError as err: raise ube.UBXMessageError( f"Undefined configuration database key {name}" ) from err
[docs] def cfgkey2name(keyid: int) -> tuple: """ Return key name and data type for given configuration database hexadecimal key. :param int keyID: config key as integer e.g. 0x20930001 :return: tuple of (keyname, type) :rtype: tuple: (str, str) :raises: UBXMessageError """ try: val = None for key, val in ubcdb.UBX_CONFIG_DATABASE.items(): (kid, typ) = val if keyid == kid: return (key, typ) # undocumented configuration database key # type is derived from keyID key = f"CFG_{hex(keyid)}" typ = f"X{ubcdb.UBX_CONFIG_STORSIZE[int(hex(keyid)[2:3])]:03d}" return (key, typ) except KeyError as err: raise ube.UBXMessageError( f"Invalid configuration database key {hex(keyid)}" ) from err
[docs] def protocol(raw: bytes) -> int: """ Gets protocol of raw message. :param bytes raw: raw (binary) message :return: protocol type (1 = NMEA, 2 = UBX, 4 = RTCM3, 0 = unknown) :rtype: int """ p = raw[0:2] if p == UBX_HDR: return 2 if p in NMEA_HDR: return 1 if p[0] == 0xD3 and (p[1] & ~0x03) == 0: return 4 return 0
[docs] def hextable(raw: bytes, cols: int = 8) -> str: """ Formats raw (binary) message in tabular hexadecimal format e.g. 000: 2447 4e47 5341 2c41 2c33 2c33 342c 3233 | b'$GNGSA,A,3,34,23' | :param bytes raw: raw (binary) data :param int cols: number of columns in hex table (8) :return: table of hex data :rtype: str """ hextbl = "" colw = cols * 4 rawh = raw.hex() for i in range(0, len(rawh), colw): rawl = rawh[i : i + colw].ljust(colw, " ") hextbl += f"{int(i/2):03}: " for col in range(0, colw, 4): hextbl += f"{rawl[col : col + 4]} " hextbl += f" | {bytes.fromhex(rawl)} |\n" return hextbl
[docs] def cel2cart(elevation: float, azimuth: float) -> tuple: """ Convert celestial coordinates (degrees) to Cartesian coordinates. :param float elevation: elevation :param float azimuth: azimuth :return: cartesian x,y coordinates :rtype: tuple """ if not (isinstance(elevation, (float, int)) and isinstance(azimuth, (float, int))): return (0, 0) ele, azi = [c * pi / 180 for c in (elevation, azimuth)] x = cos(azi) * cos(ele) y = sin(azi) * cos(ele) return (x, y)
[docs] def escapeall(val: bytes) -> str: """ Escape all byte characters e.g. b'\\\\x73' rather than b`s` :param bytes val: bytes :return: string of escaped bytes :rtype: str """ return "b'{}'".format("".join(f"\\x{b:02x}" for b in val))
[docs] def val2sphp(val: float, scale: float = 1e-7) -> tuple: """ Convert a float value into separate standard and high precisions components, multiplied by a scaling factor to render them as integers, as required by some CFG and NAV messages. e.g. 48.123456789 becomes (481234567, 89) :param float val: value as float :param float scale: scaling factor e.g. 1e-7 :return: tuple of (standard precision, high precision) :rtype: tuple """ val = val / scale val_sp = trunc(val) val_hp = round((val - val_sp) * 100) return val_sp, val_hp
[docs] def getinputmode(data: bytes) -> int: """ Return input message mode (SET or POLL). :param bytes data: raw UBX input message :return: message mode (1 = SET, 2 = POLL) :rtype: int """ if ( len(data) == 8 or data[2:4] == b"\x06\x8b" # CFG-VALGET or ( data[2:4] in ( b"\x06\x01", b"\x06\x02", b"\x06\x03", b"\x06\x31", ) # CFG-INF, CFG-MSG, CFG-PRT, CFG-TP5 and len(data) <= 10 ) ): return POLL return SET