Ontmoet Dave, 'n ontwikkelaar wat die hotel brandalarm kontrak gekry het omdat hy die CEO se neef was. Dave het nog nooit 'n brandalarm stelsel voorheen gesien nie, maar hy het net 'n kliëntediensprojek voltooi waar "persoonlike telefoonoproepe verhoog betrokkenheid met 847%!" Sy redenasie: "Wanneer my ma my direk bel, antwoord ek altyd.Wanneer 'n algemene alarm uitkom, neem ek aan dat dit 'n motoralarm is en ignoreer dit. Fase 1: Punt-tot-punt kommunikasie (die telefoon oproep fiasko) Wanneer rook gedetekteer word, roep die stelsel elke enkele kamer in die hotel. Kamer 237 kry 'n oproep: "Hallo, daar is rook in die kombuis, asseblief evakueer." Kamer 301: "Hallo, daar is rook in die kombuis, asseblief evakueer." Alle 847 kamers, een na een. Wat gebeur as daar 'n klein vetbrand in die kombuis EN rook in die parkeerplek? Dave se stelsel bel kamer 237: "Hallo, daar is rook in die kombuis." Dan onmiddellik: "Hallo, daar is rook in die parkeergarage." Die gas is verward - wat noodgeval? Klik hier. Ondertussen het kamer 512 'n gas wat doof is. kamer 623 het ure gelede uitgekoop, maar die telefoon bly klink. kamer 108 het 'n slapende baba wie se ouers nou woedend is. Dit is die klassieke punt-tot-punt kommunikasie - een afsender, een ontvanger, herhaal 847 keer. Die brandinspekteur verskyn: "Dave, jy kan dit nie doen nie. Installeer 'n normale brandalarm." Fase 2: Broadcast Communication (die kernopsie) Maar Dave het uit sy foute geleer. "Ek het 'n BETER idee!" aankondig hy. Dave installeer 'n massiewe brandalarm wat deur die hele hotel ontploff. Sukses! Almal hoor dit! Maar dan besef Dave hierdie stelsel is PERFEK vir aankondigings. Pool sluit? Vuur Alarm. Ontbyt eindig in 10 minute? Vuur Alarm. Verlore kind in die lobby? Vuur Alarm. Yoga klas begin? Vuur Alarm. Gaste word nou geëvacueer vir kontinentale ontbyt kennisgevings. Die brandweer is hierdie week 47 keer gebel. Iemand het probeer om deur die nooduitgang te kyk. Dit is uitsaai kommunikasie - een boodskap aan almal, of hulle dit wil of nie.Dit is soos om 'n stedelike nood uitsaai stelsel te gebruik om jou garage verkoop aan te kondig. Stap 3: Dave ontdek iets wat revolusionêr is Dave se derde poging: "Wat as ons verskillende intercoms vir verskillende gebiede gehad het?" Hy installeer luidsprekers in die hele hotel, maar draai hulle na afsonderlike kanale: Pool Area Intercom: "Pool sluit in 10 minute, handdoekdiens eindig" Restaurant Intercom: "Gelukkige uur begin, die kombuis sluit gou" Konferensieruimte Intercom: "Konferensieruimte B is beskikbaar, WiFi wagwoord verander" Spa Intercom: "Massage afsprake beskikbaar, sauna onderhoud by 3pm" Vloerspecifieke Intercoms: "Housekeeping op verdieping 3, ysmasjien gebreek op verdieping 7" Gaste stem natuurlik in op die intercoms in gebiede wat hulle eintlik gebruik. Pool gaste hoor pool-updates, restaurant diners kry kos inligting, konferensie deelnemers kry vergadering kamer aankondigings. Wat Dave eintlik gebou het: Publish-Subscribe (Pub/Sub) sonder om dit te besef, het Dave 'n - een van die belangrikste patrone in moderne sagteware-argitektuur. publish-subscribe system Hier is hoe dit werk: Uitgewers (die hotelpersoneel) stuur boodskappe na spesifieke onderwerpe of kanale (poolupdates, restaurant nuus, ens.) Abonnee (gaste) kies watter onderwerpe hulle wil luister Uitgewers weet nie wie luister nie, abonnees weet nie wie stuur nie - hulle is heeltemal ontkoppel Dit is N×M kommunikasie: baie uitgewers kan na baie abonneer stuur, maar dit is georganiseer en gefilter. Geen meer lawaai, geen meer gemiste boodskappe nie, nie meer roep elke kamer individueel nie. Hoekom dit belangrik is vir jou kode Dave het per ongeluk die drie basiese boodskappatrone gebou: : Direkte verbindings tussen dienste. Skaleer nie wanneer u verskeie stelsels moet kennisgewing. Point-to-Point : Een boodskap vir almal. Skep lawaai en stewige koppeling. Broadcast Uitgewers stuur boodskappe na onderwerpe / kanale, abonnees kies wat om te luister. Pub/Sub In werklike stelsels los pub/sub dieselfde probleme op wat Dave ondervind het: E-handel: Inventarisupdates gaan na verskeie dienste (aanbevelings, analitiese, kennisgewing) sonder dat die inventarisdiens weet wie luister Chat-toepassings: boodskappe wat aan kanale gepubliseer word, gebruikers abonneer op gesprekke wat hulle in Microservices: Dienste publiseer gebeurtenisse (gebruiker geregistreer, bestelling voltooi) wat ander dienste onafhanklik kan reageer Bou jou eie Pub / Sub-stelsel (Dit is makliker as wat jy dink) Pub / sub hoef nie intimiderend te wees nie. Laat ons 'n produksie-klas stelsel bou waar Dave trots op sou wees: from collections import defaultdict from typing import Dict, List, Callable class HotelPubSub: def __init__(self): self.channels: Dict[str, List[Callable]] = defaultdict(list) def subscribe(self, channel: str, callback: Callable): """Guest subscribes to hotel updates""" self.channels[channel].append(callback) print(f"Subscribed to {channel}") def publish(self, channel: str, message: str): """Hotel staff publishes updates""" if channel in self.channels: for callback in self.channels[channel]: callback(message) print(f"Published to {channel}: {message}") # Dave's hotel in action hotel = HotelPubSub() # Guests subscribe to what they care about def pool_guest(msg): print(f"🏊 Pool Guest: {msg}") def restaurant_guest(msg): print(f"🍽️ Restaurant Guest: {msg}") hotel.subscribe("pool", pool_guest) hotel.subscribe("restaurant", restaurant_guest) # Hotel publishes updates hotel.publish("pool", "Pool closing in 10 minutes!") hotel.publish("restaurant", "Happy hour starts now!") Dit is dit! 'n werkende pub / sub stelsel in 20 lyne. Optimalisering met Async Queues Maar wat as Dave se hotel regtig besig word?Ons eenvoudige weergawe blokkeer uitgewers wanneer abonnees stadig is. import asyncio from asyncio import Queue from collections import defaultdict class AsyncPubSub: def __init__(self): self.channels = defaultdict(list) self.message_queue = Queue() # Start background worker asyncio.create_task(self._worker()) def subscribe(self, channel: str, callback): self.channels[channel].append(callback) async def publish(self, channel: str, message: str): # Non-blocking publish - just queue it await self.message_queue.put((channel, message)) async def _worker(self): # Background worker processes messages while True: channel, message = await self.message_queue.get() if channel in self.channels: # Run all subscribers in parallel tasks = [callback(message) for callback in self.channels[channel]] await asyncio.gather(*tasks, return_exceptions=True) # Dave's async hotel in action async def demo(): hotel = AsyncPubSub() # Fast and slow subscribers async def fast_guest(msg): await asyncio.sleep(0.1) # Quick processing print(f"🏊 Fast: {msg}") async def slow_guest(msg): await asyncio.sleep(2.5) # Simulate slow database write print(f"🍽️ Slow: {msg}") hotel.subscribe("updates", fast_guest) hotel.subscribe("updates", slow_guest) # Publishers don't wait for slow subscribers await hotel.publish("updates", "Pool closing!") await hotel.publish("updates", "Happy hour!") await asyncio.sleep(0.2) # Let everything finish # asyncio.run(demo()) Nou blokkeer uitgewers nooit nie - hulle volg net boodskappe en gaan voort. Die agtergrondwerkers hanteer die aflewering aan al die abonnees in parallel. Vinnig abonnees kry hul boodskappe vinnig (0.1s), terwyl stadiges hul tyd neem (2.5s), maar nie die ander blokkeer nie. Die maklike prestasie hack: uvloop Dave se asynchroniese stelsel werk goed, maar dan ontdek hy iets magies: die verandering van een rigting van kode kan alles 2-4 keer vinniger maak. Kom in - 'n drop-in vervanging vir Python se standaard gebeurtenisloop wat in Cython geskryf is en gebaseer is op libuv (dit is dieselfde ding wat Node.js vinnig maak). uvloop import uvloop import asyncio from collections import defaultdict import time # The magic line that makes Dave's hotel 2-4x faster asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) class TurboHotelPubSub: def __init__(self, max_queue_size=10000): self.channels = defaultdict(list) self.message_queue = asyncio.Queue(maxsize=max_queue_size) self.running = True self.performance_metrics = { 'messages_per_second': 0, 'avg_latency_ms': 0, 'active_subscribers': 0 } # Start background worker and metrics collector asyncio.create_task(self._worker()) asyncio.create_task(self._metrics_collector()) def subscribe(self, channel: str, callback): self.channels[channel].append(callback) self.performance_metrics['active_subscribers'] = sum(len(subs) for subs in self.channels.values()) print(f"Subscribed to {channel} (Total subscribers: {self.performance_metrics['active_subscribers']})") async def publish(self, channel: str, message: str): timestamp = time.time() message_with_timestamp = (channel, message, timestamp) try: self.message_queue.put_nowait(message_with_timestamp) except asyncio.QueueFull: # Backpressure - with uvloop, this is much faster await self.message_queue.put(message_with_timestamp) async def _worker(self): """Background worker - uvloop makes this significantly faster""" while self.running: channel, message, publish_time = await self.message_queue.get() # Measure end-to-end latency processing_start = time.time() if channel in self.channels: # uvloop excels at handling many concurrent tasks tasks = [] for callback in self.channels[channel]: if asyncio.iscoroutinefunction(callback): tasks.append(callback(message)) else: # uvloop's thread pool is much more efficient tasks.append(asyncio.get_event_loop().run_in_executor(None, callback, message)) # uvloop's gather is optimized for many concurrent operations await asyncio.gather(*tasks, return_exceptions=True) # Track latency total_latency = (time.time() - publish_time) * 1000 # Convert to ms self.performance_metrics['avg_latency_ms'] = ( self.performance_metrics['avg_latency_ms'] * 0.9 + total_latency * 0.1 ) self.message_queue.task_done() async def _metrics_collector(self): """Track messages per second - uvloop's timer precision helps here""" last_time = time.time() last_count = 0 while self.running: await asyncio.sleep(1) current_time = time.time() # In uvloop, queue.qsize() is more accurate and faster current_count = getattr(self.message_queue, '_finished', 0) if current_time - last_time >= 1: messages_processed = current_count - last_count self.performance_metrics['messages_per_second'] = messages_processed last_time, last_count = current_time, current_count def get_performance_stats(self): return self.performance_metrics.copy() # Dave's hotel with uvloop superpowers async def benchmark_uvloop_vs_standard(): """Demonstrate uvloop performance improvements""" # Simulate I/O-heavy subscribers (database writes, API calls) async def database_subscriber(msg): # Simulate database write await asyncio.sleep(0.001) # 1ms "database" call return f"DB: {msg}" async def api_subscriber(msg): # Simulate API call await asyncio.sleep(0.002) # 2ms "API" call return f"API: {msg}" def analytics_subscriber(msg): # Simulate CPU-heavy sync work time.sleep(0.0005) # 0.5ms CPU work return f"Analytics: {msg}" hotel = TurboHotelPubSub() # Subscribe multiple handlers to same channel for i in range(10): # 10 database subscribers hotel.subscribe("orders", database_subscriber) for i in range(5): # 5 API subscribers hotel.subscribe("orders", api_subscriber) for i in range(20): # 20 analytics subscribers hotel.subscribe("orders", analytics_subscriber) print("Starting benchmark with uvloop...") start_time = time.time() # Publish lots of messages for i in range(1000): await hotel.publish("orders", f"Order #{i}") # Wait for processing to complete await hotel.message_queue.join() end_time = time.time() stats = hotel.get_performance_stats() print(f"Benchmark complete in {end_time - start_time:.2f} seconds") print(f"Performance stats: {stats}") print(f"Total subscriber callbacks: {stats['active_subscribers'] * 1000:,}") return end_time - start_time # Uncomment to run benchmark # asyncio.run(benchmark_uvloop_vs_standard()) What uvloop supercharges: 1. I/O-Heavy Subscribers (2-4x speedup) Databasis skryf, API oproepe, lêeroperasies uvloop se libuv-gebaseerde implementasie hanteer duisende gelyktydige I/O-operasies doeltreffender Dave se hotel kan nou Mrs. Henderson se wolk dagboek oplaai EN Mr. Peterson se Instagram poste simultaan hanteer 2. Many Concurrent Subscribers (1.5-2x speedup) Stelsels met honderde of duisende abonnees per kanaal uvloop se geoptimaliseerde taakbeplanning verminder oorhead Ideaal vir Dave se konferensiecentrum met 500+ kamer kennisgewing abonnees 3. Thread Pool Operations (30-50% improvement) Synchroniseer oproepe wat na thread pools verskuif word uvloop se thread pool bestuur is meer doeltreffend Beter vir Dave se erfenisstelsels wat nie asynchroniseer kan word nie 4. Timer and Queue Precision Meer akkurate tydsberekening vir metrikes en koersbeperking Beter volgprestasie monitoring Help Dave om op te spoor of sy stelsel aan die vraag voldoen Real-world uvloop impact for Dave's hotel: # Before uvloop: 15,000 notifications/second # After uvloop: 35,000 notifications/second # Same code, 2.3x faster! Die beste deel? Dave ontdek per ongeluk dat soms die beste optimisasie diegene is wat die minste werk vereis. Zero code changes Dave se reaksie: "Waak, dit is dit? ek importeer net uvloop en alles word vinniger? Narrator: Dit is nie bedrog nie, Dave. Dit is net goeie ingenieurswese. When uvloop matters most: Hoë abonnee getal: 100+ abonnee per kanaal I/O-heavy callbacks: Database skryf, API oproepe, lêer bedrywighede Gemengde werkloosheid: 'n kombinasie van vinnige en stadig abonneer Latentie-gevoelig: wanneer elke millisekonde tel Terwyl uvloop asyncio vinniger maak, verkies sommige ontwikkelaars Trio vir komplekse stelsels met duisende gelyktydige take. Trio se gestruktureerde concurrent en ingebouwde backpressure hantering kan betroubaarder wees onder uiterste las - dit is ontwerp om gracieus te misluk eerder as geheimsinnig te hang wanneer jy 10,000+ simultan bedrywighede het. Vir Dave se hotel, asyncio+uvloop is perfek. Vir Dave se volgende onderneming ('n real-time handelsstelsel vir onderwater grondstowwe), kan Trio 'n paar 3am debugging sessies voorkom. A note about Trio: Dave probeer om uvloop te installeer en word verward deur die foutboodskappe. Hier is die ding - uvloop benodig kompilasie, so jy benodig ontwikkelingsinstrumente geïnstalleer. Op macOS met Homebrew: Op Windows ... nou, Dave besluit om aan die standaard gebeurtenisloop op Windows te hou en te implementeer na Linux in produksie. The uvloop installation gotcha: apt-get install build-essential brew install python3-dev Going Nuclear: Memory-Mapped Pub / Sub Vir uiterste prestasie of multi-proses scenario's, kan ons gedeelde geheue met volledige abonnee bestuur gebruik: import mmap import struct import multiprocessing import threading import time from collections import defaultdict from typing import Callable, Dict, List class MemoryMappedPubSub: def __init__(self, buffer_size=1024*1024): # 1MB buffer # Create shared memory buffer self.buffer_size = buffer_size self.shared_file = f'/tmp/pubsub_{multiprocessing.current_process().pid}' # Initialize memory-mapped file with open(self.shared_file, 'wb') as f: f.write(b'\x00' * buffer_size) self.mmap = mmap.mmap(open(self.shared_file, 'r+b').fileno(), 0) # Layout: [head_pos][tail_pos][message_data...] self.head_offset = 0 self.tail_offset = 8 self.data_offset = 16 # Subscriber management self.subscribers: Dict[str, List[Callable]] = defaultdict(list) self.listening = False self.listener_thread = None def subscribe(self, channel: str, callback: Callable): """Subscribe to a channel with a callback function""" self.subscribers[channel].append(callback) print(f"Subscribed to channel: {channel}") # Start listener if not already running if not self.listening: self.start_listening() def start_listening(self): """Start background thread to listen for messages""" if self.listening: return self.listening = True self.listener_thread = threading.Thread(target=self._listen_loop, daemon=True) self.listener_thread.start() print("Started listening for messages...") def stop_listening(self): """Stop the message listener""" self.listening = False if self.listener_thread: self.listener_thread.join() def _listen_loop(self): """Background loop that processes incoming messages""" while self.listening: messages = self.read_messages() for message in messages: self._process_message(message) time.sleep(0.001) # Small delay to prevent excessive CPU usage def _process_message(self, message: str): """Process a single message and notify subscribers""" try: if ':' in message: channel, content = message.split(':', 1) if channel in self.subscribers: for callback in self.subscribers[channel]: try: callback(content) except Exception as e: print(f"Error in callback for channel {channel}: {e}") except Exception as e: print(f"Error processing message: {e}") def publish(self, channel: str, message: str): """Ultra-fast direct memory write""" data = f"{channel}:{message}".encode() # Get current tail position tail = struct.unpack('Q', self.mmap[self.tail_offset:self.tail_offset+8])[0] # Check if we have enough space (simple wraparound) available_space = self.buffer_size - self.data_offset - tail if available_space < len(data) + 4: # Reset to beginning if we're near the end tail = 0 # Write message length + data struct.pack_into('I', self.mmap, self.data_offset + tail, len(data)) self.mmap[self.data_offset + tail + 4:self.data_offset + tail + 4 + len(data)] = data # Update tail pointer new_tail = tail + 4 + len(data) struct.pack_into('Q', self.mmap, self.tail_offset, new_tail) def read_messages(self): """Ultra-fast direct memory read""" head = struct.unpack('Q', self.mmap[self.head_offset:self.head_offset+8])[0] tail = struct.unpack('Q', self.mmap[self.tail_offset:self.tail_offset+8])[0] messages = [] current = head while current < tail: try: # Read message length msg_len = struct.unpack('I', self.mmap[self.data_offset + current:self.data_offset + current + 4])[0] # Safety check if msg_len > self.buffer_size or msg_len <= 0: break # Read message data data = self.mmap[self.data_offset + current + 4:self.data_offset + current + 4 + msg_len] messages.append(data.decode()) current += 4 + msg_len except Exception as e: print(f"Error reading message: {e}") break # Update head pointer struct.pack_into('Q', self.mmap, self.head_offset, current) return messages def __del__(self): """Cleanup when object is destroyed""" self.stop_listening() if hasattr(self, 'mmap'): self.mmap.close() # Dave's ultra-fast hotel messaging in action hotel = MemoryMappedPubSub() # Define subscriber callbacks def pool_guest(message): print(f"🏊 Pool Guest received: {message}") def restaurant_guest(message): print(f"🍽️ Restaurant Guest received: {message}") # Subscribe to channels (automatically starts background listener) hotel.subscribe("pool", pool_guest) hotel.subscribe("restaurant", restaurant_guest) # Publish messages (ultra-fast memory writes) hotel.publish("pool", "Pool closing in 10 minutes!") hotel.publish("restaurant", "Happy hour starts now!") What makes this special: Ultrasnel publiseer: direkte geheue skryf, geen stelsel oproepe nie Automatiese boodskaprouting: agtergrondtrommel verwerk boodskappe en bel abonneers Meer abonnees per kanaal: Elke kanaal kan baie luisteraars hê Foute isolasie: Een slegte oproep val nie die stelsel nie Skone hulpbronne bestuur: outomatiese skoonmaak wanneer gedoen Dave se hotel kan nou miljoene gas kennisgewing per sekonde hanteer terwyl die eenvoudige "abonnement en vergeet" model wat sy stelsel beroemd gemaak het, gehandhaaf word. Prestasie vergelyking Solution Messages/Second Use Case Complexity Basic Pub/Sub ~50K Small apps, prototypes ⭐ Simple Queued Pub/Sub ~200K Most production systems ⭐⭐ Moderate Memory-Mapped ~5M+ High-frequency trading, multi-process ⭐⭐⭐⭐⭐ Expert External (Redis) ~100K+ Distributed systems ⭐⭐⭐ Moderate Basic Pub/Sub · 50k Kleine apps, prototipes Die eenvoudige Queued Pub/Sub · 200k Die meeste produksie stelsels Moderatiewe Memory-Mapped ~ 5M+ Hoge frekwensie handel, multi-proses External (Redis) ~ 100K+ Verspreide stelsels When to use each: Basis: Leer, klein projekte, <10K boodskappe / sek Kies: Die meeste werklike toepassings, hanteer verkeersspike goed Memory-Mapped: >500K boodskappe / sek, kruis-proses kommunikasie, ultra-lae latensie Eksterne: verskeie bedieners, volharding, bewese betroubaarheid Voeg foutbestuur, volharding en skaal toe, en jy het maatskappy-klas boodskap. Populêre Pub / Sub Tegnologieë Redis Pub/Sub: Eenvoudig, vinnig, groot vir real-time funksies Apache Kafka: Enterprise-grade, hanteer massiewe deurvoer RabbitMQ: betroubare boodskap seëninge met fleksibele routing Cloud oplossings: AWS SNS/SQS, Google Pub/Sub, Azure Service Bus Die einde van Dave se storie Zes maande later word Dave bevorder tot "Chief Communication Innovation Officer" omdat gastevredenheidskoerse deur die dak is. Dave dink nog steeds dat "kanale" verwys na die hotel se kabel-tv-stelsel, maar sy intercom sone leer rekenaarwetenskapstudente regoor die wêreld hoe boodskappatrone werk. Selfs 'n gestopte horlosie is twee keer per dag reg, en selfs Dave kan in goeie argitektuur struikel as hy eers genoeg dinge breek. Hy het teruggegaan na wat hy die beste ken - vervoerstelsels. sy nuwe onderneming is om onderwater openbare vervoer te revolusioneer met "Public Submarines" wat op "Data Busses" hardloop. And what about Dave? Wanneer gevra word oor die tegniese argitektuur, verduidelik Dave enthousiast: "Elke sub publiseer sy plek aan passasiers wat op spesifieke roetes abonneer. Dit is pub / sub, maar vir sub! Die stedelike vervoer afdeling probeer nog steeds uit te vind of hy 'n genie is of of hulle sy besigheidslisensie moet onttrek. Dave was so aangemoedig deur die sukses van sy nuwe stelsel dat hy uitgetrek het en 'n nuwe besigheid begin het. Volgende keer as jy by jou plaaslike watervloer is, kan jy die nuwe "Pub Sub" sien - met Dave se revolusionêre "asynchroniese verwerkte handmatige onderzeeese sandwiches met nie-blokkerende bestanddele seëne." Hy kry nog steeds Kersfeeskaarte van die hotelgaste.