From 0b80a693440bf1dbc3eb5f66c1b2376df6cee06d Mon Sep 17 00:00:00 2001 From: Jared Hancock Date: Mon, 7 Nov 2022 08:50:01 -0600 Subject: [PATCH 1/5] asyncio: Properly cancel the main task on exception If the main task is interrupted by e.g. a KeyboardInterrupt, then the main task needs to have the exception injected into it so it will run the exception handlers and contextmanager __aexit__ methods. --- extmod/asyncio/core.py | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/extmod/asyncio/core.py b/extmod/asyncio/core.py index 8aad234514..5e15f448c6 100644 --- a/extmod/asyncio/core.py +++ b/extmod/asyncio/core.py @@ -147,27 +147,34 @@ def create_task(coro): _task_queue.push(t) return t - # Keep scheduling tasks until there are none left to schedule def run_until_complete(main_task=None): global cur_task excs_all = (CancelledError, Exception) # To prevent heap allocation in loop excs_stop = (CancelledError, StopIteration) # To prevent heap allocation in loop + _io_queue.wait_io_event(0) while True: - # Wait until the head of _task_queue is ready to run - dt = 1 - while dt > 0: - dt = -1 + try: + # Wait until the head of _task_queue is ready to run t = _task_queue.peek() if t: # A task waiting on _task_queue; "ph_key" is time to schedule task at - dt = max(0, ticks_diff(t.ph_key, ticks())) + dt = ticks_diff(t.ph_key, ticks()) + if dt > 0: + _io_queue.wait_io_event(dt) elif not _io_queue.map: # No tasks can be woken so finished running cur_task = None return - # print('(poll {})'.format(dt), len(_io_queue.map)) - _io_queue.wait_io_event(dt) + else: + _io_queue.wait_io_event(-1) + except BaseException as exc: + try: + if main_task: + main_task.coro.throw(exc) + except StopIteration: + pass + raise # Get next task to run and continue it t = _task_queue.pop() From 0cb54ddfab252d8984a7710fab04ab7cd15a3373 Mon Sep 17 00:00:00 2001 From: Jared Hancock Date: Fri, 2 Dec 2022 22:49:18 -0600 Subject: [PATCH 2/5] uasyncio: Avoid call to max() in sleep_ms This seems to additionally reduce the overhead of uasyncio.sleep(). --- extmod/asyncio/core.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/extmod/asyncio/core.py b/extmod/asyncio/core.py index 5e15f448c6..2c1283e00a 100644 --- a/extmod/asyncio/core.py +++ b/extmod/asyncio/core.py @@ -54,7 +54,8 @@ class SingletonGenerator: # Use a SingletonGenerator to do it without allocating on the heap def sleep_ms(t, sgen=SingletonGenerator()): assert sgen.state is None - sgen.state = ticks_add(ticks(), max(0, t)) + now = ticks() + sgen.state = ticks_add(now, t) if t > 0 else now return sgen From 0e5e1ffaa53b220b20fcd65706d076aec41b26e6 Mon Sep 17 00:00:00 2001 From: Jared Hancock Date: Tue, 24 Jan 2023 11:01:55 -0600 Subject: [PATCH 3/5] uasyncio: Always call wait_io_queue --- extmod/asyncio/core.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/extmod/asyncio/core.py b/extmod/asyncio/core.py index 2c1283e00a..79e8647cb4 100644 --- a/extmod/asyncio/core.py +++ b/extmod/asyncio/core.py @@ -161,8 +161,7 @@ def run_until_complete(main_task=None): if t: # A task waiting on _task_queue; "ph_key" is time to schedule task at dt = ticks_diff(t.ph_key, ticks()) - if dt > 0: - _io_queue.wait_io_event(dt) + _io_queue.wait_io_event(dt if dt > 0 else 0) elif not _io_queue.map: # No tasks can be woken so finished running cur_task = None From c4b3fb43a0ef0cb4ab83cd6aa7af63fd9acec4ec Mon Sep 17 00:00:00 2001 From: Jared Hancock Date: Sat, 9 Mar 2024 10:44:08 -0600 Subject: [PATCH 4/5] asyncio: Handle case where waio_io_event returns early --- extmod/asyncio/core.py | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/extmod/asyncio/core.py b/extmod/asyncio/core.py index 79e8647cb4..598f248137 100644 --- a/extmod/asyncio/core.py +++ b/extmod/asyncio/core.py @@ -153,21 +153,23 @@ def run_until_complete(main_task=None): global cur_task excs_all = (CancelledError, Exception) # To prevent heap allocation in loop excs_stop = (CancelledError, StopIteration) # To prevent heap allocation in loop - _io_queue.wait_io_event(0) while True: try: - # Wait until the head of _task_queue is ready to run - t = _task_queue.peek() - if t: - # A task waiting on _task_queue; "ph_key" is time to schedule task at - dt = ticks_diff(t.ph_key, ticks()) - _io_queue.wait_io_event(dt if dt > 0 else 0) - elif not _io_queue.map: - # No tasks can be woken so finished running - cur_task = None - return - else: - _io_queue.wait_io_event(-1) + while True: + # Wait until the head of _task_queue is ready to run + t = _task_queue.peek() + if t: + # A task waiting on _task_queue; "ph_key" is time to schedule task at + dt = ticks_diff(t.ph_key, ticks()) + _io_queue.wait_io_event(dt if dt > 0 else 0) + if dt <= 0: + break + elif not _io_queue.map: + # No tasks can be woken so finished running + cur_task = None + return + else: + _io_queue.wait_io_event(-1) except BaseException as exc: try: if main_task: From f74001c8a268790097108856bd2365867747d16f Mon Sep 17 00:00:00 2001 From: Jared Hancock Date: Sat, 9 Mar 2024 10:45:08 -0600 Subject: [PATCH 5/5] asyncio: Make slight optimizations for IOQueue.wait_io_event Calculate ~POLLIN and ~POLLOUT as constants to remove the runtime cost of continuously calculating them. And unpack the queue entry rather than using repeated item lookups. --- extmod/asyncio/core.py | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/extmod/asyncio/core.py b/extmod/asyncio/core.py index 598f248137..72d96479e9 100644 --- a/extmod/asyncio/core.py +++ b/extmod/asyncio/core.py @@ -3,6 +3,7 @@ from time import ticks_ms as ticks, ticks_diff, ticks_add import sys, select +from select import POLLIN, POLLOUT # Import TaskQueue and Task, preferring built-in C code over Python code try: @@ -67,6 +68,8 @@ def sleep(t): ################################################################################ # Queue and poller for stream IO +nPOLLIN = ~POLLIN +nPOLLOUT = ~POLLOUT class IOQueue: def __init__(self): @@ -78,13 +81,13 @@ class IOQueue: entry = [None, None, s] entry[idx] = cur_task self.map[id(s)] = entry - self.poller.register(s, select.POLLIN if idx == 0 else select.POLLOUT) + self.poller.register(s, POLLIN if idx == 0 else POLLOUT) else: sm = self.map[id(s)] assert sm[idx] is None assert sm[1 - idx] is not None sm[idx] = cur_task - self.poller.modify(s, select.POLLIN | select.POLLOUT) + self.poller.modify(s, POLLIN | POLLOUT) # Link task to this IOQueue so it can be removed if needed cur_task.data = self @@ -114,21 +117,22 @@ class IOQueue: def wait_io_event(self, dt): for s, ev in self.poller.ipoll(dt): sm = self.map[id(s)] + q0, q1, _ = sm # print('poll', s, sm, ev) - if ev & ~select.POLLOUT and sm[0] is not None: + if ev & nPOLLOUT and q0 is not None: # POLLIN or error - _task_queue.push(sm[0]) + _task_queue.push(q0) sm[0] = None - if ev & ~select.POLLIN and sm[1] is not None: + if ev & nPOLLIN and q1 is not None: # POLLOUT or error - _task_queue.push(sm[1]) + _task_queue.push(q1) sm[1] = None - if sm[0] is None and sm[1] is None: + if q0 is None and q1 is None: self._dequeue(s) - elif sm[0] is None: - self.poller.modify(s, select.POLLOUT) + elif q0 is None: + self.poller.modify(s, POLLOUT) else: - self.poller.modify(s, select.POLLIN) + self.poller.modify(s, POLLIN) ################################################################################