Rencontrez Dave, un développeur qui a obtenu le contrat d'alarme incendie de l'hôtel parce qu'il était le neveu du PDG. Dave n'avait jamais vu de système d'alarme incendie avant, mais il avait juste terminé un projet de service à la clientèle où "les appels téléphoniques personnels augmentent l'engagement de 847%!" Son raisonnement: "Quand ma mère m'appelle directement, je réponds toujours.Quand une alarme générique s'éteint, je suppose qu'il s'agit d'une alarme de voiture et l'ignore. Phase 1 : Communication point-à-point (Le fiasco des appels téléphoniques) Lorsque la fumée est détectée, le système appelle chaque chambre de l'hôtel. Chambre 237 reçoit un appel : « Bonjour, il y a de la fumée dans la cuisine, s'il vous plaît évacuer. » Chambre 301: « Bonjour, il y a de la fumée dans la cuisine, s'il vous plaît évacuer. » Que se passe-t-il quand il y a un petit feu de graisse dans la cuisine ET de la fumée dans le garage ?Le système de Dave appelle la chambre 237 : « Bonjour, il y a de la fumée dans la cuisine. » Puis immédiatement : « Bonjour, il y a de la fumée dans le garage. » L’invité est confus – quelle urgence ? Cliquez sur . Pendant ce temps, la chambre 512 a un invité qui est sourd.La chambre 623 a été vérifiée il y a des heures mais le téléphone continue de sonner.La chambre 108 a un bébé endormi dont les parents sont maintenant en colère.Après la troisième fausse alarme en une semaine, les invités commencent à déconnecter leurs téléphones. Il s'agit d'une communication point-à-point classique - un émetteur, un récepteur, répété 847 fois. L’inspecteur incendie apparaît : « Dave, vous ne pouvez pas faire cela. Installez une alarme incendie normale. » Phase 2 : Communication de radiodiffusion (option nucléaire) Mais Dave a appris de ses erreurs. « J’ai une meilleure idée ! » il annonce. Dave installe un massif d'alarme incendie qui explose dans tout l'hôtel. Succès! Tout le monde l'entend! Mais alors Dave se rend compte que ce système est PERFECT pour les annonces. Fermeture de la piscine? ALARME DE FEU. Petit déjeuner se terminant en 10 minutes? ALARME DE FEU. Enfant perdu dans le hall? ALARME DE FEU. Cours de yoga commençant? ALARME DE FEU. Les clients sont maintenant évacués pour des notifications de petit déjeuner continental.Le service des pompiers a été appelé 47 fois cette semaine.Quelqu'un a essayé de vérifier à travers la sortie d'urgence. C'est une communication de radiodiffusion - un message à tous, qu'ils le veuillent ou non. C'est comme utiliser un système de radiodiffusion d'urgence à l'échelle de la ville pour annoncer votre vente de garage. Phase 3 : Dave découvre quelque chose de révolutionnaire Troisième tentative de Dave : « Et si nous avions des intercoms différents pour des zones différentes ? » Il installe des haut-parleurs dans tout l’hôtel mais les connecte à des canaux séparés : Pool Area Intercom : « La piscine ferme dans 10 minutes, le service de serviettes se termine » Restaurant Intercom : "Happy hour start, cuisine clôture bientôt" Chambre de conférence Intercom: "La salle de réunion B est disponible, le mot de passe WiFi a été changé" Spa Intercom : « rendez-vous de massage disponibles, entretien de sauna à 15h » Intercoms spécifiques au rez-de-chaussée: "Maintenance du logement au 3ème étage, machine à glace cassée au 7ème étage" Les invités de la piscine écoutent les mises à jour de la piscine, les dîners du restaurant reçoivent des informations sur les repas, les participants à la conférence reçoivent des annonces de la salle de réunion. Qu’est-ce qu’il a vraiment fait : Publish-Subscribe (Pub/Sub) Sans s’en rendre compte, il a créé un - l'un des modèles les plus importants de l'architecture logicielle moderne. publish-subscribe system Voici comment ça fonctionne : Les éditeurs (le personnel de l’hôtel) envoient des messages à des sujets ou des canaux spécifiques (mises à jour de la piscine, nouvelles de restaurants, etc.) Les abonnés (invités) choisissent les sujets qu’ils veulent écouter Les éditeurs ne savent pas qui écoute, les abonnés ne savent pas qui envoie - ils sont complètement déconnectés C’est la communication N×M : de nombreux éditeurs peuvent envoyer à de nombreux abonnés, mais elle est organisée et filtrée. Plus de bruit, plus de messages manqués, plus d’appels à chaque chambre individuellement. Pourquoi cela compte pour votre code Dave a accidentellement construit les trois modèles de messagerie de base: Connexions directes entre services. Ne pas évoluer lorsque vous avez besoin de notifier plusieurs systèmes. Point-to-Point Un message pour tout le monde. Création de bruit et de couplage serré. Broadcast Les éditeurs envoient des messages à des sujets/canalisations, les abonnés choisissent ce qu’ils écoutent. Pub/Sub Dans les systèmes réels, pub/sub résout les mêmes problèmes que ceux rencontrés par Dave : Commerce électronique: les mises à jour de l'inventaire vont à plusieurs services (recommandations, analyses, notifications) sans que le service d'inventaire sache qui écoute Applications de chat: Messages publiés sur les canaux, les utilisateurs s'abonnent aux conversations dans lesquelles ils se trouvent Microservices : les services publient des événements (utilisateur enregistré, commande terminée) auxquels d’autres services peuvent réagir de manière indépendante. Créer votre propre système de pub / sous-service (c'est plus facile que vous ne le pensez) Pub/sub n'a pas besoin d'être intimidant. Construisons un système de qualité de production dont Dave serait fier: from collections import defaultdict from typing import Dict, List, Callable class HotelPubSub: def __init__(self): self.channels: Dict[str, List[Callable]] = defaultdict(list) def subscribe(self, channel: str, callback: Callable): """Guest subscribes to hotel updates""" self.channels[channel].append(callback) print(f"Subscribed to {channel}") def publish(self, channel: str, message: str): """Hotel staff publishes updates""" if channel in self.channels: for callback in self.channels[channel]: callback(message) print(f"Published to {channel}: {message}") # Dave's hotel in action hotel = HotelPubSub() # Guests subscribe to what they care about def pool_guest(msg): print(f"🏊 Pool Guest: {msg}") def restaurant_guest(msg): print(f"🍽️ Restaurant Guest: {msg}") hotel.subscribe("pool", pool_guest) hotel.subscribe("restaurant", restaurant_guest) # Hotel publishes updates hotel.publish("pool", "Pool closing in 10 minutes!") hotel.publish("restaurant", "Happy hour starts now!") Un système de pub/sub fonctionnant en 20 lignes. Optimisation avec Async Queues Mais que se passe-t-il si l’hôtel de Dave est vraiment occupé ?Notre version simple bloque les éditeurs lorsque les abonnés sont lents. import asyncio from asyncio import Queue from collections import defaultdict class AsyncPubSub: def __init__(self): self.channels = defaultdict(list) self.message_queue = Queue() # Start background worker asyncio.create_task(self._worker()) def subscribe(self, channel: str, callback): self.channels[channel].append(callback) async def publish(self, channel: str, message: str): # Non-blocking publish - just queue it await self.message_queue.put((channel, message)) async def _worker(self): # Background worker processes messages while True: channel, message = await self.message_queue.get() if channel in self.channels: # Run all subscribers in parallel tasks = [callback(message) for callback in self.channels[channel]] await asyncio.gather(*tasks, return_exceptions=True) # Dave's async hotel in action async def demo(): hotel = AsyncPubSub() # Fast and slow subscribers async def fast_guest(msg): await asyncio.sleep(0.1) # Quick processing print(f"🏊 Fast: {msg}") async def slow_guest(msg): await asyncio.sleep(2.5) # Simulate slow database write print(f"🍽️ Slow: {msg}") hotel.subscribe("updates", fast_guest) hotel.subscribe("updates", slow_guest) # Publishers don't wait for slow subscribers await hotel.publish("updates", "Pool closing!") await hotel.publish("updates", "Happy hour!") await asyncio.sleep(0.2) # Let everything finish # asyncio.run(demo()) Maintenant, les éditeurs ne bloquent jamais - ils ne font que faire la queue des messages et continuer. Le travailleur en arrière-plan gère la livraison à tous les abonnés en parallèle. Les abonnés rapides reçoivent leurs messages rapidement (0,1s), tandis que les plus lents prennent leur temps (2,5s), mais ne bloquent pas l'autre. Le hack de performance facile: uvloop Le système asynchrone de Dave fonctionne bien, mais il découvre alors quelque chose de magique: changer une ligne de code peut rendre tout 2 à 4 fois plus rapide. Entrez - un remplacement drop-in pour la boucle d'événement par défaut de Python qui est écrit en Cython et basé sur libuv (la même chose qui rend Node.js rapide). uvloop import uvloop import asyncio from collections import defaultdict import time # The magic line that makes Dave's hotel 2-4x faster asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) class TurboHotelPubSub: def __init__(self, max_queue_size=10000): self.channels = defaultdict(list) self.message_queue = asyncio.Queue(maxsize=max_queue_size) self.running = True self.performance_metrics = { 'messages_per_second': 0, 'avg_latency_ms': 0, 'active_subscribers': 0 } # Start background worker and metrics collector asyncio.create_task(self._worker()) asyncio.create_task(self._metrics_collector()) def subscribe(self, channel: str, callback): self.channels[channel].append(callback) self.performance_metrics['active_subscribers'] = sum(len(subs) for subs in self.channels.values()) print(f"Subscribed to {channel} (Total subscribers: {self.performance_metrics['active_subscribers']})") async def publish(self, channel: str, message: str): timestamp = time.time() message_with_timestamp = (channel, message, timestamp) try: self.message_queue.put_nowait(message_with_timestamp) except asyncio.QueueFull: # Backpressure - with uvloop, this is much faster await self.message_queue.put(message_with_timestamp) async def _worker(self): """Background worker - uvloop makes this significantly faster""" while self.running: channel, message, publish_time = await self.message_queue.get() # Measure end-to-end latency processing_start = time.time() if channel in self.channels: # uvloop excels at handling many concurrent tasks tasks = [] for callback in self.channels[channel]: if asyncio.iscoroutinefunction(callback): tasks.append(callback(message)) else: # uvloop's thread pool is much more efficient tasks.append(asyncio.get_event_loop().run_in_executor(None, callback, message)) # uvloop's gather is optimized for many concurrent operations await asyncio.gather(*tasks, return_exceptions=True) # Track latency total_latency = (time.time() - publish_time) * 1000 # Convert to ms self.performance_metrics['avg_latency_ms'] = ( self.performance_metrics['avg_latency_ms'] * 0.9 + total_latency * 0.1 ) self.message_queue.task_done() async def _metrics_collector(self): """Track messages per second - uvloop's timer precision helps here""" last_time = time.time() last_count = 0 while self.running: await asyncio.sleep(1) current_time = time.time() # In uvloop, queue.qsize() is more accurate and faster current_count = getattr(self.message_queue, '_finished', 0) if current_time - last_time >= 1: messages_processed = current_count - last_count self.performance_metrics['messages_per_second'] = messages_processed last_time, last_count = current_time, current_count def get_performance_stats(self): return self.performance_metrics.copy() # Dave's hotel with uvloop superpowers async def benchmark_uvloop_vs_standard(): """Demonstrate uvloop performance improvements""" # Simulate I/O-heavy subscribers (database writes, API calls) async def database_subscriber(msg): # Simulate database write await asyncio.sleep(0.001) # 1ms "database" call return f"DB: {msg}" async def api_subscriber(msg): # Simulate API call await asyncio.sleep(0.002) # 2ms "API" call return f"API: {msg}" def analytics_subscriber(msg): # Simulate CPU-heavy sync work time.sleep(0.0005) # 0.5ms CPU work return f"Analytics: {msg}" hotel = TurboHotelPubSub() # Subscribe multiple handlers to same channel for i in range(10): # 10 database subscribers hotel.subscribe("orders", database_subscriber) for i in range(5): # 5 API subscribers hotel.subscribe("orders", api_subscriber) for i in range(20): # 20 analytics subscribers hotel.subscribe("orders", analytics_subscriber) print("Starting benchmark with uvloop...") start_time = time.time() # Publish lots of messages for i in range(1000): await hotel.publish("orders", f"Order #{i}") # Wait for processing to complete await hotel.message_queue.join() end_time = time.time() stats = hotel.get_performance_stats() print(f"Benchmark complete in {end_time - start_time:.2f} seconds") print(f"Performance stats: {stats}") print(f"Total subscriber callbacks: {stats['active_subscribers'] * 1000:,}") return end_time - start_time # Uncomment to run benchmark # asyncio.run(benchmark_uvloop_vs_standard()) What uvloop supercharges: 1. I/O-Heavy Subscribers (2-4x speedup) Base de données écrits, appels API, opérations de fichiers La mise en œuvre libuv de uvloop gère plus efficacement des milliers d'opérations I/O simultanées L'hôtel de Dave peut désormais gérer les téléchargements de journaux cloud de Mme Henderson ET les messages Instagram de M. Peterson simultanément 2. Many Concurrent Subscribers (1.5-2x speedup) Des centaines ou des milliers d’abonnés par chaîne La planification optimisée des tâches de uvloop réduit l'overhead Parfait pour le centre de conférence de Dave avec plus de 500 abonnés à la notification de chambre 3. Thread Pool Operations (30-50% improvement) Synchroniser les appels qui sont déplacés vers les pools de fil La gestion de thread pool de uvloop est plus efficace Mieux pour les systèmes héréditaires de Dave qui ne peuvent pas être rendus asynchrones 4. Timer and Queue Precision Temps plus précis pour les métriques et la limitation des taux Mieux surveiller les performances de la queue Aide Dave à suivre si son système est à la hauteur de la demande Real-world uvloop impact for Dave's hotel: # Before uvloop: 15,000 notifications/second # After uvloop: 35,000 notifications/second # Same code, 2.3x faster! La meilleure partie ? Dave découvre accidentellement que parfois les meilleures optimisations sont celles qui nécessitent le moins de travail. Zero code changes La réaction de Dave : « Attendez, c’est ça ? j’importe juste uvloop et tout va plus vite ? Narrateur: Ce n'est pas tricher, Dave. C'est juste une bonne ingénierie. When uvloop matters most: Nombre d’abonnés élevé : 100+ abonnés par canal I/O-heavy callbacks: bases de données écrites, appels API, opérations de fichiers Charges de travail mixtes : combinaison d’abonnés rapides et lents Latency-sensitive : quand chaque milliseconde compte Alors que uvloop rend asyncio plus rapide, certains développeurs préfèrent Trio pour les systèmes complexes avec des milliers de tâches simultanées. La concurrence structurée et la gestion de la pression arrière intégrée de Trio peuvent être plus fiables sous une charge extrême - il est conçu pour échouer gracieusement plutôt que mystérieusement lorsque vous avez 10 000 opérations simultanées. Pour l'hôtel de Dave, asyncio+uvloop est parfait. Pour la prochaine entreprise de Dave (un système de négociation en temps réel pour les matières premières sous-marines), le Trio pourrait empêcher quelques séances de débogage de 3 heures. A note about Trio: Dave tente d'installer uvloop et est confus par les messages d'erreur. Voici la chose - uvloop nécessite la compilation, donc vous avez besoin d'outils de développement installés. Sur macOS avec Homebrew : Sur Windows... Eh bien, Dave décide de s’en tenir à la boucle d’événements standard sur Windows et de le déployer sur Linux en production. The uvloop installation gotcha: apt-get install build-essential brew install python3-dev Going Nuclear : Pub/Sub cartographié par mémoire Pour des scénarios de performances extrêmes ou multi-proces, nous pouvons utiliser la mémoire partagée avec la gestion complète des abonnés : import mmap import struct import multiprocessing import threading import time from collections import defaultdict from typing import Callable, Dict, List class MemoryMappedPubSub: def __init__(self, buffer_size=1024*1024): # 1MB buffer # Create shared memory buffer self.buffer_size = buffer_size self.shared_file = f'/tmp/pubsub_{multiprocessing.current_process().pid}' # Initialize memory-mapped file with open(self.shared_file, 'wb') as f: f.write(b'\x00' * buffer_size) self.mmap = mmap.mmap(open(self.shared_file, 'r+b').fileno(), 0) # Layout: [head_pos][tail_pos][message_data...] self.head_offset = 0 self.tail_offset = 8 self.data_offset = 16 # Subscriber management self.subscribers: Dict[str, List[Callable]] = defaultdict(list) self.listening = False self.listener_thread = None def subscribe(self, channel: str, callback: Callable): """Subscribe to a channel with a callback function""" self.subscribers[channel].append(callback) print(f"Subscribed to channel: {channel}") # Start listener if not already running if not self.listening: self.start_listening() def start_listening(self): """Start background thread to listen for messages""" if self.listening: return self.listening = True self.listener_thread = threading.Thread(target=self._listen_loop, daemon=True) self.listener_thread.start() print("Started listening for messages...") def stop_listening(self): """Stop the message listener""" self.listening = False if self.listener_thread: self.listener_thread.join() def _listen_loop(self): """Background loop that processes incoming messages""" while self.listening: messages = self.read_messages() for message in messages: self._process_message(message) time.sleep(0.001) # Small delay to prevent excessive CPU usage def _process_message(self, message: str): """Process a single message and notify subscribers""" try: if ':' in message: channel, content = message.split(':', 1) if channel in self.subscribers: for callback in self.subscribers[channel]: try: callback(content) except Exception as e: print(f"Error in callback for channel {channel}: {e}") except Exception as e: print(f"Error processing message: {e}") def publish(self, channel: str, message: str): """Ultra-fast direct memory write""" data = f"{channel}:{message}".encode() # Get current tail position tail = struct.unpack('Q', self.mmap[self.tail_offset:self.tail_offset+8])[0] # Check if we have enough space (simple wraparound) available_space = self.buffer_size - self.data_offset - tail if available_space < len(data) + 4: # Reset to beginning if we're near the end tail = 0 # Write message length + data struct.pack_into('I', self.mmap, self.data_offset + tail, len(data)) self.mmap[self.data_offset + tail + 4:self.data_offset + tail + 4 + len(data)] = data # Update tail pointer new_tail = tail + 4 + len(data) struct.pack_into('Q', self.mmap, self.tail_offset, new_tail) def read_messages(self): """Ultra-fast direct memory read""" head = struct.unpack('Q', self.mmap[self.head_offset:self.head_offset+8])[0] tail = struct.unpack('Q', self.mmap[self.tail_offset:self.tail_offset+8])[0] messages = [] current = head while current < tail: try: # Read message length msg_len = struct.unpack('I', self.mmap[self.data_offset + current:self.data_offset + current + 4])[0] # Safety check if msg_len > self.buffer_size or msg_len <= 0: break # Read message data data = self.mmap[self.data_offset + current + 4:self.data_offset + current + 4 + msg_len] messages.append(data.decode()) current += 4 + msg_len except Exception as e: print(f"Error reading message: {e}") break # Update head pointer struct.pack_into('Q', self.mmap, self.head_offset, current) return messages def __del__(self): """Cleanup when object is destroyed""" self.stop_listening() if hasattr(self, 'mmap'): self.mmap.close() # Dave's ultra-fast hotel messaging in action hotel = MemoryMappedPubSub() # Define subscriber callbacks def pool_guest(message): print(f"🏊 Pool Guest received: {message}") def restaurant_guest(message): print(f"🍽️ Restaurant Guest received: {message}") # Subscribe to channels (automatically starts background listener) hotel.subscribe("pool", pool_guest) hotel.subscribe("restaurant", restaurant_guest) # Publish messages (ultra-fast memory writes) hotel.publish("pool", "Pool closing in 10 minutes!") hotel.publish("restaurant", "Happy hour starts now!") What makes this special: Publication ultra-rapide : mémoire directe écrit, pas d'appels système Routage automatique des messages : le thread de fond traite les messages et appelle les abonnés Plusieurs abonnés par chaîne : Chaque chaîne peut avoir plusieurs auditeurs Isolation d'erreur : un mauvais rappel n'effondre pas le système Gestion des ressources propres : nettoyage automatique lorsqu’il est terminé L'hôtel de Dave peut maintenant gérer des millions de notifications d'hôtes par seconde tout en conservant le modèle simple "abonnez-vous et oubliez" qui a rendu son système célèbre. Comparaison des performances Solution Messages/Second Use Case Complexity Basic Pub/Sub ~50K Small apps, prototypes ⭐ Simple Queued Pub/Sub ~200K Most production systems ⭐⭐ Moderate Memory-Mapped ~5M+ High-frequency trading, multi-process ⭐⭐⭐⭐⭐ Expert External (Redis) ~100K+ Distributed systems ⭐⭐⭐ Moderate Basic Pub/Sub à 50K Petits modèles, prototypes Simple Queued Pub/Sub • 200K La plupart des systèmes de production ⭐⭐ Modérée Memory-Mapped • 5M+ Trading à haute fréquence, multi-processus External (Redis) ~ 100K + Systèmes distribués ⭐⭐⭐ Modérée When to use each: Basic: Apprentissage, petits projets, <10K messages/sec Queue: la plupart des applications réelles, gère bien les pics de trafic Mémoire cartographiée : > 500K messages/sec, communication transversale, latence ultra basse Externe : plusieurs serveurs, persistance, fiabilité éprouvée Ajoutez la gestion des erreurs, la persistance et l'évolutivité, et vous obtenez des messages de classe entreprise. Publicité / Sub Technologies Redis Pub/Sub: Simple, rapide, excellent pour les fonctionnalités en temps réel Apache Kafka: Enterprise-grade, gère le débit massif RabbitMQ : une file d’attente fiable avec un routage flexible Solutions cloud : AWS SNS/SQS, Google Pub/Sub, Azure Service Bus La fin de l'histoire de Dave Six mois plus tard, Dave est promu « Chief Communication Innovation Officer » parce que les résultats de satisfaction des clients sont à travers le toit. Dave pense toujours que les «canaux» se réfèrent au système de télévision par câble de l'hôtel, mais ses zones intercom enseignent aux étudiants en informatique du monde entier comment fonctionnent les modèles de messagerie. Même une horloge arrêtée est juste deux fois par jour, et même Dave peut se heurter à une bonne architecture s'il brise suffisamment de choses en premier. Il est revenu à ce qu'il connaît le mieux - les systèmes de transport. sa nouvelle entreprise est de révolutionner le transport public sous-marin avec "Submarins publics" qui fonctionnent sur "Data Busses". And what about Dave? Interrogé sur l’architecture technique, Dave explique avec enthousiasme : « Chaque sous-marin publie son emplacement aux passagers qui souscrivent à des itinéraires spécifiques. C’est un pub/sub, mais pour les sous-marins ! Le département des transports de la ville tente toujours de déterminer s'il est un génie ou s'il a besoin de révoquer sa licence d'affaires. La prochaine fois que vous serez dans votre trou d'arrosage local, vous pourriez voir le nouveau "Pub Sub" - avec le révolutionnaire "sandwiches sous-marins artisanaux traités de manière asynchrone avec des files d'attente d'ingrédients non-bloquants."La viande est assise dans un moulin chaud pour une "persistance optimale du message", et Dave insiste sur le pain sucré est en fait "amélioré par l'humidité pour un meilleur débit." Il reçoit toujours des cartes de Noël des clients de l'hôtel.