Source code for pyubx2.socket_stream

"""
socket_stream class.

A skeleton socket wrapper which provides basic stream-like
read(bytes) and readline() methods.

NB: this will read from a socket indefinitely. It is the
responsibility of the calling application to monitor
data returned and implement appropriate socket error,
timeout or inactivity procedures.

Created on 4 Apr 2022

:author: semuadmin
:copyright: SEMU Consulting © 2022
:license: BSD 3-Clause
"""

from socket import socket


class SocketStream:
    """
    Skeleton socket wrapper exposing stream-like read(), readline()
    and write() methods on top of an internal receive buffer.

    Reads block on the underlying socket for as long as the socket
    itself blocks; error, timeout and inactivity handling is left to
    the calling application.
    """

    def __init__(self, sock: socket, **kwargs):
        """
        Constructor.

        :param socket sock: socket object
        :param int bufsize: (kwarg) internal buffer size (4096)
        """

        self._socket = sock
        self._bufsize = kwargs.get("bufsize", 4096)
        self._buffer = bytearray()
        self._recv()  # populate initial buffer

    def _recv(self) -> bool:
        """
        Read up to bufsize bytes from the socket and append them to
        the internal buffer.

        :return: True if data was received, False if the socket
            returned no data or raised OSError (which includes
            TimeoutError)
        :rtype: bool
        """

        try:
            data = self._socket.recv(self._bufsize)
        except OSError:  # TimeoutError is a subclass of OSError
            return False
        if not data:
            return False
        self._buffer += data
        return True

    def _consume(self, count: int) -> bytes:
        """
        Remove the first count bytes from the internal buffer and
        return them.

        :param int count: number of leading bytes to remove
        :return: bytes removed
        :rtype: bytes
        """

        data = bytes(self._buffer[:count])
        # delete in place rather than re-slicing, to avoid copying the
        # whole remainder of the buffer on every read
        del self._buffer[:count]
        return data

    @property
    def buffer(self) -> bytearray:
        """
        Getter for buffer.

        :return: buffer
        :rtype: bytearray
        """

        return self._buffer

    def read(self, num: int) -> bytes:
        """
        Read specified number of bytes from buffer, topping the buffer
        up from the socket as required.

        The read is all-or-nothing: if the socket cannot supply enough
        data, b"" is returned and any bytes already buffered are kept
        for a later call.

        NB: always check length of return data.

        :param int num: number of bytes to read
        :return: exactly num bytes, or b"" if num <= 0 or the socket
            could not supply enough data
        :rtype: bytes
        """

        # a negative count would otherwise slice from the wrong end
        if num <= 0:
            return b""
        # if at end of internal buffer, top it up from socket
        while len(self._buffer) < num:
            if not self._recv():
                return b""
        return self._consume(num)

    def readline(self) -> bytes:
        """
        Read bytes from buffer until LF reached.

        If the socket stops supplying data before an LF is seen, all
        bytes buffered so far are returned without a terminator.

        NB: always check that return data terminator is LF.

        :return: bytes up to and including LF, or a partial
            (possibly empty) line if no LF could be read
        :rtype: bytes
        """

        scanned = 0  # bytes already known not to contain LF
        while True:
            pos = self._buffer.find(b"\n", scanned)
            if pos >= 0:
                end = pos + 1  # include the LF itself
                break
            scanned = len(self._buffer)
            if not self._recv():
                end = scanned  # no more data: hand back what we have
                break
        return self._consume(end)

    def write(self, data: bytes, **kwargs):
        """
        Write bytes to socket.

        Uses sendall() so that the whole payload is transmitted;
        a plain send() may write only part of it.

        :param bytes data: data
        :param kwargs: passed through unchanged to socket.sendall()
        :return: number of bytes written (len(data))
        :rtype: int
        :raises: OSError if the underlying socket fails to send
        """

        self._socket.sendall(data, **kwargs)
        return len(data)