Eine benutzerdefinierte asyncio-Ereignisschleife gibt dir volle Kontrolle über Task-Planung, I/O-Integration und Performance-Optimierung. Ideal für Webserver, Game-Engines oder Hardware-Projekte. In dieser deutschen Fassung lernst du:
- Kernkonzepte einer Event Loop
- Eine minimale Schleife mit
call_soon
undcall_later
implementieren - Fehlerbehandlung und Coroutinen-Support
- Prioritätssteuerung und I/O-Hooks
- Erweiterte Scheduler und Publishing als
asyncio
-Backend
Was ist eine Event Loop?
Eine Event Loop ist eine Dauerschleife, die kontinuierlich:
- Tasks ausführt, die mit
call_soon
sofort geplant wurden. - Tasks ausführt, deren Timer mit
call_later
abgelaufen ist. - I/O-Polling (Dateien, Sockets) durchführt und passende Callbacks aufruft.
Warum eine eigene Schleife?
Eine eigene Schleife hilft dir:
- Einblick in das Innenleben von
asyncio
. - Integration mit proprietären I/O-Quellen wie Game-Engines oder Hardware.
- Anpassung von Scheduling-Strategien (Prioritäten, Ratenbegrenzung).
Minimale Schleife implementieren
import time
import heapq
class MiniLoop:
def __init__(self):
self._ready = [] # Sofort-Callbacks
self._scheduled = [] # Heap von (timestamp, callback, args)
self._running = False
def run_forever(self):
self._running = True
while self._running:
now = time.time()
# Sofort-Callbacks ausführen
while self._ready:
cb, args = self._ready.pop(0)
try:
cb(*args)
except Exception as e:
print("Callback-Fehler:", e)
# Verzögerte Callbacks ausführen
while self._scheduled and self._scheduled[0][0] <= now:
_, cb, args = heapq.heappop(self._scheduled)
try:
cb(*args)
except Exception as e:
print("Geplanter Fehler:", e)
# Warten bis zum nächsten Timer oder kurz schlafen
if self._scheduled:
sleep_time = max(0, self._scheduled[0][0] - now)
time.sleep(sleep_time)
else:
time.sleep(0.01)
def stop(self):
self._running = False
def call_soon(self, callback, *args):
self._ready.append((callback, args))
def call_later(self, delay, callback, *args):
heapq.heappush(self._scheduled, (time.time() + delay, callback, args))
Code-Erklärung:
self._ready
speichert sofort auszuführende Callbacks.self._scheduled
ist ein Min-Heap mit(timestamp, callback, args)
.run_forever
verarbeitet Queues und pausiert, bisstop()
aufgerufen wird.- Fehler in Callbacks werden abgefangen, damit die Schleife nicht stoppt.
Fehlerbehandlung
try:
cb(*args)
except Exception as e:
print("Loop-Fehler:", e)
Code-Erklärung:
- Verhindert, dass eine auftretende Ausnahme die gesamte Schleife beendet.
Tasks & Coroutinen
import asyncio
def create_task(loop, coro):
task = asyncio.Task(coro, loop=loop)
loop.call_soon(task._step)
return task
Code-Erklärung:
- Verpackt eine Coroutine in ein
Task
und plant ihren ersten Schritt über_step
.
Timer & Priorisierung
import heapq
class PriorityLoop(MiniLoop):
def __init__(self):
super().__init__()
self._priority = [] # Heap von (prio, callback, args)
def call_soon_priority(self, prio, callback, *args):
heapq.heappush(self._priority, (prio, callback, args))
def run_forever(self):
self._running = True
while self._running:
# Prioritäts-Callbacks zuerst
while self._priority:
_, cb, args = heapq.heappop(self._priority)
cb(*args)
# Dann normale Callbacks und Timer
super().run_forever()
Code-Erklärung:
self._priority
speichert Callbacks mit Priorität.run_forever
führt sie vor den normalen Callbacks aus.
I/O-Integration
import selectors
class IOLoop(MiniLoop):
def __init__(self):
super().__init__()
self._sel = selectors.DefaultSelector()
def add_reader(self, fd, callback, *args):
self._sel.register(fd, selectors.EVENT_READ, (callback, args))
def run_forever(self):
self._running = True
while self._running:
super().run_forever_iteration()
for key, _ in self._sel.select(timeout=0):
cb, args = key.data
cb(*args)
Code-Erklärung:
- Registriert Dateideskriptoren für I/O-Polling.
run_forever_iteration
führt eine Iteration der MiniLoop aus.
Performance & Benchmarking
import timeit
setup = '''
from asyncio import SelectorEventLoop
loop = SelectorEventLoop()
def noop(): pass
loop.call_soon(noop)
'''
print(timeit.timeit(
'loop.run_until_complete(asyncio.sleep(0))',
setup=setup,
number=1000
))
Code-Erklärung:
- Misst die Ausführungszeit eines No-Op-Coroutine im Standard-Loop.
Vollständige API-Implementierung
import selectors
import heapq
import time
import asyncio
from asyncio import Task
class FullLoop(MiniLoop):
def __init__(self):
super().__init__()
self._selector = selectors.DefaultSelector()
def add_reader(self, fd, callback, *args):
self._selector.register(fd, selectors.EVENT_READ, (callback, args))
def remove_reader(self, fd):
self._selector.unregister(fd)
def create_task(self, coro):
task = Task(coro, loop=self)
self.call_soon(task._step)
return task
def cancel_task(self, task):
task.cancel()
def call_exception_handler(self, context):
print("Exception in loop:", context)
def run_forever(self):
self._running = True
while self._running:
now = time.time()
# Ready-Callbacks & Tasks
while self._ready:
cb, args = self._ready.pop(0)
try:
cb(*args)
except Exception as e:
self.call_exception_handler({"message":"callback","exception":e})
# Timer-Callbacks
while self._scheduled and self._scheduled[0][0] <= now:
_, cb, args = heapq.heappop(self._scheduled)
try:
cb(*args)
except Exception as e:
self.call_exception_handler({"message":"timer","exception":e})
# I/O-Callbacks
timeout = max(0, self._scheduled[0][0] - now) if self._scheduled else None
for key, _ in self._selector.select(timeout):
cb, args = key.data
try:
cb(*args)
except Exception as e:
self.call_exception_handler({"message":"I/O","exception":e})
Code-Erklärung:
- Beinhaltet alle Methoden der englischen Version: I/O, Tasks, Timer, Exception-Handling.
Erweiterte Scheduler
Siehe englische Version für Beispiele zu Real-Time Deadlines, Rate Limiting und Slice Scheduling.
Custom I/O-Integration
Siehe englische Version für Beispiele zu Hardware und SSL-Sockets.
Packaging & Veröffentlichung
import asyncio
class CustomPolicy(asyncio.DefaultEventLoopPolicy):
def new_event_loop(self): return FullLoop()
asyncio.set_event_loop_policy(CustomPolicy())
Code-Erklärung:
- Definiert eine
EventLoopPolicy
für deineFullLoop
.