Meet Dave, a developer who got the hotel fire alarm contract because he was the CEO's nephew. Dave had never seen a fire alarm system before, but he HAD just finished a customer service project where "personal phone calls increase engagement by 847%!" So naturally, Dave thinks: "Fire alarms are just customer service for emergencies!" His reasoning: "When my mom calls me directly, I always answer. When some generic alarm goes off, I assume it's a car alarm and ignore it. Personal touch is KEY!"

## Phase 1: Point-to-Point Communication (The Phone Call Fiasco)

Dave's brilliant solution? When smoke is detected, the system calls every single room in the hotel. Room 237 gets a call: "Hi, there's smoke in the kitchen, please evacuate." Room 301: "Hi, there's smoke in the kitchen, please evacuate." All 847 rooms, one by one. Dave's proud - everyone gets the message!

But here's where it gets messy. What happens when there's a small grease fire in the kitchen AND smoke in the parking garage? Dave's system calls room 237: "Hi, there's smoke in the kitchen." Click. Then immediately: "Hi, there's smoke in the parking garage." The guest is confused - which emergency? What's happening? Click. Meanwhile, room 512 has a guest who's deaf. Room 623 checked out hours ago, but the phone keeps ringing. Room 108 has a sleeping baby whose parents are now furious. After the third false alarm in a week, guests start unplugging their phones.

This is classic point-to-point communication - one sender, one receiver, repeated 847 times. It doesn't scale, it's inefficient, and it annoys everyone involved. The fire inspector shows up: "Dave, you can't do this. Install a normal fire alarm." Dave reluctantly complies.

## Phase 2: Broadcast Communication (The Nuclear Option)

But Dave has learned from his mistakes. "I have a BETTER idea!" he announces. Dave installs a MASSIVE fire alarm that blasts throughout the entire hotel. Success! Everyone hears it! But then Dave realizes this system is PERFECT for announcements. Pool closing? FIRE ALARM. Breakfast ending in 10 minutes? FIRE ALARM. Lost child in the lobby? FIRE ALARM. Yoga class starting? FIRE ALARM.

Guests are now evacuating for continental breakfast notifications. The fire department has been called 47 times this week. Someone tried to check out through the emergency exit.

This is broadcast communication - one message to everyone, whether they want it or not. It's like using a city-wide emergency broadcast system to announce your garage sale.

## Phase 3: Dave Discovers Something Revolutionary

Dave's third attempt: "What if we had different intercoms for different areas?"
He installs speakers throughout the hotel but wires them to separate channels:

- **Pool Area Intercom:** "Pool closing in 10 minutes, towel service ending"
- **Restaurant Intercom:** "Happy hour starting, kitchen closing soon"
- **Conference Room Intercom:** "Meeting room B is available, WiFi password changed"
- **Spa Intercom:** "Massage appointments available, sauna maintenance at 3pm"
- **Floor-Specific Intercoms:** "Housekeeping on floor 3, ice machine broken on floor 7"

Guests naturally tune in to the intercoms in areas they're actually using. Pool guests hear pool updates, restaurant diners get dining info, conference attendees get meeting room announcements.

## What Dave Actually Built: Publish-Subscribe (Pub/Sub)

Without realizing it, Dave created a **publish-subscribe system** - one of the most important patterns in modern software architecture. Here's how it works:

- **Publishers** (the hotel staff) send messages to specific **topics** or **channels** (pool updates, restaurant news, etc.)
- **Subscribers** (guests) choose which topics they want to listen to
- Publishers don't know who's listening, subscribers don't know who's sending - they're completely decoupled

This is N×M communication: many publishers can send to many subscribers, but it's organized and filtered. No more noise, no more missed messages, no more calling every room individually.

## Why This Matters for Your Code

Dave accidentally built the three fundamental messaging patterns (sketched in code below):

- **Point-to-Point:** Direct connections between services. Doesn't scale when you need to notify multiple systems.
- **Broadcast:** One message to everyone. Creates noise and tight coupling.
- **Pub/Sub:** Publishers send messages to topics/channels, subscribers choose what to listen to. Scalable, decoupled, and efficient.

In real systems, pub/sub solves the same problems Dave faced:

- **E-commerce:** Inventory updates go to multiple services (recommendations, analytics, notifications) without the inventory service knowing who's listening
- **Chat apps:** Messages published to channels, users subscribe to conversations they're in
- **Microservices:** Services publish events (user registered, order completed) that other services can react to independently
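To make the contrast concrete, here's a minimal, purely illustrative sketch of all three patterns side by side - every name in it (`point_to_point`, `MiniPubSub`, and so on) is invented for this example:

```python
# Illustrative sketch - all names here are invented for this example.

# Point-to-point: the sender knows every receiver and contacts each one directly.
def point_to_point(phones, message):
    for ring in phones:          # 847 separate calls, one per room
        ring(message)            # the sender is coupled to every receiver

# Broadcast: one message to everyone, no filtering.
def broadcast(speakers, message):
    for blare in speakers:
        blare(message)           # pool guests hear kitchen news too

# Pub/sub: senders and receivers only agree on a channel name.
class MiniPubSub:
    def __init__(self):
        self.channels = {}       # channel name -> list of callbacks
    def subscribe(self, channel, callback):
        self.channels.setdefault(channel, []).append(callback)
    def publish(self, channel, message):
        for callback in self.channels.get(channel, []):
            callback(message)    # only interested parties are notified

bus = MiniPubSub()
bus.subscribe("pool", lambda m: print(f"pool guest: {m}"))
bus.publish("pool", "Pool closing soon")    # delivered
bus.publish("restaurant", "Happy hour!")    # no subscribers, silently dropped
```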
## Building Your Own Pub/Sub System (It's Easier Than You Think)

Pub/sub doesn't have to be intimidating. Let's build a system Dave would be proud of:

```python
from collections import defaultdict
from typing import Callable, Dict, List


class HotelPubSub:
    def __init__(self):
        self.channels: Dict[str, List[Callable]] = defaultdict(list)

    def subscribe(self, channel: str, callback: Callable):
        """Guest subscribes to hotel updates"""
        self.channels[channel].append(callback)
        print(f"Subscribed to {channel}")

    def publish(self, channel: str, message: str):
        """Hotel staff publishes updates"""
        if channel in self.channels:
            for callback in self.channels[channel]:
                callback(message)
        print(f"Published to {channel}: {message}")


# Dave's hotel in action
hotel = HotelPubSub()

# Guests subscribe to what they care about
def pool_guest(msg):
    print(f"🏊 Pool Guest: {msg}")

def restaurant_guest(msg):
    print(f"🍽️ Restaurant Guest: {msg}")

hotel.subscribe("pool", pool_guest)
hotel.subscribe("restaurant", restaurant_guest)

# Hotel publishes updates
hotel.publish("pool", "Pool closing in 10 minutes!")
hotel.publish("restaurant", "Happy hour starts now!")
```

That's it! A working pub/sub system in about 20 lines.

## Optimizing with Async Queues

But what if Dave's hotel gets REALLY busy? Our simple version blocks publishers when subscribers are slow.
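You can see the problem by giving the basic version a slow subscriber - `publish` won't return until every callback has finished. A quick illustrative demo, reusing the `hotel` object from the block above (`slow_guest` is invented for this example):

```python
import time

def slow_guest(msg):
    time.sleep(2)                # simulate a slow consumer (e.g. a network call)
    print(f"🐌 Slow Guest: {msg}")

hotel.subscribe("pool", slow_guest)

start = time.time()
hotel.publish("pool", "Towel service ending!")        # runs every callback inline
print(f"publish() took {time.time() - start:.1f}s")   # ~2s - the publisher was blocked
```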
Let's add queuing: import asyncio from asyncio import Queue from collections import defaultdict class AsyncPubSub: def __init__(self): self.channels = defaultdict(list) self.message_queue = Queue() # Start background worker asyncio.create_task(self._worker()) def subscribe(self, channel: str, callback): self.channels[channel].append(callback) async def publish(self, channel: str, message: str): # Non-blocking publish - just queue it await self.message_queue.put((channel, message)) async def _worker(self): # Background worker processes messages while True: channel, message = await self.message_queue.get() if channel in self.channels: # Run all subscribers in parallel tasks = [callback(message) for callback in self.channels[channel]] await asyncio.gather(*tasks, return_exceptions=True) # Dave's async hotel in action async def demo(): hotel = AsyncPubSub() # Fast and slow subscribers async def fast_guest(msg): await asyncio.sleep(0.1) # Quick processing print(f"🏊 Fast: {msg}") async def slow_guest(msg): await asyncio.sleep(2.5) # Simulate slow database write print(f"🍽️ Slow: {msg}") hotel.subscribe("updates", fast_guest) hotel.subscribe("updates", slow_guest) # Publishers don't wait for slow subscribers await hotel.publish("updates", "Pool closing!") await hotel.publish("updates", "Happy hour!") await asyncio.sleep(0.2) # Let everything finish # asyncio.run(demo()) import asyncio from asyncio import Queue from collections import defaultdict class AsyncPubSub: def __init__(self): self.channels = defaultdict(list) self.message_queue = Queue() # Start background worker asyncio.create_task(self._worker()) def subscribe(self, channel: str, callback): self.channels[channel].append(callback) async def publish(self, channel: str, message: str): # Non-blocking publish - just queue it await self.message_queue.put((channel, message)) async def _worker(self): # Background worker processes messages while True: channel, message = await self.message_queue.get() if channel in self.channels: # Run all subscribers in parallel tasks = [callback(message) for callback in self.channels[channel]] await asyncio.gather(*tasks, return_exceptions=True) # Dave's async hotel in action async def demo(): hotel = AsyncPubSub() # Fast and slow subscribers async def fast_guest(msg): await asyncio.sleep(0.1) # Quick processing print(f"🏊 Fast: {msg}") async def slow_guest(msg): await asyncio.sleep(2.5) # Simulate slow database write print(f"🍽️ Slow: {msg}") hotel.subscribe("updates", fast_guest) hotel.subscribe("updates", slow_guest) # Publishers don't wait for slow subscribers await hotel.publish("updates", "Pool closing!") await hotel.publish("updates", "Happy hour!") await asyncio.sleep(0.2) # Let everything finish # asyncio.run(demo()) Now publishers never block - they just queue messages and keep going. The background worker handles delivery to all subscribers in parallel. Fast subscribers get their messages quickly (0.1s), while slow ones take their time (2.5s), but neither blocks the other. The Easy Performance Hack: uvloop Dave's async system is working great, but then he discovers something magical: changing one line of code can make everything 2-4x faster. Dave thinks this is obviously too good to be true, but decides to try it anyway. Enter uvloop - a drop-in replacement for Python's default event loop that's written in Cython and based on libuv (the same thing that makes Node.js fast). 
## The Easy Performance Hack: uvloop

Dave's async system is working great, but then he discovers something magical: changing one line of code can make everything 2-4x faster. Dave thinks this is obviously too good to be true, but decides to try it anyway.

Enter **uvloop** - a drop-in replacement for Python's default event loop that's written in Cython and based on libuv (the same thing that makes Node.js fast).

```python
import asyncio
import time
from collections import defaultdict

import uvloop

# The magic line that makes Dave's hotel 2-4x faster
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())


class TurboHotelPubSub:
    def __init__(self, max_queue_size=10000):
        self.channels = defaultdict(list)
        self.message_queue = asyncio.Queue(maxsize=max_queue_size)
        self.running = True
        self.messages_processed = 0
        self.performance_metrics = {
            'messages_per_second': 0,
            'avg_latency_ms': 0,
            'active_subscribers': 0
        }
        # Start background worker and metrics collector
        asyncio.create_task(self._worker())
        asyncio.create_task(self._metrics_collector())

    def subscribe(self, channel: str, callback):
        self.channels[channel].append(callback)
        self.performance_metrics['active_subscribers'] = sum(
            len(subs) for subs in self.channels.values())
        print(f"Subscribed to {channel} "
              f"(Total subscribers: {self.performance_metrics['active_subscribers']})")

    async def publish(self, channel: str, message: str):
        timestamp = time.time()
        message_with_timestamp = (channel, message, timestamp)
        try:
            self.message_queue.put_nowait(message_with_timestamp)
        except asyncio.QueueFull:
            # Backpressure - wait until the worker drains the queue
            await self.message_queue.put(message_with_timestamp)

    async def _worker(self):
        """Background worker - uvloop makes this significantly faster"""
        while self.running:
            channel, message, publish_time = await self.message_queue.get()
            if channel in self.channels:
                # uvloop excels at handling many concurrent tasks
                tasks = []
                for callback in self.channels[channel]:
                    if asyncio.iscoroutinefunction(callback):
                        tasks.append(callback(message))
                    else:
                        # Sync callbacks run in the default thread pool
                        tasks.append(asyncio.get_running_loop()
                                     .run_in_executor(None, callback, message))
                await asyncio.gather(*tasks, return_exceptions=True)

            # Track end-to-end latency as an exponential moving average
            total_latency = (time.time() - publish_time) * 1000  # ms
            self.performance_metrics['avg_latency_ms'] = (
                self.performance_metrics['avg_latency_ms'] * 0.9
                + total_latency * 0.1)
            self.messages_processed += 1
            self.message_queue.task_done()

    async def _metrics_collector(self):
        """Track messages per second"""
        last_count = 0
        while self.running:
            await asyncio.sleep(1)
            self.performance_metrics['messages_per_second'] = (
                self.messages_processed - last_count)
            last_count = self.messages_processed

    def get_performance_stats(self):
        return self.performance_metrics.copy()


# Dave's hotel with uvloop superpowers
async def benchmark_uvloop_vs_standard():
    """Demonstrate uvloop performance improvements"""

    # Simulate I/O-heavy subscribers (database writes, API calls)
    async def database_subscriber(msg):
        await asyncio.sleep(0.001)  # 1ms "database" call
        return f"DB: {msg}"

    async def api_subscriber(msg):
        await asyncio.sleep(0.002)  # 2ms "API" call
        return f"API: {msg}"

    def analytics_subscriber(msg):
        time.sleep(0.0005)          # 0.5ms of sync CPU work
        return f"Analytics: {msg}"

    hotel = TurboHotelPubSub()

    # Subscribe multiple handlers to the same channel
    for _ in range(10):   # 10 database subscribers
        hotel.subscribe("orders", database_subscriber)
    for _ in range(5):    # 5 API subscribers
        hotel.subscribe("orders", api_subscriber)
    for _ in range(20):   # 20 analytics subscribers
        hotel.subscribe("orders", analytics_subscriber)

    print("Starting benchmark with uvloop...")
    start_time = time.time()

    # Publish lots of messages
    for i in range(1000):
        await hotel.publish("orders", f"Order #{i}")

    # Wait for processing to complete
    await hotel.message_queue.join()

    end_time = time.time()
    stats = hotel.get_performance_stats()
    print(f"Benchmark complete in {end_time - start_time:.2f} seconds")
    print(f"Performance stats: {stats}")
    print(f"Total subscriber callbacks: {stats['active_subscribers'] * 1000:,}")
    return end_time - start_time

# Uncomment to run benchmark
# asyncio.run(benchmark_uvloop_vs_standard())
```

**What uvloop supercharges:**

1. **I/O-heavy subscribers (2-4x speedup):** Database writes, API calls, file operations. uvloop's libuv-based implementation handles thousands of concurrent I/O operations more efficiently. Dave's hotel can now handle Mrs. Henderson's cloud diary uploads AND Mr. Peterson's Instagram posts simultaneously.

2. **Many concurrent subscribers (1.5-2x speedup):** Systems with hundreds or thousands of subscribers per channel. uvloop's optimized task scheduling reduces overhead. Perfect for Dave's conference center with 500+ room notification subscribers.

3. **Thread pool operations (30-50% improvement):** Sync callbacks that get moved to thread pools. uvloop's thread pool management is more efficient. Better for Dave's legacy systems that can't be made async.

4. **Timer and queue precision:** More accurate timing for metrics and rate limiting, plus better queue performance monitoring. Helps Dave track whether his system is keeping up with demand.

**Real-world uvloop impact for Dave's hotel:**

```python
# Before uvloop: 15,000 notifications/second
# After uvloop:  35,000 notifications/second
# Same code, 2.3x faster!
```

The best part?
Zero code changes beyond that one line. Dave accidentally discovers that sometimes the best optimizations are the ones that require the least work.

Dave's reaction: "Wait, that's it? I just import uvloop and everything gets faster? This feels like cheating!"

Narrator: It's not cheating, Dave. It's just good engineering.

**When uvloop matters most:**

- **High subscriber counts:** 100+ subscribers per channel
- **I/O-heavy callbacks:** Database writes, API calls, file operations
- **Mixed workloads:** Combination of fast and slow subscribers
- **Latency-sensitive:** When every millisecond counts

**A note about Trio:** While uvloop makes asyncio faster, some developers prefer Trio for complex systems with thousands of concurrent tasks. Trio's structured concurrency and built-in backpressure handling can be more reliable under extreme load - it's designed to fail gracefully rather than mysteriously hang when you have 10,000+ simultaneous operations. For Dave's hotel, asyncio+uvloop is perfect. For Dave's next venture (a real-time trading system for underwater commodities), Trio might prevent some 3am debugging sessions.
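For a taste of what that looks like, here's a minimal Trio sketch of the same channel idea. The calls (`open_memory_channel`, nurseries) are Trio's real API; the hotel framing is just for illustration:

```python
import trio

async def main():
    # A bounded memory channel: a buffer of 10 gives built-in backpressure -
    # send() waits when the buffer is full instead of piling up memory.
    send_channel, receive_channel = trio.open_memory_channel(10)

    async def publisher():
        async with send_channel:
            for i in range(5):
                await send_channel.send(f"Pool update #{i}")

    async def subscriber():
        async with receive_channel:
            async for message in receive_channel:
                print(f"🏊 got: {message}")

    # Structured concurrency: the nursery won't exit until both tasks
    # finish, and an exception in either cancels the other cleanly.
    async with trio.open_nursery() as nursery:
        nursery.start_soon(publisher)
        nursery.start_soon(subscriber)

trio.run(main)
```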
**The uvloop installation gotcha:** Dave tries to install uvloop and gets confused by the error messages. Here's the thing - pip usually grabs a prebuilt wheel, but if it falls back to compiling uvloop from source, you need development tools installed. On Ubuntu/Debian: `apt-get install build-essential`. On macOS: `xcode-select --install`. On Windows... well, uvloop doesn't support Windows, so Dave decides to stick with the standard event loop there and deploy to Linux in production. Sometimes the path of least resistance is the right choice.

## Going Nuclear: Memory-Mapped Pub/Sub

For extreme performance or multi-process scenarios, we can use shared memory with full subscriber management:

```python
import mmap
import os
import struct
import threading
import time
from collections import defaultdict
from typing import Callable, Dict, List


class MemoryMappedPubSub:
    def __init__(self, buffer_size=1024*1024, shared_file='/tmp/hotel_pubsub'):
        # Shared memory buffer (1MB by default). Every process that should
        # see the same messages must use the same file path.
        self.buffer_size = buffer_size
        self.shared_file = shared_file

        # Initialize the memory-mapped file (only if it doesn't exist yet)
        if not os.path.exists(self.shared_file):
            with open(self.shared_file, 'wb') as f:
                f.write(b'\x00' * buffer_size)

        self._file = open(self.shared_file, 'r+b')
        self.mmap = mmap.mmap(self._file.fileno(), 0)

        # Layout: [head_pos][tail_pos][message_data...]
        self.head_offset = 0
        self.tail_offset = 8
        self.data_offset = 16

        # Subscriber management
        self.subscribers: Dict[str, List[Callable]] = defaultdict(list)
        self.listening = False
        self.listener_thread = None

    def subscribe(self, channel: str, callback: Callable):
        """Subscribe to a channel with a callback function"""
        self.subscribers[channel].append(callback)
        print(f"Subscribed to channel: {channel}")
        # Start listener if not already running
        if not self.listening:
            self.start_listening()

    def start_listening(self):
        """Start background thread to listen for messages"""
        if self.listening:
            return
        self.listening = True
        self.listener_thread = threading.Thread(target=self._listen_loop, daemon=True)
        self.listener_thread.start()
        print("Started listening for messages...")

    def stop_listening(self):
        """Stop the message listener"""
        self.listening = False
        if self.listener_thread:
            self.listener_thread.join()

    def _listen_loop(self):
        """Background loop that processes incoming messages"""
        while self.listening:
            for message in self.read_messages():
                self._process_message(message)
            time.sleep(0.001)  # Small delay to prevent excessive CPU usage

    def _process_message(self, message: str):
        """Process a single message and notify subscribers"""
        try:
            if ':' in message:
                channel, content = message.split(':', 1)
                for callback in self.subscribers.get(channel, []):
                    try:
                        callback(content)
                    except Exception as e:
                        print(f"Error in callback for channel {channel}: {e}")
        except Exception as e:
            print(f"Error processing message: {e}")

    def publish(self, channel: str, message: str):
        """Ultra-fast direct memory write"""
        data = f"{channel}:{message}".encode()

        # Get current tail position
        tail = struct.unpack('Q', self.mmap[self.tail_offset:self.tail_offset+8])[0]

        # Check if we have enough space (simple wraparound - a real
        # implementation would coordinate with readers before resetting)
        available_space = self.buffer_size - self.data_offset - tail
        if available_space < len(data) + 4:
            tail = 0  # Reset to the beginning if we're near the end

        # Write message length + data
        struct.pack_into('I', self.mmap, self.data_offset + tail, len(data))
        self.mmap[self.data_offset + tail + 4:
                  self.data_offset + tail + 4 + len(data)] = data

        # Update tail pointer
        new_tail = tail + 4 + len(data)
        struct.pack_into('Q', self.mmap, self.tail_offset, new_tail)

    def read_messages(self):
        """Ultra-fast direct memory read"""
        head = struct.unpack('Q', self.mmap[self.head_offset:self.head_offset+8])[0]
        tail = struct.unpack('Q', self.mmap[self.tail_offset:self.tail_offset+8])[0]

        messages = []
        current = head
        while current < tail:
            try:
                # Read message length
                msg_len = struct.unpack(
                    'I', self.mmap[self.data_offset + current:
                                   self.data_offset + current + 4])[0]
                # Safety check
                if msg_len > self.buffer_size or msg_len == 0:
                    break
                # Read message data
                data = self.mmap[self.data_offset + current + 4:
                                 self.data_offset + current + 4 + msg_len]
                messages.append(data.decode())
                current += 4 + msg_len
            except Exception as e:
                print(f"Error reading message: {e}")
                break

        # Update head pointer
        struct.pack_into('Q', self.mmap, self.head_offset, current)
        return messages

    def __del__(self):
        """Cleanup when object is destroyed"""
        self.stop_listening()
        if hasattr(self, 'mmap'):
            self.mmap.close()
        if hasattr(self, '_file'):
            self._file.close()


# Dave's ultra-fast hotel messaging in action
hotel = MemoryMappedPubSub()

# Define subscriber callbacks
def pool_guest(message):
    print(f"🏊 Pool Guest received: {message}")

def restaurant_guest(message):
    print(f"🍽️ Restaurant Guest received: {message}")

# Subscribe to channels (automatically starts background listener)
hotel.subscribe("pool", pool_guest)
hotel.subscribe("restaurant", restaurant_guest)

# Publish messages (ultra-fast memory writes)
hotel.publish("pool", "Pool closing in 10 minutes!")
hotel.publish("restaurant", "Happy hour starts now!")

time.sleep(0.1)  # give the daemon listener thread a moment to deliver
```

**What makes this special:**

- **Ultra-fast publishing:** Direct memory writes, no system calls
- **Automatic message routing:** Background thread processes messages and calls subscribers
- **Multiple subscribers per channel:** Each channel can have many listeners
- **Error isolation:** One bad callback doesn't crash the system
- **Clean resource management:** Automatic cleanup when done

The memory-mapped version gives you the familiar pub/sub interface with enterprise-grade performance. Dave's hotel can now handle millions of guest notifications per second while maintaining the simple "subscribe and forget" model that made his system famous.
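Because the buffer lives in a file, a second process can attach to the same path and receive what the first one publishes. Here's a rough sketch of that, assuming the `MemoryMappedPubSub` class above has been saved as a module (the name `mmap_pubsub.py` and the shared path are assumptions for this example):

```python
import time
from multiprocessing import Process

from mmap_pubsub import MemoryMappedPubSub  # hypothetical module holding the class

SHARED = '/tmp/hotel_pubsub_demo'  # both processes must agree on this path

def subscriber_process():
    hotel = MemoryMappedPubSub(shared_file=SHARED)
    hotel.subscribe("pool", lambda msg: print(f"[subscriber] got: {msg}"))
    time.sleep(1)  # keep the listener thread alive long enough to receive

if __name__ == '__main__':
    Process(target=subscriber_process).start()
    time.sleep(0.2)  # let the subscriber attach first

    publisher = MemoryMappedPubSub(shared_file=SHARED)
    publisher.publish("pool", "Pool closing in 10 minutes!")
    time.sleep(1)
```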
## Performance Comparison

| Solution | Messages/Second | Use Case | Complexity |
|---|---|---|---|
| Basic Pub/Sub | ~50K | Small apps, prototypes | ⭐ Simple |
| Queued Pub/Sub | ~200K | Most production systems | ⭐⭐ Moderate |
| Memory-Mapped | ~5M+ | High-frequency trading, multi-process | ⭐⭐⭐⭐⭐ Expert |
| External (Redis) | ~100K+ | Distributed systems | ⭐⭐⭐ Moderate |

**When to use each:**

- **Basic:** Learning, small projects, <10K messages/sec
- **Queued:** Most real applications, handles traffic spikes well
- **Memory-Mapped:** >500K messages/sec, cross-process communication, ultra-low latency
- **External:** Multiple servers, persistence, proven reliability

Add error handling, persistence, and scaling, and you've got enterprise-grade messaging.

## Popular Pub/Sub Technologies

- **Redis Pub/Sub:** Simple, fast, great for real-time features
- **Apache Kafka:** Enterprise-grade, handles massive throughput
- **RabbitMQ:** Reliable message queuing with flexible routing
- **Cloud solutions:** AWS SNS/SQS, Google Pub/Sub, Azure Service Bus
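To show how little code an external broker needs, here's a quick Redis Pub/Sub sketch using the `redis-py` client (assumes a Redis server running on localhost; the channel name is just for illustration):

```python
import redis

r = redis.Redis(host='localhost', port=6379)

# Subscriber side: subscribe to a channel and poll for messages
pubsub = r.pubsub()
pubsub.subscribe('pool')

# Publisher side: any process, on any machine, can publish to the channel
r.publish('pool', 'Pool closing in 10 minutes!')

# Drain whatever has arrived (the first message is the subscribe confirmation)
for _ in range(2):
    message = pubsub.get_message(timeout=1.0)
    if message and message['type'] == 'message':
        print(f"🏊 got: {message['data'].decode()}")
```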
## The End of Dave's Story

Six months later, Dave gets promoted to "Chief Communication Innovation Officer" because guest satisfaction scores are through the roof. The hotel is now famous for its "revolutionary messaging system." Dave still thinks "channels" refers to the hotel's cable TV system, but his intercom zones are teaching computer science students worldwide how messaging patterns work.

The moral of the story? Even a stopped clock is right twice a day, and even Dave can stumble into good architecture if he breaks enough things first.

And what about Dave? He's gone back to what he knows best - transportation systems. His new venture is revolutionizing underwater public transit with "Public Submarines" that run on "Data Busses."

When asked about the technical architecture, Dave enthusiastically explains: "Each sub publishes its location to passengers who subscribe to specific routes. It's pub/sub, but for subs! And instead of regular engines, they're powered by data busses - you know, for maximum throughput!"

The city transportation department is still trying to figure out if he's a genius or if they need to revoke his business license. Dave remains convinced he's solving "the last-mile problem for aquatic commuters."

Dave was so encouraged by the success of his new system that he branched out and started a new business. Next time you're at your local watering hole, you might see the new "Pub Sub" - featuring Dave's revolutionary "asynchronously processed artisanal submarine sandwiches with non-blocking ingredient queues." The meat sits in lukewarm brine for "optimal message persistence," and Dave insists the soggy bread is actually "moisture-enhanced for better throughput."

He still gets Christmas cards from the hotel guests.