Source code for pygpsclient.ubx_handler

"""
ubx_handler.py

UBX Protocol handler - handles all incoming UBX messages

Parses individual UBX messages (using pyubx2 library)
and adds selected attribute values to the app.gnss_status
data dictionary. This dictionary is then used to periodically
update the various user-selectable widget panels.

Created on 30 Sep 2020

:author: semuadmin
:copyright: 2020 SEMU Consulting
:license: BSD 3-Clause
"""

import logging

from pyubx2 import UBXMessage, itow2utc

from pygpsclient.globals import GLONASS_NMEA, UTF8
from pygpsclient.helpers import corrage2int, fix2desc, ned2vector, svid2gnssid
from pygpsclient.strings import DLGTSPARTN, DLGTUBX, NA
from pygpsclient.widget_state import VISIBLE, WDGSPECTRUM, WDGSYSMON


[docs] class UBXHandler: """ UBXHandler class """
[docs] def __init__(self, app): """ Constructor. :param Frame app: reference to main tkinter application """ self.__app = app # Reference to main application class self.__master = self.__app.appmaster # Reference to root class (Tk) self.logger = logging.getLogger(__name__) self._cdb = 0 self._raw_data = None self._parsed_data = None # Holds array of current satellites in view from NMEA GSV or UBX NAV-SVINFO sentences self.gsv_data = {}
[docs] def process_data(self, raw_data: bytes, parsed_data: object): """ Process relevant UBX message types :param bytes raw_data: raw data :param UBXMessage parsed_data: parsed data """ if raw_data is None: return # self.logger.debug(f"data received {parsed_data.identity}") if parsed_data.identity[0:3] in ("ACK", "CFG"): self._process_ACK(parsed_data) elif parsed_data.identity == "ESF-ALG": self._process_ESF_ALG(parsed_data) elif parsed_data.identity == "HNR-ATT": self._process_HNR_ATT(parsed_data) elif parsed_data.identity == "HNR-PVT": self._process_HNR_PVT(parsed_data) elif parsed_data.identity == "NAV-ATT": self._process_NAV_ATT(parsed_data) elif parsed_data.identity in ("NAV-DOP", "NAV2-DOP"): self._process_NAV_DOP(parsed_data) elif parsed_data.identity in ("NAV-POSLLH", "NAV-HPPOSLLH"): self._process_NAV_POSLLH(parsed_data) elif parsed_data.identity in ("NAV-PVT", "NAV2-PVT"): self._process_NAV_PVT(parsed_data) elif parsed_data.identity == "NAV-PVAT": self._process_NAV_PVAT(parsed_data) elif parsed_data.identity == "NAV-RELPOSNED": self._process_NAV_RELPOSNED(parsed_data) elif parsed_data.identity in ("NAV-SAT", "NAV2-SAT"): self._process_NAV_SAT(parsed_data) elif parsed_data.identity in ("NAV-STATUS", "NAV2-STATUS"): self._process_NAV_STATUS(parsed_data) elif parsed_data.identity == "NAV-SVIN": self._process_NAV_SVIN(parsed_data) elif parsed_data.identity == "NAV-SVINFO": self._process_NAV_SVINFO(parsed_data) elif parsed_data.identity == "NAV-SOL": self._process_NAV_SOL(parsed_data) elif parsed_data.identity == "NAV-VELNED": self._process_NAV_VELNED(parsed_data) elif parsed_data.identity == "MON-COMMS": self._process_MON_COMMS(parsed_data) elif parsed_data.identity == "MON-SPAN": self._process_MON_SPAN(parsed_data) elif parsed_data.identity == "MON-SYS": self._process_MON_SYS(parsed_data) elif parsed_data.identity == "MON-VER": self._process_MONVER(parsed_data) elif parsed_data.identity == "RXM-RTCM": self._process_RXM_RTCM(parsed_data) elif parsed_data.identity == "RXM-PMP": self._process_RXM_PMP(parsed_data) elif parsed_data.identity == "RXM-SPARTN-KEY": self._process_RXM_SPARTN_KEY(parsed_data)
def _process_ACK(self, msg: UBXMessage): """ Process ACK-ACK & ACK-NAK sentences and CFG poll responses. :param UBXMessage msg: UBX config message """ if self.__app.dialog(DLGTUBX) is not None: self.__app.dialog(DLGTUBX).update_pending(msg) # if SPARTN config dialog is open, send CFG & ACKs there if self.__app.dialog(DLGTSPARTN) is not None: self.__app.dialog(DLGTSPARTN).update_pending(msg) # if Spectrumview or Sysmon widgets are active, send ACKSs there if msg.identity in ("ACK-ACK", "ACK-NAK"): wdgs = self.__app.widget_state.state for wdg in (WDGSYSMON, WDGSPECTRUM): if wdgs[wdg][VISIBLE]: if msg.clsID == 6 and msg.msgID == 1: # CFG-MSG getattr(self.__app, wdgs[wdg]["frm"]).update_pending(msg) def _process_MONVER(self, msg: UBXMessage): """ Process MON-VER sentences. :param UBXMessage msg: UBX MON-VER config message """ exts = [] fw_version = NA rom_version = NA gnss_supported = "" model = "" sw_version = getattr(msg, "swVersion", b"N/A") sw_version = sw_version.replace(b"\x00", b"").decode(UTF8) sw_version = sw_version.replace("ROM CORE", "ROM") sw_version = sw_version.replace("EXT CORE", "Flash") hw_version = getattr(msg, "hwVersion", b"N/A") hw_version = hw_version.replace(b"\x00", b"").decode(UTF8) for i in range(9): ext = getattr(msg, f"extension_{i+1:02d}", b"") ext = ext.replace(b"\x00", b"").decode(UTF8) exts.append(ext) if "FWVER=" in exts[i]: fw_version = exts[i].replace("FWVER=", "") if "PROTVER=" in exts[i]: rom_version = exts[i].replace("PROTVER=", "") if "PROTVER " in exts[i]: rom_version = exts[i].replace("PROTVER ", "") if "MOD=" in exts[i]: model = exts[i].replace("MOD=", "") hw_version = f"{model} {hw_version}" for gnss in ( "GPS", "GLO", "GAL", "BDS", "SBAS", "IMES", "QZSS", "NAVIC", ): if gnss in exts[i]: gnss_supported = gnss_supported + gnss + " " verdata = {} verdata["swversion"] = sw_version verdata["hwversion"] = hw_version verdata["fwversion"] = fw_version verdata["romversion"] = rom_version verdata["gnss"] = gnss_supported self.__app.gnss_status.version_data = verdata if self.__app.dialog(DLGTUBX) is not None: self.__app.dialog(DLGTUBX).update_pending(msg) def _process_MON_SYS(self, data: UBXMessage): """ Process MON-SYS sentences - System Monitor Information. :param UBXMessage data: MON-SYS parsed message """ sysdata = {} sysdata["bootType"] = data.bootType sysdata["cpuLoad"] = data.cpuLoad sysdata["cpuLoadMax"] = data.cpuLoadMax sysdata["memUsage"] = data.memUsage sysdata["memUsageMax"] = data.memUsageMax sysdata["ioUsage"] = data.ioUsage sysdata["ioUsageMax"] = data.ioUsageMax sysdata["runTime"] = data.runTime sysdata["noticeCount"] = data.noticeCount sysdata["warnCount"] = data.warnCount sysdata["errorCount"] = data.errorCount sysdata["tempValue"] = data.tempValue self.__app.gnss_status.sysmon_data = sysdata def _process_MON_COMMS(self, data: UBXMessage): """ Process MON-COMMS sentences - Comms Port Information. :param UBXMessage data: MON-COMMS parsed message """ commsdata = {} for i in range(1, data.nPorts + 1): idx = f"_{i:02}" pid = getattr(data, "portId" + idx) tx = getattr(data, "txUsage" + idx) txmax = getattr(data, "txPeakUsage" + idx) txbytes = getattr(data, "txBytes" + idx) txpending = getattr(data, "txPending" + idx) rx = getattr(data, "rxUsage" + idx) rxmax = getattr(data, "rxPeakUsage" + idx) rxbytes = getattr(data, "rxBytes" + idx) rxpending = getattr(data, "rxPending" + idx) commsdata[pid] = ( tx, txmax, txbytes, txpending, rx, rxmax, rxbytes, rxpending, ) self.__app.gnss_status.comms_data = commsdata def _process_NAV_POSLLH(self, data: UBXMessage): """ Process NAV-(HP)POSLLH sentence - Latitude, Longitude, Height. :param UBXMessage data: NAV-(HP)POSLLH parsed message """ self.__app.gnss_status.utc = itow2utc(data.iTOW) # datetime.time self.__app.gnss_status.lat = data.lat self.__app.gnss_status.lon = data.lon self.__app.gnss_status.alt = data.hMSL / 1000 # meters self.__app.gnss_status.hacc = data.hAcc / 1000 # meters self.__app.gnss_status.vacc = data.vAcc / 1000 # meters self.__app.gnss_status.hae = data.height / 1000 # meters def _process_NAV_PVT(self, data: UBXMessage): """ Process NAV-PVT sentence - Navigation position velocity time solution. :param UBXMessage data: NAV-PVT parsed message """ self.__app.gnss_status.utc = itow2utc(data.iTOW) # datetime.time self.__app.gnss_status.lat = data.lat self.__app.gnss_status.lon = data.lon self.__app.gnss_status.alt = data.hMSL / 1000 # meters self.__app.gnss_status.hacc = data.hAcc / 1000 # meters self.__app.gnss_status.vacc = data.vAcc / 1000 # meters self.__app.gnss_status.pdop = data.pDOP self.__app.gnss_status.sip = data.numSV self.__app.gnss_status.speed = data.gSpeed / 1000 # m/s self.__app.gnss_status.track = data.headMot self.__app.gnss_status.hae = data.height / 1000 # meters self.__app.gnss_status.fix = fix2desc("NAV-PVT", data.fixType) if data.carrSoln > 0: self.__app.gnss_status.fix = fix2desc("NAV-PVT", data.carrSoln + 5) if hasattr(data, "difSoln"): # for pyubx2 <= 1.2.42 self.__app.gnss_status.diff_corr = data.difSoln elif hasattr(data, "diffSoln"): self.__app.gnss_status.diff_corr = data.diffSoln if data.lastCorrectionAge != 0: self.__app.gnss_status.diff_age = corrage2int(data.lastCorrectionAge) def _process_NAV_PVAT(self, data: UBXMessage): """ Process NAV-PVAT sentence - Navigation position velocity attitude time solution. :param UBXMessage data: NAV-PVT parsed message """ self.__app.gnss_status.utc = itow2utc(data.iTOW) # datetime.time self.__app.gnss_status.lat = data.lat self.__app.gnss_status.lon = data.lon self.__app.gnss_status.alt = data.hMSL / 1000 # meters self.__app.gnss_status.speed = data.gSpeed / 1000 # m/s self.__app.gnss_status.sip = data.numSV self.__app.gnss_status.hae = data.height / 1000 # meters ims = self.__app.gnss_status.imu_data ims["source"] = data.identity ims["roll"] = data.vehRoll ims["pitch"] = data.vehPitch ims["yaw"] = data.vehHeading ims["status"] = ( (data.vehRollValid << 3) + (data.vehPitchValid << 2) + data.vehHeadingValid ) def _process_NAV_VELNED(self, data: UBXMessage): """ Process NAV-VELNED sentence - Velocity Solution in North East Down format. :param UBXMessage data: NAV-VELNED parsed message """ self.__app.gnss_status.track = data.heading self.__app.gnss_status.speed = data.gSpeed / 100 # m/s def _process_NAV_SAT(self, data: UBXMessage): """ Process NAV-SAT sentences - Space Vehicle Information. NB: For consistency with NMEA GSV and UBX NAV-SVINFO message types, this uses the NMEA SVID numbering range for GLONASS satellites (65 - 96) rather than the Slot ID (1-24) by default. To change this, set the GLONASS_NMEA flag in globals.py to False. :param UBXMessage data: NAV-SAT parsed message """ show_unused = self.__app.configuration.get("unusedsat_b") self.gsv_data = {} num_siv = int(data.numSvs) for i in range(num_siv): idx = f"_{i+1:02d}" gnssId = getattr(data, "gnssId" + idx) svid = getattr(data, "svId" + idx) # use NMEA GLONASS numbering (65-96) rather than slotID (1-24) if gnssId == 6 and svid < 25 and svid != 255 and GLONASS_NMEA: svid += 64 elev = getattr(data, "elev" + idx) azim = getattr(data, "azim" + idx) cno = getattr(data, "cno" + idx) if cno == 0 and not show_unused: # omit unused sats continue self.gsv_data[f"{gnssId}-{svid}"] = (gnssId, svid, elev, azim, cno) self.__app.gnss_status.siv = len(self.gsv_data) self.__app.gnss_status.gsv_data = self.gsv_data def _process_NAV_STATUS(self, data: UBXMessage): """ Process NAV-STATUS sentences - Status Information. :param UBXMessage data: NAV-STATUS parsed message """ self.__app.gnss_status.diff_corr = data.diffSoln # self.__app.gnss_status.diff_age = "<60" self.__app.gnss_status.fix = fix2desc("NAV-STATUS", data.gpsFix) if data.carrSoln > 0: self.__app.gnss_status.fix = fix2desc("NAV-STATUS", data.carrSoln + 5) def _process_NAV_SVIN(self, data: UBXMessage): """ Process NAV-SVIN sentences - Survey In Status. :param UBXMessage data: NAV-SVIN parsed message """ self.__app.svin_countdown(data.dur, data.valid, data.active) def _process_NAV_SVINFO(self, data: UBXMessage): """ Process NAV-SVINFO sentences - Space Vehicle Information. NB: Since UBX Gen8 this message is deprecated in favour of NAV-SAT :param UBXMessage data: NAV-SVINFO parsed message """ show_unused = self.__app.configuration.get("unusedsat_b") self.gsv_data = {} num_siv = int(data.numCh) for i in range(num_siv): idx = f"_{i+1:02d}" svid = getattr(data, "svid" + idx) gnssId = svid2gnssid(svid) # derive gnssId from svid elev = getattr(data, "elev" + idx) azim = getattr(data, "azim" + idx) cno = getattr(data, "cno" + idx) if cno == 0 and not show_unused: # omit unused sats continue self.gsv_data[f"{gnssId}-{svid}"] = (gnssId, svid, elev, azim, cno) self.__app.gnss_status.gsv_data = self.gsv_data def _process_NAV_SOL(self, data: UBXMessage): """ Process NAV-SOL sentence - Navigation Solution. :param UBXMessage data: NAV-SOL parsed message """ self.__app.gnss_status.pdop = data.pDOP self.__app.gnss_status.sip = data.numSV self.__app.gnss_status.fix = fix2desc("NAV-SOL", data.gpsFix) def _process_NAV_DOP(self, data: UBXMessage): """ Process NAV-DOP sentence - Dilution of Precision. :param UBXMessage data: NAV-DOP parsed message """ self.__app.gnss_status.pdop = data.pDOP self.__app.gnss_status.hdop = data.hDOP self.__app.gnss_status.vdop = data.vDOP def _process_NAV_RELPOSNED(self, data: UBXMessage): """ Process NAV-RELPOSNED sentence - Relative position of rover. Two version of this message: - version 0 has no heading or length attributes, so these must be derived from the NED values - version 1 has heading and length attributes NB: pyubx2 parses relPosHP values as mm, so total relPosN in cm = relPosN + (relPosHPN * 1e-1), etc. :param UBXMessage data: NAV-RELPOSNED parsed message """ if data.version == 0x00: n = data.relPosN e = data.relPosE d = data.relPosD relPosLength, relPosHeading = ned2vector(n, e, d) n = data.accN * 1e-2 e = data.accE * 1e-2 d = data.accD * 1e-2 accLength, _ = ned2vector(n, e, d) accHeading = accLength * relPosHeading / relPosLength # ballpark else: relPosLength, relPosHeading, accLength, accHeading = ( data.relPosLength, data.relPosHeading, data.accLength * 1e-2, data.accHeading, ) self.__app.gnss_status.rel_pos_heading = relPosHeading self.__app.gnss_status.rel_pos_length = relPosLength self.__app.gnss_status.acc_heading = accHeading self.__app.gnss_status.acc_length = accLength self.__app.gnss_status.rel_pos_flags = [ data.gnssFixOK, data.diffSoln, data.relPosValid, data.carrSoln, data.isMoving, data.refPosMiss, data.refObsMiss, data.relPosHeadingValid, data.relPosNormalized, ] def _process_HNR_PVT(self, data: UBXMessage): """ Process HNR-PVT sentence - High Rate Navigation position velocity time solution. :param UBXMessage data: HNR-PVT parsed message """ self.__app.gnss_status.utc = itow2utc(data.iTOW) # datetime.time self.__app.gnss_status.lat = data.lat self.__app.gnss_status.lon = data.lon self.__app.gnss_status.alt = data.hMSL / 1000 # meters self.__app.gnss_status.hacc = data.hAcc / 1000 # meters self.__app.gnss_status.vacc = data.vAcc / 1000 # meters self.__app.gnss_status.speed = data.gSpeed / 1000 # m/s self.__app.gnss_status.track = data.headMot self.__app.gnss_status.fix = fix2desc("HNR-PVT", data.gpsFix) self.__app.gnss_status.diff_corr = data.DiffSoln if data.DiffSoln > 0: self.__app.gnss_status.fix = fix2desc("HNR-PVT", 6) def _process_RXM_RTCM(self, data: UBXMessage): """ Process RXM-RTCM sentences - Status Information. :param UBXMessage data: RXM-RTCM parsed message """ self.__app.gnss_status.diff_corr = data.msgUsed >= 1 self.__app.gnss_status.diff_station = data.refStation def _process_MON_SPAN(self, data: UBXMessage): """ Process MON-SPAN sentences - Spectrum Information. :param UBXMessage data: MON-SPAN parsed message """ numrf = data.numRfBlocks rfbs = [] for i in range(1, numrf + 1): idx = f"_{i:02}" spec = getattr(data, "spectrum" + idx) spn = getattr(data, "span" + idx) res = getattr(data, "res" + idx) ctr = getattr(data, "center" + idx) pga = getattr(data, "pga" + idx) rfbs.append((spec, spn, res, ctr, pga)) self.__app.gnss_status.spectrum_data = rfbs def _process_RXM_SPARTN_KEY(self, data: UBXMessage): """ Process RXM-SPARTN_KEY sentences - poll response. :param UBXMessage data: RXM-SPARTN_KEY poll response message """ if self.__app.dialog(DLGTSPARTN) is not None: self.__app.dialog(DLGTSPARTN).update_pending(data) def _process_RXM_PMP(self, data: UBXMessage): """ Process RXM-PMP sentence - SPARTN L-Band data. :param UBXMessage data: RXM-PMP message """ if self.__app.dialog(DLGTSPARTN) is not None: self.__app.dialog(DLGTSPARTN).update_pending(data) def _process_ESF_ALG(self, data: UBXMessage): """ Process ESF-ALG sentence - External Sensor Fusion IMU Alignment. :param UBXMessage data: ESF-ALG message """ ims = self.__app.gnss_status.imu_data ims["source"] = data.identity ims["roll"] = data.roll ims["pitch"] = data.pitch ims["yaw"] = data.yaw ims["status"] = data.status def _process_NAV_ATT(self, data: UBXMessage): """ Process NAV_ATT sentence - Navigation Attitude. :param UBXMessage data: NAV_ATT message """ ims = self.__app.gnss_status.imu_data ims["source"] = data.identity ims["roll"] = data.roll ims["pitch"] = data.pitch ims["yaw"] = data.heading ims["status"] = "" def _process_HNR_ATT(self, data: UBXMessage): """ Process HNR_ATT sentence - High Rate Navigation Attitude.. :param UBXMessage data: HNR_ATT message """ ims = self.__app.gnss_status.imu_data ims["source"] = data.identity ims["roll"] = data.roll ims["pitch"] = data.pitch ims["yaw"] = data.heading ims["status"] = ""