"""
NMEAReader class.
Reads and parses individual NMEA GNSS/GPS messages from
any stream which supports a read(n) -> bytes method.
Can also read from socket via SocketStream wrapper.
Returns both the raw binary data (as bytes) and the parsed
data (as a NMEAMessage object).
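If the 'nmeaonly' kwarg is set to 'True', the reader
will flag an NMEAParseError if it encounters a '$'-prefixed
header which is not NMEA; the error is then ignored, logged
or raised according to the 'quitonerror' kwarg. Otherwise,
it will ignore the non-NMEA data and attempt to carry on.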
Created on 4 Mar 2021
:author: semuadmin
:copyright: SEMU Consulting © 2021
:license: BSD 3-Clause
"""
from socket import socket
import pynmeagps.exceptions as nme
from pynmeagps.nmeahelpers import calc_checksum, get_parts
from pynmeagps.nmeamessage import NMEAMessage
from pynmeagps.nmeatypes_core import (
ERR_LOG,
ERR_RAISE,
GET,
NMEA_HDR,
VALCKSUM,
VALMSGID,
)
from pynmeagps.socket_stream import SocketStream
class NMEAReader:
    """
    NMEAReader class.

    Iterable reader which extracts individual NMEA messages from a
    stream and returns each one as a (raw bytes, parsed message) tuple.
    """

    def __init__(
        self,
        stream,
        msgmode: int = GET,
        validate: int = VALCKSUM,
        nmeaonly: bool = False,
        quitonerror: int = ERR_LOG,
        bufsize: int = 4096,
        errorhandler: object = None,
    ):
        """Constructor.

        :param stream stream: input data stream (e.g. Serial or binary File)
        :param int msgmode: 0 = GET (default), 1 = SET, 2 = POLL
        :param int validate: bitfield validation flags - VALCKSUM (default), VALMSGID
        :param bool nmeaonly: True = error on non-NMEA data, False = ignore non-NMEA data
        :param int quitonerror: 0 = ignore, 1 = log and continue, 2 = (re)raise (default 1)
        :param int bufsize: socket recv buffer size (4096)
        :param object errorhandler: error handling object or function (None)
        :raises: NMEAParseError (if mode is invalid)
        """
        # pylint: disable=too-many-arguments

        # Raw sockets are wrapped so that the rest of the class can rely on
        # the stream's read() / readline() methods.
        if isinstance(stream, socket):
            self._stream = SocketStream(stream, bufsize=bufsize)
        else:
            self._stream = stream

        if msgmode not in (0, 1, 2):
            raise nme.NMEAParseError(
                f"Invalid stream mode {msgmode} - must be 0, 1 or 2."
            )

        self._quitonerror = quitonerror
        self._errorhandler = errorhandler
        self._nmea_only = nmeaonly
        self._validate = validate
        self._mode = msgmode

    def __iter__(self):
        """Iterator."""

        return self

    def __next__(self) -> tuple:
        """
        Return next item in iteration.

        Iteration stops when read() returns (None, None), i.e. at the
        end of the stream.

        :return: tuple of (raw_data as bytes, parsed_data as NMEAMessage)
        :rtype: tuple
        :raises: StopIteration
        """

        (raw_data, parsed_data) = self.read()
        if raw_data is None and parsed_data is None:
            raise StopIteration
        return (raw_data, parsed_data)

    def read(self) -> tuple:
        """
        Read the next NMEA message from the stream buffer.

        Bytes are discarded until a '$' start delimiter is found. If the
        delimiter plus the following byte form a known NMEA header, the
        remainder of the line is read and parsed.

        :return: tuple of (raw_data as bytes, parsed_data as NMEAMessage).
            (None, None) is returned at end of stream. parsed_data is None
            if parse() returns None, and is the error text (str) if an
            error occurred that was not re-raised.
        :rtype: tuple
        :raises: NMEAParseError (if quitonerror = ERR_RAISE and the stream
            contains invalid data, or contains a non-NMEA header while
            nmeaonly=True)
        """

        raw_data = None
        parsed_data = None

        try:
            byte1 = self._read_bytes(1)  # read 1st byte
            while True:  # loop until end of valid NMEA message or EOF
                if byte1 != b"\x24":  # not a '$' delimiter, discard and continue
                    byte1 = self._read_bytes(1)
                    continue
                byte2 = self._read_bytes(1)  # read 2nd byte to confirm protocol
                bytehdr = byte1 + byte2
                if bytehdr in NMEA_HDR:  # it's a NMEA message
                    # NMEA protocol is CRLF terminated
                    raw_data = bytehdr + self._read_line()
                    parsed_data = self.parse(
                        raw_data, validate=self._validate, msgmode=self._mode
                    )
                    break
                # it's not a NMEA message (UBX or something else)
                if self._nmea_only:  # raise error and quit
                    raise nme.NMEAParseError(f"Unknown data header {bytehdr}.")
                # byte2 may itself be the '$' which starts the next message,
                # so it is re-examined rather than thrown away with byte1
                byte1 = byte2

        except EOFError:
            return (None, None)
        except (
            nme.NMEAMessageError,
            nme.NMEATypeError,
            nme.NMEAParseError,
            nme.NMEAStreamError,
        ) as err:
            if self._quitonerror:
                self._do_error(str(err))
            # error was ignored or logged: hand the error text back to the
            # caller in place of the parsed message
            parsed_data = str(err)

        return (raw_data, parsed_data)

    def _read_bytes(self, size: int) -> bytes:
        """
        Read a specified number of bytes from stream.

        :param int size: number of bytes to read
        :return: bytes
        :rtype: bytes
        :raises: EOFError if stream ends prematurely
        """

        data = self._stream.read(size)
        if len(data) < size:  # fewer bytes than requested is treated as EOF
            raise EOFError()
        return data

    def _read_line(self) -> bytes:
        """
        Read until end of line (CRLF).

        :return: bytes
        :rtype: bytes
        :raises: EOFError if stream ends prematurely
        """

        data = self._stream.readline()  # NMEA protocol is CRLF terminated
        if data[-1:] != b"\x0a":  # no trailing LF is treated as EOF
            raise EOFError()
        return data

    def _do_error(self, err: str):
        """
        Handle error.

        :param str err: error message
        :raises: NMEAParseError if quitonerror = ERR_RAISE
        """

        if self._quitonerror == ERR_RAISE:
            raise nme.NMEAParseError(err)
        if self._quitonerror == ERR_LOG:
            # pass to error handler if there is one, otherwise print
            if self._errorhandler is None:
                print(err)
            else:
                self._errorhandler(err)

    @property
    def datastream(self) -> object:
        """
        Getter for stream.

        :return: data stream
        :rtype: object
        """

        return self._stream

    @staticmethod
    def parse(
        message: bytes,
        msgmode: int = GET,
        validate: int = VALCKSUM,
    ) -> object:
        """
        Parse NMEA byte stream to NMEAMessage object.

        :param bytes message: bytes message to parse
        :param int msgmode: 0 = GET (default), 1 = SET, 2 = POLL
        :param int validate: 1 VALCKSUM (default), 2 VALMSGID (can be OR'd)
        :return: NMEAMessage object (or None if unknown message and VALMSGID is not set)
        :rtype: NMEAMessage
        :raises: NMEAParseError (if data stream contains invalid data or unknown message type)
        """

        if msgmode not in (0, 1, 2):
            raise nme.NMEAParseError(
                f"Invalid parse mode {msgmode} - must be 0, 1 or 2."
            )

        try:
            content, talker, msgid, payload, checksum = get_parts(message)
            if validate & VALCKSUM:
                ccksum = calc_checksum(content)
                if checksum != ccksum:
                    raise nme.NMEAParseError(
                        f"Message {talker}{msgid} invalid checksum {checksum}"
                        f" - should be {ccksum}."
                    )
            return NMEAMessage(
                talker, msgid, msgmode, payload=payload, checksum=checksum
            )
        except nme.NMEAMessageError as err:
            # NMEAMessageError is only escalated when VALMSGID is set
            if not validate & VALMSGID:
                return None
            # chain explicitly so the original cause is kept in the traceback
            raise nme.NMEAParseError(err) from err