Porównaj commity

...

2 Commity

Autor SHA1 Wiadomość Data
Angus Gratton 81962f1de7 usb: Add USB device support packages.
These packages build on top of machine.USBDevice() to provide high level
and flexible support for implementing USB devices in Python code.

Additional credits, as per included copyright notices:

- CDC support based on initial implementation by @hoihu with fixes by
  @linted.

- MIDI support based on initial implementation by @paulhamsh.

- HID keypad example based on work by @turmoni.

- Everyone who tested and provided feedback on early versions of these
  packages.

This work was funded through GitHub Sponsors.

Signed-off-by: Angus Gratton <angus@redyak.com.au>
2024-04-16 15:11:41 +10:00
Damien George 45ead11f96 ssl: Use "from tls import *" to be compatible with axtls.
axtls doesn't define all the CERT_xxx constants, nor the MBEDTLS_VERSION
constant.

This change means that `tls.SSLContext` is imported into the module, but
that's subsequently overridden by the class definition in this module.

Signed-off-by: Damien George <damien@micropython.org>
2024-03-28 17:44:37 +11:00
22 zmienionych plików z 2735 dodań i 9 usunięć

Wyświetl plik

@ -0,0 +1,85 @@
# Dynamic USB packages
These packages allow implementing USB functionality on a MicroPython device using pure Python code.
Currently only USB device is implemented, not USB host.
## USB Device support
### Support
USB Device support depends on the low-level [machine.USBDevice](https://docs.micropython.org/en/latest/library/machine.USBDevice.html) class. This class is new and not supported on all ports, so please check the documentation for your MicroPython version. It is possible to implement a USB device using only the low-level USBDevice class. These packages are higher level and easier to use.
For more information about how to install packages, or "freeze" them into a
firmware image, consult the [MicroPython documentation on "Package
management"](https://docs.micropython.org/en/latest/reference/packages.html).
### Examples
The [examples/device](examples/device) directory in this repo has a range of examples. After installing necessary packages, you can download an example and run it with `mpremote run EXAMPLE_FILENAME.py` ([mpremote docs](https://docs.micropython.org/en/latest/reference/mpremote.html#mpremote-command-run)).
#### Unexpected serial disconnects
If you normally connect to your MicroPython device over a USB serial port ("USB CDC"), then running a USB example will disconnect mpremote when the new USB device configuration activates and the serial port has to temporarily disconnect. It is likely that mpremote will print an error. The example should still start running, if necessary then you can reconnect with mpremote and type Ctrl-B to restore the MicroPython REPL and/or Ctrl-C to stop the running example.
If you use `mpremote run` again while a different runtime USB configuration is already active, then the USB serial port may disconnect immediately before the example runs. This is because mpremote has to soft-reset MicroPython, and when the existing USB device is reset then the entire USB port needs to reset. If this happens, run the same `mpremote run` command again.
We plan to add features to `mpremote` so that this limitation is less disruptive. Other tools that communicate with MicroPython over the serial port will encounter similar issues when runtime USB is in use.
### Initialising runtime USB
The overall pattern for enabling USB devices at runtime is:
1. Instantiate the Interface objects for your desired USB device.
2. Call `usb.device.get()` to get the singleton object for the high-level USB device.
3. Call `init(...)` to pass the desired interfaces as arguments, plus any custom
keyword arguments to configure the overall device.
An example, similar to [mouse_example.py](examples/device/mouse_example.py):
```py
m = usb.device.mouse.MouseInterface()
usb.device.get().init(m, builtin_driver=True)
```
Setting `builtin_driver=True` means that any built-in USB serial port will still
be available. Otherwise, you may permanently lose access to MicroPython until
the next time the device resets.
See [Unexpected serial disconnects](#Unexpected-serial-disconnects), above, for
an explanation of possible errors or disconnects when the runtime USB device
initialises.
Placing the call to `usb.device.get().init()` into the `boot.py` of the MicroPython file system allows the runtime USB device to initialise immediately on boot, before any built-in USB. However, note that calling this function on boot without `builtin_driver=True` will make the MicroPython USB serial interface permanently inaccessible until you "safe mode boot" (on supported boards) or completely erase the flash of your device.
### Package usb-device-keyboard
This package provides the `usb.device.keyboard` module. See [keyboard_example.py](examples/device/keyboard_example.py) for an example program.
### Package usb-device-mouse
This package provides the `usb.device.mouse` module. See [mouse_example.py](examples/device/mouse_example.py) for an example program.
### Package usb-device-hid
This package provides the `usb.device.hid` module. USB HID (Human Interface Device) class allows creating a wide variety of device types. The most common are mouse and keyboard, which have their own packages in micropython-lib. However, using the usb-device-hid package directly allows creation of any kind of HID device.
See [hid_custom_keypad_example.py](examples/device/hid_custom_keypad_example.py) for an example of a Keypad HID device with a custom HID descriptor.
### Package usb-device-cdc
This package provides the `usb.device.cdc` module. USB-CDC (Communications Device Class) is most commonly used for virtual serial port USB interfaces, and that is what is supported here.
The example [cdc_repl_example.py](examples/device/cdc_repl_example.py) demonstrates how to add a second USB serial interface and duplicate the MicroPython REPL between the two.
### Package usb-device-midi
This package provides the `usb.device.midi` module. This allows implementing MIDI devices in MicroPython.
The example [midi_example.py](examples/device/midi_example.py) demonstrates how to create a simple MIDI device to send MIDI data to the USB host.
### Package usb-device
This package contains the common implementation components for the other packages, and can be used to create new and different USB device types. All of the other packages depend on this package.
It provides the `usb.device.get()` function for accessing the Device singleton object, and the `usb.device.core` module which contains the low-level classes and utility functions for implementing new USB interface drivers in Python. The best examples of how to use the core classes is the source code of the other USB device packages.

Wyświetl plik

@ -0,0 +1,41 @@
# MicroPython USB CDC REPL example
#
# Example demonstrating how to use os.dupterm() to provide the
# MicroPython REPL on a dynamic CDCInterface() serial port.
#
# Note that if you run this example on the built-in USB CDC port via 'mpremote
# run' then you'll have to reconnect after it re-enumerates, and it may be
# necessary afterward to type Ctrl-B to exit the Raw REPL mode and resume the
# interactive REPL back.
#
# This example uses the usb-device-cdc package for the CDCInterface class.
# This can be installed with:
#
# mpremote mip install usb-device-cdc
#
# MIT license; Copyright (c) 2023-2024 Angus Gratton
import os
import time
import usb.device
from usb.device.cdc import CDCInterface
cdc = CDCInterface()
cdc.init(timeout=0) # zero timeout makes this non-blocking, suitable for os.dupterm()
# pass builtin_driver=True so that we get the built-in USB-CDC alongside,
# if it's available.
usb.device.get().init(cdc, builtin_driver=True)
print("Waiting for USB host to configure the interface...")
# wait for host enumerate as a CDC device...
while not cdc.is_open():
time.sleep_ms(100)
# Note: This example doesn't wait for the host to access the new CDC port,
# which could be done by polling cdc.dtr, as this will block the REPL
# from resuming while this code is still executing.
print("CDC port enumerated, duplicating REPL...")
old_term = os.dupterm(cdc)

Wyświetl plik

@ -0,0 +1,137 @@
# MicroPython USB HID custom Keypad example
#
# This example demonstrates creating a custom HID device with its own
# HID descriptor, in this case for a USB number keypad.
#
# For higher level examples that require less code to use, see mouse_example.py
# and keyboard_example.py
#
# This example uses the usb-device-hid package for the HIDInterface class.
# This can be installed with:
#
# mpremote mip install usb-device-hid
#
# MIT license; Copyright (c) 2023 Dave Wickham, 2023-2024 Angus Gratton
from micropython import const
import time
import usb.device
from usb.device.hid import HIDInterface
_INTERFACE_PROTOCOL_KEYBOARD = const(0x01)
def keypad_example():
k = KeypadInterface()
usb.device.get().init(k, builtin_driver=True)
while not k.is_open():
time.sleep_ms(100)
while True:
time.sleep(2)
print("Press NumLock...")
k.send_key("<NumLock>")
time.sleep_ms(100)
k.send_key()
time.sleep(1)
# continue
print("Press ...")
for _ in range(3):
time.sleep(0.1)
k.send_key(".")
time.sleep(0.1)
k.send_key()
print("Starting again...")
class KeypadInterface(HIDInterface):
# Very basic synchronous USB keypad HID interface
def __init__(self):
super().__init__(
_KEYPAD_REPORT_DESC,
set_report_buf=bytearray(1),
protocol=_INTERFACE_PROTOCOL_KEYBOARD,
interface_str="MicroPython Keypad",
)
self.numlock = False
def on_set_report(self, report_data, _report_id, _report_type):
report = report_data[0]
b = bool(report & 1)
if b != self.numlock:
print("Numlock: ", b)
self.numlock = b
def send_key(self, key=None):
if key is None:
self.send_report(b"\x00")
else:
self.send_report(_key_to_id(key).to_bytes(1, "big"))
# See HID Usages and Descriptions 1.4, section 10 Keyboard/Keypad Page (0x07)
#
# This keypad example has a contiguous series of keys (KEYPAD_KEY_IDS) starting
# from the NumLock/Clear keypad key (0x53), but you can send any Key IDs from
# the table in the HID Usages specification.
_KEYPAD_KEY_OFFS = const(0x53)
_KEYPAD_KEY_IDS = [
"<NumLock>",
"/",
"*",
"-",
"+",
"<Enter>",
"1",
"2",
"3",
"4",
"5",
"6",
"7",
"8",
"9",
"0",
".",
]
def _key_to_id(key):
# This is a little slower than making a dict for lookup, but uses
# less memory and O(n) can be fast enough when n is small.
return _KEYPAD_KEY_IDS.index(key) + _KEYPAD_KEY_OFFS
# HID Report descriptor for a numeric keypad
#
# fmt: off
_KEYPAD_REPORT_DESC = (
b'\x05\x01' # Usage Page (Generic Desktop)
b'\x09\x07' # Usage (Keypad)
b'\xA1\x01' # Collection (Application)
b'\x05\x07' # Usage Page (Keypad)
b'\x19\x00' # Usage Minimum (0)
b'\x29\xFF' # Usage Maximum (ff)
b'\x15\x00' # Logical Minimum (0)
b'\x25\xFF' # Logical Maximum (ff)
b'\x95\x01' # Report Count (1),
b'\x75\x08' # Report Size (8),
b'\x81\x00' # Input (Data, Array, Absolute)
b'\x05\x08' # Usage page (LEDs)
b'\x19\x01' # Usage Minimum (1)
b'\x29\x01' # Usage Maximum (1),
b'\x95\x01' # Report Count (1),
b'\x75\x01' # Report Size (1),
b'\x91\x02' # Output (Data, Variable, Absolute)
b'\x95\x01' # Report Count (1),
b'\x75\x07' # Report Size (7),
b'\x91\x01' # Output (Constant) - padding bits
b'\xC0' # End Collection
)
# fmt: on
keypad_example()

Wyświetl plik

@ -0,0 +1,80 @@
# MicroPython USB Keyboard example
#
# This example uses the usb-device-keyboard package for the KeyboardInterface class.
# This can be installed with:
#
# mpremote mip install usb-device-keyboard
#
# To implement a keyboard with different USB HID characteristics, copy the
# usb-device-keyboard/usb/device/keyboard.py file into your own project and modify
# KeyboardInterface.
#
# MIT license; Copyright (c) 2024 Angus Gratton
import usb.device
from usb.device.keyboard import KeyboardInterface, KeyCode, LEDCode
from machine import Pin
import time
# Tuples mapping Pin inputs to the KeyCode each input generates
#
# (Big keyboards usually multiplex multiple keys per input with a scan matrix,
# but this is a simple example.)
KEYS = (
(Pin.cpu.GPIO10, KeyCode.CAPS_LOCK),
(Pin.cpu.GPIO11, KeyCode.LEFT_SHIFT),
(Pin.cpu.GPIO12, KeyCode.M),
(Pin.cpu.GPIO13, KeyCode.P),
# ... add more pin to KeyCode mappings here if needed
)
# Tuples mapping Pin outputs to the LEDCode that turns the output on
LEDS = (
(Pin.board.LED, LEDCode.CAPS_LOCK),
# ... add more pin to LEDCode mappings here if needed
)
class ExampleKeyboard(KeyboardInterface):
def on_led_update(self, led_mask):
# print(hex(led_mask))
for pin, code in LEDS:
# Set the pin high if 'code' bit is set in led_mask
pin(code & led_mask)
def keyboard_example():
# Initialise all the pins as active-low inputs with pullup resistors
for pin, _ in KEYS:
pin.init(Pin.IN, Pin.PULL_UP)
# Initialise all the LEDs as active-high outputs
for pin, _ in LEDS:
pin.init(Pin.OUT, value=0)
# Register the keyboard interface and re-enumerate
k = ExampleKeyboard()
usb.device.get().init(k, builtin_driver=True)
print("Entering keyboard loop...")
keys = [] # Keys held down, reuse the same list object
prev_keys = [None] # Previous keys, starts with a dummy value so first
# iteration will always send
while True:
if k.is_open():
keys.clear()
for pin, code in KEYS:
if not pin(): # active-low
keys.append(code)
if keys != prev_keys:
# print(keys)
k.send_keys(keys)
prev_keys.clear()
prev_keys.extend(keys)
# This simple example scans each input in an infinite loop, but a more
# complex implementation would probably use a timer or similar.
time.sleep_ms(1)
keyboard_example()

Wyświetl plik

@ -0,0 +1,70 @@
# MicroPython USB MIDI example
#
# This example demonstrates creating a custom MIDI device.
#
# This example uses the usb-device-midi package for the MIDIInterface class.
# This can be installed with:
#
# mpremote mip install usb-device-midi
#
# MIT license; Copyright (c) 2023-2024 Angus Gratton
import usb.device
from usb.device.midi import MIDIInterface
import time
class MIDIExample(MIDIInterface):
# Very simple example event handler functions, showing how to receive note
# and control change messages sent from the host to the device.
#
# If you need to send MIDI data to the host, then it's fine to instantiate
# MIDIInterface class directly.
def on_open(self):
super().on_open()
print("Device opened by host")
def on_note_on(self, channel, pitch, vel):
print(f"RX Note On channel {channel} pitch {pitch} velocity {vel}")
def on_note_off(self, channel, pitch, vel):
print(f"RX Note Off channel {channel} pitch {pitch} velocity {vel}")
def on_control_change(self, channel, controller, value):
print(f"RX Control channel {channel} controller {controller} value {value}")
m = MIDIExample()
# Remove builtin_driver=True if you don't want the MicroPython serial REPL available.
usb.device.get().init(m, builtin_driver=True)
print("Waiting for USB host to configure the interface...")
while not m.is_open():
time.sleep_ms(100)
print("Starting MIDI loop...")
# TX constants
CHANNEL = 0
PITCH = 60
CONTROLLER = 64
control_val = 0
while m.is_open():
time.sleep(1)
print(f"TX Note On channel {CHANNEL} pitch {PITCH}")
m.note_on(CHANNEL, PITCH) # Velocity is an optional third argument
time.sleep(0.5)
print(f"TX Note Off channel {CHANNEL} pitch {PITCH}")
m.note_off(CHANNEL, PITCH)
time.sleep(1)
print(f"TX Control channel {CHANNEL} controller {CONTROLLER} value {control_val}")
m.control_change(CHANNEL, CONTROLLER, control_val)
control_val += 1
if control_val == 0x7F:
control_val = 0
time.sleep(1)
print("USB host has reset device, example done.")

Wyświetl plik

@ -0,0 +1,44 @@
# MicroPython USB Mouse example
#
# This example uses the usb-device-mouse package for the MouseInterface class.
# This can be installed with:
#
# mpremote mip install usb-device-mouse
#
# To implement a more complex mouse, copy the
# usb-device-mouse/usb/device/mouse.py file into your own project and modify
# MouseInterface.
#
# MIT license; Copyright (c) 2023-2024 Angus Gratton
import time
import usb.device
from usb.device.mouse import MouseInterface
def mouse_example():
m = MouseInterface()
# Note: builtin_driver=True means that if there's a USB-CDC REPL
# available then it will appear as well as the HID device.
usb.device.get().init(m, builtin_driver=True)
# wait for host to enumerate as a HID device...
while not m.is_open():
time.sleep_ms(100)
time.sleep_ms(2000)
print("Moving...")
m.move_by(-100, 0)
m.move_by(-100, 0)
time.sleep_ms(500)
print("Clicking...")
m.click_right(True)
time.sleep_ms(200)
m.click_right(False)
print("Done!")
mouse_example()

Wyświetl plik

@ -0,0 +1,3 @@
metadata(version="0.1.0")
require("usb-device")
package("usb")

Wyświetl plik

@ -0,0 +1,437 @@
# MicroPython USB CDC module
# MIT license; Copyright (c) 2022 Martin Fischer, 2023-2024 Angus Gratton
import io
import time
import errno
import machine
import struct
from micropython import const
from .core import Interface, Buffer, split_bmRequestType
_EP_IN_FLAG = const(1 << 7)
# Control transfer stages
_STAGE_IDLE = const(0)
_STAGE_SETUP = const(1)
_STAGE_DATA = const(2)
_STAGE_ACK = const(3)
# Request types
_REQ_TYPE_STANDARD = const(0x0)
_REQ_TYPE_CLASS = const(0x1)
_REQ_TYPE_VENDOR = const(0x2)
_REQ_TYPE_RESERVED = const(0x3)
_DEV_CLASS_MISC = const(0xEF)
_CS_DESC_TYPE = const(0x24) # CS Interface type communication descriptor
# CDC control interface definitions
_INTERFACE_CLASS_CDC = const(2)
_INTERFACE_SUBCLASS_CDC = const(2) # Abstract Control Mode
_PROTOCOL_NONE = const(0) # no protocol
# CDC descriptor subtype
# see also CDC120.pdf, table 13
_CDC_FUNC_DESC_HEADER = const(0)
_CDC_FUNC_DESC_CALL_MANAGEMENT = const(1)
_CDC_FUNC_DESC_ABSTRACT_CONTROL = const(2)
_CDC_FUNC_DESC_UNION = const(6)
# CDC class requests, table 13, PSTN subclass
_SET_LINE_CODING_REQ = const(0x20)
_GET_LINE_CODING_REQ = const(0x21)
_SET_CONTROL_LINE_STATE = const(0x22)
_SEND_BREAK_REQ = const(0x23)
_LINE_CODING_STOP_BIT_1 = const(0)
_LINE_CODING_STOP_BIT_1_5 = const(1)
_LINE_CODING_STOP_BIT_2 = const(2)
_LINE_CODING_PARITY_NONE = const(0)
_LINE_CODING_PARITY_ODD = const(1)
_LINE_CODING_PARITY_EVEN = const(2)
_LINE_CODING_PARITY_MARK = const(3)
_LINE_CODING_PARITY_SPACE = const(4)
_LINE_STATE_DTR = const(1)
_LINE_STATE_RTS = const(2)
_PARITY_BITS_REPR = "NOEMS"
_STOP_BITS_REPR = ("1", "1.5", "2")
# Other definitions
_CDC_VERSION = const(0x0120) # release number in binary-coded decimal
# Number of endpoints in each interface
_CDC_CONTROL_EP_NUM = const(1)
_CDC_DATA_EP_NUM = const(2)
# CDC data interface definitions
_CDC_ITF_DATA_CLASS = const(0xA)
_CDC_ITF_DATA_SUBCLASS = const(0)
_CDC_ITF_DATA_PROT = const(0) # no protocol
# Length of the bulk transfer endpoints. Maybe should be configurable?
_BULK_EP_LEN = const(64)
# MicroPython error constants (negated as IOBase.ioctl uses negative return values for error codes)
# these must match values in py/mperrno.h
_MP_EINVAL = const(-22)
_MP_ETIMEDOUT = const(-110)
# MicroPython stream ioctl requests, same as py/stream.h
_MP_STREAM_FLUSH = const(1)
_MP_STREAM_POLL = const(3)
# MicroPython ioctl poll values, same as py/stream.h
_MP_STREAM_POLL_WR = const(0x04)
_MP_STREAM_POLL_RD = const(0x01)
_MP_STREAM_POLL_HUP = const(0x10)
class CDCInterface(io.IOBase, Interface):
# USB CDC serial device class, designed to resemble machine.UART
# with some additional methods.
#
# Relies on multiple inheritance so it can be an io.IOBase for stream
# functions and also a Interface (actually an Interface Association
# Descriptor holding two interfaces.)
def __init__(self, **kwargs):
# io.IOBase has no __init__()
Interface.__init__(self)
# Callbacks for particular control changes initiated by the host
self.break_cb = None # Host sent a "break" condition
self.line_state_cb = None
self.line_coding_cb = None
self._line_state = 0 # DTR & RTS
# Set a default line coding of 115200/8N1
self._line_coding = bytearray(b"\x00\xc2\x01\x00\x00\x00\x08")
self._wb = () # Optional write Buffer (IN endpoint), set by CDC.init()
self._rb = () # Optional read Buffer (OUT endpoint), set by CDC.init()
self._timeout = 1000 # set from CDC.init() as well
# one control interface endpoint, two data interface endpoints
self.ep_c_in = self.ep_d_in = self.ep_d_out = None
self._c_itf = None # Number of control interface, data interface is one more
self.init(**kwargs)
def init(
self, baudrate=9600, bits=8, parity="N", stop=1, timeout=None, txbuf=256, rxbuf=256, flow=0
):
# Configure the CDC serial port. Note that many of these settings like
# baudrate, bits, parity, stop don't change the USB-CDC device behavior
# at all, only the "line coding" as communicated from/to the USB host.
# Store initial line coding parameters in the USB CDC binary format
# (there is nothing implemented to further change these from Python
# code, the USB host sets them.)
struct.pack_into(
"<LBBB",
self._line_coding,
0,
baudrate,
_STOP_BITS_REPR.index(str(stop)),
_PARITY_BITS_REPR.index(parity),
bits,
)
if flow != 0:
raise NotImplementedError # UART flow control currently not supported
if not (txbuf and rxbuf):
raise ValueError # Buffer sizes are required
self._timeout = timeout
self._wb = Buffer(txbuf)
self._rb = Buffer(rxbuf)
###
### Line State & Line Coding State property getters
###
@property
def rts(self):
return bool(self._line_state & _LINE_STATE_RTS)
@property
def dtr(self):
return bool(self._line_state & _LINE_STATE_DTR)
# Line Coding Representation
# Byte 0-3 Byte 4 Byte 5 Byte 6
# dwDTERate bCharFormat bParityType bDataBits
@property
def baudrate(self):
return struct.unpack("<LBBB", self._line_coding)[0]
@property
def stop_bits(self):
return _STOP_BITS_REPR[self._line_coding[4]]
@property
def parity(self):
return _PARITY_BITS_REPR[self._line_coding[5]]
@property
def data_bits(self):
return self._line_coding[6]
def __repr__(self):
return f"{self.baudrate}/{self.data_bits}{self.parity}{self.stop_bits} rts={self.rts} dtr={self.dtr}"
###
### Set callbacks for operations initiated by the host
###
def set_break_cb(self, cb):
self.break_cb = cb
def set_line_state_cb(self, cb):
self.line_state_cb = cb
def set_line_coding_cb(self, cb):
self.line_coding_cb = cb
###
### USB Interface Implementation
###
def desc_cfg(self, desc, itf_num, ep_num, strs):
# CDC needs a Interface Association Descriptor (IAD) wrapping two interfaces: Control & Data interfaces
desc.interface_assoc(itf_num, 2, _INTERFACE_CLASS_CDC, _INTERFACE_SUBCLASS_CDC)
# Now add the Control interface descriptor
self._c_itf = itf_num
desc.interface(itf_num, _CDC_CONTROL_EP_NUM, _INTERFACE_CLASS_CDC, _INTERFACE_SUBCLASS_CDC)
# Append the CDC class-specific interface descriptor
# see CDC120-track, p20
desc.pack(
"<BBBH",
5, # bFunctionLength
_CS_DESC_TYPE, # bDescriptorType
_CDC_FUNC_DESC_HEADER, # bDescriptorSubtype
_CDC_VERSION, # cdc version
)
# CDC-PSTN table3 "Call Management"
# set to No
desc.pack(
"<BBBBB",
5, # bFunctionLength
_CS_DESC_TYPE, # bDescriptorType
_CDC_FUNC_DESC_CALL_MANAGEMENT, # bDescriptorSubtype
0, # bmCapabilities - XXX no call managment so far
itf_num + 1, # bDataInterface - interface 1
)
# CDC-PSTN table4 "Abstract Control"
# set to support line_coding and send_break
desc.pack(
"<BBBB",
4, # bFunctionLength
_CS_DESC_TYPE, # bDescriptorType
_CDC_FUNC_DESC_ABSTRACT_CONTROL, # bDescriptorSubtype
0x6, # bmCapabilities D1, D2
)
# CDC-PSTN "Union"
# set control interface / data interface number
desc.pack(
"<BBBBB",
5, # bFunctionLength
_CS_DESC_TYPE, # bDescriptorType
_CDC_FUNC_DESC_UNION, # bDescriptorSubtype
itf_num, # bControlInterface
itf_num + 1, # bSubordinateInterface0 (data class itf number)
)
# Single control IN endpoint (currently unused in this implementation)
self.ep_c_in = ep_num | _EP_IN_FLAG
desc.endpoint(self.ep_c_in, "interrupt", 8, 16)
# Now add the data interface
desc.interface(
itf_num + 1,
_CDC_DATA_EP_NUM,
_CDC_ITF_DATA_CLASS,
_CDC_ITF_DATA_SUBCLASS,
_CDC_ITF_DATA_PROT,
)
# Two data endpoints, bulk OUT and IN
self.ep_d_out = ep_num + 1
self.ep_d_in = (ep_num + 1) | _EP_IN_FLAG
desc.endpoint(self.ep_d_out, "bulk", _BULK_EP_LEN, 0)
desc.endpoint(self.ep_d_in, "bulk", _BULK_EP_LEN, 0)
def num_itfs(self):
return 2
def num_eps(self):
return 2 # total after masking out _EP_IN_FLAG
def on_open(self):
super().on_open()
# kick off any transfers that may have queued while the device was not open
self._rd_xfer()
self._wr_xfer()
def on_interface_control_xfer(self, stage, request):
# Handle class-specific interface control transfers
bmRequestType, bRequest, wValue, wIndex, wLength = struct.unpack("BBHHH", request)
recipient, req_type, req_dir = split_bmRequestType(bmRequestType)
if wIndex != self._c_itf:
return False # Only for the control interface (may be redundant check?)
if req_type != _REQ_TYPE_CLASS:
return False # Unsupported request type
if stage == _STAGE_SETUP:
if bRequest in (_SET_LINE_CODING_REQ, _GET_LINE_CODING_REQ):
return self._line_coding # Buffer to read or write
# Continue on other supported requests, stall otherwise
return bRequest in (_SET_CONTROL_LINE_STATE, _SEND_BREAK_REQ)
if stage == _STAGE_ACK:
if bRequest == _SET_LINE_CODING_REQ:
if self.line_coding_cb:
self.line_coding_cb(self._line_coding)
elif bRequest == _SET_CONTROL_LINE_STATE:
self._line_state = wValue
if self.line_state_cb:
self.line_state_cb(wValue)
elif bRequest == _SEND_BREAK_REQ:
if self.break_cb:
self.break_cb(wValue)
return True # allow DATA/ACK stages to complete normally
def _wr_xfer(self):
# Submit a new data IN transfer from the _wb buffer, if needed
if self.is_open() and not self.xfer_pending(self.ep_d_in) and self._wb.readable():
self.submit_xfer(self.ep_d_in, self._wb.pend_read(), self._wr_cb)
def _wr_cb(self, ep, res, num_bytes):
# Whenever a data IN transfer ends
if res == 0:
self._wb.finish_read(num_bytes)
self._wr_xfer()
def _rd_xfer(self):
# Keep an active data OUT transfer to read data from the host,
# whenever the receive buffer has room for new data
if self.is_open() and not self.xfer_pending(self.ep_d_out) and self._rb.writable():
# Can only submit up to the endpoint length per transaction, otherwise we won't
# get any transfer callback until the full transaction completes.
self.submit_xfer(self.ep_d_out, self._rb.pend_write(_BULK_EP_LEN), self._rd_cb)
def _rd_cb(self, ep, res, num_bytes):
# Whenever a data OUT transfer ends
if res == 0:
self._rb.finish_write(num_bytes)
self._rd_xfer()
###
### io.IOBase stream implementation
###
def write(self, buf):
# use a memoryview to track how much of 'buf' we've written so far
# (unfortunately, this means a 1 block allocation for each write, but it's otherwise allocation free.)
start = time.ticks_ms()
mv = memoryview(buf)
while True:
# Keep pushing buf into _wb into it's all gone
nbytes = self._wb.write(mv)
self._wr_xfer() # make sure a transfer is running from _wb
if nbytes == len(mv):
return len(buf) # Success
mv = mv[nbytes:]
# check for timeout
if time.ticks_diff(time.ticks_ms(), start) >= self._timeout:
return len(buf) - len(mv)
machine.idle()
def read(self, size):
start = time.ticks_ms()
# Allocate a suitable buffer to read into
if size >= 0:
b = bytearray(size)
else:
# for size == -1, return however many bytes are ready
b = bytearray(self._rb.readable())
n = self._readinto(b, start)
if not n:
return None
if n < len(b):
return b[:n]
return b
def readinto(self, b):
return self._readinto(b, time.ticks_ms())
def _readinto(self, b, start):
if len(b) == 0:
return 0
n = 0
m = memoryview(b)
while n < len(b):
# copy out of the read buffer if there is anything available
if self._rb.readable():
n += self._rb.readinto(m if n == 0 else m[n:])
self._rd_xfer() # if _rd was previously full, no transfer will be running
if n == len(b):
break # Done, exit before we call machine.idle()
if time.ticks_diff(time.ticks_ms(), start) >= self._timeout:
break # Timed out
machine.idle()
return n or None
def ioctl(self, req, arg):
if req == _MP_STREAM_POLL:
return (
(_MP_STREAM_POLL_WR if (arg & _MP_STREAM_POLL_WR) and self._wb.writable() else 0)
| (_MP_STREAM_POLL_RD if (arg & _MP_STREAM_POLL_RD) and self._rb.readable() else 0)
|
# using the USB level "open" (i.e. connected to host) for !HUP, not !DTR (port is open)
(_MP_STREAM_POLL_HUP if (arg & _MP_STREAM_POLL_HUP) and not self.is_open() else 0)
)
elif req == _MP_STREAM_FLUSH:
start = time.ticks_ms()
# Wait until write buffer contains no bytes for the lower TinyUSB layer to "read"
while self._wb.readable():
if not self.is_open():
return _MP_EINVAL
if time.ticks_diff(time.ticks_ms(), start) > self._timeout:
return _MP_ETIMEDOUT
machine.idle()
return 0
return _MP_EINVAL
def flush(self):
# a C implementation of this exists in stream.c, but it's not in io.IOBase
# and can't immediately be called from here (AFAIK)
r = self.ioctl(_MP_STREAM_FLUSH, 0)
if r:
raise OSError(r)

Wyświetl plik

@ -0,0 +1,3 @@
metadata(version="0.1.0")
require("usb-device")
package("usb")

Wyświetl plik

@ -0,0 +1,232 @@
# MicroPython USB hid module
#
# This implements a base HIDInterface class that can be used directly,
# or subclassed into more specific HID interface types.
#
# MIT license; Copyright (c) 2023 Angus Gratton
from micropython import const
import machine
import struct
import time
from .core import Interface, Descriptor, split_bmRequestType
_EP_IN_FLAG = const(1 << 7)
# Control transfer stages
_STAGE_IDLE = const(0)
_STAGE_SETUP = const(1)
_STAGE_DATA = const(2)
_STAGE_ACK = const(3)
# Request types
_REQ_TYPE_STANDARD = const(0x0)
_REQ_TYPE_CLASS = const(0x1)
_REQ_TYPE_VENDOR = const(0x2)
_REQ_TYPE_RESERVED = const(0x3)
# Descriptor types
_DESC_HID_TYPE = const(0x21)
_DESC_REPORT_TYPE = const(0x22)
_DESC_PHYSICAL_TYPE = const(0x23)
# Interface and protocol identifiers
_INTERFACE_CLASS = const(0x03)
_INTERFACE_SUBCLASS_NONE = const(0x00)
_INTERFACE_SUBCLASS_BOOT = const(0x01)
_INTERFACE_PROTOCOL_NONE = const(0x00)
_INTERFACE_PROTOCOL_KEYBOARD = const(0x01)
_INTERFACE_PROTOCOL_MOUSE = const(0x02)
# bRequest values for HID control requests
_REQ_CONTROL_GET_REPORT = const(0x01)
_REQ_CONTROL_GET_IDLE = const(0x02)
_REQ_CONTROL_GET_PROTOCOL = const(0x03)
_REQ_CONTROL_GET_DESCRIPTOR = const(0x06)
_REQ_CONTROL_SET_REPORT = const(0x09)
_REQ_CONTROL_SET_IDLE = const(0x0A)
_REQ_CONTROL_SET_PROTOCOL = const(0x0B)
# Standard descriptor lengths
_STD_DESC_INTERFACE_LEN = const(9)
_STD_DESC_ENDPOINT_LEN = const(7)
class HIDInterface(Interface):
# Abstract base class to implement a USB device HID interface in Python.
def __init__(
self,
report_descriptor,
extra_descriptors=[],
set_report_buf=None,
protocol=_INTERFACE_PROTOCOL_NONE,
interface_str=None,
):
# Construct a new HID interface.
#
# - report_descriptor is the only mandatory argument, which is the binary
# data consisting of the HID Report Descriptor. See Device Class
# Definition for Human Interface Devices (HID) v1.11 section 6.2.2 Report
# Descriptor, p23.
#
# - extra_descriptors is an optional argument holding additional HID
# descriptors, to append after the mandatory report descriptor. Most
# HID devices do not use these.
#
# - set_report_buf is an optional writable buffer object (i.e.
# bytearray), where SET_REPORT requests from the host can be
# written. Only necessary if the report_descriptor contains Output
# entries. If set, the size must be at least the size of the largest
# Output entry.
#
# - protocol can be set to a specific value as per HID v1.11 section 4.3 Protocols, p9.
#
# - interface_str is an optional string descriptor to associate with the HID USB interface.
super().__init__()
self.report_descriptor = report_descriptor
self.extra_descriptors = extra_descriptors
self._set_report_buf = set_report_buf
self.protocol = protocol
self.interface_str = interface_str
self._int_ep = None # set during enumeration
def get_report(self):
return False
def on_set_report(self, report_data, report_id, report_type):
# Override this function in order to handle SET REPORT requests from the host,
# where it sends data to the HID device.
#
# This function will only be called if the Report descriptor contains at least one Output entry,
# and the set_report_buf argument is provided to the constructor.
#
# Return True to complete the control transfer normally, False to abort it.
return True
def busy(self):
# Returns True if the interrupt endpoint is busy (i.e. existing transfer is pending)
return self.is_open() and self.xfer_pending(self._int_ep)
def send_report(self, report_data, timeout_ms=100):
# Helper function to send a HID report in the typical USB interrupt
# endpoint associated with a HID interface.
#
# Returns True if successful, False if HID device is not active or timeout
# is reached without being able to queue the report for sending.
deadline = time.ticks_add(time.ticks_ms(), timeout_ms)
while self.busy():
if time.ticks_diff(deadline, time.ticks_ms()) <= 0:
return False
machine.idle()
if not self.is_open():
return False
self.submit_xfer(self._int_ep, report_data)
def desc_cfg(self, desc, itf_num, ep_num, strs):
# Add the standard interface descriptor
desc.interface(
itf_num,
1,
_INTERFACE_CLASS,
_INTERFACE_SUBCLASS_NONE,
self.protocol,
len(strs) if self.interface_str else 0,
)
if self.interface_str:
strs.append(self.interface_str)
# As per HID v1.11 section 7.1 Standard Requests, return the contents of
# the standard HID descriptor before the associated endpoint descriptor.
self.get_hid_descriptor(desc)
# Add the typical single USB interrupt endpoint descriptor associated
# with a HID interface.
self._int_ep = ep_num | _EP_IN_FLAG
desc.endpoint(self._int_ep, "interrupt", 8, 8)
self.idle_rate = 0
self.protocol = 0
def num_eps(self):
return 1
def get_hid_descriptor(self, desc=None):
# Append a full USB HID descriptor from the object's report descriptor
# and optional additional descriptors.
#
# See HID Specification Version 1.1, Section 6.2.1 HID Descriptor p22
l = 9 + 3 * len(self.extra_descriptors) # total length
if desc is None:
desc = Descriptor(bytearray(l))
desc.pack(
"<BBHBBBH",
l, # bLength
_DESC_HID_TYPE, # bDescriptorType
0x111, # bcdHID
0, # bCountryCode
len(self.extra_descriptors) + 1, # bNumDescriptors
0x22, # bDescriptorType, Report
len(self.report_descriptor), # wDescriptorLength, Report
)
# Fill in any additional descriptor type/length pairs
#
# TODO: unclear if this functionality is ever used, may be easier to not
# support in base class
for dt, dd in self.extra_descriptors:
desc.pack("<BH", dt, len(dd))
return desc.b
def on_interface_control_xfer(self, stage, request):
# Handle standard and class-specific interface control transfers for HID devices.
bmRequestType, bRequest, wValue, _, wLength = struct.unpack("BBHHH", request)
recipient, req_type, _ = split_bmRequestType(bmRequestType)
if stage == _STAGE_SETUP:
if req_type == _REQ_TYPE_STANDARD:
# HID Spec p48: 7.1 Standard Requests
if bRequest == _REQ_CONTROL_GET_DESCRIPTOR:
desc_type = wValue >> 8
if desc_type == _DESC_HID_TYPE:
return self.get_hid_descriptor()
if desc_type == _DESC_REPORT_TYPE:
return self.report_descriptor
elif req_type == _REQ_TYPE_CLASS:
# HID Spec p50: 7.2 Class-Specific Requests
if bRequest == _REQ_CONTROL_GET_REPORT:
print("GET_REPORT?")
return False # Unsupported for now
if bRequest == _REQ_CONTROL_GET_IDLE:
return bytes([self.idle_rate])
if bRequest == _REQ_CONTROL_GET_PROTOCOL:
return bytes([self.protocol])
if bRequest in (_REQ_CONTROL_SET_IDLE, _REQ_CONTROL_SET_PROTOCOL):
return True
if bRequest == _REQ_CONTROL_SET_REPORT:
return self._set_report_buf # If None, request will stall
return False # Unsupported request
if stage == _STAGE_ACK:
if req_type == _REQ_TYPE_CLASS:
if bRequest == _REQ_CONTROL_SET_IDLE:
self.idle_rate = wValue >> 8
elif bRequest == _REQ_CONTROL_SET_PROTOCOL:
self.protocol = wValue
elif bRequest == _REQ_CONTROL_SET_REPORT:
report_id = wValue & 0xFF
report_type = wValue >> 8
report_data = self._set_report_buf
if wLength < len(report_data):
# need to truncate the response in the callback if we got less bytes
# than allowed for in the buffer
report_data = memoryview(self._set_report_buf)[:wLength]
self.on_set_report(report_data, report_id, report_type)
return True # allow DATA/ACK stages to complete normally

Wyświetl plik

@ -0,0 +1,3 @@
metadata(version="0.1.0")
require("usb-device-hid")
package("usb")

Wyświetl plik

@ -0,0 +1,233 @@
# MIT license; Copyright (c) 2023-2024 Angus Gratton
from micropython import const
import time
import usb.device
from usb.device.hid import HIDInterface
_INTERFACE_PROTOCOL_KEYBOARD = const(0x01)
_KEY_ARRAY_LEN = const(6) # Size of HID key array, must match report descriptor
_KEY_REPORT_LEN = const(_KEY_ARRAY_LEN + 2) # Modifier Byte + Reserved Byte + Array entries
class KeyboardInterface(HIDInterface):
# Synchronous USB keyboard HID interface
def __init__(self):
super().__init__(
_KEYBOARD_REPORT_DESC,
set_report_buf=bytearray(1),
protocol=_INTERFACE_PROTOCOL_KEYBOARD,
interface_str="MicroPython Keyboard",
)
self._key_reports = [
bytearray(_KEY_REPORT_LEN),
bytearray(_KEY_REPORT_LEN),
] # Ping/pong report buffers
self.numlock = False
def on_set_report(self, report_data, _report_id, _report_type):
self.on_led_update(report_data[0])
def on_led_update(self, led_mask):
# Override to handle keyboard LED updates. led_mask is bitwise ORed
# together values as defined in LEDCode.
pass
def send_keys(self, down_keys, timeout_ms=100):
# Update the state of the keyboard by sending a report with down_keys
# set, where down_keys is an iterable (list or similar) of integer
# values such as the values defined in KeyCode.
#
# Will block for up to timeout_ms if a previous report is still
# pending to be sent to the host. Returns True on success.
r, s = self._key_reports # next report buffer to send, spare report buffer
r[0] = 0 # modifier byte
i = 2 # index for next key array item to write to
for k in down_keys:
if k < 0: # Modifier key
r[0] |= -k
elif i < _KEY_REPORT_LEN:
r[i] = k
i += 1
else: # Excess rollover! Can't report
r[0] = 0
for i in range(2, _KEY_REPORT_LEN):
r[i] = 0xFF
break
while i < _KEY_REPORT_LEN:
r[i] = 0
i += 1
if self.send_report(r, timeout_ms):
# Swap buffers if the previous one is newly queued to send, so
# any subsequent call can't modify that buffer mid-send
self._key_reports[0] = s
self._key_reports[1] = r
return True
return False
# HID keyboard report descriptor
#
# From p69 of http://www.usb.org/developers/devclass_docs/HID1_11.pdf
#
# fmt: off
_KEYBOARD_REPORT_DESC = (
b'\x05\x01' # Usage Page (Generic Desktop),
b'\x09\x06' # Usage (Keyboard),
b'\xA1\x01' # Collection (Application),
b'\x05\x07' # Usage Page (Key Codes);
b'\x19\xE0' # Usage Minimum (224),
b'\x29\xE7' # Usage Maximum (231),
b'\x15\x00' # Logical Minimum (0),
b'\x25\x01' # Logical Maximum (1),
b'\x75\x01' # Report Size (1),
b'\x95\x08' # Report Count (8),
b'\x81\x02' # Input (Data, Variable, Absolute), ;Modifier byte
b'\x95\x01' # Report Count (1),
b'\x75\x08' # Report Size (8),
b'\x81\x01' # Input (Constant), ;Reserved byte
b'\x95\x05' # Report Count (5),
b'\x75\x01' # Report Size (1),
b'\x05\x08' # Usage Page (Page# for LEDs),
b'\x19\x01' # Usage Minimum (1),
b'\x29\x05' # Usage Maximum (5),
b'\x91\x02' # Output (Data, Variable, Absolute), ;LED report
b'\x95\x01' # Report Count (1),
b'\x75\x03' # Report Size (3),
b'\x91\x01' # Output (Constant), ;LED report padding
b'\x95\x06' # Report Count (6),
b'\x75\x08' # Report Size (8),
b'\x15\x00' # Logical Minimum (0),
b'\x25\x65' # Logical Maximum(101),
b'\x05\x07' # Usage Page (Key Codes),
b'\x19\x00' # Usage Minimum (0),
b'\x29\x65' # Usage Maximum (101),
b'\x81\x00' # Input (Data, Array), ;Key arrays (6 bytes)
b'\xC0' # End Collection
)
# fmt: on
# Standard HID keycodes, as a pseudo-enum class for easy access
#
# Modifier keys are encoded as negative values
class KeyCode:
A = 4
B = 5
C = 6
D = 7
E = 8
F = 9
G = 10
H = 11
I = 12
J = 13
K = 14
L = 15
M = 16
N = 17
O = 18
P = 19
Q = 20
R = 21
S = 22
T = 23
U = 24
V = 25
W = 26
X = 27
Y = 28
Z = 29
N1 = 30 # Standard number row keys
N2 = 31
N3 = 32
N4 = 33
N5 = 34
N6 = 35
N7 = 36
N8 = 37
N9 = 38
N0 = 39
ENTER = 40
ESCAPE = 41
BACKSPACE = 42
TAB = 43
SPACE = 44
MINUS = 45 # - _
EQUAL = 46 # = +
OPEN_BRACKET = 47 # [ {
CLOSE_BRACKET = 48 # ] }
BACKSLASH = 49 # \ |
HASH = 50 # # ~
COLON = 51 # ; :
QUOTE = 52 # ' "
TILDE = 53 # ` ~
COMMA = 54 # , <
DOT = 55 # . >
SLASH = 56 # / ?
CAPS_LOCK = 57
F1 = 58
F2 = 59
F3 = 60
F4 = 61
F5 = 62
F6 = 63
F7 = 64
F8 = 65
F9 = 66
F10 = 67
F11 = 68
F12 = 69
PRINTSCREEN = 70
SCROLL_LOCK = 71
PAUSE = 72
INSERT = 73
HOME = 74
PAGEUP = 75
DELETE = 76
END = 77
PAGEDOWN = 78
RIGHT = 79 # Arrow keys
LEFT = 80
DOWN = 81
UP = 82
KP_NUM_LOCK = 83
KP_DIVIDE = 84
KP_AT = 85
KP_MULTIPLY = 85
KP_MINUS = 86
KP_PLUS = 87
KP_ENTER = 88
KP_1 = 89
KP_2 = 90
KP_3 = 91
KP_4 = 92
KP_5 = 93
KP_6 = 94
KP_7 = 95
KP_8 = 96
KP_9 = 97
KP_0 = 98
# HID modifier values (negated to allow them to be passed along with the normal keys)
LEFT_CTRL = -0x01
LEFT_SHIFT = -0x02
LEFT_ALT = -0x04
LEFT_UI = -0x08
RIGHT_CTRL = -0x10
RIGHT_SHIFT = -0x20
RIGHT_ALT = -0x40
RIGHT_UI = -0x80
# HID LED values
class LEDCode:
NUM_LOCK = 0x01
CAPS_LOCK = 0x02
SCROLL_LOCK = 0x04
COMPOSE = 0x08
KANA = 0x10

Wyświetl plik

@ -0,0 +1,3 @@
metadata(version="0.1.0")
require("usb-device")
package("usb")

Wyświetl plik

@ -0,0 +1,306 @@
# MicroPython USB MIDI module
# MIT license; Copyright (c) 2023 Paul Hamshere, 2023-2024 Angus Gratton
from micropython import const, schedule
import struct
from .core import Interface, Buffer
_EP_IN_FLAG = const(1 << 7)
_INTERFACE_CLASS_AUDIO = const(0x01)
_INTERFACE_SUBCLASS_AUDIO_CONTROL = const(0x01)
_INTERFACE_SUBCLASS_AUDIO_MIDISTREAMING = const(0x03)
# Audio subclass extends the standard endpoint descriptor
# with two extra bytes
_STD_DESC_AUDIO_ENDPOINT_LEN = const(9)
_CLASS_DESC_ENDPOINT_LEN = const(5)
_STD_DESC_ENDPOINT_TYPE = const(0x5)
_JACK_TYPE_EMBEDDED = const(0x01)
_JACK_TYPE_EXTERNAL = const(0x02)
_JACK_IN_DESC_LEN = const(6)
_JACK_OUT_DESC_LEN = const(9)
# MIDI Status bytes. For Channel messages these are only the upper 4 bits, ORed with the channel number.
# As per https://www.midi.org/specifications-old/item/table-1-summary-of-midi-message
_MIDI_NOTE_OFF = const(0x80)
_MIDI_NOTE_ON = const(0x90)
_MIDI_POLY_KEYPRESS = const(0xA0)
_MIDI_CONTROL_CHANGE = const(0xB0)
# USB-MIDI CINs (Code Index Numbers), as per USB MIDI Table 4-1
_CIN_SYS_COMMON_2BYTE = const(0x2)
_CIN_SYS_COMMON_3BYTE = const(0x3)
_CIN_SYSEX_START = const(0x4)
_CIN_SYSEX_END_1BYTE = const(0x5)
_CIN_SYSEX_END_2BYTE = const(0x6)
_CIN_SYSEX_END_3BYTE = const(0x7)
_CIN_NOTE_OFF = const(0x8)
_CIN_NOTE_ON = const(0x9)
_CIN_POLY_KEYPRESS = const(0xA)
_CIN_CONTROL_CHANGE = const(0xB)
_CIN_PROGRAM_CHANGE = const(0xC)
_CIN_CHANNEL_PRESSURE = const(0xD)
_CIN_PITCH_BEND = const(0xE)
_CIN_SINGLE_BYTE = const(0xF) # Not currently supported
# Jack IDs for a simple bidrectional MIDI device(!)
_EMB_IN_JACK_ID = const(1)
_EXT_IN_JACK_ID = const(2)
_EMB_OUT_JACK_ID = const(3)
_EXT_OUT_JACK_ID = const(4)
# Data flows, as modelled by USB-MIDI and this hypothetical interface, are as follows:
# Device RX = USB OUT EP => _EMB_IN_JACK => _EMB_OUT_JACK
# Device TX = _EXT_IN_JACK => _EMB_OUT_JACK => USB IN EP
class MIDIInterface(Interface):
# Base class to implement a USB MIDI device in Python.
#
# To be compliant this also regisers a dummy USB Audio interface, but that
# interface isn't otherwise used.
def __init__(self, rxlen=16, txlen=16):
# Arguments are size of transmit and receive buffers in bytes.
super().__init__()
self.ep_out = None # Set during enumeration. RX direction (host to device)
self.ep_in = None # TX direction (device to host)
self._rx = Buffer(rxlen)
self._tx = Buffer(txlen)
# Callbacks for handling received MIDI messages.
#
# Subclasses can choose between overriding on_midi_event
# and handling all MIDI events manually, or overriding the
# functions for note on/off and control change, only.
def on_midi_event(self, cin, midi0, midi1, midi2):
ch = midi0 & 0x0F
if cin == _CIN_NOTE_ON:
self.on_note_on(ch, midi1, midi2)
elif cin == _CIN_NOTE_OFF:
self.on_note_off(ch, midi1, midi2)
elif cin == _CIN_CONTROL_CHANGE:
self.on_control_change(ch, midi1, midi2)
def on_note_on(self, channel, pitch, vel):
pass # Override to handle Note On messages
def on_note_off(self, channel, pitch, vel):
pass # Override to handle Note On messages
def on_control_change(self, channel, controller, value):
pass # Override to handle Control Change messages
# Helper functions for sending common MIDI messages
def note_on(self, channel, pitch, vel=0x40):
self.send_event(_CIN_NOTE_ON, _MIDI_NOTE_ON | channel, pitch, vel)
def note_off(self, channel, pitch, vel=0x40):
self.send_event(_CIN_NOTE_OFF, _MIDI_NOTE_OFF | channel, pitch, vel)
def control_change(self, channel, controller, value):
self.send_event(_CIN_CONTROL_CHANGE, _MIDI_CONTROL_CHANGE | channel, controller, value)
def send_event(self, cin, midi0, midi1=0, midi2=0):
# Queue a MIDI Event Packet to send to the host.
#
# CIN = USB-MIDI Code Index Number, see USB MIDI 1.0 section 4 "USB-MIDI Event Packets"
#
# Remaining arguments are 0-3 MIDI data bytes.
#
# Note this function returns when the MIDI Event Packet has been queued,
# not when it's been received by the host.
#
# Returns False if the TX buffer is full and the MIDI Event could not be queued.
w = self._tx.pend_write()
if len(w) < 4:
return False # TX buffer is full. TODO: block here?
w[0] = cin # leave cable number as 0?
w[1] = midi0
w[2] = midi1
w[3] = midi2
self._tx.finish_write(4)
self._tx_xfer()
return True
def _tx_xfer(self):
# Keep an active IN transfer to send data to the host, whenever
# there is data to send.
if self.is_open() and not self.xfer_pending(self.ep_in) and self._tx.readable():
self.submit_xfer(self.ep_in, self._tx.pend_read(), self._tx_cb)
def _tx_cb(self, ep, res, num_bytes):
if res == 0:
self._tx.finish_read(num_bytes)
self._tx_xfer()
def _rx_xfer(self):
# Keep an active OUT transfer to receive MIDI events from the host
if self.is_open() and not self.xfer_pending(self.ep_out) and self._rx.writable():
self.submit_xfer(self.ep_out, self._rx.pend_write(), self._rx_cb)
def _rx_cb(self, ep, res, num_bytes):
if res == 0:
self._rx.finish_write(num_bytes)
schedule(self._on_rx, None)
self._rx_xfer()
def on_open(self):
super().on_open()
# kick off any transfers that may have queued while the device was not open
self._tx_xfer()
self._rx_xfer()
def _on_rx(self, _):
# Receive MIDI events. Called via micropython.schedule, outside of the USB callback function.
m = self._rx.pend_read()
i = 0
while i <= len(m) - 4:
cin = m[i] & 0x0F
self.on_midi_event(cin, m[i + 1], m[i + 2], m[i + 3])
i += 4
self._rx.finish_read(i)
def desc_cfg(self, desc, itf_num, ep_num, strs):
# Start by registering a USB Audio Control interface, that is required to point to the
# actual MIDI interface
desc.interface(itf_num, 0, _INTERFACE_CLASS_AUDIO, _INTERFACE_SUBCLASS_AUDIO_CONTROL)
# Append the class-specific AudioControl interface descriptor
desc.pack(
"<BBBHHBB",
9, # bLength
0x24, # bDescriptorType CS_INTERFACE
0x01, # bDescriptorSubtype MS_HEADER
0x0100, # BcdADC
0x0009, # wTotalLength
0x01, # bInCollection,
itf_num + 1, # baInterfaceNr - points to the MIDI Streaming interface
)
# Next add the MIDI Streaming interface descriptor
desc.interface(
itf_num + 1, 2, _INTERFACE_CLASS_AUDIO, _INTERFACE_SUBCLASS_AUDIO_MIDISTREAMING
)
# Append the class-specific interface descriptors
# Midi Streaming interface descriptor
desc.pack(
"<BBBHH",
7, # bLength
0x24, # bDescriptorType CS_INTERFACE
0x01, # bDescriptorSubtype MS_HEADER
0x0100, # BcdADC
# wTotalLength: of all class-specific descriptors
7
+ 2
* (
_JACK_IN_DESC_LEN
+ _JACK_OUT_DESC_LEN
+ _STD_DESC_AUDIO_ENDPOINT_LEN
+ _CLASS_DESC_ENDPOINT_LEN
),
)
# The USB MIDI standard 1.0 allows modelling a baffling range of MIDI
# devices with different permutations of Jack descriptors, with a lot of
# scope for indicating internal connections in the device (as
# "virtualised" by the USB MIDI standard). Much of the options don't
# really change the USB behaviour but provide metadata to the host.
#
# As observed elsewhere online, the standard ends up being pretty
# complex and unclear in parts, but there is a clear simple example in
# an Appendix. So nearly everyone implements the device from the
# Appendix as-is, even when it's not a good fit for their application,
# and ignores the rest of the standard.
#
# For now, this is what this class does as well.
_jack_in_desc(desc, _JACK_TYPE_EMBEDDED, _EMB_IN_JACK_ID)
_jack_in_desc(desc, _JACK_TYPE_EXTERNAL, _EXT_IN_JACK_ID)
_jack_out_desc(desc, _JACK_TYPE_EMBEDDED, _EMB_OUT_JACK_ID, _EXT_IN_JACK_ID, 1)
_jack_out_desc(desc, _JACK_TYPE_EXTERNAL, _EXT_OUT_JACK_ID, _EMB_IN_JACK_ID, 1)
# One MIDI endpoint in each direction, plus the
# associated CS descriptors
self.ep_out = ep_num
self.ep_in = ep_num | _EP_IN_FLAG
# rx side, USB "in" endpoint and embedded MIDI IN Jacks
_audio_endpoint(desc, self.ep_in, _EMB_OUT_JACK_ID)
# tx side, USB "out" endpoint and embedded MIDI OUT jacks
_audio_endpoint(desc, self.ep_out, _EMB_IN_JACK_ID)
def num_itfs(self):
return 2
def num_eps(self):
return 1
def _jack_in_desc(desc, bJackType, bJackID):
# Utility function appends a "JACK IN" descriptor with
# specified bJackType and bJackID
desc.pack(
"<BBBBBB",
_JACK_IN_DESC_LEN, # bLength
0x24, # bDescriptorType CS_INTERFACE
0x02, # bDescriptorSubtype MIDI_IN_JACK
bJackType,
bJackID,
0x00, # iJack, no string descriptor support yet
)
def _jack_out_desc(desc, bJackType, bJackID, bSourceId, bSourcePin):
# Utility function appends a "JACK IN" descriptor with
# specified bJackType and bJackID
desc.pack(
"<BBBBBBBBB",
_JACK_OUT_DESC_LEN, # bLength
0x24, # bDescriptorType CS_INTERFACE
0x03, # bDescriptorSubtype MIDI_OUT_JACK
bJackType,
bJackID,
0x01, # bNrInputPins
bSourceId, # baSourceID(1)
bSourcePin, # baSourcePin(1)
0x00, # iJack, no string descriptor support yet
)
def _audio_endpoint(desc, bEndpointAddress, emb_jack_id):
# Append a standard USB endpoint descriptor and the USB class endpoint descriptor
# for this endpoint.
#
# Audio Class devices extend the standard endpoint descriptor with two extra bytes,
# so we can't easily call desc.endpoint() for the first part.
desc.pack(
# Standard USB endpoint descriptor (plus audio tweaks)
"<BBBBHBBB"
# Class endpoint descriptor
"BBBBB",
_STD_DESC_AUDIO_ENDPOINT_LEN, # wLength
_STD_DESC_ENDPOINT_TYPE, # bDescriptorType
bEndpointAddress,
2, # bmAttributes, bulk
64, # wMaxPacketSize
0, # bInterval
0, # bRefresh (unused)
0, # bSynchInterval (unused)
_CLASS_DESC_ENDPOINT_LEN, # bLength
0x25, # bDescriptorType CS_ENDPOINT
0x01, # bDescriptorSubtype MS_GENERAL
1, # bNumEmbMIDIJack
emb_jack_id, # BaAssocJackID(1)
)

Wyświetl plik

@ -0,0 +1,3 @@
metadata(version="0.1.0")
require("usb-device-hid")
package("usb")

Wyświetl plik

@ -0,0 +1,100 @@
# MicroPython USB Mouse module
#
# MIT license; Copyright (c) 2023-2024 Angus Gratton
from micropython import const
import struct
import machine
from usb.device.hid import HIDInterface
_INTERFACE_PROTOCOL_MOUSE = const(0x02)
class MouseInterface(HIDInterface):
# A basic three button USB mouse HID interface
def __init__(self, interface_str="MicroPython Mouse"):
super().__init__(
_MOUSE_REPORT_DESC,
protocol=_INTERFACE_PROTOCOL_MOUSE,
interface_str=interface_str,
)
self._l = False # Left button
self._m = False # Middle button
self._r = False # Right button
self._buf = bytearray(3)
def send_report(self, dx=0, dy=0):
b = 0
if self._l:
b |= 1 << 0
if self._r:
b |= 1 << 1
if self._m:
b |= 1 << 2
# Wait for any pending report to be sent to the host
# before updating contents of _buf.
#
# This loop can be removed if you don't care about possibly missing a
# transient report, the final report buffer contents will always be the
# last one sent to the host (it just might lose one of the ones in the
# middle).
while self.busy():
machine.idle()
struct.pack_into("Bbb", self._buf, 0, b, dx, dy)
return super().send_report(self._buf)
def click_left(self, down=True):
self._l = down
return self.send_report()
def click_middle(self, down=True):
self._m = down
return self.send_report()
def click_right(self, down=True):
self._r = down
return self.send_report()
def move_by(self, dx, dy):
if not -127 <= dx <= 127:
raise ValueError("dx")
if not -127 <= dy <= 127:
raise ValueError("dy")
return self.send_report(dx, dy)
# Basic 3-button mouse HID Report Descriptor.
# This is based on Appendix E.10 of the HID v1.11 document.
# fmt: off
_MOUSE_REPORT_DESC = (
b'\x05\x01' # Usage Page (Generic Desktop)
b'\x09\x02' # Usage (Mouse)
b'\xA1\x01' # Collection (Application)
b'\x09\x01' # Usage (Pointer)
b'\xA1\x00' # Collection (Physical)
b'\x05\x09' # Usage Page (Buttons)
b'\x19\x01' # Usage Minimum (01),
b'\x29\x03' # Usage Maximun (03),
b'\x15\x00' # Logical Minimum (0),
b'\x25\x01' # Logical Maximum (1),
b'\x95\x03' # Report Count (3),
b'\x75\x01' # Report Size (1),
b'\x81\x02' # Input (Data, Variable, Absolute), ;3 button bits
b'\x95\x01' # Report Count (1),
b'\x75\x05' # Report Size (5),
b'\x81\x01' # Input (Constant), ;5 bit padding
b'\x05\x01' # Usage Page (Generic Desktop),
b'\x09\x30' # Usage (X),
b'\x09\x31' # Usage (Y),
b'\x15\x81' # Logical Minimum (-127),
b'\x25\x7F' # Logical Maximum (127),
b'\x75\x08' # Report Size (8),
b'\x95\x02' # Report Count (2),
b'\x81\x06' # Input (Data, Variable, Relative), ;2 position bytes (X & Y)
b'\xC0' # End Collection
b'\xC0' # End Collection
)
# fmt: on

Wyświetl plik

@ -0,0 +1,2 @@
metadata(version="0.1.0")
package("usb")

Wyświetl plik

@ -0,0 +1,98 @@
# Tests for the Buffer class included in usb.device.core
#
# The easiest way to run this is using unix port. From the top-level usb-device directory,
# run as:
#
# $ micropython -m tests.test_core_buffer
#
import micropython
from usb.device import core
from core import Buffer
if not hasattr(core.machine, "disable_irq"):
# Inject a fake machine module which allows testing on the unix port, and as
# a bonus have tests fail if the buffer allocates inside a critical section.
class FakeMachine:
def disable_irq(self):
return micropython.heap_lock()
def enable_irq(self, was_locked):
if not was_locked:
micropython.heap_unlock()
core.machine = FakeMachine()
b = Buffer(16)
# Check buffer is empty
assert b.readable() == 0
assert b.writable() == 16
# Single write then read
w = b.pend_write()
assert len(w) == 16
w[:8] = b"12345678"
b.finish_write(8)
# Empty again
assert b.readable() == 8
assert b.writable() == 8
r = b.pend_read()
assert len(r) == 8
assert r == b"12345678"
b.finish_read(8)
# Empty buffer again
assert b.readable() == 0
assert b.writable() == 16
# Single write then split reads
b.write(b"abcdefghijklmnop")
assert b.writable() == 0 # full buffer
r = b.pend_read()
assert r == b"abcdefghijklmnop"
b.finish_read(2)
r = b.pend_read()
assert r == b"cdefghijklmnop"
b.finish_read(3)
# write to end of buffer
b.write(b"AB")
r = b.pend_read()
assert r == b"fghijklmnopAB"
# write while a read is pending
b.write(b"XY")
# previous pend_read() memoryview should be the same
assert r == b"fghijklmnopAB"
b.finish_read(4)
r = b.pend_read()
assert r == b"jklmnopABXY" # four bytes consumed from head, one new byte at tail
# read while a write is pending
w = b.pend_write()
assert len(w) == 5
r = b.pend_read()
assert len(r) == 11
b.finish_read(3)
w[:2] = b"12"
b.finish_write(2)
# Expected final state of buffer
tmp = bytearray(b.readable())
assert b.readinto(tmp) == len(tmp)
assert tmp == b"mnopABXY12"
# Now buffer is empty again
assert b.readable() == 0
assert b.readinto(tmp) == 0
assert b.writable() == 16
print("All Buffer tests passed")

Wyświetl plik

@ -0,0 +1,2 @@
from . import core
from .core import get # Singleton _Device getter

Wyświetl plik

@ -0,0 +1,851 @@
# MicroPython Library runtime USB device implementation
#
# These contain the classes and utilities that are needed to
# implement a USB device, not any complete USB drivers.
#
# MIT license; Copyright (c) 2022-2024 Angus Gratton
from micropython import const
import machine
import struct
_EP_IN_FLAG = const(1 << 7)
# USB descriptor types
_STD_DESC_DEV_TYPE = const(0x1)
_STD_DESC_CONFIG_TYPE = const(0x2)
_STD_DESC_STRING_TYPE = const(0x3)
_STD_DESC_INTERFACE_TYPE = const(0x4)
_STD_DESC_ENDPOINT_TYPE = const(0x5)
_STD_DESC_INTERFACE_ASSOC = const(0xB)
_ITF_ASSOCIATION_DESC_TYPE = const(0xB) # Interface Association descriptor
# Standard USB descriptor lengths
_STD_DESC_CONFIG_LEN = const(9)
_STD_DESC_ENDPOINT_LEN = const(7)
_STD_DESC_INTERFACE_LEN = const(9)
_DESC_OFFSET_LEN = const(0)
_DESC_OFFSET_TYPE = const(1)
_DESC_OFFSET_INTERFACE_NUM = const(2) # for _STD_DESC_INTERFACE_TYPE
_DESC_OFFSET_ENDPOINT_NUM = const(2) # for _STD_DESC_ENDPOINT_TYPE
# Standard control request bmRequest fields, can extract by calling split_bmRequestType()
_REQ_RECIPIENT_DEVICE = const(0x0)
_REQ_RECIPIENT_INTERFACE = const(0x1)
_REQ_RECIPIENT_ENDPOINT = const(0x2)
_REQ_RECIPIENT_OTHER = const(0x3)
# Offsets into the standard configuration descriptor, to fixup
_OFFS_CONFIG_iConfiguration = const(6)
_INTERFACE_CLASS_VENDOR = const(0xFF)
_INTERFACE_SUBCLASS_NONE = const(0x00)
_PROTOCOL_NONE = const(0x00)
# These need to match the constants in tusb_config.h
_USB_STR_MANUF = const(0x01)
_USB_STR_PRODUCT = const(0x02)
_USB_STR_SERIAL = const(0x03)
# Error constant to match mperrno.h
_MP_EINVAL = const(22)
_dev = None # Singleton _Device instance
def get():
# Getter to access the singleton instance of the
# MicroPython _Device object
#
# (note this isn't the low-level machine.USBDevice object, the low-level object is
# get()._usbd.)
global _dev
if not _dev:
_dev = _Device()
return _dev
class _Device:
# Class that implements the Python parts of the MicroPython USBDevice.
#
# This class should only be instantiated by the singleton getter
# function usb.device.get(), never directly.
def __init__(self):
self._itfs = {} # Mapping from interface number to interface object, set by init()
self._eps = {} # Mapping from endpoint address to interface object, set by _open_cb()
self._ep_cbs = {} # Mapping from endpoint address to Optional[xfer callback]
self._usbd = machine.USBDevice() # low-level API
def init(self, *itfs, **kwargs):
# Helper function to configure the USB device and activate it in a single call
self.active(False)
self.config(*itfs, **kwargs)
self.active(True)
def config( # noqa: PLR0913
self,
*itfs,
builtin_driver=False,
manufacturer_str=None,
product_str=None,
serial_str=None,
configuration_str=None,
id_vendor=None,
id_product=None,
bcd_device=None,
device_class=0,
device_subclass=0,
device_protocol=0,
config_str=None,
max_power_ma=None,
):
# Configure the USB device with a set of interfaces, and optionally reconfiguring the
# device and configuration descriptor fields
_usbd = self._usbd
if self.active():
raise OSError(_MP_EINVAL) # Must set active(False) first
# Convenience: Allow builtin_driver to be True, False or one of
# the machine.USBDevice.BUILTIN_ constants
if isinstance(builtin_driver, bool):
builtin_driver = _usbd.BUILTIN_DEFAULT if builtin_driver else _usbd.BUILTIN_NONE
builtin = _usbd.builtin_driver = builtin_driver
# Putting None for any strings that should fall back to the "built-in" value
# Indexes in this list depends on _USB_STR_MANUF, _USB_STR_PRODUCT, _USB_STR_SERIAL
strs = [None, manufacturer_str, product_str, serial_str]
# Build the device descriptor
FMT = "<BBHBBBBHHHBBBB"
# read the static descriptor fields
f = struct.unpack(FMT, builtin_driver.desc_dev)
def maybe_set(value, idx):
# Override a numeric descriptor value or keep builtin value f[idx] if 'value' is None
if value is not None:
return value
return f[idx]
# Either copy each descriptor field directly from the builtin device descriptor, or 'maybe'
# set it to the custom value from the object
desc_dev = struct.pack(
FMT,
f[0], # bLength
f[1], # bDescriptorType
f[2], # bcdUSB
device_class, # bDeviceClass
device_subclass, # bDeviceSubClass
device_protocol, # bDeviceProtocol
f[6], # bMaxPacketSize0, TODO: allow overriding this value?
maybe_set(id_vendor, 7), # idVendor
maybe_set(id_product, 8), # idProduct
maybe_set(bcd_device, 9), # bcdDevice
_USB_STR_MANUF, # iManufacturer
_USB_STR_PRODUCT, # iProduct
_USB_STR_SERIAL, # iSerialNumber
1,
) # bNumConfigurations
# Iterate interfaces to build the configuration descriptor
# Keep track of the interface and endpoint indexes
itf_num = builtin_driver.itf_max
ep_num = max(builtin_driver.ep_max, 1) # Endpoint 0 always reserved for control
while len(strs) < builtin_driver.str_max:
strs.append(None) # Reserve other string indexes used by builtin drivers
initial_cfg = builtin_driver.desc_cfg or (b"\x00" * _STD_DESC_CONFIG_LEN)
self._itfs = {}
# Determine the total length of the configuration descriptor, by making dummy
# calls to build the config descriptor
desc = Descriptor(None)
desc.extend(initial_cfg)
for itf in itfs:
itf.desc_cfg(desc, 0, 0, [])
# Allocate the real Descriptor helper to write into it, starting
# after the standard configuration descriptor
desc = Descriptor(bytearray(desc.o))
desc.extend(initial_cfg)
for itf in itfs:
itf.desc_cfg(desc, itf_num, ep_num, strs)
for _ in range(itf.num_itfs()):
self._itfs[itf_num] = itf # Mapping from interface numbers to interfaces
itf_num += 1
ep_num += itf.num_eps()
# Go back and update the Standard Configuration Descriptor
# header at the start with values based on the complete
# descriptor.
#
# See USB 2.0 specification section 9.6.3 p264 for details.
bmAttributes = (
(1 << 7) # Reserved
| (0 if max_power_ma else (1 << 6)) # Self-Powered
# Remote Wakeup not currently supported
)
# Configuration string is optional but supported
iConfiguration = 0
if configuration_str:
iConfiguration = len(strs)
strs.append(configuration_str)
if max_power_ma is not None:
# Convert from mA to the units used in the descriptor
max_power_ma //= 2
else:
try:
# Default to whatever value the builtin driver reports
max_power_ma = _usbd.BUILTIN_DEFAULT.desc_cfg[8]
except IndexError:
# If no built-in driver, default to 250mA
max_power_ma = 125
desc.pack_into(
"<BBHBBBBB",
0,
_STD_DESC_CONFIG_LEN, # bLength
_STD_DESC_CONFIG_TYPE, # bDescriptorType
len(desc.b), # wTotalLength
itf_num,
1, # bConfigurationValue
iConfiguration,
bmAttributes,
max_power_ma,
)
_usbd.config(
desc_dev,
desc.b,
strs,
self._open_itf_cb,
self._reset_cb,
self._control_xfer_cb,
self._xfer_cb,
)
def active(self, *optional_value):
# Thin wrapper around the USBDevice active() function.
#
# Note: active only means the USB device is available, not that it has
# actually been connected to and configured by a USB host. Use the
# Interface.is_open() function to check if the host has configured an
# interface of the device.
return self._usbd.active(*optional_value)
def _open_itf_cb(self, desc):
# Singleton callback from TinyUSB custom class driver, when USB host does
# Set Configuration. Called once per interface or IAD.
# Note that even if the configuration descriptor contains an IAD, 'desc'
# starts from the first interface descriptor in the IAD and not the IAD
# descriptor.
itf_num = desc[_DESC_OFFSET_INTERFACE_NUM]
itf = self._itfs[itf_num]
# Scan the full descriptor:
# - Build _eps and _ep_addr from the endpoint descriptors
# - Find the highest numbered interface provided to the callback
# (which will be the first interface, unless we're scanning
# multiple interfaces inside an IAD.)
offs = 0
max_itf = itf_num
while offs < len(desc):
dl = desc[offs + _DESC_OFFSET_LEN]
dt = desc[offs + _DESC_OFFSET_TYPE]
if dt == _STD_DESC_ENDPOINT_TYPE:
ep_addr = desc[offs + _DESC_OFFSET_ENDPOINT_NUM]
self._eps[ep_addr] = itf
self._ep_cbs[ep_addr] = None
elif dt == _STD_DESC_INTERFACE_TYPE:
max_itf = max(max_itf, desc[offs + _DESC_OFFSET_INTERFACE_NUM])
offs += dl
# If 'desc' is not the inside of an Interface Association Descriptor but
# 'itf' object still represents multiple USB interfaces (i.e. MIDI),
# defer calling 'itf.on_open()' until this callback fires for the
# highest numbered USB interface.
#
# This means on_open() is only called once, and that it can
# safely submit transfers on any of the USB interfaces' endpoints.
if self._itfs.get(max_itf + 1, None) != itf:
itf.on_open()
def _reset_cb(self):
# Callback when the USB device is reset by the host
# Allow interfaces to respond to the reset
for itf in self._itfs.values():
itf.on_reset()
# Rebuilt when host re-enumerates
self._eps = {}
self._ep_cbs = {}
def _submit_xfer(self, ep_addr, data, done_cb=None):
# Singleton function to submit a USB transfer (of any type except control).
#
# Generally, drivers should call Interface.submit_xfer() instead. See
# that function for documentation about the possible parameter values.
if ep_addr not in self._eps:
raise ValueError("ep_addr")
if self._ep_cbs[ep_addr]:
raise RuntimeError("xfer_pending")
# USBDevice callback may be called immediately, before Python execution
# continues, so set it first.
#
# To allow xfer_pending checks to work, store True instead of None.
self._ep_cbs[ep_addr] = done_cb or True
return self._usbd.submit_xfer(ep_addr, data)
def _xfer_cb(self, ep_addr, result, xferred_bytes):
# Singleton callback from TinyUSB custom class driver when a transfer completes.
cb = self._ep_cbs.get(ep_addr, None)
self._ep_cbs[ep_addr] = None
if callable(cb):
cb(ep_addr, result, xferred_bytes)
def _control_xfer_cb(self, stage, request):
# Singleton callback from TinyUSB custom class driver when a control
# transfer is in progress.
#
# stage determines appropriate responses (possible values
# utils.STAGE_SETUP, utils.STAGE_DATA, utils.STAGE_ACK).
#
# The TinyUSB class driver framework only calls this function for
# particular types of control transfer, other standard control transfers
# are handled by TinyUSB itself.
wIndex = request[4] + (request[5] << 8)
recipient, _, _ = split_bmRequestType(request[0])
itf = None
result = None
if recipient == _REQ_RECIPIENT_DEVICE:
itf = self._itfs.get(wIndex & 0xFFFF, None)
if itf:
result = itf.on_device_control_xfer(stage, request)
elif recipient == _REQ_RECIPIENT_INTERFACE:
itf = self._itfs.get(wIndex & 0xFFFF, None)
if itf:
result = itf.on_interface_control_xfer(stage, request)
elif recipient == _REQ_RECIPIENT_ENDPOINT:
ep_num = wIndex & 0xFFFF
itf = self._eps.get(ep_num, None)
if itf:
result = itf.on_endpoint_control_xfer(stage, request)
if not itf:
# At time this code was written, only the control transfers shown
# above are passed to the class driver callback. See
# invoke_class_control() in tinyusb usbd.c
raise RuntimeError(f"Unexpected control request type {request[0]:#x}")
# Expecting any of the following possible replies from
# on_NNN_control_xfer():
#
# True - Continue transfer, no data
# False - STALL transfer
# Object with buffer interface - submit this data for the control transfer
return result
class Interface:
# Abstract base class to implement USB Interface (and associated endpoints),
# or a collection of USB Interfaces, in Python
#
# (Despite the name an object of type Interface can represent multiple
# associated interfaces, with or without an Interface Association Descriptor
# prepended to them. Override num_itfs() if assigning >1 USB interface.)
def __init__(self):
self._open = False
def desc_cfg(self, desc, itf_num, ep_num, strs):
# Function to build configuration descriptor contents for this interface
# or group of interfaces. This is called on each interface from
# USBDevice.init().
#
# This function should insert:
#
# - At least one standard Interface descriptor (can call
# - desc.interface()).
#
# Plus, optionally:
#
# - One or more endpoint descriptors (can call desc.endpoint()).
# - An Interface Association Descriptor, prepended before.
# - Other class-specific configuration descriptor data.
#
# This function is called twice per call to USBDevice.init(). The first
# time the values of all arguments are dummies that are used only to
# calculate the total length of the descriptor. Therefore, anything this
# function does should be idempotent and it should add the same
# descriptors each time. If saving interface numbers or endpoint numbers
# for later
#
# Parameters:
#
# - desc - Descriptor helper to write the configuration descriptor bytes into.
# The first time this function is called 'desc' is a dummy object
# with no backing buffer (exists to count the number of bytes needed).
#
# - itf_num - First bNumInterfaces value to assign. The descriptor
# should contain the same number of interfaces returned by num_itfs(),
# starting from this value.
#
# - ep_num - Address of the first available endpoint number to use for
# endpoint descriptor addresses. Subclasses should save the
# endpoint addresses selected, to look up later (although note the first
# time this function is called, the values will be dummies.)
#
# - strs - list of string descriptors for this USB device. This function
# can append to this list, and then insert the index of the new string
# in the list into the configuration descriptor.
raise NotImplementedError
def num_itfs(self):
# Return the number of actual USB Interfaces represented by this object
# (as set in desc_cfg().)
#
# Only needs to be overriden if implementing a Interface class that
# represents more than one USB Interface descriptor (i.e. MIDI), or an
# Interface Association Descriptor (i.e. USB-CDC).
return 1
def num_eps(self):
# Return the number of USB Endpoint numbers represented by this object
# (as set in desc_cfg().)
#
# Note for each count returned by this function, the interface may
# choose to have both an IN and OUT endpoint (i.e. IN flag is not
# considered a value here.)
#
# This value can be zero, if the USB Host only communicates with this
# interface using control transfers.
return 0
def on_open(self):
# Callback called when the USB host accepts the device configuration.
#
# Override this function to initiate any operations that the USB interface
# should do when the USB device is configured to the host.
self._open = True
def on_reset(self):
# Callback called on every registered interface when the USB device is
# reset by the host. This can happen when the USB device is unplugged,
# or if the host triggers a reset for some other reason.
#
# Override this function to cancel any pending operations specific to
# the interface (outstanding USB transfers are already cancelled).
#
# At this point, no USB functionality is available - on_open() will
# be called later if/when the USB host re-enumerates and configures the
# interface.
self._open = False
def is_open(self):
# Returns True if the interface has been configured by the host and is in
# active use.
return self._open
def on_device_control_xfer(self, stage, request):
# Control transfer callback. Override to handle a non-standard device
# control transfer where bmRequestType Recipient is Device, Type is
# utils.REQ_TYPE_CLASS, and the lower byte of wIndex indicates this interface.
#
# (See USB 2.0 specification 9.4 Standard Device Requests, p250).
#
# This particular request type seems pretty uncommon for a device class
# driver to need to handle, most hosts will not send this so most
# implementations won't need to override it.
#
# Parameters:
#
# - stage is one of utils.STAGE_SETUP, utils.STAGE_DATA, utils.STAGE_ACK.
#
# - request is a memoryview into a USB request packet, as per USB 2.0
# specification 9.3 USB Device Requests, p250. the memoryview is only
# valid while the callback is running.
#
# The function can call split_bmRequestType(request[0]) to split
# bmRequestType into (Recipient, Type, Direction).
#
# Result, any of:
#
# - True to continue the request, False to STALL the endpoint.
# - Buffer interface object to provide a buffer to the host as part of the
# transfer, if applicable.
return False
def on_interface_control_xfer(self, stage, request):
# Control transfer callback. Override to handle a device control
# transfer where bmRequestType Recipient is Interface, and the lower byte
# of wIndex indicates this interface.
#
# (See USB 2.0 specification 9.4 Standard Device Requests, p250).
#
# bmRequestType Type field may have different values. It's not necessary
# to handle the mandatory Standard requests (bmRequestType Type ==
# utils.REQ_TYPE_STANDARD), if the driver returns False in these cases then
# TinyUSB will provide the necessary responses.
#
# See on_device_control_xfer() for a description of the arguments and
# possible return values.
return False
def on_endpoint_control_xfer(self, stage, request):
# Control transfer callback. Override to handle a device
# control transfer where bmRequestType Recipient is Endpoint and
# the lower byte of wIndex indicates an endpoint address associated
# with this interface.
#
# bmRequestType Type will generally have any value except
# utils.REQ_TYPE_STANDARD, as Standard endpoint requests are handled by
# TinyUSB. The exception is the the Standard "Set Feature" request. This
# is handled by Tiny USB but also passed through to the driver in case it
# needs to change any internal state, but most drivers can ignore and
# return False in this case.
#
# (See USB 2.0 specification 9.4 Standard Device Requests, p250).
#
# See on_device_control_xfer() for a description of the parameters and
# possible return values.
return False
def xfer_pending(self, ep_addr):
# Return True if a transfer is already pending on ep_addr.
#
# Only one transfer can be submitted at a time.
return _dev and bool(_dev._ep_cbs[ep_addr])
def submit_xfer(self, ep_addr, data, done_cb=None):
# Submit a USB transfer (of any type except control)
#
# Parameters:
#
# - ep_addr. Address of the endpoint to submit the transfer on. Caller is
# responsible for ensuring that ep_addr is correct and belongs to this
# interface. Only one transfer can be active at a time on each endpoint.
#
# - data. Buffer containing data to send, or for data to be read into
# (depending on endpoint direction).
#
# - done_cb. Optional callback function for when the transfer
# completes. The callback is called with arguments (ep_addr, result,
# xferred_bytes) where result is one of xfer_result_t enum (see top of
# this file), and xferred_bytes is an integer.
#
# If the function returns, the transfer is queued.
#
# The function will raise RuntimeError under the following conditions:
#
# - The interface is not "open" (i.e. has not been enumerated and configured
# by the host yet.)
#
# - A transfer is already pending on this endpoint (use xfer_pending() to check
# before sending if needed.)
#
# - A DCD error occurred when queueing the transfer on the hardware.
#
#
# Will raise TypeError if 'data' isn't he correct type of buffer for the
# endpoint transfer direction.
#
# Note that done_cb may be called immediately, possibly before this
# function has returned to the caller.
if not self._open:
raise RuntimeError("Not open")
_dev._submit_xfer(ep_addr, data, done_cb)
def stall(self, ep_addr, *args):
# Set or get the endpoint STALL state.
#
# To get endpoint stall stage, call with a single argument.
# To set endpoint stall state, call with an additional boolean
# argument to set or clear.
#
# Generally endpoint STALL is handled automatically, but there are some
# device classes that need to explicitly stall or unstall an endpoint
# under certain conditions.
if not self._open or ep_addr not in self._eps:
raise RuntimeError
_dev._usbd.stall(ep_addr, *args)
class Descriptor:
# Wrapper class for writing a descriptor in-place into a provided buffer
#
# Doesn't resize the buffer.
#
# Can be initialised with b=None to perform a dummy pass that calculates the
# length needed for the buffer.
def __init__(self, b):
self.b = b
self.o = 0 # offset of data written to the buffer
def pack(self, fmt, *args):
# Utility function to pack new data into the descriptor
# buffer, starting at the current offset.
#
# Arguments are the same as struct.pack(), but it fills the
# pre-allocated descriptor buffer (growing if needed), instead of
# returning anything.
self.pack_into(fmt, self.o, *args)
def pack_into(self, fmt, offs, *args):
# Utility function to pack new data into the descriptor at offset 'offs'.
#
# If the data written is before 'offs' then self.o isn't incremented,
# otherwise it's incremented to point at the end of the written data.
end = offs + struct.calcsize(fmt)
if self.b:
struct.pack_into(fmt, self.b, offs, *args)
self.o = max(self.o, end)
def extend(self, a):
# Extend the descriptor with some bytes-like data
if self.b:
self.b[self.o : self.o + len(a)] = a
self.o += len(a)
# TODO: At the moment many of these arguments are named the same as the relevant field
# in the spec, as this is easier to understand. Can save some code size by collapsing them
# down.
def interface(
self,
bInterfaceNumber,
bNumEndpoints,
bInterfaceClass=_INTERFACE_CLASS_VENDOR,
bInterfaceSubClass=_INTERFACE_SUBCLASS_NONE,
bInterfaceProtocol=_PROTOCOL_NONE,
iInterface=0,
):
# Utility function to append a standard Interface descriptor, with
# the properties specified in the parameter list.
#
# Defaults for bInterfaceClass, SubClass and Protocol are a "vendor"
# device.
#
# Note that iInterface is a string index number. If set, it should be set
# by the caller Interface to the result of self._get_str_index(s),
# where 's' is a string found in self.strs.
self.pack(
"BBBBBBBBB",
_STD_DESC_INTERFACE_LEN, # bLength
_STD_DESC_INTERFACE_TYPE, # bDescriptorType
bInterfaceNumber,
0, # bAlternateSetting, not currently supported
bNumEndpoints,
bInterfaceClass,
bInterfaceSubClass,
bInterfaceProtocol,
iInterface,
)
def endpoint(self, bEndpointAddress, bmAttributes, wMaxPacketSize, bInterval=1):
# Utility function to append a standard Endpoint descriptor, with
# the properties specified in the parameter list.
#
# See USB 2.0 specification section 9.6.6 Endpoint p269
#
# As well as a numeric value, bmAttributes can be a string value to represent
# common endpoint types: "control", "bulk", "interrupt".
if bmAttributes == "control":
bmAttributes = 0
elif bmAttributes == "bulk":
bmAttributes = 2
elif bmAttributes == "interrupt":
bmAttributes = 3
self.pack(
"<BBBBHB",
_STD_DESC_ENDPOINT_LEN,
_STD_DESC_ENDPOINT_TYPE,
bEndpointAddress,
bmAttributes,
wMaxPacketSize,
bInterval,
)
def interface_assoc(
self,
bFirstInterface,
bInterfaceCount,
bFunctionClass,
bFunctionSubClass,
bFunctionProtocol=_PROTOCOL_NONE,
iFunction=0,
):
# Utility function to append an Interface Association descriptor,
# with the properties specified in the parameter list.
#
# See USB ECN: Interface Association Descriptor.
self.pack(
"<BBBBBBBB",
8,
_ITF_ASSOCIATION_DESC_TYPE,
bFirstInterface,
bInterfaceCount,
bFunctionClass,
bFunctionSubClass,
bFunctionProtocol,
iFunction,
)
def split_bmRequestType(bmRequestType):
# Utility function to split control transfer field bmRequestType into a tuple of 3 fields:
#
# Recipient
# Type
# Data transfer direction
#
# See USB 2.0 specification section 9.3 USB Device Requests and 9.3.1 bmRequestType, p248.
return (
bmRequestType & 0x1F,
(bmRequestType >> 5) & 0x03,
(bmRequestType >> 7) & 0x01,
)
class Buffer:
# An interrupt-safe producer/consumer buffer that wraps a bytearray object.
#
# Kind of like a ring buffer, but supports the idea of returning a
# memoryview for either read or write of multiple bytes (suitable for
# passing to a buffer function without needing to allocate another buffer to
# read into.)
#
# Consumer can call pend_read() to get a memoryview to read from, and then
# finish_read(n) when done to indicate it read 'n' bytes from the
# memoryview. There is also a readinto() convenience function.
#
# Producer must call pend_write() to get a memorybuffer to write into, and
# then finish_write(n) when done to indicate it wrote 'n' bytes into the
# memoryview. There is also a normal write() convenience function.
#
# - Only one producer and one consumer is supported.
#
# - Calling pend_read() and pend_write() is effectively idempotent, they can be
# called more than once without a corresponding finish_x() call if necessary
# (provided only one thread does this, as per the previous point.)
#
# - Calling finish_write() and finish_read() is hard interrupt safe (does
# not allocate). pend_read() and pend_write() each allocate 1 block for
# the memoryview that is returned.
#
# The buffer contents are always laid out as:
#
# - Slice [:_n] = bytes of valid data waiting to read
# - Slice [_n:_w] = unused space
# - Slice [_w:] = bytes of pending write buffer waiting to be written
#
# This buffer should be fast when most reads and writes are balanced and use
# the whole buffer. When this doesn't happen, performance degrades to
# approximate a Python-based single byte ringbuffer.
#
def __init__(self, length):
self._b = memoryview(bytearray(length))
# number of bytes in buffer read to read, starting at index 0. Updated
# by both producer & consumer.
self._n = 0
# start index of a pending write into the buffer, if any. equals
# len(self._b) if no write is pending. Updated by producer only.
self._w = length
def writable(self):
# Number of writable bytes in the buffer. Assumes no pending write is outstanding.
return len(self._b) - self._n
def readable(self):
# Number of readable bytes in the buffer. Assumes no pending read is outstanding.
return self._n
def pend_write(self, wmax=None):
# Returns a memoryview that the producer can write bytes into.
# start the write at self._n, the end of data waiting to read
#
# If wmax is set then the memoryview is pre-sliced to be at most
# this many bytes long.
#
# (No critical section needed as self._w is only updated by the producer.)
self._w = self._n
end = (self._w + wmax) if wmax else len(self._b)
return self._b[self._w : end]
def finish_write(self, nbytes):
# Called by the producer to indicate it wrote nbytes into the buffer.
ist = machine.disable_irq()
try:
assert nbytes <= len(self._b) - self._w # can't say we wrote more than was pended
if self._n == self._w:
# no data was read while the write was happening, so the buffer is already in place
# (this is the fast path)
self._n += nbytes
else:
# Slow path: data was read while the write was happening, so
# shuffle the newly written bytes back towards index 0 to avoid fragmentation
#
# As this updates self._n we have to do it in the critical
# section, so do it byte by byte to avoid allocating.
while nbytes > 0:
self._b[self._n] = self._b[self._w]
self._n += 1
self._w += 1
nbytes -= 1
self._w = len(self._b)
finally:
machine.enable_irq(ist)
def write(self, w):
# Helper method for the producer to write into the buffer in one call
pw = self.pend_write()
to_w = min(len(w), len(pw))
if to_w:
pw[:to_w] = w[:to_w]
self.finish_write(to_w)
return to_w
def pend_read(self):
# Return a memoryview slice that the consumer can read bytes from
return self._b[: self._n]
def finish_read(self, nbytes):
# Called by the consumer to indicate it read nbytes from the buffer.
if not nbytes:
return
ist = machine.disable_irq()
try:
assert nbytes <= self._n # can't say we read more than was available
i = 0
self._n -= nbytes
while i < self._n:
# consumer only read part of the buffer, so shuffle remaining
# read data back towards index 0 to avoid fragmentation
self._b[i] = self._b[i + nbytes]
i += 1
finally:
machine.enable_irq(ist)
def readinto(self, b):
# Helper method for the consumer to read out of the buffer in one call
pr = self.pend_read()
to_r = min(len(pr), len(b))
if to_r:
b[:to_r] = pr[:to_r]
self.finish_read(to_r)
return to_r

Wyświetl plik

@ -1,3 +1,3 @@
metadata(version="0.2.0")
metadata(version="0.2.1")
module("ssl.py", opt=3)

Wyświetl plik

@ -1,12 +1,5 @@
import tls
from tls import (
CERT_NONE,
CERT_OPTIONAL,
CERT_REQUIRED,
MBEDTLS_VERSION,
PROTOCOL_TLS_CLIENT,
PROTOCOL_TLS_SERVER,
)
from tls import *
class SSLContext: