#! python
#
# Base class and support functions used by various backends.
#
# This file is part of pySerial. https://github.com/pyserial/pyserial
# (C) 2001-2016 Chris Liechti
#
# SPDX-License-Identifier: BSD-3-Clause

import io
import time

# ``memoryview`` was introduced in Python 2.7 and ``bytes(some_memoryview)``
# isn't returning the contents (very unfortunate). Therefore we need special
# cases and test for it. Ensure that there is a ``memoryview`` object for older
# Python versions. This is easier than making every test dependent on its
# existence.
try:
    memoryview
except (NameError, AttributeError):
    # implementation does not matter as we do not really use it.
    # it just must not inherit from something else we might care for.
    class memoryview(object):   # pylint: disable=redefined-builtin,invalid-name
        pass

try:
    unicode
except (NameError, AttributeError):
    unicode = str       # for Python 3, pylint: disable=redefined-builtin,invalid-name

try:
    basestring
except (NameError, AttributeError):
    basestring = (str,)    # for Python 3, pylint: disable=redefined-builtin,invalid-name


# "for byte in data" fails for python3 as it returns ints instead of bytes
def iterbytes(b):
    """\
    Iterate over bytes, yielding length-1 bytes objects instead of ints.

    A ``memoryview`` argument is converted with ``tobytes()`` first. Any
    other argument only needs to support slicing; iteration stops at the
    first empty slice.
    """
    if isinstance(b, memoryview):
        b = b.tobytes()
    i = 0
    while True:
        # slicing (instead of indexing) keeps the element type on Python 3
        a = b[i:i + 1]
        i += 1
        if a:
            yield a
        else:
            break


# all Python versions prior 3.x convert ``str([17])`` to '[17]' instead of '\x11'
# so a simple ``bytes(sequence)`` doesn't work for all versions
def to_bytes(seq):
    """\
    Convert a sequence to a bytes type.

    ``bytes`` is returned unchanged; ``bytearray`` and ``memoryview`` are
    copied into a new ``bytes`` object; anything else is passed through
    ``bytearray()`` (e.g. a list of integers).

    Raises TypeError for unicode strings, which must be encoded by the
    caller.
    """
    if isinstance(seq, bytes):
        return seq
    elif isinstance(seq, bytearray):
        return bytes(seq)
    elif isinstance(seq, memoryview):
        return seq.tobytes()
    elif isinstance(seq, unicode):
        raise TypeError('unicode strings are not supported, please encode to bytes: {!r}'.format(seq))
    else:
        # handle list of integers and bytes (one or more items) for Python 2 and 3
        return bytes(bytearray(seq))


# create control bytes
XON = to_bytes([17])
XOFF = to_bytes([19])

CR = to_bytes([13])
LF = to_bytes([10])


PARITY_NONE, PARITY_EVEN, PARITY_ODD, PARITY_MARK, PARITY_SPACE = 'N', 'E', 'O', 'M', 'S'
STOPBITS_ONE, STOPBITS_ONE_POINT_FIVE, STOPBITS_TWO = (1, 1.5, 2)
FIVEBITS, SIXBITS, SEVENBITS, EIGHTBITS = (5, 6, 7, 8)

# human readable names for the parity constants
PARITY_NAMES = {
    PARITY_NONE: 'None',
    PARITY_EVEN: 'Even',
    PARITY_ODD: 'Odd',
    PARITY_MARK: 'Mark',
    PARITY_SPACE: 'Space',
}


class SerialException(IOError):
    """Base class for serial port related exceptions."""


class SerialTimeoutException(SerialException):
    """Write timeouts give an exception"""


# shared exception instances; send_break() below raises portNotOpenError
writeTimeoutError = SerialTimeoutException('Write timeout')
portNotOpenError = SerialException('Attempting to use a port that is not open')


