kopia lustrzana https://github.com/peterhinch/micropython-samples
260 wiersze
7.4 KiB
Python
260 wiersze
7.4 KiB
Python
# primitives.py A stripped-down version of asyn.py: Lock, Event, Barrier and task cancellation only.
|
|
# Save RAM on ESP8266
|
|
|
|
# Released under the MIT licence.
|
|
# Copyright (C) Peter Hinch 2018
|
|
|
|
import uasyncio as asyncio
|
|
|
|
class Lock:
    """Simple polling lock for cooperating tasks.

    A task that finds the lock taken repeatedly sleeps for ``delay_ms``
    milliseconds until it is free. Usable with ``async with``.
    """

    def __init__(self, delay_ms=0):
        self.delay_ms = delay_ms  # Pause between polls while waiting (ms)
        self._locked = False

    def locked(self):
        """Return True while the lock is held."""
        return self._locked

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, *args):
        self.release()
        await asyncio.sleep(0)  # Zero-length sleep after releasing

    async def acquire(self):
        """Wait until the lock is free, then take it."""
        while self._locked:
            await asyncio.sleep_ms(self.delay_ms)
        self._locked = True

    def release(self):
        """Free the lock.

        Raises RuntimeError if the lock is not currently held.
        """
        if self._locked:
            self._locked = False
            return
        raise RuntimeError('Attempt to release a lock which has not been set')
|
|
|
|
|
|
class Event:
    """Flag that tasks can wait on, optionally carrying a data item.

    ``await event`` polls the flag, sleeping ``delay_ms`` milliseconds between
    checks, and completes once ``set()`` has been called.
    """

    def __init__(self, delay_ms=0):
        self.delay_ms = delay_ms  # Pause between polls while waiting (ms)
        self.clear()

    def clear(self):
        """Reset the flag and discard any stored data."""
        self._flag = False
        self._data = None

    def __await__(self):
        # This is a plain (non-async) method, so delegate with `yield from`:
        # `await` here is only accepted by MicroPython and is a SyntaxError in
        # standard Python.
        while not self._flag:
            yield from asyncio.sleep_ms(self.delay_ms)

    __iter__ = __await__

    def is_set(self):
        """Return True if the event has been set and not yet cleared."""
        return self._flag

    def set(self, data=None):
        """Set the flag, storing ``data`` for retrieval via ``value()``."""
        self._flag = True
        self._data = data

    def value(self):
        """Return the data passed to the most recent ``set()`` (None after ``clear()``)."""
        return self._data
|
|
|
|
|
|
class Barrier:
    """Synchronisation point for a fixed number of participants.

    Each participant either awaits the barrier (and is held until everyone
    has arrived) or calls ``trigger()`` (registers arrival without waiting).
    When the last participant arrives the optional callback ``func`` is run
    with ``args`` and the waiters are released.

    An internal counter runs alternately down from ``participants`` to 0 and
    up from 0 to ``participants``; the change of direction is what releases
    waiters, so the instance can be reused for successive cycles.

    Raises ValueError (from ``_update``) if more arrivals are registered in a
    cycle than there are participants.
    """

    def __init__(self, participants, func=None, args=()):
        self._participants = participants
        self._func = func
        self._args = args
        self._reset(True)

    def __await__(self):
        # Note the direction before registering: if this arrival completes
        # the cycle, trigger() flips it and the loop exits without yielding.
        direction = self._down
        self.trigger()
        while direction == self._down:  # Wait until the last arrival flips it
            yield

    __iter__ = __await__

    def trigger(self):
        """Register an arrival without waiting for the other participants."""
        self._update()
        if self._at_limit():  # All other participants have also arrived
            if self._func is not None:
                self._launch(self._func, self._args)
            self._reset(not self._down)  # Toggle direction to release others

    @staticmethod
    def _launch(func, args):
        # Run the callback. If the call produced a coroutine/generator object
        # (recognised by its send/throw methods), schedule it as a task
        # instead of leaving it unstarted.
        res = func(*args)
        if hasattr(res, 'send') and hasattr(res, 'throw'):
            asyncio.get_event_loop().create_task(res)

    def _reset(self, down):
        self._down = down
        self._count = self._participants if down else 0

    def busy(self):
        """Return True while a cycle is in progress (some but not all have arrived)."""
        idle_count = self._participants if self._down else 0
        return self._count != idle_count

    def _at_limit(self):  # Has count reached up or down limit?
        limit = 0 if self._down else self._participants
        return self._count == limit

    def _update(self):
        self._count += -1 if self._down else 1
        if self._count < 0 or self._count > self._participants:
            raise ValueError('Too many tasks accessing Barrier')
