Source code for pygnssutils.gnssserver

"""
gnssserver.py

This is a simple implementation of a TCP Socket
Server or NTRIP Server which reads the binary data stream from
a connected GNSS receiver and broadcasts the data to any
TCP socket or NTRIP client running on a local or remote
machine.

Created on 24 May 2022

:author: semuadmin (Steve Smith)
:copyright: semuadmin © 2022
:license: BSD 3-Clause
"""

# pylint: disable=too-many-arguments

from logging import getLogger
from os import getenv
from queue import Queue
from threading import Thread
from time import sleep
from typing import Literal

from pygnssutils.globals import (
    NTRIP2,
    OUTPORT,
    PYGNSSUTILS_PEM,
    PYGNSSUTILS_PEMPATH,
    RTCMSTR,
)
from pygnssutils.gnssstreamer import GNSSStreamer
from pygnssutils.helpers import format_conn, ipprot2int
from pygnssutils.socket_server import ClientHandler, ClientHandlerTLS, SocketServer


[docs] class GNSSSocketServer: """ GNSS Socket Server Class. """
[docs] def __init__( self, app=None, stream: object = None, ipprot: Literal["IPv4", "IPv6"] = "IPv4", hostip: str = "0.0.0.0", outport: int = OUTPORT, tls: bool = False, maxclients: int = 5, ntripmode: Literal[0, 1] = 0, ntripversion: Literal["1.0", "2.0"] = NTRIP2, ntripuser: str = "anon", ntrippassword: str = "password", tlspempath: str = getenv(PYGNSSUTILS_PEMPATH, PYGNSSUTILS_PEM), ntriprtcmstr: str = RTCMSTR, **kwargs, ): """ Context manager constructor. Example of usage: gnssserver inport=COM3 hostip=192.168.0.20 outport=50010 ntripmode=0 :param object app: application from which this class is invoked (None) :param object stream: input datastream :param Literal["IPv4", "IPv6"] ipprot: IP protocol IPv4/IPv6 ("IPv4") :param int hostip: host ip address (0.0.0.0) :param int outport: TCP port (50010) :param bool tls: Enable TLS (HTTPS) (False) :param int maxclients: maximum number of connected clients (5) :param Literal[0,1] ntripmode: 0 = socket server, 1 - NTRIP server (0) :param Literal["1.0","2.0"] ntripversion: NTRIP version "1.0"/"2.0" ("2.0") :param str ntripuser: NTRIP caster authentication user ("anon") :param str ntrippassword: NTRIP caster authentication password ("password") :param str tlspempath: Path to TLS PEM file ("pygnssutils.pem") :param str ntriprtcmstr: NTRIP caster RTCM types sourcetable entry e.g. '1006(5),1077(1),...' :param dict kwargs: optional keyword arguments to pass to GNSSStreamer """ # Reference to calling application class (if applicable) self.__app = app # pylint: disable=unused-private-member # configure logger with name "pygnssutils" in calling module self.logger = getLogger(__name__) self.logger.debug(kwargs) try: self._stream = stream self._ipprot = ipprot self._ntripmode = int(ntripmode) self._ntripversion = ntripversion self._ntripuser = ntripuser self._ntrippassword = ntrippassword self._tlspempath = tlspempath self._ntriprtcmstr = ntriprtcmstr self._hostip = hostip self._outport = int(outport) self._tls = tls self._maxclients = int(maxclients) self._kwargs = kwargs self._output = Queue() self._socket_server = None self._streamer = None self._in_thread = None self._out_thread = None self._validargs = True except ValueError as err: self.logger.critical(f"Invalid input arguments {kwargs}\n{err}") self._validargs = False
def __enter__(self): """ Context manager enter routine. """ self.run() return self def __exit__(self, exc_type, exc_value, exc_traceback): """ Context manager exit routine. Terminates SocketServer client threads in an orderly fashion. """ self.stop()
[docs] def run(self) -> int: """ Run server. :returns: rc 0 = fail, 1 = ok :rtype: int """ if self._validargs: self.logger.info("Starting server (type CTRL-C to stop)...") self._in_thread = self._start_input_thread(**self._kwargs) sleep(0.5) if self._in_thread.is_alive(): self._out_thread = self._start_output_thread() sleep(0.5) if self._out_thread.is_alive(): return 1 return 0
[docs] def stop(self): """ Shutdown server. """ self.logger.info("Stopping server...") if self._streamer is not None: self._streamer.stop() if self._socket_server is not None: self._socket_server.shutdown() self.logger.info("Server shutdown.")
def _start_input_thread(self, **kwargs) -> Thread: """ Start input (read) thread. :param dict kwargs: optional keyword arguments to pass to GNSSStreamer :returns: thread :rtype: Thread """ self.logger.info(f"Starting input thread, reading from {self._stream}...") thread = Thread( target=self._input_thread, args=(self._stream, self._output, kwargs), daemon=True, ) thread.start() return thread def _start_output_thread(self) -> Thread: """ Start output (socket) thread. :returns: thread :rtype: Thread """ self.logger.info( f"Starting output thread, broadcasting on {self._hostip}:{self._outport}..." ) thread = Thread( target=self._output_thread, args=( self._ipprot, self._hostip, self._outport, self._tls, self._ntripmode, self._maxclients, self._output, self._ntripversion, self._ntripuser, self._ntrippassword, self._tlspempath, self._ntriprtcmstr, ), daemon=True, ) thread.start() return thread def _input_thread(self, stream, output, kwargs): """ THREADED Input (Serial reader) thread. :param object stream: input datastream :param Queue output: output queue :param dict kwargs: optional keyword arguments to pass to GNSSStreamer """ with GNSSStreamer(self, stream, outqueue=output, **kwargs) as self._streamer: while True: sleep(1) def _output_thread( self, ipprot: str, hostip: str, outport: int, tls: bool, ntripmode: Literal[0, 1], maxclients: int, output: object, ntripversion: Literal["1.0", "2.0"], ntripuser: str, ntrippassword: str, tlspempath: str, ntriprtcmstr: str, ): """ THREADED Output (socket server) thread. :param str ipprot: IP protocol :param int hostip: host ip address :param str outport: TCP port :param bool tls: Use TLS :param int maxclients: maximum number of connected clients :param Literal[0,1] ntripmode: :param Literal["1.0","2.0"] ntripversion: NTRIP version :param str ntripuser: NTRIP caster authentication user :param str ntrippassword: NTRIP caster authentication password :param str tlspempath: path to TLS PEM file """ requesthandler = ClientHandlerTLS if tls else ClientHandler try: conn = format_conn(ipprot2int(ipprot), hostip, outport) with SocketServer( self, ntripmode, maxclients, output, conn, requesthandler, ntripversion=ntripversion, ntripuser=ntripuser, ntrippassword=ntrippassword, ipprot=ipprot, tlspempath=tlspempath, ntriprtcmstr=ntriprtcmstr, ) as self._socket_server: self._socket_server.serve_forever() except OSError as err: self.logger.critical(f"Error starting socket server {err}")