Add iotest5: unbuffered read and write.

pull/7/head
Peter Hinch 2018-06-10 12:15:57 +01:00
rodzic a439064774
commit 2693472af6
4 zmienionych plików z 143 dodań i 4 usunięć

Wyświetl plik

@ -0,0 +1,18 @@
# Tests for uasyncio iostream read/write changes
These tests perform concurrent input and output and use timers to
emulate read/write hardware.
iotest1.py Device can perform unbuffered writes only.
iotest2.py Device performs buffered writes and unbuffered reads.
iotest4.py Run test(False) for unbuffered writes and buffered reads.
iotest5.py Unbuffered read and write.
Obsolete test:
iotest3.py Demonstrated workround for failing concurrent I/O using separate
input and output objects.
Other tests:
iotest.py Measure timing of I/O scheduling with a scope.
auart.py Run a loopback test on a physical UART.
auart_hd.py Simulate a pair of devices running a half-duplex protocol over a
pair of UARTs.

Wyświetl plik

@ -1,5 +1,5 @@
# iomiss.py Test for missed reads. The bug was fixed by disabling interrupts in
# ioctl().
# iotest.py Test PR #3836 timing using GPIO pins.
import io, pyb
import uasyncio as asyncio
import micropython
@ -8,6 +8,8 @@ micropython.alloc_emergency_exception_buf(100)
MP_STREAM_POLL = const(3)
MP_STREAM_POLL_RD = const(1)
y1 = pyb.Pin('Y1', pyb.Pin.OUT)
class MyIO(io.IOBase):
def __init__(self):
self.ready = False
@ -15,7 +17,7 @@ class MyIO(io.IOBase):
tim = pyb.Timer(4)
tim.init(freq=1)
tim.callback(self.setready)
def ioctl(self, req, arg):
if req == MP_STREAM_POLL and (arg & MP_STREAM_POLL_RD):
state = pyb.disable_irq()
@ -26,14 +28,25 @@ class MyIO(io.IOBase):
return 0
def readline(self):
y1.value(0)
return '{}\n'.format(self.count)
def setready(self, t):
self.count += 1
y1.value(1)
self.ready = True
myio = MyIO()
async def foo(p):
print('start foo', p)
pin = pyb.Pin(p, pyb.Pin.OUT)
while True:
pin.value(1)
await asyncio.sleep(0)
pin.value(0)
await asyncio.sleep(0)
async def receiver():
last = None
nmissed = 0
@ -50,5 +63,7 @@ async def receiver():
loop = asyncio.get_event_loop()
loop.create_task(receiver())
loop.create_task(foo('Y2'))
loop.create_task(foo('Y3'))
loop.run_forever()

Wyświetl plik

@ -1,5 +1,5 @@
# iotest2.py Test PR #3836. User class write() performs buffered writing.
# This works as expected.
# Reading is unbuffered.
import io, pyb
import uasyncio as asyncio

Wyświetl plik

@ -0,0 +1,106 @@
# iotest5.py Test PR #3836.
# User class write() performs unbuffered writing.
# Read is also unbuffered.
# This test was to demonstrate the original issue.
# With modified moduselect.c and uasyncio.__init__.py the test now passes.
# iotest4.test() uses separate read and write objects.
# iotest4.test(False) uses a common object (failed without the mod).
import io, pyb
import uasyncio as asyncio
import micropython
micropython.alloc_emergency_exception_buf(100)
MP_STREAM_POLL_RD = const(1)
MP_STREAM_POLL_WR = const(4)
MP_STREAM_POLL = const(3)
MP_STREAM_ERROR = const(-1)
def printbuf(this_io):
for ch in this_io.wbuf[:this_io.wprint_len]:
print(chr(ch), end='')
class MyIO(io.IOBase):
def __init__(self, read=False, write=False):
self.ready_rd = False # Read and write not ready
self.rbuf = b'ready\n' # Read buffer
self.ridx = 0
pyb.Timer(4, freq = 5, callback = self.do_input)
self.wch = b''
self.wbuf = bytearray(100) # Write buffer
self.wprint_len = 0
self.widx = 0
pyb.Timer(5, freq = 10, callback = self.do_output)
# Read callback: emulate asynchronous input from hardware.
# Typically would put bytes into a ring buffer and set .ready_rd.
def do_input(self, t):
self.ready_rd = True # Data is ready to read
# Write timer callback. Emulate hardware: if there's data in the buffer
# write some or all of it
def do_output(self, t):
if self.wch:
self.wbuf[self.widx] = self.wch
self.widx += 1
if self.wch == ord('\n'):
self.wprint_len = self.widx # Save for schedule
micropython.schedule(printbuf, self)
self.widx = 0
self.wch = b''
def ioctl(self, req, arg): # see ports/stm32/uart.c
ret = MP_STREAM_ERROR
if req == MP_STREAM_POLL:
ret = 0
if arg & MP_STREAM_POLL_RD:
if self.ready_rd:
ret |= MP_STREAM_POLL_RD
if arg & MP_STREAM_POLL_WR:
if not self.wch:
ret |= MP_STREAM_POLL_WR # Ready if no char pending
return ret
# Test of device that produces one character at a time
def readline(self):
self.ready_rd = False # Cleared by timer cb do_input
ch = self.rbuf[self.ridx]
if ch == ord('\n'):
self.ridx = 0
else:
self.ridx += 1
return chr(ch)
# Emulate unbuffered hardware which writes one character: uasyncio waits
# until hardware is ready for the next. Hardware ready is emulated by write
# timer callback.
def write(self, buf, off, sz):
self.wch = buf[off] # Hardware starts to write a char
return 1 # 1 byte written. uasyncio waits on ioctl write ready
async def receiver(myior):
sreader = asyncio.StreamReader(myior)
while True:
res = await sreader.readline()
print('Received', res)
async def sender(myiow):
swriter = asyncio.StreamWriter(myiow, {})
await asyncio.sleep(5)
count = 0
while True:
count += 1
tosend = 'Wrote Hello MyIO {}\n'.format(count)
await swriter.awrite(tosend.encode('UTF8'))
await asyncio.sleep(2)
myior = MyIO()
myiow = myior
loop = asyncio.get_event_loop()
loop.create_task(receiver(myior))
loop.create_task(sender(myiow))
loop.run_forever()