diff --git a/README.md b/README.md
index d6e49e4..0215564 100644
--- a/README.md
+++ b/README.md
@@ -56,6 +56,11 @@ After 100 messages reports maximum and minimum delays.
 `conn.py` Connect in station mode using saved connection details where possible.
 
+# resilient
+
+A guide to writing reliable ESP8266 networking code. Probably applies to other
+WiFi connected MicroPython devices, but reliable ones are thin on the ground.
+
 # Rotary Incremental Encoder
 
 Classes for handling incremental rotary position encoders. Note that the Pyboard
diff --git a/resilient/README.md b/resilient/README.md
new file mode 100644
index 0000000..fffbbda
--- /dev/null
+++ b/resilient/README.md
@@ -0,0 +1,289 @@
+# Resilient MicroPython WiFi code
+
+The following is based on experience with the ESP8266. It aims to show how
+to design responsive bidirectional networking applications which are resilient:
+they recover from WiFi and server outages and are capable of long term running
+without crashing.
+
+It is possible to write resilient code for ESP8266, but little existing code
+takes account of the properties of wireless links and the limitations of the
+hardware. On bare metal, in the absence of an OS, it is necessary to detect
+outages and initiate recovery to ensure that consistent program state is
+maintained and to avoid crashes and `LmacRxBlk` errors.
+
+Radio links are inherently unreliable. They can be disrupted by sporadic RF
+interference, especially near the limits of range. A mobile device such as a
+robot can move slowly out of range and then back in again. The access point
+(AP) can suffer an outage, as can the application code at the other end of the
+link. An application intended for long term running on a WiFi connected device
+should be able to recover from such events. Brief outages are common. In a
+house whose WiFi is reliable as experienced on normal devices, outages occur
+at a rate of around 20 per day.
+
+The brute-force approach of a hardware watchdog timer has merit for recovering
+from crashes, but the use of a hard reset implies the loss of program state. A
+hardware or software watchdog does not remove the need to perform continuous
+monitoring of connectivity. In the event of an outage code may continue to run,
+feeding the watchdog; when the outage ends the ESP8266 will reconnect but the
+application will be in an arbitrary state. Further, sockets may be left open,
+leading to `LmacRxBlk` errors and crashes.
+
+# 1. Abstract
+
+Many applications keep sockets open for long periods during which connectivity
+may temporarily be lost. The socket may raise an exception but this is not
+guaranteed: in cases of WiFi outage, loss of connectivity cannot be determined
+from the socket state.
+
+Detecting an outage is vital to ensure sockets are closed and to enable code at
+both endpoints to initiate recovery; also to avoid crashes caused by writing
+to a socket whose counterpart is unavailable.
+
+It seems that the only sure way to detect an outage is for each endpoint
+regularly to send data, and for the receiving endpoint to implement a read
+timeout.
+
+Failure correctly to detect and recover from WiFi disruption is a major cause
+of unreliability in ESP8266 applications.
+
+A demo is provided of a system where multiple ESP8266 clients communicate with
+a wired server via low latency full duplex links. This has run for extended
+periods with multiple clients without issue. The demo is intended to illustrate
+the minimum requirements for a resilient system.
+
+# 2. Hardware
+
+There are numerous poor quality ESP8266 boards. There can also be issues caused
+by inadequate power supplies. I have found the following to be bomb-proof:
+ 1. [Adafruit Feather Huzzah](https://www.adafruit.com/product/2821)
+ 2. [Adafruit Huzzah](https://www.adafruit.com/product/2471)
+ 3. [WeMos D1 Mini](https://wiki.wemos.cc/products:d1:d1_mini) My testing was
+ on an earlier version with the metal cased ESP8266.
+
+# 3. Introduction
+
+I became aware of the issue when running the official umqtt clients on an
+ESP8266. Despite being one room away from the AP the connection seldom stayed
+up for more than an hour or two. This in a house where WiFi as perceived by
+PCs and other devices is rock-solid. Subsequent tests using the code in this
+repo have demonstrated that brief outages are frequent.
+
+I developed a [resilient MQTT driver](https://github.com/peterhinch/micropython-mqtt.git)
+which is capable of recovering from WiFi outages. This is rather complex, in
+part because of the requirements of MQTT.
+
+The demo code in this repo aims to establish the minimum requirements for a
+resilient bidirectional link between an application on a wired server and a
+client on an ESP8266. If a loss of connectivity occurs for any reason,
+communication pauses for the duration, resuming when the link is restored.
+
+# 4. Application design
+
+The two problems which must be solved are detection of an outage and ensuring
+that, when the outage ends, both endpoint applications can resume without loss
+of program state.
+
+While an ESP8266 can detect a local loss of WiFi connectivity, detection of
+link deterioration or of failure of the remote endpoint is more difficult.
+
+To enable a WiFi device to cope with outages there are three approaches of
+increasing sophistication.
+
+ 1. Brief connection: the device code runs an infinite loop. It periodically
+ waits for WiFi availability, connects to the remote, does its job and
+ disconnects. The hope is that WiFi failure during the brief period of
+ connection is unlikely. Program state is maintained. Advantage: outage
+ detection is avoided. Drawbacks: unlikely is not impossible, and the device
+ cannot respond quickly to data from the remote.
+ 2. Hard reset: this implies detecting in code an outage of WiFi or of the
+ remote and triggering a hard reset. This implies a loss of program state.
+ 3. Resilient connection. This is the approach discussed here, where an outage
+ is detected. The code on each endpoint recovers when connectivity resumes.
+ Program state after recovery is consistent.
+
+In the first two options the remote endpoint loops: it waits for a connection,
+acquires the data, then closes the connection.
+
+## 4.1 Outage detection
+
+At low level communication is via sockets linking two endpoints. In the case
+under discussion the endpoints are on physically separate hardware, at least
+one device being connected by WiFi. Each endpoint has a socket instance, with
+both sharing a port. If one endpoint closes its socket, the other gets an
+exception which should be handled appropriately - especially by closing its
+own socket.
+
+Based on experience with the ESP8266, WiFi failures seldom cause exceptions to
+be thrown. Consider a nonblocking socket performing reads from a device. In an
+outage the socket will behave in the same way as during periods when it waits
+for data to arrive.
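+
+The demo code distinguishes these states explicitly. On a MicroPython
+nonblocking socket `readline()` returns `None` when no data is yet available
+and `b''` when the peer has closed the socket; the `readline()` functions in
+`client_w.py` and `server.py` rely on this distinction (a sketch, with `sock`
+assumed to be a connected nonblocking socket):
+
+```python
+d = sock.readline()  # Nonblocking read
+if d == b'':  # Peer has closed the socket
+    raise OSError
+if d is not None:  # None means no data yet: keep waiting
+    line = b''.join((line, d))
+```
+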
+During an outage, writes to a nonblocking socket will proceed normally until
+the ESP8266 buffers fill, provoking the dreaded `LmacRxBlk:1` messages.
+
+The `isconnected()` method is inadequate for detecting outages as it is a
+property of the interface rather than the link. If two WiFi devices are
+communicating, one may lose `isconnected()` owing to local radio conditions.
+If the other end tried to assess connectivity with `isconnected()` it would
+incorrectly conclude that there was no problem. Further, the method is unable
+to detect outages caused by program failure on the remote endpoint.
+
+The only reliable way to detect loss of connectivity appears to be by means of
+timeouts, in particular on socket reads. To keep a link open a minimum interval
+between data writes must be enforced. The endpoint performing the read times
+the interval between successful reads: if this exceeds a threshold the link is
+presumed to have died and a recovery process is initiated.
+
+This implies that WiFi applications which only send data cannot reliably deal
+with outages: to create a resilient link both ends need to wait on a read while
+checking for a timeout. A device whose network connection is via WiFi can
+sometimes get early notification of an outage with `isconnected()`, but this is
+only an adjunct to the read timeout.
+
+When a wireless device detects an outage it should ensure that the other end of
+the link also detects it, so that sockets may be closed and connectivity may be
+restored when the WiFi recovers. This means that it should avoid sending data
+for a period greater than the timeout period.
+
+A further requirement for ESP8266 is to limit the amount of data put into a
+socket while the remote endpoint is down: excessive data quantities can provoke
+`LmacRxBlk` errors. I have not quantified this, but in general if N packets are
+sent in each timeout interval there will be a maximum permissible size for a
+packet. The timeout interval will therefore be constrained by the maximum
+throughput required.
+
+## 4.2 Timeout value
+
+The demo uses timeouts measured in seconds, enabling prompt recovery from
+outages. The assumption is that all devices share a local network. If the
+server is on the internet longer timeouts will be required.
+
+To preserve reliability the amount of data sent during the timeout period must
+be controlled. If connectivity is lost immediately after a keepalive, the loss
+will be undetected until the timeout has elapsed. Any data sent during that
+period will be buffered by the ESP8266 vendor code. Too much will lead to
+`LmacRxBlk` errors and probable crashes. What constitutes "excessive" is moot:
+experimentation is required.
+
+## 4.3 Recovery
+
+The demo system employs the following procedure for recovering from outages.
+The wirelessly connected client behaves as follows.
+
+All coroutines accessing the interface are cancelled, and all open sockets are
+closed: this is essential to avoid `LmacRxBlk:1` messages and crashes. The WiFi
+connection is downed.
+
+The client then periodically attempts to reconnect to WiFi. On success it
+checks that local WiFi connectivity remains good for a period of double the
+timeout. During this period no attempt is made to send or receive data. This
+ensures that the remote device will also detect an outage and close its
+sockets. The procedure also establishes confidence that the WiFi as seen by the
+client is stable.
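+
+In outline the client behaves as follows. This is an illustrative sketch, not
+the demo code itself: `connect`, `communicate` and `close_sockets` are assumed
+names, and `client_w.py` implements the same logic with cancellable coroutines.
+
+```python
+import uasyncio as asyncio
+
+async def run(client):
+    while True:
+        while not client.sta_if.isconnected():
+            await client.connect()  # Incorporates the 2 * timeout stability check
+        try:
+            await client.communicate()  # Returns only when an outage is detected
+        finally:
+            client.close_sockets()  # Vital: avoids LmacRxBlk errors
+            client.sta_if.disconnect()  # Down the WiFi so the remote end times out
+            await asyncio.sleep(1)
+```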
+
+At the end of this period the client attempts to re-establish the connection,
+repeating the recovery procedure on failure. The server responds to the loss of
+connectivity by closing the connection and the sockets. It responds to the
+reconnection as per a new connection.
+
+# 5. Demo system
+
+This demo is of a minimal system based on nonblocking sockets. It is responsive
+in that each endpoint can respond immediately to a packet from its counterpart.
+WiFi connected clients can run indefinitely near the limit of wireless range;
+they automatically recover from outages of the WiFi and of the remote endpoint.
+
+The application scenario is of multiple wirelessly connected clients, each
+communicating with its own application object running on a wired server.
+Communication is asynchronous and full-duplex (i.e. communication is
+bidirectional and can be initiated asynchronously by either end of the link).
+
+A data packet is a '\n' terminated line of text. Blank lines are reserved for
+keepalive packets. The demo application uses JSON to serialise and exchange
+arbitrary Python objects.
+
+The demo comprises the following files:
+ 1. `server.py` A server for the MicroPython Unix build on a wired network
+ connection.
+ 2. `application.py` Server-side application demo.
+ 3. `client_w.py` A client for ESP8266.
+ 4. `client_id.py` Each client must have a unique ID provided by this file.
+ Also holds server IP and port number.
+ 5. `primitives.py` A stripped down version of
+ [asyn.py](https://github.com/peterhinch/micropython-async/blob/master/asyn.py).
+ This is used by server and client. The aim is RAM saving on ESP8266.
+
+## 5.1 The client
+
+The principal purpose of the demo is to expose the client code. A more usable
+version could be written where the boilerplate code was separated from the
+application code, and I will do this. This version deliberately lays bare its
+workings for study.
+
+It is started by instantiating a `Client` object. The constructor assumes
+that the ESP8266 will auto-connect to an existing network. It starts a `_run()`
+coroutine which executes an infinite loop, initially waiting for a WiFi
+connection. It then launches `reader` and `writer` coroutines. The `writer`
+coro periodically sends a JSON encoded list, and the remote endpoint does
+likewise.
+
+The client's `readline()` function times out after 1.5 seconds, issuing an
+`OSError`. If this occurs, the `reader` coro sets the `evfail` event and
+terminates. This causes `_run()` to cancel the `writer` (and `_keepalive`)
+coros. When all coros have died, `_run()` downs the WiFi for double the
+timeout period to ensure that the remote will detect and respond to the
+outage. The loop then repeats, attempting again to establish a connection.
+
+The `writer` coro has similar logic, ensuring that if it encounters an error
+the other coros will be terminated.
+
+On each connection `_run()` instantiates a socket, connects to the appropriate
+port, sets the socket to nonblocking and sends the unique client ID (retrieved
+from `client_id.py`) to the server. This enables the server to associate a
+connection with a specific client.
+
+Once connected, `_run()` also starts the `_keepalive` coro: this sends a blank
+line at a rate guaranteed to ensure that at least one will be sent every
+timeout interval.
+
+The server also sends a blank line periodically. This serves to reset the
+timeout on the client's `readline()` method, preventing a timeout from
+occurring.
+Thus outage detection is effectively transparent: client and server
+applications can send data at any rate.
+
+## 5.2 The application
+
+In this demo the application is assumed to reside on the server machine. This
+enables a substantial simplification, with the timeout, keepalive and error
+handling being devolved to the server.
+
+In this demo up to 4 clients with IDs 1-4 are each served by an instance of
+`App`. However they could equally be served by different application classes.
+When an `App` is instantiated the `start()` coro runs. This waits for the
+server to establish a connection with the correct client. It retrieves the
+connection, starts `reader` and `writer` coros, and quits.
+
+The `reader` and `writer` coros need take no account of link status. They
+communicate using server `readline()` and `write()` methods which will pause
+for the duration of any outage.
+
+## 5.3 The server
+
+The application starts the server by launching the `server.run` coro. The
+argument defines the read timeout value, which should be the same as that on
+the client. The value (in ms) determines the keepalive rate and the minimum
+downtime of an outage.
+
+The `server.run` coro runs forever awaiting incoming connections. When one
+occurs a socket is created and a line containing the client ID is read. If the
+client ID is not in the dictionary of clients (`Connection.conns`) a
+`Connection` is instantiated for that client and placed in the dictionary. On
+subsequent connections of that client, the `Connection` will be retrieved from
+the dictionary. This is done by classmethod `Connection.go`, which assigns the
+socket to that `Connection` instance.
+
+The server provides `readline` and `write` methods. In the event of an outage
+they will pause for the duration. Message transmission is not guaranteed: if an
+outage occurs after transmission has commenced, the message will be lost.
+
+In testing through hundreds of outages, no instances of corrupted or partial
+messages occurred. Presumably TCP/IP ensures this.
diff --git a/resilient/application.py b/resilient/application.py
new file mode 100644
index 0000000..65099ad
--- /dev/null
+++ b/resilient/application.py
@@ -0,0 +1,64 @@
+# application.py
+
+# Released under the MIT licence.
+# Copyright (C) Peter Hinch 2018
+
+# The App class emulates a user application intended to service a single
+# client. In this case we have four instances of the application servicing
+# clients with IDs 1-4.
+
+import uasyncio as asyncio
+loop = asyncio.get_event_loop(runq_len=32, waitq_len=32)
+import ujson
+import server
+
+class App():
+    def __init__(self, client_id):
+        self.client_id = client_id
+        self.data = [0, 0]  # Exchange a 2-list with remote
+        loop = asyncio.get_event_loop()
+        loop.create_task(self.start(loop))
+
+    async def start(self, loop):
+        print('Client {} Awaiting connection.'.format(self.client_id))
+        conn = None
+        while conn is None:
+            await asyncio.sleep_ms(100)
+            conn = server.client_conn(self.client_id)
+        loop.create_task(self.reader(conn))
+        loop.create_task(self.writer(conn))
+
+    async def reader(self, conn):
+        print('Started reader')
+        while True:
+            # Attempt to read data: server times out if none arrives in the
+            # timeout period, closing the Connection. .readline() pauses
+            # until the connection is re-established.
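+            # In a non-demo application a ValueError from ujson.loads (a
+            # corrupt or unexpected line) might also be trapped here; in
+            # testing no corrupt lines were observed.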
+            line = await conn.readline()
+            self.data = ujson.loads(line)
+            # Receives [restart count, uptime in secs]
+            print('Got', self.data, 'from remote', self.client_id)
+
+    # Send [approx application uptime in secs, received client uptime]
+    async def writer(self, conn):
+        print('Started writer')
+        count = 0
+        while True:
+            self.data[0] = count
+            count += 1
+            print('Sent', self.data, 'to remote', self.client_id)
+            print()
+            # .write() behaves as per .readline()
+            await conn.write('{}\n'.format(ujson.dumps(self.data)))
+            await asyncio.sleep(5)
+
+
+clients = [App(n) for n in range(1, 5)]  # Accept 4 clients with IDs 1-4
+try:
+    loop.run_until_complete(server.run(timeout=1500))
+except KeyboardInterrupt:
+    print('Interrupted')
+finally:
+    print('Closing sockets')
+    for s in server.socks:
+        s.close()
diff --git a/resilient/client_id.py b/resilient/client_id.py
new file mode 100644
index 0000000..a068e77
--- /dev/null
+++ b/resilient/client_id.py
@@ -0,0 +1,4 @@
+MY_ID = '2\n'
+#SERVER = '192.168.0.35'  # Laptop
SERVER = '192.168.0.33'  # Pi
+PORT = 8123
diff --git a/resilient/client_w.py b/resilient/client_w.py
new file mode 100644
index 0000000..9819a3d
--- /dev/null
+++ b/resilient/client_w.py
@@ -0,0 +1,151 @@
+# client_w.py Demo of a resilient asynchronous full-duplex ESP8266 client
+
+# Released under the MIT licence.
+# Copyright (C) Peter Hinch 2018
+
+import usocket as socket
+import uasyncio as asyncio
+import ujson
+import network
+import utime
+from machine import Pin
+import primitives as asyn  # Stripped-down asyn.py
+# Get local config. ID is string of form '1\n'
+from client_id import MY_ID, PORT, SERVER
+
+
+class Client():
+    def __init__(self, timeout, loop):
+        self.timeout = timeout
+        self.led = Pin(2, Pin.OUT, value=1)
+        self._sta_if = network.WLAN(network.STA_IF)
+        self._sta_if.active(True)
+        self.server = socket.getaddrinfo(SERVER, PORT)[0][-1]  # Server address
+        self.evfail = asyn.Event(100)  # Set on any comms failure
+        self.lock = asyn.Lock(100)  # 100ms pause
+        self.connects = 0  # Connect count
+        self.sock = None
+        loop.create_task(self._run(loop))
+
+    # Make an attempt to connect to WiFi. May not succeed.
+    async def _connect(self, s):
+        print('Connecting to WiFi')
+        s.active(True)
+        s.connect()  # ESP8266 remembers connection.
+        # Break out on fail or success.
+        while s.status() == network.STAT_CONNECTING:
+            await asyncio.sleep(1)
+        t = utime.ticks_ms()
+        print('Checking WiFi stability for {}ms'.format(2 * self.timeout))
+        # Timeout ensures stable WiFi and forces minimum outage duration
+        while s.isconnected() and utime.ticks_diff(utime.ticks_ms(), t) < 2 * self.timeout:
+            await asyncio.sleep(1)
+
+    async def _run(self, loop):
+        s = self._sta_if
+        while True:
+            while not s.isconnected():  # Try until stable for 2*server timeout
+                await self._connect(s)
+            print('WiFi OK')
+            self.sock = socket.socket()
+            try:
+                self.sock.connect(self.server)
+                self.sock.setblocking(False)
+                await self.send(self.sock, MY_ID)  # Can throw OSError
+            except OSError:
+                pass
+            else:
+                self.evfail.clear()
+                loop.create_task(asyn.Cancellable(self.reader)())
+                loop.create_task(asyn.Cancellable(self.writer, loop)())
+                loop.create_task(asyn.Cancellable(self._keepalive)())
+                await self.evfail  # Pause until something goes wrong
+                await asyn.Cancellable.cancel_all()
+            self.close()  # Close sockets
+            print('Fail detected. Coros stopped, disconnecting.')
+            s.disconnect()
+            await asyncio.sleep(1)
+            while s.isconnected():
+                await asyncio.sleep(1)
+
+    @asyn.cancellable
+    async def reader(self):
+        c = self.connects  # Count and transmit successful connects
+        try:
+            while True:
+                r = await self.readline()  # OSError on fail
+                if c == self.connects:  # If read succeeded
+                    self.connects += 1  # update connect count
+                d = ujson.loads(r)
+                print('Got data', d)
+        except OSError:
+            self.evfail.set()
+
+    @asyn.cancellable
+    async def writer(self, loop):
+        data = [0, 0]
+        try:
+            while True:
+                data[0] = self.connects  # Send connection count
+                async with self.lock:
+                    await self.send(self.sock, '{}\n'.format(ujson.dumps(data)))
+                print('Sent data', data)
+                data[1] += 1  # Packet counter
+                await asyncio.sleep(5)
+        except OSError:
+            self.evfail.set()
+
+    @asyn.cancellable
+    async def _keepalive(self):
+        tim = self.timeout * 2 // 3  # Ensure >= 1 keepalives in server t/o
+        try:
+            while True:
+                await asyncio.sleep_ms(tim)
+                async with self.lock:
+                    await self.send(self.sock, '\n')
+        except OSError:
+            self.evfail.set()
+
+    # Read a line from nonblocking socket: reads can return partial data which
+    # are joined into a line. Blank lines are keepalive packets which reset
+    # the timeout: readline() pauses until a complete line has been received.
+    async def readline(self):
+        line = b''
+        start = utime.ticks_ms()
+        while True:
+            if line.endswith(b'\n'):
+                if len(line) > 1:
+                    return line
+                line = b''
+                start = utime.ticks_ms()  # Blank line is keepalive
+                self.led(not self.led())
+            await asyncio.sleep_ms(100)  # Nonzero wait seems empirically necessary
+            d = self.sock.readline()
+            if d == b'':
+                raise OSError
+            if d is not None:
+                line = b''.join((line, d))
+            if utime.ticks_diff(utime.ticks_ms(), start) > self.timeout:
+                raise OSError
+
+    async def send(self, s, d):  # Write a line to the socket.
+        start = utime.ticks_ms()
+        while len(d):
+            ns = s.send(d)  # OSError if client fails
+            d = d[ns:]  # Possible partial write
+            await asyncio.sleep_ms(100)
+            if utime.ticks_diff(utime.ticks_ms(), start) > self.timeout:
+                raise OSError
+
+    def close(self):
+        print('Closing sockets.')
+        if isinstance(self.sock, socket.socket):
+            self.sock.close()
+
+
+loop = asyncio.get_event_loop()
+client = Client(1500, loop)  # Server timeout set by server side app: 1.5s
+try:
+    loop.run_forever()
+finally:
+    client.close()  # Close sockets in case of ctrl-C or bug
diff --git a/resilient/primitives.py b/resilient/primitives.py
new file mode 100644
index 0000000..0ac2b2e
--- /dev/null
+++ b/resilient/primitives.py
@@ -0,0 +1,228 @@
+# primitives.py A stripped-down version of asyn.py: Lock, Event, Barrier and
+# task cancellation only. Save RAM on ESP8266
+
+# Released under the MIT licence.
+# Copyright (C) Peter Hinch 2018
+
+import uasyncio as asyncio
+
+type_coro = type((lambda: (yield))())  # Generator type
+
+# Run a callback or initiate a coroutine depending on which is passed.
+# Needed by Barrier if a func is supplied to its constructor; as in asyn.py.
+def launch(func, tup_args):
+    res = func(*tup_args)
+    if isinstance(res, type_coro):
+        loop = asyncio.get_event_loop()
+        loop.create_task(res)
+
+class Lock():
+    def __init__(self, delay_ms=0):
+        self._locked = False
+        self.delay_ms = delay_ms
+
+    def locked(self):
+        return self._locked
+
+    async def __aenter__(self):
+        await self.acquire()
+        return self
+
+    async def __aexit__(self, *args):
+        self.release()
+        await asyncio.sleep(0)
+
+    async def acquire(self):
+        while True:
+            if self._locked:
+                await asyncio.sleep_ms(self.delay_ms)
+            else:
+                self._locked = True
+                break
+
+    def release(self):
+        if not self._locked:
+            raise RuntimeError('Attempt to release a lock which has not been set')
+        self._locked = False
+
+
+class Event():
+    def __init__(self, delay_ms=0):
+        self.delay_ms = delay_ms
+        self.clear()
+
+    def clear(self):
+        self._flag = False
+        self._data = None
+
+    def __await__(self):
+        while not self._flag:
+            await asyncio.sleep_ms(self.delay_ms)
+
+    __iter__ = __await__
+
+    def is_set(self):
+        return self._flag
+
+    def set(self, data=None):
+        self._flag = True
+        self._data = data
+
+    def value(self):
+        return self._data
+
+
+class Barrier():
+    def __init__(self, participants, func=None, args=()):
+        self._participants = participants
+        self._func = func
+        self._args = args
+        self._reset(True)
+
+    def __await__(self):
+        self._update()
+        if self._at_limit():  # All other threads are also at limit
+            if self._func is not None:
+                launch(self._func, self._args)
+            self._reset(not self._down)  # Toggle direction to release others
+            return
+
+        direction = self._down
+        while True:  # Wait until last waiting thread changes the direction
+            if direction != self._down:
+                return
+            yield
+
+    __iter__ = __await__
+
+    def trigger(self):
+        self._update()
+        if self._at_limit():  # All other threads are also at limit
+            if self._func is not None:
+                launch(self._func, self._args)
+            self._reset(not self._down)  # Toggle direction to release others
+
+    def _reset(self, down):
+        self._down = down
+        self._count = self._participants if down else 0
+
+    def busy(self):
+        if self._down:
+            done = self._count == self._participants
+        else:
+            done = self._count == 0
+        return not done
+
+    def _at_limit(self):  # Has count reached up or down limit?
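+        # When counting down the limit is 0; counting up it is _participants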
+        limit = 0 if self._down else self._participants
+        return self._count == limit
+
+    def _update(self):
+        self._count += -1 if self._down else 1
+        if self._count < 0 or self._count > self._participants:
+            raise ValueError('Too many tasks accessing Barrier')
+
+# Task Cancellation
+try:
+    StopTask = asyncio.CancelledError  # More descriptive name
+except AttributeError:
+    raise OSError('primitives.py requires uasyncio V1.7.1 or above.')
+
+class TaskId():
+    def __init__(self, taskid):
+        self.taskid = taskid
+
+    def __call__(self):
+        return self.taskid
+
+# Sleep coro breaks up a sleep into shorter intervals to ensure a rapid
+# response to StopTask exceptions
+async def sleep(t, granularity=100):  # 100ms default
+    if granularity <= 0:
+        raise ValueError('sleep granularity must be > 0')
+    t = int(t * 1000)  # ms
+    if t <= granularity:
+        await asyncio.sleep_ms(t)
+    else:
+        n, rem = divmod(t, granularity)
+        for _ in range(n):
+            await asyncio.sleep_ms(granularity)
+        await asyncio.sleep_ms(rem)
+
+
+class Cancellable():
+    task_no = 0  # Generated task ID, index of tasks dict
+    tasks = {}  # Value is [coro, group, barrier] indexed by integer task_no
+
+    @classmethod
+    def _cancel(cls, task_no):
+        task = cls.tasks[task_no][0]
+        asyncio.cancel(task)
+
+    @classmethod
+    async def cancel_all(cls, group=0, nowait=False):
+        tokill = cls._get_task_nos(group)
+        barrier = Barrier(len(tokill) + 1)  # Include this task
+        for task_no in tokill:
+            cls.tasks[task_no][2] = barrier
+            cls._cancel(task_no)
+        if nowait:
+            barrier.trigger()
+        else:
+            await barrier
+
+    @classmethod
+    def _is_running(cls, group=0):
+        tasks = cls._get_task_nos(group)
+        if tasks == []:
+            return False
+        for task_no in tasks:
+            barrier = cls.tasks[task_no][2]
+            if barrier is None:  # Running, not yet cancelled
+                return True
+            if barrier.busy():
+                return True
+        return False
+
+    @classmethod
+    def _get_task_nos(cls, group):  # Return task nos in a group
+        return [task_no for task_no in cls.tasks if cls.tasks[task_no][1] == group]
+
+    @classmethod
+    def _get_group(cls, task_no):  # Return group given a task_no
+        return cls.tasks[task_no][1]
+
+    @classmethod
+    def _stopped(cls, task_no):
+        if task_no in cls.tasks:
+            barrier = cls.tasks[task_no][2]
+            if barrier is not None:  # Cancellation in progress
+                barrier.trigger()
+            del cls.tasks[task_no]
+
+    def __init__(self, gf, *args, group=0, **kwargs):
+        task = gf(TaskId(Cancellable.task_no), *args, **kwargs)
+        if task in self.tasks:
+            raise ValueError('Task already exists.')
+        self.tasks[Cancellable.task_no] = [task, group, None]
+        self.task_no = Cancellable.task_no  # For subclass
+        Cancellable.task_no += 1
+        self.task = task
+
+    def __call__(self):
+        return self.task
+
+    def __await__(self):  # Return any value returned by task.
+        return (yield from self.task)
+
+    __iter__ = __await__
+
+
+# @cancellable decorator: strips the TaskId arg and ensures _stopped() runs
+# when the task completes or is cancelled.
+
+def cancellable(f):
+    async def new_gen(*args, **kwargs):
+        if isinstance(args[0], TaskId):  # Not a bound method
+            task_id = args[0]
+            g = f(*args[1:], **kwargs)
+        else:  # Task ID is args[1] if a bound method
+            task_id = args[1]
+            args = (args[0],) + args[2:]
+            g = f(*args, **kwargs)
+        try:
+            res = await g
+            return res
+        finally:
+            Cancellable._stopped(task_id())  # NamedTask of asyn.py is absent here
    return new_gen
diff --git a/resilient/server.py b/resilient/server.py
new file mode 100644
index 0000000..46c6263
--- /dev/null
+++ b/resilient/server.py
@@ -0,0 +1,155 @@
+# server.py Minimal server.
+
+# Released under the MIT licence.
+# Copyright (C) Peter Hinch 2018
+
+# Maintains bidirectional full-duplex links between server applications and
+# multiple WiFi connected clients. Each application instance connects to its
+# designated client. Connections are resilient and recover from outages of
+# WiFi and of the connected endpoint.
+# This server and the server applications are assumed to reside on a device
+# with a wired interface.
+
+# Run under MicroPython Unix build.
+
+import usocket as socket
+import uasyncio as asyncio
+import utime
+import primitives as asyn
+from client_id import PORT
+
+# Global list of open sockets. Enables application to close any open sockets in
+# the event of error.
+socks = []
+
+# Read a line from a nonblocking socket. Nonblocking reads and writes can
+# return partial data.
+# Timeout: client is deemed dead if this period elapses without receiving data.
+# This seems to be the only way to detect a WiFi failure, where the client does
+# not get the chance explicitly to close the sockets.
+# Note: on WiFi connected devices sleep_ms(0) produced unreliable results.
+async def readline(s, timeout):
+    line = b''
+    start = utime.ticks_ms()
+    while True:
+        if line.endswith(b'\n'):
+            if len(line) > 1:
+                return line
+            line = b''
+            start = utime.ticks_ms()  # A blank line is just a keepalive
+        await asyncio.sleep_ms(100)  # See note above
+        d = s.readline()
+        if d == b'':
+            raise OSError
+        if d is not None:
+            line = b''.join((line, d))
+        if utime.ticks_diff(utime.ticks_ms(), start) > timeout:
+            raise OSError
+
+async def send(s, d, timeout):
+    start = utime.ticks_ms()
+    while len(d):
+        ns = s.send(d)  # OSError if client fails
+        d = d[ns:]
+        await asyncio.sleep_ms(100)  # See note above
+        if utime.ticks_diff(utime.ticks_ms(), start) > timeout:
+            raise OSError
+
+# Return the connection for a client if it is connected (else None)
+def client_conn(client_id):
+    try:
+        c = Connection.conns[client_id]
+    except KeyError:
+        return
+    if c.ok():
+        return c
+
+# API: application calls server.run()
+# Not using uasyncio.start_server because of https://github.com/micropython/micropython/issues/4290
+async def run(timeout, nconns=10, verbose=False):
+    addr = socket.getaddrinfo('0.0.0.0', PORT, 0, socket.SOCK_STREAM)[0][-1]
+    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+    socks.append(s)
+    s.bind(addr)
+    s.listen(nconns)
+    verbose and print('Awaiting connection.')
+    while True:
+        yield asyncio.IORead(s)  # Register socket for polling
+        conn, addr = s.accept()
+        conn.setblocking(False)
+        try:
+            idstr = await readline(conn, timeout)
+            verbose and print('Got connection from client', idstr)
+            socks.append(conn)
+            Connection.go(int(idstr), timeout, verbose, conn)
+        except OSError:
+            if conn is not None:
+                conn.close()
+
+# A Connection persists even if the client dies (minimises object creation).
+# If the client dies the Connection is closed: .close() flags this state by
+# closing its socket and setting .conn to None (.ok() == False).
+class Connection():
+    conns = {}  # Indexed by client_id. Value: Connection instance
+
+    @classmethod
+    def go(cls, client_id, timeout, verbose, conn):
+        if client_id not in cls.conns:  # New client: instantiate Connection
+            Connection(client_id, timeout, verbose)
+        cls.conns[client_id].conn = conn
+
+    def __init__(self, client_id, timeout, verbose):
+        self.client_id = client_id
+        self.timeout = timeout
+        self.verbose = verbose
+        Connection.conns[client_id] = self
+        # Lock prevents concurrent writes from interleaving on the socket
+        self.lock = asyn.Lock(100)
+        self.conn = None  # Socket
+        loop = asyncio.get_event_loop()
+        loop.create_task(self._keepalive())
+
+    def ok(self):
+        return self.conn is not None
+
+    async def _keepalive(self):
+        to = self.timeout * 2 // 3
+        while True:
+            await self.write('\n')
+            await asyncio.sleep_ms(to)
+
+    async def readline(self):
+        while True:
+            if self.verbose and not self.ok():
+                print('Reader Client:', self.client_id, 'awaiting OK status')
+            while not self.ok():
+                await asyncio.sleep_ms(100)
+            self.verbose and print('Reader Client:', self.client_id, 'OK')
+            try:
+                line = await readline(self.conn, self.timeout)
+                return line
+            except (OSError, AttributeError):  # AttributeError if .conn set to None while reading
+                self.verbose and print('Read client disconnected: closing connection.')
+                self.close()
+
+    async def write(self, buf):
+        while True:
+            if self.verbose and not self.ok():
+                print('Writer Client:', self.client_id, 'awaiting OK status')
+            while not self.ok():
+                await asyncio.sleep_ms(100)
+            self.verbose and print('Writer Client:', self.client_id, 'OK')
+            try:
+                async with self.lock:  # >1 writing task?
+                    await send(self.conn, buf, self.timeout)  # OSError on fail
+                return
+            except (OSError, AttributeError):
+                self.verbose and print('Write client disconnected: closing connection.')
+                self.close()
+
+    def close(self):
+        if self.conn is not None:
+            if self.conn in socks:
+                socks.remove(self.conn)
+            self.conn.close()
+            self.conn = None
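+
+
+# Typical usage from a server-side application (see application.py for the
+# full demo; timeout in ms must match the value used by the client):
+#
+#   loop = asyncio.get_event_loop()
+#   loop.run_until_complete(server.run(timeout=1500, verbose=True))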