Source code for pyspartn.spartnreader

"""
SPARTNReader class.

The SPARTNReader class will parse individual SPARTN messages
from any binary stream containing *solely* SPARTN data e.g. an
MQTT `/pp/ip` topic.

Information from pubic domain SPARTN Protocol v2.0.3 (November 2025) © 2025 u-blox AG.
https://www.spartnformat.org/download/

SPARTN 1X transport layer bit format:

+-----------+------------+-------------+-------------+------------+-----------+
| preamble  | framestart | payload     |  payload    | embedded   |   crc     |
|           |            | descriptor  |             | auth data  |           |
+===========+============+=============+=============+============+===========+
| 8 bits    |  24 bits   | 32-64 bits  | 8-8192 bits | 0-512 bits | 8-32 bits |
| 0x73 's'  |            |             |             |            |           |
+-----------+------------+-------------+-------------+------------+-----------+

NB Use of gnssTimeTag for message decryption:

The SPARTN protocol requires a key and basedate to calculate the Initialisation
Vector (IV) for encrypted messages (eaf=1). The key is provided by the
SPARTN service provider. The basedate is derived in one of two ways:

1. For messages with unambiguous 32-bit gnssTimeTag values (timeTagtype = 1),
   the basedate is the gnssTimeTag. No other information is needed.

2. For messages with ambiguous 16-bit gnssTimeTag values (timeTagtype = 0),
   the basedate can be derived from a 32-bit gnssTimeTag for the same message
   subtype (GPS, GLO, etc.) from the same datastream, or provided as an external
   parameter. SPARTNReader will accumulate any 32-bit gnssTimeTag in the incoming
   datastream for use in decryption.


Created on 10 Feb 2023

:author: semuadmin (Steve Smith)
:copyright: semuadmin © 2023
:license: BSD 3-Clause
"""

# pylint: disable=invalid-name too-many-instance-attributes

from datetime import datetime
from logging import getLogger
from os import getenv
from socket import socket
from types import FunctionType, NoneType

from pyspartn.exceptions import (
    SPARTNDecryptionError,
    SPARTNMessageError,
    SPARTNParseError,
    SPARTNStreamError,
    SPARTNTypeError,
)
from pyspartn.socket_wrapper import SocketWrapper
from pyspartn.spartnhelpers import bitsval, valid_crc
from pyspartn.spartnmessage import SPARTNMessage
from pyspartn.spartntables import ALN_ENUM
from pyspartn.spartntypes_core import ERRLOG, ERRRAISE, SPARTN_PREB, VALCRC


