Add uasyncio_iostream/v3/primitives/ - stopgap primitives for uasyncio V3

pull/15/head
Peter Hinch 2020-04-07 18:21:49 +01:00
rodzic 7a7ab42dd7
commit 8e3767393f
13 zmienionych plików z 1064 dodań i 0 usunięć

Wyświetl plik

@ -0,0 +1,19 @@
try:
import uasyncio as asyncio
except ImportError:
import asyncio
async def _g():
pass
type_coro = type(_g())
# If a callback is passed, run it and return.
# If a coro is passed initiate it and return.
# coros are passed by name i.e. not using function call syntax.
def launch(func, tup_args):
res = func(*tup_args)
if isinstance(res, type_coro):
loop = asyncio.get_event_loop()
loop.create_task(res)

Wyświetl plik

@ -0,0 +1,68 @@
try:
import uasyncio as asyncio
except ImportError:
import asyncio
from . import launch
# A Barrier synchronises N coros. Each issues await barrier.
# Execution pauses until all other participant coros are waiting on it.
# At that point the callback is executed. Then the barrier is 'opened' and
# execution of all participants resumes.
# The nowait arg is to support task cancellation. It enables usage where one or
# more coros can register that they have reached the barrier without waiting
# for it. Any coros waiting normally on the barrier will pause until all
# non-waiting coros have passed the barrier and all waiting ones have reached
# it. The use of nowait promotes efficiency by enabling tasks which have been
# cancelled to leave the task queue as soon as possible.
class Barrier():
def __init__(self, participants, func=None, args=()):
self._participants = participants
self._func = func
self._args = args
self._reset(True)
def __await__(self):
self._update()
if self._at_limit(): # All other threads are also at limit
if self._func is not None:
launch(self._func, self._args)
self._reset(not self._down) # Toggle direction to release others
return
direction = self._down
while True: # Wait until last waiting thread changes the direction
if direction != self._down:
return
await asyncio.sleep_ms(0)
__iter__ = __await__
def trigger(self):
self._update()
if self._at_limit(): # All other threads are also at limit
if self._func is not None:
launch(self._func, self._args)
self._reset(not self._down) # Toggle direction to release others
def _reset(self, down):
self._down = down
self._count = self._participants if down else 0
def busy(self):
if self._down:
done = self._count == self._participants
else:
done = self._count == 0
return not done
def _at_limit(self): # Has count reached up or down limit?
limit = 0 if self._down else self._participants
return self._count == limit
def _update(self):
self._count += -1 if self._down else 1
if self._count < 0 or self._count > self._participants:
raise ValueError('Too many tasks accessing Barrier')

Wyświetl plik

@ -0,0 +1,63 @@
try:
import uasyncio as asyncio
except ImportError:
import asyncio
# Condition class
# from primitives.condition import Condition
class Condition():
def __init__(self, lock=None):
self.lock = asyncio.Lock() if lock is None else lock
self.events = []
async def acquire(self):
await self.lock.acquire()
# enable this syntax:
# with await condition [as cond]:
def __await__(self):
await self.lock.acquire()
return self
__iter__ = __await__
def __enter__(self):
return self
def __exit__(self, *_):
self.lock.release()
def locked(self):
return self.lock.locked()
def release(self):
self.lock.release() # Will raise RuntimeError if not locked
def notify(self, n=1): # Caller controls lock
if not self.lock.locked():
raise RuntimeError('Condition notify with lock not acquired.')
for _ in range(min(n, len(self.events))):
ev = self.events.pop()
ev.set()
def notify_all(self):
self.notify(len(self.events))
async def wait(self):
if not self.lock.locked():
raise RuntimeError('Condition wait with lock not acquired.')
ev = asyncio.Event()
self.events.append(ev)
self.lock.release()
await ev.wait()
await self.lock.acquire()
assert ev not in self.events, 'condition wait assertion fail'
return True # CPython compatibility
async def wait_for(self, predicate):
result = predicate()
while not result:
await self.wait()
result = predicate()
return result

Wyświetl plik

