"""
UBXReader class.
Reads and parses individual UBX, NMEA or RTCM3 messages from any viable
data stream which supports a read(n) -> bytes method.
Returns both the raw binary data (as bytes) and the parsed data
(as a UBXMessage, NMEAMessage or RTCMMessage object).
- 'protfilter' governs which protocols (NMEA, UBX or RTCM3) are processed
- 'quitonerror' governs how errors are handled
- 'msgmode' indicates the type of UBX datastream (output GET, input SET, query POLL).
If msgmode is set to SETPOLL, input/query mode will be automatically detected by parser.
Created on 2 Oct 2020
:author: semuadmin
:copyright: SEMU Consulting © 2020
:license: BSD 3-Clause
"""
from logging import getLogger
from socket import socket
import pynmeagps.exceptions as nme
import pyrtcm.exceptions as rte
from pynmeagps import NMEA_HDR, NMEAReader, SocketWrapper
from pyrtcm import RTCMReader
from pyubx2.exceptions import (
UBXMessageError,
UBXParseError,
UBXStreamError,
UBXTypeError,
)
from pyubx2.ubxhelpers import bytes2val, calc_checksum, getinputmode, val2bytes
from pyubx2.ubxmessage import UBXMessage
from pyubx2.ubxtypes_core import (
ERR_LOG,
ERR_RAISE,
GET,
NMEA_PROTOCOL,
POLL,
RTCM3_PROTOCOL,
SET,
SETPOLL,
U2,
UBX_HDR,
UBX_PROTOCOL,
VALCKSUM,
)
[docs]
class UBXReader:
"""
UBXReader class.
"""
[docs]
def __init__(
self,
datastream,
msgmode: int = GET,
validate: int = VALCKSUM,
protfilter: int = NMEA_PROTOCOL | UBX_PROTOCOL | RTCM3_PROTOCOL,
quitonerror: int = ERR_LOG,
parsebitfield: bool = True,
labelmsm: int = 1,
bufsize: int = 4096,
parsing: bool = True,
errorhandler: object = None,
):
"""Constructor.
:param datastream stream: input data stream
:param int msgmode: 0=GET, 1=SET, 2=POLL, 3=SETPOLL (0)
:param int validate: VALCKSUM (1) = Validate checksum,
VALNONE (0) = ignore invalid checksum (1)
:param int protfilter: NMEA_PROTOCOL (1), UBX_PROTOCOL (2), RTCM3_PROTOCOL (4),
Can be OR'd (7)
:param int quitonerror: ERR_IGNORE (0) = ignore errors, ERR_LOG (1) = log continue,
ERR_RAISE (2) = (re)raise (1)
:param bool parsebitfield: 1 = parse bitfields, 0 = leave as bytes (1)
:param int labelmsm: RTCM3 MSM label type 1 = RINEX, 2 = BAND (1)
:param int bufsize: socket recv buffer size (4096)
:param bool parsing: True = parse data, False = don't parse data (output raw only) (True)
:param object errorhandler: error handling object or function (None)
:raises: UBXStreamError (if mode is invalid)
"""
# pylint: disable=too-many-arguments
if isinstance(datastream, socket):
self._stream = SocketWrapper(datastream, bufsize=bufsize)
else:
self._stream = datastream
self._protfilter = protfilter
self._quitonerror = quitonerror
self._errorhandler = errorhandler
self._validate = validate
self._parsebf = parsebitfield
self._labelmsm = labelmsm
self._msgmode = msgmode
self._parsing = parsing
self._logger = getLogger(__name__)
if self._msgmode not in (GET, SET, POLL, SETPOLL):
raise UBXStreamError(
f"Invalid stream mode {self._msgmode} - must be 0, 1, 2 or 3"
)
def __iter__(self):
"""Iterator."""
return self
def __next__(self) -> tuple:
"""
Return next item in iteration.
:return: tuple of (raw_data as bytes, parsed_data as UBXMessage)
:rtype: tuple
:raises: StopIteration
"""
(raw_data, parsed_data) = self.read()
if raw_data is None and parsed_data is None:
raise StopIteration
return (raw_data, parsed_data)
[docs]
def read(self) -> tuple:
"""
Read a single NMEA, UBX or RTCM3 message from the stream buffer
and return both raw and parsed data.
'protfilter' determines which protocols are parsed.
'quitonerror' determines whether to raise, log or ignore parsing errors.
:return: tuple of (raw_data as bytes, parsed_data as UBXMessage, NMEAMessage or RTCMMessage)
:rtype: tuple
:raises: Exception (if invalid or unrecognised protocol in data stream)
"""
parsing = True
while parsing: # loop until end of valid message or EOF
try:
raw_data = None
parsed_data = None
byte1 = self._read_bytes(1) # read the first byte
# if not UBX, NMEA or RTCM3, discard and continue
if byte1 not in (b"\xb5", b"\x24", b"\xd3"):
continue
byte2 = self._read_bytes(1)
bytehdr = byte1 + byte2
# if it's a UBX message (b'\xb5\x62')
if bytehdr == UBX_HDR:
(raw_data, parsed_data) = self._parse_ubx(bytehdr)
# if protocol filter passes UBX, return message,
# otherwise discard and continue
if self._protfilter & UBX_PROTOCOL:
parsing = False
else:
continue
# if it's an NMEA message (b'\x24\x..)
elif bytehdr in NMEA_HDR:
(raw_data, parsed_data) = self._parse_nmea(bytehdr)
# if protocol filter passes NMEA, return message,
# otherwise discard and continue
if self._protfilter & NMEA_PROTOCOL:
parsing = False
else:
continue
# if it's a RTCM3 message
# (byte1 = 0xd3; byte2 = 0b000000**)
elif byte1 == b"\xd3" and (byte2[0] & ~0x03) == 0:
(raw_data, parsed_data) = self._parse_rtcm3(bytehdr)
# if protocol filter passes RTCM, return message,
# otherwise discard and continue
if self._protfilter & RTCM3_PROTOCOL:
parsing = False
else:
continue
# unrecognised protocol header
else:
raise UBXParseError(f"Unknown protocol header {bytehdr}.")
except EOFError:
return (None, None)
except (
UBXMessageError,
UBXTypeError,
UBXParseError,
UBXStreamError,
nme.NMEAMessageError,
nme.NMEATypeError,
nme.NMEAParseError,
nme.NMEAStreamError,
rte.RTCMMessageError,
rte.RTCMParseError,
rte.RTCMStreamError,
rte.RTCMTypeError,
) as err:
if self._quitonerror:
self._do_error(err)
continue
return (raw_data, parsed_data)
def _parse_ubx(self, hdr: bytes) -> tuple:
"""
Parse remainder of UBX message.
:param bytes hdr: UBX header (b'\\xb5\\x62')
:return: tuple of (raw_data as bytes, parsed_data as UBXMessage or None)
:rtype: tuple
"""
# read the rest of the UBX message from the buffer
byten = self._read_bytes(4)
clsid = byten[0:1]
msgid = byten[1:2]
lenb = byten[2:4]
leni = int.from_bytes(lenb, "little", signed=False)
byten = self._read_bytes(leni + 2)
plb = byten[0:leni]
cksum = byten[leni : leni + 2]
raw_data = hdr + clsid + msgid + lenb + plb + cksum
# only parse if we need to (filter passes UBX)
if (self._protfilter & UBX_PROTOCOL) and self._parsing:
parsed_data = self.parse(
raw_data,
validate=self._validate,
msgmode=self._msgmode,
parsebitfield=self._parsebf,
)
else:
parsed_data = None
return (raw_data, parsed_data)
def _parse_nmea(self, hdr: bytes) -> tuple:
"""
Parse remainder of NMEA message (using pynmeagps library).
:param bytes hdr: NMEA header (b'\\x24\\x..')
:return: tuple of (raw_data as bytes, parsed_data as NMEAMessage or None)
:rtype: tuple
"""
# read the rest of the NMEA message from the buffer
byten = self._read_line() # NMEA protocol is CRLF-terminated
raw_data = hdr + byten
# only parse if we need to (filter passes NMEA)
if (self._protfilter & NMEA_PROTOCOL) and self._parsing:
# invoke pynmeagps parser
parsed_data = NMEAReader.parse(
raw_data,
validate=self._validate,
msgmode=self._msgmode,
)
else:
parsed_data = None
return (raw_data, parsed_data)
def _parse_rtcm3(self, hdr: bytes) -> tuple:
"""
Parse any RTCM3 data in the stream (using pyrtcm library).
:param bytes hdr: first 2 bytes of RTCM3 header
:return: tuple of (raw_data as bytes, parsed_stub as RTCMMessage)
:rtype: tuple
"""
hdr3 = self._read_bytes(1)
size = hdr3[0] | (hdr[1] << 8)
payload = self._read_bytes(size)
crc = self._read_bytes(3)
raw_data = hdr + hdr3 + payload + crc
# only parse if we need to (filter passes RTCM)
if (self._protfilter & RTCM3_PROTOCOL) and self._parsing:
# invoke pyrtcm parser
parsed_data = RTCMReader.parse(
raw_data,
validate=self._validate,
labelmsm=self._labelmsm,
)
else:
parsed_data = None
return (raw_data, parsed_data)
def _read_bytes(self, size: int) -> bytes:
"""
Read a specified number of bytes from stream.
:param int size: number of bytes to read
:return: bytes
:rtype: bytes
:raises: UBXStreamError if stream ends prematurely
"""
data = self._stream.read(size)
if len(data) == 0: # EOF
raise EOFError()
if 0 < len(data) < size: # truncated stream
raise UBXStreamError(
"Serial stream terminated unexpectedly. "
f"{size} bytes requested, {len(data)} bytes returned."
)
return data
def _read_line(self) -> bytes:
"""
Read bytes until LF (0x0a) terminator.
:return: bytes
:rtype: bytes
:raises: UBXStreamError if stream ends prematurely
"""
data = self._stream.readline() # NMEA protocol is CRLF-terminated
if len(data) == 0:
raise EOFError() # pragma: no cover
if data[-1:] != b"\x0a": # truncated stream
raise UBXStreamError(
"Serial stream terminated unexpectedly. "
f"Line requested, {len(data)} bytes returned."
)
return data
def _do_error(self, err: Exception):
"""
Handle error.
:param Exception err: error
:raises: Exception if quitonerror = ERR_RAISE (2)
"""
if self._quitonerror == ERR_RAISE:
raise err from err
if self._quitonerror == ERR_LOG:
# pass to error handler if there is one
# else just log
if self._errorhandler is None:
self._logger.error(err)
else:
self._errorhandler(err)
@property
def datastream(self) -> object:
"""
Getter for stream.
:return: data stream
:rtype: object
"""
return self._stream
[docs]
@staticmethod
def parse(
message: bytes,
msgmode: int = GET,
validate: int = VALCKSUM,
parsebitfield: bool = True,
) -> object:
"""
Parse UBX byte stream to UBXMessage object.
:param bytes message: binary message to parse
:param int msgmode: GET (0), SET (1), POLL (2) (0)
:param int validate: VALCKSUM (1) = Validate checksum,
VALNONE (0) = ignore invalid checksum (1)
:param bool parsebitfield: 1 = parse bitfields, 0 = leave as bytes (1)
:return: UBXMessage object
:rtype: UBXMessage
:raises: Exception (if data stream contains invalid data or unknown message type)
"""
# pylint: disable=too-many-arguments
if msgmode not in (GET, SET, POLL, SETPOLL):
raise UBXParseError(
f"Invalid message mode {msgmode} - must be 0, 1, 2 or 3"
)
lenm = len(message)
hdr = message[0:2]
clsid = message[2:3]
msgid = message[3:4]
lenb = message[4:6]
if lenb == b"\x00\x00":
payload = None
leni = 0
else:
payload = message[6 : lenm - 2]
leni = len(payload)
ckm = message[lenm - 2 : lenm]
if payload is not None:
ckv = calc_checksum(clsid + msgid + lenb + payload)
else:
ckv = calc_checksum(clsid + msgid + lenb)
if validate & VALCKSUM:
if hdr != UBX_HDR:
raise UBXParseError(
(f"Invalid message header {hdr}" f" - should be {UBX_HDR}")
)
if leni != bytes2val(lenb, U2):
raise UBXParseError(
(
f"Invalid payload length {lenb}"
f" - should be {val2bytes(leni, U2)}"
)
)
if ckm != ckv:
raise UBXParseError(
(f"Message checksum {ckm}" f" invalid - should be {ckv}")
)
# if input message (SET or POLL), determine mode automatically
if msgmode == SETPOLL:
msgmode = getinputmode(message) # returns SET or POLL
if payload is None:
return UBXMessage(clsid, msgid, msgmode)
return UBXMessage(
clsid,
msgid,
msgmode,
payload=payload,
parsebitfield=parsebitfield,
)