resilient directory added.

pull/7/head
Peter Hinch 2018-11-30 10:34:35 +00:00
parent 11d6db9d68
commit b43be456c9
7 changed files with 896 additions and 0 deletions

@@ -56,6 +56,11 @@ After 100 messages reports maximum and minimum delays.
`conn.py` Connect in station mode using saved connection details where possible.
# resilient
A guide to writing reliable ESP8266 networking code. Probably applies to other
WiFi connected MicroPython devices, but reliable ones are thin on the ground.
# Rotary Incremental Encoder
Classes for handling incremental rotary position encoders. Note that the Pyboard

resilient/README.md 100644

@@ -0,0 +1,289 @@
# Resilient MicroPython WiFi code
The following is based on experience with the ESP8266. It aims to show how
to design responsive bidirectional networking applications which are resilient:
they recover from WiFi and server outages and are capable of long term running
without crashing.
It is possible to write resilient code for ESP8266, but little existing code
takes account of the properties of wireless links and the limitations of the
hardware. On bare metal, in the absence of an OS, it is necessary to detect
outages and initiate recovery to ensure that consistent program state is
maintained and to avoid crashes and `LmacRxBlk` errors.
Radio links are inherently unreliable. They can be disrupted by sporadic RF
interference, especially near the limits of range. A mobile device such as a
robot can move slowly out of range and then back in again. The access point
(AP) can suffer an outage as can the application code at the other end of the
link. An application intended for long term running on a WiFi connected device
should be able to recover from such events. Brief outages are common. In a
house whose WiFi appears reliable on normal devices, outages occur
at a rate of around 20 per day.
The brute-force approach of a hardware watchdog timer has merit for recovering
from crashes but the use of a hard reset implies the loss of program state. A
hardware or software watchdog does not remove the need to perform continuous
monitoring of connectivity. In the event of an outage code may continue to run
feeding the watchdog; when the outage ends the ESP8266 will reconnect but the
application will be in an arbitrary state. Further, sockets may be left open
leading to `LmacRxBlk` errors and crashes.
# 1. Abstract
Many applications keep sockets open for long periods during which connectivity
may temporarily be lost. The socket may raise an exception but this is not
guaranteed: in cases of WiFi outage, loss of connectivity cannot be determined
from the socket state.
Detecting an outage is vital to ensure sockets are closed and to enable code at
both endpoints to initiate recovery; also to avoid crashes caused by writing
to a socket whose counterpart is unavailable.
It seems that the only sure way to detect an outage is for each endpoint
regularly to send data, and for the receiving endpoint to implement a read
timeout.
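The mechanism can be sketched in portable Python, using `time.monotonic` in place of MicroPython's `utime.ticks_ms`; `readline_with_timeout` is an illustrative name, not part of the demo API:

```python
import socket
import time

def readline_with_timeout(sock, timeout_s):
    # Accumulate data from a nonblocking socket into a '\n'-terminated
    # line. If no complete line arrives within timeout_s seconds, the
    # link is presumed dead and OSError is raised.
    sock.setblocking(False)
    line = b''
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        try:
            d = sock.recv(64)
        except BlockingIOError:
            d = b''  # No data yet: normal on a live but quiet link
        else:
            if d == b'':  # Orderly close by the peer
                raise OSError('connection closed by peer')
        line += d
        if line.endswith(b'\n'):
            return line
        time.sleep(0.01)
    raise OSError('read timeout: link presumed dead')
```

Note that only the timeout, not the socket state, distinguishes a quiet link from a dead one.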
Failure correctly to detect and recover from WiFi disruption is a major cause
of unreliability in ESP8266 applications.
A demo is provided of a system where multiple ESP8266 clients communicate with
a wired server with low latency full duplex links. This has run for extended
periods with multiple clients without issue. The demo is intended to illustrate
the minimum requirements for a resilient system.
# 2. Hardware
There are numerous poor quality ESP8266 boards. There can also be issues caused
by inadequate power supplies. I have found the following to be bomb-proof:
1. [Adafruit Feather Huzzah](https://www.adafruit.com/product/2821)
2. [Adafruit Huzzah](https://www.adafruit.com/product/2471)
3. [WeMos D1 Mini](https://wiki.wemos.cc/products:d1:d1_mini) My testing was
on an earlier version with the metal cased ESP8266.
# 3. Introduction
I became aware of the issue when running the official umqtt clients on an
ESP8266. Despite being one room away from the AP the connection seldom stayed
up for more than an hour or two. This in a house where WiFi as perceived by
PCs and other devices is rock-solid. Subsequent tests using the code in this
repo have demonstrated that brief outages are frequent.
I developed a [resilient MQTT driver](https://github.com/peterhinch/micropython-mqtt.git)
which is capable of recovering from WiFi outages. This is rather complex, in
part because of the requirements of MQTT.
The demo code in this repo aims to establish the minimum requirements for a
resilient bidirectional link between an application on a wired server and a
client on an ESP8266. If a loss of connectivity occurs for any reason,
communication pauses for the duration, resuming when the link is restored.
# 4. Application design
The two problems which must be solved are detection of an outage and ensuring
that, when the outage ends, both endpoint applications can resume without loss
of program state.
While an ESP8266 can detect a local loss of WiFi connectivity, detecting
deterioration of the link or failure of the remote endpoint is more difficult.
To enable a WiFi device to cope with outages there are three approaches of
increasing sophistication.
1. Brief connection: the device code runs an infinite loop. It periodically
waits for WiFi availability, connects to the remote, does its job and
disconnects. The hope is that WiFi failure during the brief period of
connection is unlikely. Program state is maintained. Advantage: outage
detection is avoided. Drawbacks: unlikely is not impossible. The device cannot
respond quickly to data from the remote.
2. Hard reset: this implies detecting in code an outage of WiFi or of the
remote and triggering a hard reset. This implies a loss of program state.
3. Resilient connection. This is the approach discussed here, where an outage
is detected. The code on each endpoint recovers when connectivity resumes.
Program state after recovery is consistent.
In the first two options the remote endpoint loops: it waits for a connection,
acquires the data, then closes the connection.
## 4.1 Outage detection
At low level communication is via sockets linking two endpoints. In the case
under discussion the endpoints are on physically separate hardware, at least
one device being connected via WiFi. Each endpoint has a socket
instance with both sharing a port. If one endpoint closes its socket, the other
gets an exception which should be handled appropriately - especially by closing
its socket.
Based on experience with the ESP8266, WiFi failures seldom cause exceptions to
be thrown. Consider a nonblocking socket performing reads from a device. In an
outage the socket will behave in the same way as during periods when it waits
for data to arrive. During an outage, writes to a nonblocking socket will
proceed normally until the ESP8266 buffers fill, provoking the dreaded
`LmacRxBlk:1` messages.
The `isconnected()` method is inadequate for detecting outages as it is a
property of the interface rather than the link. If two WiFi devices are
communicating, one may lose `isconnected()` owing to local radio conditions. If
the other end tried to assess connectivity with `isconnected()` it would
incorrectly conclude that there was no problem. Further, the method is unable
to detect outages caused by program failure on the remote endpoint.
The only reliable way to detect loss of connectivity appears to be by means of
timeouts, in particular on socket reads. To keep a link open a minimum interval
between data writes must be enforced. The endpoint performing the read times
the interval between successful reads: if this exceeds a threshold the link is
presumed to have died and a recovery process initiated.
This implies that WiFi applications which only send data cannot reliably deal
with outages: to create a resilient link both ends need to wait on a read while
checking for a timeout. A device whose network connection is via WiFi can
sometimes get early notification of an outage with `isconnected()` but this is
only an adjunct to the read timeout.
When a wireless device detects an outage it should ensure that the other end of
the link also detects it so that sockets may be closed and connectivity may be
restored when the WiFi recovers. This means that it should avoid sending data
period greater than the timeout period.
A further requirement for ESP8266 is to limit the amount of data put into a
socket while the remote endpoint is down: excessive data quantities can provoke
`LmacRxBlk` errors. I have not quantified this, but in general if N packets are
sent in each timeout interval there will be a maximum permissible size for a
packet. The timeout interval will therefore be constrained by the maximum
throughput required.
## 4.2 Timeout value
The demo uses timeouts measured in seconds, enabling prompt recovery from
outages. The assumption is that all devices share a local network. If the
server is on the internet longer timeouts will be required.
To preserve reliability the amount of data sent during the timeout period must
be controlled. If connectivity is lost immediately after a keepalive, the loss
will be undetected until the timeout has elapsed. Any data sent during that
period will be buffered by the ESP8266 vendor code. Too much will lead to
`LmacRxBlk` and probable crashes. What constitutes "excessive" is moot:
experimentation is required.
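As a rough worked example of the constraint (all figures below are hypothetical: the safe buffer size is, as noted above, unquantified):

```python
# Hypothetical worked example of the throughput constraint. The safe
# number of bytes the ESP8266 can buffer during an outage is not
# quantified in the text; 2048 is an assumed figure for illustration.
SAFE_BUFFER_BYTES = 2048   # assumption, not a measured limit
TIMEOUT_MS = 1500          # read timeout as used by the demo
WRITE_INTERVAL_MS = 500    # application write rate (hypothetical)

# Worst case: everything written during one timeout period may be
# buffered by the vendor code before the outage is detected.
packets_per_timeout = TIMEOUT_MS // WRITE_INTERVAL_MS
max_packet_bytes = SAFE_BUFFER_BYTES // packets_per_timeout
```

With these figures, three packets may be queued before detection, limiting packets to roughly 682 bytes.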
## 4.3 Recovery
The demo system employs the following procedure for recovering from outages.
The wirelessly connected client behaves as follows.
All coroutines accessing the interface are cancelled, and all open sockets are
closed: this is essential to avoid `LmacRxBlk:1` messages and crashes. The WiFi
connection is downed.
The client then periodically attempts to reconnect to WiFi. On success it
checks that local WiFi connectivity remains good for a period of double the
timeout. During this period no attempt is made to send or receive data. This
ensures that the remote device will also detect an outage and close its
sockets. The procedure also establishes confidence that the WiFi as seen by the
client is stable.
At the end of this period the client attempts to re-establish the connection,
repeating the recovery procedure on failure. The server responds to the loss of
connectivity by closing the connection and the sockets. It responds to the
reconnection as per a new connection.
# 5. Demo system
This demo is of a minimal system based on nonblocking sockets. It is responsive
in that each endpoint can respond immediately to a packet from its counterpart.
WiFi connected clients can run indefinitely near the limit of wireless range;
they automatically recover from outages of the WiFi and of the remote endpoint.
The application scenario is of multiple wirelessly connected clients, each
communicating with its own application object running on a wired server.
Communication is asynchronous and full-duplex (i.e. communication is
bidirectional and can be initiated asynchronously by either end of the link).
A data packet is a '\n' terminated line of text. Blank lines are reserved for
keepalive packets. The demo application uses JSON to serialise and exchange
arbitrary Python objects.
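The framing just described can be sketched as follows, with CPython's `json` standing in for MicroPython's `ujson`; the helper names are illustrative, not part of the demo:

```python
import json

def encode_packet(obj):
    # One '\n'-terminated line per packet. json.dumps never emits a
    # newline, so the terminator unambiguously delimits packets.
    return json.dumps(obj) + '\n'

def decode_packet(line):
    line = line.strip()
    if not line:
        return None  # Blank line: keepalive, no payload
    return json.loads(line)
```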
The demo comprises the following files:
1. `server.py` A server for MicroPython Unix build on a wired network
connection.
2. `application.py` Server-side application demo.
3. `client_w.py` A client for ESP8266.
4. `client_id.py` Each client must have a unique ID provided by this file.
Also holds server IP and port number.
5. `primitives.py` A stripped down version of
[asyn.py](https://github.com/peterhinch/micropython-async/blob/master/asyn.py).
This is used by server and client. The aim is RAM saving on the ESP8266.
## 5.1 The client
The principal purpose of the demo is to expose the client code. A more usable
version could be written where the boilerplate code was separated from the
application code, and I will do this. This version deliberately lays bare its
workings for study.
It is started by instantiating a `Client` object. The constructor assumes
that the ESP8266 will auto-connect to an existing network. It starts a `_run()`
coroutine which executes an infinite loop, initially waiting for a WiFi
connection. It then launches `reader` and `writer` coroutines. The `writer`
coro periodically sends a JSON encoded list, and the remote endpoint does
likewise.
The client's `readline()` function times out after 1.5 seconds, raising an
`OSError`. If this occurs, the `reader` coro terminates after setting the
`.evfail` event. This causes `_run()` to terminate the `writer`
(and `_keepalive`) coros. When all coros have died, `_run()` downs the WiFi for
double the timeout period to ensure that the remote will detect and respond to
the outage. The loop then repeats, attempting again to establish a connection.
The `writer` coro has similar logic ensuring that if it encounters an error the
other coros will be terminated.
On each iteration `_run()` instantiates a socket and connects to the
appropriate port. The socket is set to nonblocking and the unique client ID
(retrieved from `client_id.py`) is sent to the server. This enables the server
to associate a connection with a specific client.
Once the connection is established, `_run()` starts the `_keepalive` coro:
this sends a blank line at a rate guaranteed to ensure that at least one will
be sent every timeout interval.
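The keepalive rate follows from the timeout. The demo code uses two thirds of the timeout period, which guarantees at least one keepalive per timeout window with margin for scheduling latency:

```python
def keepalive_interval_ms(timeout_ms):
    # Two thirds of the timeout: even if one keepalive is delayed by
    # scheduling latency, the next lands before the timeout expires.
    return timeout_ms * 2 // 3
```

With the demo's 1500ms timeout this yields a 1000ms keepalive interval.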
The server also sends a blank line periodically. This serves to reset the
timeout on the client's `readline()` method, preventing a timeout from
occurring. Thus outage detection is effectively transparent: client and server
applications can send data at any rate.
## 5.2 The application
In this demo the application is assumed to reside on the server machine. This
enables a substantial simplification with the timeout, keepalive and error
handling being devolved to the server.
In this demo up to four clients with IDs 1-4 are each served by an instance of
`App`. However they could equally be served by different application classes.
When an `App` is instantiated the `start()` coro runs which waits for the
server to establish a connection with the correct client. It retrieves the
connection, starts `reader` and `writer` coros, and quits.
The `reader` and `writer` coros need take no account of link status. They
communicate using server `readline()` and `write()` methods which will pause
for the duration of any outage.
## 5.3 The server
The application starts the server by launching the `server.run` coro. The
argument defines the read timeout value which should be the same as that on the
client. The value (in ms) determines the keepalive rate and the minimum
downtime of an outage.
The `server.run` coro runs forever awaiting incoming connections. When one
occurs a socket is created and a line containing the client ID is read. If the
client ID is not in the dictionary of clients (`Connection.connections`) a
`Connection` is instantiated for that client and placed in the dictionary. On
subsequent connections of that client, the `Connection` will be retrieved from
the dictionary. This is done by classmethod `Connection.go`, which assigns the
socket to that `Connection` instance.
The server provides `readline` and `write` methods. In the event of an outage
they will pause for the duration. Message transmission is not guaranteed: if an
outage occurs after transmission has commenced, the message will be lost.
In testing through hundreds of outages, no instances of corrupted or partial
messages occurred. Presumably TCP/IP ensures this.

resilient/application.py 100644

@@ -0,0 +1,64 @@
# application.py
# Released under the MIT licence.
# Copyright (C) Peter Hinch 2018

# The App class emulates a user application intended to service a single
# client. In this case we have four instances of the application servicing
# clients with ID's 1-4.

import uasyncio as asyncio
loop = asyncio.get_event_loop(runq_len=32, waitq_len=32)
import ujson
import server

class App():
    def __init__(self, client_id):
        self.client_id = client_id
        self.data = [0, 0]  # Exchange a 2-list with remote
        loop = asyncio.get_event_loop()
        loop.create_task(self.start(loop))

    async def start(self, loop):
        print('Client {} Awaiting connection.'.format(self.client_id))
        conn = None
        while conn is None:
            await asyncio.sleep_ms(100)
            conn = server.client_conn(self.client_id)
        loop.create_task(self.reader(conn))
        loop.create_task(self.writer(conn))

    async def reader(self, conn):
        print('Started reader')
        while True:
            # Attempt to read data: server times out if none arrives in timeout
            # period closing the Connection. .readline() pauses until the
            # connection is re-established.
            line = await conn.readline()
            self.data = ujson.loads(line)
            # Receives [restart count, uptime in secs]
            print('Got', self.data, 'from remote', self.client_id)

    # Send [approx application uptime in secs, received client uptime]
    async def writer(self, conn):
        print('Started writer')
        count = 0
        while True:
            self.data[0] = count
            count += 1
            print('Sent', self.data, 'to remote', self.client_id)
            print()
            # .write() behaves as per .readline()
            await conn.write('{}\n'.format(ujson.dumps(self.data)))
            await asyncio.sleep(5)

clients = [App(n) for n in range(1, 5)]  # Accept 4 clients with ID's 1-4
try:
    loop.run_until_complete(server.run(timeout=1500))
except KeyboardInterrupt:
    print('Interrupted')
finally:
    print('Closing sockets')
    for s in server.socks:
        s.close()

resilient/client_id.py 100644

@@ -0,0 +1,4 @@
MY_ID = '2\n'
#_SERVER = '192.168.0.35' # Laptop
SERVER = '192.168.0.33' # Pi
PORT = 8123

resilient/client_w.py 100644

@@ -0,0 +1,151 @@
# client_w.py Demo of a resilient asynchronous full-duplex ESP8266 client
# Released under the MIT licence.
# Copyright (C) Peter Hinch 2018

import usocket as socket
import uasyncio as asyncio
import ujson
import network
import utime
from machine import Pin
import primitives as asyn  # Stripped-down asyn.py

# Get local config. ID is string of form '1\n'
from client_id import MY_ID, PORT, SERVER

class Client():
    def __init__(self, timeout, loop):
        self.timeout = timeout
        self.led = Pin(2, Pin.OUT, value=1)
        self._sta_if = network.WLAN(network.STA_IF)
        self._sta_if.active(True)
        self.server = socket.getaddrinfo(SERVER, PORT)[0][-1]  # server read
        self.evfail = asyn.Event(100)
        self.lock = asyn.Lock(100)  # 100ms pause
        self.connects = 0  # Connect count
        self.sock = None
        loop.create_task(self._run(loop))

    # Make an attempt to connect to WiFi. May not succeed.
    async def _connect(self, s):
        print('Connecting to WiFi')
        s.active(True)
        s.connect()  # ESP8266 remembers connection.
        # Break out on fail or success.
        while s.status() == network.STAT_CONNECTING:
            await asyncio.sleep(1)
        t = utime.ticks_ms()
        print('Checking WiFi stability for {}ms'.format(2 * self.timeout))
        # Timeout ensures stable WiFi and forces minimum outage duration
        while s.isconnected() and utime.ticks_diff(utime.ticks_ms(), t) < 2 * self.timeout:
            await asyncio.sleep(1)

    async def _run(self, loop):
        s = self._sta_if
        while True:
            while not s.isconnected():  # Try until stable for 2*server timeout
                await self._connect(s)
            print('WiFi OK')
            self.sock = socket.socket()
            try:
                self.sock.connect(self.server)
                self.sock.setblocking(False)
                await self.send(self.sock, MY_ID)  # Can throw OSError
            except OSError:
                pass
            else:
                self.evfail.clear()
                loop.create_task(asyn.Cancellable(self.reader)())
                loop.create_task(asyn.Cancellable(self.writer, loop)())
                loop.create_task(asyn.Cancellable(self._keepalive)())
                await self.evfail  # Pause until something goes wrong
                await asyn.Cancellable.cancel_all()
                self.close()  # Close sockets
                print('Fail detected. Coros stopped, disconnecting.')
            s.disconnect()
            await asyncio.sleep(1)
            while s.isconnected():
                await asyncio.sleep(1)

    @asyn.cancellable
    async def reader(self):
        c = self.connects  # Count and transmit successful connects
        try:
            while True:
                r = await self.readline()  # OSError on fail
                if c == self.connects:  # If read succeeded
                    self.connects += 1  # update connect count
                d = ujson.loads(r)
                print('Got data', d)
        except OSError:
            self.evfail.set()

    @asyn.cancellable
    async def writer(self, loop):
        data = [0, 0]
        try:
            while True:
                data[0] = self.connects  # Send connection count
                async with self.lock:
                    await self.send(self.sock, '{}\n'.format(ujson.dumps(data)))
                print('Sent data', data)
                data[1] += 1  # Packet counter
                await asyncio.sleep(5)
        except OSError:
            self.evfail.set()

    @asyn.cancellable
    async def _keepalive(self):
        tim = self.timeout * 2 // 3  # Ensure >= 1 keepalives in server t/o
        try:
            while True:
                await asyncio.sleep_ms(tim)
                async with self.lock:
                    await self.send(self.sock, '\n')
        except OSError:
            self.evfail.set()

    # Read a line from nonblocking socket: reads can return partial data which
    # are joined into a line. Blank lines are keepalive packets which reset
    # the timeout: readline() pauses until a complete line has been received.
    async def readline(self):
        line = b''
        start = utime.ticks_ms()
        while True:
            if line.endswith(b'\n'):
                if len(line) > 1:
                    return line
                line = b''
                start = utime.ticks_ms()  # Blank line is keepalive
                self.led(not self.led())
            await asyncio.sleep_ms(100)  # nonzero wait seems empirically necessary
            d = self.sock.readline()
            if d == b'':
                raise OSError
            if d is not None:
                line = b''.join((line, d))
            if utime.ticks_diff(utime.ticks_ms(), start) > self.timeout:
                raise OSError

    async def send(self, s, d):  # Write a line to either socket.
        start = utime.ticks_ms()
        while len(d):
            ns = s.send(d)  # OSError if client fails
            d = d[ns:]  # Possible partial write
            await asyncio.sleep_ms(100)
            if utime.ticks_diff(utime.ticks_ms(), start) > self.timeout:
                raise OSError

    def close(self):
        print('Closing sockets.')
        if isinstance(self.sock, socket.socket):
            self.sock.close()

loop = asyncio.get_event_loop()
client = Client(1500, loop)  # Server timeout set by server side app: 1.5s
try:
    loop.run_forever()
finally:
    client.close()  # Close sockets in case of ctrl-C or bug

resilient/primitives.py 100644

@@ -0,0 +1,228 @@
# primitives.py A stripped-down version of asyn.py with Lock and Event only.
# Save RAM on ESP8266
# Released under the MIT licence.
# Copyright (C) Peter Hinch 2018

import uasyncio as asyncio

type_coro = type((lambda: (yield))())  # Generator type underlying coros

# Run a callback or schedule a coro, depending on which was passed.
# Restored from asyn.py: Barrier calls it if constructed with a func.
def launch(func, tup_args):
    res = func(*tup_args)
    if isinstance(res, type_coro):
        loop = asyncio.get_event_loop()
        loop.create_task(res)

class Lock():
    def __init__(self, delay_ms=0):
        self._locked = False
        self.delay_ms = delay_ms

    def locked(self):
        return self._locked

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, *args):
        self.release()
        await asyncio.sleep(0)

    async def acquire(self):
        while True:
            if self._locked:
                await asyncio.sleep_ms(self.delay_ms)
            else:
                self._locked = True
                break

    def release(self):
        if not self._locked:
            raise RuntimeError('Attempt to release a lock which has not been set')
        self._locked = False

class Event():
    def __init__(self, delay_ms=0):
        self.delay_ms = delay_ms
        self.clear()

    def clear(self):
        self._flag = False
        self._data = None

    def __await__(self):
        while not self._flag:
            await asyncio.sleep_ms(self.delay_ms)

    __iter__ = __await__

    def is_set(self):
        return self._flag

    def set(self, data=None):
        self._flag = True
        self._data = data

    def value(self):
        return self._data

class Barrier():
    def __init__(self, participants, func=None, args=()):
        self._participants = participants
        self._func = func
        self._args = args
        self._reset(True)

    def __await__(self):
        self._update()
        if self._at_limit():  # All other threads are also at limit
            if self._func is not None:
                launch(self._func, self._args)
            self._reset(not self._down)  # Toggle direction to release others
            return

        direction = self._down
        while True:  # Wait until last waiting thread changes the direction
            if direction != self._down:
                return
            yield

    __iter__ = __await__

    def trigger(self):
        self._update()
        if self._at_limit():  # All other threads are also at limit
            if self._func is not None:
                launch(self._func, self._args)
            self._reset(not self._down)  # Toggle direction to release others

    def _reset(self, down):
        self._down = down
        self._count = self._participants if down else 0

    def busy(self):
        if self._down:
            done = self._count == self._participants
        else:
            done = self._count == 0
        return not done

    def _at_limit(self):  # Has count reached up or down limit?
        limit = 0 if self._down else self._participants
        return self._count == limit

    def _update(self):
        self._count += -1 if self._down else 1
        if self._count < 0 or self._count > self._participants:
            raise ValueError('Too many tasks accessing Barrier')

# Task Cancellation
try:
    StopTask = asyncio.CancelledError  # More descriptive name
except AttributeError:
    raise OSError('asyn.py requires uasyncio V1.7.1 or above.')

class TaskId():
    def __init__(self, taskid):
        self.taskid = taskid

    def __call__(self):
        return self.taskid

# Sleep coro breaks up a sleep into shorter intervals to ensure a rapid
# response to StopTask exceptions
async def sleep(t, granularity=100):  # 100ms default
    if granularity <= 0:
        raise ValueError('sleep granularity must be > 0')
    t = int(t * 1000)  # ms
    if t <= granularity:
        await asyncio.sleep_ms(t)
    else:
        n, rem = divmod(t, granularity)
        for _ in range(n):
            await asyncio.sleep_ms(granularity)
        await asyncio.sleep_ms(rem)

class Cancellable():
    task_no = 0  # Generated task ID, index of tasks dict
    tasks = {}  # Value is [coro, group, barrier] indexed by integer task_no

    @classmethod
    def _cancel(cls, task_no):
        task = cls.tasks[task_no][0]
        asyncio.cancel(task)

    @classmethod
    async def cancel_all(cls, group=0, nowait=False):
        tokill = cls._get_task_nos(group)
        barrier = Barrier(len(tokill) + 1)  # Include this task
        for task_no in tokill:
            cls.tasks[task_no][2] = barrier
            cls._cancel(task_no)
        if nowait:
            barrier.trigger()
        else:
            await barrier

    @classmethod
    def _is_running(cls, group=0):
        tasks = cls._get_task_nos(group)
        if tasks == []:
            return False
        for task_no in tasks:
            barrier = cls.tasks[task_no][2]
            if barrier is None:  # Running, not yet cancelled
                return True
            if barrier.busy():
                return True
        return False

    @classmethod
    def _get_task_nos(cls, group):  # Return task nos in a group
        return [task_no for task_no in cls.tasks if cls.tasks[task_no][1] == group]

    @classmethod
    def _get_group(cls, task_no):  # Return group given a task_no
        return cls.tasks[task_no][1]

    @classmethod
    def _stopped(cls, task_no):
        if task_no in cls.tasks:
            barrier = cls.tasks[task_no][2]
            if barrier is not None:  # Cancellation in progress
                barrier.trigger()
            del cls.tasks[task_no]

    def __init__(self, gf, *args, group=0, **kwargs):
        task = gf(TaskId(Cancellable.task_no), *args, **kwargs)
        if task in self.tasks:
            raise ValueError('Task already exists.')
        self.tasks[Cancellable.task_no] = [task, group, None]
        self.task_no = Cancellable.task_no  # For subclass
        Cancellable.task_no += 1
        self.task = task

    def __call__(self):
        return self.task

    def __await__(self):  # Return any value returned by task.
        return (yield from self.task)

    __iter__ = __await__

# @cancellable decorator
def cancellable(f):
    async def new_gen(*args, **kwargs):
        if isinstance(args[0], TaskId):  # Not a bound method
            task_id = args[0]
            g = f(*args[1:], **kwargs)
        else:  # Task ID is args[1] if a bound method
            task_id = args[1]
            args = (args[0],) + args[2:]
            g = f(*args, **kwargs)
        try:
            res = await g
            return res
        finally:
            # NamedTask is not in this stripped file: notify Cancellable
            # directly, passing the integer task number.
            Cancellable._stopped(task_id())
    return new_gen

resilient/server.py 100644

@@ -0,0 +1,155 @@
# server.py Minimal server.
# Released under the MIT licence.
# Copyright (C) Peter Hinch 2018

# Maintains bidirectional full-duplex links between server applications and
# multiple WiFi connected clients. Each application instance connects to its
# designated client. Connections are resilient and recover from outages of
# WiFi and of the connected endpoint.
# This server and the server applications are assumed to reside on a device
# with a wired interface.
# Run under MicroPython Unix build.

import usocket as socket
import uasyncio as asyncio
import utime
import primitives as asyn
from client_id import PORT

# Global list of open sockets. Enables application to close any open sockets in
# the event of error.
socks = []

# Read a line from a nonblocking socket. Nonblocking reads and writes can
# return partial data.
# Timeout: client is deemed dead if this period elapses without receiving data.
# This seems to be the only way to detect a WiFi failure, where the client does
# not get the chance explicitly to close the sockets.
# Note: on WiFi connected devices sleep_ms(0) produced unreliable results.
async def readline(s, timeout):
    line = b''
    start = utime.ticks_ms()
    while True:
        if line.endswith(b'\n'):
            if len(line) > 1:
                return line
            line = b''
            start = utime.ticks_ms()  # A blank line is just a keepalive
        await asyncio.sleep_ms(100)  # See note above
        d = s.readline()
        if d == b'':
            raise OSError
        if d is not None:
            line = b''.join((line, d))
        if utime.ticks_diff(utime.ticks_ms(), start) > timeout:
            raise OSError

async def send(s, d, timeout):
    start = utime.ticks_ms()
    while len(d):
        ns = s.send(d)  # OSError if client fails
        d = d[ns:]
        await asyncio.sleep_ms(100)  # See note above
        if utime.ticks_diff(utime.ticks_ms(), start) > timeout:
            raise OSError

# Return the connection for a client if it is connected (else None)
def client_conn(client_id):
    try:
        c = Connection.conns[client_id]
    except KeyError:
        return
    if c.ok():
        return c

# API: application calls server.run()
# Not using uasyncio.start_server because of https://github.com/micropython/micropython/issues/4290
async def run(timeout, nconns=10, verbose=False):
    addr = socket.getaddrinfo('0.0.0.0', PORT, 0, socket.SOCK_STREAM)[0][-1]
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    socks.append(s)
    s.bind(addr)
    s.listen(nconns)
    verbose and print('Awaiting connection.')
    while True:
        yield asyncio.IORead(s)  # Register socket for polling
        conn, addr = s.accept()
        conn.setblocking(False)
        try:
            idstr = await readline(conn, timeout)
            verbose and print('Got connection from client', idstr)
            socks.append(conn)
            Connection.go(int(idstr), timeout, verbose, conn)
        except OSError:
            if conn is not None:
                conn.close()

# A Connection persists even if client dies (minimise object creation).
# If client dies Connection is closed: .close() flags this state by closing its
# socket and setting .conn to None (.ok() == False).
class Connection():
    conns = {}  # index: client_id. value: Connection instance

    @classmethod
    def go(cls, client_id, timeout, verbose, conn):
        if client_id not in cls.conns:  # New client: instantiate Connection
            Connection(client_id, timeout, verbose)
        cls.conns[client_id].conn = conn

    def __init__(self, client_id, timeout, verbose):
        self.client_id = client_id
        self.timeout = timeout
        self.verbose = verbose
        Connection.conns[client_id] = self
        # Startup timeout: cancel startup if both sockets not created in time
        self.lock = asyn.Lock(100)
        self.conn = None  # Socket
        loop = asyncio.get_event_loop()
        loop.create_task(self._keepalive())

    def ok(self):
        return self.conn is not None

    async def _keepalive(self):
        to = self.timeout * 2 // 3
        while True:
            await self.write('\n')
            await asyncio.sleep_ms(to)

    async def readline(self):
        while True:
            if self.verbose and not self.ok():
                print('Reader Client:', self.client_id, 'awaiting OK status')
            while not self.ok():
                await asyncio.sleep_ms(100)
            self.verbose and print('Reader Client:', self.client_id, 'OK')
            try:
                line = await readline(self.conn, self.timeout)
                return line
            except (OSError, AttributeError):  # AttributeError if ok status lost while waiting
                self.verbose and print('Read client disconnected: closing connection.')
                self.close()

    async def write(self, buf):
        while True:
            if self.verbose and not self.ok():
                print('Writer Client:', self.client_id, 'awaiting OK status')
            while not self.ok():
                await asyncio.sleep_ms(100)
            self.verbose and print('Writer Client:', self.client_id, 'OK')
            try:
                async with self.lock:  # >1 writing task?
                    await send(self.conn, buf, self.timeout)  # OSError on fail
                return
            except (OSError, AttributeError):
                self.verbose and print('Write client disconnected: closing connection.')
                self.close()

    def close(self):
        if self.conn is not None:
            if self.conn in socks:
                socks.remove(self.conn)
            self.conn.close()
            self.conn = None