@ -0,0 +1,60 @@
import uasyncio as asyncio
import utime as time
from . import launch
# Usage:
# from primitives.delay_ms import Delay_ms
class Delay_ms:
verbose = False
def __init__(self, func=None, args=(), can_alloc=True, duration=1000):
self.func = func
self.args = args
self.can_alloc = can_alloc
self.duration = duration # Default duration
self._tstop = None # Killer not running
self._running = False # Timer not running
if not can_alloc:
asyncio.create_task(self._run())
async def _run(self):
while True:
if not self._running: # timer not running
await asyncio.sleep_ms(0)
else:
await self._killer()
def stop(self):
self._running = False
# If uasyncio is ever fixed we should cancel .killer
def trigger(self, duration=0): # Update end time
self._running = True
if duration <= 0:
duration = self.duration
tn = time.ticks_add(time.ticks_ms(), duration) # new end time
self.verbose and self._tstop is not None and self._tstop > tn \
and print("Warning: can't reduce Delay_ms time.")
# Start killer if can allocate and killer is not running
sk = self.can_alloc and self._tstop is None
# The following indicates ._killer is running: it will be
# started either here or in ._run
self._tstop = tn
if sk: # ._killer stops the delay when its period has elapsed
asyncio.create_task(self._killer())
def running(self):
return self._running
__call__ = running
async def _killer(self):
twait = time.ticks_diff(self._tstop, time.ticks_ms())
while twait > 0: # Must loop here: might be retriggered
await asyncio.sleep_ms(twait)
if self._tstop is None:
break # Return if stop() called during wait
twait = time.ticks_diff(self._tstop, time.ticks_ms())
if self._running and self.func is not None:
launch(self.func, self.args) # Timed out: execute callback
self._tstop = None # killer not running
self._running = False # timer is stopped

Wyświetl plik

@ -0,0 +1,64 @@
try:
import uasyncio as asyncio
except ImportError:
import asyncio
# Usage:
# from primitives.message import Message
# A coro waiting on a message issues await message
# A coro rasing the message issues message.set(payload)
# When all waiting coros have run
# message.clear() should be issued
# This more efficient version is commented out because Event.set is not ISR
# friendly. TODO If it gets fixed, reinstate this (tested) version.
#class Message(asyncio.Event):
#def __init__(self, _=0):
#self._data = None
#super().__init__()
#def clear(self):
#self._data = None
#super().clear()
#def __await__(self):
#await super().wait()
#__iter__ = __await__
#def set(self, data=None):
#self._data = data
#super().set()
#def value(self):
#return self._data
# Has an ISR-friendly .set()
class Message():
def __init__(self, delay_ms=0):
self.delay_ms = delay_ms
self.clear()
def clear(self):
self._flag = False
self._data = None
async def wait(self): # CPython comptaibility
while not self._flag:
await asyncio.sleep_ms(self.delay_ms)
def __await__(self):
while not self._flag:
await asyncio.sleep_ms(self.delay_ms)
__iter__ = __await__
def is_set(self):
return self._flag
def set(self, data=None):
self._flag = True
self._data = data
def value(self):
return self._data

Wyświetl plik

