Kenalilah Dave, seorang pengembang yang mendapat kontrak alarm kebakaran hotel kerana dia adalah keponakan CEO. Dave tidak pernah melihat sistem alarm kebakaran sebelum ini, tetapi dia baru selesai projek perkhidmatan pelanggan di mana "panggilan telefon peribadi meningkatkan keterlibatan dengan 847%!" Pertimbangan beliau: "Ketika ibu saya memanggil saya secara langsung, saya sentiasa menjawab.Ketika sesetengah amaran generik dimatikan, saya menganggap bahawa ia adalah amaran kereta dan mengabaikannya. sentuhan peribadi adalah Kunci!" Tahap 1: Komunikasi Point-to-Point (Pelanggaran Panggilan Telefon) Apabila asap dikesan, sistem memanggil setiap bilik di hotel. Bilik 237 menerima panggilan: "Hi, terdapat asap di dapur, sila evakuasi."Bilik 301: "Hi, terdapat asap di dapur, sila evakuasi." Semua 847 bilik, satu demi satu. Apa yang berlaku apabila terdapat api kecil lemak di dapur DAN asap di garasi? sistem Dave memanggil bilik 237: "Halo, ada asap di dapur." Kemudian segera: "Halo, ada asap di garasi." tetamu bingung - apa kecemasan? Klik sahaja. Sementara itu, bilik 512 mempunyai tetamu yang tuli. bilik 623 memeriksa keluar berjam-jam lalu tetapi telefon terus berdering. bilik 108 mempunyai bayi tidur yang ibu bapa sekarang marah. Selepas alarm palsu ketiga dalam seminggu, tetamu mula mematikan telefon mereka. Ini adalah komunikasi point-to-point klasik - satu penghantar, satu penerima, diulang 847 kali. ia tidak meluas, ia tidak berkesan, dan ia mengganggu semua yang terlibat. Inspektor kebakaran muncul: "Dave, anda tidak boleh melakukan ini. memasang alarm kebakaran biasa." Tahap 2: Komunikasi Penyiaran (Opsi Nuklear) Tetapi Dave telah belajar daripada kesilapan beliau. "Saya mempunyai idea yang lebih baik!" dia mengumumkan. Dave memasang alarm kebakaran MASSIVE yang meletup di seluruh hotel. Sukses! Semua orang mendengarnya! Tetapi kemudian Dave menyedari bahawa sistem ini Sempurna untuk pengumuman. penutupan kolam? ALARM AIR. Sarapan berakhir dalam 10 minit? ALARM AIR. Anak hilang di lobi? ALARM AIR. Kelas yoga bermula? ALARM AIR. Para tetamu kini mengevakuasi untuk pemberitahuan sarapan benua. Jabatan kebakaran telah dipanggil 47 kali minggu ini.Seseorang cuba untuk memeriksa keluar melalui pintu keluar kecemasan. Ini adalah komunikasi siaran - satu mesej kepada semua orang, sama ada mereka mahu atau tidak.Ini seperti menggunakan sistem siaran kecemasan di seluruh bandar untuk mengumumkan jualan garasi anda. Tahap 3: Dave Menemukan Sesuatu yang Revolusioner Upaya ketiga Dave: "Bagaimana jika kita mempunyai intercom yang berbeza untuk kawasan yang berbeza?"Dia memasang speaker di seluruh hotel tetapi mengalir mereka ke saluran yang berasingan: Pool Area Intercom: "Pulau ditutup dalam 10 minit, perkhidmatan handuk berakhir" Restoran Intercom: "Happy hour bermula, dapur segera ditutup" Konferensi Ruang Intercom: "Rumah Mesyuarat B tersedia, kata laluan WiFi berubah" Spa Intercom: "Perkh pijat boleh didapati, penyelenggaraan sauna pada 3pm" Intercoms spesifik lantai: "Kebersihan rumah di lantai 3, mesin ais pecah di lantai 7" Para tetamu secara semulajadi menyesuaikan diri dengan intercom di kawasan yang sebenarnya mereka gunakan. tetamu kolam mendengar kemas kini kolam, tetamu restoran mendapat maklumat makan, peserta persidangan mendapat pengumuman bilik mesyuarat. Apa yang sebenarnya telah dibina oleh Dave: Publish-Subscribe (Pub/Sub) Tanpa menyedari hal itu, beliau telah mencipta sebuah - salah satu corak yang paling penting dalam seni bina perisian moden. publish-subscribe system Berikut ialah bagaimana ia berfungsi: Penerbit (pegawai hotel) menghantar mesej kepada topik atau saluran tertentu (update kolam renang, berita restoran, dan lain-lain) Pengikut (pelawat) memilih topik yang mereka mahu dengar Penerbit tidak tahu siapa yang mendengarkan, pelanggan tidak tahu siapa yang menghantar - mereka benar-benar terlepas Ini adalah komunikasi N×M: banyak penerbit boleh menghantar kepada banyak pelanggan, tetapi ia dianjurkan dan difilter.Tiada lagi bunyi, tiada lagi mesej yang terlepas, tiada lagi memanggil setiap bilik secara individu. Mengapa ini penting untuk kod anda Dave secara tidak sengaja membina tiga corak mesej asas: : Sambungan langsung antara perkhidmatan. Tidak meluas apabila anda perlu memaklumkan beberapa sistem. Point-to-Point : Satu mesej kepada semua. Menciptakan bunyi bising dan penyambung yang ketat. Broadcast Penerbit menghantar mesej ke topik / saluran, pelanggan memilih apa yang boleh didengarkan. Pub/Sub Dalam sistem sebenar, pub/sub menyelesaikan masalah yang sama yang dihadapi Dave: E-dagang: Kemas kini persediaan pergi kepada beberapa perkhidmatan (rekomendasi, analisis, pemberitahuan) tanpa perkhidmatan persediaan tahu siapa yang mendengarkan Aplikasi Chat: Mesej yang diterbitkan kepada saluran, pengguna berlangganan kepada perbualan yang mereka berada dalam Microservices: Perkhidmatan menerbitkan peristiwa (pengguna terdaftar, pesanan selesai) yang perkhidmatan lain boleh bertindak balas secara bebas Membina sistem Pub / Sub anda sendiri (Ini lebih mudah daripada yang anda fikirkan) Pub/sub tidak perlu menakut-nakuti. mari kita membina sistem peringkat pengeluaran yang Dave akan bangga dengan: from collections import defaultdict from typing import Dict, List, Callable class HotelPubSub: def __init__(self): self.channels: Dict[str, List[Callable]] = defaultdict(list) def subscribe(self, channel: str, callback: Callable): """Guest subscribes to hotel updates""" self.channels[channel].append(callback) print(f"Subscribed to {channel}") def publish(self, channel: str, message: str): """Hotel staff publishes updates""" if channel in self.channels: for callback in self.channels[channel]: callback(message) print(f"Published to {channel}: {message}") # Dave's hotel in action hotel = HotelPubSub() # Guests subscribe to what they care about def pool_guest(msg): print(f"🏊 Pool Guest: {msg}") def restaurant_guest(msg): print(f"🍽️ Restaurant Guest: {msg}") hotel.subscribe("pool", pool_guest) hotel.subscribe("restaurant", restaurant_guest) # Hotel publishes updates hotel.publish("pool", "Pool closing in 10 minutes!") hotel.publish("restaurant", "Happy hour starts now!") Sistem pub / sub yang berfungsi dalam 20 baris. Menggunakan Async Queues Tetapi bagaimana jika hotel Dave benar-benar sibuk? versi mudah kami menghalang penerbit apabila langganan lambat. import asyncio from asyncio import Queue from collections import defaultdict class AsyncPubSub: def __init__(self): self.channels = defaultdict(list) self.message_queue = Queue() # Start background worker asyncio.create_task(self._worker()) def subscribe(self, channel: str, callback): self.channels[channel].append(callback) async def publish(self, channel: str, message: str): # Non-blocking publish - just queue it await self.message_queue.put((channel, message)) async def _worker(self): # Background worker processes messages while True: channel, message = await self.message_queue.get() if channel in self.channels: # Run all subscribers in parallel tasks = [callback(message) for callback in self.channels[channel]] await asyncio.gather(*tasks, return_exceptions=True) # Dave's async hotel in action async def demo(): hotel = AsyncPubSub() # Fast and slow subscribers async def fast_guest(msg): await asyncio.sleep(0.1) # Quick processing print(f"🏊 Fast: {msg}") async def slow_guest(msg): await asyncio.sleep(2.5) # Simulate slow database write print(f"🍽️ Slow: {msg}") hotel.subscribe("updates", fast_guest) hotel.subscribe("updates", slow_guest) # Publishers don't wait for slow subscribers await hotel.publish("updates", "Pool closing!") await hotel.publish("updates", "Happy hour!") await asyncio.sleep(0.2) # Let everything finish # asyncio.run(demo()) Sekarang penerbit tidak pernah menghalang - mereka hanya raih mesej dan terus berjalan. pekerja latar belakang menangani penghantaran kepada semua pelanggan secara paralel. pelanggan cepat mendapat mesej mereka dengan cepat (0.1s), manakala yang lambat mengambil masa mereka (2.5s), tetapi tidak menghalang yang lain. Performa mudah hack: uvloop Sistem async Dave berfungsi dengan baik, tetapi kemudian dia mendapati sesuatu yang ajaib: mengubah satu baris kod boleh membuat segala-galanya 2-4 kali lebih cepat. Masuk - pengganti drop-in untuk laluan peristiwa lalai Python yang ditulis dalam Cython dan berdasarkan libuv (satu perkara yang membuat Node.js cepat). uvloop import uvloop import asyncio from collections import defaultdict import time # The magic line that makes Dave's hotel 2-4x faster asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) class TurboHotelPubSub: def __init__(self, max_queue_size=10000): self.channels = defaultdict(list) self.message_queue = asyncio.Queue(maxsize=max_queue_size) self.running = True self.performance_metrics = { 'messages_per_second': 0, 'avg_latency_ms': 0, 'active_subscribers': 0 } # Start background worker and metrics collector asyncio.create_task(self._worker()) asyncio.create_task(self._metrics_collector()) def subscribe(self, channel: str, callback): self.channels[channel].append(callback) self.performance_metrics['active_subscribers'] = sum(len(subs) for subs in self.channels.values()) print(f"Subscribed to {channel} (Total subscribers: {self.performance_metrics['active_subscribers']})") async def publish(self, channel: str, message: str): timestamp = time.time() message_with_timestamp = (channel, message, timestamp) try: self.message_queue.put_nowait(message_with_timestamp) except asyncio.QueueFull: # Backpressure - with uvloop, this is much faster await self.message_queue.put(message_with_timestamp) async def _worker(self): """Background worker - uvloop makes this significantly faster""" while self.running: channel, message, publish_time = await self.message_queue.get() # Measure end-to-end latency processing_start = time.time() if channel in self.channels: # uvloop excels at handling many concurrent tasks tasks = [] for callback in self.channels[channel]: if asyncio.iscoroutinefunction(callback): tasks.append(callback(message)) else: # uvloop's thread pool is much more efficient tasks.append(asyncio.get_event_loop().run_in_executor(None, callback, message)) # uvloop's gather is optimized for many concurrent operations await asyncio.gather(*tasks, return_exceptions=True) # Track latency total_latency = (time.time() - publish_time) * 1000 # Convert to ms self.performance_metrics['avg_latency_ms'] = ( self.performance_metrics['avg_latency_ms'] * 0.9 + total_latency * 0.1 ) self.message_queue.task_done() async def _metrics_collector(self): """Track messages per second - uvloop's timer precision helps here""" last_time = time.time() last_count = 0 while self.running: await asyncio.sleep(1) current_time = time.time() # In uvloop, queue.qsize() is more accurate and faster current_count = getattr(self.message_queue, '_finished', 0) if current_time - last_time >= 1: messages_processed = current_count - last_count self.performance_metrics['messages_per_second'] = messages_processed last_time, last_count = current_time, current_count def get_performance_stats(self): return self.performance_metrics.copy() # Dave's hotel with uvloop superpowers async def benchmark_uvloop_vs_standard(): """Demonstrate uvloop performance improvements""" # Simulate I/O-heavy subscribers (database writes, API calls) async def database_subscriber(msg): # Simulate database write await asyncio.sleep(0.001) # 1ms "database" call return f"DB: {msg}" async def api_subscriber(msg): # Simulate API call await asyncio.sleep(0.002) # 2ms "API" call return f"API: {msg}" def analytics_subscriber(msg): # Simulate CPU-heavy sync work time.sleep(0.0005) # 0.5ms CPU work return f"Analytics: {msg}" hotel = TurboHotelPubSub() # Subscribe multiple handlers to same channel for i in range(10): # 10 database subscribers hotel.subscribe("orders", database_subscriber) for i in range(5): # 5 API subscribers hotel.subscribe("orders", api_subscriber) for i in range(20): # 20 analytics subscribers hotel.subscribe("orders", analytics_subscriber) print("Starting benchmark with uvloop...") start_time = time.time() # Publish lots of messages for i in range(1000): await hotel.publish("orders", f"Order #{i}") # Wait for processing to complete await hotel.message_queue.join() end_time = time.time() stats = hotel.get_performance_stats() print(f"Benchmark complete in {end_time - start_time:.2f} seconds") print(f"Performance stats: {stats}") print(f"Total subscriber callbacks: {stats['active_subscribers'] * 1000:,}") return end_time - start_time # Uncomment to run benchmark # asyncio.run(benchmark_uvloop_vs_standard()) What uvloop supercharges: 1. I/O-Heavy Subscribers (2-4x speedup) Database menulis, panggilan API, operasi fail Implementasi berasaskan libuv uvloop menangani beribu-ribu operasi I/O bersamaan dengan lebih cekap Hotel Dave kini boleh menangani muat naik harian awan Mrs. Henderson DAN posting Instagram Mr. Peterson secara bersamaan 2. Many Concurrent Subscribers (1.5-2x speedup) Sistem dengan beratus-ratus atau ribuan pelanggan setiap saluran Jadual tugas yang dioptimumkan uvloop mengurangkan overhead Sempurna untuk pusat persidangan Dave dengan lebih daripada 500 pelanggan pemberitahuan bilik 3. Thread Pool Operations (30-50% improvement) Sync callbacks yang dipindahkan ke thread pools Pengurusan thread pool uvloop lebih cekap Lebih baik untuk sistem warisan Dave yang tidak boleh dibuat async 4. Timer and Queue Precision Waktu yang lebih tepat untuk metrik dan pembatasan kadar Pengawasan prestasi yang lebih baik Membantu Dave mengesan sama ada sistemnya mengikuti permintaan Real-world uvloop impact for Dave's hotel: # Before uvloop: 15,000 notifications/second # After uvloop: 35,000 notifications/second # Same code, 2.3x faster! Bahagian yang terbaik? Dave secara kebetulan mendapati bahawa kadang-kadang optimisasi yang terbaik ialah yang memerlukan kerja yang paling sedikit. Zero code changes Reaksi Dave: "Tunggu, itu sahaja? saya hanya mengimport uvloop dan segala-galanya menjadi lebih cepat? Ini bukan penipuan, Dave.Ini hanya kejuruteraan yang baik. When uvloop matters most: Jumlah pelanggan yang tinggi: 100+ pelanggan per saluran I/O-heavy callbacks: Database menulis, panggilan API, operasi fail beban kerja campuran: gabungan pelanggan cepat dan lambat Latency-sensitive: apabila setiap milisekund bergantung Walaupun uvloop membuat asyncio lebih cepat, sesetengah pengembang lebih suka Trio untuk sistem kompleks dengan beribu-ribu tugas bersamaan. Konvergensi terstruktur dan pengendalian backpressure bawaan Trio boleh menjadi lebih boleh dipercayai di bawah beban yang ekstrim - ia direka untuk gagal secara graceful daripada secara misteri apabila anda mempunyai 10,000+ operasi bersamaan. Untuk hotel Dave, asyncio+uvloop adalah sempurna. Untuk usaha seterusnya Dave (sistem dagangan real-time untuk komoditi bawah air), Trio mungkin menghalang beberapa sesi debug 3am. A note about Trio: Dave cuba memasang uvloop dan menjadi bingung oleh mesej ralat. Berikut adalah perkara - uvloop memerlukan kompilasi, jadi anda memerlukan alat pembangunan dipasang. Pada macOS dengan Homebrew: Pada Windows ... baiklah, Dave memutuskan untuk mematuhi laluan peristiwa standard pada Windows dan mengimplementasikan ke Linux dalam pengeluaran. The uvloop installation gotcha: apt-get install build-essential brew install python3-dev Going Nuclear: Memori-Mapped Pub / Sub Untuk skenario prestasi ekstrem atau pelbagai proses, kami boleh menggunakan memori bersama dengan pengurusan pelanggan penuh: import mmap import struct import multiprocessing import threading import time from collections import defaultdict from typing import Callable, Dict, List class MemoryMappedPubSub: def __init__(self, buffer_size=1024*1024): # 1MB buffer # Create shared memory buffer self.buffer_size = buffer_size self.shared_file = f'/tmp/pubsub_{multiprocessing.current_process().pid}' # Initialize memory-mapped file with open(self.shared_file, 'wb') as f: f.write(b'\x00' * buffer_size) self.mmap = mmap.mmap(open(self.shared_file, 'r+b').fileno(), 0) # Layout: [head_pos][tail_pos][message_data...] self.head_offset = 0 self.tail_offset = 8 self.data_offset = 16 # Subscriber management self.subscribers: Dict[str, List[Callable]] = defaultdict(list) self.listening = False self.listener_thread = None def subscribe(self, channel: str, callback: Callable): """Subscribe to a channel with a callback function""" self.subscribers[channel].append(callback) print(f"Subscribed to channel: {channel}") # Start listener if not already running if not self.listening: self.start_listening() def start_listening(self): """Start background thread to listen for messages""" if self.listening: return self.listening = True self.listener_thread = threading.Thread(target=self._listen_loop, daemon=True) self.listener_thread.start() print("Started listening for messages...") def stop_listening(self): """Stop the message listener""" self.listening = False if self.listener_thread: self.listener_thread.join() def _listen_loop(self): """Background loop that processes incoming messages""" while self.listening: messages = self.read_messages() for message in messages: self._process_message(message) time.sleep(0.001) # Small delay to prevent excessive CPU usage def _process_message(self, message: str): """Process a single message and notify subscribers""" try: if ':' in message: channel, content = message.split(':', 1) if channel in self.subscribers: for callback in self.subscribers[channel]: try: callback(content) except Exception as e: print(f"Error in callback for channel {channel}: {e}") except Exception as e: print(f"Error processing message: {e}") def publish(self, channel: str, message: str): """Ultra-fast direct memory write""" data = f"{channel}:{message}".encode() # Get current tail position tail = struct.unpack('Q', self.mmap[self.tail_offset:self.tail_offset+8])[0] # Check if we have enough space (simple wraparound) available_space = self.buffer_size - self.data_offset - tail if available_space < len(data) + 4: # Reset to beginning if we're near the end tail = 0 # Write message length + data struct.pack_into('I', self.mmap, self.data_offset + tail, len(data)) self.mmap[self.data_offset + tail + 4:self.data_offset + tail + 4 + len(data)] = data # Update tail pointer new_tail = tail + 4 + len(data) struct.pack_into('Q', self.mmap, self.tail_offset, new_tail) def read_messages(self): """Ultra-fast direct memory read""" head = struct.unpack('Q', self.mmap[self.head_offset:self.head_offset+8])[0] tail = struct.unpack('Q', self.mmap[self.tail_offset:self.tail_offset+8])[0] messages = [] current = head while current < tail: try: # Read message length msg_len = struct.unpack('I', self.mmap[self.data_offset + current:self.data_offset + current + 4])[0] # Safety check if msg_len > self.buffer_size or msg_len <= 0: break # Read message data data = self.mmap[self.data_offset + current + 4:self.data_offset + current + 4 + msg_len] messages.append(data.decode()) current += 4 + msg_len except Exception as e: print(f"Error reading message: {e}") break # Update head pointer struct.pack_into('Q', self.mmap, self.head_offset, current) return messages def __del__(self): """Cleanup when object is destroyed""" self.stop_listening() if hasattr(self, 'mmap'): self.mmap.close() # Dave's ultra-fast hotel messaging in action hotel = MemoryMappedPubSub() # Define subscriber callbacks def pool_guest(message): print(f"🏊 Pool Guest received: {message}") def restaurant_guest(message): print(f"🍽️ Restaurant Guest received: {message}") # Subscribe to channels (automatically starts background listener) hotel.subscribe("pool", pool_guest) hotel.subscribe("restaurant", restaurant_guest) # Publish messages (ultra-fast memory writes) hotel.publish("pool", "Pool closing in 10 minutes!") hotel.publish("restaurant", "Happy hour starts now!") What makes this special: Penerbitan ultra cepat: memori langsung menulis, tiada panggilan sistem Routing mesej automatik: thread latar belakang memproses mesej dan memanggil pelanggan Pelanggan berbilang setiap saluran: Setiap saluran boleh mempunyai banyak penonton Isolasi Kesilapan: Satu panggilan balik yang buruk tidak menjejaskan sistem Pengurusan sumber bersih: Pembersihan automatik apabila selesai Versi yang dipaparkan dengan memori memberi anda antara muka pub / sub yang biasa dengan prestasi kelas syarikat. hotel Dave kini boleh menangani berjuta-juta notis tetamu setiap saat sambil mengekalkan model "mendaftar dan lupa" yang mudah yang menjadikan sistemnya terkenal. Perbandingan prestasi Solution Messages/Second Use Case Complexity Basic Pub/Sub ~50K Small apps, prototypes ⭐ Simple Queued Pub/Sub ~200K Most production systems ⭐⭐ Moderate Memory-Mapped ~5M+ High-frequency trading, multi-process ⭐⭐⭐⭐⭐ Expert External (Redis) ~100K+ Distributed systems ⭐⭐⭐ Moderate Basic Pub/Sub ~ 50k Perkhidmatan Kecil, Prototype Sederhana Queued Pub/Sub ~ 200k Kebanyakan sistem pengeluaran Moderat Memory-Mapped ~ 5M + Perdagangan frekuensi tinggi, pelbagai proses ✔ ✔ ✔ ✔ ✔ ✔ ✔ ✔ ✔ External (Redis) ~100k+ daripada Sistem yang didistribusi When to use each: Asas: Pembelajaran, projek kecil, <10K mesej/sec Kaki: Kebanyakan aplikasi sebenar, menangani puncak trafik dengan baik Memori dipaparkan: >500K mesej/sec, komunikasi lintas-proses, latensi ultra rendah Luar: Pelayan berbilang, ketekunan, kebolehpercayaan yang terbukti Tambah pengendalian ralat, kesinambungan, dan meluaskan, dan anda mempunyai mesej peringkat syarikat. Popular Pub / Sub Teknologi Redis Pub/Sub: Mudah, cepat, hebat untuk ciri-ciri masa nyata Apache Kafka: Enterprise-grade, menangani aliran besar RabbitMQ: Barisan mesej yang boleh dipercayai dengan routing yang fleksibel Penyelesaian awan: AWS SNS/SQS, Google Pub/Sub, Azure Service Bus Akhir Kisah Dave Enam bulan kemudian, Dave dipromosikan kepada "Chief Communication Innovation Officer" kerana penilaian kepuasan tetamu adalah melalui atap. Dave masih berfikir bahawa "saluran" merujuk kepada sistem TV kabel hotel, tetapi zon intercom beliau mengajar pelajar sains komputer di seluruh dunia bagaimana corak mesej berfungsi. Walaupun jam berhenti adalah betul dua kali sehari, dan walaupun Dave boleh terselip dalam seni bina yang baik jika dia memecahkan perkara yang cukup terlebih dahulu. Beliau telah kembali kepada apa yang dia tahu terbaik - sistem pengangkutan. usaha barunya adalah merevolusi pengangkutan awam bawah air dengan "Submarines Awam" yang berjalan pada "Data Bus". And what about Dave? Apabila ditanya mengenai seni bina teknikal, Dave dengan antusias menjelaskan: "Setiap sub menerbitkan lokasi mereka kepada penumpang yang mendaftar untuk laluan tertentu.Ia pub / sub, tetapi untuk sub!Dan alih-alih enjin biasa, mereka dikendalikan oleh bas data - anda tahu, untuk aliran maksimum!" Jabatan pengangkutan bandar masih cuba untuk mencari tahu sama ada beliau seorang jenius atau jika mereka perlu membatalkan lesen perniagaan beliau. Kali seterusnya anda berada di lubang pengairan tempatan anda, anda boleh melihat "Pub Sub" baru - yang menampilkan revolusioner Dave "sandwich kapal selam buatan yang diproses secara asynchronous dengan barisan bahan-bahan yang tidak menghalang." Beliau masih menerima kad Krismas daripada tetamu hotel.