diff --git a/ports/webassembly/Makefile b/ports/webassembly/Makefile index d0a8aa9924..93b92ef583 100644 --- a/ports/webassembly/Makefile +++ b/ports/webassembly/Makefile @@ -22,6 +22,9 @@ BUILD ?= build-$(VARIANT) include ../../py/mkenv.mk include $(VARIANT_DIR)/mpconfigvariant.mk +# Use the default frozen manifest, variants may override this. +FROZEN_MANIFEST ?= variants/manifest.py + # Qstr definitions (must come before including py.mk). QSTR_DEFS = qstrdefsport.h diff --git a/ports/webassembly/asyncio/__init__.py b/ports/webassembly/asyncio/__init__.py new file mode 100644 index 0000000000..ba1ca63514 --- /dev/null +++ b/ports/webassembly/asyncio/__init__.py @@ -0,0 +1,9 @@ +# MicroPython asyncio module, for use with webassembly port +# MIT license; Copyright (c) 2024 Damien P. George + +from .core import * +from .funcs import wait_for, wait_for_ms, gather +from .event import Event +from .lock import Lock + +__version__ = (3, 0, 0) diff --git a/ports/webassembly/asyncio/core.py b/ports/webassembly/asyncio/core.py new file mode 100644 index 0000000000..a128bb6055 --- /dev/null +++ b/ports/webassembly/asyncio/core.py @@ -0,0 +1,249 @@ +# MicroPython asyncio module, for use with webassembly port +# MIT license; Copyright (c) 2019-2024 Damien P. George + +from time import ticks_ms as ticks, ticks_diff, ticks_add +import sys, js, jsffi + +# Import TaskQueue and Task from built-in C code. +from _asyncio import TaskQueue, Task + + +################################################################################ +# Exceptions + + +class CancelledError(BaseException): + pass + + +class TimeoutError(Exception): + pass + + +# Used when calling Loop.call_exception_handler. +_exc_context = {"message": "Task exception wasn't retrieved", "exception": None, "future": None} + + +################################################################################ +# Sleep functions + + +# "Yield" once, then raise StopIteration +class SingletonGenerator: + def __init__(self): + self.state = None + self.exc = StopIteration() + + def __iter__(self): + return self + + def __next__(self): + if self.state is not None: + _task_queue.push(cur_task, self.state) + self.state = None + return None + else: + self.exc.__traceback__ = None + raise self.exc + + +# Pause task execution for the given time (integer in milliseconds, uPy extension) +# Use a SingletonGenerator to do it without allocating on the heap +def sleep_ms(t, sgen=SingletonGenerator()): + if cur_task is None: + # Support top-level asyncio.sleep, via a JavaScript Promise. + return jsffi.async_timeout_ms(t) + assert sgen.state is None + sgen.state = ticks_add(ticks(), max(0, t)) + return sgen + + +# Pause task execution for the given time (in seconds) +def sleep(t): + return sleep_ms(int(t * 1000)) + + +################################################################################ +# Main run loop + +asyncio_timer = None + + +class ThenableEvent: + def __init__(self, thenable): + self.result = None # Result of the thenable + self.waiting = None # Task waiting on completion of this thenable + thenable.then(self.set) + + def set(self, value): + # Thenable/Promise is fulfilled, set result and schedule any waiting task. + self.result = value + if self.waiting: + _task_queue.push(self.waiting) + self.waiting = None + _schedule_run_iter(0) + + def remove(self, task): + self.waiting = None + + # async + def wait(self): + # Set the calling task as the task waiting on this thenable. + self.waiting = cur_task + # Set calling task's data to this object so it can be removed if needed. + cur_task.data = self + # Wait for the thenable to fulfill. + yield + # Return the result of the thenable. + return self.result + + +# Ensure the awaitable is a task +def _promote_to_task(aw): + return aw if isinstance(aw, Task) else create_task(aw) + + +def _schedule_run_iter(dt): + global asyncio_timer + if asyncio_timer is not None: + js.clearTimeout(asyncio_timer) + asyncio_timer = js.setTimeout(_run_iter, dt) + + +def _run_iter(): + global cur_task + excs_all = (CancelledError, Exception) # To prevent heap allocation in loop + excs_stop = (CancelledError, StopIteration) # To prevent heap allocation in loop + while True: + # Wait until the head of _task_queue is ready to run + t = _task_queue.peek() + if t: + # A task waiting on _task_queue; "ph_key" is time to schedule task at + dt = max(0, ticks_diff(t.ph_key, ticks())) + else: + # No tasks can be woken so finished running + cur_task = None + return + + if dt > 0: + # schedule to call again later + cur_task = None + _schedule_run_iter(dt) + return + + # Get next task to run and continue it + t = _task_queue.pop() + cur_task = t + try: + # Continue running the coroutine, it's responsible for rescheduling itself + exc = t.data + if not exc: + t.coro.send(None) + else: + # If the task is finished and on the run queue and gets here, then it + # had an exception and was not await'ed on. Throwing into it now will + # raise StopIteration and the code below will catch this and run the + # call_exception_handler function. + t.data = None + t.coro.throw(exc) + except excs_all as er: + # Check the task is not on any event queue + assert t.data is None + # This task is done. + if t.state: + # Task was running but is now finished. + waiting = False + if t.state is True: + # "None" indicates that the task is complete and not await'ed on (yet). + t.state = None + elif callable(t.state): + # The task has a callback registered to be called on completion. + t.state(t, er) + t.state = False + waiting = True + else: + # Schedule any other tasks waiting on the completion of this task. + while t.state.peek(): + _task_queue.push(t.state.pop()) + waiting = True + # "False" indicates that the task is complete and has been await'ed on. + t.state = False + if not waiting and not isinstance(er, excs_stop): + # An exception ended this detached task, so queue it for later + # execution to handle the uncaught exception if no other task retrieves + # the exception in the meantime (this is handled by Task.throw). + _task_queue.push(t) + # Save return value of coro to pass up to caller. + t.data = er + elif t.state is None: + # Task is already finished and nothing await'ed on the task, + # so call the exception handler. + + # Save exception raised by the coro for later use. + t.data = exc + + # Create exception context and call the exception handler. + _exc_context["exception"] = exc + _exc_context["future"] = t + Loop.call_exception_handler(_exc_context) + + +# Create and schedule a new task from a coroutine. +def create_task(coro): + if not hasattr(coro, "send"): + raise TypeError("coroutine expected") + t = Task(coro, globals()) + _task_queue.push(t) + _schedule_run_iter(0) + return t + + +################################################################################ +# Event loop wrapper + + +cur_task = None + + +class Loop: + _exc_handler = None + + def create_task(coro): + return create_task(coro) + + def close(): + pass + + def set_exception_handler(handler): + Loop._exc_handler = handler + + def get_exception_handler(): + return Loop._exc_handler + + def default_exception_handler(loop, context): + print(context["message"], file=sys.stderr) + print("future:", context["future"], "coro=", context["future"].coro, file=sys.stderr) + sys.print_exception(context["exception"], sys.stderr) + + def call_exception_handler(context): + (Loop._exc_handler or Loop.default_exception_handler)(Loop, context) + + +def get_event_loop(): + return Loop + + +def current_task(): + if cur_task is None: + raise RuntimeError("no running event loop") + return cur_task + + +def new_event_loop(): + global _task_queue + _task_queue = TaskQueue() # TaskQueue of Task instances. + return Loop + + +# Initialise default event loop. +new_event_loop() diff --git a/ports/webassembly/objjsproxy.c b/ports/webassembly/objjsproxy.c index 65c806536d..15fbb57523 100644 --- a/ports/webassembly/objjsproxy.c +++ b/ports/webassembly/objjsproxy.c @@ -474,9 +474,29 @@ static mp_obj_t jsproxy_new_gen(mp_obj_t self_in, mp_obj_iter_buf_t *iter_buf) { /******************************************************************************/ +#if MICROPY_PY_ASYNCIO +extern mp_obj_t mp_asyncio_context; +#endif + static mp_obj_t jsproxy_getiter(mp_obj_t self_in, mp_obj_iter_buf_t *iter_buf) { mp_obj_jsproxy_t *self = MP_OBJ_TO_PTR(self_in); if (has_attr(self->ref, "then")) { + #if MICROPY_PY_ASYNCIO + // When asyncio is running and the caller here is a task, wrap the JavaScript + // thenable in a ThenableEvent, and get the task to wait on that event. This + // decouples the task from the thenable and allows cancelling the task. + if (mp_asyncio_context != MP_OBJ_NULL) { + mp_obj_t cur_task = mp_obj_dict_get(mp_asyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR_cur_task)); + if (cur_task != mp_const_none) { + mp_obj_t thenable_event_class = mp_obj_dict_get(mp_asyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR_ThenableEvent)); + mp_obj_t thenable_event = mp_call_function_1(thenable_event_class, self_in); + mp_obj_t dest[2]; + mp_load_method(thenable_event, MP_QSTR_wait, dest); + mp_obj_t wait_gen = mp_call_method_n_kw(0, 0, dest); + return mp_getiter(wait_gen, iter_buf); + } + } + #endif return jsproxy_new_gen(self_in, iter_buf); } else { return jsproxy_new_it(self_in, iter_buf); diff --git a/ports/webassembly/variants/manifest.py b/ports/webassembly/variants/manifest.py new file mode 100644 index 0000000000..b2ee8cd640 --- /dev/null +++ b/ports/webassembly/variants/manifest.py @@ -0,0 +1,24 @@ +# The asyncio package is built from the standard implementation but with the +# core scheduler replaced with a custom scheduler that uses the JavaScript +# runtime (with setTimeout an Promise's) to contrtol the scheduling. + +package( + "asyncio", + ( + "event.py", + "funcs.py", + "lock.py", + ), + base_path="$(MPY_DIR)/extmod", + opt=3, +) + +package( + "asyncio", + ( + "__init__.py", + "core.py", + ), + base_path="$(PORT_DIR)", + opt=3, +) diff --git a/ports/webassembly/variants/pyscript/manifest.py b/ports/webassembly/variants/pyscript/manifest.py index 0646e1d897..db088e70d0 100644 --- a/ports/webassembly/variants/pyscript/manifest.py +++ b/ports/webassembly/variants/pyscript/manifest.py @@ -1,3 +1,5 @@ +include("$(PORT_DIR)/variants/manifest.py") + require("abc") require("base64") require("collections") diff --git a/tests/ports/webassembly/asyncio_create_task.mjs b/tests/ports/webassembly/asyncio_create_task.mjs new file mode 100644 index 0000000000..e388ade4f1 --- /dev/null +++ b/tests/ports/webassembly/asyncio_create_task.mjs @@ -0,0 +1,44 @@ +// Test asyncio.create_task(), and tasks waiting on a Promise. + +const mp = await (await import(process.argv[2])).loadMicroPython(); + +globalThis.p0 = new Promise((resolve, reject) => { + resolve(123); +}); + +globalThis.p1 = new Promise((resolve, reject) => { + setTimeout(() => { + console.log("setTimeout resolved"); + resolve(456); + }, 200); +}); + +mp.runPython(` +import js +import asyncio + +async def task(id, promise): + print("task start", id) + print("task await", id, await promise) + print("task await", id, await promise) + print("task end", id) + +print("start") +t1 = asyncio.create_task(task(1, js.p0)) +t2 = asyncio.create_task(task(2, js.p1)) +print("t1", t1.done(), t2.done()) +print("end") +`); + +// Wait for p1 to fulfill so t2 can continue. +await globalThis.p1; + +// Wait a little longer so t2 can complete. +await new Promise((resolve, reject) => { + setTimeout(resolve, 10); +}); + +mp.runPython(` +print("restart") +print("t1", t1.done(), t2.done()) +`); diff --git a/tests/ports/webassembly/asyncio_create_task.mjs.exp b/tests/ports/webassembly/asyncio_create_task.mjs.exp new file mode 100644 index 0000000000..c1958bba59 --- /dev/null +++ b/tests/ports/webassembly/asyncio_create_task.mjs.exp @@ -0,0 +1,14 @@ +start +t1 False False +end +task start 1 +task start 2 +task await 1 123 +task await 1 123 +task end 1 +setTimeout resolved +task await 2 456 +task await 2 456 +task end 2 +restart +t1 True True diff --git a/tests/ports/webassembly/asyncio_sleep.mjs b/tests/ports/webassembly/asyncio_sleep.mjs new file mode 100644 index 0000000000..74d22ee1f8 --- /dev/null +++ b/tests/ports/webassembly/asyncio_sleep.mjs @@ -0,0 +1,25 @@ +// Test asyncio.sleep(), both at the top level and within a task. + +const mp = await (await import(process.argv[2])).loadMicroPython(); + +await mp.runPythonAsync(` +import time +import asyncio + +print("main start") +t0 = time.time() +await asyncio.sleep(0.25) +dt = time.time() - t0 +print(0.2 <= dt <= 0.3) + +async def task(): + print("task start") + t0 = time.time() + await asyncio.sleep(0.25) + dt = time.time() - t0 + print(0.2 <= dt <= 0.3) + print("task end") + +asyncio.create_task(task()) +print("main end") +`); diff --git a/tests/ports/webassembly/asyncio_sleep.mjs.exp b/tests/ports/webassembly/asyncio_sleep.mjs.exp new file mode 100644 index 0000000000..619ba175f6 --- /dev/null +++ b/tests/ports/webassembly/asyncio_sleep.mjs.exp @@ -0,0 +1,6 @@ +main start +True +main end +task start +True +task end diff --git a/tests/run-tests.py b/tests/run-tests.py index 4f55cdd398..8acdcd2b36 100755 --- a/tests/run-tests.py +++ b/tests/run-tests.py @@ -681,6 +681,17 @@ def run_tests(pyb, tests, args, result_dir, num_threads=1): elif args.target == "webassembly": skip_tests.add("basics/string_format_modulo.py") # can't print nulls to stdout skip_tests.add("basics/string_strip.py") # can't print nulls to stdout + skip_tests.add("extmod/asyncio_basic2.py") + skip_tests.add("extmod/asyncio_cancel_self.py") + skip_tests.add("extmod/asyncio_current_task.py") + skip_tests.add("extmod/asyncio_exception.py") + skip_tests.add("extmod/asyncio_gather_finished_early.py") + skip_tests.add("extmod/asyncio_get_event_loop.py") + skip_tests.add("extmod/asyncio_heaplock.py") + skip_tests.add("extmod/asyncio_loop_stop.py") + skip_tests.add("extmod/asyncio_new_event_loop.py") + skip_tests.add("extmod/asyncio_threadsafeflag.py") + skip_tests.add("extmod/asyncio_wait_for_fwd.py") skip_tests.add("extmod/binascii_a2b_base64.py") skip_tests.add("extmod/re_stack_overflow.py") skip_tests.add("extmod/time_res.py")