@ -0,0 +1,97 @@
import uasyncio as asyncio
import utime as time
from . import launch
from primitives.delay_ms import Delay_ms
# An alternative Pushbutton solution with lower RAM use is available here
# https://github.com/kevinkk525/pysmartnode/blob/dev/pysmartnode/utils/abutton.py
class Pushbutton:
debounce_ms = 50
long_press_ms = 1000
double_click_ms = 400
def __init__(self, pin, suppress=False):
self.pin = pin # Initialise for input
self._supp = suppress
self._dblpend = False # Doubleclick waiting for 2nd click
self._dblran = False # Doubleclick executed user function
self._tf = False
self._ff = False
self._df = False
self._lf = False
self._ld = False # Delay_ms instance for long press
self._dd = False # Ditto for doubleclick
self.sense = pin.value() # Convert from electrical to logical value
self.state = self.rawstate() # Initial state
asyncio.create_task(self.buttoncheck()) # Thread runs forever
def press_func(self, func, args=()):
self._tf = func
self._ta = args
def release_func(self, func, args=()):
self._ff = func
self._fa = args
def double_func(self, func, args=()):
self._df = func
self._da = args
def long_func(self, func, args=()):
self._lf = func
self._la = args
# Current non-debounced logical button state: True == pressed
def rawstate(self):
return bool(self.pin.value() ^ self.sense)
# Current debounced state of button (True == pressed)
def __call__(self):
return self.state
def _ddto(self): # Doubleclick timeout: no doubleclick occurred
self._dblpend = False
if self._supp and not self.state:
if not self._ld or (self._ld and not self._ld()):
launch(self._ff, self._fa)
async def buttoncheck(self):
if self._lf: # Instantiate timers if funcs exist
self._ld = Delay_ms(self._lf, self._la)
if self._df:
self._dd = Delay_ms(self._ddto)
while True:
state = self.rawstate()
# State has changed: act on it now.
if state != self.state:
self.state = state
if state: # Button pressed: launch pressed func
if self._tf:
launch(self._tf, self._ta)
if self._lf: # There's a long func: start long press delay
self._ld.trigger(Pushbutton.long_press_ms)
if self._df:
if self._dd(): # Second click: timer running
self._dd.stop()
self._dblpend = False
self._dblran = True # Prevent suppressed launch on release
launch(self._df, self._da)
else:
# First click: start doubleclick timer
self._dd.trigger(Pushbutton.double_click_ms)
self._dblpend = True # Prevent suppressed launch on release
else: # Button release. Is there a release func?
if self._ff:
if self._supp:
d = self._ld
# If long delay exists, is running and doubleclick status is OK
if not self._dblpend and not self._dblran:
if (d and d()) or not d:
launch(self._ff, self._fa)
else:
launch(self._ff, self._fa)
if self._ld:
self._ld.stop() # Avoid interpreting a second click as a long push
self._dblran = False
# Ignore state changes until switch has settled
await asyncio.sleep_ms(Pushbutton.debounce_ms)

Wyświetl plik

@ -0,0 +1,78 @@
# queue.py: adapted from uasyncio V2
# Code is based on Paul Sokolovsky's work.
# This is a temporary solution until uasyncio V3 gets an efficient official version
from ucollections import deque
import uasyncio as asyncio
# Exception raised by get_nowait().
class QueueEmpty(Exception):
pass
# Exception raised by put_nowait().
class QueueFull(Exception):
pass
# A queue, useful for coordinating producer and consumer coroutines.
# If maxsize is less than or equal to zero, the queue size is infinite. If it
# is an integer greater than 0, then "await put()" will block when the
# queue reaches maxsize, until an item is removed by get().
# Unlike the standard library Queue, you can reliably know this Queue's size
# with qsize(), since your single-threaded uasyncio application won't be
# interrupted between calling qsize() and doing an operation on the Queue.
class Queue:
def __init__(self, maxsize=0):
self.maxsize = maxsize
self._queue = deque((), maxsize)
def _get(self):
return self._queue.popleft()
async def get(self): # Usage: item = await queue.get()
while not self._queue:
# Queue is empty, put the calling Task on the waiting queue
await asyncio.sleep_ms(0)
return self._get()
def get_nowait(self): # Remove and return an item from the queue.
# Return an item if one is immediately available, else raise QueueEmpty.
if not self._queue:
raise QueueEmpty()
return self._get()
def _put(self, val):
self._queue.append(val)
async def put(self, val): # Usage: await queue.put(item)
while self.qsize() >= self.maxsize and self.maxsize:
# Queue full
await asyncio.sleep_ms(0)
# Task(s) waiting to get from queue, schedule first Task
self._put(val)
def put_nowait(self, val): # Put an item into the queue without blocking.
if self.qsize() >= self.maxsize and self.maxsize:
raise QueueFull()
self._put(val)
def qsize(self): # Number of items in the queue.
return len(self._queue)
def empty(self): # Return True if the queue is empty, False otherwise.
return not self._queue
def full(self): # Return True if there are maxsize items in the queue.
# Note: if the Queue was initialized with maxsize=0 (the default),
# then full() is never True.
if self.maxsize <= 0:
return False
else:
return self.qsize() >= self.maxsize

