From 6b78a1bf00b4d95b018e64f84bc1f5a36502940e Mon Sep 17 00:00:00 2001 From: Damien George Date: Tue, 1 Aug 2023 22:54:52 +1000 Subject: [PATCH] tests/extmod: Add coverage tests for select module. Signed-off-by: Damien George --- tests/extmod/select_ipoll.py | 55 +++++++++++++++++ tests/extmod/select_ipoll.py.exp | 6 ++ tests/extmod/select_poll_basic.py | 1 + tests/extmod/select_poll_custom.py | 83 ++++++++++++++++++++++++++ tests/extmod/select_poll_custom.py.exp | 11 ++++ tests/extmod/select_poll_eintr.py | 47 +++++++++++++++ tests/extmod/select_poll_fd.py | 44 ++++++++++++++ 7 files changed, 247 insertions(+) create mode 100644 tests/extmod/select_ipoll.py create mode 100644 tests/extmod/select_ipoll.py.exp create mode 100644 tests/extmod/select_poll_custom.py create mode 100644 tests/extmod/select_poll_custom.py.exp create mode 100644 tests/extmod/select_poll_eintr.py create mode 100644 tests/extmod/select_poll_fd.py diff --git a/tests/extmod/select_ipoll.py b/tests/extmod/select_ipoll.py new file mode 100644 index 0000000000..0b661c11c8 --- /dev/null +++ b/tests/extmod/select_ipoll.py @@ -0,0 +1,55 @@ +# Test select.ipoll(). + +try: + import socket, select +except ImportError: + print("SKIP") + raise SystemExit + + +def print_poll_output(lst): + print([(type(obj), flags) for obj, flags in lst]) + + +poller = select.poll() + +# Use a new UDP socket for tests, which should be writable but not readable. +try: + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.bind(socket.getaddrinfo("127.0.0.1", 8000)[0][-1]) +except OSError: + print("SKIP") + raise SystemExit + +poller.register(s) + +# Basic polling. +print_poll_output(poller.ipoll(0)) + +# Pass in flags=1 for one-shot behaviour. +print_poll_output(poller.ipoll(0, 1)) + +# Socket should be deregistered and poll should return nothing. +print_poll_output(poller.ipoll(0)) + +# Create a second socket. +s2 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) +s2.bind(socket.getaddrinfo("127.0.0.1", 8001)[0][-1]) + +# Register both sockets (to reset the first one). +poller.register(s) +poller.register(s2) + +# Basic polling with two sockets. +print_poll_output(poller.ipoll(0)) + +# Unregister the first socket, to test polling the remaining one. +poller.unregister(s) +print_poll_output(poller.ipoll(0)) + +# Unregister the second socket, to test polling none. +poller.unregister(s2) +print_poll_output(poller.ipoll(0)) + +s2.close() +s.close() diff --git a/tests/extmod/select_ipoll.py.exp b/tests/extmod/select_ipoll.py.exp new file mode 100644 index 0000000000..cbeabdce90 --- /dev/null +++ b/tests/extmod/select_ipoll.py.exp @@ -0,0 +1,6 @@ +[(, 4)] +[(, 4)] +[] +[(, 4), (, 4)] +[(, 4)] +[] diff --git a/tests/extmod/select_poll_basic.py b/tests/extmod/select_poll_basic.py index b36e16f018..0814d89ce9 100644 --- a/tests/extmod/select_poll_basic.py +++ b/tests/extmod/select_poll_basic.py @@ -16,6 +16,7 @@ poller.register(s) # "Registering a file descriptor that’s already registered is not an error, # and has the same effect as registering the descriptor exactly once." poller.register(s) +poller.register(s, select.POLLIN | select.POLLOUT) # 2 args are mandatory unlike register() try: diff --git a/tests/extmod/select_poll_custom.py b/tests/extmod/select_poll_custom.py new file mode 100644 index 0000000000..0cbb610327 --- /dev/null +++ b/tests/extmod/select_poll_custom.py @@ -0,0 +1,83 @@ +# Test custom pollable objects implemented in Python. + +from micropython import const + +try: + import socket, select, io +except ImportError: + print("SKIP") + raise SystemExit + +_MP_STREAM_POLL = const(3) +_MP_STREAM_GET_FILENO = const(10) + +_MP_STREAM_POLL_RD = const(0x0001) +_MP_STREAM_POLL_WR = const(0x0004) + + +def print_poll_output(lst): + print([(type(obj), flags) for obj, flags in lst]) + + +class CustomPollable(io.IOBase): + def __init__(self): + self.poll_state = 0 + + def ioctl(self, cmd, arg): + if cmd == _MP_STREAM_GET_FILENO: + # Bare-metal ports don't call this ioctl, so don't print it. + return -1 + + print("CustomPollable.ioctl", cmd, arg) + if cmd == _MP_STREAM_POLL: + if self.poll_state == "delay_rd": + self.poll_state = _MP_STREAM_POLL_RD + return 0 + elif self.poll_state < 0: + return self.poll_state + else: + return self.poll_state & arg + + +poller = select.poll() + +# Use a new UDP socket for tests, which should be writable but not readable. +try: + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.bind(socket.getaddrinfo("127.0.0.1", 8000)[0][-1]) +except OSError: + print("SKIP") + raise SystemExit + +x = CustomPollable() + +# Register both a file-descriptor-based object and a custom pure-Python object. +poller.register(s) +poller.register(x) + +# Modify the flags for the custom object. +poller.modify(x, select.POLLIN) + +# Test polling. +print_poll_output(poller.poll(0)) +x.poll_state = _MP_STREAM_POLL_WR +print_poll_output(poller.poll(0)) +x.poll_state = _MP_STREAM_POLL_RD +print_poll_output(poller.poll(0)) + +# The custom object becomes readable only after being polled. +poller.modify(s, select.POLLIN) +x.poll_state = "delay_rd" +print_poll_output(poller.poll()) + +# The custom object returns an error. +x.poll_state = -1000 +try: + poller.poll(0) +except OSError as er: + print("OSError", er.errno) + +poller.unregister(x) +poller.unregister(s) + +s.close() diff --git a/tests/extmod/select_poll_custom.py.exp b/tests/extmod/select_poll_custom.py.exp new file mode 100644 index 0000000000..bcb4d83e2d --- /dev/null +++ b/tests/extmod/select_poll_custom.py.exp @@ -0,0 +1,11 @@ +CustomPollable.ioctl 3 1 +[(, 4)] +CustomPollable.ioctl 3 1 +[(, 4)] +CustomPollable.ioctl 3 1 +[(, 4), (, 1)] +CustomPollable.ioctl 3 1 +CustomPollable.ioctl 3 1 +[(, 1)] +CustomPollable.ioctl 3 1 +OSError 1000 diff --git a/tests/extmod/select_poll_eintr.py b/tests/extmod/select_poll_eintr.py new file mode 100644 index 0000000000..f53efcc83e --- /dev/null +++ b/tests/extmod/select_poll_eintr.py @@ -0,0 +1,47 @@ +# Test interruption of select.poll by EINTR signal, when +# MICROPY_PY_SELECT_POSIX_OPTIMISATIONS is enabled. + +try: + import time, gc, select, socket, _thread + + time.time_ns # Check for time_ns on MicroPython + select.poll # Raises AttributeError for CPython implementations without poll() +except (ImportError, AttributeError): + print("SKIP") + raise SystemExit + + +def thread_main(): + lock.acquire() + time.sleep(0.2) + print("thread gc start") + # The unix gc.collect() implementation will raise EINTR on other threads. + # Could possibly use _thread._interrupt_main() instead if MicroPython had it. + gc.collect() + print("thread gc end") + + +# Start a thread to interrupt the main thread during its call to poll. +lock = _thread.allocate_lock() +lock.acquire() +_thread.start_new_thread(thread_main, ()) + +# Use a new UDP socket for tests, which should be writable but not readable. +s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) +s.bind(socket.getaddrinfo("127.0.0.1", 8000)[0][-1]) + +# Create the poller object. +poller = select.poll() +poller.register(s, select.POLLIN) + +# Poll on the UDP socket for a set timeout, which should be reached. +print("poll") +lock.release() +t0 = time.time_ns() +result = poller.poll(400) +dt_ms = (time.time_ns() - t0) / 1e6 +print("result:", result) +print("dt in range:", 380 <= dt_ms <= 500) + +# Clean up. +s.close() diff --git a/tests/extmod/select_poll_fd.py b/tests/extmod/select_poll_fd.py new file mode 100644 index 0000000000..fab9c0e0e9 --- /dev/null +++ b/tests/extmod/select_poll_fd.py @@ -0,0 +1,44 @@ +# Test select.poll in combination with file descriptors. + +try: + import select, errno + + select.poll # Raises AttributeError for CPython implementations without poll() +except (ImportError, AttributeError): + print("SKIP") + raise SystemExit + +# Check that poll supports registering file descriptors (integers). +try: + select.poll().register(0) +except OSError: + print("SKIP") + raise SystemExit + +# Register invalid file descriptor. +try: + select.poll().register(-1) +except ValueError: + print("ValueError") + +# Test polling stdout, it should be writable. +poller = select.poll() +poller.register(1) +poller.modify(1, select.POLLOUT) +print(poller.poll()) + +# Unregister then re-register. +poller.unregister(1) +poller.register(1, select.POLLIN) + +# Poll for input, should return an empty list. +print(poller.poll(0)) + +# Test registering a very large number of file descriptors. +poller = select.poll() +for fd in range(6000): + poller.register(fd) +try: + poller.poll() +except OSError as er: + print(er.errno == errno.EINVAL)