|
|
|
|
# Task Cancellation
|
|
# StopTask is a more descriptive name for the cancellation exception; refuse
# to load if the asyncio module in use does not provide it.
if not hasattr(asyncio, 'CancelledError'):
    raise OSError('asyn.py requires uasyncio V1.7.1 or above.')
StopTask = asyncio.CancelledError
|
|
|
|
class TaskId:
    """Wrapper giving a task number a distinct type.

    Calling the instance returns the wrapped task number.
    """

    def __init__(self, taskid):
        self.taskid = taskid

    def __call__(self):
        """Return the wrapped task number."""
        return self.taskid
|
|
|
|
# Sleep coro breaks up a sleep into shorter intervals to ensure a rapid
|
|
# response to StopTask exceptions
|
|
async def sleep(t, granularity=100):
    """Sleep for ``t`` seconds in slices of at most ``granularity`` ms.

    ``t`` is converted to whole milliseconds. A duration no longer than one
    slice is slept in a single call; otherwise the whole slices are slept
    one at a time, followed by the remainder.

    Raises ValueError if ``granularity`` is not positive.
    """
    if granularity <= 0:
        raise ValueError('sleep granularity must be > 0')
    total_ms = int(t * 1000)
    if total_ms <= granularity:
        await asyncio.sleep_ms(total_ms)
        return
    slices, remainder = divmod(total_ms, granularity)
    while slices > 0:
        await asyncio.sleep_ms(granularity)
        slices -= 1
    await asyncio.sleep_ms(remainder)
|
|
|
|
|
|
class Cancellable:
    """Wrapper that registers a task so it can be cancelled by group.

    Constructing an instance calls ``gf`` with a TaskId as its first argument
    (followed by ``*args`` and ``**kwargs``) and records the result in the
    class-wide ``tasks`` registry under a freshly allocated task number.
    Calling the instance returns the task; awaiting it runs the task and
    returns its result.
    """

    task_no = 0  # Next task number to allocate; key into the tasks dict
    tasks = {}  # task_no -> [coro, group, barrier]; barrier is None until cancel_all sets it

    @classmethod
    def _cancel(cls, task_no):
        asyncio.cancel(cls.tasks[task_no][0])

    @classmethod
    async def cancel_all(cls, group=0, nowait=False):
        """Cancel every registered task in ``group``.

        A Barrier shared with the cancelled tasks is stored in their registry
        entries. With ``nowait`` false the caller waits on that barrier;
        otherwise it only registers its own arrival and returns.
        """
        victims = cls._get_task_nos(group)
        barrier = Barrier(1 + len(victims))  # The extra participant is this caller
        for number in victims:
            cls.tasks[number][2] = barrier
            cls._cancel(number)
        if not nowait:
            await barrier
        else:
            barrier.trigger()

    @classmethod
    def _is_running(cls, group=0):
        # A group counts as running if any member has not been cancelled yet
        # (no barrier) or its cancellation barrier is still busy. An empty
        # group falls straight through to False.
        for number in cls._get_task_nos(group):
            barrier = cls.tasks[number][2]
            if barrier is None or barrier.busy():
                return True
        return False

    @classmethod
    def _get_task_nos(cls, group):  # Task numbers belonging to a group
        return [number for number, entry in cls.tasks.items() if entry[1] == group]

    @classmethod
    def _get_group(cls, task_no):  # Group of a given task number
        return cls.tasks[task_no][1]

    @classmethod
    def _stopped(cls, task_no):
        # Drop a finished task from the registry; unknown numbers are ignored.
        if task_no not in cls.tasks:
            return
        barrier = cls.tasks[task_no][2]
        if barrier is not None:  # Cancellation in progress: report arrival
            barrier.trigger()
        del cls.tasks[task_no]

    def __init__(self, gf, *args, group=0, **kwargs):
        task = gf(TaskId(Cancellable.task_no), *args, **kwargs)
        # NOTE(review): tasks is keyed by integer task_no, so this membership
        # test compares the task object against the keys - confirm intent.
        if task in self.tasks:
            raise ValueError('Task already exists.')
        self.tasks[Cancellable.task_no] = [task, group, None]
        self.task_no = Cancellable.task_no  # For subclass
        Cancellable.task_no += 1
        self.task = task

    def __call__(self):
        """Return the wrapped task."""
        return self.task

    def __await__(self):
        # Run the wrapped task and pass back whatever it returns.
        result = yield from self.task
        return result

    __iter__ = __await__
