Faceți cunoștință cu Dave, un dezvoltator care a primit contractul de alarmă de incendiu pentru că era nepotul CEO-ului. Dave nu a mai văzut niciodată un sistem de alarmă de incendiu înainte, dar tocmai a finalizat un proiect de servicii pentru clienți în care " apelurile telefonice personale cresc angajamentul cu 847%!" Raționamentul său: „Când mama mea mă sună direct, întotdeauna răspund.Când o alarmă generică se stinge, presupun că este o alarmă de mașină și o ignor. Faza 1: Comunicarea punct-la-punct (fiasco-ul apelului telefonic) Când se detectează fum, sistemul apelează la fiecare cameră din hotel. Camera 237 primește un apel: "Bună ziua, există fum în bucătărie, vă rugăm să evacuați." Camera 301: "Bună ziua, există fum în bucătărie, vă rugăm să evacuați." Toate camerele 847, unul câte unul. Ce se întâmplă când există un mic incendiu de grăsime în bucătărie ȘI fum în garaj?Sistemul lui Dave cheamă camera 237: "Bună ziua, există fum în bucătărie". Apoi imediat: "Bună ziua, există fum în garaj." Oaspetele este confuz - ce urgență? Click pe . Între timp, camera 512 are un oaspete care este surd. camera 623 s-a verificat cu ore în urmă, dar telefonul continuă să sune. camera 108 are un copil care doarme al cărui părinți sunt acum furioși. După a treia alarmă falsă într-o săptămână, oaspeții încep să-și deconecteze telefoanele. Aceasta este o comunicare clasică punct-la-punct - un expeditor, un receptor, repetat de 847 de ori. Inspectorul de incendiu apare: "Dave, nu poți face asta. Instalați o alarmă normală de incendiu." Faza 2: Comunicarea de difuzare (opțiunea nucleară) Dar Dave a învățat din greșelile sale. „Am o idee mai bună!” anunță el. Dave instalează o alarmă masivă de incendiu care explodează în întregul hotel. Succes! Toată lumea o aude! Dar apoi Dave își dă seama că acest sistem este PERFECT pentru anunțuri. închiderea piscinei? ALARMA de incendiu. Micul dejun se termină în 10 minute? ALARMA de incendiu. Copil pierdut în lobby? ALARMA de incendiu. Clasa de yoga începe? ALARMA de incendiu. Oaspeții sunt acum evacuați pentru notificări de mic dejun continental. Departamentul de pompieri a fost sunat de 47 de ori în această săptămână. Acesta este comunicarea de radiodifuziune - un mesaj pentru toată lumea, indiferent dacă o doresc sau nu.Este ca și cum ai folosi un sistem de radiodifuziune de urgență la nivel de oraș pentru a anunța vânzarea garajului. Etapa 3: Dave descoperă ceva revoluționar A treia încercare a lui Dave: "Ce-ar fi dacă am avea intercomuri diferite pentru zone diferite?" El instalează difuzoare în întregul hotel, dar le conectează la canale separate: Pool Area Intercom: "Piscina se închide în 10 minute, serviciul de prosoape se încheie" Restaurant Intercom: "Happy hour start, bucătărie se închide în curând" Camera de conferințe Intercom: "Sala de întâlniri B este disponibilă, parola WiFi a fost schimbată" Spa Intercom: "Întâlniri de masaj disponibile, întreținere sauna la 3pm" Intercomuri specifice etajului: "Housekeeping on floor 3, ice machine broken on floor 7" Oaspeții piscinei aud actualizări ale piscinei, cinații din restaurante primesc informații despre mese, participanții la conferințe primesc anunțuri de la sala de întâlniri. Ce a construit de fapt Dave: Publish-Subscribe (Pub/Sub) Fără să-şi dea seama, David a creat o - unul dintre cele mai importante modele din arhitectura software-ului modern. publish-subscribe system Iată cum funcționează: Publisherii (personalul hotelului) trimit mesaje către subiecte sau canale specifice (actualizări de piscină, știri de restaurant etc.) Abonații (oaspeții) aleg subiectele pe care doresc să le asculte Publisherii nu știu cine ascultă, abonații nu știu cine trimite - sunt complet deconectați Aceasta este comunicarea N×M: mulți editori pot trimite la mulți abonați, dar este organizat și filtrat. De ce contează acest lucru pentru codul tău Dave a construit accidental cele trei modele de mesagerie fundamentale: Conexiuni directe între servicii. Nu scalează atunci când trebuie să notificați mai multe sisteme. Point-to-Point Un singur mesaj pentru toată lumea. Creează zgomot și îmbinare strânsă. Broadcast Publisherii trimit mesaje la subiecte/canale, abonații aleg ce să asculte. Pub/Sub În sistemele reale, pub/sub rezolvă aceleași probleme cu care s-a confruntat Dave: E-commerce: Actualizările inventarului merg la mai multe servicii (recomandări, analize, notificări) fără ca serviciul de inventar să știe cine ascultă Aplicații de chat: mesaje publicate pe canale, utilizatorii se abonează la conversații în care se află Microservices: Serviciile publică evenimente (utilizator înregistrat, comandă finalizată) la care alte servicii pot reacționa independent Construiți propriul dvs. sistem de pub / subsistem (Este mai ușor decât credeți) Pub/sub nu trebuie să fie intimidant.Haideți să construim un sistem de nivel de producție pe care Dave ar fi mândru de: from collections import defaultdict from typing import Dict, List, Callable class HotelPubSub: def __init__(self): self.channels: Dict[str, List[Callable]] = defaultdict(list) def subscribe(self, channel: str, callback: Callable): """Guest subscribes to hotel updates""" self.channels[channel].append(callback) print(f"Subscribed to {channel}") def publish(self, channel: str, message: str): """Hotel staff publishes updates""" if channel in self.channels: for callback in self.channels[channel]: callback(message) print(f"Published to {channel}: {message}") # Dave's hotel in action hotel = HotelPubSub() # Guests subscribe to what they care about def pool_guest(msg): print(f"🏊 Pool Guest: {msg}") def restaurant_guest(msg): print(f"🍽️ Restaurant Guest: {msg}") hotel.subscribe("pool", pool_guest) hotel.subscribe("restaurant", restaurant_guest) # Hotel publishes updates hotel.publish("pool", "Pool closing in 10 minutes!") hotel.publish("restaurant", "Happy hour starts now!") Un sistem de lucru pub/sub în 20 de linii. Optimizarea cu Async Queues Dar ce se întâmplă dacă hotelul lui Dave devine cu adevărat ocupat? versiunea noastră simplă blochează editorii atunci când abonații sunt lenti. import asyncio from asyncio import Queue from collections import defaultdict class AsyncPubSub: def __init__(self): self.channels = defaultdict(list) self.message_queue = Queue() # Start background worker asyncio.create_task(self._worker()) def subscribe(self, channel: str, callback): self.channels[channel].append(callback) async def publish(self, channel: str, message: str): # Non-blocking publish - just queue it await self.message_queue.put((channel, message)) async def _worker(self): # Background worker processes messages while True: channel, message = await self.message_queue.get() if channel in self.channels: # Run all subscribers in parallel tasks = [callback(message) for callback in self.channels[channel]] await asyncio.gather(*tasks, return_exceptions=True) # Dave's async hotel in action async def demo(): hotel = AsyncPubSub() # Fast and slow subscribers async def fast_guest(msg): await asyncio.sleep(0.1) # Quick processing print(f"🏊 Fast: {msg}") async def slow_guest(msg): await asyncio.sleep(2.5) # Simulate slow database write print(f"🍽️ Slow: {msg}") hotel.subscribe("updates", fast_guest) hotel.subscribe("updates", slow_guest) # Publishers don't wait for slow subscribers await hotel.publish("updates", "Pool closing!") await hotel.publish("updates", "Happy hour!") await asyncio.sleep(0.2) # Let everything finish # asyncio.run(demo()) Acum, editorii nu blochează niciodată – ei doar urmează mesaje și continuă să meargă. Lucrătorul de fundal se ocupă de livrarea tuturor abonaților în paralel. Abonații rapizi își primesc mesajele rapid (0.1s), în timp ce cei lente își iau timpul (2.5s), dar nici unul nu blochează celălalt. Hack-ul de performanță ușor: uvloop Sistemul async al lui Dave funcționează foarte bine, dar apoi descoperă ceva magic: schimbarea unei linii de cod poate face totul de 2-4 ori mai rapid. intră - un înlocuitor drop-in pentru ciclul de evenimente implicit al Python, care este scris în Cython și bazat pe libuv (același lucru care face ca Node.js să fie rapid). uvloop import uvloop import asyncio from collections import defaultdict import time # The magic line that makes Dave's hotel 2-4x faster asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) class TurboHotelPubSub: def __init__(self, max_queue_size=10000): self.channels = defaultdict(list) self.message_queue = asyncio.Queue(maxsize=max_queue_size) self.running = True self.performance_metrics = { 'messages_per_second': 0, 'avg_latency_ms': 0, 'active_subscribers': 0 } # Start background worker and metrics collector asyncio.create_task(self._worker()) asyncio.create_task(self._metrics_collector()) def subscribe(self, channel: str, callback): self.channels[channel].append(callback) self.performance_metrics['active_subscribers'] = sum(len(subs) for subs in self.channels.values()) print(f"Subscribed to {channel} (Total subscribers: {self.performance_metrics['active_subscribers']})") async def publish(self, channel: str, message: str): timestamp = time.time() message_with_timestamp = (channel, message, timestamp) try: self.message_queue.put_nowait(message_with_timestamp) except asyncio.QueueFull: # Backpressure - with uvloop, this is much faster await self.message_queue.put(message_with_timestamp) async def _worker(self): """Background worker - uvloop makes this significantly faster""" while self.running: channel, message, publish_time = await self.message_queue.get() # Measure end-to-end latency processing_start = time.time() if channel in self.channels: # uvloop excels at handling many concurrent tasks tasks = [] for callback in self.channels[channel]: if asyncio.iscoroutinefunction(callback): tasks.append(callback(message)) else: # uvloop's thread pool is much more efficient tasks.append(asyncio.get_event_loop().run_in_executor(None, callback, message)) # uvloop's gather is optimized for many concurrent operations await asyncio.gather(*tasks, return_exceptions=True) # Track latency total_latency = (time.time() - publish_time) * 1000 # Convert to ms self.performance_metrics['avg_latency_ms'] = ( self.performance_metrics['avg_latency_ms'] * 0.9 + total_latency * 0.1 ) self.message_queue.task_done() async def _metrics_collector(self): """Track messages per second - uvloop's timer precision helps here""" last_time = time.time() last_count = 0 while self.running: await asyncio.sleep(1) current_time = time.time() # In uvloop, queue.qsize() is more accurate and faster current_count = getattr(self.message_queue, '_finished', 0) if current_time - last_time >= 1: messages_processed = current_count - last_count self.performance_metrics['messages_per_second'] = messages_processed last_time, last_count = current_time, current_count def get_performance_stats(self): return self.performance_metrics.copy() # Dave's hotel with uvloop superpowers async def benchmark_uvloop_vs_standard(): """Demonstrate uvloop performance improvements""" # Simulate I/O-heavy subscribers (database writes, API calls) async def database_subscriber(msg): # Simulate database write await asyncio.sleep(0.001) # 1ms "database" call return f"DB: {msg}" async def api_subscriber(msg): # Simulate API call await asyncio.sleep(0.002) # 2ms "API" call return f"API: {msg}" def analytics_subscriber(msg): # Simulate CPU-heavy sync work time.sleep(0.0005) # 0.5ms CPU work return f"Analytics: {msg}" hotel = TurboHotelPubSub() # Subscribe multiple handlers to same channel for i in range(10): # 10 database subscribers hotel.subscribe("orders", database_subscriber) for i in range(5): # 5 API subscribers hotel.subscribe("orders", api_subscriber) for i in range(20): # 20 analytics subscribers hotel.subscribe("orders", analytics_subscriber) print("Starting benchmark with uvloop...") start_time = time.time() # Publish lots of messages for i in range(1000): await hotel.publish("orders", f"Order #{i}") # Wait for processing to complete await hotel.message_queue.join() end_time = time.time() stats = hotel.get_performance_stats() print(f"Benchmark complete in {end_time - start_time:.2f} seconds") print(f"Performance stats: {stats}") print(f"Total subscriber callbacks: {stats['active_subscribers'] * 1000:,}") return end_time - start_time # Uncomment to run benchmark # asyncio.run(benchmark_uvloop_vs_standard()) What uvloop supercharges: 1. I/O-Heavy Subscribers (2-4x speedup) Baza de date scrie, apeluri API, operațiuni de fișiere Implementarea bazată pe libuv a uvloop gestionează mai eficient mii de operațiuni I/O simultane Hotelul lui Dave poate acum să se ocupe de încărcările jurnalului de cloud al doamnei Henderson ȘI de postările de pe Instagram ale domnului Peterson simultan 2. Many Concurrent Subscribers (1.5-2x speedup) Sisteme cu sute sau mii de abonați pe canal Planificarea optimizată a sarcinilor de la uvloop reduce supraaglomerarea Perfect pentru centrul de conferințe al lui Dave cu peste 500 de abonați la notificări de cameră 3. Thread Pool Operations (30-50% improvement) Sincronizarea apelurilor care se mută în bazinele de thread Managementul bazinului de fire al uvloop este mai eficient Mai bine pentru sistemele moștenite ale lui Dave care nu pot fi asincronizate 4. Timer and Queue Precision Timing mai precis pentru metrice și limitarea ratei Monitorizare mai bună a performanței în coadă Ajută-l pe Dave să urmărească dacă sistemul său respectă cererea Real-world uvloop impact for Dave's hotel: # Before uvloop: 15,000 notifications/second # After uvloop: 35,000 notifications/second # Same code, 2.3x faster! Cea mai bună parte? Dave descoperă accidental că uneori cele mai bune optimizări sunt cele care necesită cel mai puțin efort. Zero code changes Reacția lui Dave: "Așteptați, asta e? eu doar import uvloop și totul devine mai rapid? Narrator: Nu este înșelătorie, Dave. Este doar o inginerie bună. When uvloop matters most: Număr mare de abonați: 100+ abonați pe canal I/O-heavy callbacks: scrieri de baze de date, apeluri API, operațiuni de fișiere Sarcini de lucru mixte: o combinație de abonați rapizi și lenti Latency-sensitive: când fiecare milisecundă contează În timp ce uvloop face asyncio mai rapid, unii dezvoltatori preferă Trio pentru sisteme complexe cu mii de sarcini concomitente. concurența structurată a Trio și gestionarea încorporată a presiunii din spate pot fi mai fiabile sub sarcini extreme - este proiectat pentru a eșua grațios, mai degrabă decât pentru a se agăța misterios atunci când aveți 10.000 de operațiuni simultane. Pentru hotelul lui Dave, asyncio +uvloop este perfect. Pentru următoarea afacere a lui Dave (un sistem de tranzacționare în timp real pentru mărfuri subacvatice), Trio ar putea preveni câteva sesiuni de debugging de 3am. A note about Trio: Dave încearcă să instaleze uvloop și devine confuz de mesajele de eroare. Iată lucrul - uvloop necesită compilare, deci aveți nevoie de instrumente de dezvoltare instalate. Pe macOS cu Homebrew: Pe Windows... bine, Dave decide să adere la ciclul de evenimente standard pe Windows și să îl implementeze pe Linux în producție. The uvloop installation gotcha: apt-get install build-essential brew install python3-dev Going Nuclear: Memory-Mapped Pub / Sub Pentru scenarii de performanță extremă sau multi-proces, putem folosi memoria partajată cu gestionarea completă a abonaților: import mmap import struct import multiprocessing import threading import time from collections import defaultdict from typing import Callable, Dict, List class MemoryMappedPubSub: def __init__(self, buffer_size=1024*1024): # 1MB buffer # Create shared memory buffer self.buffer_size = buffer_size self.shared_file = f'/tmp/pubsub_{multiprocessing.current_process().pid}' # Initialize memory-mapped file with open(self.shared_file, 'wb') as f: f.write(b'\x00' * buffer_size) self.mmap = mmap.mmap(open(self.shared_file, 'r+b').fileno(), 0) # Layout: [head_pos][tail_pos][message_data...] self.head_offset = 0 self.tail_offset = 8 self.data_offset = 16 # Subscriber management self.subscribers: Dict[str, List[Callable]] = defaultdict(list) self.listening = False self.listener_thread = None def subscribe(self, channel: str, callback: Callable): """Subscribe to a channel with a callback function""" self.subscribers[channel].append(callback) print(f"Subscribed to channel: {channel}") # Start listener if not already running if not self.listening: self.start_listening() def start_listening(self): """Start background thread to listen for messages""" if self.listening: return self.listening = True self.listener_thread = threading.Thread(target=self._listen_loop, daemon=True) self.listener_thread.start() print("Started listening for messages...") def stop_listening(self): """Stop the message listener""" self.listening = False if self.listener_thread: self.listener_thread.join() def _listen_loop(self): """Background loop that processes incoming messages""" while self.listening: messages = self.read_messages() for message in messages: self._process_message(message) time.sleep(0.001) # Small delay to prevent excessive CPU usage def _process_message(self, message: str): """Process a single message and notify subscribers""" try: if ':' in message: channel, content = message.split(':', 1) if channel in self.subscribers: for callback in self.subscribers[channel]: try: callback(content) except Exception as e: print(f"Error in callback for channel {channel}: {e}") except Exception as e: print(f"Error processing message: {e}") def publish(self, channel: str, message: str): """Ultra-fast direct memory write""" data = f"{channel}:{message}".encode() # Get current tail position tail = struct.unpack('Q', self.mmap[self.tail_offset:self.tail_offset+8])[0] # Check if we have enough space (simple wraparound) available_space = self.buffer_size - self.data_offset - tail if available_space < len(data) + 4: # Reset to beginning if we're near the end tail = 0 # Write message length + data struct.pack_into('I', self.mmap, self.data_offset + tail, len(data)) self.mmap[self.data_offset + tail + 4:self.data_offset + tail + 4 + len(data)] = data # Update tail pointer new_tail = tail + 4 + len(data) struct.pack_into('Q', self.mmap, self.tail_offset, new_tail) def read_messages(self): """Ultra-fast direct memory read""" head = struct.unpack('Q', self.mmap[self.head_offset:self.head_offset+8])[0] tail = struct.unpack('Q', self.mmap[self.tail_offset:self.tail_offset+8])[0] messages = [] current = head while current < tail: try: # Read message length msg_len = struct.unpack('I', self.mmap[self.data_offset + current:self.data_offset + current + 4])[0] # Safety check if msg_len > self.buffer_size or msg_len <= 0: break # Read message data data = self.mmap[self.data_offset + current + 4:self.data_offset + current + 4 + msg_len] messages.append(data.decode()) current += 4 + msg_len except Exception as e: print(f"Error reading message: {e}") break # Update head pointer struct.pack_into('Q', self.mmap, self.head_offset, current) return messages def __del__(self): """Cleanup when object is destroyed""" self.stop_listening() if hasattr(self, 'mmap'): self.mmap.close() # Dave's ultra-fast hotel messaging in action hotel = MemoryMappedPubSub() # Define subscriber callbacks def pool_guest(message): print(f"🏊 Pool Guest received: {message}") def restaurant_guest(message): print(f"🍽️ Restaurant Guest received: {message}") # Subscribe to channels (automatically starts background listener) hotel.subscribe("pool", pool_guest) hotel.subscribe("restaurant", restaurant_guest) # Publish messages (ultra-fast memory writes) hotel.publish("pool", "Pool closing in 10 minutes!") hotel.publish("restaurant", "Happy hour starts now!") What makes this special: Publicare ultra-rapidă: Memorie directă scrie, fără apeluri de sistem Routarea automată a mesajelor: firul de fundal procesează mesajele și apelează la abonați Mai mulți abonați pe canal: Fiecare canal poate avea mai mulți ascultători Izolarea erorilor: Un apel rău nu prăbușește sistemul Managementul resurselor curate: curățare automată după finalizare Versiunea cartografiată cu memorie vă oferă interfața familiară pub/sub cu performanțe de nivel enterprise. hotelul lui Dave poate acum să gestioneze milioane de notificări pentru oaspeți pe secundă, menținând în același timp modelul simplu "subscribe și uita" care a făcut faimos sistemul său. Comparație de performanță Solution Messages/Second Use Case Complexity Basic Pub/Sub ~50K Small apps, prototypes ⭐ Simple Queued Pub/Sub ~200K Most production systems ⭐⭐ Moderate Memory-Mapped ~5M+ High-frequency trading, multi-process ⭐⭐⭐⭐⭐ Expert External (Redis) ~100K+ Distributed systems ⭐⭐⭐ Moderate Basic Pub/Sub 50K Aplicații mici, prototipuri simplă Queued Pub/Sub 200K Cele mai multe sisteme de producție ⭐⭐ Moderat Memory-Mapped • 5 m+ Tranzacționare de înaltă frecvență, multi-proces Expert External (Redis) ~ 100K+ Sisteme distribuite ⭐⭐⭐ Moderat When to use each: De bază: învățare, proiecte mici, <10K mesaje/sec Coada: Cele mai multe aplicații reale, se ocupă bine de vârfurile de trafic Memorie cartografiată: >500K mesaje/sec, comunicare interproces, latență ultra scăzută Extern: mai multe servere, persistență, fiabilitate dovedită Adăugați gestionarea erorilor, persistența și scalarea și aveți mesagerie la nivel de întreprindere. Popular Pub / Sub Tehnologii Redis Pub/Sub: simplu, rapid, excelent pentru caracteristici în timp real Apache Kafka: Enterprise-grade, se ocupă de pierderea masivă RabbitMQ: Coada de mesaje fiabilă cu rutare flexibilă Soluții cloud: AWS SNS/SQS, Google Pub/Sub, Azure Service Bus Sfârșitul poveștii lui David Șase luni mai târziu, Dave este promovat la „Chief Communication Innovation Officer” deoarece scorurile de satisfacție a oaspeților sunt prin acoperiș. Dave încă crede că "canalele" se referă la sistemul de televiziune prin cablu al hotelului, dar zonele sale intercom îi învață pe studenții de informatică din întreaga lume cum funcționează modelele de mesagerie. Chiar și un ceas oprit este corect de două ori pe zi, și chiar și Dave se poate ciocni în arhitectură bună dacă el rupe suficient de multe lucruri mai întâi. Noua sa afacere revolutioneaza transportul public subacvatic cu "submarine publice" care ruleaza pe "Data Buses". And what about Dave? Întrebat despre arhitectura tehnică, Dave explică cu entuziasm: „Fiecare submarin își publică locația pasagerilor care se abonează la rute specifice. Este un pub/submarin, dar pentru submarine! Departamentul de transport al orașului încearcă încă să afle dacă el este un geniu sau dacă trebuie să-și retragă licența de afaceri. Dave a fost atât de încurajat de succesul noului său sistem încât s-a ramificat și a început o nouă afacere. data viitoare când vă aflați la gaura de udare locală, ați putea vedea noul "Pub Sub" - cu revoluționarul lui Dave "sandwich-uri submarine artizanale procesate în mod asincron cu cozi de ingrediente care nu blochează". El încă primește cărți de Crăciun de la oaspeții hotelului.