デイヴはこれまで火災警報システムを見たことがなかったが、彼は「個人的な電話通話が847%の関与を増やす」という顧客サービスプロジェクトを完成させたばかりだったので、もちろん、デイヴは「火災警報は緊急事態のための顧客サービスにすぎない!」と考える。 彼の論理は、「母が直接私に電話をかけるとき、私は常に答える。一般的なアラームが消えるとき、私はそれが車のアラームであると仮定し、それを無視します。 第1段階:ポイント・ポイント・コミュニケーション(電話・ファイアスコ) デイヴの素晴らしいソリューション? 煙が検出されたとき、システムはホテルの各部屋に電話をかけます。 Room 237 gets a call: "Hi, there's smoke in the kitchen, please evacuate." Room 301: "Hi, there's smoke in the kitchen, please evacuate." All 847 rooms, one by one. Dave's proud - everyone gets the message! キッチンに小さな脂肪が燃え上がり、駐車場で煙が出るとどうなるのか? デイヴのシステムが部屋237に電話する「こんにちは、キッチンに煙がある」 そこで、すぐに「こんにちは、駐車場に煙がある」とゲストは戸惑います - どんな緊急事態ですか? 何が起きていますか? クリックです。 一方、512号室には、うつ病のゲストがいます。623号室は数時間前にチェックアウトしましたが、電話は鳴り続けています。108号室には、両親が怒っている寝ている赤ちゃんがいます。 これは古典的なポイント対ポイント通信 - 送信者1名、受信者1名、847回繰り返します。 消防士が現れ、「デイヴ、そんなことはできないよ、普通の消防警報を設置せよ」とデイヴは反発する。 段階2:放送コミュニケーション(原子力オプション) しかし、デイヴは自分の過ちから学びました! 「私はより良いアイデアを持っています!」彼は発表します。 デイヴは大規模な火災警報器を設置し、ホテル全体で爆発します! 成功! 誰もがそれを聞く! しかし、デイヴはこのシステムが発表に完璧であることに気づきます。 プール閉鎖? 火災警報。 朝食は10分で終わりますか? 火災警報。 ロビーで子供を失いましたか? 火災警報。 ヨガクラス開始? 火災警報。 消防署は今週47回呼び出されたが、誰かが緊急出口をチェックしようとした。 これは、彼らがそれを望むかどうか、誰にでも1つのメッセージである - これは、街全体の緊急放送システムを使用してガレージの販売を発表するのと同じです。 ステージ3:Dave Discovers Something Revolutionary デイヴの第3の試みは、「異なるエリアの異なるインターコムがあったらどうだろうか?」彼はホテル全体にスピーカーをインストールしますが、それらを別々のチャンネルに接続します。 プールエリアインターコム「プール10分で閉鎖、タオルサービス終了」 レストランインターコム「Happy hour start, kitchen closing soon」 会議室インターコム「会議室Bが利用可能、WiFiパスワード変更」 スパインターコム:「マッサージ予約、サウナメンテナンスは午後3時」 ホーム > ホーム > ホーム > ホーム > ホーム > ホーム > ホーム > ホーム > ホーム > ホーム > ホーム > ホーム > ホーム > ホーム > ホーム > ホーム > ホーム > ホーム > ホーム > ホーム > ホーム > ホーム > ホーム > ホーム > ホーム > ホーム > ホーム > ホーム > ホーム > ホーム > ホーム > プールのゲストはプールの更新を聞く、レストランのディナーは食事情報を得る、会議参加者は会議室の発表を得る。 What Dave Actually Built: Publish-Subscribe (Pub/Sub) それに気づかなければ、デイヴは創造した。 現代のソフトウェアアーキテクチャにおける最も重要なパターンの一つです。 publish-subscribe system こちらは、それがどのように機能するか: 出版社(ホテルのスタッフ)は、特定のテーマやチャンネル(プールの更新、レストランのニュースなど)にメッセージを送信します。 Subscribers (guests) choose which topics they want to listen to Publishers don't know who's listening, subscribers don't know who's sending - 彼らは完全に切り離されている これはN×M通信です:多くの出版社は多くのサブスクリプトに送信することができますが、それは組織化され、フィルタリングされています. No more noise, no more missed messages, no more calling each room individually. なぜコードが重要なのか デイヴは偶然、3つの基本的なメッセージパターンを構築した。 サービス間の直接接続:複数のシステムに通知する必要があるときにスケールしない。 Point-to-Point すべての人にメッセージを送る 騒音と緊密な接続を生み出す Broadcast Publishers send messages to topics/channels, subscribers choose what to listen to. スケーラブル、分離、効率的です。 Pub/Sub 実際のシステムでは、 pub/sub は Dave が直面した同じ問題を解決します。 電子商取引: inventory updates go to multiple services (recommendations, analytics, notifications) without the inventory service knowing who's listening. inventory updates go to multiple services (recommendations, analytics, notifications) without the inventory service knowing who's listening. チャットアプリ:チャンネルに投稿されたメッセージ、ユーザーが参加している会話にサブスクリプト マイクロサービス:サービスは、他のサービスが独立して反応できるイベント(ユーザー登録、注文完了)を公開します。 Building Your Own Pub/Sub System (It's Easier Than You Think) Pub/sub は恐ろしいものではありませんので、Dave が誇りに思う生産レベルのシステムを構築しましょう。 from collections import defaultdict from typing import Dict, List, Callable class HotelPubSub: def __init__(self): self.channels: Dict[str, List[Callable]] = defaultdict(list) def subscribe(self, channel: str, callback: Callable): """Guest subscribes to hotel updates""" self.channels[channel].append(callback) print(f"Subscribed to {channel}") def publish(self, channel: str, message: str): """Hotel staff publishes updates""" if channel in self.channels: for callback in self.channels[channel]: callback(message) print(f"Published to {channel}: {message}") # Dave's hotel in action hotel = HotelPubSub() # Guests subscribe to what they care about def pool_guest(msg): print(f"🏊 Pool Guest: {msg}") def restaurant_guest(msg): print(f"🍽️ Restaurant Guest: {msg}") hotel.subscribe("pool", pool_guest) hotel.subscribe("restaurant", restaurant_guest) # Hotel publishes updates hotel.publish("pool", "Pool closing in 10 minutes!") hotel.publish("restaurant", "Happy hour starts now!") それはそれ! 20行で働くパブ/サブシステム。 Async Queues による最適化 しかし、デイヴのホテルが本当に忙しい場合はどうでしょうか? 私たちのシンプルなバージョンは、サブスクリプトが遅いときに出版社をブロックします。 import asyncio from asyncio import Queue from collections import defaultdict class AsyncPubSub: def __init__(self): self.channels = defaultdict(list) self.message_queue = Queue() # Start background worker asyncio.create_task(self._worker()) def subscribe(self, channel: str, callback): self.channels[channel].append(callback) async def publish(self, channel: str, message: str): # Non-blocking publish - just queue it await self.message_queue.put((channel, message)) async def _worker(self): # Background worker processes messages while True: channel, message = await self.message_queue.get() if channel in self.channels: # Run all subscribers in parallel tasks = [callback(message) for callback in self.channels[channel]] await asyncio.gather(*tasks, return_exceptions=True) # Dave's async hotel in action async def demo(): hotel = AsyncPubSub() # Fast and slow subscribers async def fast_guest(msg): await asyncio.sleep(0.1) # Quick processing print(f"🏊 Fast: {msg}") async def slow_guest(msg): await asyncio.sleep(2.5) # Simulate slow database write print(f"🍽️ Slow: {msg}") hotel.subscribe("updates", fast_guest) hotel.subscribe("updates", slow_guest) # Publishers don't wait for slow subscribers await hotel.publish("updates", "Pool closing!") await hotel.publish("updates", "Happy hour!") await asyncio.sleep(0.2) # Let everything finish # asyncio.run(demo()) 現在、出版社は決してブロックしません - 彼らはただメッセージを列にし続けるだけです. バックグラウンドワーカーは並行してすべてのサブスクリプトに配信を処理します. Fast subscribers get their messages quickly (0.1s), while slow ones take their time (2.5s), but neither blocks the other. 簡単なパフォーマンスハック: uvloop デイヴのアシンクシステムはうまく機能していますが、彼は魔法の何かを発見します:コードの1行を変更すると、すべてが2〜4倍速くなります。 入る - libuv に基づいて Cython で書かれた Python のデフォルト イベント ループの drop-in 置き換え (Node.js を速くする同じこと) uvloop import uvloop import asyncio from collections import defaultdict import time # The magic line that makes Dave's hotel 2-4x faster asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) class TurboHotelPubSub: def __init__(self, max_queue_size=10000): self.channels = defaultdict(list) self.message_queue = asyncio.Queue(maxsize=max_queue_size) self.running = True self.performance_metrics = { 'messages_per_second': 0, 'avg_latency_ms': 0, 'active_subscribers': 0 } # Start background worker and metrics collector asyncio.create_task(self._worker()) asyncio.create_task(self._metrics_collector()) def subscribe(self, channel: str, callback): self.channels[channel].append(callback) self.performance_metrics['active_subscribers'] = sum(len(subs) for subs in self.channels.values()) print(f"Subscribed to {channel} (Total subscribers: {self.performance_metrics['active_subscribers']})") async def publish(self, channel: str, message: str): timestamp = time.time() message_with_timestamp = (channel, message, timestamp) try: self.message_queue.put_nowait(message_with_timestamp) except asyncio.QueueFull: # Backpressure - with uvloop, this is much faster await self.message_queue.put(message_with_timestamp) async def _worker(self): """Background worker - uvloop makes this significantly faster""" while self.running: channel, message, publish_time = await self.message_queue.get() # Measure end-to-end latency processing_start = time.time() if channel in self.channels: # uvloop excels at handling many concurrent tasks tasks = [] for callback in self.channels[channel]: if asyncio.iscoroutinefunction(callback): tasks.append(callback(message)) else: # uvloop's thread pool is much more efficient tasks.append(asyncio.get_event_loop().run_in_executor(None, callback, message)) # uvloop's gather is optimized for many concurrent operations await asyncio.gather(*tasks, return_exceptions=True) # Track latency total_latency = (time.time() - publish_time) * 1000 # Convert to ms self.performance_metrics['avg_latency_ms'] = ( self.performance_metrics['avg_latency_ms'] * 0.9 + total_latency * 0.1 ) self.message_queue.task_done() async def _metrics_collector(self): """Track messages per second - uvloop's timer precision helps here""" last_time = time.time() last_count = 0 while self.running: await asyncio.sleep(1) current_time = time.time() # In uvloop, queue.qsize() is more accurate and faster current_count = getattr(self.message_queue, '_finished', 0) if current_time - last_time >= 1: messages_processed = current_count - last_count self.performance_metrics['messages_per_second'] = messages_processed last_time, last_count = current_time, current_count def get_performance_stats(self): return self.performance_metrics.copy() # Dave's hotel with uvloop superpowers async def benchmark_uvloop_vs_standard(): """Demonstrate uvloop performance improvements""" # Simulate I/O-heavy subscribers (database writes, API calls) async def database_subscriber(msg): # Simulate database write await asyncio.sleep(0.001) # 1ms "database" call return f"DB: {msg}" async def api_subscriber(msg): # Simulate API call await asyncio.sleep(0.002) # 2ms "API" call return f"API: {msg}" def analytics_subscriber(msg): # Simulate CPU-heavy sync work time.sleep(0.0005) # 0.5ms CPU work return f"Analytics: {msg}" hotel = TurboHotelPubSub() # Subscribe multiple handlers to same channel for i in range(10): # 10 database subscribers hotel.subscribe("orders", database_subscriber) for i in range(5): # 5 API subscribers hotel.subscribe("orders", api_subscriber) for i in range(20): # 20 analytics subscribers hotel.subscribe("orders", analytics_subscriber) print("Starting benchmark with uvloop...") start_time = time.time() # Publish lots of messages for i in range(1000): await hotel.publish("orders", f"Order #{i}") # Wait for processing to complete await hotel.message_queue.join() end_time = time.time() stats = hotel.get_performance_stats() print(f"Benchmark complete in {end_time - start_time:.2f} seconds") print(f"Performance stats: {stats}") print(f"Total subscriber callbacks: {stats['active_subscribers'] * 1000:,}") return end_time - start_time # Uncomment to run benchmark # asyncio.run(benchmark_uvloop_vs_standard()) What uvloop supercharges: 1. I/O-Heavy Subscribers (2-4x speedup) データベース書き込み、API呼び出し、ファイル操作 uvloopのlibuvベースの実装により、数千件の同時I/O操作がより効率的に処理されます。 デイヴのホテルでは、ヘンダーソン夫人のクラウド日記のアップロードとピーターソン氏のInstagramの投稿を同時に処理できます。 2. Many Concurrent Subscribers (1.5-2x speedup) チャンネルごとに何百人または何千人ものサブスクリプトを持つシステム uvloopの最適化されたタスクスケジュールは、オーバーヘッドを減らす 500人以上の部屋通知サブスクリプトを持つDave's Conference Centerに最適 3. Thread Pool Operations (30-50% improvement) Sync callbacks that get moved to thread pools. トレードプールに移動するコールバック uvloopのトレイドプール管理がより効率的 アシンクできないDave's legacy systems 4. Timer and Queue Precision メトリックと割合制限のためのより正確なタイミング より良いコアパフォーマンスモニタリング Dave がシステムが需要に対応しているかどうかを追跡する手助け Real-world uvloop impact for Dave's hotel: # Before uvloop: 15,000 notifications/second # After uvloop: 35,000 notifications/second # Same code, 2.3x faster! 最高の部分? デイヴは偶然、時には最善の最適化は、最低限の努力を必要とするものだということを発見する。 Zero code changes デイヴの反応は「待ってろ、それでしょ?僕はただ uvloop を輸入し、すべてが速くなりますか? これは詐欺のようだ!」 それは詐欺ではない、デイブ、ただの良いエンジニアリングだ。 When uvloop matters most: 高サブスクリプト数:チャンネルあたり100+サブスクリプト I/O-heavy callbacks: Database writes, API calls, file operations (データベースの書き込み、API呼び出し、ファイル操作) Mixed workloads: Fast and Slow Subscribers の組み合わせ Latency-sensitive: When Every Millisecond Count uvloop は asyncio をより速く行う一方で、一部の開発者は複雑なシステムで何千もの同時課題を扱うために Trio を好みます。Trio の構造化された同期および内蔵のバックプレッシャー操作は、極度の負荷下でより信頼性が高くできます - あなたが 10,000 以上の同時操作を持っているときに神秘的に掛かるのではなく、優雅に失敗するように設計されています。 Dave のホテルでは asyncio +uvloop が完璧です。 A note about Trio: Dave は uvloop をインストールしようとし、エラー メッセージに困惑します. Here's the thing - uvloop requires compilation, so you need development tools installed. Ubuntu/Debian: . Homebrew で macOS で: Windows では、Dave は Windows の標準イベント ループに従い、生産中に Linux にデプロイすることに決めます。 The uvloop installation gotcha: apt-get install build-essential brew install python3-dev Going Nuclear: Memory-Mapped Pub/Sub 極端なパフォーマンスまたは複数のプロセスシナリオでは、完全なサブスクリプター管理で共有メモリを使用できます。 import mmap import struct import multiprocessing import threading import time from collections import defaultdict from typing import Callable, Dict, List class MemoryMappedPubSub: def __init__(self, buffer_size=1024*1024): # 1MB buffer # Create shared memory buffer self.buffer_size = buffer_size self.shared_file = f'/tmp/pubsub_{multiprocessing.current_process().pid}' # Initialize memory-mapped file with open(self.shared_file, 'wb') as f: f.write(b'\x00' * buffer_size) self.mmap = mmap.mmap(open(self.shared_file, 'r+b').fileno(), 0) # Layout: [head_pos][tail_pos][message_data...] self.head_offset = 0 self.tail_offset = 8 self.data_offset = 16 # Subscriber management self.subscribers: Dict[str, List[Callable]] = defaultdict(list) self.listening = False self.listener_thread = None def subscribe(self, channel: str, callback: Callable): """Subscribe to a channel with a callback function""" self.subscribers[channel].append(callback) print(f"Subscribed to channel: {channel}") # Start listener if not already running if not self.listening: self.start_listening() def start_listening(self): """Start background thread to listen for messages""" if self.listening: return self.listening = True self.listener_thread = threading.Thread(target=self._listen_loop, daemon=True) self.listener_thread.start() print("Started listening for messages...") def stop_listening(self): """Stop the message listener""" self.listening = False if self.listener_thread: self.listener_thread.join() def _listen_loop(self): """Background loop that processes incoming messages""" while self.listening: messages = self.read_messages() for message in messages: self._process_message(message) time.sleep(0.001) # Small delay to prevent excessive CPU usage def _process_message(self, message: str): """Process a single message and notify subscribers""" try: if ':' in message: channel, content = message.split(':', 1) if channel in self.subscribers: for callback in self.subscribers[channel]: try: callback(content) except Exception as e: print(f"Error in callback for channel {channel}: {e}") except Exception as e: print(f"Error processing message: {e}") def publish(self, channel: str, message: str): """Ultra-fast direct memory write""" data = f"{channel}:{message}".encode() # Get current tail position tail = struct.unpack('Q', self.mmap[self.tail_offset:self.tail_offset+8])[0] # Check if we have enough space (simple wraparound) available_space = self.buffer_size - self.data_offset - tail if available_space < len(data) + 4: # Reset to beginning if we're near the end tail = 0 # Write message length + data struct.pack_into('I', self.mmap, self.data_offset + tail, len(data)) self.mmap[self.data_offset + tail + 4:self.data_offset + tail + 4 + len(data)] = data # Update tail pointer new_tail = tail + 4 + len(data) struct.pack_into('Q', self.mmap, self.tail_offset, new_tail) def read_messages(self): """Ultra-fast direct memory read""" head = struct.unpack('Q', self.mmap[self.head_offset:self.head_offset+8])[0] tail = struct.unpack('Q', self.mmap[self.tail_offset:self.tail_offset+8])[0] messages = [] current = head while current < tail: try: # Read message length msg_len = struct.unpack('I', self.mmap[self.data_offset + current:self.data_offset + current + 4])[0] # Safety check if msg_len > self.buffer_size or msg_len <= 0: break # Read message data data = self.mmap[self.data_offset + current + 4:self.data_offset + current + 4 + msg_len] messages.append(data.decode()) current += 4 + msg_len except Exception as e: print(f"Error reading message: {e}") break # Update head pointer struct.pack_into('Q', self.mmap, self.head_offset, current) return messages def __del__(self): """Cleanup when object is destroyed""" self.stop_listening() if hasattr(self, 'mmap'): self.mmap.close() # Dave's ultra-fast hotel messaging in action hotel = MemoryMappedPubSub() # Define subscriber callbacks def pool_guest(message): print(f"🏊 Pool Guest received: {message}") def restaurant_guest(message): print(f"🍽️ Restaurant Guest received: {message}") # Subscribe to channels (automatically starts background listener) hotel.subscribe("pool", pool_guest) hotel.subscribe("restaurant", restaurant_guest) # Publish messages (ultra-fast memory writes) hotel.publish("pool", "Pool closing in 10 minutes!") hotel.publish("restaurant", "Happy hour starts now!") What makes this special: Ultra-fast publishing: Direct memory writes, no system calls. システム呼び出しなし 自動メッセージルーティング: Background thread processes messages and calls subscribers チャンネルごとに複数のサブスクリプター:各チャンネルに複数の聴衆がいる可能性があります。 Error isolation: One bad callback doesn't crash the system. エラー隔離: One bad callback doesn't crash the system. エラー隔離: One bad callback doesn't crash the system. クリーン・リソース・マネジメント:クリーニング終了後自動化 メモリマッピングされたバージョンは、エンタープライズグレードのパフォーマンスを備えた既知のパブ/サブインターフェイスを提供します. Dave's hotel can now handle millions of guest notifications per second whileining the simple "subscribe and forget" model that made his system famous. Dave's hotel can now handle millions of guest notifications per second. パフォーマンス比較 Solution Messages/Second Use Case Complexity Basic Pub/Sub ~50K Small apps, prototypes ⭐ Simple Queued Pub/Sub ~200K Most production systems ⭐⭐ Moderate Memory-Mapped ~5M+ High-frequency trading, multi-process ⭐⭐⭐⭐⭐ Expert External (Redis) ~100K+ Distributed systems ⭐⭐⭐ Moderate Basic Pub/Sub 【50K】 小型アプリ、プロトタイプ シンプル Queued Pub/Sub 〜200K ほとんどの生産システム 適度 Memory-Mapped 5M+ 高周波数取引、マルチプロセス ↓↓↓↓↓↓↓↓↓↓ External (Redis) 100K+ 分散システム When to use each: 基本: 学習、小さなプロジェクト、 <10K メッセージ/sec 列:ほとんどのリアルなアプリケーション、トラフィックのピークをうまく処理 メモリマッピング: >500Kメッセージ/秒、クロスプロセス通信、超低遅延 外部:複数のサーバー、持続性、証明された信頼性 エラー処理、継続性、スケーリングを追加すると、エンタープライズクラスのメッセージが表示されます。 人気のPub/Sub Technologies Redis Pub/Sub:シンプル、高速、リアルタイム機能に最適 Apache Kafka:エンタープライズグレード、大規模なトランスポートに対応 RabbitMQ: 柔軟なルーティングで信頼できるメッセージ列 クラウド ソリューション: AWS SNS/SQS、Google Pub/Sub、Azure Service Bus デイヴの物語の終わり 6カ月後、デイヴは「コミュニケーション・イノベーション・オフィシャル・シェフ」に昇進するが、ゲスト満足度は屋根を越え、ホテルは「革命的なメッセージング・システム」で有名になった。 デイヴはまだ「チャンネル」はホテルのケーブルテレビシステムを指していると考えていますが、彼のインターコムゾーンは世界中のコンピュータサイエンスの学生にメッセージパターンの働き方を教えています。 ストーリーの道徳? 止まっている時計さえ1日2回は正しいし、デイヴさえ最初に十分なものを破れば良い建築にぶつかるかもしれない。 彼の新しい起業は「データバス」で走る「公共潜水艦」で水下公共交通を革命させている。 And what about Dave? テクニカルアーキテクチャについて尋ねると、デイヴは熱心に「各サブは、特定のルートにサブスクリプトする乗客にその位置を公開する。 市の交通部門はまだ彼が天才かどうか、あるいは彼のビジネスライセンスを取り消す必要があるかどうかを調べようとしている。 次回、あなたがあなたの地元のウォーキングホールにいるとき、あなたはデイヴの革命的な「非同期的に加工された工芸用潜水艦サンドイッチと非ブロック成分の列を含む」を含む新しい「Pub Sub」を見ることができます。 ホテルのお客様からクリスマスカードが届きました。