Une boucle d’événements asyncio personnalisée vous offre un contrôle total : planification des tâches, intégration I/O, optimisation des performances. Idéale pour serveurs web, moteurs de jeu ou projets hardware. Dans ce guide (version française et allemande), vous apprendrez :
- Les concepts clés d’une boucle d’événements
- L’implémentation d’une boucle minimale avec
call_soon
etcall_later
- La gestion des erreurs, coroutines et hooks I/O
- Des planificateurs avancés (priorités, limitation de débit)
- Le packaging en tant que backend
asyncio
Qu’est-ce qu’une boucle d’événements ?
Une boucle d’événements exécute en boucle :
- Les tâches planifiées immédiatement (
call_soon
). - Les tâches planifiées avec délai (
call_later
). - Le sondage I/O (fichiers, sockets) et leurs callbacks.
Pourquoi créer sa propre boucle ?
Bien que la boucle intégrée réponde à la plupart des cas, une boucle personnalisée permet :
- Optimisation pour des charges spécifiques (faible latence, haut débit)
- Intégration avec I/O non standard (moteur de jeu, hardware)
- Personnalisation des politiques de planification (priorités, taux, tranches de temps)
Implémentation d’une boucle minimale
Voici une MiniLoop
basique :
import time
import heapq
class MiniLoop:
def __init__(self):
self._ready = [] # callbacks immédiats
self._scheduled = [] # heap de (timestamp, callback, args)
self._running = False
def run_forever(self):
self._running = True
while self._running:
now = time.time()
# 1) Exécuter les callbacks immédiatement
while self._ready:
cb, args = self._ready.pop(0)
try:
cb(*args)
except Exception as e:
print("Erreur callback :", e)
# 2) Exécuter les callbacks différés
while self._scheduled and self._scheduled[0][0] <= now:
_, cb, args = heapq.heappop(self._scheduled)
try:
cb(*args)
except Exception as e:
print("Erreur programmé :", e)
# 3) Pause jusqu’au prochain timer ou brièvement
if self._scheduled:
time.sleep(max(0, self._scheduled[0][0] - now))
else:
time.sleep(0.01)
def stop(self): self._running = False
def call_soon(self, callback, *args): self._ready.append((callback, args))
def call_later(self, delay, callback, *args):
heapq.heappush(self._scheduled, (time.time()+delay, callback, args))
Bonnes pratiques de gestion d’erreurs
Placer les appels dans try/except
:
try:
cb(*args)
except Exception as e:
print("Erreur boucle :", e)
Ajout de tâches & coroutines
Pour supporter async/await
:
import asyncio
def create_task(loop, coro):
task = asyncio.Task(coro, loop=loop)
loop.call_soon(task._step)
return task
Minuteries & planification par priorité
Planifier selon une file à priorité :
import heapq
class PriorityLoop(MiniLoop):
def __init__(self):
super().__init__()
self._priority = []
def call_soon_priority(self, prio, cb, *args):
heapq.heappush(self._priority, (prio, cb, args))
def run_forever(self):
self._running=True
while self._running:
while self._priority:
_, cb, args = heapq.heappop(self._priority)
cb(*args)
super().run_forever()
Intégration I/O
Avec selectors
:
import selectors
class IOLoop(MiniLoop):
def __init__(self): super().__init__(); self._sel=selectors.DefaultSelector()
def add_reader(self, fd, cb, *a): self._sel.register(fd, selectors.EVENT_READ, (cb,a))
def run_forever(self):
while self._running:
super().run_forever_iteration()
for key,_ in self._sel.select(0):
cb,args=key.data; cb(*args)
Planificateurs avancés
- Délais précis : planifier à un timestamp exact
- Limitation de débit : réguler la fréquence d’exécution
- Tranches coopératives : répartir le temps CPU
Packaging & publication
import asyncio
class CustomPolicy(asyncio.DefaultEventLoopPolicy):
def new_event_loop(self): return FullLoop()
asyncio.set_event_loop_policy(CustomPolicy())
Étapes suivantes & ressources
- Benchmarker avec
SelectorEventLoop
viatimeit
- Tester des sources I/O et planificateurs personnalisés