Eine benutzerdefinierte asyncio-Ereignisschleife gibt dir volle Kontrolle über Task-Planung, I/O-Integration und Performance-Optimierung. Ideal für Webserver, Game-Engines oder Hardware-Projekte. In dieser deutschen Fassung lernst du:

  • Kernkonzepte einer Event Loop
  • Eine minimale Schleife mit call_soon und call_later implementieren
  • Fehlerbehandlung und Coroutinen-Support
  • Prioritätssteuerung und I/O-Hooks
  • Erweiterte Scheduler und Publishing als asyncio-Backend

Was ist eine Event Loop?

Eine Event Loop ist eine Dauerschleife, die kontinuierlich:

  1. Tasks ausführt, die mit call_soon sofort geplant wurden.
  2. Tasks ausführt, deren Timer mit call_later abgelaufen ist.
  3. I/O-Polling (Dateien, Sockets) durchführt und passende Callbacks aufruft.

Warum eine eigene Schleife?

Eine eigene Schleife hilft dir:

  • Einblick in das Innenleben von asyncio.
  • Integration mit proprietären I/O-Quellen wie Game-Engines oder Hardware.
  • Anpassung von Scheduling-Strategien (Prioritäten, Ratenbegrenzung).

Minimale Schleife implementieren

import time
import heapq

class MiniLoop:
    def __init__(self):
        self._ready = []          # Sofort-Callbacks
        self._scheduled = []      # Heap von (timestamp, callback, args)
        self._running = False

    def run_forever(self):
        self._running = True
        while self._running:
            now = time.time()

            # Sofort-Callbacks ausführen
            while self._ready:
                cb, args = self._ready.pop(0)
                try:
                    cb(*args)
                except Exception as e:
                    print("Callback-Fehler:", e)

            # Verzögerte Callbacks ausführen
            while self._scheduled and self._scheduled[0][0] <= now:
                _, cb, args = heapq.heappop(self._scheduled)
                try:
                    cb(*args)
                except Exception as e:
                    print("Geplanter Fehler:", e)

            # Warten bis zum nächsten Timer oder kurz schlafen
            if self._scheduled:
                sleep_time = max(0, self._scheduled[0][0] - now)
                time.sleep(sleep_time)
            else:
                time.sleep(0.01)

    def stop(self):
        self._running = False

    def call_soon(self, callback, *args):
        self._ready.append((callback, args))

    def call_later(self, delay, callback, *args):
        heapq.heappush(self._scheduled, (time.time() + delay, callback, args))

Code-Erklärung:

  • self._ready speichert sofort auszuführende Callbacks.
  • self._scheduled ist ein Min-Heap mit (timestamp, callback, args).
  • run_forever verarbeitet Queues und pausiert, bis stop() aufgerufen wird.
  • Fehler in Callbacks werden abgefangen, damit die Schleife nicht stoppt.

Fehlerbehandlung

try:
    cb(*args)
except Exception as e:
    print("Loop-Fehler:", e)

Code-Erklärung:

  • Verhindert, dass eine auftretende Ausnahme die gesamte Schleife beendet.

Tasks & Coroutinen

import asyncio

def create_task(loop, coro):
    task = asyncio.Task(coro, loop=loop)
    loop.call_soon(task._step)
    return task

Code-Erklärung:

  • Verpackt eine Coroutine in ein Task und plant ihren ersten Schritt über _step.

Timer & Priorisierung

import heapq

class PriorityLoop(MiniLoop):
    def __init__(self):
        super().__init__()
        self._priority = []  # Heap von (prio, callback, args)

    def call_soon_priority(self, prio, callback, *args):
        heapq.heappush(self._priority, (prio, callback, args))

    def run_forever(self):
        self._running = True
        while self._running:
            # Prioritäts-Callbacks zuerst
            while self._priority:
                _, cb, args = heapq.heappop(self._priority)
                cb(*args)
            # Dann normale Callbacks und Timer
            super().run_forever()

Code-Erklärung:

  • self._priority speichert Callbacks mit Priorität.
  • run_forever führt sie vor den normalen Callbacks aus.

I/O-Integration

import selectors

class IOLoop(MiniLoop):
    def __init__(self):
        super().__init__()
        self._sel = selectors.DefaultSelector()

    def add_reader(self, fd, callback, *args):
        self._sel.register(fd, selectors.EVENT_READ, (callback, args))

    def run_forever(self):
        self._running = True
        while self._running:
            super().run_forever_iteration()  
            for key, _ in self._sel.select(timeout=0):
                cb, args = key.data
                cb(*args)

Code-Erklärung:

  • Registriert Dateideskriptoren für I/O-Polling.
  • run_forever_iteration führt eine Iteration der MiniLoop aus.

Performance & Benchmarking

import timeit

setup = '''
from asyncio import SelectorEventLoop
loop = SelectorEventLoop()
def noop(): pass
loop.call_soon(noop)
'''

print(timeit.timeit(
    'loop.run_until_complete(asyncio.sleep(0))',
    setup=setup,
    number=1000
))

Code-Erklärung:

  • Misst die Ausführungszeit eines No-Op-Coroutine im Standard-Loop.

Vollständige API-Implementierung

import selectors
import heapq
import time
import asyncio
from asyncio import Task

class FullLoop(MiniLoop):
    def __init__(self):
        super().__init__()
        self._selector = selectors.DefaultSelector()

    def add_reader(self, fd, callback, *args):
        self._selector.register(fd, selectors.EVENT_READ, (callback, args))
    def remove_reader(self, fd):
        self._selector.unregister(fd)

    def create_task(self, coro):
        task = Task(coro, loop=self)
        self.call_soon(task._step)
        return task

    def cancel_task(self, task):
        task.cancel()

    def call_exception_handler(self, context):
        print("Exception in loop:", context)

    def run_forever(self):
        self._running = True
        while self._running:
            now = time.time()
            # Ready-Callbacks & Tasks
            while self._ready:
                cb, args = self._ready.pop(0)
                try:
                    cb(*args)
                except Exception as e:
                    self.call_exception_handler({"message":"callback","exception":e})
            # Timer-Callbacks
            while self._scheduled and self._scheduled[0][0] <= now:
                _, cb, args = heapq.heappop(self._scheduled)
                try:
                    cb(*args)
                except Exception as e:
                    self.call_exception_handler({"message":"timer","exception":e})
            # I/O-Callbacks
            timeout = max(0, self._scheduled[0][0] - now) if self._scheduled else None
            for key, _ in self._selector.select(timeout):
                cb, args = key.data
                try:
                    cb(*args)
                except Exception as e:
                    self.call_exception_handler({"message":"I/O","exception":e})

Code-Erklärung:

  • Beinhaltet alle Methoden der englischen Version: I/O, Tasks, Timer, Exception-Handling.

Erweiterte Scheduler

Siehe englische Version für Beispiele zu Real-Time Deadlines, Rate Limiting und Slice Scheduling.

Custom I/O-Integration

Siehe englische Version für Beispiele zu Hardware und SSL-Sockets.

Packaging & Veröffentlichung

import asyncio
class CustomPolicy(asyncio.DefaultEventLoopPolicy):
    def new_event_loop(self): return FullLoop()
asyncio.set_event_loop_policy(CustomPolicy())

Code-Erklärung:

  • Definiert eine EventLoopPolicy für deine FullLoop.