micropython-samples/resilient/primitives.py

260 wiersze
7.4 KiB
Python

# primitives.py A stripped-down verion of asyn.py with Lock and Event only.
# Save RAM on ESP8266
# Released under the MIT licence.
# Copyright (C) Peter Hinch 2018
import uasyncio as asyncio
class Lock():
def __init__(self, delay_ms=0):
self._locked = False
self.delay_ms = delay_ms
def locked(self):
return self._locked
async def __aenter__(self):
await self.acquire()
return self
async def __aexit__(self, *args):
self.release()
await asyncio.sleep(0)
async def acquire(self):
while True:
if self._locked:
await asyncio.sleep_ms(self.delay_ms)
else:
self._locked = True
break
def release(self):
if not self._locked:
raise RuntimeError('Attempt to release a lock which has not been set')
self._locked = False
class Event():
def __init__(self, delay_ms=0):
self.delay_ms = delay_ms
self.clear()
def clear(self):
self._flag = False
self._data = None
def __await__(self):
while not self._flag:
await asyncio.sleep_ms(self.delay_ms)
__iter__ = __await__
def is_set(self):
return self._flag
def set(self, data=None):
self._flag = True
self._data = data
def value(self):
return self._data
class Barrier():
def __init__(self, participants, func=None, args=()):
self._participants = participants
self._func = func
self._args = args
self._reset(True)
def __await__(self):
self._update()
if self._at_limit(): # All other threads are also at limit
if self._func is not None:
launch(self._func, self._args)
self._reset(not self._down) # Toggle direction to release others
return
direction = self._down
while True: # Wait until last waiting thread changes the direction
if direction != self._down:
return
yield
__iter__ = __await__
def trigger(self):
self._update()
if self._at_limit(): # All other threads are also at limit
if self._func is not None:
launch(self._func, self._args)
self._reset(not self._down) # Toggle direction to release others
def _reset(self, down):
self._down = down
self._count = self._participants if down else 0
def busy(self):
if self._down:
done = self._count == self._participants
else:
done = self._count == 0
return not done
def _at_limit(self): # Has count reached up or down limit?
limit = 0 if self._down else self._participants
return self._count == limit
def _update(self):
self._count += -1 if self._down else 1
if self._count < 0 or self._count > self._participants:
raise ValueError('Too many tasks accessing Barrier')
# Task Cancellation
try:
StopTask = asyncio.CancelledError # More descriptive name
except AttributeError:
raise OSError('asyn.py requires uasyncio V1.7.1 or above.')
class TaskId():
def __init__(self, taskid):
self.taskid = taskid
def __call__(self):
return self.taskid
# Sleep coro breaks up a sleep into shorter intervals to ensure a rapid
# response to StopTask exceptions
async def sleep(t, granularity=100): # 100ms default
if granularity <= 0:
raise ValueError('sleep granularity must be > 0')
t = int(t * 1000) # ms
if t <= granularity:
await asyncio.sleep_ms(t)
else:
n, rem = divmod(t, granularity)
for _ in range(n):
await asyncio.sleep_ms(granularity)
await asyncio.sleep_ms(rem)
class Cancellable():
task_no = 0 # Generated task ID, index of tasks dict
tasks = {} # Value is [coro, group, barrier] indexed by integer task_no
@classmethod
def _cancel(cls, task_no):
task = cls.tasks[task_no][0]
asyncio.cancel(task)
@classmethod
async def cancel_all(cls, group=0, nowait=False):
tokill = cls._get_task_nos(group)
barrier = Barrier(len(tokill) + 1) # Include this task
for task_no in tokill:
cls.tasks[task_no][2] = barrier
cls._cancel(task_no)
if nowait:
barrier.trigger()
else:
await barrier
@classmethod
def _is_running(cls, group=0):
tasks = cls._get_task_nos(group)
if tasks == []:
return False
for task_no in tasks:
barrier = cls.tasks[task_no][2]
if barrier is None: # Running, not yet cancelled
return True
if barrier.busy():
return True
return False
@classmethod
def _get_task_nos(cls, group): # Return task nos in a group
return [task_no for task_no in cls.tasks if cls.tasks[task_no][1] == group]
@classmethod
def _get_group(cls, task_no): # Return group given a task_no
return cls.tasks[task_no][1]
@classmethod
def _stopped(cls, task_no):
if task_no in cls.tasks:
barrier = cls.tasks[task_no][2]
if barrier is not None: # Cancellation in progress
barrier.trigger()
del cls.tasks[task_no]
def __init__(self, gf, *args, group=0, **kwargs):
task = gf(TaskId(Cancellable.task_no), *args, **kwargs)
if task in self.tasks:
raise ValueError('Task already exists.')
self.tasks[Cancellable.task_no] = [task, group, None]
self.task_no = Cancellable.task_no # For subclass
Cancellable.task_no += 1
self.task = task
def __call__(self):
return self.task
def __await__(self): # Return any value returned by task.
return (yield from self.task)
__iter__ = __await__
# @cancellable decorator
def cancellable(f):
def new_gen(*args, **kwargs):
if isinstance(args[0], TaskId): # Not a bound method
task_id = args[0]
g = f(*args[1:], **kwargs)
else: # Task ID is args[1] if a bound method
task_id = args[1]
args = (args[0],) + args[2:]
g = f(*args, **kwargs)
try:
res = await g
return res
finally:
NamedTask._stopped(task_id)
return new_gen
class NamedTask(Cancellable):
instances = {}
@classmethod
async def cancel(cls, name, nowait=True):
if name in cls.instances:
await cls.cancel_all(group=name, nowait=nowait)
return True
return False
@classmethod
def is_running(cls, name):
return cls._is_running(group=name)
@classmethod
def _stopped(cls, task_id): # On completion remove it
name = cls._get_group(task_id()) # Convert task_id to task_no
if name in cls.instances:
instance = cls.instances[name]
barrier = instance.barrier
if barrier is not None:
barrier.trigger()
del cls.instances[name]
Cancellable._stopped(task_id())
def __init__(self, name, gf, *args, barrier=None, **kwargs):
if name in self.instances:
raise ValueError('Task name "{}" already exists.'.format(name))
super().__init__(gf, *args, group=name, **kwargs)
self.barrier = barrier
self.instances[name] = self