Jared Hancock 2024-04-04 07:15:45 +08:00 zatwierdzone przez GitHub
commit 0963bb91a9
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: B5690EEEBB952194
1 zmienionych plików z 39 dodań i 26 usunięć

Wyświetl plik

@ -3,6 +3,7 @@
from time import ticks_ms as ticks, ticks_diff, ticks_add from time import ticks_ms as ticks, ticks_diff, ticks_add
import sys, select import sys, select
from select import POLLIN, POLLOUT
# Import TaskQueue and Task, preferring built-in C code over Python code # Import TaskQueue and Task, preferring built-in C code over Python code
try: try:
@ -54,7 +55,8 @@ class SingletonGenerator:
# Use a SingletonGenerator to do it without allocating on the heap # Use a SingletonGenerator to do it without allocating on the heap
def sleep_ms(t, sgen=SingletonGenerator()): def sleep_ms(t, sgen=SingletonGenerator()):
assert sgen.state is None assert sgen.state is None
sgen.state = ticks_add(ticks(), max(0, t)) now = ticks()
sgen.state = ticks_add(now, t) if t > 0 else now
return sgen return sgen
@ -66,6 +68,8 @@ def sleep(t):
################################################################################ ################################################################################
# Queue and poller for stream IO # Queue and poller for stream IO
nPOLLIN = ~POLLIN
nPOLLOUT = ~POLLOUT
class IOQueue: class IOQueue:
def __init__(self): def __init__(self):
@ -77,13 +81,13 @@ class IOQueue:
entry = [None, None, s] entry = [None, None, s]
entry[idx] = cur_task entry[idx] = cur_task
self.map[id(s)] = entry self.map[id(s)] = entry
self.poller.register(s, select.POLLIN if idx == 0 else select.POLLOUT) self.poller.register(s, POLLIN if idx == 0 else POLLOUT)
else: else:
sm = self.map[id(s)] sm = self.map[id(s)]
assert sm[idx] is None assert sm[idx] is None
assert sm[1 - idx] is not None assert sm[1 - idx] is not None
sm[idx] = cur_task sm[idx] = cur_task
self.poller.modify(s, select.POLLIN | select.POLLOUT) self.poller.modify(s, POLLIN | POLLOUT)
# Link task to this IOQueue so it can be removed if needed # Link task to this IOQueue so it can be removed if needed
cur_task.data = self cur_task.data = self
@ -113,21 +117,22 @@ class IOQueue:
def wait_io_event(self, dt): def wait_io_event(self, dt):
for s, ev in self.poller.ipoll(dt): for s, ev in self.poller.ipoll(dt):
sm = self.map[id(s)] sm = self.map[id(s)]
q0, q1, _ = sm
# print('poll', s, sm, ev) # print('poll', s, sm, ev)
if ev & ~select.POLLOUT and sm[0] is not None: if ev & nPOLLOUT and q0 is not None:
# POLLIN or error # POLLIN or error
_task_queue.push(sm[0]) _task_queue.push(q0)
sm[0] = None sm[0] = None
if ev & ~select.POLLIN and sm[1] is not None: if ev & nPOLLIN and q1 is not None:
# POLLOUT or error # POLLOUT or error
_task_queue.push(sm[1]) _task_queue.push(q1)
sm[1] = None sm[1] = None
if sm[0] is None and sm[1] is None: if q0 is None and q1 is None:
self._dequeue(s) self._dequeue(s)
elif sm[0] is None: elif q0 is None:
self.poller.modify(s, select.POLLOUT) self.poller.modify(s, POLLOUT)
else: else:
self.poller.modify(s, select.POLLIN) self.poller.modify(s, POLLIN)
################################################################################ ################################################################################
@ -147,27 +152,35 @@ def create_task(coro):
_task_queue.push(t) _task_queue.push(t)
return t return t
# Keep scheduling tasks until there are none left to schedule # Keep scheduling tasks until there are none left to schedule
def run_until_complete(main_task=None): def run_until_complete(main_task=None):
global cur_task global cur_task
excs_all = (CancelledError, Exception) # To prevent heap allocation in loop excs_all = (CancelledError, Exception) # To prevent heap allocation in loop
excs_stop = (CancelledError, StopIteration) # To prevent heap allocation in loop excs_stop = (CancelledError, StopIteration) # To prevent heap allocation in loop
while True: while True:
# Wait until the head of _task_queue is ready to run try:
dt = 1 while True:
while dt > 0: # Wait until the head of _task_queue is ready to run
dt = -1 t = _task_queue.peek()
t = _task_queue.peek() if t:
if t: # A task waiting on _task_queue; "ph_key" is time to schedule task at
# A task waiting on _task_queue; "ph_key" is time to schedule task at dt = ticks_diff(t.ph_key, ticks())
dt = max(0, ticks_diff(t.ph_key, ticks())) _io_queue.wait_io_event(dt if dt > 0 else 0)
elif not _io_queue.map: if dt <= 0:
# No tasks can be woken so finished running break
cur_task = None elif not _io_queue.map:
return # No tasks can be woken so finished running
# print('(poll {})'.format(dt), len(_io_queue.map)) cur_task = None
_io_queue.wait_io_event(dt) return
else:
_io_queue.wait_io_event(-1)
except BaseException as exc:
try:
if main_task:
main_task.coro.throw(exc)
except StopIteration:
pass
raise
# Get next task to run and continue it # Get next task to run and continue it
t = _task_queue.pop() t = _task_queue.pop()