Wyświetl plik

@ -0,0 +1,37 @@
try:
import uasyncio as asyncio
except ImportError:
import asyncio
# A Semaphore is typically used to limit the number of coros running a
# particular piece of code at once. The number is defined in the constructor.
class Semaphore():
def __init__(self, value=1):
self._count = value
async def __aenter__(self):
await self.acquire()
return self
async def __aexit__(self, *args):
self.release()
await asyncio.sleep(0)
async def acquire(self):
while self._count == 0:
await asyncio.sleep_ms(0)
self._count -= 1
def release(self):
self._count += 1
class BoundedSemaphore(Semaphore):
def __init__(self, value=1):
super().__init__(value)
self._initial_value = value
def release(self):
if self._count < self._initial_value:
self._count += 1
else:
raise ValueError('Semaphore released more than acquired')

Wyświetl plik

@ -0,0 +1,37 @@
import uasyncio as asyncio
import utime as time
from . import launch
class Switch:
debounce_ms = 50
def __init__(self, pin):
self.pin = pin # Should be initialised for input with pullup
self._open_func = False
self._close_func = False
self.switchstate = self.pin.value() # Get initial state
asyncio.create_task(self.switchcheck()) # Thread runs forever
def open_func(self, func, args=()):
self._open_func = func
self._open_args = args
def close_func(self, func, args=()):
self._close_func = func
self._close_args = args
# Return current state of switch (0 = pressed)
def __call__(self):
return self.switchstate
async def switchcheck(self):
while True:
state = self.pin.value()
if state != self.switchstate:
# State has changed: act on it now.
self.switchstate = state
if state == 0 and self._close_func:
launch(self._close_func, self._close_args)
elif state == 1 and self._open_func:
launch(self._open_func, self._open_args)
# Ignore further state changes until switch has settled
await asyncio.sleep_ms(Switch.debounce_ms)

Wyświetl plik

