Dave nikdy předtím neviděl požární alarmový systém, ale právě dokončil projekt zákaznických služeb, kde "osobní telefonní hovory zvyšují zapojení o 847%!" Jeho úvahy: "Když mě máma volá přímo, vždycky odpovím.Když se nějaký generický poplach vypne, předpokládám, že je to poplach a ignoruji ho. Fáze 1: Komunikační bod k bodu (telefonní volání fiasko) Když je detekován kouř, systém volá do každého pokoje v hotelu. Pokoj 237 dostane telefonát: „Hej, je kouř v kuchyni, prosím evakuujte.“ Pokoj 301: „Hej, je kouř v kuchyni, prosím evakuujte.“ Všechny 847 místnosti, jeden po druhém. Co se stane, když je v kuchyni malý tukový oheň A kouř v garáži? Daveův systém volá do místnosti 237: "Hej, v kuchyni je kouř." Pak okamžitě: „Ahoj, je kouř v parkovišti.“ Host je zmatený – jaká nouze? Klikněte na . Mezitím, pokoj 512 má host, který je hluchý. pokoj 623 zkontroloval před hodinami, ale telefon stále zvoní. pokoj 108 má spící dítě, jehož rodiče jsou nyní naštvaní. Po třetím falešném poplachu za týden, hosté začnou odpojovat své telefony. Jedná se o klasickou komunikaci point-to-point - jeden odesílatel, jeden přijímač, opakované 847krát. Požární inspektor se objeví: "Dave, nemůžete to udělat. Fáze 2: Rozhlasová komunikace (jaderná možnost) Ale Dave se poučil ze svých chyb. „Mám lepší nápad!“ oznámí. Dave nainstaluje masivní požární poplach, který vybuchne po celém hotelu. Úspěch! Každý to slyší! Ale pak si Dave uvědomí, že tento systém je perfektní pro oznámení. Zavírání bazénu? POŽAROVÝ ALARM. Snídaně končí za 10 minut? POŽAROVÝ ALARM. Ztracené dítě v lobby? POŽAROVÝ ALARM. Jóga třída začíná? POŽAROVÝ ALARM. Hasiči jsou nyní evakuováni pro oznámení o kontinentální snídani. Požární oddělení bylo tento týden voláno 47krát. Jedná se o vysílací komunikaci - jedna zpráva pro každého, ať si to přejí nebo ne.Je to jako použití městského nouzového vysílacího systému k oznámení prodeje garáže. Fáze 3: Dave objevuje něco revolučního Daveův třetí pokus: „Co kdybychom měli různé interkomy pro různé oblasti?“ Instaluje reproduktory po celém hotelu, ale napojí je na oddělené kanály: Pool Area Intercom: "Pool zavírá za 10 minut, servis ručníků končí" Restaurace Intercom: „Šťastný začátek hodiny, kuchyně brzy zavře“ Konferenční místnost Intercom: "Zasedací místnost B je k dispozici, WiFi heslo se změnilo" Spa Intercom: "Masážní schůzky k dispozici, údržba sauny ve 3pm" Podlahové interkomy: "Domácnost v 3. patře, ledový stroj rozbitý v 7. patře" Hosté se přirozeně přizpůsobí interkomům v oblastech, které skutečně používají. Hosté bazénu slyší aktualizace bazénu, restaurace obdrží informace o jídle, účastníci konference obdrží oznámení o místnostech pro schůzky. Co se vlastně stalo: Publish-Subscribe (Pub/Sub) Aniž by si to uvědomil, David vytvořil - jeden z nejdůležitějších vzorců moderní softwarové architektury. publish-subscribe system Zde je, jak to funguje: Vydavatelé (personál hotelu) posílají zprávy na konkrétní témata nebo kanály (aktualizace bazénu, zprávy o restauraci atd.) Předplatitelé (hosti) si vybírají, jaká témata chtějí poslouchat Vydavatelé nevědí, kdo poslouchá, předplatitelé nevědí, kdo odesílá - jsou zcela odpojeni Jedná se o komunikaci N×M: mnoho vydavatelů může posílat mnoha předplatitelům, ale je organizováno a filtrováno.Žádný hluk, žádné chybějící zprávy, žádné volání do každé místnosti individuálně. Proč je to důležité pro váš kód Dave náhodou vytvořil tři základní vzory zpráv: Přímé připojení mezi službami: Nerozšiřuje se, když potřebujete upozornit více systémů. Point-to-Point Jedno poselství pro všechny. vytváří hluk a těsné spojení. Broadcast Vydavatelé posílají zprávy do témat/kanálů, předplatitelé si vybírají, co poslouchat. Pub/Sub V reálných systémech, pub/sub řeší stejné problémy Dave čelil: E-commerce: Aktualizace zásob přecházejí na více služeb (doporučení, analýzy, oznámení) bez toho, aby zásobovací služba věděla, kdo poslouchá Chatové aplikace: Zprávy zveřejněné do kanálů, uživatelé se přihlašují k konverzacím, ve kterých jsou Microservices: Služby zveřejňují události (uživatel registrovaný, objednávka dokončená), na které mohou jiné služby reagovat nezávisle Vytvořte si vlastní pub/sub systém (je to jednodušší, než si myslíte) Pub/sub nemusí být zastrašující.Vytvořme systém třídy výroby, na který by byl Dave pyšný: from collections import defaultdict from typing import Dict, List, Callable class HotelPubSub: def __init__(self): self.channels: Dict[str, List[Callable]] = defaultdict(list) def subscribe(self, channel: str, callback: Callable): """Guest subscribes to hotel updates""" self.channels[channel].append(callback) print(f"Subscribed to {channel}") def publish(self, channel: str, message: str): """Hotel staff publishes updates""" if channel in self.channels: for callback in self.channels[channel]: callback(message) print(f"Published to {channel}: {message}") # Dave's hotel in action hotel = HotelPubSub() # Guests subscribe to what they care about def pool_guest(msg): print(f"🏊 Pool Guest: {msg}") def restaurant_guest(msg): print(f"🍽️ Restaurant Guest: {msg}") hotel.subscribe("pool", pool_guest) hotel.subscribe("restaurant", restaurant_guest) # Hotel publishes updates hotel.publish("pool", "Pool closing in 10 minutes!") hotel.publish("restaurant", "Happy hour starts now!") To je to! pracující pub / sub systém v 20 řádcích. Optimalizace pomocí Async Queues Ale co když je Daveův hotel opravdu zaneprázdněn?Naše jednoduchá verze blokuje vydavatele, když jsou předplatitelé pomalí. import asyncio from asyncio import Queue from collections import defaultdict class AsyncPubSub: def __init__(self): self.channels = defaultdict(list) self.message_queue = Queue() # Start background worker asyncio.create_task(self._worker()) def subscribe(self, channel: str, callback): self.channels[channel].append(callback) async def publish(self, channel: str, message: str): # Non-blocking publish - just queue it await self.message_queue.put((channel, message)) async def _worker(self): # Background worker processes messages while True: channel, message = await self.message_queue.get() if channel in self.channels: # Run all subscribers in parallel tasks = [callback(message) for callback in self.channels[channel]] await asyncio.gather(*tasks, return_exceptions=True) # Dave's async hotel in action async def demo(): hotel = AsyncPubSub() # Fast and slow subscribers async def fast_guest(msg): await asyncio.sleep(0.1) # Quick processing print(f"🏊 Fast: {msg}") async def slow_guest(msg): await asyncio.sleep(2.5) # Simulate slow database write print(f"🍽️ Slow: {msg}") hotel.subscribe("updates", fast_guest) hotel.subscribe("updates", slow_guest) # Publishers don't wait for slow subscribers await hotel.publish("updates", "Pool closing!") await hotel.publish("updates", "Happy hour!") await asyncio.sleep(0.2) # Let everything finish # asyncio.run(demo()) Nyní vydavatelé nikdy blokují - jen čekají na zprávy a pokračují dál. Pracovník na pozadí zpracovává doručení všem odběratelům paralelně. Rychlí odběratelé dostávají své zprávy rychle (0,1s), zatímco pomalí zabírají svůj čas (2.5s), ale ani jeden neblokuje druhý. Jednoduchý výkon hack: uvloop Daveův asynchronizovaný systém funguje skvěle, ale pak objeví něco magického: změna jednoho řádku kódu může udělat vše 2-4x rychleji. Vstupte - Drop-in náhrada pro výchozí cyklus událostí Pythonu, který je napsán v Cythonu a je založen na libuv (stejná věc, která dělá Node.js rychle). uvloop import uvloop import asyncio from collections import defaultdict import time # The magic line that makes Dave's hotel 2-4x faster asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) class TurboHotelPubSub: def __init__(self, max_queue_size=10000): self.channels = defaultdict(list) self.message_queue = asyncio.Queue(maxsize=max_queue_size) self.running = True self.performance_metrics = { 'messages_per_second': 0, 'avg_latency_ms': 0, 'active_subscribers': 0 } # Start background worker and metrics collector asyncio.create_task(self._worker()) asyncio.create_task(self._metrics_collector()) def subscribe(self, channel: str, callback): self.channels[channel].append(callback) self.performance_metrics['active_subscribers'] = sum(len(subs) for subs in self.channels.values()) print(f"Subscribed to {channel} (Total subscribers: {self.performance_metrics['active_subscribers']})") async def publish(self, channel: str, message: str): timestamp = time.time() message_with_timestamp = (channel, message, timestamp) try: self.message_queue.put_nowait(message_with_timestamp) except asyncio.QueueFull: # Backpressure - with uvloop, this is much faster await self.message_queue.put(message_with_timestamp) async def _worker(self): """Background worker - uvloop makes this significantly faster""" while self.running: channel, message, publish_time = await self.message_queue.get() # Measure end-to-end latency processing_start = time.time() if channel in self.channels: # uvloop excels at handling many concurrent tasks tasks = [] for callback in self.channels[channel]: if asyncio.iscoroutinefunction(callback): tasks.append(callback(message)) else: # uvloop's thread pool is much more efficient tasks.append(asyncio.get_event_loop().run_in_executor(None, callback, message)) # uvloop's gather is optimized for many concurrent operations await asyncio.gather(*tasks, return_exceptions=True) # Track latency total_latency = (time.time() - publish_time) * 1000 # Convert to ms self.performance_metrics['avg_latency_ms'] = ( self.performance_metrics['avg_latency_ms'] * 0.9 + total_latency * 0.1 ) self.message_queue.task_done() async def _metrics_collector(self): """Track messages per second - uvloop's timer precision helps here""" last_time = time.time() last_count = 0 while self.running: await asyncio.sleep(1) current_time = time.time() # In uvloop, queue.qsize() is more accurate and faster current_count = getattr(self.message_queue, '_finished', 0) if current_time - last_time >= 1: messages_processed = current_count - last_count self.performance_metrics['messages_per_second'] = messages_processed last_time, last_count = current_time, current_count def get_performance_stats(self): return self.performance_metrics.copy() # Dave's hotel with uvloop superpowers async def benchmark_uvloop_vs_standard(): """Demonstrate uvloop performance improvements""" # Simulate I/O-heavy subscribers (database writes, API calls) async def database_subscriber(msg): # Simulate database write await asyncio.sleep(0.001) # 1ms "database" call return f"DB: {msg}" async def api_subscriber(msg): # Simulate API call await asyncio.sleep(0.002) # 2ms "API" call return f"API: {msg}" def analytics_subscriber(msg): # Simulate CPU-heavy sync work time.sleep(0.0005) # 0.5ms CPU work return f"Analytics: {msg}" hotel = TurboHotelPubSub() # Subscribe multiple handlers to same channel for i in range(10): # 10 database subscribers hotel.subscribe("orders", database_subscriber) for i in range(5): # 5 API subscribers hotel.subscribe("orders", api_subscriber) for i in range(20): # 20 analytics subscribers hotel.subscribe("orders", analytics_subscriber) print("Starting benchmark with uvloop...") start_time = time.time() # Publish lots of messages for i in range(1000): await hotel.publish("orders", f"Order #{i}") # Wait for processing to complete await hotel.message_queue.join() end_time = time.time() stats = hotel.get_performance_stats() print(f"Benchmark complete in {end_time - start_time:.2f} seconds") print(f"Performance stats: {stats}") print(f"Total subscriber callbacks: {stats['active_subscribers'] * 1000:,}") return end_time - start_time # Uncomment to run benchmark # asyncio.run(benchmark_uvloop_vs_standard()) What uvloop supercharges: 1. I/O-Heavy Subscribers (2-4x speedup) Databáze píše, API volání, operace souborů implementace uvloop založená na libuv zpracovává tisíce souběžných operací I/O efektivněji Daveův hotel nyní zvládne nahrávky paní Hendersonové v cloudu a příspěvky pana Petersonova na Instagramu současně 2. Many Concurrent Subscribers (1.5-2x speedup) Systémy se stovkami nebo tisíci předplatiteli na kanál Optimalizované plánování úkolů uvloop snižuje nadměrné zatížení Ideální pro Daveovo konferenční centrum s více než 500 odběrateli oznámení o místnosti 3. Thread Pool Operations (30-50% improvement) Synchronizace zpětných hovorů, které se přesouvají do položek Správa thread poolu uvloop je efektivnější Lepší pro Daveovy staré systémy, které nelze asynchronizovat 4. Timer and Queue Precision Přesnější časování pro metriky a omezování sazeb Lepší sledování výkonu Pomáhá Daveovi sledovat, zda jeho systém dodržuje poptávku Real-world uvloop impact for Dave's hotel: # Before uvloop: 15,000 notifications/second # After uvloop: 35,000 notifications/second # Same code, 2.3x faster! A nejlepší část? Dave náhodou zjistí, že někdy jsou nejlepší optimalizace ty, které vyžadují nejméně práce. Zero code changes Daveova reakce: "Počkejte, to je to? jen importuji uvloop a všechno se zrychluje? Vypravěč: Není to podvádění, Dave, je to jen dobré inženýrství. When uvloop matters most: Vysoký počet účastníků: 100+ účastníků na kanál I/O závažné zpětné vazby: Databáze píše, API volání, operace souborů Smíšené pracovní zatížení: kombinace rychlých a pomalých předplatitelů Citlivost na latenci: když se počítá každá milisekunda Zatímco uvloop dělá asyncio rychlejší, někteří vývojáři dávají přednost Trio pro složité systémy s tisíci souběžnými úkoly. Trio strukturovaná souběžnost a vestavěná manipulace se zpětným tlakem mohou být spolehlivější pod extrémním zatížením - je navržen tak, aby selhal elegantně spíše než záhadně, když máte 10 000+ simultánních operací. Pro Daveův hotel je asyncio+uvloop ideální. A note about Trio: Dave se pokouší nainstalovat uvloop a je zmatený chybovými zprávami. Zde je věc - uvloop vyžaduje kompilaci, takže potřebujete nainstalované vývojové nástroje. Na macOS s Homebrew: Na Windows... no, Dave se rozhodne držet se standardního okruhu událostí na Windows a nasadit na Linux ve výrobě. The uvloop installation gotcha: apt-get install build-essential brew install python3-dev Going Nuclear: paměťově mapovaný pub/sub Pro extrémní výkon nebo scénáře s více procesy můžeme použít sdílenou paměť s plnou správou předplatitelů: import mmap import struct import multiprocessing import threading import time from collections import defaultdict from typing import Callable, Dict, List class MemoryMappedPubSub: def __init__(self, buffer_size=1024*1024): # 1MB buffer # Create shared memory buffer self.buffer_size = buffer_size self.shared_file = f'/tmp/pubsub_{multiprocessing.current_process().pid}' # Initialize memory-mapped file with open(self.shared_file, 'wb') as f: f.write(b'\x00' * buffer_size) self.mmap = mmap.mmap(open(self.shared_file, 'r+b').fileno(), 0) # Layout: [head_pos][tail_pos][message_data...] self.head_offset = 0 self.tail_offset = 8 self.data_offset = 16 # Subscriber management self.subscribers: Dict[str, List[Callable]] = defaultdict(list) self.listening = False self.listener_thread = None def subscribe(self, channel: str, callback: Callable): """Subscribe to a channel with a callback function""" self.subscribers[channel].append(callback) print(f"Subscribed to channel: {channel}") # Start listener if not already running if not self.listening: self.start_listening() def start_listening(self): """Start background thread to listen for messages""" if self.listening: return self.listening = True self.listener_thread = threading.Thread(target=self._listen_loop, daemon=True) self.listener_thread.start() print("Started listening for messages...") def stop_listening(self): """Stop the message listener""" self.listening = False if self.listener_thread: self.listener_thread.join() def _listen_loop(self): """Background loop that processes incoming messages""" while self.listening: messages = self.read_messages() for message in messages: self._process_message(message) time.sleep(0.001) # Small delay to prevent excessive CPU usage def _process_message(self, message: str): """Process a single message and notify subscribers""" try: if ':' in message: channel, content = message.split(':', 1) if channel in self.subscribers: for callback in self.subscribers[channel]: try: callback(content) except Exception as e: print(f"Error in callback for channel {channel}: {e}") except Exception as e: print(f"Error processing message: {e}") def publish(self, channel: str, message: str): """Ultra-fast direct memory write""" data = f"{channel}:{message}".encode() # Get current tail position tail = struct.unpack('Q', self.mmap[self.tail_offset:self.tail_offset+8])[0] # Check if we have enough space (simple wraparound) available_space = self.buffer_size - self.data_offset - tail if available_space < len(data) + 4: # Reset to beginning if we're near the end tail = 0 # Write message length + data struct.pack_into('I', self.mmap, self.data_offset + tail, len(data)) self.mmap[self.data_offset + tail + 4:self.data_offset + tail + 4 + len(data)] = data # Update tail pointer new_tail = tail + 4 + len(data) struct.pack_into('Q', self.mmap, self.tail_offset, new_tail) def read_messages(self): """Ultra-fast direct memory read""" head = struct.unpack('Q', self.mmap[self.head_offset:self.head_offset+8])[0] tail = struct.unpack('Q', self.mmap[self.tail_offset:self.tail_offset+8])[0] messages = [] current = head while current < tail: try: # Read message length msg_len = struct.unpack('I', self.mmap[self.data_offset + current:self.data_offset + current + 4])[0] # Safety check if msg_len > self.buffer_size or msg_len <= 0: break # Read message data data = self.mmap[self.data_offset + current + 4:self.data_offset + current + 4 + msg_len] messages.append(data.decode()) current += 4 + msg_len except Exception as e: print(f"Error reading message: {e}") break # Update head pointer struct.pack_into('Q', self.mmap, self.head_offset, current) return messages def __del__(self): """Cleanup when object is destroyed""" self.stop_listening() if hasattr(self, 'mmap'): self.mmap.close() # Dave's ultra-fast hotel messaging in action hotel = MemoryMappedPubSub() # Define subscriber callbacks def pool_guest(message): print(f"🏊 Pool Guest received: {message}") def restaurant_guest(message): print(f"🍽️ Restaurant Guest received: {message}") # Subscribe to channels (automatically starts background listener) hotel.subscribe("pool", pool_guest) hotel.subscribe("restaurant", restaurant_guest) # Publish messages (ultra-fast memory writes) hotel.publish("pool", "Pool closing in 10 minutes!") hotel.publish("restaurant", "Happy hour starts now!") What makes this special: Ultra rychlé publikování: Přímá paměť píše, žádné systémové volání Automatické směrování zpráv: Pozadí zpracovává zprávy a volá účastníky Více předplatitelů na kanál: Každý kanál může mít mnoho posluchačů Izolace chyb: Jeden špatný zpětný hovor neohrozí systém Řízení čistých zdrojů: Automatické čištění po dokončení Daveův hotel nyní zvládá miliony oznámení hostů za sekundu při zachování jednoduchého modelu "subscribe and forget", který učinil jeho systém slavným. Srovnání výkonu Solution Messages/Second Use Case Complexity Basic Pub/Sub ~50K Small apps, prototypes ⭐ Simple Queued Pub/Sub ~200K Most production systems ⭐⭐ Moderate Memory-Mapped ~5M+ High-frequency trading, multi-process ⭐⭐⭐⭐⭐ Expert External (Redis) ~100K+ Distributed systems ⭐⭐⭐ Moderate Basic Pub/Sub • 50K Malé aplikace, prototypy Jednoduchá Queued Pub/Sub • 200K Většina výrobních systémů ⭐⭐ Moderní Memory-Mapped • 5 m + Vysokofrekvenční obchodování, multiproces External (Redis) • 100K + Distribuované systémy ⭐⭐⭐ Moderní When to use each: Základní: Učení, malé projekty, <10K zpráv/s Pořadí: Většina reálných aplikací, dobře zvládá dopravní špičky Paměťové mapování: >500K zpráv/s, komunikace mezi procesy, ultra nízká latence Externí: více serverů, vytrvalost, osvědčená spolehlivost Přidejte manipulaci s chybami, vytrvalost a škálování a získáte zprávy podnikové úrovně. Populární Pub / Sub Technologies Redis Pub/Sub: Jednoduché, rychlé, skvělé pro funkce v reálném čase Apache Kafka: Enterprise-grade, zvládá masivní průtok RabbitMQ: Spolehlivé sledování zpráv s flexibilním směrováním Cloudová řešení: AWS SNS/SQS, Google Pub/Sub, Azure Service Bus Konec příběhu Davida O šest měsíců později je Dave povýšen na "Chief Communication Innovation Officer", protože skóre spokojenosti hostů je přes střechu. Dave si stále myslí, že "kanály" se vztahují k hotelovému kabelovému televiznímu systému, ale jeho interkomové zóny učí studenty informatiky po celém světě, jak fungují vzory zpráv. Dokonce i zastavené hodinky jsou správné dvakrát denně, a dokonce i Dave může narazit do dobré architektury, pokud zlomí dost věcí nejdříve. Jeho novým podnikem je revoluce v podmořské veřejné dopravě s „veřejnými ponorkami“, které jezdí na „databusech“. And what about Dave? Na otázku o technické architektuře Dave nadšeně vysvětluje: „Každý sub zveřejňuje své umístění cestujícím, kteří se přihlásí na konkrétní trasy. Je to pub/sub, ale pro sub! Městské oddělení dopravy se stále snaží zjistit, zda je génius nebo zda potřebují zrušit jeho obchodní licenci. Dave zůstává přesvědčen, že řeší „problém na poslední míli pro vodní chodce“. Příště, když jste u vašeho místního zavlažovacího otvoru, můžete vidět nový "Pub Sub" - představující Daveův revoluční "asynchronně zpracované řemeslné ponorkové sendviče s non-blokujícími řadami ingrediencí." Vánoční dárky dostává od hotelových hostů.