[docs] class SPARTNReader: """ SPARTNReader class. """
[docs] def __init__( self, datastream, validate: int = VALCRC, quitonerror: int = ERRLOG, decode: bool = False, key: str | NoneType = None, basedate: datetime | int | NoneType = None, bufsize: int = 4096, errorhandler: FunctionType | NoneType = None, timetags: dict | NoneType = None, ): """Constructor. :param datastream stream: input data stream :param int validate: VALCRC (1) = validate CRC, VALNONE (1) = ignore invalid CRC (1) :param int quitonerror: ERROR_IGNORE (0) = ignore, ERROR_LOG (1) = log and continue, ERROR_RAISE (2) = (re)raise (1) :param bool decode: decrypt and decode payload (False) :param str | NoneType key: decryption key as hexadecimal string (None) :param datetime | int | NoneType basedate: decryption basedate as datetime or 32-bit gnssTimeTag as integer (None). If basedate = TIMEBASE, SPARTNMessage will use timetags argument :param int bufsize: socket recv buffer size (4096) :param FunctionType | NoneType errorhandler: error handling object or function (None) :param dict | NoneType timetags: dict of decryption timetags in format {0: 442626332, 1: 449347321, 2: 412947745} where key = msgSubtype (0=GPS, 1=GLO, etc) and value = gnssTimeTag (None) :raises: ParameterError if invalid parameters :raises: SPARTNDecryptionError if unable to decrypt message using key and basedate/timetags provided :raises: SPARTN***Error if unable to parse message """ # pylint: disable=too-many-arguments self._logger = getLogger(__name__) if isinstance(datastream, socket): self._stream = SocketWrapper(datastream, bufsize=bufsize) else: self._stream = datastream self.key = getenv("MQTTKEY", None) if key is None else key self._validate = validate self._quitonerror = quitonerror self._errorhandler = errorhandler self._decode = decode self._key = key self._basedate = basedate # accumlated array of 32-bit gnssTimeTag from datastream self._timetags = {} if timetags is None else timetags
# if self._decode and self._key is None: # raise ParameterError("Key must be provided if decoding is enabled") def __iter__(self): """Iterator.""" return self def __next__(self) -> tuple: """ Return next item in iteration. :return: tuple of (raw_data as bytes, parsed_data as SPARTNessage or None if error) :rtype: tuple :raises: StopIteration """ raw_data, parsed_data = self.read() if raw_data is None and parsed_data is None: raise StopIteration return (raw_data, parsed_data)
[docs] def read(self) -> tuple: """ Read a single SPARTN message from the stream buffer and return both raw and parsed data. The 'quitonerror' flag determines whether to raise, log or ignore parsing errors. If error and quitonerror = 1, the 'parsed' value will contain the error message. :return: tuple of (raw_data as bytes, parsed_data as SPARTNMessage) :rtype: tuple :raises: SPARTN***Error if error during parsing """ parsing = True raw_data = parsed_data = None while parsing: # loop until end of valid message or EOF try: raw_data = None parsed_data = None byte1 = self._read_bytes(1) # read the first byte # if not SPARTN, discard and continue if byte1 != SPARTN_PREB: raise SPARTNParseError(f"Unknown protocol {byte1}") raw_data, parsed_data = self._parse_spartn(byte1) parsing = False except EOFError: return (None, None) except ( SPARTNParseError, SPARTNMessageError, SPARTNTypeError, SPARTNStreamError, SPARTNDecryptionError, ) as err: if self._quitonerror: self._do_error(err) continue return (raw_data, parsed_data)
def _parse_spartn(self, preamble: bytes) -> tuple: """ Parse any SPARTN data in the stream. The structure of the transport layer depends on encryption type, GNSS timetag format and CRC format. :param preamble hdr: preamble of SPARTN message :return: tuple of (raw_data as bytes, parsed_stub as SPARTNMessage) :rtype: tuple :raises: SPARTN...Error if CRC invalid or other parsing error """ # pylint: disable=unused-variable framestart = self._read_bytes(3) # msgType = bitsval(framestart, 0, 7) nData = bitsval(framestart, 7, 10) eaf = bitsval(framestart, 17, 1) crcType = bitsval(framestart, 18, 2) # frameCrc = bitsval(framestart, 20, 4) payDesc = self._read_bytes(4) # msgSubtype denotes constellation - GPS, GLO, GAL, etc. msgSubtype = bitsval(payDesc, 0, 4) timeTagtype = bitsval(payDesc, 4, 1) if timeTagtype: payDesc += self._read_bytes(2) gtlen = 32 if timeTagtype else 16 gnssTimeTag = bitsval(payDesc, 5, gtlen) # store 32-bit timetag for this subtype for later use in decryption if timeTagtype == 1: self._timetags[msgSubtype] = gnssTimeTag # solutionId = bitsval(payDesc, gtlen + 5, 7) # solutionProcId = bitsval(payDesc, gtlen + 12, 4) authInd = 0 embAuthLen = 0 if eaf: payDesc += self._read_bytes(2) # encryptionId = bitsval(payDesc, gtlen + 16, 4) # encryptionSeq = bitsval(payDesc, gtlen + 20, 6) authInd = bitsval(payDesc, gtlen + 26, 3) embAuthLen = bitsval(payDesc, gtlen + 29, 3) payload = self._read_bytes(nData) embAuth = b"" if authInd > 1: aln = ALN_ENUM.get(embAuthLen, 0) embAuth = self._read_bytes(aln) crcb = self._read_bytes(crcType + 1) crc = int.from_bytes(crcb, "big") # validate CRC core = framestart + payDesc + payload + embAuth raw_data = preamble + core + crcb if self._validate & VALCRC: if not valid_crc(core, crc, crcType): raise SPARTNParseError(f"Invalid CRC {crc}") parsed_data = self.parse( raw_data, validate=self._validate, decode=self._decode, key=self._key, basedate=self._basedate, timetags=self.timetags, ) return (raw_data, parsed_data) def _read_bytes(self, size: int) -> bytes: """ Read a specified number of bytes from stream. :param int size: number of bytes to read :return: bytes :rtype: bytes :raises: EOFError if stream ends prematurely """ data = self._stream.read(size) if len(data) == 0: # EOF raise EOFError() if 0 < len(data) < size: # truncated stream raise SPARTNStreamError( "Serial stream terminated unexpectedly. " f"{size} bytes requested, {len(data)} bytes returned." ) return data def _do_error(self, err: Exception): """ Handle error. :param Exception err: error message :raises: Exception if quitonerror = 2 """ if self._quitonerror == ERRRAISE: raise err from err if self._quitonerror == ERRLOG: # pass to error handler if there is one if self._errorhandler is None: self._logger.error(err) else: self._errorhandler(err) @property def datastream(self) -> object: """ Getter for stream. :return: data stream :rtype: object """ return self._stream @property def timetags(self) -> dict: """ Getter for accumulated 32-bit gnssTimeTag time tags from data stream. Can be used as a source of decryption basedate for each msgSubtype (i.e. GNSS constellation) if no other basedate is supplied. :return: dict of gnssTimeTag from data stream (key is msgSubtype) :rtype: dict """ return self._timetags
[docs] @staticmethod def parse( message: bytes, validate: int = VALCRC, decode: bool = False, key: str = None, basedate: object = None, timetags: dict = None, ) -> SPARTNMessage: """ Parse SPARTN message to SPARTNMessage object. :param bytes message: SPARTN raw message bytes :param int validate: 0 = ignore invalid CRC, 1 = validate CRC (1) :param int decode: decode payload True/False :param str key: decryption key (required if decode = 1) :param object basedate: basedate as datetime or 32-bit gnssTimeTag as integer (None) :param dict timetags: dict of accumulated gnssTimeTags from data stream (None) :return: SPARTNMessage object :rtype: SPARTNMessage :raises: SPARTN...Error (if data stream contains invalid data or unknown message type) """ # pylint: disable=unused-argument # if decode and key is None: # raise ParameterError("Key must be provided if decoding is enabled") return SPARTNMessage( transport=message, validate=validate, decode=decode, key=key, basedate=basedate, timetags=timetags, )