kopia lustrzana https://github.com/jacklinquan/usbserial4a
455 wiersze
14 KiB
Python
455 wiersze
14 KiB
Python
"""Android USB serial CDC ACM driver.
|
|
|
|
Classes:
|
|
CdcAcmSerial
|
|
"""
|
|
|
|
from struct import pack, unpack
|
|
import time
|
|
from .utilserial4a import (
|
|
SerialBase,
|
|
SerialException,
|
|
to_bytes,
|
|
PortNotOpenError,
|
|
Timeout,
|
|
)
|
|
from usb4a import usb
|
|
|
|
|
|
class CdcAcmSerial(SerialBase):
|
|
"""CDC ACM serial port class."""
|
|
|
|
REQTYPE_HOST2DEVICE = 0x21
|
|
REQTYPE_DEVICE2HOST = 0xA1
|
|
|
|
SET_LINE_CODING = 0x20
|
|
GET_LINE_CODING = 0x21
|
|
SET_CONTROL_LINE_STATE = 0x22
|
|
SEND_BREAK = 0x23
|
|
|
|
SET_CONTROL_LINE_STATE_DTR = 0x1
|
|
SET_CONTROL_LINE_STATE_RTS = 0x2
|
|
|
|
STOPBIT_MAP = {1: 0, 1.5: 1, 2: 2}
|
|
PARITY_MAP = {"N": 0, "O": 1, "E": 2, "M": 3, "S": 4}
|
|
|
|
DEFAULT_READ_BUFFER_SIZE = 1024
|
|
DEFAULT_WRITE_BUFFER_SIZE = 16 * 1024
|
|
|
|
USB_READ_TIMEOUT_MILLIS = 50
|
|
USB_WRITE_TIMEOUT_MILLIS = 5000
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
self._device = None
|
|
self._connection = None
|
|
self._control_index = None
|
|
self._control_interface = None
|
|
self._data_interface = None
|
|
self._control_endpoint = None
|
|
self._read_endpoint = None
|
|
self._write_endpoint = None
|
|
self._read_buffer = bytearray()
|
|
super(CdcAcmSerial, self).__init__(*args, **kwargs)
|
|
|
|
def open(self):
|
|
"""Open the serial port.
|
|
|
|
When the serial port is instantiated, it will try to open automatically.
|
|
"""
|
|
self.close()
|
|
|
|
device = usb.get_usb_device(self.portstr)
|
|
if not device:
|
|
raise SerialException("Device not present {}".format(self.portstr))
|
|
|
|
if not usb.has_usb_permission(device):
|
|
usb.request_usb_permission(device)
|
|
return
|
|
|
|
connection = usb.get_usb_manager().openDevice(device)
|
|
if not connection:
|
|
raise SerialException("Failed to open device!")
|
|
|
|
self._device = device
|
|
self._connection = connection
|
|
|
|
if self._device.getInterfaceCount() == 1:
|
|
# Device might be castrated ACM device. Try single interface logic.
|
|
self._open_single_interface()
|
|
else:
|
|
# Try default interface logic.
|
|
self._open_interface()
|
|
|
|
# Check that all endpoints are good
|
|
if None in [self._control_endpoint, self._write_endpoint, self._read_endpoint]:
|
|
raise SerialException("Could not establish all endpoints!")
|
|
|
|
self.is_open = True
|
|
self._set_dtr_rts(self._dtr_state, self._rts_state)
|
|
self._reconfigure_port()
|
|
|
|
def _open_single_interface(self):
|
|
"""Open single interface device."""
|
|
device = self._device
|
|
|
|
# Claiming control/data interface.
|
|
self._control_index = 0
|
|
self._control_interface = device.getInterface(0)
|
|
self._data_interface = device.getInterface(0)
|
|
if not self._connection.claimInterface(self._control_interface, True):
|
|
raise SerialException("Could not claim shared control/data interface.")
|
|
|
|
num_endpoints = self._control_interface.getEndpointCount()
|
|
if num_endpoints < 3:
|
|
msg = "Not enough endpoints - need 3, got {}".format(num_endpoints)
|
|
raise SerialException(msg)
|
|
|
|
for i in range(num_endpoints):
|
|
ep = self._control_interface.getEndpoint(i)
|
|
if (ep.getDirection() == usb.UsbConstants.USB_DIR_IN) and (
|
|
ep.getType() == usb.UsbConstants.USB_ENDPOINT_XFER_INT
|
|
):
|
|
# Found controlling endpoint.
|
|
self._control_endpoint = ep
|
|
elif (ep.getDirection() == usb.UsbConstants.USB_DIR_IN) and (
|
|
ep.getType() == usb.UsbConstants.USB_ENDPOINT_XFER_BULK
|
|
):
|
|
# Found reading endpoint.
|
|
self._read_endpoint = ep
|
|
elif (ep.getDirection() == usb.UsbConstants.USB_DIR_OUT) and (
|
|
ep.getType() == usb.UsbConstants.USB_ENDPOINT_XFER_BULK
|
|
):
|
|
# Found writing endpoint
|
|
self._write_endpoint = ep
|
|
|
|
if None not in [
|
|
self._control_endpoint,
|
|
self._write_endpoint,
|
|
self._read_endpoint,
|
|
]:
|
|
# Found all endpoints.
|
|
break
|
|
|
|
def _open_interface(self):
|
|
"""Open default interface device."""
|
|
device = self._device
|
|
|
|
for i in range(device.getInterfaceCount()):
|
|
interface = device.getInterface(i)
|
|
if interface.getInterfaceClass() == usb.UsbConstants.USB_CLASS_COMM:
|
|
self._control_index = i
|
|
self._control_interface = interface
|
|
if interface.getInterfaceClass() == usb.UsbConstants.USB_CLASS_CDC_DATA:
|
|
self._data_interface = interface
|
|
|
|
if self._control_interface is None:
|
|
raise SerialException("Could not find control interface.")
|
|
if not self._connection.claimInterface(self._control_interface, True):
|
|
raise SerialException("Could not claim control interface.")
|
|
|
|
self._control_endpoint = self._control_interface.getEndpoint(0)
|
|
|
|
if self._data_interface is None:
|
|
raise SerialException("Could not find data interface.")
|
|
if not self._connection.claimInterface(self._data_interface, True):
|
|
raise SerialException("Could not claim data interface.")
|
|
|
|
for i in range(self._data_interface.getEndpointCount()):
|
|
ep = self._data_interface.getEndpoint(i)
|
|
if (
|
|
ep.getDirection() == usb.UsbConstants.USB_DIR_IN
|
|
and ep.getType() == usb.UsbConstants.USB_ENDPOINT_XFER_BULK
|
|
):
|
|
self._read_endpoint = ep
|
|
if (
|
|
ep.getDirection() == usb.UsbConstants.USB_DIR_OUT
|
|
and ep.getType() == usb.UsbConstants.USB_ENDPOINT_XFER_BULK
|
|
):
|
|
self._write_endpoint = ep
|
|
|
|
if None in (self._read_endpoint, self._write_endpoint):
|
|
raise SerialException("Could not find read/write endpoint.")
|
|
|
|
def _reconfigure_port(self):
|
|
"""Reconfigure serial port parameters."""
|
|
msg = bytearray(
|
|
[
|
|
self.baudrate & 0xFF,
|
|
self.baudrate >> 8 & 0xFF,
|
|
self.baudrate >> 16 & 0xFF,
|
|
self.baudrate >> 24 & 0xFF,
|
|
self.STOPBIT_MAP[self.stopbits],
|
|
self.PARITY_MAP[self.parity],
|
|
self.bytesize,
|
|
]
|
|
)
|
|
# Set line coding.
|
|
self._ctrl_transfer_out(self.SET_LINE_CODING, 0, msg)
|
|
|
|
def close(self):
|
|
"""Close the serial port."""
|
|
if self._connection:
|
|
self._connection.close()
|
|
self._connection = None
|
|
self.is_open = False
|
|
|
|
# - - - - - - - - - - - - - - - - - - - - - - - -
|
|
|
|
@property
|
|
def in_waiting(self):
|
|
"""Return the number of bytes currently in the input buffer.
|
|
|
|
Returns:
|
|
Length (int): number of data bytes in the input buffer.
|
|
"""
|
|
# Read from serial port hardware and put the data into read buffer.
|
|
self._read_buffer.extend(self._read())
|
|
return len(self._read_buffer)
|
|
|
|
@property
|
|
def out_waiting(self):
|
|
"""Return the number of bytes currently in the output buffer.
|
|
|
|
Always return 0.
|
|
"""
|
|
return 0
|
|
|
|
def read(self, size=1):
|
|
"""Read data from the serial port.
|
|
|
|
Parameters:
|
|
size (int): the number of data bytes to read.
|
|
|
|
Returns:
|
|
read (bytes): data bytes read from the serial port.
|
|
"""
|
|
read = bytearray()
|
|
timeout = Timeout(self.timeout)
|
|
|
|
# If there is enough data in the buffer, do not bother to read.
|
|
if len(self._read_buffer) < size:
|
|
# Keep reading until there is enough data or timeout.
|
|
while self.in_waiting < size:
|
|
if timeout.expired():
|
|
break
|
|
|
|
# Get data from read buffer.
|
|
read = self._read_buffer[:size]
|
|
self._read_buffer = self._read_buffer[size:]
|
|
|
|
return bytes(read)
|
|
|
|
def write(self, data):
|
|
"""Write data to the serial port.
|
|
|
|
Parameters:
|
|
data (bytearray): data written to the serial port.
|
|
|
|
Returns:
|
|
wrote (int): the number of data bytes written.
|
|
"""
|
|
if not self.is_open:
|
|
return None
|
|
offset = 0
|
|
timeout = int(
|
|
self._write_timeout * 1000
|
|
if self._write_timeout
|
|
else self.USB_WRITE_TIMEOUT_MILLIS
|
|
)
|
|
wrote = 0
|
|
while offset < len(data):
|
|
data_length = min(len(data) - offset, self.DEFAULT_WRITE_BUFFER_SIZE)
|
|
buf = data[offset : offset + data_length]
|
|
i = self._connection.bulkTransfer(
|
|
self._write_endpoint, buf, data_length, timeout
|
|
)
|
|
if i <= 0:
|
|
raise SerialException("Failed to write {}: {}".format(buf, i))
|
|
offset += data_length
|
|
wrote += i
|
|
return wrote
|
|
|
|
def flush(self):
|
|
"""Simply wait some time to allow all data to be written."""
|
|
pass
|
|
|
|
def reset_input_buffer(self):
|
|
"""Clear input buffer."""
|
|
if not self.is_open:
|
|
raise PortNotOpenError()
|
|
self._purgeHwBuffers(True, False)
|
|
|
|
def reset_output_buffer(self):
|
|
"""Clear output buffer."""
|
|
if not self.is_open:
|
|
raise PortNotOpenError()
|
|
self._purgeHwBuffers(False, True)
|
|
|
|
def send_break(self, duration=0.25):
|
|
"""Send break condition.
|
|
|
|
Parameters:
|
|
duration (float): break time in seconds.
|
|
"""
|
|
if not self.is_open:
|
|
raise PortNotOpenError()
|
|
self._ctrl_transfer_out(self.SEND_BREAK, int(duration * 1000))
|
|
|
|
def _update_break_state(self):
|
|
"""Send break condition."""
|
|
self._set_break(self._break_state)
|
|
|
|
def _update_rts_state(self):
|
|
"""Set terminal status line: Request To Send."""
|
|
self._set_rts(self._rts_state)
|
|
|
|
def _update_dtr_state(self):
|
|
"""Set terminal status line: Data Terminal Ready."""
|
|
self._set_dtr(self._dtr_state)
|
|
|
|
@property
|
|
def cts(self):
|
|
"""Read terminal status line: Clear To Send."""
|
|
status = self._poll_modem_status()
|
|
return bool(status)
|
|
|
|
@property
|
|
def dsr(self):
|
|
"""Read terminal status line: Data Set Ready."""
|
|
status = self._poll_modem_status()
|
|
return bool(status)
|
|
|
|
@property
|
|
def ri(self):
|
|
"""Read terminal status line: Ring Indicator."""
|
|
status = self._poll_modem_status()
|
|
return bool(status)
|
|
|
|
@property
|
|
def cd(self):
|
|
"""Read terminal status line: Carrier Detect."""
|
|
status = self._poll_modem_status()
|
|
return bool(status)
|
|
|
|
# - - - - - - - - - - - - - - - - - - - - - - - -
|
|
|
|
def _ctrl_transfer_out(self, request, value, buf=None):
|
|
"""USB control transfer out.
|
|
|
|
This function does the USB configuration job.
|
|
"""
|
|
result = self._connection.controlTransfer(
|
|
self.REQTYPE_HOST2DEVICE,
|
|
request,
|
|
value,
|
|
self._control_index,
|
|
buf,
|
|
(0 if buf is None else len(buf)),
|
|
self.USB_WRITE_TIMEOUT_MILLIS,
|
|
)
|
|
|
|
return result
|
|
|
|
def _ctrl_transfer_in(self, request, value, buf):
|
|
"""USB control transfer in.
|
|
|
|
Request for a control message from the device.
|
|
"""
|
|
result = self._connection.controlTransfer(
|
|
self.REQTYPE_DEVICE2HOST,
|
|
request,
|
|
value,
|
|
self._control_index,
|
|
buf,
|
|
(0 if buf is None else len(buf)),
|
|
self.USB_READ_TIMEOUT_MILLIS,
|
|
)
|
|
|
|
return result
|
|
|
|
def _set_break(self, break_):
|
|
"""Start or stop a break exception event on the serial line.
|
|
|
|
Parameters:
|
|
break_ (bool): either start or stop break event.
|
|
"""
|
|
pass
|
|
|
|
def _set_dtr(self, state):
|
|
"""Set dtr line.
|
|
|
|
Parameters:
|
|
state (bool): new DTR logical level.
|
|
"""
|
|
self._set_dtr_rts(state, self._rts_state)
|
|
|
|
def _set_rts(self, state):
|
|
"""Set rts line.
|
|
|
|
Parameters:
|
|
state (bool): new RTS logical level.
|
|
"""
|
|
self._set_dtr_rts(self._dtr_state, state)
|
|
|
|
def _set_dtr_rts(self, dtr, rts):
|
|
"""Set dtr and rts lines at once.
|
|
|
|
Parameters:
|
|
dtr (bool): new DTR logical level.
|
|
rts (bool): new RTS logical level.
|
|
"""
|
|
value = 0
|
|
if dtr:
|
|
value |= self.SET_CONTROL_LINE_STATE_DTR
|
|
if rts:
|
|
value |= self.SET_CONTROL_LINE_STATE_RTS
|
|
self._ctrl_transfer_out(self.SET_CONTROL_LINE_STATE, value)
|
|
|
|
def _purgeHwBuffers(self, purgeReadBuffers, purgeWriteBuffers):
|
|
"""Set serial port parameters.
|
|
|
|
Parameters:
|
|
purgeReadBuffers (bool): need to purge read buffer or not.
|
|
purgeWriteBuffers (bool): need to purge write buffer or not.
|
|
Returns:
|
|
result (bool): successful or not.
|
|
"""
|
|
return True
|
|
|
|
def _read(self):
|
|
"""Hardware dependent read function.
|
|
|
|
Returns:
|
|
read (bytes): data bytes read from the serial port.
|
|
"""
|
|
if not self.is_open:
|
|
raise PortNotOpenError()
|
|
if not self._read_endpoint:
|
|
raise SerialException("Read endpoint does not exist!")
|
|
|
|
# Get raw data from hardware.
|
|
buf = bytearray(self.DEFAULT_READ_BUFFER_SIZE)
|
|
totalBytesRead = self._connection.bulkTransfer(
|
|
self._read_endpoint,
|
|
buf,
|
|
self.DEFAULT_READ_BUFFER_SIZE,
|
|
self.USB_READ_TIMEOUT_MILLIS,
|
|
)
|
|
if totalBytesRead < 0:
|
|
# Read timeout. Set totalBytesRead to 0.
|
|
totalBytesRead = 0
|
|
|
|
read = buf[:totalBytesRead]
|
|
return bytes(read)
|
|
|
|
def _poll_modem_status(self):
|
|
"""Poll modem status information.
|
|
|
|
This function allows the retrieve the one status byte of the
|
|
device, useful in UART mode.
|
|
|
|
Returns:
|
|
status (int): modem status, as a proprietary bitfield
|
|
"""
|
|
return 0
|