micropython-samples/uasyncio_iostream/v3/primitives/queue.py

67 wiersze
2.0 KiB
Python

# queue.py: adapted from uasyncio V2
# Code is based on Paul Sokolovsky's work.
# This is a temporary solution until uasyncio V3 gets an efficient official version
import uasyncio as asyncio
# Exception raised by get_nowait().
class QueueEmpty(Exception):
pass
# Exception raised by put_nowait().
class QueueFull(Exception):
pass
class Queue:
def __init__(self, maxsize=0):
self.maxsize = maxsize
self._queue = []
def _get(self):
return self._queue.pop(0)
async def get(self): # Usage: item = await queue.get()
while self.empty():
# Queue is empty, put the calling Task on the waiting queue
await asyncio.sleep_ms(0)
return self._get()
def get_nowait(self): # Remove and return an item from the queue.
# Return an item if one is immediately available, else raise QueueEmpty.
if self.empty():
raise QueueEmpty()
return self._get()
def _put(self, val):
self._queue.append(val)
async def put(self, val): # Usage: await queue.put(item)
while self.qsize() >= self.maxsize and self.maxsize:
# Queue full
await asyncio.sleep_ms(0)
# Task(s) waiting to get from queue, schedule first Task
self._put(val)
def put_nowait(self, val): # Put an item into the queue without blocking.
if self.qsize() >= self.maxsize and self.maxsize:
raise QueueFull()
self._put(val)
def qsize(self): # Number of items in the queue.
return len(self._queue)
def empty(self): # Return True if the queue is empty, False otherwise.
return len(self._queue) == 0
def full(self): # Return True if there are maxsize items in the queue.
# Note: if the Queue was initialized with maxsize=0 (the default),
# then full() is never True.
if self.maxsize <= 0:
return False
else:
return self.qsize() >= self.maxsize