class Timeout(object):
    """\
    Abstraction for timeout operations. Using time.monotonic() if available
    or time.time() in all other cases.

    The class can also be initialized with 0 or None, in order to support
    non-blocking and fully blocking I/O operations. The attributes
    is_non_blocking and is_infinite are set accordingly.
    """
    if hasattr(time, 'monotonic'):
        # Timeout implementation with time.monotonic(). This function is only
        # supported by Python 3.3 and above. It returns a time in seconds
        # (float) just as time.time(), but is not affected by system clock
        # adjustments.
        TIME = time.monotonic
    else:
        # Timeout implementation with time.time(). This is compatible with all
        # Python versions but has issues if the clock is adjusted while the
        # timeout is running.
        TIME = time.time

    def __init__(self, duration):
        """\
        Initialize a timeout with given duration.

        ``duration`` is a number of seconds, 0 for non-blocking or None
        for "never expires".
        """
        self.is_infinite = (duration is None)
        self.is_non_blocking = (duration == 0)
        self.duration = duration
        if duration is not None:
            self.target_time = self.TIME() + duration
        else:
            # no deadline at all
            self.target_time = None

    def expired(self):
        """Return a boolean, telling if the timeout has expired"""
        # an infinite timeout (target_time is None) never expires
        return self.target_time is not None and self.time_left() <= 0

    def time_left(self):
        """\
        Return how many seconds are left until the timeout expires.

        Returns 0 for a non-blocking timeout and None for an infinite one.
        """
        if self.is_non_blocking:
            return 0
        elif self.is_infinite:
            return None
        else:
            delta = self.target_time - self.TIME()
            if delta > self.duration:
                # clock jumped, recalculate
                self.target_time = self.TIME() + self.duration
                return self.duration
            else:
                # never report a negative remaining time
                return max(0, delta)

    def restart(self, duration):
        """\
        Restart a timeout, only supported if a timeout was already set up
        before.
        """
        self.duration = duration
        self.target_time = self.TIME() + duration


