diff --git a/bluetooth/ble_advertising.py b/bluetooth/ble_advertising.py new file mode 100644 index 0000000..a50582f --- /dev/null +++ b/bluetooth/ble_advertising.py @@ -0,0 +1,93 @@ +# Helpers for generating BLE advertising payloads. + +from micropython import const +import struct +import bluetooth + +# Advertising payloads are repeated packets of the following form: +# 1 byte data length (N + 1) +# 1 byte type (see constants below) +# N bytes type-specific data + +_ADV_TYPE_FLAGS = const(0x01) +_ADV_TYPE_NAME = const(0x09) +_ADV_TYPE_UUID16_COMPLETE = const(0x3) +_ADV_TYPE_UUID32_COMPLETE = const(0x5) +_ADV_TYPE_UUID128_COMPLETE = const(0x7) +_ADV_TYPE_UUID16_MORE = const(0x2) +_ADV_TYPE_UUID32_MORE = const(0x4) +_ADV_TYPE_UUID128_MORE = const(0x6) +_ADV_TYPE_APPEARANCE = const(0x19) + + +# Generate a payload to be passed to gap_advertise(adv_data=...). +def advertising_payload(limited_disc=False, br_edr=False, name=None, services=None, appearance=0): + payload = bytearray() + + def _append(adv_type, value): + nonlocal payload + payload += struct.pack("BB", len(value) + 1, adv_type) + value + + _append( + _ADV_TYPE_FLAGS, + struct.pack("B", (0x01 if limited_disc else 0x02) + (0x18 if br_edr else 0x04)), + ) + + if name: + _append(_ADV_TYPE_NAME, name) + + if services: + for uuid in services: + b = bytes(uuid) + if len(b) == 2: + _append(_ADV_TYPE_UUID16_COMPLETE, b) + elif len(b) == 4: + _append(_ADV_TYPE_UUID32_COMPLETE, b) + elif len(b) == 16: + _append(_ADV_TYPE_UUID128_COMPLETE, b) + + # See org.bluetooth.characteristic.gap.appearance.xml + if appearance: + _append(_ADV_TYPE_APPEARANCE, struct.pack(" 0): + for i in range(flash_count): + self._led.on() + time.sleep_ms(100) + self._led.off() + time.sleep_ms(100) + delay_ms -= 200 + time.sleep_ms(1000) + delay_ms -= 1000 + +def print_temp(result): + print("read temp: %.2f degc" % result) + +def demo(ble, central): + not_found = False + + def on_scan(addr_type, addr, name): + if addr_type is not None: + print("Found sensor: %s" % name) + central.connect() + else: + nonlocal not_found + not_found = True + print("No sensor found.") + + central.scan(callback=on_scan) + + # Wait for connection... + while not central.is_connected(): + time.sleep_ms(100) + if not_found: + return + + print("Connected") + + # Explicitly issue reads + while central.is_connected(): + central.read(callback=print_temp) + sleep_ms_flash_led(central, 2, 2000) + + print("Disconnected") + +if __name__ == "__main__": + ble = bluetooth.BLE() + central = BLETemperatureCentral(ble) + while(True): + demo(ble, central) + sleep_ms_flash_led(central, 1, 10000) \ No newline at end of file diff --git a/bluetooth/picow_ble_temp_sensor.py b/bluetooth/picow_ble_temp_sensor.py new file mode 100644 index 0000000..c55dcc8 --- /dev/null +++ b/bluetooth/picow_ble_temp_sensor.py @@ -0,0 +1,107 @@ +# This example demonstrates a simple temperature sensor peripheral. +# +# The sensor's local value is updated, and it will notify +# any connected central every 10 seconds. + +import bluetooth +import random +import struct +import time +import machine +import ubinascii +from ble_advertising import advertising_payload +from micropython import const +from machine import Pin + +_IRQ_CENTRAL_CONNECT = const(1) +_IRQ_CENTRAL_DISCONNECT = const(2) +_IRQ_GATTS_INDICATE_DONE = const(20) + +_FLAG_READ = const(0x0002) +_FLAG_NOTIFY = const(0x0010) +_FLAG_INDICATE = const(0x0020) + +# org.bluetooth.service.environmental_sensing +_ENV_SENSE_UUID = bluetooth.UUID(0x181A) +# org.bluetooth.characteristic.temperature +_TEMP_CHAR = ( + bluetooth.UUID(0x2A6E), + _FLAG_READ | _FLAG_NOTIFY | _FLAG_INDICATE, +) +_ENV_SENSE_SERVICE = ( + _ENV_SENSE_UUID, + (_TEMP_CHAR,), +) + +# org.bluetooth.characteristic.gap.appearance.xml +_ADV_APPEARANCE_GENERIC_THERMOMETER = const(768) + +class BLETemperature: + def __init__(self, ble, name=""): + self._sensor_temp = machine.ADC(4) + self._ble = ble + self._ble.active(True) + self._ble.irq(self._irq) + ((self._handle,),) = self._ble.gatts_register_services((_ENV_SENSE_SERVICE,)) + self._connections = set() + if len(name) == 0: + name = 'Pico %s' % ubinascii.hexlify(self._ble.config('mac')[1],':').decode().upper() + print('Sensor name %s' % name) + self._payload = advertising_payload( + name=name, services=[_ENV_SENSE_UUID] + ) + self._advertise() + + def _irq(self, event, data): + # Track connections so we can send notifications. + if event == _IRQ_CENTRAL_CONNECT: + conn_handle, _, _ = data + self._connections.add(conn_handle) + elif event == _IRQ_CENTRAL_DISCONNECT: + conn_handle, _, _ = data + self._connections.remove(conn_handle) + # Start advertising again to allow a new connection. + self._advertise() + elif event == _IRQ_GATTS_INDICATE_DONE: + conn_handle, value_handle, status = data + + def update_temperature(self, notify=False, indicate=False): + # Write the local value, ready for a central to read. + temp_deg_c = self._get_temp() + print("write temp %.2f degc" % temp_deg_c); + self._ble.gatts_write(self._handle, struct.pack("