From e691437f8a432713c8ce2e73b6da4ae0d6b91fea Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Sun, 2 Dec 2018 13:18:02 +0000 Subject: [PATCH] primitives.py Add NamedTask because of possible failure mode. --- resilient/primitives.py | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/resilient/primitives.py b/resilient/primitives.py index 0ac2b2e..03f814b 100644 --- a/resilient/primitives.py +++ b/resilient/primitives.py @@ -226,3 +226,34 @@ def cancellable(f): NamedTask._stopped(task_id) return new_gen +class NamedTask(Cancellable): + instances = {} + + @classmethod + async def cancel(cls, name, nowait=True): + if name in cls.instances: + await cls.cancel_all(group=name, nowait=nowait) + return True + return False + + @classmethod + def is_running(cls, name): + return cls._is_running(group=name) + + @classmethod + def _stopped(cls, task_id): # On completion remove it + name = cls._get_group(task_id()) # Convert task_id to task_no + if name in cls.instances: + instance = cls.instances[name] + barrier = instance.barrier + if barrier is not None: + barrier.trigger() + del cls.instances[name] + Cancellable._stopped(task_id()) + + def __init__(self, name, gf, *args, barrier=None, **kwargs): + if name in self.instances: + raise ValueError('Task name "{}" already exists.'.format(name)) + super().__init__(gf, *args, group=name, **kwargs) + self.barrier = barrier + self.instances[name] = self