From 8e3767393f3ef48fa03fbc3156c9c12e9cea68c1 Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Tue, 7 Apr 2020 18:21:49 +0100 Subject: [PATCH] Add uasyncio_iostream/v3/primitives/ - stopgap primitives for uasyncio V3 --- uasyncio_iostream/v3/__init__.py | 0 uasyncio_iostream/v3/primitives/__init__.py | 19 + uasyncio_iostream/v3/primitives/barrier.py | 68 +++ uasyncio_iostream/v3/primitives/condition.py | 63 +++ uasyncio_iostream/v3/primitives/delay_ms.py | 60 +++ uasyncio_iostream/v3/primitives/message.py | 64 +++ uasyncio_iostream/v3/primitives/pushbutton.py | 97 +++++ uasyncio_iostream/v3/primitives/queue.py | 78 ++++ uasyncio_iostream/v3/primitives/semaphore.py | 37 ++ uasyncio_iostream/v3/primitives/switch.py | 37 ++ .../v3/primitives/tests/__init__.py | 0 .../v3/primitives/tests/asyntest.py | 404 ++++++++++++++++++ .../v3/primitives/tests/switches.py | 137 ++++++ 13 files changed, 1064 insertions(+) create mode 100644 uasyncio_iostream/v3/__init__.py create mode 100644 uasyncio_iostream/v3/primitives/__init__.py create mode 100644 uasyncio_iostream/v3/primitives/barrier.py create mode 100644 uasyncio_iostream/v3/primitives/condition.py create mode 100644 uasyncio_iostream/v3/primitives/delay_ms.py create mode 100644 uasyncio_iostream/v3/primitives/message.py create mode 100644 uasyncio_iostream/v3/primitives/pushbutton.py create mode 100644 uasyncio_iostream/v3/primitives/queue.py create mode 100644 uasyncio_iostream/v3/primitives/semaphore.py create mode 100644 uasyncio_iostream/v3/primitives/switch.py create mode 100644 uasyncio_iostream/v3/primitives/tests/__init__.py create mode 100644 uasyncio_iostream/v3/primitives/tests/asyntest.py create mode 100644 uasyncio_iostream/v3/primitives/tests/switches.py diff --git a/uasyncio_iostream/v3/__init__.py b/uasyncio_iostream/v3/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/uasyncio_iostream/v3/primitives/__init__.py b/uasyncio_iostream/v3/primitives/__init__.py new file mode 100644 index 0000000..fb4bd3c --- /dev/null +++ b/uasyncio_iostream/v3/primitives/__init__.py @@ -0,0 +1,19 @@ +try: + import uasyncio as asyncio +except ImportError: + import asyncio + + +async def _g(): + pass +type_coro = type(_g()) + +# If a callback is passed, run it and return. +# If a coro is passed initiate it and return. +# coros are passed by name i.e. not using function call syntax. +def launch(func, tup_args): + res = func(*tup_args) + if isinstance(res, type_coro): + loop = asyncio.get_event_loop() + loop.create_task(res) + diff --git a/uasyncio_iostream/v3/primitives/barrier.py b/uasyncio_iostream/v3/primitives/barrier.py new file mode 100644 index 0000000..5aa73a9 --- /dev/null +++ b/uasyncio_iostream/v3/primitives/barrier.py @@ -0,0 +1,68 @@ +try: + import uasyncio as asyncio +except ImportError: + import asyncio + +from . import launch + +# A Barrier synchronises N coros. Each issues await barrier. +# Execution pauses until all other participant coros are waiting on it. +# At that point the callback is executed. Then the barrier is 'opened' and +# execution of all participants resumes. + +# The nowait arg is to support task cancellation. It enables usage where one or +# more coros can register that they have reached the barrier without waiting +# for it. Any coros waiting normally on the barrier will pause until all +# non-waiting coros have passed the barrier and all waiting ones have reached +# it. The use of nowait promotes efficiency by enabling tasks which have been +# cancelled to leave the task queue as soon as possible. + +class Barrier(): + def __init__(self, participants, func=None, args=()): + self._participants = participants + self._func = func + self._args = args + self._reset(True) + + def __await__(self): + self._update() + if self._at_limit(): # All other threads are also at limit + if self._func is not None: + launch(self._func, self._args) + self._reset(not self._down) # Toggle direction to release others + return + + direction = self._down + while True: # Wait until last waiting thread changes the direction + if direction != self._down: + return + await asyncio.sleep_ms(0) + + __iter__ = __await__ + + def trigger(self): + self._update() + if self._at_limit(): # All other threads are also at limit + if self._func is not None: + launch(self._func, self._args) + self._reset(not self._down) # Toggle direction to release others + + def _reset(self, down): + self._down = down + self._count = self._participants if down else 0 + + def busy(self): + if self._down: + done = self._count == self._participants + else: + done = self._count == 0 + return not done + + def _at_limit(self): # Has count reached up or down limit? + limit = 0 if self._down else self._participants + return self._count == limit + + def _update(self): + self._count += -1 if self._down else 1 + if self._count < 0 or self._count > self._participants: + raise ValueError('Too many tasks accessing Barrier') diff --git a/uasyncio_iostream/v3/primitives/condition.py b/uasyncio_iostream/v3/primitives/condition.py new file mode 100644 index 0000000..3172642 --- /dev/null +++ b/uasyncio_iostream/v3/primitives/condition.py @@ -0,0 +1,63 @@ +try: + import uasyncio as asyncio +except ImportError: + import asyncio + +# Condition class +# from primitives.condition import Condition + +class Condition(): + def __init__(self, lock=None): + self.lock = asyncio.Lock() if lock is None else lock + self.events = [] + + async def acquire(self): + await self.lock.acquire() + +# enable this syntax: +# with await condition [as cond]: + def __await__(self): + await self.lock.acquire() + return self + + __iter__ = __await__ + + def __enter__(self): + return self + + def __exit__(self, *_): + self.lock.release() + + def locked(self): + return self.lock.locked() + + def release(self): + self.lock.release() # Will raise RuntimeError if not locked + + def notify(self, n=1): # Caller controls lock + if not self.lock.locked(): + raise RuntimeError('Condition notify with lock not acquired.') + for _ in range(min(n, len(self.events))): + ev = self.events.pop() + ev.set() + + def notify_all(self): + self.notify(len(self.events)) + + async def wait(self): + if not self.lock.locked(): + raise RuntimeError('Condition wait with lock not acquired.') + ev = asyncio.Event() + self.events.append(ev) + self.lock.release() + await ev.wait() + await self.lock.acquire() + assert ev not in self.events, 'condition wait assertion fail' + return True # CPython compatibility + + async def wait_for(self, predicate): + result = predicate() + while not result: + await self.wait() + result = predicate() + return result diff --git a/uasyncio_iostream/v3/primitives/delay_ms.py b/uasyncio_iostream/v3/primitives/delay_ms.py new file mode 100644 index 0000000..f7593b9 --- /dev/null +++ b/uasyncio_iostream/v3/primitives/delay_ms.py @@ -0,0 +1,60 @@ +import uasyncio as asyncio +import utime as time +from . import launch +# Usage: +# from primitives.delay_ms import Delay_ms + +class Delay_ms: + verbose = False + def __init__(self, func=None, args=(), can_alloc=True, duration=1000): + self.func = func + self.args = args + self.can_alloc = can_alloc + self.duration = duration # Default duration + self._tstop = None # Killer not running + self._running = False # Timer not running + if not can_alloc: + asyncio.create_task(self._run()) + + async def _run(self): + while True: + if not self._running: # timer not running + await asyncio.sleep_ms(0) + else: + await self._killer() + + def stop(self): + self._running = False + # If uasyncio is ever fixed we should cancel .killer + + def trigger(self, duration=0): # Update end time + self._running = True + if duration <= 0: + duration = self.duration + tn = time.ticks_add(time.ticks_ms(), duration) # new end time + self.verbose and self._tstop is not None and self._tstop > tn \ + and print("Warning: can't reduce Delay_ms time.") + # Start killer if can allocate and killer is not running + sk = self.can_alloc and self._tstop is None + # The following indicates ._killer is running: it will be + # started either here or in ._run + self._tstop = tn + if sk: # ._killer stops the delay when its period has elapsed + asyncio.create_task(self._killer()) + + def running(self): + return self._running + + __call__ = running + + async def _killer(self): + twait = time.ticks_diff(self._tstop, time.ticks_ms()) + while twait > 0: # Must loop here: might be retriggered + await asyncio.sleep_ms(twait) + if self._tstop is None: + break # Return if stop() called during wait + twait = time.ticks_diff(self._tstop, time.ticks_ms()) + if self._running and self.func is not None: + launch(self.func, self.args) # Timed out: execute callback + self._tstop = None # killer not running + self._running = False # timer is stopped diff --git a/uasyncio_iostream/v3/primitives/message.py b/uasyncio_iostream/v3/primitives/message.py new file mode 100644 index 0000000..d685758 --- /dev/null +++ b/uasyncio_iostream/v3/primitives/message.py @@ -0,0 +1,64 @@ +try: + import uasyncio as asyncio +except ImportError: + import asyncio +# Usage: +# from primitives.message import Message + +# A coro waiting on a message issues await message +# A coro rasing the message issues message.set(payload) +# When all waiting coros have run +# message.clear() should be issued + +# This more efficient version is commented out because Event.set is not ISR +# friendly. TODO If it gets fixed, reinstate this (tested) version. +#class Message(asyncio.Event): + #def __init__(self, _=0): + #self._data = None + #super().__init__() + + #def clear(self): + #self._data = None + #super().clear() + + #def __await__(self): + #await super().wait() + + #__iter__ = __await__ + + #def set(self, data=None): + #self._data = data + #super().set() + + #def value(self): + #return self._data + +# Has an ISR-friendly .set() +class Message(): + def __init__(self, delay_ms=0): + self.delay_ms = delay_ms + self.clear() + + def clear(self): + self._flag = False + self._data = None + + async def wait(self): # CPython comptaibility + while not self._flag: + await asyncio.sleep_ms(self.delay_ms) + + def __await__(self): + while not self._flag: + await asyncio.sleep_ms(self.delay_ms) + + __iter__ = __await__ + + def is_set(self): + return self._flag + + def set(self, data=None): + self._flag = True + self._data = data + + def value(self): + return self._data diff --git a/uasyncio_iostream/v3/primitives/pushbutton.py b/uasyncio_iostream/v3/primitives/pushbutton.py new file mode 100644 index 0000000..a22d231 --- /dev/null +++ b/uasyncio_iostream/v3/primitives/pushbutton.py @@ -0,0 +1,97 @@ +import uasyncio as asyncio +import utime as time +from . import launch +from primitives.delay_ms import Delay_ms + + +# An alternative Pushbutton solution with lower RAM use is available here +# https://github.com/kevinkk525/pysmartnode/blob/dev/pysmartnode/utils/abutton.py +class Pushbutton: + debounce_ms = 50 + long_press_ms = 1000 + double_click_ms = 400 + def __init__(self, pin, suppress=False): + self.pin = pin # Initialise for input + self._supp = suppress + self._dblpend = False # Doubleclick waiting for 2nd click + self._dblran = False # Doubleclick executed user function + self._tf = False + self._ff = False + self._df = False + self._lf = False + self._ld = False # Delay_ms instance for long press + self._dd = False # Ditto for doubleclick + self.sense = pin.value() # Convert from electrical to logical value + self.state = self.rawstate() # Initial state + asyncio.create_task(self.buttoncheck()) # Thread runs forever + + def press_func(self, func, args=()): + self._tf = func + self._ta = args + + def release_func(self, func, args=()): + self._ff = func + self._fa = args + + def double_func(self, func, args=()): + self._df = func + self._da = args + + def long_func(self, func, args=()): + self._lf = func + self._la = args + + # Current non-debounced logical button state: True == pressed + def rawstate(self): + return bool(self.pin.value() ^ self.sense) + + # Current debounced state of button (True == pressed) + def __call__(self): + return self.state + + def _ddto(self): # Doubleclick timeout: no doubleclick occurred + self._dblpend = False + if self._supp and not self.state: + if not self._ld or (self._ld and not self._ld()): + launch(self._ff, self._fa) + + async def buttoncheck(self): + if self._lf: # Instantiate timers if funcs exist + self._ld = Delay_ms(self._lf, self._la) + if self._df: + self._dd = Delay_ms(self._ddto) + while True: + state = self.rawstate() + # State has changed: act on it now. + if state != self.state: + self.state = state + if state: # Button pressed: launch pressed func + if self._tf: + launch(self._tf, self._ta) + if self._lf: # There's a long func: start long press delay + self._ld.trigger(Pushbutton.long_press_ms) + if self._df: + if self._dd(): # Second click: timer running + self._dd.stop() + self._dblpend = False + self._dblran = True # Prevent suppressed launch on release + launch(self._df, self._da) + else: + # First click: start doubleclick timer + self._dd.trigger(Pushbutton.double_click_ms) + self._dblpend = True # Prevent suppressed launch on release + else: # Button release. Is there a release func? + if self._ff: + if self._supp: + d = self._ld + # If long delay exists, is running and doubleclick status is OK + if not self._dblpend and not self._dblran: + if (d and d()) or not d: + launch(self._ff, self._fa) + else: + launch(self._ff, self._fa) + if self._ld: + self._ld.stop() # Avoid interpreting a second click as a long push + self._dblran = False + # Ignore state changes until switch has settled + await asyncio.sleep_ms(Pushbutton.debounce_ms) diff --git a/uasyncio_iostream/v3/primitives/queue.py b/uasyncio_iostream/v3/primitives/queue.py new file mode 100644 index 0000000..6d60557 --- /dev/null +++ b/uasyncio_iostream/v3/primitives/queue.py @@ -0,0 +1,78 @@ +# queue.py: adapted from uasyncio V2 +# Code is based on Paul Sokolovsky's work. +# This is a temporary solution until uasyncio V3 gets an efficient official version + +from ucollections import deque +import uasyncio as asyncio + + +# Exception raised by get_nowait(). +class QueueEmpty(Exception): + pass + + +# Exception raised by put_nowait(). +class QueueFull(Exception): + pass + +# A queue, useful for coordinating producer and consumer coroutines. + +# If maxsize is less than or equal to zero, the queue size is infinite. If it +# is an integer greater than 0, then "await put()" will block when the +# queue reaches maxsize, until an item is removed by get(). + +# Unlike the standard library Queue, you can reliably know this Queue's size +# with qsize(), since your single-threaded uasyncio application won't be +# interrupted between calling qsize() and doing an operation on the Queue. + + +class Queue: + + def __init__(self, maxsize=0): + self.maxsize = maxsize + self._queue = deque((), maxsize) + + def _get(self): + return self._queue.popleft() + + async def get(self): # Usage: item = await queue.get() + while not self._queue: + # Queue is empty, put the calling Task on the waiting queue + await asyncio.sleep_ms(0) + return self._get() + + def get_nowait(self): # Remove and return an item from the queue. + # Return an item if one is immediately available, else raise QueueEmpty. + if not self._queue: + raise QueueEmpty() + return self._get() + + def _put(self, val): + self._queue.append(val) + + async def put(self, val): # Usage: await queue.put(item) + while self.qsize() >= self.maxsize and self.maxsize: + # Queue full + await asyncio.sleep_ms(0) + # Task(s) waiting to get from queue, schedule first Task + self._put(val) + + def put_nowait(self, val): # Put an item into the queue without blocking. + if self.qsize() >= self.maxsize and self.maxsize: + raise QueueFull() + self._put(val) + + def qsize(self): # Number of items in the queue. + return len(self._queue) + + def empty(self): # Return True if the queue is empty, False otherwise. + return not self._queue + + def full(self): # Return True if there are maxsize items in the queue. + # Note: if the Queue was initialized with maxsize=0 (the default), + # then full() is never True. + + if self.maxsize <= 0: + return False + else: + return self.qsize() >= self.maxsize diff --git a/uasyncio_iostream/v3/primitives/semaphore.py b/uasyncio_iostream/v3/primitives/semaphore.py new file mode 100644 index 0000000..99f3a66 --- /dev/null +++ b/uasyncio_iostream/v3/primitives/semaphore.py @@ -0,0 +1,37 @@ +try: + import uasyncio as asyncio +except ImportError: + import asyncio + +# A Semaphore is typically used to limit the number of coros running a +# particular piece of code at once. The number is defined in the constructor. +class Semaphore(): + def __init__(self, value=1): + self._count = value + + async def __aenter__(self): + await self.acquire() + return self + + async def __aexit__(self, *args): + self.release() + await asyncio.sleep(0) + + async def acquire(self): + while self._count == 0: + await asyncio.sleep_ms(0) + self._count -= 1 + + def release(self): + self._count += 1 + +class BoundedSemaphore(Semaphore): + def __init__(self, value=1): + super().__init__(value) + self._initial_value = value + + def release(self): + if self._count < self._initial_value: + self._count += 1 + else: + raise ValueError('Semaphore released more than acquired') diff --git a/uasyncio_iostream/v3/primitives/switch.py b/uasyncio_iostream/v3/primitives/switch.py new file mode 100644 index 0000000..e25d65b --- /dev/null +++ b/uasyncio_iostream/v3/primitives/switch.py @@ -0,0 +1,37 @@ +import uasyncio as asyncio +import utime as time +from . import launch + +class Switch: + debounce_ms = 50 + def __init__(self, pin): + self.pin = pin # Should be initialised for input with pullup + self._open_func = False + self._close_func = False + self.switchstate = self.pin.value() # Get initial state + asyncio.create_task(self.switchcheck()) # Thread runs forever + + def open_func(self, func, args=()): + self._open_func = func + self._open_args = args + + def close_func(self, func, args=()): + self._close_func = func + self._close_args = args + + # Return current state of switch (0 = pressed) + def __call__(self): + return self.switchstate + + async def switchcheck(self): + while True: + state = self.pin.value() + if state != self.switchstate: + # State has changed: act on it now. + self.switchstate = state + if state == 0 and self._close_func: + launch(self._close_func, self._close_args) + elif state == 1 and self._open_func: + launch(self._open_func, self._open_args) + # Ignore further state changes until switch has settled + await asyncio.sleep_ms(Switch.debounce_ms) diff --git a/uasyncio_iostream/v3/primitives/tests/__init__.py b/uasyncio_iostream/v3/primitives/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/uasyncio_iostream/v3/primitives/tests/asyntest.py b/uasyncio_iostream/v3/primitives/tests/asyntest.py new file mode 100644 index 0000000..e6fcb17 --- /dev/null +++ b/uasyncio_iostream/v3/primitives/tests/asyntest.py @@ -0,0 +1,404 @@ +# asyntest.py Test/demo of the 'micro' Event, Barrier and Semaphore classes +# Test/demo of official asyncio library and official Lock class + +# The MIT License (MIT) +# +# Copyright (c) 2017-2018 Peter Hinch +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +# CPython 3.5 compatibility +# (ignore RuntimeWarning: coroutine '_g' was never awaited) +# To run: +# from primitives.tests.asyntest import test + +try: + import uasyncio as asyncio +except ImportError: + import asyncio + +from primitives.message import Message +from primitives.barrier import Barrier +from primitives.semaphore import Semaphore, BoundedSemaphore +from primitives.condition import Condition + +def print_tests(): + st = '''Available functions: +test(0) Print this list. +test(1) Test message acknowledge. +test(2) Test Messge and Lock objects. +test(3) Test the Barrier class. +test(4) Test Semaphore +test(5) Test BoundedSemaphore. +test(6) Test the Condition class. +test(7) Test the Queue class. + +Recommended to issue ctrl-D after running each test. +''' + print('\x1b[32m') + print(st) + print('\x1b[39m') + +print_tests() + +def printexp(exp, runtime=0): + print('Expected output:') + print('\x1b[32m') + print(exp) + print('\x1b[39m') + if runtime: + print('Running (runtime = {}s):'.format(runtime)) + else: + print('Running (runtime < 1s):') + +# ************ Test Message class ************ +# Demo use of acknowledge message + +async def message_wait(message, ack_message, n): + await message + print('message_wait {} got message with value {}'.format(n, message.value())) + ack_message.set() + +async def run_ack(): + message = Message() + ack1 = Message() + ack2 = Message() + count = 0 + while True: + asyncio.create_task(message_wait(message, ack1, 1)) + asyncio.create_task(message_wait(message, ack2, 2)) + message.set(count) + count += 1 + print('message was set') + await ack1 + ack1.clear() + print('Cleared ack1') + await ack2 + ack2.clear() + print('Cleared ack2') + message.clear() + print('Cleared message') + await asyncio.sleep(1) + +async def ack_coro(delay): + print('Started ack coro with delay', delay) + await asyncio.sleep(delay) + print("I've seen attack ships burn on the shoulder of Orion...") + print("Time to die...") + +def ack_test(): + printexp('''message was set +message_wait 1 got message with value 0 +message_wait 2 got message with value 0 +Cleared ack1 +Cleared ack2 +Cleared message +message was set +message_wait 1 got message with value 1 +message_wait 2 got message with value 1 + +... text omitted ... + +message_wait 1 got message with value 5 +message_wait 2 got message with value 5 +Cleared ack1 +Cleared ack2 +Cleared message +I've seen attack ships burn on the shoulder of Orion... +Time to die... +''', 10) + asyncio.create_task(run_ack()) + asyncio.run(ack_coro(6)) + +# ************ Test Lock and Message classes ************ + +async def run_lock(n, lock): + print('run_lock {} waiting for lock'.format(n)) + await lock.acquire() + print('run_lock {} acquired lock'.format(n)) + await asyncio.sleep(1) # Delay to demo other coros waiting for lock + lock.release() + print('run_lock {} released lock'.format(n)) + +async def messageset(message): + print('Waiting 5 secs before setting message') + await asyncio.sleep(5) + message.set() + print('message was set') + +async def messagewait(message): + print('waiting for message') + await message + print('got message') + message.clear() + +async def run_message_test(): + print('Test Lock class') + lock = asyncio.Lock() + asyncio.create_task(run_lock(1, lock)) + asyncio.create_task(run_lock(2, lock)) + asyncio.create_task(run_lock(3, lock)) + print('Test Message class') + message = Message() + asyncio.create_task(messageset(message)) + await messagewait(message) # run_message_test runs fast until this point + print('Message status {}'.format('Incorrect' if message.is_set() else 'OK')) + print('Tasks complete') + +def msg_test(): + printexp('''Test Lock class +Test Message class +waiting for message +run_lock 1 waiting for lock +run_lock 1 acquired lock +run_lock 2 waiting for lock +run_lock 3 waiting for lock +Waiting 5 secs before setting message +run_lock 1 released lock +run_lock 2 acquired lock +run_lock 2 released lock +run_lock 3 acquired lock +run_lock 3 released lock +message was set +got message +Message status OK +Tasks complete +''', 5) + asyncio.run(run_message_test()) + +# ************ Barrier test ************ + +async def killer(duration): + await asyncio.sleep(duration) + +def callback(text): + print(text) + +async def report(barrier): + for i in range(5): + print('{} '.format(i), end='') + await barrier + +def barrier_test(): + printexp('''0 0 0 Synch +1 1 1 Synch +2 2 2 Synch +3 3 3 Synch +4 4 4 Synch +''') + barrier = Barrier(3, callback, ('Synch',)) + for _ in range(3): + asyncio.create_task(report(barrier)) + asyncio.run(killer(2)) + +# ************ Semaphore test ************ + +async def run_sema(n, sema, barrier): + print('run_sema {} trying to access semaphore'.format(n)) + async with sema: + print('run_sema {} acquired semaphore'.format(n)) + # Delay demonstrates other coros waiting for semaphore + await asyncio.sleep(1 + n/10) # n/10 ensures deterministic printout + print('run_sema {} has released semaphore'.format(n)) + barrier.trigger() + +async def run_sema_test(bounded): + num_coros = 5 + barrier = Barrier(num_coros + 1) + if bounded: + semaphore = BoundedSemaphore(3) + else: + semaphore = Semaphore(3) + for n in range(num_coros): + asyncio.create_task(run_sema(n, semaphore, barrier)) + await barrier # Quit when all coros complete + try: + semaphore.release() + except ValueError: + print('Bounded semaphore exception test OK') + +def semaphore_test(bounded=False): + if bounded: + exp = '''run_sema 0 trying to access semaphore +run_sema 0 acquired semaphore +run_sema 1 trying to access semaphore +run_sema 1 acquired semaphore +run_sema 2 trying to access semaphore +run_sema 2 acquired semaphore +run_sema 3 trying to access semaphore +run_sema 4 trying to access semaphore +run_sema 0 has released semaphore +run_sema 4 acquired semaphore +run_sema 1 has released semaphore +run_sema 3 acquired semaphore +run_sema 2 has released semaphore +run_sema 4 has released semaphore +run_sema 3 has released semaphore +Bounded semaphore exception test OK + +Exact sequence of acquisition may vary when 3 and 4 compete for semaphore.''' + else: + exp = '''run_sema 0 trying to access semaphore +run_sema 0 acquired semaphore +run_sema 1 trying to access semaphore +run_sema 1 acquired semaphore +run_sema 2 trying to access semaphore +run_sema 2 acquired semaphore +run_sema 3 trying to access semaphore +run_sema 4 trying to access semaphore +run_sema 0 has released semaphore +run_sema 3 acquired semaphore +run_sema 1 has released semaphore +run_sema 4 acquired semaphore +run_sema 2 has released semaphore +run_sema 3 has released semaphore +run_sema 4 has released semaphore + +Exact sequence of acquisition may vary when 3 and 4 compete for semaphore.''' + printexp(exp, 3) + asyncio.run(run_sema_test(bounded)) + +# ************ Condition test ************ + +cond = Condition() +tim = 0 + +async def cond01(): + while True: + await asyncio.sleep(2) + with await cond: + cond.notify(2) # Notify 2 tasks + +async def cond03(): # Maintain a count of seconds + global tim + await asyncio.sleep(0.5) + while True: + await asyncio.sleep(1) + tim += 1 + +async def cond02(n, barrier): + with await cond: + print('cond02', n, 'Awaiting notification.') + await cond.wait() + print('cond02', n, 'triggered. tim =', tim) + barrier.trigger() + +def predicate(): + return tim >= 8 # 12 + +async def cond04(n, barrier): + with await cond: + print('cond04', n, 'Awaiting notification and predicate.') + await cond.wait_for(predicate) + print('cond04', n, 'triggered. tim =', tim) + barrier.trigger() + +async def cond_go(): + ntasks = 7 + barrier = Barrier(ntasks + 1) + t1 = asyncio.create_task(cond01()) + t3 = asyncio.create_task(cond03()) + for n in range(ntasks): + asyncio.create_task(cond02(n, barrier)) + await barrier # All instances of cond02 have completed + # Test wait_for + barrier = Barrier(2) + asyncio.create_task(cond04(99, barrier)) + await barrier + # cancel continuously running coros. + t1.cancel() + t3.cancel() + await asyncio.sleep_ms(0) + print('Done.') + +def condition_test(): + printexp('''cond02 0 Awaiting notification. +cond02 1 Awaiting notification. +cond02 2 Awaiting notification. +cond02 3 Awaiting notification. +cond02 4 Awaiting notification. +cond02 5 Awaiting notification. +cond02 6 Awaiting notification. +cond02 5 triggered. tim = 1 +cond02 6 triggered. tim = 1 +cond02 3 triggered. tim = 3 +cond02 4 triggered. tim = 3 +cond02 1 triggered. tim = 5 +cond02 2 triggered. tim = 5 +cond02 0 triggered. tim = 7 +cond04 99 Awaiting notification and predicate. +cond04 99 triggered. tim = 9 +Done. +''', 13) + asyncio.run(cond_go()) + +# ************ Queue test ************ + +from primitives.queue import Queue +q = Queue() + +async def slow_process(): + await asyncio.sleep(2) + return 42 + +async def bar(): + print('Waiting for slow process.') + result = await slow_process() + print('Putting result onto queue') + await q.put(result) # Put result on q + +async def foo(): + print("Running foo()") + result = await q.get() + print('Result was {}'.format(result)) + +async def queue_go(delay): + asyncio.create_task(foo()) + asyncio.create_task(bar()) + await asyncio.sleep(delay) + print("I've seen starships burn off the shoulder of Orion...") + print("Time to die...") + +def queue_test(): + printexp('''Running (runtime = 3s): +Running foo() +Waiting for slow process. +Putting result onto queue +I've seen starships burn off the shoulder of Orion... +Time to die... +''', 3) + asyncio.run(queue_go(3)) + +def test(n): + if n == 0: + print_tests() # Print this list. + elif n == 1: + ack_test() # Test message acknowledge. + elif n == 2: + msg_test() # Test Messge and Lock objects. + elif n == 3: + barrier_test() # Test the Barrier class. + elif n == 4: + semaphore_test(False) # Test Semaphore + elif n == 5: + semaphore_test(True) # Test BoundedSemaphore. + elif n == 6: + condition_test() # Test the Condition class. + elif n == 7: + queue_test() # Test the Queue class. diff --git a/uasyncio_iostream/v3/primitives/tests/switches.py b/uasyncio_iostream/v3/primitives/tests/switches.py new file mode 100644 index 0000000..b90f92a --- /dev/null +++ b/uasyncio_iostream/v3/primitives/tests/switches.py @@ -0,0 +1,137 @@ +# Test/demo programs for Switch and Pushbutton classes +# Tested on Pyboard but should run on other microcontroller platforms +# running MicroPython with uasyncio library. +# Author: Peter Hinch. +# Copyright Peter Hinch 2017-2020 Released under the MIT license. + +# To run: +# from primitives.tests.switches import * +# test_sw() # For example + +from machine import Pin +from pyb import LED +from primitives.switch import Switch +from primitives.pushbutton import Pushbutton +import uasyncio as asyncio + +helptext = ''' +Test using switch or pushbutton between X1 and gnd. +Ground pin X2 to terminate test. +Soft reset (ctrl-D) after each test. + +''' +tests = ''' +Available tests: +test_sw Switch test +test_swcb Switch with callback +test_btn Pushutton launching coros +test_btncb Pushbutton launching callbacks +''' +print(tests) + +# Pulse an LED (coroutine) +async def pulse(led, ms): + led.on() + await asyncio.sleep_ms(ms) + led.off() + +# Toggle an LED (callback) +def toggle(led): + led.toggle() + +# Quit test by connecting X2 to ground +async def killer(): + pin = Pin('X2', Pin.IN, Pin.PULL_UP) + while pin.value(): + await asyncio.sleep_ms(50) + +# Test for the Switch class passing coros +def test_sw(): + s = ''' +close pulses green +open pulses red +''' + print('Test of switch scheduling coroutines.') + print(helptext) + print(s) + pin = Pin('X1', Pin.IN, Pin.PULL_UP) + red = LED(1) + green = LED(2) + sw = Switch(pin) + # Register coros to launch on contact close and open + sw.close_func(pulse, (green, 1000)) + sw.open_func(pulse, (red, 1000)) + loop = asyncio.get_event_loop() + loop.run_until_complete(killer()) + +# Test for the switch class with a callback +def test_swcb(): + s = ''' +close toggles red +open toggles green +''' + print('Test of switch executing callbacks.') + print(helptext) + print(s) + pin = Pin('X1', Pin.IN, Pin.PULL_UP) + red = LED(1) + green = LED(2) + sw = Switch(pin) + # Register a coro to launch on contact close + sw.close_func(toggle, (red,)) + sw.open_func(toggle, (green,)) + loop = asyncio.get_event_loop() + loop.run_until_complete(killer()) + +# Test for the Pushbutton class (coroutines) +# Pass True to test suppress +def test_btn(suppress=False, lf=True, df=True): + s = ''' +press pulses red +release pulses green +double click pulses yellow +long press pulses blue +''' + print('Test of pushbutton scheduling coroutines.') + print(helptext) + print(s) + pin = Pin('X1', Pin.IN, Pin.PULL_UP) + red = LED(1) + green = LED(2) + yellow = LED(3) + blue = LED(4) + pb = Pushbutton(pin, suppress) + pb.press_func(pulse, (red, 1000)) + pb.release_func(pulse, (green, 1000)) + if df: + print('Doubleclick enabled') + pb.double_func(pulse, (yellow, 1000)) + if lf: + print('Long press enabled') + pb.long_func(pulse, (blue, 1000)) + loop = asyncio.get_event_loop() + loop.run_until_complete(killer()) + +# Test for the Pushbutton class (callbacks) +def test_btncb(): + s = ''' +press toggles red +release toggles green +double click toggles yellow +long press toggles blue +''' + print('Test of pushbutton executing callbacks.') + print(helptext) + print(s) + pin = Pin('X1', Pin.IN, Pin.PULL_UP) + red = LED(1) + green = LED(2) + yellow = LED(3) + blue = LED(4) + pb = Pushbutton(pin) + pb.press_func(toggle, (red,)) + pb.release_func(toggle, (green,)) + pb.double_func(toggle, (yellow,)) + pb.long_func(toggle, (blue,)) + loop = asyncio.get_event_loop() + loop.run_until_complete(killer())