class SerialBase(io.RawIOBase):
    """\
    Serial port base class. Provides __init__ function and properties to
    get/set port settings.

    Besides open() and close(), this class calls several members that are
    not defined in this file: _reconfigure_port(), _update_rts_state(),
    _update_dtr_state(), _update_break_state(), read(),
    reset_input_buffer(), reset_output_buffer() and the attributes
    in_waiting, cts, dsr, ri and cd. Presumably these are supplied by the
    platform specific subclasses.
    """

    # default values, may be overridden in subclasses that do not support all values
    BAUDRATES = (50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800,
                 9600, 19200, 38400, 57600, 115200, 230400, 460800, 500000,
                 576000, 921600, 1000000, 1152000, 1500000, 2000000, 2500000,
                 3000000, 3500000, 4000000)
    BYTESIZES = (FIVEBITS, SIXBITS, SEVENBITS, EIGHTBITS)
    PARITIES = (PARITY_NONE, PARITY_EVEN, PARITY_ODD, PARITY_MARK, PARITY_SPACE)
    STOPBITS = (STOPBITS_ONE, STOPBITS_ONE_POINT_FIVE, STOPBITS_TWO)

    def __init__(self,
                 port=None,
                 baudrate=9600,
                 bytesize=EIGHTBITS,
                 parity=PARITY_NONE,
                 stopbits=STOPBITS_ONE,
                 timeout=None,
                 xonxoff=False,
                 rtscts=False,
                 write_timeout=None,
                 dsrdtr=False,
                 inter_byte_timeout=None,
                 exclusive=None,
                 **kwargs):
        """\
        Initialize comm port object. If a "port" is given, then the port will be
        opened immediately. Otherwise a Serial port object in closed state
        is returned.

        The deprecated keyword arguments ``writeTimeout`` and
        ``interCharTimeout`` are still accepted; any other unknown keyword
        raises ValueError.
        """

        self.is_open = False
        self.portstr = None
        self.name = None
        # correct values are assigned below through properties
        self._port = None
        self._baudrate = None
        self._bytesize = None
        self._parity = None
        self._stopbits = None
        self._timeout = None
        self._write_timeout = None
        self._xonxoff = None
        self._rtscts = None
        self._dsrdtr = None
        self._inter_byte_timeout = None
        self._rs485_mode = None  # disabled by default
        self._rts_state = True
        self._dtr_state = True
        self._break_state = False
        self._exclusive = None

        # assign values using get/set methods using the properties feature
        self.port = port
        self.baudrate = baudrate
        self.bytesize = bytesize
        self.parity = parity
        self.stopbits = stopbits
        self.timeout = timeout
        self.write_timeout = write_timeout
        self.xonxoff = xonxoff
        self.rtscts = rtscts
        self.dsrdtr = dsrdtr
        self.inter_byte_timeout = inter_byte_timeout
        self.exclusive = exclusive

        # watch for backward compatible kwargs
        if 'writeTimeout' in kwargs:
            self.write_timeout = kwargs.pop('writeTimeout')
        if 'interCharTimeout' in kwargs:
            self.inter_byte_timeout = kwargs.pop('interCharTimeout')
        if kwargs:
            raise ValueError('unexpected keyword arguments: {!r}'.format(kwargs))

        if port is not None:
            self.open()

    #  - - - - - - - - - - - - - - - - - - - - - - - -

    # to be implemented by subclasses:
    # def open(self):
    # def close(self):

    #  - - - - - - - - - - - - - - - - - - - - - - - -

    @property
    def port(self):
        """\
        Get the current port setting. The value that was passed on init or using
        setPort() is passed back.
        """
        return self._port

    @port.setter
    def port(self, port):
        """\
        Change the port. An open port is closed first and re-opened with
        the new name afterwards.

        Raises ValueError if ``port`` is neither None nor a string.
        """
        if port is not None and not isinstance(port, basestring):
            raise ValueError('"port" must be None or a string, not {}'.format(type(port)))
        was_open = self.is_open
        if was_open:
            self.close()
        self.portstr = port
        self._port = port
        self.name = self.portstr
        if was_open:
            self.open()

    @property
    def baudrate(self):
        """Get the current baud rate setting."""
        return self._baudrate

    @baudrate.setter
    def baudrate(self, baudrate):
        """\
        Change baud rate. It raises a ValueError if the port is open and the
        baud rate is not possible. If the port is closed, then the value is
        accepted and the exception is raised when the port is opened.
        """
        try:
            b = int(baudrate)
        except TypeError:
            raise ValueError("Not a valid baudrate: {!r}".format(baudrate))
        else:
            if b < 0:
                raise ValueError("Not a valid baudrate: {!r}".format(baudrate))
            self._baudrate = b
            if self.is_open:
                self._reconfigure_port()

    @property
    def bytesize(self):
        """Get the current byte size setting."""
        return self._bytesize

    @bytesize.setter
    def bytesize(self, bytesize):
        """Change byte size. Raises ValueError if not in BYTESIZES."""
        if bytesize not in self.BYTESIZES:
            raise ValueError("Not a valid byte size: {!r}".format(bytesize))
        self._bytesize = bytesize
        if self.is_open:
            self._reconfigure_port()

    @property
    def exclusive(self):
        """Get the current exclusive access setting."""
        return self._exclusive

    @exclusive.setter
    def exclusive(self, exclusive):
        """Change the exclusive access setting."""
        self._exclusive = exclusive
        if self.is_open:
            self._reconfigure_port()

    @property
    def parity(self):
        """Get the current parity setting."""
        return self._parity

    @parity.setter
    def parity(self, parity):
        """Change parity setting. Raises ValueError if not in PARITIES."""
        if parity not in self.PARITIES:
            raise ValueError("Not a valid parity: {!r}".format(parity))
        self._parity = parity
        if self.is_open:
            self._reconfigure_port()

    @property
    def stopbits(self):
        """Get the current stop bits setting."""
        return self._stopbits

    @stopbits.setter
    def stopbits(self, stopbits):
        """Change stop bits size. Raises ValueError if not in STOPBITS."""
        if stopbits not in self.STOPBITS:
            raise ValueError("Not a valid stop bit size: {!r}".format(stopbits))
        self._stopbits = stopbits
        if self.is_open:
            self._reconfigure_port()

    @property
    def timeout(self):
        """Get the current timeout setting."""
        return self._timeout

    @timeout.setter
    def timeout(self, timeout):
        """\
        Change timeout setting. None is accepted as is; any other value
        must be a non-negative number, otherwise ValueError is raised.
        """
        if timeout is not None:
            try:
                timeout + 1     # test if it's a number, will throw a TypeError if not...
            except TypeError:
                raise ValueError("Not a valid timeout: {!r}".format(timeout))
            if timeout < 0:
                raise ValueError("Not a valid timeout: {!r}".format(timeout))
        self._timeout = timeout
        if self.is_open:
            self._reconfigure_port()

    @property
    def write_timeout(self):
        """Get the current write timeout setting."""
        return self._write_timeout

    @write_timeout.setter
    def write_timeout(self, timeout):
        """\
        Change write timeout setting. None is accepted as is; any other
        value must be a non-negative number, otherwise ValueError is raised.
        """
        if timeout is not None:
            # The number test has to come first: on Python 3 comparing a
            # non-number with 0 raises a TypeError of its own, which would
            # escape instead of the ValueError used for invalid settings.
            try:
                timeout + 1     # test if it's a number, will throw a TypeError if not...
            except TypeError:
                raise ValueError("Not a valid timeout: {!r}".format(timeout))
            if timeout < 0:
                raise ValueError("Not a valid timeout: {!r}".format(timeout))

        self._write_timeout = timeout
        if self.is_open:
            self._reconfigure_port()

    @property
    def inter_byte_timeout(self):
        """Get the current inter-character timeout setting."""
        return self._inter_byte_timeout

    @inter_byte_timeout.setter
    def inter_byte_timeout(self, ic_timeout):
        """\
        Change inter-byte timeout setting. None is accepted as is; any other
        value must be a non-negative number, otherwise ValueError is raised.
        """
        if ic_timeout is not None:
            # number test first, see write_timeout: a failing comparison
            # must not leak a TypeError on Python 3
            try:
                ic_timeout + 1     # test if it's a number, will throw a TypeError if not...
            except TypeError:
                raise ValueError("Not a valid timeout: {!r}".format(ic_timeout))
            if ic_timeout < 0:
                raise ValueError("Not a valid timeout: {!r}".format(ic_timeout))

        self._inter_byte_timeout = ic_timeout
        if self.is_open:
            self._reconfigure_port()

    @property
    def xonxoff(self):
        """Get the current XON/XOFF setting."""
        return self._xonxoff

    @xonxoff.setter
    def xonxoff(self, xonxoff):
        """Change XON/XOFF setting."""
        self._xonxoff = xonxoff
        if self.is_open:
            self._reconfigure_port()

    @property
    def rtscts(self):
        """Get the current RTS/CTS flow control setting."""
        return self._rtscts

    @rtscts.setter
    def rtscts(self, rtscts):
        """Change RTS/CTS flow control setting."""
        self._rtscts = rtscts
        if self.is_open:
            self._reconfigure_port()

    @property
    def dsrdtr(self):
        """Get the current DSR/DTR flow control setting."""
        return self._dsrdtr

    @dsrdtr.setter
    def dsrdtr(self, dsrdtr=None):
        """Change DsrDtr flow control setting."""
        if dsrdtr is None:
            # if not set, keep backwards compatibility and follow rtscts setting
            self._dsrdtr = self._rtscts
        else:
            # if defined independently, follow its value
            self._dsrdtr = dsrdtr
        if self.is_open:
            self._reconfigure_port()

    @property
    def rts(self):
        """Get the stored RTS line state."""
        return self._rts_state

    @rts.setter
    def rts(self, value):
        """Store the RTS line state and apply it if the port is open."""
        self._rts_state = value
        if self.is_open:
            self._update_rts_state()

    @property
    def dtr(self):
        """Get the stored DTR line state."""
        return self._dtr_state

    @dtr.setter
    def dtr(self, value):
        """Store the DTR line state and apply it if the port is open."""
        self._dtr_state = value
        if self.is_open:
            self._update_dtr_state()

    @property
    def break_condition(self):
        """Get the stored break condition state."""
        return self._break_state

    @break_condition.setter
    def break_condition(self, value):
        """Store the break condition state and apply it if the port is open."""
        self._break_state = value
        if self.is_open:
            self._update_break_state()

    #  - - - - - - - - - - - - - - - - - - - - - - - -
    # functions useful for RS-485 adapters

    @property
    def rs485_mode(self):
        """\
        Enable RS485 mode and apply new settings, set to None to disable.
        See serial.rs485.RS485Settings for more info about the value.
        """
        return self._rs485_mode

    @rs485_mode.setter
    def rs485_mode(self, rs485_settings):
        self._rs485_mode = rs485_settings
        if self.is_open:
            self._reconfigure_port()

    #  - - - - - - - - - - - - - - - - - - - - - - - -

    # names of the settings handled by get_settings() / apply_settings()
    _SAVED_SETTINGS = ('baudrate', 'bytesize', 'parity', 'stopbits', 'xonxoff',
                       'dsrdtr', 'rtscts', 'timeout', 'write_timeout',
                       'inter_byte_timeout')

    def get_settings(self):
        """\
        Get current port settings as a dictionary. For use with
        apply_settings().
        """
        # read the internal "_" values directly, bypassing the properties
        return {key: getattr(self, '_' + key) for key in self._SAVED_SETTINGS}

    def apply_settings(self, d):
        """\
        Apply stored settings from a dictionary returned from
        get_settings(). It's allowed to delete keys from the dictionary. These
        values will simply be left unchanged.
        """
        for key in self._SAVED_SETTINGS:
            if key in d and d[key] != getattr(self, '_' + key):   # check against internal "_" value
                setattr(self, key, d[key])          # set non "_" value to use properties write function

    #  - - - - - - - - - - - - - - - - - - - - - - - -

    def __repr__(self):
        """String representation of the current port settings and its state."""
        # NOTE(review): ``id`` is handed to format() but the template does not
        # reference it - confirm whether the id was meant to be displayed.
        return '{name}(port={p.portstr!r}, ' \
               'baudrate={p.baudrate!r}, bytesize={p.bytesize!r}, parity={p.parity!r}, ' \
               'stopbits={p.stopbits!r}, timeout={p.timeout!r}, xonxoff={p.xonxoff!r}, ' \
               'rtscts={p.rtscts!r}, dsrdtr={p.dsrdtr!r})'.format(
                   name=self.__class__.__name__, id=id(self), p=self)

    #  - - - - - - - - - - - - - - - - - - - - - - - -
    # compatibility with io library
    # pylint: disable=invalid-name,missing-docstring

    def readable(self):
        return True

    def writable(self):
        return True

    def seekable(self):
        return False

    def readinto(self, b):
        """\
        Read up to len(b) bytes via read() into the buffer ``b`` and return
        the number of bytes stored.
        """
        data = self.read(len(b))
        n = len(data)
        try:
            b[:n] = data
        except TypeError as err:
            # slice assignment of bytes failed; retry for array.array
            # targets, re-raise for everything else
            import array
            if not isinstance(b, array.array):
                raise err
            b[:n] = array.array('b', data)
        return n

    #  - - - - - - - - - - - - - - - - - - - - - - - -
    # context manager

    def __enter__(self):
        """Open the port if it is not open yet and return self."""
        if not self.is_open:
            self.open()
        return self

    def __exit__(self, *args, **kwargs):
        """Close the port when leaving the ``with`` block."""
        self.close()

    #  - - - - - - - - - - - - - - - - - - - - - - - -

    def send_break(self, duration=0.25):
        """\
        Send break condition. Timed, returns to idle state after given
        duration.

        Raises portNotOpenError if the port is not open.
        """
        if not self.is_open:
            raise portNotOpenError
        self.break_condition = True
        time.sleep(duration)
        self.break_condition = False

    #  - - - - - - - - - - - - - - - - - - - - - - - -
    # backwards compatibility / deprecated functions
    # each one simply forwards to the corresponding new style name

    def flushInput(self):
        self.reset_input_buffer()

    def flushOutput(self):
        self.reset_output_buffer()

    def inWaiting(self):
        return self.in_waiting

    def sendBreak(self, duration=0.25):
        self.send_break(duration)

    def setRTS(self, value=1):
        self.rts = value

    def setDTR(self, value=1):
        self.dtr = value

    def getCTS(self):
        return self.cts

    def getDSR(self):
        return self.dsr

    def getRI(self):
        return self.ri

    def getCD(self):
        return self.cd

    def setPort(self, port):
        self.port = port

    @property
    def writeTimeout(self):
        return self.write_timeout

    @writeTimeout.setter
    def writeTimeout(self, timeout):
        self.write_timeout = timeout

    @property
    def interCharTimeout(self):
        return self.inter_byte_timeout

    @interCharTimeout.setter
    def interCharTimeout(self, interCharTimeout):
        self.inter_byte_timeout = interCharTimeout

    def getSettingsDict(self):
        return self.get_settings()

    def applySettingsDict(self, d):
        self.apply_settings(d)

    def isOpen(self):
        return self.is_open

    #  - - - - - - - - - - - - - - - - - - - - - - - -
    # additional functionality

    def read_all(self):
        """\
        Read all bytes currently available in the buffer of the OS.
        """
        return self.read(self.in_waiting)

    def read_until(self, terminator=LF, size=None):
        """\
        Read until a termination sequence is found (LF by default), the size
        is exceeded or until timeout occurs.

        The data read so far, including the terminator if it was found, is
        returned as bytes.
        """
        lenterm = len(terminator)
        line = bytearray()
        timeout = Timeout(self._timeout)
        while True:
            c = self.read(1)
            if c:
                line += c
                if line[-lenterm:] == terminator:
                    break
                if size is not None and len(line) >= size:
                    break
            else:
                # empty read: nothing more to get, stop here
                break
            if timeout.expired():
                break
        return bytes(line)

    def iread_until(self, *args, **kwargs):
        """\
        Read lines, implemented as generator. All arguments are passed on to
        read_until(). The generator ends on the first empty result (e.g.
        timeout).
        """
        while True:
            line = self.read_until(*args, **kwargs)
            if not line:
                break
            yield line


#  - - - - - - - - - - - - - - - - - - - - - - - - -
if __name__ == '__main__':
    import sys
    s = SerialBase()
    sys.stdout.write('port name:  {}\n'.format(s.name))
    sys.stdout.write('baud rates: {}\n'.format(s.BAUDRATES))
    sys.stdout.write('byte sizes: {}\n'.format(s.BYTESIZES))
    sys.stdout.write('parities:   {}\n'.format(s.PARITIES))
    sys.stdout.write('stop bits:  {}\n'.format(s.STOPBITS))
    sys.stdout.write('{}\n'.format(s))