From a4390647749a5e9092f2639877546b61fc268766 Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Sun, 10 Jun 2018 11:24:08 +0100 Subject: [PATCH] Add temporary dir uasyncio_iostream. --- uasyncio_iostream/__init__.py | 284 +++++++++++++++++++++ uasyncio_iostream/moduselect.c | 378 ++++++++++++++++++++++++++++ uasyncio_iostream/tests/aswitch.py | 191 ++++++++++++++ uasyncio_iostream/tests/auart.py | 25 ++ uasyncio_iostream/tests/auart_hd.py | 106 ++++++++ uasyncio_iostream/tests/iomiss.py | 54 ++++ uasyncio_iostream/tests/iotest1.py | 93 +++++++ uasyncio_iostream/tests/iotest2.py | 92 +++++++ uasyncio_iostream/tests/iotest3.py | 111 ++++++++ uasyncio_iostream/tests/iotest4.py | 108 ++++++++ 10 files changed, 1442 insertions(+) create mode 100644 uasyncio_iostream/__init__.py create mode 100644 uasyncio_iostream/moduselect.c create mode 100644 uasyncio_iostream/tests/aswitch.py create mode 100644 uasyncio_iostream/tests/auart.py create mode 100644 uasyncio_iostream/tests/auart_hd.py create mode 100644 uasyncio_iostream/tests/iomiss.py create mode 100644 uasyncio_iostream/tests/iotest1.py create mode 100644 uasyncio_iostream/tests/iotest2.py create mode 100644 uasyncio_iostream/tests/iotest3.py create mode 100644 uasyncio_iostream/tests/iotest4.py diff --git a/uasyncio_iostream/__init__.py b/uasyncio_iostream/__init__.py new file mode 100644 index 0000000..972814c --- /dev/null +++ b/uasyncio_iostream/__init__.py @@ -0,0 +1,284 @@ +import uerrno +import uselect as select +import usocket as _socket +from uasyncio.core import * + + +DEBUG = 0 +log = None + +def set_debug(val): + global DEBUG, log + DEBUG = val + if val: + import logging + log = logging.getLogger("uasyncio") + +# add_writer causes read failure if passed the same sock instance as was passed +# to add_reader. Cand we fix this by maintaining two object maps? +class PollEventLoop(EventLoop): + + def __init__(self, runq_len=16, waitq_len=16): + EventLoop.__init__(self, runq_len, waitq_len) + self.poller = select.poll() + self.rdobjmap = {} + self.wrobjmap = {} + self.flags = {} + + # Remove registration of sock for reading or writing. + def _unregister(self, sock, objmap, flag): + # If StreamWriter.awrite() wrote entire buf on 1st pass sock will never + # have been registered. So test for presence in .flags. + if id(sock) in self.flags: + flags = self.flags[id(sock)] + if flags & flag: # flag is currently registered + flags &= ~flag + if flags: + self.flags[id(sock)] = flags + self.poller.register(sock, flags) + else: + del self.flags[id(sock)] + self.poller.unregister(sock) + del objmap[id(sock)] + + # Additively register sock for reading or writing + def _register(self, sock, flag): + if id(sock) in self.flags: + self.flags[id(sock)] |= flag + else: + self.flags[id(sock)] = flag + self.poller.register(sock, self.flags[id(sock)]) + + def add_reader(self, sock, cb, *args): + if DEBUG and __debug__: + log.debug("add_reader%s", (sock, cb, args)) + self._register(sock, select.POLLIN) + if args: + self.rdobjmap[id(sock)] = (cb, args) + else: + self.rdobjmap[id(sock)] = cb + + def remove_reader(self, sock): + if DEBUG and __debug__: + log.debug("remove_reader(%s)", sock) + self._unregister(sock, self.rdobjmap, select.POLLIN) + + def add_writer(self, sock, cb, *args): + if DEBUG and __debug__: + log.debug("add_writer%s", (sock, cb, args)) + self._register(sock, select.POLLOUT) + if args: + self.wrobjmap[id(sock)] = (cb, args) + else: + self.wrobjmap[id(sock)] = cb + + def remove_writer(self, sock): + if DEBUG and __debug__: + log.debug("remove_writer(%s)", sock) + self._unregister(sock, self.wrobjmap, select.POLLOUT) + + def wait(self, delay): + if DEBUG and __debug__: + log.debug("poll.wait(%d)", delay) + # We need one-shot behavior (second arg of 1 to .poll()) + res = self.poller.ipoll(delay, 1) + #log.debug("poll result: %s", res) + for sock, ev in res: + if ev & select.POLLOUT: + cb = self.wrobjmap[id(sock)] + # Test code. Invalidate objmap: this ensures an exception is thrown + # rather than exhibiting weird behaviour when testing. + self.wrobjmap[id(sock)] = None # TEST + if DEBUG and __debug__: + log.debug("Calling IO callback: %r", cb) + if isinstance(cb, tuple): + cb[0](*cb[1]) + else: + cb.pend_throw(None) + self.call_soon(cb) + if ev & select.POLLIN: + cb = self.rdobjmap[id(sock)] + self.rdobjmap[id(sock)] = None # TEST + if ev & (select.POLLHUP | select.POLLERR): + # These events are returned even if not requested, and + # are sticky, i.e. will be returned again and again. + # If the caller doesn't do proper error handling and + # unregister this sock, we'll busy-loop on it, so we + # as well can unregister it now "just in case". + self.remove_reader(sock) + if DEBUG and __debug__: + log.debug("Calling IO callback: %r", cb) + if isinstance(cb, tuple): + cb[0](*cb[1]) + else: + cb.pend_throw(None) + self.call_soon(cb) + + +class StreamReader: + + def __init__(self, polls, ios=None): + if ios is None: + ios = polls + self.polls = polls + self.ios = ios + + def read(self, n=-1): + while True: + yield IORead(self.polls) + res = self.ios.read(n) + if res is not None: + break + # This should not happen for real sockets, but can easily + # happen for stream wrappers (ssl, websockets, etc.) + #log.warn("Empty read") + yield IOReadDone(self.polls) + return res + + def readexactly(self, n): + buf = b"" + while n: + yield IORead(self.polls) + res = self.ios.read(n) + assert res is not None + if not res: + break + buf += res + n -= len(res) + yield IOReadDone(self.polls) + return buf + + def readline(self): + if DEBUG and __debug__: + log.debug("StreamReader.readline()") + buf = b"" + while True: + yield IORead(self.polls) + res = self.ios.readline() + assert res is not None + if not res: + break + buf += res + if buf[-1] == 0x0a: + break + if DEBUG and __debug__: + log.debug("StreamReader.readline(): %s", buf) + yield IOReadDone(self.polls) + return buf + + def aclose(self): + yield IOReadDone(self.polls) + self.ios.close() + + def __repr__(self): + return "" % (self.polls, self.ios) + + +class StreamWriter: + + def __init__(self, s, extra): + self.s = s + self.extra = extra + + def awrite(self, buf, off=0, sz=-1): + # This method is called awrite (async write) to not proliferate + # incompatibility with original asyncio. Unlike original asyncio + # whose .write() method is both not a coroutine and guaranteed + # to return immediately (which means it has to buffer all the + # data), this method is a coroutine. + if sz == -1: + sz = len(buf) - off + if DEBUG and __debug__: + log.debug("StreamWriter.awrite(): spooling %d bytes", sz) + while True: + res = self.s.write(buf, off, sz) + # If we spooled everything, return immediately + if res == sz: + if DEBUG and __debug__: + log.debug("StreamWriter.awrite(): completed spooling %d bytes", res) + yield IOWriteDone(self.s) + return + if res is None: + res = 0 + if DEBUG and __debug__: + log.debug("StreamWriter.awrite(): spooled partial %d bytes", res) + assert res < sz + off += res + sz -= res + yield IOWrite(self.s) + #assert s2.fileno() == self.s.fileno() + if DEBUG and __debug__: + log.debug("StreamWriter.awrite(): can write more") + + # Write piecewise content from iterable (usually, a generator) + def awriteiter(self, iterable): + for buf in iterable: + yield from self.awrite(buf) + + def aclose(self): + yield IOWriteDone(self.s) + self.s.close() + + def get_extra_info(self, name, default=None): + return self.extra.get(name, default) + + def __repr__(self): + return "" % self.s + + +def open_connection(host, port, ssl=False): + if DEBUG and __debug__: + log.debug("open_connection(%s, %s)", host, port) + ai = _socket.getaddrinfo(host, port, 0, _socket.SOCK_STREAM) + ai = ai[0] + s = _socket.socket(ai[0], ai[1], ai[2]) + s.setblocking(False) + try: + s.connect(ai[-1]) + except OSError as e: + if e.args[0] != uerrno.EINPROGRESS: + raise + if DEBUG and __debug__: + log.debug("open_connection: After connect") + yield IOWrite(s) +# if __debug__: +# assert s2.fileno() == s.fileno() + if DEBUG and __debug__: + log.debug("open_connection: After iowait: %s", s) + if ssl: + print("Warning: uasyncio SSL support is alpha") + import ussl + s.setblocking(True) + s2 = ussl.wrap_socket(s) + s.setblocking(False) + return StreamReader(s, s2), StreamWriter(s2, {}) + return StreamReader(s), StreamWriter(s, {}) + + +def start_server(client_coro, host, port, backlog=10): + if DEBUG and __debug__: + log.debug("start_server(%s, %s)", host, port) + ai = _socket.getaddrinfo(host, port, 0, _socket.SOCK_STREAM) + ai = ai[0] + s = _socket.socket(ai[0], ai[1], ai[2]) + s.setblocking(False) + + s.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, 1) + s.bind(ai[-1]) + s.listen(backlog) + while True: + if DEBUG and __debug__: + log.debug("start_server: Before accept") + yield IORead(s) + if DEBUG and __debug__: + log.debug("start_server: After iowait") + s2, client_addr = s.accept() + s2.setblocking(False) + if DEBUG and __debug__: + log.debug("start_server: After accept: %s", s2) + extra = {"peername": client_addr} + yield client_coro(StreamReader(s2), StreamWriter(s2, extra)) + + +import uasyncio.core +uasyncio.core._event_loop_class = PollEventLoop diff --git a/uasyncio_iostream/moduselect.c b/uasyncio_iostream/moduselect.c new file mode 100644 index 0000000..327013d --- /dev/null +++ b/uasyncio_iostream/moduselect.c @@ -0,0 +1,378 @@ +/* + * This file is part of the MicroPython project, http://micropython.org/ + * + * The MIT License (MIT) + * + * Copyright (c) 2014 Damien P. George + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +#include "py/mpconfig.h" +#if MICROPY_PY_USELECT + +#include + +#include "py/runtime.h" +#include "py/obj.h" +#include "py/objlist.h" +#include "py/stream.h" +#include "py/mperrno.h" +#include "py/mphal.h" + +// Flags for poll() +#define FLAG_ONESHOT (1) + +/// \module select - Provides select function to wait for events on a stream +/// +/// This module provides the select function. + +typedef struct _poll_obj_t { + mp_obj_t obj; + mp_uint_t (*ioctl)(mp_obj_t obj, mp_uint_t request, mp_uint_t arg, int *errcode); + mp_uint_t flags; + mp_uint_t flags_ret; +} poll_obj_t; + +STATIC void poll_map_add(mp_map_t *poll_map, const mp_obj_t *obj, mp_uint_t obj_len, mp_uint_t flags, bool or_flags) { + for (mp_uint_t i = 0; i < obj_len; i++) { + mp_map_elem_t *elem = mp_map_lookup(poll_map, mp_obj_id(obj[i]), MP_MAP_LOOKUP_ADD_IF_NOT_FOUND); + if (elem->value == NULL) { + // object not found; get its ioctl and add it to the poll list + const mp_stream_p_t *stream_p = mp_get_stream_raise(obj[i], MP_STREAM_OP_IOCTL); + poll_obj_t *poll_obj = m_new_obj(poll_obj_t); + poll_obj->obj = obj[i]; + poll_obj->ioctl = stream_p->ioctl; + poll_obj->flags = flags; + poll_obj->flags_ret = 0; + elem->value = poll_obj; + } else { + // object exists; update its flags + if (or_flags) { + ((poll_obj_t*)elem->value)->flags |= flags; + } else { + ((poll_obj_t*)elem->value)->flags = flags; + } + } + } +} + +// poll each object in the map +STATIC mp_uint_t poll_map_poll(mp_map_t *poll_map, mp_uint_t *rwx_num) { + mp_uint_t n_ready = 0; + for (mp_uint_t i = 0; i < poll_map->alloc; ++i) { + if (!MP_MAP_SLOT_IS_FILLED(poll_map, i)) { + continue; + } + + poll_obj_t *poll_obj = (poll_obj_t*)poll_map->table[i].value; + int errcode; + mp_int_t ret = poll_obj->ioctl(poll_obj->obj, MP_STREAM_POLL, poll_obj->flags, &errcode); + poll_obj->flags_ret = ret; + + if (ret == -1) { + // error doing ioctl + mp_raise_OSError(errcode); + } + + if (ret != 0) { + // object is ready + n_ready += 1; + if (rwx_num != NULL) { + if (ret & MP_STREAM_POLL_RD) { + rwx_num[0] += 1; + } + if (ret & MP_STREAM_POLL_WR) { + rwx_num[1] += 1; + } + if ((ret & ~(MP_STREAM_POLL_RD | MP_STREAM_POLL_WR)) != 0) { + rwx_num[2] += 1; + } + } + } + } + return n_ready; +} + +/// \function select(rlist, wlist, xlist[, timeout]) +STATIC mp_obj_t select_select(uint n_args, const mp_obj_t *args) { + // get array data from tuple/list arguments + size_t rwx_len[3]; + mp_obj_t *r_array, *w_array, *x_array; + mp_obj_get_array(args[0], &rwx_len[0], &r_array); + mp_obj_get_array(args[1], &rwx_len[1], &w_array); + mp_obj_get_array(args[2], &rwx_len[2], &x_array); + + // get timeout + mp_uint_t timeout = -1; + if (n_args == 4) { + if (args[3] != mp_const_none) { + #if MICROPY_PY_BUILTINS_FLOAT + float timeout_f = mp_obj_get_float(args[3]); + if (timeout_f >= 0) { + timeout = (mp_uint_t)(timeout_f * 1000); + } + #else + timeout = mp_obj_get_int(args[3]) * 1000; + #endif + } + } + + // merge separate lists and get the ioctl function for each object + mp_map_t poll_map; + mp_map_init(&poll_map, rwx_len[0] + rwx_len[1] + rwx_len[2]); + poll_map_add(&poll_map, r_array, rwx_len[0], MP_STREAM_POLL_RD, true); + poll_map_add(&poll_map, w_array, rwx_len[1], MP_STREAM_POLL_WR, true); + poll_map_add(&poll_map, x_array, rwx_len[2], MP_STREAM_POLL_ERR | MP_STREAM_POLL_HUP, true); + + mp_uint_t start_tick = mp_hal_ticks_ms(); + rwx_len[0] = rwx_len[1] = rwx_len[2] = 0; + for (;;) { + // poll the objects + mp_uint_t n_ready = poll_map_poll(&poll_map, rwx_len); + + if (n_ready > 0 || (timeout != -1 && mp_hal_ticks_ms() - start_tick >= timeout)) { + // one or more objects are ready, or we had a timeout + mp_obj_t list_array[3]; + list_array[0] = mp_obj_new_list(rwx_len[0], NULL); + list_array[1] = mp_obj_new_list(rwx_len[1], NULL); + list_array[2] = mp_obj_new_list(rwx_len[2], NULL); + rwx_len[0] = rwx_len[1] = rwx_len[2] = 0; + for (mp_uint_t i = 0; i < poll_map.alloc; ++i) { + if (!MP_MAP_SLOT_IS_FILLED(&poll_map, i)) { + continue; + } + poll_obj_t *poll_obj = (poll_obj_t*)poll_map.table[i].value; + if (poll_obj->flags_ret & MP_STREAM_POLL_RD) { + ((mp_obj_list_t*)list_array[0])->items[rwx_len[0]++] = poll_obj->obj; + } + if (poll_obj->flags_ret & MP_STREAM_POLL_WR) { + ((mp_obj_list_t*)list_array[1])->items[rwx_len[1]++] = poll_obj->obj; + } + if ((poll_obj->flags_ret & ~(MP_STREAM_POLL_RD | MP_STREAM_POLL_WR)) != 0) { + ((mp_obj_list_t*)list_array[2])->items[rwx_len[2]++] = poll_obj->obj; + } + } + mp_map_deinit(&poll_map); + return mp_obj_new_tuple(3, list_array); + } + MICROPY_EVENT_POLL_HOOK + } +} +MP_DEFINE_CONST_FUN_OBJ_VAR_BETWEEN(mp_select_select_obj, 3, 4, select_select); + +/// \class Poll - poll class + +typedef struct _mp_obj_poll_t { + mp_obj_base_t base; + mp_map_t poll_map; + short iter_cnt; + short iter_idx; + int flags; + // callee-owned tuple + mp_obj_t ret_tuple; +} mp_obj_poll_t; + +/// \method register(obj[, eventmask]) +STATIC mp_obj_t poll_register(uint n_args, const mp_obj_t *args) { + mp_obj_poll_t *self = args[0]; + mp_uint_t flags; + if (n_args == 3) { + flags = mp_obj_get_int(args[2]); + } else { + flags = MP_STREAM_POLL_RD | MP_STREAM_POLL_WR; + } + poll_map_add(&self->poll_map, &args[1], 1, flags, false); + return mp_const_none; +} +MP_DEFINE_CONST_FUN_OBJ_VAR_BETWEEN(poll_register_obj, 2, 3, poll_register); + +/// \method unregister(obj) +STATIC mp_obj_t poll_unregister(mp_obj_t self_in, mp_obj_t obj_in) { + mp_obj_poll_t *self = self_in; + mp_map_lookup(&self->poll_map, mp_obj_id(obj_in), MP_MAP_LOOKUP_REMOVE_IF_FOUND); + // TODO raise KeyError if obj didn't exist in map + return mp_const_none; +} +MP_DEFINE_CONST_FUN_OBJ_2(poll_unregister_obj, poll_unregister); + +/// \method modify(obj, eventmask) +STATIC mp_obj_t poll_modify(mp_obj_t self_in, mp_obj_t obj_in, mp_obj_t eventmask_in) { + mp_obj_poll_t *self = self_in; + mp_map_elem_t *elem = mp_map_lookup(&self->poll_map, mp_obj_id(obj_in), MP_MAP_LOOKUP); + if (elem == NULL) { + mp_raise_OSError(MP_ENOENT); + } + ((poll_obj_t*)elem->value)->flags = mp_obj_get_int(eventmask_in); + return mp_const_none; +} +MP_DEFINE_CONST_FUN_OBJ_3(poll_modify_obj, poll_modify); + +STATIC mp_uint_t poll_poll_internal(uint n_args, const mp_obj_t *args) { + mp_obj_poll_t *self = args[0]; + + // work out timeout (its given already in ms) + mp_uint_t timeout = -1; + int flags = 0; + if (n_args >= 2) { + if (args[1] != mp_const_none) { + mp_int_t timeout_i = mp_obj_get_int(args[1]); + if (timeout_i >= 0) { + timeout = timeout_i; + } + } + if (n_args >= 3) { + flags = mp_obj_get_int(args[2]); + } + } + + self->flags = flags; + + mp_uint_t start_tick = mp_hal_ticks_ms(); + mp_uint_t n_ready; + for (;;) { + // poll the objects + n_ready = poll_map_poll(&self->poll_map, NULL); + if (n_ready > 0 || (timeout != -1 && mp_hal_ticks_ms() - start_tick >= timeout)) { + break; + } + MICROPY_EVENT_POLL_HOOK + } + + return n_ready; +} + +STATIC mp_obj_t poll_poll(uint n_args, const mp_obj_t *args) { + mp_obj_poll_t *self = args[0]; + mp_uint_t n_ready = poll_poll_internal(n_args, args); + + // one or more objects are ready, or we had a timeout + mp_obj_list_t *ret_list = mp_obj_new_list(n_ready, NULL); + n_ready = 0; + for (mp_uint_t i = 0; i < self->poll_map.alloc; ++i) { + if (!MP_MAP_SLOT_IS_FILLED(&self->poll_map, i)) { + continue; + } + poll_obj_t *poll_obj = (poll_obj_t*)self->poll_map.table[i].value; + if (poll_obj->flags_ret != 0) { + mp_obj_t tuple[2] = {poll_obj->obj, MP_OBJ_NEW_SMALL_INT(poll_obj->flags_ret)}; + ret_list->items[n_ready++] = mp_obj_new_tuple(2, tuple); + if (self->flags & FLAG_ONESHOT) { + // Don't poll next time, until new event flags will be set explicitly + poll_obj->flags &= ~(poll_obj->flags_ret & (MP_STREAM_POLL_RD | MP_STREAM_POLL_WR)); + } + } + } + return ret_list; +} +MP_DEFINE_CONST_FUN_OBJ_VAR_BETWEEN(poll_poll_obj, 1, 3, poll_poll); + +STATIC mp_obj_t poll_ipoll(size_t n_args, const mp_obj_t *args) { + mp_obj_poll_t *self = MP_OBJ_TO_PTR(args[0]); + + if (self->ret_tuple == MP_OBJ_NULL) { + self->ret_tuple = mp_obj_new_tuple(2, NULL); + } + + int n_ready = poll_poll_internal(n_args, args); + self->iter_cnt = n_ready; + self->iter_idx = 0; + + return args[0]; +} +MP_DEFINE_CONST_FUN_OBJ_VAR_BETWEEN(poll_ipoll_obj, 1, 3, poll_ipoll); + +STATIC mp_obj_t poll_iternext(mp_obj_t self_in) { + mp_obj_poll_t *self = MP_OBJ_TO_PTR(self_in); + + if (self->iter_cnt == 0) { + return MP_OBJ_STOP_ITERATION; + } + + self->iter_cnt--; + + for (mp_uint_t i = self->iter_idx; i < self->poll_map.alloc; ++i) { + self->iter_idx++; + if (!MP_MAP_SLOT_IS_FILLED(&self->poll_map, i)) { + continue; + } + poll_obj_t *poll_obj = (poll_obj_t*)self->poll_map.table[i].value; + if (poll_obj->flags_ret != 0) { + mp_obj_tuple_t *t = MP_OBJ_TO_PTR(self->ret_tuple); + t->items[0] = poll_obj->obj; + t->items[1] = MP_OBJ_NEW_SMALL_INT(poll_obj->flags_ret); + if (self->flags & FLAG_ONESHOT) { + // Don't poll next time, until new event flags will be set explicitly + poll_obj->flags &= ~(poll_obj->flags_ret & (MP_STREAM_POLL_RD | MP_STREAM_POLL_WR)); + } + return MP_OBJ_FROM_PTR(t); + } + } + + assert(!"inconsistent number of poll active entries"); + self->iter_cnt = 0; + return MP_OBJ_STOP_ITERATION; +} + +STATIC const mp_rom_map_elem_t poll_locals_dict_table[] = { + { MP_ROM_QSTR(MP_QSTR_register), MP_ROM_PTR(&poll_register_obj) }, + { MP_ROM_QSTR(MP_QSTR_unregister), MP_ROM_PTR(&poll_unregister_obj) }, + { MP_ROM_QSTR(MP_QSTR_modify), MP_ROM_PTR(&poll_modify_obj) }, + { MP_ROM_QSTR(MP_QSTR_poll), MP_ROM_PTR(&poll_poll_obj) }, + { MP_ROM_QSTR(MP_QSTR_ipoll), MP_ROM_PTR(&poll_ipoll_obj) }, +}; +STATIC MP_DEFINE_CONST_DICT(poll_locals_dict, poll_locals_dict_table); + +STATIC const mp_obj_type_t mp_type_poll = { + { &mp_type_type }, + .name = MP_QSTR_poll, + .getiter = mp_identity_getiter, + .iternext = poll_iternext, + .locals_dict = (void*)&poll_locals_dict, +}; + +/// \function poll() +STATIC mp_obj_t select_poll(void) { + mp_obj_poll_t *poll = m_new_obj(mp_obj_poll_t); + poll->base.type = &mp_type_poll; + mp_map_init(&poll->poll_map, 0); + poll->iter_cnt = 0; + poll->ret_tuple = MP_OBJ_NULL; + return poll; +} +MP_DEFINE_CONST_FUN_OBJ_0(mp_select_poll_obj, select_poll); + +STATIC const mp_rom_map_elem_t mp_module_select_globals_table[] = { + { MP_ROM_QSTR(MP_QSTR___name__), MP_ROM_QSTR(MP_QSTR_uselect) }, + { MP_ROM_QSTR(MP_QSTR_select), MP_ROM_PTR(&mp_select_select_obj) }, + { MP_ROM_QSTR(MP_QSTR_poll), MP_ROM_PTR(&mp_select_poll_obj) }, + { MP_ROM_QSTR(MP_QSTR_POLLIN), MP_ROM_INT(MP_STREAM_POLL_RD) }, + { MP_ROM_QSTR(MP_QSTR_POLLOUT), MP_ROM_INT(MP_STREAM_POLL_WR) }, + { MP_ROM_QSTR(MP_QSTR_POLLERR), MP_ROM_INT(MP_STREAM_POLL_ERR) }, + { MP_ROM_QSTR(MP_QSTR_POLLHUP), MP_ROM_INT(MP_STREAM_POLL_HUP) }, +}; + +STATIC MP_DEFINE_CONST_DICT(mp_module_select_globals, mp_module_select_globals_table); + +const mp_obj_module_t mp_module_uselect = { + .base = { &mp_type_module }, + .globals = (mp_obj_dict_t*)&mp_module_select_globals, +}; + +#endif // MICROPY_PY_USELECT diff --git a/uasyncio_iostream/tests/aswitch.py b/uasyncio_iostream/tests/aswitch.py new file mode 100644 index 0000000..b086633 --- /dev/null +++ b/uasyncio_iostream/tests/aswitch.py @@ -0,0 +1,191 @@ +# aswitch.py Switch and pushbutton classes for asyncio +# Delay_ms A retriggerable delay class. Can schedule a coro on timeout. +# Switch Simple debounced switch class for normally open grounded switch. +# Pushbutton extend the above to support logical state, long press and +# double-click events +# Tested on Pyboard but should run on other microcontroller platforms +# running MicroPython and uasyncio. + +# The MIT License (MIT) +# +# Copyright (c) 2017 Peter Hinch +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +try: + import asyncio_priority as asyncio +except ImportError: + import uasyncio as asyncio +import utime as time +from asyn import launch +# launch: run a callback or initiate a coroutine depending on which is passed. + + +class Delay_ms(object): + def __init__(self, func=None, args=(), can_alloc=True, duration=1000): + self.func = func + self.args = args + self.can_alloc = can_alloc + self.duration = duration # Default duration + self.tstop = None # Not running + self.loop = asyncio.get_event_loop() + if not can_alloc: + self.loop.create_task(self._run()) + + async def _run(self): + while True: + if self.tstop is None: # Not running + await asyncio.sleep_ms(0) + else: + await self.killer() + + def stop(self): + self.tstop = None + + def trigger(self, duration=0): # Update end time + if duration <= 0: + duration = self.duration + if self.can_alloc and self.tstop is None: # No killer task is running + self.tstop = time.ticks_add(time.ticks_ms(), duration) + # Start a task which stops the delay after its period has elapsed + self.loop.create_task(self.killer()) + self.tstop = time.ticks_add(time.ticks_ms(), duration) + + def running(self): + return self.tstop is not None + + async def killer(self): + twait = time.ticks_diff(self.tstop, time.ticks_ms()) + while twait > 0: # Must loop here: might be retriggered + await asyncio.sleep_ms(twait) + if self.tstop is None: + break # Return if stop() called during wait + twait = time.ticks_diff(self.tstop, time.ticks_ms()) + if self.tstop is not None and self.func is not None: + launch(self.func, self.args) # Timed out: execute callback + self.tstop = None # Not running + +class Switch(object): + debounce_ms = 50 + def __init__(self, pin): + self.pin = pin # Should be initialised for input with pullup + self._open_func = False + self._close_func = False + self.switchstate = self.pin.value() # Get initial state + loop = asyncio.get_event_loop() + loop.create_task(self.switchcheck()) # Thread runs forever + + def open_func(self, func, args=()): + self._open_func = func + self._open_args = args + + def close_func(self, func, args=()): + self._close_func = func + self._close_args = args + + # Return current state of switch (0 = pressed) + def __call__(self): + return self.switchstate + + async def switchcheck(self): + loop = asyncio.get_event_loop() + while True: + state = self.pin.value() + if state != self.switchstate: + # State has changed: act on it now. + self.switchstate = state + if state == 0 and self._close_func: + launch(self._close_func, self._close_args) + elif state == 1 and self._open_func: + launch(self._open_func, self._open_args) + # Ignore further state changes until switch has settled + await asyncio.sleep_ms(Switch.debounce_ms) + +class Pushbutton(object): + debounce_ms = 50 + long_press_ms = 1000 + double_click_ms = 400 + def __init__(self, pin): + self.pin = pin # Initialise for input + self._true_func = False + self._false_func = False + self._double_func = False + self._long_func = False + self.sense = pin.value() # Convert from electrical to logical value + self.buttonstate = self.rawstate() # Initial state + loop = asyncio.get_event_loop() + loop.create_task(self.buttoncheck()) # Thread runs forever + + def press_func(self, func, args=()): + self._true_func = func + self._true_args = args + + def release_func(self, func, args=()): + self._false_func = func + self._false_args = args + + def double_func(self, func, args=()): + self._double_func = func + self._double_args = args + + def long_func(self, func, args=()): + self._long_func = func + self._long_args = args + + # Current non-debounced logical button state: True == pressed + def rawstate(self): + return bool(self.pin.value() ^ self.sense) + + # Current debounced state of button (True == pressed) + def __call__(self): + return self.buttonstate + + async def buttoncheck(self): + loop = asyncio.get_event_loop() + if self._long_func: + longdelay = Delay_ms(self._long_func, self._long_args) + if self._double_func: + doubledelay = Delay_ms() + while True: + state = self.rawstate() + # State has changed: act on it now. + if state != self.buttonstate: + self.buttonstate = state + if state: + # Button is pressed + if self._long_func and not longdelay.running(): + # Start long press delay + longdelay.trigger(Pushbutton.long_press_ms) + if self._double_func: + if doubledelay.running(): + launch(self._double_func, self._double_args) + else: + # First click: start doubleclick timer + doubledelay.trigger(Pushbutton.double_click_ms) + if self._true_func: + launch(self._true_func, self._true_args) + else: + # Button release + if self._long_func and longdelay.running(): + # Avoid interpreting a second click as a long push + longdelay.stop() + if self._false_func: + launch(self._false_func, self._false_args) + # Ignore state changes until switch has settled + await asyncio.sleep_ms(Pushbutton.debounce_ms) diff --git a/uasyncio_iostream/tests/auart.py b/uasyncio_iostream/tests/auart.py new file mode 100644 index 0000000..8600529 --- /dev/null +++ b/uasyncio_iostream/tests/auart.py @@ -0,0 +1,25 @@ +# Test of uasyncio stream I/O using UART +# Author: Peter Hinch +# Copyright Peter Hinch 2017 Released under the MIT license +# Link X1 and X2 to test. + +import uasyncio as asyncio +from pyb import UART +uart = UART(4, 9600) + +async def sender(): + swriter = asyncio.StreamWriter(uart, {}) + while True: + await swriter.awrite('Hello uart\n') + await asyncio.sleep(2) + +async def receiver(): + sreader = asyncio.StreamReader(uart) + while True: + res = await sreader.readline() + print('Recieved', res) + +loop = asyncio.get_event_loop() +loop.create_task(sender()) +loop.create_task(receiver()) +loop.run_forever() diff --git a/uasyncio_iostream/tests/auart_hd.py b/uasyncio_iostream/tests/auart_hd.py new file mode 100644 index 0000000..da2b33e --- /dev/null +++ b/uasyncio_iostream/tests/auart_hd.py @@ -0,0 +1,106 @@ +# auart_hd.py +# Author: Peter Hinch +# Copyright Peter Hinch 2018 Released under the MIT license + +# Demo of running a half-duplex protocol to a device. The device never sends +# unsolicited messages. An example is a communications device which responds +# to AT commands. +# The master sends a message to the device, which may respond with one or more +# lines of data. The master assumes that the device has sent all its data when +# a timeout has elapsed. + +# In this test a physical device is emulated by the DEVICE class +# To test link X1-X4 and X2-X3 + +from pyb import UART +import uasyncio as asyncio +import aswitch + +# Dummy device waits for any incoming line and responds with 4 lines at 1 second +# intervals. +class DEVICE(): + def __init__(self, uart_no = 4): + self.uart = UART(uart_no, 9600) + self.loop = asyncio.get_event_loop() + self.swriter = asyncio.StreamWriter(self.uart, {}) + self.sreader = asyncio.StreamReader(self.uart) + loop = asyncio.get_event_loop() + loop.create_task(self._run()) + + async def _run(self): + responses = ['Line 1', 'Line 2', 'Line 3', 'Goodbye'] + while True: + res = await self.sreader.readline() + for response in responses: + await self.swriter.awrite("{}\r\n".format(response)) + # Demo the fact that the master tolerates slow response. + await asyncio.sleep_ms(300) + +# The master's send_command() method sends a command and waits for a number of +# lines from the device. The end of the process is signified by a timeout, when +# a list of lines is returned. This allows line-by-line processing. +# A special test mode demonstrates the behaviour with a non-responding device. If +# None is passed, no commend is sent. The master waits for a response which never +# arrives and returns an empty list. +class MASTER(): + def __init__(self, uart_no = 2, timeout=4000): + self.uart = UART(uart_no, 9600) + self.timeout = timeout + self.loop = asyncio.get_event_loop() + self.swriter = asyncio.StreamWriter(self.uart, {}) + self.sreader = asyncio.StreamReader(self.uart) + self.delay = aswitch.Delay_ms() + self.response = [] + loop = asyncio.get_event_loop() + loop.create_task(self._recv()) + + async def _recv(self): + while True: + res = await self.sreader.readline() + self.response.append(res) # Append to list of lines + self.delay.trigger(self.timeout) # Got something, retrigger timer + + async def send_command(self, command): + self.response = [] # Discard any pending messages + if command is None: + print('Timeout test.') + else: + await self.swriter.awrite("{}\r\n".format(command)) + print('Command sent:', command) + self.delay.trigger(self.timeout) # Re-initialise timer + while self.delay.running(): + await asyncio.sleep(1) # Wait for 4s after last msg received + return self.response + +async def test(): + print('This test takes 10s to complete.') + for cmd in ['Run', None]: + print() + res = await master.send_command(cmd) + # can use b''.join(res) if a single string is required. + if res: + print('Result is:') + for line in res: + print(line.decode('UTF8'), end='') + else: + print('Timed out waiting for result.') + +loop = asyncio.get_event_loop() +master = MASTER() +device = DEVICE() +loop.run_until_complete(test()) + +# Expected output +# >>> import auart_hd +# This test takes 10s to complete. +# +# Command sent: Run +# Result is: +# Line 1 +# Line 2 +# Line 3 +# Goodbye +# +# Timeout test. +# Timed out waiting for result. +# >>> diff --git a/uasyncio_iostream/tests/iomiss.py b/uasyncio_iostream/tests/iomiss.py new file mode 100644 index 0000000..02ef587 --- /dev/null +++ b/uasyncio_iostream/tests/iomiss.py @@ -0,0 +1,54 @@ +# iomiss.py Test for missed reads. The bug was fixed by disabling interrupts in +# ioctl(). +import io, pyb +import uasyncio as asyncio +import micropython +micropython.alloc_emergency_exception_buf(100) + +MP_STREAM_POLL = const(3) +MP_STREAM_POLL_RD = const(1) + +class MyIO(io.IOBase): + def __init__(self): + self.ready = False + self.count = 0 + tim = pyb.Timer(4) + tim.init(freq=1) + tim.callback(self.setready) + + def ioctl(self, req, arg): + if req == MP_STREAM_POLL and (arg & MP_STREAM_POLL_RD): + state = pyb.disable_irq() + r = self.ready + self.ready = False + pyb.enable_irq(state) + return r + return 0 + + def readline(self): + return '{}\n'.format(self.count) + + def setready(self, t): + self.count += 1 + self.ready = True + +myio = MyIO() + +async def receiver(): + last = None + nmissed = 0 + sreader = asyncio.StreamReader(myio) + while True: + res = await sreader.readline() + print('Recieved {} Missed {}'.format(res, nmissed)) + ires = int(res) + if last is not None: + if last != ires -1: + print('Missed {}'.format(ires - 1)) + nmissed += 1 + last = ires + +loop = asyncio.get_event_loop() +loop.create_task(receiver()) +loop.run_forever() + diff --git a/uasyncio_iostream/tests/iotest1.py b/uasyncio_iostream/tests/iotest1.py new file mode 100644 index 0000000..25c98f3 --- /dev/null +++ b/uasyncio_iostream/tests/iotest1.py @@ -0,0 +1,93 @@ +# iotest1.py Test PR #3836. User class write() performs unbuffered writing. + +import io, pyb +import uasyncio as asyncio +import micropython +micropython.alloc_emergency_exception_buf(100) + +MP_STREAM_POLL_RD = const(1) +MP_STREAM_POLL_WR = const(4) +MP_STREAM_POLL = const(3) +MP_STREAM_ERROR = const(-1) + +def printbuf(this_io): + for ch in this_io.wbuf[:this_io.wprint_len]: + print(chr(ch), end='') + +class MyIO(io.IOBase): + def __init__(self): + self.ready_rd = False + self.ready_wr = False + self.wbuf = bytearray(100) # Write buffer + self.wprint_len = 0 + self.widx = 0 + self.wch = b'' + self.rbuf = b'ready\n' # Read buffer + pyb.Timer(4, freq = 1, callback = self.do_input) + pyb.Timer(5, freq = 10, callback = self.do_output) + + # Read callback: emulate asynchronous input from hardware. + # Typically would put bytes into a ring buffer and set .ready_rd. + def do_input(self, t): + self.ready_rd = True # Data is ready to read + + # Write timer callback. Emulate hardware: if there's data in the buffer + # write some or all of it + def do_output(self, t): + if self.wch: + self.wbuf[self.widx] = self.wch + self.widx += 1 + if self.wch == ord('\n'): + self.wprint_len = self.widx # Save for schedule + micropython.schedule(printbuf, self) + self.widx = 0 + self.wch = b'' + + + def ioctl(self, req, arg): # see ports/stm32/uart.c + ret = MP_STREAM_ERROR + if req == MP_STREAM_POLL: + ret = 0 + if arg & MP_STREAM_POLL_RD: + if self.ready_rd: + ret |= MP_STREAM_POLL_RD + if arg & MP_STREAM_POLL_WR: + if not self.wch: + ret |= MP_STREAM_POLL_WR # Ready if no char pending + return ret + + def readline(self): + self.ready_rd = False + return self.rbuf + + def write(self, buf, off, sz): + self.wch = buf[off] # A real driver would trigger hardware to write a char + return 1 # No. of bytes written. uasyncio waits on ioctl write ready + +myio = MyIO() + +async def receiver(): + sreader = asyncio.StreamReader(myio) + while True: + res = await sreader.readline() + print('Received', res) + +async def sender(): + swriter = asyncio.StreamWriter(myio, {}) + await asyncio.sleep(5) + count = 0 + while True: + count += 1 + tosend = 'Wrote Hello MyIO {}\n'.format(count) + await swriter.awrite(tosend.encode('UTF8')) + # Once this has occurred reading stops. ioctl keeps being called with arg == 0 + # which normally occurs once only after a read + # IOWriteDone is never yielded: is this right? + await asyncio.sleep(2) + + +loop = asyncio.get_event_loop() +loop.create_task(receiver()) +loop.create_task(sender()) +loop.run_forever() + diff --git a/uasyncio_iostream/tests/iotest2.py b/uasyncio_iostream/tests/iotest2.py new file mode 100644 index 0000000..02c3ba0 --- /dev/null +++ b/uasyncio_iostream/tests/iotest2.py @@ -0,0 +1,92 @@ +# iotest2.py Test PR #3836. User class write() performs buffered writing. +# This works as expected. + +import io, pyb +import uasyncio as asyncio +import micropython +micropython.alloc_emergency_exception_buf(100) + +MP_STREAM_POLL_RD = const(1) +MP_STREAM_POLL_WR = const(4) +MP_STREAM_POLL = const(3) +MP_STREAM_ERROR = const(-1) + +def printbuf(this_io): + for ch in this_io.wbuf[:this_io.wprint_len]: + print(chr(ch), end='') + this_io.wbuf = b'' + +class MyIO(io.IOBase): + def __init__(self): + self.ready_rd = False + self.ready_wr = False + self.wbuf = b'' + self.wprint_len = 0 + self.ridx = 0 + self.rbuf = b'ready\n' # Read buffer + pyb.Timer(4, freq = 1, callback = self.do_input) + pyb.Timer(5, freq = 10, callback = self.do_output) + + # Read callback: emulate asynchronous input from hardware. + # Typically would put bytes into a ring buffer and set .ready_rd. + def do_input(self, t): + self.ready_rd = True # Data is ready to read + + # Write timer callback. Emulate hardware: if there's data in the buffer + # write some or all of it + def do_output(self, t): + if self.wbuf: + self.wprint_len = self.wbuf.find(b'\n') + 1 + micropython.schedule(printbuf, self) + + + def ioctl(self, req, arg): # see ports/stm32/uart.c +# print('ioctl', req, arg) + ret = MP_STREAM_ERROR + if req == MP_STREAM_POLL: + ret = 0 + if arg & MP_STREAM_POLL_RD: + if self.ready_rd: + ret |= MP_STREAM_POLL_RD + if arg & MP_STREAM_POLL_WR: + if not self.wch: + ret |= MP_STREAM_POLL_WR # Ready if no char pending + return ret + + # Test of device that produces one character at a time + def readline(self): + self.ready_rd = False + ch = self.rbuf[self.ridx] + if ch == ord('\n'): + self.ridx = 0 + else: + self.ridx += 1 + return chr(ch) + + def write(self, buf, off, sz): + self.wbuf = buf[:] + return sz # No. of bytes written. uasyncio waits on ioctl write ready + +myio = MyIO() + +async def receiver(): + sreader = asyncio.StreamReader(myio) + while True: + res = await sreader.readline() + print('Received', res) + +async def sender(): + swriter = asyncio.StreamWriter(myio, {}) + await asyncio.sleep(5) + count = 0 + while True: + count += 1 + tosend = 'Wrote Hello MyIO {}\n'.format(count) + await swriter.awrite(tosend.encode('UTF8')) + await asyncio.sleep(2) + +loop = asyncio.get_event_loop() +loop.create_task(receiver()) +loop.create_task(sender()) +loop.run_forever() + diff --git a/uasyncio_iostream/tests/iotest3.py b/uasyncio_iostream/tests/iotest3.py new file mode 100644 index 0000000..74fa44d --- /dev/null +++ b/uasyncio_iostream/tests/iotest3.py @@ -0,0 +1,111 @@ +# iotest3.py Test PR #3836. User class write() performs unbuffered writing. + +# This test was to demonstrate the workround to the original issue by having +# separate read and write classes. +# With modified moduselect.c and uasyncio.__init__.py the test is probably +# irrelevant. + +import io, pyb +import uasyncio as asyncio +import micropython +micropython.alloc_emergency_exception_buf(100) + +MP_STREAM_POLL_RD = const(1) +MP_STREAM_POLL_WR = const(4) +MP_STREAM_POLL = const(3) +MP_STREAM_ERROR = const(-1) + +class MyIOR(io.IOBase): + def __init__(self): + self.ready_rd = False + self.rbuf = b'ready\n' # Read buffer + pyb.Timer(4, freq = 1, callback = self.do_input) + + # Read callback: emulate asynchronous input from hardware. + # Typically would put bytes into a ring buffer and set .ready_rd. + def do_input(self, t): + self.ready_rd = True # Data is ready to read + + def ioctl(self, req, arg): # see ports/stm32/uart.c + ret = MP_STREAM_ERROR + if req == MP_STREAM_POLL: + ret = 0 + if not arg: + print('ioctl arg 0') + if arg & MP_STREAM_POLL_RD: + if self.ready_rd: + ret |= MP_STREAM_POLL_RD + return ret + + def readline(self): + self.ready_rd = False + return self.rbuf + +# MyIOW emulates a write-only device which can only handle one character at a +# time. The write() method is called by uasyncio. A real driver would cause the +# hardware to write a character. By setting .wch it causes the ioctl to report +# a not ready status. +# Some time later an asynchronous event occurs, indicating that the hardware +# has written a character and is ready for another. In this demo this is done +# by the timer callback do_output(), which clears .wch so that ioctl returns +# a ready status. For the demo it stores the characters in .wbuf for printing. + +def printbuf(this_io): + for x in range(this_io.wprint_len): + print(chr(this_io.wbuf[x]), end = '') + +class MyIOW(io.IOBase): + def __init__(self): + self.wbuf = bytearray(20) # Buffer for printing + self.wprint_len = 0 + self.widx = 0 + self.wch = b'' + wtim = pyb.Timer(5, freq = 10, callback = self.do_output) + + # Write timer callback. Emulate hardware: if there's data in the buffer + # write some or all of it + def do_output(self, t): + if self.wch: + self.wbuf[self.widx] = self.wch + self.widx += 1 + if self.wch == ord('\n'): + self.wprint_len = self.widx # Save for schedule + micropython.schedule(printbuf, self) + self.widx = 0 + self.wch = b'' + + def ioctl(self, req, arg): # see ports/stm32/uart.c + ret = MP_STREAM_ERROR + if req == MP_STREAM_POLL: + ret = 0 + if arg & MP_STREAM_POLL_WR: + if not self.wch: + ret |= MP_STREAM_POLL_WR # Ready if no char pending + return ret + + def write(self, buf, off, sz): + self.wch = buf[off] # A real driver would trigger hardware to write a char + return 1 # No. of bytes written. uasyncio waits on ioctl write ready + +myior = MyIOR() +myiow = MyIOW() + +async def receiver(): + sreader = asyncio.StreamReader(myior) + while True: + res = await sreader.readline() + print('Received', res) + +async def sender(): + swriter = asyncio.StreamWriter(myiow, {}) + count = 0 + while True: + count += 1 + tosend = 'Wrote Hello MyIO {}\n'.format(count) + await swriter.awrite(tosend.encode('UTF8')) + await asyncio.sleep(2) + +loop = asyncio.get_event_loop() +loop.create_task(receiver()) +loop.create_task(sender()) +loop.run_forever() diff --git a/uasyncio_iostream/tests/iotest4.py b/uasyncio_iostream/tests/iotest4.py new file mode 100644 index 0000000..d28ea06 --- /dev/null +++ b/uasyncio_iostream/tests/iotest4.py @@ -0,0 +1,108 @@ +# iotest4.py Test PR #3836. +# User class write() performs unbuffered writing. +# For simplicity this uses buffered read: unbuffered is tested by iotest2.py. + +# This test was to demonstrate the original issue. +# With modified moduselect.c and uasyncio.__init__.py the test now passes. + +# iotest4.test() uses separate read and write objects. +# iotest4.test(False) uses a common object (failed without the mod). + + +import io, pyb +import uasyncio as asyncio +import micropython +micropython.alloc_emergency_exception_buf(100) + +MP_STREAM_POLL_RD = const(1) +MP_STREAM_POLL_WR = const(4) +MP_STREAM_POLL = const(3) +MP_STREAM_ERROR = const(-1) + +def printbuf(this_io): + for ch in this_io.wbuf[:this_io.wprint_len]: + print(chr(ch), end='') + +class MyIO(io.IOBase): + def __init__(self, read=False, write=False): + self.ready_rd = False # Read and write not ready + self.wch = b'' + if read: + self.rbuf = b'ready\n' # Read buffer + pyb.Timer(4, freq = 1, callback = self.do_input) + if write: + self.wbuf = bytearray(100) # Write buffer + self.wprint_len = 0 + self.widx = 0 + pyb.Timer(5, freq = 10, callback = self.do_output) + + # Read callback: emulate asynchronous input from hardware. + # Typically would put bytes into a ring buffer and set .ready_rd. + def do_input(self, t): + self.ready_rd = True # Data is ready to read + + # Write timer callback. Emulate hardware: if there's data in the buffer + # write some or all of it + def do_output(self, t): + if self.wch: + self.wbuf[self.widx] = self.wch + self.widx += 1 + if self.wch == ord('\n'): + self.wprint_len = self.widx # Save for schedule + micropython.schedule(printbuf, self) + self.widx = 0 + self.wch = b'' + + + def ioctl(self, req, arg): # see ports/stm32/uart.c + ret = MP_STREAM_ERROR + if req == MP_STREAM_POLL: + ret = 0 + if arg & MP_STREAM_POLL_RD: + if self.ready_rd: + ret |= MP_STREAM_POLL_RD + if arg & MP_STREAM_POLL_WR: + if not self.wch: + ret |= MP_STREAM_POLL_WR # Ready if no char pending + return ret + + # Emulate a device with buffered read. Return the buffer, falsify read ready + # Read timer sets ready. + def readline(self): + self.ready_rd = False + return self.rbuf + + # Emulate unbuffered hardware which writes one character: uasyncio waits + # until hardware is ready for the next. Hardware ready is emulated by write + # timer callback. + def write(self, buf, off, sz): + self.wch = buf[off] # Hardware starts to write a char + return 1 # 1 byte written. uasyncio waits on ioctl write ready + +async def receiver(myior): + sreader = asyncio.StreamReader(myior) + while True: + res = await sreader.readline() + print('Received', res) + +async def sender(myiow): + swriter = asyncio.StreamWriter(myiow, {}) + await asyncio.sleep(5) + count = 0 + while True: + count += 1 + tosend = 'Wrote Hello MyIO {}\n'.format(count) + await swriter.awrite(tosend.encode('UTF8')) + await asyncio.sleep(2) + +def test(good=True): + if good: + myior = MyIO(read=True) + myiow = MyIO(write=True) + else: + myior = MyIO(read=True, write=True) + myiow = myior + loop = asyncio.get_event_loop() + loop.create_task(receiver(myior)) + loop.create_task(sender(myiow)) + loop.run_forever()