diff --git a/extmod/asyncio/core.py b/extmod/asyncio/core.py index 8aad234514..72d96479e9 100644 --- a/extmod/asyncio/core.py +++ b/extmod/asyncio/core.py @@ -3,6 +3,7 @@ from time import ticks_ms as ticks, ticks_diff, ticks_add import sys, select +from select import POLLIN, POLLOUT # Import TaskQueue and Task, preferring built-in C code over Python code try: @@ -54,7 +55,8 @@ class SingletonGenerator: # Use a SingletonGenerator to do it without allocating on the heap def sleep_ms(t, sgen=SingletonGenerator()): assert sgen.state is None - sgen.state = ticks_add(ticks(), max(0, t)) + now = ticks() + sgen.state = ticks_add(now, t) if t > 0 else now return sgen @@ -66,6 +68,8 @@ def sleep(t): ################################################################################ # Queue and poller for stream IO +nPOLLIN = ~POLLIN +nPOLLOUT = ~POLLOUT class IOQueue: def __init__(self): @@ -77,13 +81,13 @@ class IOQueue: entry = [None, None, s] entry[idx] = cur_task self.map[id(s)] = entry - self.poller.register(s, select.POLLIN if idx == 0 else select.POLLOUT) + self.poller.register(s, POLLIN if idx == 0 else POLLOUT) else: sm = self.map[id(s)] assert sm[idx] is None assert sm[1 - idx] is not None sm[idx] = cur_task - self.poller.modify(s, select.POLLIN | select.POLLOUT) + self.poller.modify(s, POLLIN | POLLOUT) # Link task to this IOQueue so it can be removed if needed cur_task.data = self @@ -113,21 +117,22 @@ class IOQueue: def wait_io_event(self, dt): for s, ev in self.poller.ipoll(dt): sm = self.map[id(s)] + q0, q1, _ = sm # print('poll', s, sm, ev) - if ev & ~select.POLLOUT and sm[0] is not None: + if ev & nPOLLOUT and q0 is not None: # POLLIN or error - _task_queue.push(sm[0]) + _task_queue.push(q0) sm[0] = None - if ev & ~select.POLLIN and sm[1] is not None: + if ev & nPOLLIN and q1 is not None: # POLLOUT or error - _task_queue.push(sm[1]) + _task_queue.push(q1) sm[1] = None - if sm[0] is None and sm[1] is None: + if q0 is None and q1 is None: self._dequeue(s) - elif sm[0] is None: - self.poller.modify(s, select.POLLOUT) + elif q0 is None: + self.poller.modify(s, POLLOUT) else: - self.poller.modify(s, select.POLLIN) + self.poller.modify(s, POLLIN) ################################################################################ @@ -147,27 +152,35 @@ def create_task(coro): _task_queue.push(t) return t - # Keep scheduling tasks until there are none left to schedule def run_until_complete(main_task=None): global cur_task excs_all = (CancelledError, Exception) # To prevent heap allocation in loop excs_stop = (CancelledError, StopIteration) # To prevent heap allocation in loop while True: - # Wait until the head of _task_queue is ready to run - dt = 1 - while dt > 0: - dt = -1 - t = _task_queue.peek() - if t: - # A task waiting on _task_queue; "ph_key" is time to schedule task at - dt = max(0, ticks_diff(t.ph_key, ticks())) - elif not _io_queue.map: - # No tasks can be woken so finished running - cur_task = None - return - # print('(poll {})'.format(dt), len(_io_queue.map)) - _io_queue.wait_io_event(dt) + try: + while True: + # Wait until the head of _task_queue is ready to run + t = _task_queue.peek() + if t: + # A task waiting on _task_queue; "ph_key" is time to schedule task at + dt = ticks_diff(t.ph_key, ticks()) + _io_queue.wait_io_event(dt if dt > 0 else 0) + if dt <= 0: + break + elif not _io_queue.map: + # No tasks can be woken so finished running + cur_task = None + return + else: + _io_queue.wait_io_event(-1) + except BaseException as exc: + try: + if main_task: + main_task.coro.throw(exc) + except StopIteration: + pass + raise # Get next task to run and continue it t = _task_queue.pop()