Source code for driver2200087.runner

#!/usr/bin/env python
# encoding: utf-8

# Copyright (C) 2015 Chintalagiri Shashank
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""
This module provides an asynchronous backend to the RadioShack 2200087
multimeter's PC interface. It uses crochet to provide a synchronous API
to an underlying Twisted based implementation.

While the intent of this module is to allow the use of the device from
within a larger framework, the use of crochet should allow the use of
this API and therefore the instrument in a naive python script as well.

See the 'main' section of this file for a minimal example of it's usage.
"""


from collections import deque

from twisted.internet.protocol import Protocol
from twisted.internet.protocol import Factory
from twisted.internet.protocol import connectionDone
from twisted.internet.serialport import SerialPort

from serialDecoder import process_chunk
from serialDecoder import detect_device_port

from crochet import setup
from crochet import run_in_reactor
from crochet import wait_for

from twisted.internet import reactor


[docs]def unwrap_failures(err): """ Takes nested failures and flattens the nodes into a list. The branches are discarded. """ errs = [] check_unwrap = [err] while len(check_unwrap) > 0: err = check_unwrap.pop() if hasattr(err.value, 'reasons'): errs.extend(err.value.reasons) check_unwrap.extend(err.value.reasons) else: errs.append(err) return errs
[docs]class InstProtocol2200087(Protocol): """ This is a twisted protocol which handles serial communications with 2200087 multimeters. This protocol exists and operates within the context of a twisted reactor. Applications themselves built on twisted should be able to simply import this protocol (or its factory). If you would like the protocol to produce datapoints in a different format, this protocol should be sub-classed in order to do so. The changes necessary would likely begin in this class's frame_recieved() function. Synchronous / non-twisted applications should use the InstInterface2200087 class instead. The InstInterface2200087 class accepts a parameter to specify which protocol factory to use, in case you intend to subclass this protocol. :param port: Port on which the device is connected. Default '/dev/ttyUSB0'. :type port: str :param buffer_size: Length of the point buffer in the protocol. Default 100. :type buffer_size: int """ def __init__(self, port, buffer_size=100): self._buffer = "" self._frame_size = 14 self._point_buffer_size = buffer_size self.point_buffer = None self.reset_buffer() self._serial_port = port self._serial_transport = None self._frame_processor = process_chunk
[docs] def reset_buffer(self): """ Resets the point buffer. Any data presently within it will be lost. """ self.point_buffer = deque(maxlen=self._point_buffer_size)
[docs] def make_serial_connection(self): """ Creates the serial connection to the port specified by the instance's _serial_port variable and sets the instance's _serial_transport variable to the twisted.internet.serialport.SerialPort instance. """ self._serial_transport = SerialPort(self, self._serial_port, reactor, baudrate=2400, bytesize=8, parity='N', stopbits=1, timeout=5, xonxoff=0, rtscts=0)
[docs] def break_serial_connection(self): """ Calls loseConnection() on the instance's _serial_transport object. """ self._serial_transport.loseConnection()
[docs] def connectionMade(self): """ This function is called by twisted when a connection to the serial transport is successfully opened. """ pass
[docs] def connectionLost(self, reason=connectionDone): """ This function is called by twisted when the connection to the serial transport is lost. """ print "Lost Connection to Device" print reason
[docs] def dataReceived(self, data): """ This function is called by twisted when new bytes are received by the serial transport. This data is appended to the protocol's framing buffer, _buffer, and when the length of the buffer is longer than the frame size, that many bytes are pulled out of the start of the buffer and frame_recieved is called with the frame. This function also performs the initial frame synchronization by dumping any bytes in the beginning of the buffer which aren't the first byte of the frame. In its steady state, the protocol framing buffer will always have the beginning of a frame as the first element. :param data: The data bytes received :type data: str """ self._buffer += data while len(self._buffer) and self._buffer[0].encode('hex')[0] != '1': self._buffer = self._buffer[1:] while len(self._buffer) >= self._frame_size: self.frame_received(self._buffer[0:self._frame_size]) self._buffer = self._buffer[self._frame_size:]
[docs] def frame_received(self, frame): """ This function is called by data_received when a full frame is received by the serial transport and the protocol. This function recasts the frame into the format used by the serialDecoder and then uses that module to process the frame into the final string. This string is then appended to the protocol's point buffer. This string is treated as a fully processed datapoint for the purposes of this module. :param frame: The full frame representing a single data point :type frame: str """ frame = [byte.encode('hex') for byte in frame] chunk = ' '.join(frame) point = self._frame_processor(chunk) self.point_buffer.append(point)
[docs] def latest_point(self, flush=True): """ This function can be called to obtain the latest data point from the protocol's point buffer. The intended use of this function is to allow random reads from the DMM. Such a typical application will want to discard all the older data points (including the one returned), which it can do with flush=True. This function should only be called when there is data already in the protocol buffer, which can be determined using data_available(). This is a twisted protocol function, and should not be called directly by synchronous / non-twisted code. Instead, its counterpart in the InstInterface object should be used. :param flush: Whether to flush all the older data points. :type flush: bool :return: Latest Data Point as processed by the serialDecoder :rtype: str """ rval = self.point_buffer[-1] if flush is True: self.point_buffer.clear() return rval
[docs] def next_point(self): """ This function can be called to obtain the next data point from the protocol's point buffer. The intended use of this function is to allow continuous streaming reads from the DMM. Such a typical application will want to pop the element from the left of the point buffer, which is what this function does. This function should only be called when there is data already in the protocol buffer, which can be determined using data_available(). This is a twisted protocol function, and should not be called directly by synchronous / non-twisted code. Instead, its counterpart in the InstInterface object should be used. :return: Next Data Point in the point buffer as processed by the serialDecoder :rtype: str """ return self.point_buffer.popleft()
[docs] def next_chunk(self): """ This function can be called to obtain a copy of the protocol's point buffer with all but the latest point in protocol's point buffer. The intended use of this function is to allow continuous streaming reads from the DMM. Such a typical application will want to pop the elements from the left of the point buffer, which is what this function does. This function should only be called when there is data already in the protocol buffer, which can be determined using data_available(). This is a twisted protocol function, and should not be called directly by synchronous / non-twisted code. Instead, its counterpart in the InstInterface object should be used. :return: Copy of point_buffer with all but the latest_point :rtype: deque """ rval = self.point_buffer self.point_buffer = deque([rval.pop()], maxlen=self._point_buffer_size) return rval
[docs] def data_available(self): """ This function can be called to read the number of data points waiting in the protocol's point buffer. This is a twisted protocol function, and should not be called directly by synchronous / non-twisted code. Instead, its counterpart in the InstInterface object should be used. :return: Number of points waiting in the protocol's point buffer :rtype: int """ return len(self.point_buffer)
[docs]class InstFactory2200087(Factory): """ This is a twisted protocol factory which produces twisted protocol objects which handle serial communications with 2200087 multimeters. This class is typically not to be instantiated by application code. This module includes a single instance of this class (factory), which can be used to create as many such objects as are necessary. This protocol factory exists and operates within the context of a twisted reactor. Applications themselves built on twisted should be able to simply import this protocol factory. Synchronous / non-twisted applications should use the InstInterface2200087 class instead. """ def __init__(self): self.instances = []
[docs] def buildProtocol(self, port, buffer_size=100): """ This function returns a InstProtocol2200087 instance, bound to the port specified by the param port. This is a twisted protocol factory function, and should not be called directly by synchronous / non-twisted code. The InstInterface2200087 class should be instantiated instead. :param port: Serial port identifier to which the device is connected :type port: str :param buffer_size: Length of the point buffer in the protocol. Default 100. :type buffer_size: int """ instance = InstProtocol2200087(port, buffer_size=buffer_size) return instance
factory = InstFactory2200087()
[docs]class InstInterface2200087(object): """ This class provides an synchronous / non-twisted interface to 2200087 multimeters. It uses the underlying _protocol object which does most of the heavy lifting using twisted / crochet. For each DMM you want to connect to, instantiate this class once with the correct serial port string. If you would like to use a custom protocol to interface with the device, you can do so by passing in the custom protocol factory as the named parameter pfactory. See the documentation of the default protocol object for information on creating a custom Protocol class. :param port: Port on which the device is connected. Default '/dev/ttyUSB0'. :type port: str :param buffer_size: Length of the point buffer in the protocol. Default 100. :type buffer_size: int :param pfactory: Custom protocol factory to use, if not the one implemented here. :type pfactory: InstFactory2200087 Your application code is expected to setup crochet before creating the instance. A short example : >>> from crochet import setup >>> setup() >>> from driver2200087.runner import InstInterface2200087 >>> dmm = InstInterface2200087('/dev/ttyUSB0') >>> dmm.connect() >>> print dmm.latest_point() """ def __init__(self, port=None, buffer_size=100, pfactory=factory): if port is None: port = detect_device_port() self._port = port self._protocol = pfactory.buildProtocol(port, buffer_size) @run_in_reactor
[docs] def connect(self): """ This function connects to the serial port specified during the instantiation of the class. This function should be called before anything else can be done with the object. """ self._protocol.make_serial_connection()
@run_in_reactor
[docs] def disconnect(self): """ This function disconnects from the serial port specified during the instantiation of the class. """ self._protocol.break_serial_connection()
@wait_for(timeout=1)
[docs] def latest_point(self, flush=True): """ This function can be called to obtain the latest data point from the protocol's point buffer. The intended use of this function is to allow random reads from the DMM. Such a typical application will want to discard all the older data points (including the one returned), which it can do with flush=True. This function should only be called when there is data already in the protocol buffer, which can be determined using data_available(). :param flush: Whether to flush all the older data points. :type flush: bool :return: Latest Data Point as processed by the protocol :rtype: str or type of each datapoint """ return self._protocol.latest_point(flush)
@wait_for(timeout=1)
[docs] def next_point(self): """ This function can be called to obtain the next data point from the protocol's point buffer. The intended use of this function is to allow continuous streaming reads from the DMM. Such a typical application will want to pop the element from the left of the point buffer, which is what this function does. This function should only be called when there is data already in the protocol buffer, which can be determined using data_available(). :return: Next Data Point in the point buffer as processed by the protocol :rtype: str or type of each datapoint """ return self._protocol.next_point()
@wait_for(timeout=1)
[docs] def next_chunk(self): """ This function can be called to obtain the next chunk of data from the protocol's point buffer. The intended use of this function is to allow continuous streaming reads from the DMM. Such a typical application will want to pop the elements from the left of the point buffer, which is what this function effectively does. This function should only be called when there is data already in the protocol buffer, which can be determined using data_available(). :return: Point buffer with all but the latest point in the protocol's point buffer :rtype: deque or type of the point_buffer """ return self._protocol.next_chunk()
@wait_for(timeout=1)
[docs] def data_available(self): """ This function can be called to read the number of data points waiting in the protocol's point buffer. :return: Number of points waiting in the protocol's point buffer :rtype: int """ return self._protocol.data_available()
@wait_for(timeout=1)
[docs] def reset_buffer(self): """ This function can be called to reset the point buffer. This should be used for starting wave acquisition. """ return self._protocol.reset_buffer()
if __name__ == '__main__': setup() dmm = InstInterface2200087() dmm.connect() while True: if dmm.data_available() > 0: print dmm.next_point()