@ -0,0 +1,404 @@
# asyntest.py Test/demo of the 'micro' Event, Barrier and Semaphore classes
# Test/demo of official asyncio library and official Lock class
# The MIT License (MIT)
#
# Copyright (c) 2017-2018 Peter Hinch
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# CPython 3.5 compatibility
# (ignore RuntimeWarning: coroutine '_g' was never awaited)
# To run:
# from primitives.tests.asyntest import test
try:
import uasyncio as asyncio
except ImportError:
import asyncio
from primitives.message import Message
from primitives.barrier import Barrier
from primitives.semaphore import Semaphore, BoundedSemaphore
from primitives.condition import Condition
def print_tests():
st = '''Available functions:
test(0) Print this list.
test(1) Test message acknowledge.
test(2) Test Messge and Lock objects.
test(3) Test the Barrier class.
test(4) Test Semaphore
test(5) Test BoundedSemaphore.
test(6) Test the Condition class.
test(7) Test the Queue class.
Recommended to issue ctrl-D after running each test.
'''
print('\x1b[32m')
print(st)
print('\x1b[39m')
print_tests()
def printexp(exp, runtime=0):
print('Expected output:')
print('\x1b[32m')
print(exp)
print('\x1b[39m')
if runtime:
print('Running (runtime = {}s):'.format(runtime))
else:
print('Running (runtime < 1s):')
# ************ Test Message class ************
# Demo use of acknowledge message
async def message_wait(message, ack_message, n):
await message
print('message_wait {} got message with value {}'.format(n, message.value()))
ack_message.set()
async def run_ack():
message = Message()
ack1 = Message()
ack2 = Message()
count = 0
while True:
asyncio.create_task(message_wait(message, ack1, 1))
asyncio.create_task(message_wait(message, ack2, 2))
message.set(count)
count += 1
print('message was set')
await ack1
ack1.clear()
print('Cleared ack1')
await ack2
ack2.clear()
print('Cleared ack2')
message.clear()
print('Cleared message')
await asyncio.sleep(1)
async def ack_coro(delay):
print('Started ack coro with delay', delay)
await asyncio.sleep(delay)
print("I've seen attack ships burn on the shoulder of Orion...")
print("Time to die...")
def ack_test():
printexp('''message was set
message_wait 1 got message with value 0
message_wait 2 got message with value 0
Cleared ack1
Cleared ack2
Cleared message
message was set
message_wait 1 got message with value 1
message_wait 2 got message with value 1
... text omitted ...
message_wait 1 got message with value 5
message_wait 2 got message with value 5
Cleared ack1
Cleared ack2
Cleared message
I've seen attack ships burn on the shoulder of Orion...
Time to die...
''', 10)
asyncio.create_task(run_ack())
asyncio.run(ack_coro(6))
# ************ Test Lock and Message classes ************
async def run_lock(n, lock):
print('run_lock {} waiting for lock'.format(n))
await lock.acquire()
print('run_lock {} acquired lock'.format(n))
await asyncio.sleep(1) # Delay to demo other coros waiting for lock
lock.release()
print('run_lock {} released lock'.format(n))
async def messageset(message):
print('Waiting 5 secs before setting message')
await asyncio.sleep(5)
message.set()
print('message was set')
async def messagewait(message):
print('waiting for message')
await message
print('got message')
message.clear()
async def run_message_test():
print('Test Lock class')
lock = asyncio.Lock()
asyncio.create_task(run_lock(1, lock))
asyncio.create_task(run_lock(2, lock))
asyncio.create_task(run_lock(3, lock))
print('Test Message class')
message = Message()
asyncio.create_task(messageset(message))
await messagewait(message) # run_message_test runs fast until this point
print('Message status {}'.format('Incorrect' if message.is_set() else 'OK'))
print('Tasks complete')
def msg_test():
printexp('''Test Lock class
Test Message class
waiting for message
run_lock 1 waiting for lock
run_lock 1 acquired lock
run_lock 2 waiting for lock
run_lock 3 waiting for lock
Waiting 5 secs before setting message
run_lock 1 released lock
run_lock 2 acquired lock
run_lock 2 released lock
run_lock 3 acquired lock
run_lock 3 released lock
message was set
got message
Message status OK
Tasks complete
''', 5)
asyncio.run(run_message_test())
# ************ Barrier test ************
async def killer(duration):
await asyncio.sleep(duration)
def callback(text):
print(text)
async def report(barrier):
for i in range(5):
print('{} '.format(i), end='')
await barrier
def barrier_test():
printexp('''0 0 0 Synch
1 1 1 Synch
2 2 2 Synch
3 3 3 Synch
4 4 4 Synch
''')
barrier = Barrier(3, callback, ('Synch',))
for _ in range(3):
asyncio.create_task(report(barrier))
asyncio.run(killer(2))
# ************ Semaphore test ************
async def run_sema(n, sema, barrier):
print('run_sema {} trying to access semaphore'.format(n))
async with sema:
print('run_sema {} acquired semaphore'.format(n))
# Delay demonstrates other coros waiting for semaphore
await asyncio.sleep(1 + n/10) # n/10 ensures deterministic printout
print('run_sema {} has released semaphore'.format(n))
barrier.trigger()
async def run_sema_test(bounded):
num_coros = 5
barrier = Barrier(num_coros + 1)
if bounded:
semaphore = BoundedSemaphore(3)
else:
semaphore = Semaphore(3)
for n in range(num_coros):
asyncio.create_task(run_sema(n, semaphore, barrier))
await barrier # Quit when all coros complete
try:
semaphore.release()
except ValueError:
print('Bounded semaphore exception test OK')
def semaphore_test(bounded=False):
if bounded:
exp = '''run_sema 0 trying to access semaphore
run_sema 0 acquired semaphore
run_sema 1 trying to access semaphore
run_sema 1 acquired semaphore
run_sema 2 trying to access semaphore
run_sema 2 acquired semaphore
run_sema 3 trying to access semaphore
run_sema 4 trying to access semaphore
run_sema 0 has released semaphore
run_sema 4 acquired semaphore
run_sema 1 has released semaphore
run_sema 3 acquired semaphore
run_sema 2 has released semaphore
run_sema 4 has released semaphore
run_sema 3 has released semaphore
Bounded semaphore exception test OK
Exact sequence of acquisition may vary when 3 and 4 compete for semaphore.'''
else:
exp = '''run_sema 0 trying to access semaphore
run_sema 0 acquired semaphore
run_sema 1 trying to access semaphore
run_sema 1 acquired semaphore
run_sema 2 trying to access semaphore
run_sema 2 acquired semaphore
run_sema 3 trying to access semaphore
run_sema 4 trying to access semaphore
run_sema 0 has released semaphore
run_sema 3 acquired semaphore
run_sema 1 has released semaphore
run_sema 4 acquired semaphore
run_sema 2 has released semaphore
run_sema 3 has released semaphore
run_sema 4 has released semaphore
Exact sequence of acquisition may vary when 3 and 4 compete for semaphore.'''
printexp(exp, 3)
asyncio.run(run_sema_test(bounded))
# ************ Condition test ************
cond = Condition()
tim = 0
async def cond01():
while True:
await asyncio.sleep(2)
with await cond:
cond.notify(2) # Notify 2 tasks
async def cond03(): # Maintain a count of seconds
global tim
await asyncio.sleep(0.5)
while True:
await asyncio.sleep(1)
tim += 1
async def cond02(n, barrier):
with await cond:
print('cond02', n, 'Awaiting notification.')
await cond.wait()
print('cond02', n, 'triggered. tim =', tim)
barrier.trigger()
def predicate():
return tim >= 8 # 12
async def cond04(n, barrier):
with await cond:
print('cond04', n, 'Awaiting notification and predicate.')
await cond.wait_for(predicate)
print('cond04', n, 'triggered. tim =', tim)
barrier.trigger()
async def cond_go():
ntasks = 7
barrier = Barrier(ntasks + 1)
t1 = asyncio.create_task(cond01())
t3 = asyncio.create_task(cond03())
for n in range(ntasks):
asyncio.create_task(cond02(n, barrier))
await barrier # All instances of cond02 have completed
# Test wait_for
barrier = Barrier(2)
asyncio.create_task(cond04(99, barrier))
await barrier
# cancel continuously running coros.
t1.cancel()
t3.cancel()
await asyncio.sleep_ms(0)
print('Done.')
def condition_test():
printexp('''cond02 0 Awaiting notification.
cond02 1 Awaiting notification.
cond02 2 Awaiting notification.
cond02 3 Awaiting notification.
cond02 4 Awaiting notification.
cond02 5 Awaiting notification.
cond02 6 Awaiting notification.
cond02 5 triggered. tim = 1
cond02 6 triggered. tim = 1
cond02 3 triggered. tim = 3
cond02 4 triggered. tim = 3
cond02 1 triggered. tim = 5
cond02 2 triggered. tim = 5
cond02 0 triggered. tim = 7
cond04 99 Awaiting notification and predicate.
cond04 99 triggered. tim = 9
Done.
''', 13)
asyncio.run(cond_go())
# ************ Queue test ************
from primitives.queue import Queue
q = Queue()
async def slow_process():
await asyncio.sleep(2)
return 42
async def bar():
print('Waiting for slow process.')
result = await slow_process()
print('Putting result onto queue')
await q.put(result) # Put result on q
async def foo():
print("Running foo()")
result = await q.get()
print('Result was {}'.format(result))
async def queue_go(delay):
asyncio.create_task(foo())
asyncio.create_task(bar())
await asyncio.sleep(delay)
print("I've seen starships burn off the shoulder of Orion...")
print("Time to die...")
def queue_test():
printexp('''Running (runtime = 3s):
Running foo()
Waiting for slow process.
Putting result onto queue
I've seen starships burn off the shoulder of Orion...
Time to die...
''', 3)
asyncio.run(queue_go(3))
def test(n):
if n == 0:
print_tests() # Print this list.
elif n == 1:
ack_test() # Test message acknowledge.
elif n == 2:
msg_test() # Test Messge and Lock objects.
elif n == 3:
barrier_test() # Test the Barrier class.
elif n == 4:
semaphore_test(False) # Test Semaphore
elif n == 5:
semaphore_test(True) # Test BoundedSemaphore.
elif n == 6:
condition_test() # Test the Condition class.
elif n == 7:
queue_test() # Test the Queue class.