|
|
|
|
|
|
# @cancellable decorator
|
|
|
|
def cancellable(f):
    """Decorator for coroutines that are started with a TaskId argument.

    The wrapper removes the TaskId from the arguments before calling ``f``:
    it is the first argument for a plain function, or the second for a bound
    method (where the first is the instance). When the wrapped coroutine
    finishes for any reason - return, exception or cancellation - the task
    is reported via ``NamedTask._stopped``.

    Returns the wrapper coroutine function; awaiting it yields the result
    of ``f``.
    """
    # Declared `async def` because the body uses `await`: a plain `def`
    # containing `await` is only accepted by MicroPython and is a SyntaxError
    # in standard Python.
    async def new_gen(*args, **kwargs):
        if isinstance(args[0], TaskId):  # Not a bound method
            task_id = args[0]
            g = f(*args[1:], **kwargs)
        else:  # Task ID is args[1] if a bound method
            task_id = args[1]
            args = (args[0],) + args[2:]
            g = f(*args, **kwargs)
        try:
            return await g
        finally:
            NamedTask._stopped(task_id)
    return new_gen
|
|
|
|
class NamedTask(Cancellable):
    """Cancellable task identified by a unique name.

    The name doubles as the task's group in the Cancellable registry. An
    optional ``barrier`` supplied at construction is triggered when the task
    stops.

    Raises ValueError on construction if the name is already in use.
    """

    instances = {}  # name -> NamedTask instance

    @classmethod
    async def cancel(cls, name, nowait=True):
        """Cancel the task called ``name``.

        Returns False if no such task is registered, otherwise True once
        ``cancel_all`` has completed for that name.
        """
        if name not in cls.instances:
            return False
        await cls.cancel_all(group=name, nowait=nowait)
        return True

    @classmethod
    def is_running(cls, name):
        """Return the running status of the task called ``name``."""
        return cls._is_running(group=name)

    @classmethod
    def _stopped(cls, task_id):  # On completion remove it
        task_no = task_id()  # Convert task_id to task_no
        name = cls._get_group(task_no)
        if name in cls.instances:
            user_barrier = cls.instances[name].barrier
            if user_barrier is not None:
                user_barrier.trigger()
            del cls.instances[name]
        Cancellable._stopped(task_no)

    def __init__(self, name, gf, *args, barrier=None, **kwargs):
        if name in self.instances:
            raise ValueError('Task name "{}" already exists.'.format(name))
        super().__init__(gf, *args, group=name, **kwargs)
        self.barrier = barrier
        self.instances[name] = self
|