usbserial4a/usbserial4a/cdcacmserial4a.py

455 wiersze
14 KiB
Python

"""Android USB serial CDC ACM driver.
Classes:
CdcAcmSerial
"""
from struct import pack, unpack
import time
from .utilserial4a import (
SerialBase,
SerialException,
to_bytes,
PortNotOpenError,
Timeout,
)
from usb4a import usb
class CdcAcmSerial(SerialBase):
"""CDC ACM serial port class."""
REQTYPE_HOST2DEVICE = 0x21
REQTYPE_DEVICE2HOST = 0xA1
SET_LINE_CODING = 0x20
GET_LINE_CODING = 0x21
SET_CONTROL_LINE_STATE = 0x22
SEND_BREAK = 0x23
SET_CONTROL_LINE_STATE_DTR = 0x1
SET_CONTROL_LINE_STATE_RTS = 0x2
STOPBIT_MAP = {1: 0, 1.5: 1, 2: 2}
PARITY_MAP = {"N": 0, "O": 1, "E": 2, "M": 3, "S": 4}
DEFAULT_READ_BUFFER_SIZE = 1024
DEFAULT_WRITE_BUFFER_SIZE = 16 * 1024
USB_READ_TIMEOUT_MILLIS = 50
USB_WRITE_TIMEOUT_MILLIS = 5000
def __init__(self, *args, **kwargs):
self._device = None
self._connection = None
self._control_index = None
self._control_interface = None
self._data_interface = None
self._control_endpoint = None
self._read_endpoint = None
self._write_endpoint = None
self._read_buffer = bytearray()
super(CdcAcmSerial, self).__init__(*args, **kwargs)
def open(self):
"""Open the serial port.
When the serial port is instantiated, it will try to open automatically.
"""
self.close()
device = usb.get_usb_device(self.portstr)
if not device:
raise SerialException("Device not present {}".format(self.portstr))
if not usb.has_usb_permission(device):
usb.request_usb_permission(device)
return
connection = usb.get_usb_manager().openDevice(device)
if not connection:
raise SerialException("Failed to open device!")
self._device = device
self._connection = connection
if self._device.getInterfaceCount() == 1:
# Device might be castrated ACM device. Try single interface logic.
self._open_single_interface()
else:
# Try default interface logic.
self._open_interface()
# Check that all endpoints are good
if None in [self._control_endpoint, self._write_endpoint, self._read_endpoint]:
raise SerialException("Could not establish all endpoints!")
self.is_open = True
self._set_dtr_rts(self._dtr_state, self._rts_state)
self._reconfigure_port()
def _open_single_interface(self):
"""Open single interface device."""
device = self._device
# Claiming control/data interface.
self._control_index = 0
self._control_interface = device.getInterface(0)
self._data_interface = device.getInterface(0)
if not self._connection.claimInterface(self._control_interface, True):
raise SerialException("Could not claim shared control/data interface.")
num_endpoints = self._control_interface.getEndpointCount()
if num_endpoints < 3:
msg = "Not enough endpoints - need 3, got {}".format(num_endpoints)
raise SerialException(msg)
for i in range(num_endpoints):
ep = self._control_interface.getEndpoint(i)
if (ep.getDirection() == usb.UsbConstants.USB_DIR_IN) and (
ep.getType() == usb.UsbConstants.USB_ENDPOINT_XFER_INT
):
# Found controlling endpoint.
self._control_endpoint = ep
elif (ep.getDirection() == usb.UsbConstants.USB_DIR_IN) and (
ep.getType() == usb.UsbConstants.USB_ENDPOINT_XFER_BULK
):
# Found reading endpoint.
self._read_endpoint = ep
elif (ep.getDirection() == usb.UsbConstants.USB_DIR_OUT) and (
ep.getType() == usb.UsbConstants.USB_ENDPOINT_XFER_BULK
):
# Found writing endpoint
self._write_endpoint = ep
if None not in [
self._control_endpoint,
self._write_endpoint,
self._read_endpoint,
]:
# Found all endpoints.
break
def _open_interface(self):
"""Open default interface device."""
device = self._device
for i in range(device.getInterfaceCount()):
interface = device.getInterface(i)
if interface.getInterfaceClass() == usb.UsbConstants.USB_CLASS_COMM:
self._control_index = i
self._control_interface = interface
if interface.getInterfaceClass() == usb.UsbConstants.USB_CLASS_CDC_DATA:
self._data_interface = interface
if self._control_interface is None:
raise SerialException("Could not find control interface.")
if not self._connection.claimInterface(self._control_interface, True):
raise SerialException("Could not claim control interface.")
self._control_endpoint = self._control_interface.getEndpoint(0)
if self._data_interface is None:
raise SerialException("Could not find data interface.")
if not self._connection.claimInterface(self._data_interface, True):
raise SerialException("Could not claim data interface.")
for i in range(self._data_interface.getEndpointCount()):
ep = self._data_interface.getEndpoint(i)
if (
ep.getDirection() == usb.UsbConstants.USB_DIR_IN
and ep.getType() == usb.UsbConstants.USB_ENDPOINT_XFER_BULK
):
self._read_endpoint = ep
if (
ep.getDirection() == usb.UsbConstants.USB_DIR_OUT
and ep.getType() == usb.UsbConstants.USB_ENDPOINT_XFER_BULK
):
self._write_endpoint = ep
if None in (self._read_endpoint, self._write_endpoint):
raise SerialException("Could not find read/write endpoint.")
def _reconfigure_port(self):
"""Reconfigure serial port parameters."""
msg = bytearray(
[
self.baudrate & 0xFF,
self.baudrate >> 8 & 0xFF,
self.baudrate >> 16 & 0xFF,
self.baudrate >> 24 & 0xFF,
self.STOPBIT_MAP[self.stopbits],
self.PARITY_MAP[self.parity],
self.bytesize,
]
)
# Set line coding.
self._ctrl_transfer_out(self.SET_LINE_CODING, 0, msg)
def close(self):
"""Close the serial port."""
if self._connection:
self._connection.close()
self._connection = None
self.is_open = False
# - - - - - - - - - - - - - - - - - - - - - - - -
@property
def in_waiting(self):
"""Return the number of bytes currently in the input buffer.
Returns:
Length (int): number of data bytes in the input buffer.
"""
# Read from serial port hardware and put the data into read buffer.
self._read_buffer.extend(self._read())
return len(self._read_buffer)
@property
def out_waiting(self):
"""Return the number of bytes currently in the output buffer.
Always return 0.
"""
return 0
def read(self, size=1):
"""Read data from the serial port.
Parameters:
size (int): the number of data bytes to read.
Returns:
read (bytes): data bytes read from the serial port.
"""
read = bytearray()
timeout = Timeout(self.timeout)
# If there is enough data in the buffer, do not bother to read.
if len(self._read_buffer) < size:
# Keep reading until there is enough data or timeout.
while self.in_waiting < size:
if timeout.expired():
break
# Get data from read buffer.
read = self._read_buffer[:size]
self._read_buffer = self._read_buffer[size:]
return bytes(read)
def write(self, data):
"""Write data to the serial port.
Parameters:
data (bytearray): data written to the serial port.
Returns:
wrote (int): the number of data bytes written.
"""
if not self.is_open:
return None
offset = 0
timeout = int(
self._write_timeout * 1000
if self._write_timeout
else self.USB_WRITE_TIMEOUT_MILLIS
)
wrote = 0
while offset < len(data):
data_length = min(len(data) - offset, self.DEFAULT_WRITE_BUFFER_SIZE)
buf = data[offset : offset + data_length]
i = self._connection.bulkTransfer(
self._write_endpoint, buf, data_length, timeout
)
if i <= 0:
raise SerialException("Failed to write {}: {}".format(buf, i))
offset += data_length
wrote += i
return wrote
def flush(self):
"""Simply wait some time to allow all data to be written."""
pass
def reset_input_buffer(self):
"""Clear input buffer."""
if not self.is_open:
raise PortNotOpenError()
self._purgeHwBuffers(True, False)
def reset_output_buffer(self):
"""Clear output buffer."""
if not self.is_open:
raise PortNotOpenError()
self._purgeHwBuffers(False, True)
def send_break(self, duration=0.25):
"""Send break condition.
Parameters:
duration (float): break time in seconds.
"""
if not self.is_open:
raise PortNotOpenError()
self._ctrl_transfer_out(self.SEND_BREAK, int(duration * 1000))
def _update_break_state(self):
"""Send break condition."""
self._set_break(self._break_state)
def _update_rts_state(self):
"""Set terminal status line: Request To Send."""
self._set_rts(self._rts_state)
def _update_dtr_state(self):
"""Set terminal status line: Data Terminal Ready."""
self._set_dtr(self._dtr_state)
@property
def cts(self):
"""Read terminal status line: Clear To Send."""
status = self._poll_modem_status()
return bool(status)
@property
def dsr(self):
"""Read terminal status line: Data Set Ready."""
status = self._poll_modem_status()
return bool(status)
@property
def ri(self):
"""Read terminal status line: Ring Indicator."""
status = self._poll_modem_status()
return bool(status)
@property
def cd(self):
"""Read terminal status line: Carrier Detect."""
status = self._poll_modem_status()
return bool(status)
# - - - - - - - - - - - - - - - - - - - - - - - -
def _ctrl_transfer_out(self, request, value, buf=None):
"""USB control transfer out.
This function does the USB configuration job.
"""
result = self._connection.controlTransfer(
self.REQTYPE_HOST2DEVICE,
request,
value,
self._control_index,
buf,
(0 if buf is None else len(buf)),
self.USB_WRITE_TIMEOUT_MILLIS,
)
return result
def _ctrl_transfer_in(self, request, value, buf):
"""USB control transfer in.
Request for a control message from the device.
"""
result = self._connection.controlTransfer(
self.REQTYPE_DEVICE2HOST,
request,
value,
self._control_index,
buf,
(0 if buf is None else len(buf)),
self.USB_READ_TIMEOUT_MILLIS,
)
return result
def _set_break(self, break_):
"""Start or stop a break exception event on the serial line.
Parameters:
break_ (bool): either start or stop break event.
"""
pass
def _set_dtr(self, state):
"""Set dtr line.
Parameters:
state (bool): new DTR logical level.
"""
self._set_dtr_rts(state, self._rts_state)
def _set_rts(self, state):
"""Set rts line.
Parameters:
state (bool): new RTS logical level.
"""
self._set_dtr_rts(self._dtr_state, state)
def _set_dtr_rts(self, dtr, rts):
"""Set dtr and rts lines at once.
Parameters:
dtr (bool): new DTR logical level.
rts (bool): new RTS logical level.
"""
value = 0
if dtr:
value |= self.SET_CONTROL_LINE_STATE_DTR
if rts:
value |= self.SET_CONTROL_LINE_STATE_RTS
self._ctrl_transfer_out(self.SET_CONTROL_LINE_STATE, value)
def _purgeHwBuffers(self, purgeReadBuffers, purgeWriteBuffers):
"""Set serial port parameters.
Parameters:
purgeReadBuffers (bool): need to purge read buffer or not.
purgeWriteBuffers (bool): need to purge write buffer or not.
Returns:
result (bool): successful or not.
"""
return True
def _read(self):
"""Hardware dependent read function.
Returns:
read (bytes): data bytes read from the serial port.
"""
if not self.is_open:
raise PortNotOpenError()
if not self._read_endpoint:
raise SerialException("Read endpoint does not exist!")
# Get raw data from hardware.
buf = bytearray(self.DEFAULT_READ_BUFFER_SIZE)
totalBytesRead = self._connection.bulkTransfer(
self._read_endpoint,
buf,
self.DEFAULT_READ_BUFFER_SIZE,
self.USB_READ_TIMEOUT_MILLIS,
)
if totalBytesRead < 0:
# Read timeout. Set totalBytesRead to 0.
totalBytesRead = 0
read = buf[:totalBytesRead]
return bytes(read)
def _poll_modem_status(self):
"""Poll modem status information.
This function allows the retrieve the one status byte of the
device, useful in UART mode.
Returns:
status (int): modem status, as a proprietary bitfield
"""
return 0