Conoce a Dave, un desarrollador que obtuvo el contrato de alarma de incendio del hotel porque era sobrino del CEO. Dave nunca había visto un sistema de alarma de incendio antes, pero acaba de terminar un proyecto de servicio al cliente donde "las llamadas telefónicas personales aumentan el compromiso en un 847%!" Su razonamiento: "Cuando mi madre me llama directamente, siempre respondo.Cuando alguna alarma genérica se apaga, asumo que es una alarma de coche y la ignoro. Fase 1: Comunicación de punto a punto (el fiasco de la llamada telefónica) Cuando se detecta humo, el sistema llama a cada habitación del hotel. Habitación 237 recibe una llamada: "Hola, hay humo en la cocina, por favor evacuarse." Habitación 301: "Hola, hay humo en la cocina, por favor evacuarse." ¿Qué sucede cuando hay un pequeño fuego de grasa en la cocina Y humo en el garaje?El sistema de Dave llama a la habitación 237: "Hola, hay humo en la cocina". Luego inmediatamente: "Hola, hay humo en el garaje."El huésped está confundido - ¿qué emergencia? Clique en . Mientras tanto, en la habitación 512 hay un huésped que está sordo.La habitación 623 ya había salido hace horas pero el teléfono sigue sonando.La habitación 108 tiene un bebé dormido cuyos padres están ahora furiosos.Después de la tercera falsa alarma en una semana, los huéspedes comienzan a desconectar sus teléfonos. Esta es la comunicación clásica punto a punto - un remitente, un receptor, repetido 847 veces. no se escala, es ineficiente y molesta a todos los involucrados. El inspector de incendio aparece: "Dave, no puedes hacer esto. Instalar una alarma de incendio normal." Fase 2: Comunicación de transmisión (la opción nuclear) Pero Dave ha aprendido de sus errores. "Tengo una idea mejor!" anuncia. Dave instala una MASSIVA alarma de incendio que explota en todo el hotel. SUCESO! Todo el mundo la oye! Pero entonces Dave se da cuenta de que este sistema es PERFECTO para anuncios. Cierre de la piscina? ALARMA DE INCENDIO. El desayuno termina en 10 minutos? ALARMA DE INCENDIO. Niño perdido en el vestíbulo? ALARMA DE INCENDIO. clase de yoga comenzando? ALARMA DE INCENDIO. Los huéspedes están ahora evacuando para notificaciones de desayuno continental.El departamento de bomberos ha sido llamado 47 veces esta semana.Alguien intentó comprobar a través de la salida de emergencia. Es como usar un sistema de radiodifusión de emergencia en toda la ciudad para anunciar su venta de garaje. Fase 3: Dave descubre algo revolucionario El tercer intento de Dave: "¿Y si tuviera diferentes intercoms para diferentes áreas?" Instala altavoces en todo el hotel pero los conecta a canales separados: Pool Area Intercom: "La piscina cierra en 10 minutos, el servicio de toallas termina" Restaurante Intercom: "Happy hour begins, la cocina cierra pronto" Sala de conferencias Intercom: "La sala de reuniones B está disponible, cambió la contraseña WiFi" Spa Intercom: "Encuentros de masaje disponibles, mantenimiento de sauna a las 3pm" Intercoms específicos del piso: "Casa en el piso 3, máquina de hielo roto en el piso 7" Los huéspedes de la piscina escuchan las actualizaciones de la piscina, los comensales de los restaurantes reciben información de comidas, los asistentes a conferencias reciben anuncios de sala de reuniones. Lo que realmente construyó Dave: Publish-Subscribe (Pub/Sub) Sin darse cuenta de ello, creó un - uno de los patrones más importantes en la arquitectura de software moderna. publish-subscribe system Aquí está cómo funciona: Los editores (el personal del hotel) envían mensajes a temas o canales específicos (actualizaciones de piscinas, noticias de restaurantes, etc.) Los suscriptores (invitados) eligen qué temas quieren escuchar Los editores no saben quién está escuchando, los suscriptores no saben quién está enviando - están completamente desconectados Esta es la comunicación N×M: muchos editores pueden enviar a muchos suscriptores, pero está organizado y filtrado.No más ruido, no más mensajes perdidos, no más llamando cada habitación individualmente. ¿Por qué es importante para tu código? Dave accidentalmente construyó los tres patrones fundamentales de mensajería: : Conexiones directas entre servicios. No escala cuando necesita notificar a varios sistemas. Point-to-Point : Un mensaje para todos. Crea ruido y acoplamiento apertado. Broadcast Los editores envían mensajes a los temas/canales, los suscriptores eligen qué escuchar. Pub/Sub En sistemas reales, pub/sub resuelve los mismos problemas que Dave enfrentó: Comercio electrónico: las actualizaciones de inventario van a múltiples servicios (recomendaciones, análisis, notificaciones) sin que el servicio de inventario sepa quién está escuchando Aplicaciones de chat: mensajes publicados a los canales, los usuarios se suscriben a las conversaciones en las que están Microservicios: los servicios publican eventos (usuario registrado, pedido completado) a los que otros servicios pueden reaccionar de forma independiente. Construir tu propio sistema de pub / sub (Es más fácil de lo que piensas) Pub/sub no tiene que ser intimidante. Vamos a construir un sistema de calidad de producción que Dave estaría orgulloso de: from collections import defaultdict from typing import Dict, List, Callable class HotelPubSub: def __init__(self): self.channels: Dict[str, List[Callable]] = defaultdict(list) def subscribe(self, channel: str, callback: Callable): """Guest subscribes to hotel updates""" self.channels[channel].append(callback) print(f"Subscribed to {channel}") def publish(self, channel: str, message: str): """Hotel staff publishes updates""" if channel in self.channels: for callback in self.channels[channel]: callback(message) print(f"Published to {channel}: {message}") # Dave's hotel in action hotel = HotelPubSub() # Guests subscribe to what they care about def pool_guest(msg): print(f"🏊 Pool Guest: {msg}") def restaurant_guest(msg): print(f"🍽️ Restaurant Guest: {msg}") hotel.subscribe("pool", pool_guest) hotel.subscribe("restaurant", restaurant_guest) # Hotel publishes updates hotel.publish("pool", "Pool closing in 10 minutes!") hotel.publish("restaurant", "Happy hour starts now!") Un sistema de pubs/subs en 20 líneas. Optimización con Async Queues Pero, ¿qué pasa si el hotel de Dave está realmente ocupado?Nuestra versión simple bloquea a los editores cuando los suscriptores son lentos. import asyncio from asyncio import Queue from collections import defaultdict class AsyncPubSub: def __init__(self): self.channels = defaultdict(list) self.message_queue = Queue() # Start background worker asyncio.create_task(self._worker()) def subscribe(self, channel: str, callback): self.channels[channel].append(callback) async def publish(self, channel: str, message: str): # Non-blocking publish - just queue it await self.message_queue.put((channel, message)) async def _worker(self): # Background worker processes messages while True: channel, message = await self.message_queue.get() if channel in self.channels: # Run all subscribers in parallel tasks = [callback(message) for callback in self.channels[channel]] await asyncio.gather(*tasks, return_exceptions=True) # Dave's async hotel in action async def demo(): hotel = AsyncPubSub() # Fast and slow subscribers async def fast_guest(msg): await asyncio.sleep(0.1) # Quick processing print(f"🏊 Fast: {msg}") async def slow_guest(msg): await asyncio.sleep(2.5) # Simulate slow database write print(f"🍽️ Slow: {msg}") hotel.subscribe("updates", fast_guest) hotel.subscribe("updates", slow_guest) # Publishers don't wait for slow subscribers await hotel.publish("updates", "Pool closing!") await hotel.publish("updates", "Happy hour!") await asyncio.sleep(0.2) # Let everything finish # asyncio.run(demo()) Ahora los editores nunca bloquean - solo cuentan mensajes y siguen adelante.El trabajador de fondo gestiona la entrega a todos los suscriptores en paralelo.Los suscriptores rápidos reciben sus mensajes rápidamente (0.1s), mientras que los lentos toman su tiempo (2.5s), pero no bloquean el otro. Hackeo de rendimiento fácil: uvloop El sistema de asíncope de Dave está funcionando bien, pero entonces descubre algo mágico: cambiar una línea de código puede hacer todo 2-4 veces más rápido. Entra - un reemplazo de drop-in para el ciclo de eventos predeterminado de Python que está escrito en Cython y basado en libuv (la misma cosa que hace que Node.js sea rápido). uvloop import uvloop import asyncio from collections import defaultdict import time # The magic line that makes Dave's hotel 2-4x faster asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) class TurboHotelPubSub: def __init__(self, max_queue_size=10000): self.channels = defaultdict(list) self.message_queue = asyncio.Queue(maxsize=max_queue_size) self.running = True self.performance_metrics = { 'messages_per_second': 0, 'avg_latency_ms': 0, 'active_subscribers': 0 } # Start background worker and metrics collector asyncio.create_task(self._worker()) asyncio.create_task(self._metrics_collector()) def subscribe(self, channel: str, callback): self.channels[channel].append(callback) self.performance_metrics['active_subscribers'] = sum(len(subs) for subs in self.channels.values()) print(f"Subscribed to {channel} (Total subscribers: {self.performance_metrics['active_subscribers']})") async def publish(self, channel: str, message: str): timestamp = time.time() message_with_timestamp = (channel, message, timestamp) try: self.message_queue.put_nowait(message_with_timestamp) except asyncio.QueueFull: # Backpressure - with uvloop, this is much faster await self.message_queue.put(message_with_timestamp) async def _worker(self): """Background worker - uvloop makes this significantly faster""" while self.running: channel, message, publish_time = await self.message_queue.get() # Measure end-to-end latency processing_start = time.time() if channel in self.channels: # uvloop excels at handling many concurrent tasks tasks = [] for callback in self.channels[channel]: if asyncio.iscoroutinefunction(callback): tasks.append(callback(message)) else: # uvloop's thread pool is much more efficient tasks.append(asyncio.get_event_loop().run_in_executor(None, callback, message)) # uvloop's gather is optimized for many concurrent operations await asyncio.gather(*tasks, return_exceptions=True) # Track latency total_latency = (time.time() - publish_time) * 1000 # Convert to ms self.performance_metrics['avg_latency_ms'] = ( self.performance_metrics['avg_latency_ms'] * 0.9 + total_latency * 0.1 ) self.message_queue.task_done() async def _metrics_collector(self): """Track messages per second - uvloop's timer precision helps here""" last_time = time.time() last_count = 0 while self.running: await asyncio.sleep(1) current_time = time.time() # In uvloop, queue.qsize() is more accurate and faster current_count = getattr(self.message_queue, '_finished', 0) if current_time - last_time >= 1: messages_processed = current_count - last_count self.performance_metrics['messages_per_second'] = messages_processed last_time, last_count = current_time, current_count def get_performance_stats(self): return self.performance_metrics.copy() # Dave's hotel with uvloop superpowers async def benchmark_uvloop_vs_standard(): """Demonstrate uvloop performance improvements""" # Simulate I/O-heavy subscribers (database writes, API calls) async def database_subscriber(msg): # Simulate database write await asyncio.sleep(0.001) # 1ms "database" call return f"DB: {msg}" async def api_subscriber(msg): # Simulate API call await asyncio.sleep(0.002) # 2ms "API" call return f"API: {msg}" def analytics_subscriber(msg): # Simulate CPU-heavy sync work time.sleep(0.0005) # 0.5ms CPU work return f"Analytics: {msg}" hotel = TurboHotelPubSub() # Subscribe multiple handlers to same channel for i in range(10): # 10 database subscribers hotel.subscribe("orders", database_subscriber) for i in range(5): # 5 API subscribers hotel.subscribe("orders", api_subscriber) for i in range(20): # 20 analytics subscribers hotel.subscribe("orders", analytics_subscriber) print("Starting benchmark with uvloop...") start_time = time.time() # Publish lots of messages for i in range(1000): await hotel.publish("orders", f"Order #{i}") # Wait for processing to complete await hotel.message_queue.join() end_time = time.time() stats = hotel.get_performance_stats() print(f"Benchmark complete in {end_time - start_time:.2f} seconds") print(f"Performance stats: {stats}") print(f"Total subscriber callbacks: {stats['active_subscribers'] * 1000:,}") return end_time - start_time # Uncomment to run benchmark # asyncio.run(benchmark_uvloop_vs_standard()) What uvloop supercharges: 1. I/O-Heavy Subscribers (2-4x speedup) Bases de datos, llamadas de API, operaciones de archivos La implementación basada en libuv de uvloop gestiona miles de operaciones de I/O simultáneas de manera más eficiente El hotel de Dave ahora puede manejar las cargas de diario en la nube de Mrs. Henderson Y las publicaciones de Mr. Peterson en Instagram simultáneamente 2. Many Concurrent Subscribers (1.5-2x speedup) Sistemas con cientos o miles de suscriptores por canal La planificación optimizada de tareas de uvloop reduce el sobrepeso Perfecto para el centro de conferencias de Dave con más de 500 suscriptores de notificaciones de habitación 3. Thread Pool Operations (30-50% improvement) Sincronización de llamadas que se mueven a los grupos de thread La gestión del pool de thread de uvloop es más eficiente Mejor para los sistemas heredados de Dave que no se pueden hacer asíncronos 4. Timer and Queue Precision Tiempo más preciso para las métricas y la limitación de la tasa Mejor seguimiento del rendimiento de la cola Ayuda a Dave a rastrear si su sistema está siguiendo la demanda Real-world uvloop impact for Dave's hotel: # Before uvloop: 15,000 notifications/second # After uvloop: 35,000 notifications/second # Same code, 2.3x faster! ¿La mejor parte? Dave descubre accidentalmente que a veces las mejores optimizaciones son aquellas que requieren menos trabajo. Zero code changes La reacción de Dave: "¡Espera, eso es? sólo importa uvloop y todo se hace más rápido? ¡Esto se siente como engañar!" Narrador: No es engañar, Dave. Es solo una buena ingeniería. When uvloop matters most: Número de suscriptores: 100 + suscriptores por canal I/O-heavy callbacks: escritos de base de datos, llamadas de API, operaciones de archivos Cargas de trabajo mixtas: combinación de suscriptores rápidos y lentos Latency-sensitive: cuando cada milisegundo cuenta Mientras que uvloop hace asyncio más rápido, algunos desarrolladores prefieren Trio para sistemas complejos con miles de tareas simultáneas. La concorrencia estructurada de Trio y el manejo de la presión trasera integrado pueden ser más fiables bajo una carga extrema - está diseñado para fallar graciosamente en lugar de misteriosamente colgar cuando tienes 10.000 operaciones simultáneas. Para el hotel de Dave, asyncio +uvloop es perfecto. Para la próxima empresa de Dave (un sistema de negociación en tiempo real para mercancías submarinas), Trio podría evitar algunas sesiones de depuración 3am. A note about Trio: Dave intenta instalar uvloop y se confunde con los mensajes de error. Aquí está la cosa - uvloop requiere compilación, así que necesita herramientas de desarrollo instaladas. En Ubuntu/Debian: En macOS con Homebrew: En Windows... bueno, Dave decide seguir con el ciclo de eventos estándar en Windows y implementarlo en Linux en la producción. The uvloop installation gotcha: apt-get install build-essential brew install python3-dev Going Nuclear: Memory-Mapped Pub / Sub Para escenarios de rendimiento extremo o multiproceso, podemos utilizar la memoria compartida con la gestión completa de suscriptores: import mmap import struct import multiprocessing import threading import time from collections import defaultdict from typing import Callable, Dict, List class MemoryMappedPubSub: def __init__(self, buffer_size=1024*1024): # 1MB buffer # Create shared memory buffer self.buffer_size = buffer_size self.shared_file = f'/tmp/pubsub_{multiprocessing.current_process().pid}' # Initialize memory-mapped file with open(self.shared_file, 'wb') as f: f.write(b'\x00' * buffer_size) self.mmap = mmap.mmap(open(self.shared_file, 'r+b').fileno(), 0) # Layout: [head_pos][tail_pos][message_data...] self.head_offset = 0 self.tail_offset = 8 self.data_offset = 16 # Subscriber management self.subscribers: Dict[str, List[Callable]] = defaultdict(list) self.listening = False self.listener_thread = None def subscribe(self, channel: str, callback: Callable): """Subscribe to a channel with a callback function""" self.subscribers[channel].append(callback) print(f"Subscribed to channel: {channel}") # Start listener if not already running if not self.listening: self.start_listening() def start_listening(self): """Start background thread to listen for messages""" if self.listening: return self.listening = True self.listener_thread = threading.Thread(target=self._listen_loop, daemon=True) self.listener_thread.start() print("Started listening for messages...") def stop_listening(self): """Stop the message listener""" self.listening = False if self.listener_thread: self.listener_thread.join() def _listen_loop(self): """Background loop that processes incoming messages""" while self.listening: messages = self.read_messages() for message in messages: self._process_message(message) time.sleep(0.001) # Small delay to prevent excessive CPU usage def _process_message(self, message: str): """Process a single message and notify subscribers""" try: if ':' in message: channel, content = message.split(':', 1) if channel in self.subscribers: for callback in self.subscribers[channel]: try: callback(content) except Exception as e: print(f"Error in callback for channel {channel}: {e}") except Exception as e: print(f"Error processing message: {e}") def publish(self, channel: str, message: str): """Ultra-fast direct memory write""" data = f"{channel}:{message}".encode() # Get current tail position tail = struct.unpack('Q', self.mmap[self.tail_offset:self.tail_offset+8])[0] # Check if we have enough space (simple wraparound) available_space = self.buffer_size - self.data_offset - tail if available_space < len(data) + 4: # Reset to beginning if we're near the end tail = 0 # Write message length + data struct.pack_into('I', self.mmap, self.data_offset + tail, len(data)) self.mmap[self.data_offset + tail + 4:self.data_offset + tail + 4 + len(data)] = data # Update tail pointer new_tail = tail + 4 + len(data) struct.pack_into('Q', self.mmap, self.tail_offset, new_tail) def read_messages(self): """Ultra-fast direct memory read""" head = struct.unpack('Q', self.mmap[self.head_offset:self.head_offset+8])[0] tail = struct.unpack('Q', self.mmap[self.tail_offset:self.tail_offset+8])[0] messages = [] current = head while current < tail: try: # Read message length msg_len = struct.unpack('I', self.mmap[self.data_offset + current:self.data_offset + current + 4])[0] # Safety check if msg_len > self.buffer_size or msg_len <= 0: break # Read message data data = self.mmap[self.data_offset + current + 4:self.data_offset + current + 4 + msg_len] messages.append(data.decode()) current += 4 + msg_len except Exception as e: print(f"Error reading message: {e}") break # Update head pointer struct.pack_into('Q', self.mmap, self.head_offset, current) return messages def __del__(self): """Cleanup when object is destroyed""" self.stop_listening() if hasattr(self, 'mmap'): self.mmap.close() # Dave's ultra-fast hotel messaging in action hotel = MemoryMappedPubSub() # Define subscriber callbacks def pool_guest(message): print(f"🏊 Pool Guest received: {message}") def restaurant_guest(message): print(f"🍽️ Restaurant Guest received: {message}") # Subscribe to channels (automatically starts background listener) hotel.subscribe("pool", pool_guest) hotel.subscribe("restaurant", restaurant_guest) # Publish messages (ultra-fast memory writes) hotel.publish("pool", "Pool closing in 10 minutes!") hotel.publish("restaurant", "Happy hour starts now!") What makes this special: Publicación ultrarrápida: Memoria directa escribe, sin llamadas del sistema Rutación automática de mensajes: el hilo de fondo procesa mensajes y llama a suscriptores Múltiples suscriptores por canal: Cada canal puede tener varios oyentes Aislamiento de errores: Una mala llamada no colapsa el sistema Gestión de recursos limpios: limpieza automática cuando se realiza El hotel de Dave ahora puede manejar millones de notificaciones a los huéspedes por segundo, manteniendo el simple modelo "subscribe and forget" que hizo famoso su sistema. Comparación de rendimiento Solution Messages/Second Use Case Complexity Basic Pub/Sub ~50K Small apps, prototypes ⭐ Simple Queued Pub/Sub ~200K Most production systems ⭐⭐ Moderate Memory-Mapped ~5M+ High-frequency trading, multi-process ⭐⭐⭐⭐⭐ Expert External (Redis) ~100K+ Distributed systems ⭐⭐⭐ Moderate Basic Pub/Sub - 50K Aplicaciones pequeñas, prototipos Esta sencilla Queued Pub/Sub 200K La mayoría de los sistemas de producción ⭐⭐ Moderado Memory-Mapped ~ 5M + Comercio de alta frecuencia, multiproceso ⭐⭐⭐⭐⭐ Expertos External (Redis) ~ 100K + Sistemas distribuidos ⭐⭐⭐ Moderado When to use each: Básico: Aprendizaje, pequeños proyectos, <10K mensajes/sec La cola: la mayoría de las aplicaciones reales, maneja bien los picos de tráfico Memoria mapeada: >500K mensajes/segundo, comunicación transversal, latencia ultra baja External: múltiples servidores, persistencia, fiabilidad probada Agregue el manejo de errores, la persistencia y la escalación y obtendrá mensajería de nivel empresarial. Popular Pub/Sub Tecnologías Redis Pub/Sub: sencillo, rápido, excelente para funciones en tiempo real Apache Kafka: Enterprise-grade, maneja una gran cantidad de tráfico RabbitMQ: lista de mensajes fiable con enrutamiento flexible Soluciones en la nube: AWS SNS/SQS, Google Pub/Sub, Azure Service Bus El final de la historia de Dave Seis meses después, Dave es promovido a "Chief Communication Innovation Officer" porque las puntuaciones de satisfacción de los huéspedes están a través del techo. Dave todavía piensa que "canales" se refiere al sistema de televisión por cable del hotel, pero sus zonas intercom están enseñando a los estudiantes de ciencias de la computación de todo el mundo cómo funcionan los patrones de mensajería. Incluso un reloj parado es correcto dos veces al día, e incluso Dave puede chocar con una buena arquitectura si rompe suficientes cosas primero. Su nuevo negocio está revolucionando el transporte público submarino con "submarinos públicos" que funcionan en "buses de datos". And what about Dave? Cuando se le preguntó sobre la arquitectura técnica, Dave explicó con entusiasmo: “Cada sub publica su ubicación a los pasajeros que se suscriben a rutas específicas. ¡Es un pub/sub, pero para submarinos! El departamento de transporte de la ciudad todavía está tratando de averiguar si es un genio o si necesitan revocar su licencia de negocio. Dave sigue convencido de que está resolviendo "el problema de última milla para los caminantes acuáticos". La próxima vez que estés en tu agujero de riego local, podrías ver el nuevo "Pub Sub" - que presenta el revolucionario "sandwiches submarinos artesanales procesados de forma asíncrona con colas de ingredientes no bloqueantes".La carne se sienta en un frasco caliente para "persistencia de mensaje óptima", y Dave insiste en que el pan suave es en realidad "enriquecido con humedad para una mejor rendimiento". Todavía recibe tarjetas de Navidad de los huéspedes del hotel.