diff --git a/extmod/moduasyncio.c b/extmod/moduasyncio.c index b0921b6eb1..229a146c1a 100644 --- a/extmod/moduasyncio.c +++ b/extmod/moduasyncio.c @@ -103,7 +103,7 @@ STATIC mp_obj_t task_queue_peek(mp_obj_t self_in) { } STATIC MP_DEFINE_CONST_FUN_OBJ_1(task_queue_peek_obj, task_queue_peek); -STATIC mp_obj_t task_queue_push_sorted(size_t n_args, const mp_obj_t *args) { +STATIC mp_obj_t task_queue_push(size_t n_args, const mp_obj_t *args) { mp_obj_task_queue_t *self = MP_OBJ_TO_PTR(args[0]); mp_obj_task_t *task = MP_OBJ_TO_PTR(args[1]); task->data = mp_const_none; @@ -116,9 +116,9 @@ STATIC mp_obj_t task_queue_push_sorted(size_t n_args, const mp_obj_t *args) { self->heap = (mp_obj_task_t *)mp_pairheap_push(task_lt, TASK_PAIRHEAP(self->heap), TASK_PAIRHEAP(task)); return mp_const_none; } -STATIC MP_DEFINE_CONST_FUN_OBJ_VAR_BETWEEN(task_queue_push_sorted_obj, 2, 3, task_queue_push_sorted); +STATIC MP_DEFINE_CONST_FUN_OBJ_VAR_BETWEEN(task_queue_push_obj, 2, 3, task_queue_push); -STATIC mp_obj_t task_queue_pop_head(mp_obj_t self_in) { +STATIC mp_obj_t task_queue_pop(mp_obj_t self_in) { mp_obj_task_queue_t *self = MP_OBJ_TO_PTR(self_in); mp_obj_task_t *head = (mp_obj_task_t *)mp_pairheap_peek(task_lt, &self->heap->pairheap); if (head == NULL) { @@ -127,7 +127,7 @@ STATIC mp_obj_t task_queue_pop_head(mp_obj_t self_in) { self->heap = (mp_obj_task_t *)mp_pairheap_pop(task_lt, &self->heap->pairheap); return MP_OBJ_FROM_PTR(head); } -STATIC MP_DEFINE_CONST_FUN_OBJ_1(task_queue_pop_head_obj, task_queue_pop_head); +STATIC MP_DEFINE_CONST_FUN_OBJ_1(task_queue_pop_obj, task_queue_pop); STATIC mp_obj_t task_queue_remove(mp_obj_t self_in, mp_obj_t task_in) { mp_obj_task_queue_t *self = MP_OBJ_TO_PTR(self_in); @@ -139,9 +139,8 @@ STATIC MP_DEFINE_CONST_FUN_OBJ_2(task_queue_remove_obj, task_queue_remove); STATIC const mp_rom_map_elem_t task_queue_locals_dict_table[] = { { MP_ROM_QSTR(MP_QSTR_peek), MP_ROM_PTR(&task_queue_peek_obj) }, - { MP_ROM_QSTR(MP_QSTR_push_sorted), MP_ROM_PTR(&task_queue_push_sorted_obj) }, - { MP_ROM_QSTR(MP_QSTR_push_head), MP_ROM_PTR(&task_queue_push_sorted_obj) }, - { MP_ROM_QSTR(MP_QSTR_pop_head), MP_ROM_PTR(&task_queue_pop_head_obj) }, + { MP_ROM_QSTR(MP_QSTR_push), MP_ROM_PTR(&task_queue_push_obj) }, + { MP_ROM_QSTR(MP_QSTR_pop), MP_ROM_PTR(&task_queue_pop_obj) }, { MP_ROM_QSTR(MP_QSTR_remove), MP_ROM_PTR(&task_queue_remove_obj) }, }; STATIC MP_DEFINE_CONST_DICT(task_queue_locals_dict, task_queue_locals_dict_table); @@ -205,18 +204,18 @@ STATIC mp_obj_t task_cancel(mp_obj_t self_in) { // Not on the main running queue, remove the task from the queue it's on. dest[2] = MP_OBJ_FROM_PTR(self); mp_call_method_n_kw(1, 0, dest); - // _task_queue.push_head(self) + // _task_queue.push(self) dest[0] = _task_queue; dest[1] = MP_OBJ_FROM_PTR(self); - task_queue_push_sorted(2, dest); + task_queue_push(2, dest); } else if (ticks_diff(self->ph_key, ticks()) > 0) { // On the main running queue but scheduled in the future, so bring it forward to now. // _task_queue.remove(self) task_queue_remove(_task_queue, MP_OBJ_FROM_PTR(self)); - // _task_queue.push_head(self) + // _task_queue.push(self) dest[0] = _task_queue; dest[1] = MP_OBJ_FROM_PTR(self); - task_queue_push_sorted(2, dest); + task_queue_push(2, dest); } self->data = mp_obj_dict_get(uasyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR_CancelledError)); @@ -281,7 +280,7 @@ STATIC mp_obj_t task_iternext(mp_obj_t self_in) { // Put calling task on waiting queue. mp_obj_t cur_task = mp_obj_dict_get(uasyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR_cur_task)); mp_obj_t args[2] = { self->state, cur_task }; - task_queue_push_sorted(2, args); + task_queue_push(2, args); // Set calling task's data to this task that it waits on, to double-link it. ((mp_obj_task_t *)MP_OBJ_TO_PTR(cur_task))->data = self_in; } diff --git a/extmod/uasyncio/core.py b/extmod/uasyncio/core.py index 28b5e960ac..10a310809c 100644 --- a/extmod/uasyncio/core.py +++ b/extmod/uasyncio/core.py @@ -41,7 +41,7 @@ class SingletonGenerator: def __next__(self): if self.state is not None: - _task_queue.push_sorted(cur_task, self.state) + _task_queue.push(cur_task, self.state) self.state = None return None else: @@ -115,11 +115,11 @@ class IOQueue: # print('poll', s, sm, ev) if ev & ~select.POLLOUT and sm[0] is not None: # POLLIN or error - _task_queue.push_head(sm[0]) + _task_queue.push(sm[0]) sm[0] = None if ev & ~select.POLLIN and sm[1] is not None: # POLLOUT or error - _task_queue.push_head(sm[1]) + _task_queue.push(sm[1]) sm[1] = None if sm[0] is None and sm[1] is None: self._dequeue(s) @@ -142,7 +142,7 @@ def create_task(coro): if not hasattr(coro, "send"): raise TypeError("coroutine expected") t = Task(coro, globals()) - _task_queue.push_head(t) + _task_queue.push(t) return t @@ -167,7 +167,7 @@ def run_until_complete(main_task=None): _io_queue.wait_io_event(dt) # Get next task to run and continue it - t = _task_queue.pop_head() + t = _task_queue.pop() cur_task = t try: # Continue running the coroutine, it's responsible for rescheduling itself @@ -203,7 +203,7 @@ def run_until_complete(main_task=None): else: # Schedule any other tasks waiting on the completion of this task. while t.state.peek(): - _task_queue.push_head(t.state.pop_head()) + _task_queue.push(t.state.pop()) waiting = True # "False" indicates that the task is complete and has been await'ed on. t.state = False @@ -211,7 +211,7 @@ def run_until_complete(main_task=None): # An exception ended this detached task, so queue it for later # execution to handle the uncaught exception if no other task retrieves # the exception in the meantime (this is handled by Task.throw). - _task_queue.push_head(t) + _task_queue.push(t) # Save return value of coro to pass up to caller. t.data = er elif t.state is None: @@ -256,7 +256,7 @@ class Loop: def stop(): global _stop_task if _stop_task is not None: - _task_queue.push_head(_stop_task) + _task_queue.push(_stop_task) # If stop() is called again, do nothing _stop_task = None diff --git a/extmod/uasyncio/event.py b/extmod/uasyncio/event.py index 1954c80f54..c48904b983 100644 --- a/extmod/uasyncio/event.py +++ b/extmod/uasyncio/event.py @@ -17,7 +17,7 @@ class Event: # Note: This must not be called from anything except the thread running # the asyncio loop (i.e. neither hard or soft IRQ, or a different thread). while self.waiting.peek(): - core._task_queue.push_head(self.waiting.pop_head()) + core._task_queue.push(self.waiting.pop()) self.state = True def clear(self): @@ -26,7 +26,7 @@ class Event: async def wait(self): if not self.state: # Event not set, put the calling task on the event's waiting queue - self.waiting.push_head(core.cur_task) + self.waiting.push(core.cur_task) # Set calling task's data to the event's queue so it can be removed if needed core.cur_task.data = self.waiting yield diff --git a/extmod/uasyncio/funcs.py b/extmod/uasyncio/funcs.py index a76ab0d321..258948f73e 100644 --- a/extmod/uasyncio/funcs.py +++ b/extmod/uasyncio/funcs.py @@ -78,7 +78,7 @@ async def gather(*aws, return_exceptions=False): # Still some sub-tasks running. return # Gather waiting is done, schedule the main gather task. - core._task_queue.push_head(gather_task) + core._task_queue.push(gather_task) ts = [core._promote_to_task(aw) for aw in aws] for i in range(len(ts)): diff --git a/extmod/uasyncio/lock.py b/extmod/uasyncio/lock.py index bddca295b6..f50213d7c1 100644 --- a/extmod/uasyncio/lock.py +++ b/extmod/uasyncio/lock.py @@ -22,8 +22,8 @@ class Lock: raise RuntimeError("Lock not acquired") if self.waiting.peek(): # Task(s) waiting on lock, schedule next Task - self.state = self.waiting.pop_head() - core._task_queue.push_head(self.state) + self.state = self.waiting.pop() + core._task_queue.push(self.state) else: # No Task waiting so unlock self.state = 0 @@ -31,7 +31,7 @@ class Lock: async def acquire(self): if self.state != 0: # Lock unavailable, put the calling Task on the waiting queue - self.waiting.push_head(core.cur_task) + self.waiting.push(core.cur_task) # Set calling task's data to the lock's queue so it can be removed if needed core.cur_task.data = self.waiting try: diff --git a/extmod/uasyncio/task.py b/extmod/uasyncio/task.py index 94768b95a4..4ead2a1308 100644 --- a/extmod/uasyncio/task.py +++ b/extmod/uasyncio/task.py @@ -99,17 +99,14 @@ class TaskQueue: def peek(self): return self.heap - def push_sorted(self, v, key): + def push(self, v, key=None): assert v.ph_child is None assert v.ph_next is None v.data = None - v.ph_key = key + v.ph_key = key if key is not None else core.ticks() self.heap = ph_meld(v, self.heap) - def push_head(self, v): - self.push_sorted(v, core.ticks()) - - def pop_head(self): + def pop(self): v = self.heap assert v.ph_next is None self.heap = ph_pairing(v.ph_child) @@ -150,7 +147,7 @@ class Task: raise self.data else: # Put calling task on waiting queue. - self.state.push_head(core.cur_task) + self.state.push(core.cur_task) # Set calling task's data to this task that it waits on, to double-link it. core.cur_task.data = self @@ -171,10 +168,10 @@ class Task: if hasattr(self.data, "remove"): # Not on the main running queue, remove the task from the queue it's on. self.data.remove(self) - core._task_queue.push_head(self) + core._task_queue.push(self) elif core.ticks_diff(self.ph_key, core.ticks()) > 0: # On the main running queue but scheduled in the future, so bring it forward to now. core._task_queue.remove(self) - core._task_queue.push_head(self) + core._task_queue.push(self) self.data = core.CancelledError return True