Wyświetl plik

@ -0,0 +1,137 @@
# Test/demo programs for Switch and Pushbutton classes
# Tested on Pyboard but should run on other microcontroller platforms
# running MicroPython with uasyncio library.
# Author: Peter Hinch.
# Copyright Peter Hinch 2017-2020 Released under the MIT license.
# To run:
# from primitives.tests.switches import *
# test_sw() # For example
from machine import Pin
from pyb import LED
from primitives.switch import Switch
from primitives.pushbutton import Pushbutton
import uasyncio as asyncio
helptext = '''
Test using switch or pushbutton between X1 and gnd.
Ground pin X2 to terminate test.
Soft reset (ctrl-D) after each test.
'''
tests = '''
Available tests:
test_sw Switch test
test_swcb Switch with callback
test_btn Pushutton launching coros
test_btncb Pushbutton launching callbacks
'''
print(tests)
# Pulse an LED (coroutine)
async def pulse(led, ms):
led.on()
await asyncio.sleep_ms(ms)
led.off()
# Toggle an LED (callback)
def toggle(led):
led.toggle()
# Quit test by connecting X2 to ground
async def killer():
pin = Pin('X2', Pin.IN, Pin.PULL_UP)
while pin.value():
await asyncio.sleep_ms(50)
# Test for the Switch class passing coros
def test_sw():
s = '''
close pulses green
open pulses red
'''
print('Test of switch scheduling coroutines.')
print(helptext)
print(s)
pin = Pin('X1', Pin.IN, Pin.PULL_UP)
red = LED(1)
green = LED(2)
sw = Switch(pin)
# Register coros to launch on contact close and open
sw.close_func(pulse, (green, 1000))
sw.open_func(pulse, (red, 1000))
loop = asyncio.get_event_loop()
loop.run_until_complete(killer())
# Test for the switch class with a callback
def test_swcb():
s = '''
close toggles red
open toggles green
'''
print('Test of switch executing callbacks.')
print(helptext)
print(s)
pin = Pin('X1', Pin.IN, Pin.PULL_UP)
red = LED(1)
green = LED(2)
sw = Switch(pin)
# Register a coro to launch on contact close
sw.close_func(toggle, (red,))
sw.open_func(toggle, (green,))
loop = asyncio.get_event_loop()
loop.run_until_complete(killer())
# Test for the Pushbutton class (coroutines)
# Pass True to test suppress
def test_btn(suppress=False, lf=True, df=True):
s = '''
press pulses red
release pulses green
double click pulses yellow
long press pulses blue
'''
print('Test of pushbutton scheduling coroutines.')
print(helptext)
print(s)
pin = Pin('X1', Pin.IN, Pin.PULL_UP)
red = LED(1)
green = LED(2)
yellow = LED(3)
blue = LED(4)
pb = Pushbutton(pin, suppress)
pb.press_func(pulse, (red, 1000))
pb.release_func(pulse, (green, 1000))
if df:
print('Doubleclick enabled')
pb.double_func(pulse, (yellow, 1000))
if lf:
print('Long press enabled')
pb.long_func(pulse, (blue, 1000))
loop = asyncio.get_event_loop()
loop.run_until_complete(killer())
# Test for the Pushbutton class (callbacks)
def test_btncb():
s = '''
press toggles red
release toggles green
double click toggles yellow
long press toggles blue
'''
print('Test of pushbutton executing callbacks.')
print(helptext)
print(s)
pin = Pin('X1', Pin.IN, Pin.PULL_UP)
red = LED(1)
green = LED(2)
yellow = LED(3)
blue = LED(4)
pb = Pushbutton(pin)
pb.press_func(toggle, (red,))
pb.release_func(toggle, (green,))
pb.double_func(toggle, (yellow,))
pb.long_func(toggle, (blue,))
loop = asyncio.get_event_loop()
loop.run_until_complete(killer())