ডেভ, একজন ডেভ ডেভ, যিনি হোটেলের অগ্নিকাণ্ড অ্যালার্ম চুক্তি পেয়েছিলেন কারণ তিনি সিইও এর নাতনামা ছিলেন. ডেভ আগে কোনও অগ্নিকাণ্ড অ্যালার্ম সিস্টেম দেখেনি, কিন্তু তিনি কেবলমাত্র একটি গ্রাহক সেবা প্রকল্প সম্পন্ন করেছিলেন যেখানে "ব্যক্তিগত ফোন কল 847% আবেদন বৃদ্ধি! তার কারণ: "যখন আমার মা আমাকে সরাসরি কল করে, আমি সবসময় উত্তর দিই. যখন কোনও জেনারেল অ্যালার্ম বন্ধ হয়ে যায়, আমি এটি একটি গাড়ি অ্যালার্ম অনুমান করি এবং এটি অবহেলা করি। ধাপ 1: পয়েন্ট-টু-পয়েন্ট যোগাযোগ (টেলিফোন কল ফায়াস্কো) ডেভের চমৎকার সমাধান? যখন ধোঁয়া সনাক্ত করা হয়, সিস্টেমটি হোটেলের প্রতিটি রুমে কল করে। রুম ২৩৭ ফোন করে: "হ্যালো, রান্নাঘরে ধোঁয়া আছে, দয়া করে অপসারণ করুন" রুম ৩০১: "হ্যালো, রান্নাঘরে ধোঁয়া আছে, দয়া করে অপসারণ করুন" সব 847 রুম, এক-এক। রান্নাঘরে সামান্য তেলের আগুন এবং গ্যারেজে ধোঁয়া থাকলে কী হয়? তারপর অবিলম্বে: "হ্যালো, গ্যারেজে ধোঁয়া আছে." অতিথি বিভ্রান্ত - কোন জরুরি অবস্থা? ক্লিক করুন রুম ৬২৩ ঘণ্টা আগে চেক করা হয়েছিল, কিন্তু ফোনটি ক্রমাগত চোখে পড়ে থাকে। রুম ১০৮ একটি ঘুমন্ত শিশু রয়েছে যার বাবা-মা এখন রাগী। এটি একটি ক্লাসিক পয়েন্ট-পয়েন্ট যোগাযোগ - এক বিতরণকারী, এক রিসিভার, 847 বার পুনরাবৃত্তি। ফায়ার ইনফেক্টর আসে: "ডেভ, আপনি এটি করতে পারবেন না. একটি স্বাভাবিক ফায়ার অ্যালার্ম ইনস্টল করুন." ডেভ বিরক্তিকরভাবে অনুসরণ করে। ধাপ 2: সম্প্রচার যোগাযোগ (নাক্তকরণ বিকল্প) কিন্তু ডেভ তার ভুল থেকে শিখেছে. "আমার একটি ভাল ধারণা আছে! ডেভ একটি বিশাল ফায়ার অ্যালার্ম ইনস্টল করে যা পুরো হোটেলটিতে বিস্ফোরণ করে। সাফল্য! সবাই এটি শুনে! কিন্তু তারপর ডেভ জানায় যে এই সিস্টেমটি বিজ্ঞপ্তিগুলির জন্য নিখুঁত। ক্লাস বন্ধ করা? ফায়ার অ্যালার্ম. 10 মিনিটের মধ্যে নাস্তা শেষ? ফায়ার অ্যালার্ম. লবিতে হারানো শিশু? ফায়ার অ্যালার্ম. ইয়োগা ক্লাস শুরু? ফায়ার অ্যালার্ম। অগ্নিকান্ডের জন্য এখন অগ্নিকান্ডের জন্য অগ্নিকান্ডের জন্য অগ্নিকান্ডের জন্য অগ্নিকান্ডের জন্য অগ্নিকান্ডের জন্য অগ্নিকান্ডের জন্য অগ্নিকান্ডের জন্য অগ্নিকান্ডের জন্য অগ্নিকান্ডের জন্য অগ্নিকান্ডের জন্য অগ্নিকান্ডের জন্য অগ্নিকান্ডের জন্য অগ্নিকান্ডের জন্য অগ্নিকান্ডের জন্য অগ্নিকান্ডের জন্য অগ্নিকান্ডের জন্য অগ্নিকান্ডের জন্য অগ্নিকান্ডের জন্য অগ্নিকান্ডের জন্য অগ্নিকান্ডের জন্য অগ্নিকান্ডের জন্য অগ্নিকান্ডের জন্য অগ্নিকান্ডের জন্য অগ্নিকান এটি সম্প্রচার যোগাযোগ - সবাইকে একটি বার্তা, তারা এটি চান বা না. এটি একটি শহর জুড়ে জরুরি সম্প্রচার সিস্টেম ব্যবহার করে আপনার গ্যারেজ বিক্রয় ঘোষণা করার মত। ৩. ডেভ আবিষ্কার করে বিপ্লবী কিছু ডেভের তৃতীয় চেষ্টা: "আমরা যদি বিভিন্ন অঞ্চলের জন্য বিভিন্ন intercoms থাকতাম? পুল অঞ্চল ইন্টারকোম: "পুল 10 মিনিটের মধ্যে বন্ধ, টুপি পরিষেবা শেষ" রেস্টুরেন্ট ইন্টারকোম: "খুশি ঘন্টা শুরু, রান্নাঘর শীঘ্রই বন্ধ" কনফারেন্স রুম ইন্টারকোম: "মিটিং রুম বি উপলব্ধ, ওয়াইফাই পাসওয়ার্ড পরিবর্তিত" স্পা ইন্টারকোম: "ম্যাসেজ ডেটিং উপলব্ধ, 3pm এ সাবান রক্ষণাবেক্ষণ" Next article‘৩ তলায় হাউজিং, ৭ তলায় বরফ মেশিন ভেঙে’ অতিথিরা স্বাভাবিকভাবে তাদের বাস্তব ব্যবহার করা এলাকায় ইন্টারকোমের সাথে সংশ্লিষ্ট হয়. সাঁতারের অতিথিরা সাঁতারের আপডেট শুনে, রেস্তোরাঁর ডিনাররা খাবারের তথ্য পায়, সম্মেলন উপস্থিতরা মিটিং রুম বিজ্ঞপ্তি পায়। কি সত্যিই ডেভ নির্মাণ: Publish-Subscribe (Pub/Sub) এটা বুঝতে না পারলে ডেভিড একটি - আধুনিক সফটওয়্যার আর্কাইভের সবচেয়ে গুরুত্বপূর্ণ প্যাটার্নগুলির মধ্যে একটি। publish-subscribe system এখানে কিভাবে এটি কাজ করে: প্রকাশক (হোটেল কর্মী) নির্দিষ্ট থিম বা চ্যানেলগুলিতে বার্তা পাঠায় (পুল আপডেট, রেস্তোরাঁ সংবাদ, ইত্যাদি)। অতিথি (অতিথি) কোন বিষয়গুলি তারা শুনতে চায় তা নির্বাচন করুন প্রকাশকরা জানে না কে শুনছে, সাবস্ক্রাইবরা জানে না কে পাঠিয়েছে - তারা সম্পূর্ণরূপে আলাদা এটি N × M যোগাযোগ: অনেক প্রকাশক অনেক সদস্যের কাছে পাঠাতে পারেন, কিন্তু এটি সংগঠিত এবং ফিল্টার করা হয়. আর কোনও শব্দ নেই, আর কোনও মিস বার্তা নেই, আর প্রতিটি রুমকে আলাদাভাবে কল করা যায় না। কেন আপনার কোডের জন্য এটি গুরুত্বপূর্ণ ডেভ দুর্ভাগ্যবশত তিনটি মেসেজিং প্যাটার্ন তৈরি করেছিলেন: : পরিষেবাগুলির মধ্যে সরাসরি সংযোগ. যখন আপনি একাধিক সিস্টেমের বিজ্ঞপ্তি দিতে চান তখন স্ক্যালার করবেন না। Point-to-Point : সবার জন্য একটি বার্তা. শব্দ শব্দ এবং কঠোর সংযোগ তৈরি করে। Broadcast Publishers send messages to topics/channels, subscribers choose what to listen to. প্রদর্শনকারীরা থিম/চ্যানেলগুলিতে বার্তা পাঠায়, সাবস্ক্রাইবকারীরা কী শুনতে চান। Pub/Sub বাস্তব সিস্টেমে, pub/sub একই সমস্যার সমাধান করে যা ডেভের মুখোমুখি ছিল: ই-কমার্স: ইনভেন্টরি আপডেটগুলি কয়েকটি পরিষেবাগুলিতে (নামা, বিশ্লেষণ, বিজ্ঞপ্তি) যাতে ইনভেন্টরি পরিষেবাটি জানে না কে শুনছে। চ্যাট অ্যাপ্লিকেশন: চ্যানেলগুলিতে প্রকাশিত বার্তা, ব্যবহারকারীরা তাদের কথোপকথনের জন্য সাবস্ক্রাইব করে মাইক্রো সেবা: পরিষেবাগুলি ঘটনাগুলি প্রকাশ করে (ব্যবহারকারী নিবন্ধিত, অর্ডার সম্পন্ন) যা অন্যান্য পরিষেবাগুলি স্বাধীনভাবে প্রতিক্রিয়া করতে পারে আপনার নিজস্ব প্যাব / সাব সিস্টেম নির্মাণ করুন (আপনি ভাবার চেয়ে এটি সহজ) Pub/sub শঙ্কিত হতে হবে না. চলুন একটি উৎপাদন স্তরের সিস্টেম তৈরি করি যা ডেভ গর্বিত হবেন: from collections import defaultdict from typing import Dict, List, Callable class HotelPubSub: def __init__(self): self.channels: Dict[str, List[Callable]] = defaultdict(list) def subscribe(self, channel: str, callback: Callable): """Guest subscribes to hotel updates""" self.channels[channel].append(callback) print(f"Subscribed to {channel}") def publish(self, channel: str, message: str): """Hotel staff publishes updates""" if channel in self.channels: for callback in self.channels[channel]: callback(message) print(f"Published to {channel}: {message}") # Dave's hotel in action hotel = HotelPubSub() # Guests subscribe to what they care about def pool_guest(msg): print(f"🏊 Pool Guest: {msg}") def restaurant_guest(msg): print(f"🍽️ Restaurant Guest: {msg}") hotel.subscribe("pool", pool_guest) hotel.subscribe("restaurant", restaurant_guest) # Hotel publishes updates hotel.publish("pool", "Pool closing in 10 minutes!") hotel.publish("restaurant", "Happy hour starts now!") একটি কাজ pub / sub সিস্টেম 20 লাইন। Async Queues এর ব্যবহার কিন্তু যদি ডেভের হোটেলটি সত্যিই ব্যস্ত হয়ে যায়? আমাদের সহজ সংস্করণটি প্রকাশকদের ব্লক করে, যখন সাবস্ক্রাইব লম্বা হয়। import asyncio from asyncio import Queue from collections import defaultdict class AsyncPubSub: def __init__(self): self.channels = defaultdict(list) self.message_queue = Queue() # Start background worker asyncio.create_task(self._worker()) def subscribe(self, channel: str, callback): self.channels[channel].append(callback) async def publish(self, channel: str, message: str): # Non-blocking publish - just queue it await self.message_queue.put((channel, message)) async def _worker(self): # Background worker processes messages while True: channel, message = await self.message_queue.get() if channel in self.channels: # Run all subscribers in parallel tasks = [callback(message) for callback in self.channels[channel]] await asyncio.gather(*tasks, return_exceptions=True) # Dave's async hotel in action async def demo(): hotel = AsyncPubSub() # Fast and slow subscribers async def fast_guest(msg): await asyncio.sleep(0.1) # Quick processing print(f"🏊 Fast: {msg}") async def slow_guest(msg): await asyncio.sleep(2.5) # Simulate slow database write print(f"🍽️ Slow: {msg}") hotel.subscribe("updates", fast_guest) hotel.subscribe("updates", slow_guest) # Publishers don't wait for slow subscribers await hotel.publish("updates", "Pool closing!") await hotel.publish("updates", "Happy hour!") await asyncio.sleep(0.2) # Let everything finish # asyncio.run(demo()) এখন প্রকাশকরা কখনো ব্লক করে না - তারা শুধু বার্তাগুলি রেখে এবং চলতে থাকে। ব্যাকআপ কর্মী সমস্ত সাবস্ক্রাইবের কাছে সরবরাহ পরিচালনা করে। দ্রুত সাবস্ক্রাইবরা তাদের বার্তাগুলি দ্রুত (0.1s) পায়, যখন ধীরে ধীরে তাদের সময় নেয় (2.5s), কিন্তু উভয়ই অন্যকে ব্লক করে না। সহজ পারফরম্যান্স হ্যাক: uvloop ডেভের অসিনক সিস্টেমটি চমৎকারভাবে কাজ করে, কিন্তু তারপর তিনি কিছু জাদুকরী আবিষ্কার করেন: কোডের একটি লাইন পরিবর্তন করে সবকিছু 2-4 গুণ দ্রুত করতে পারে। প্রবেশ - Python এর ডিফল্ট ইভেন্ট লুকের জন্য একটি ড্রপ-ইন প্রতিস্থাপন যা Cython এ লিখিত এবং libuv উপর ভিত্তি করে (এটি একই জিনিস যা Node.js দ্রুত করে তোলে)। uvloop import uvloop import asyncio from collections import defaultdict import time # The magic line that makes Dave's hotel 2-4x faster asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) class TurboHotelPubSub: def __init__(self, max_queue_size=10000): self.channels = defaultdict(list) self.message_queue = asyncio.Queue(maxsize=max_queue_size) self.running = True self.performance_metrics = { 'messages_per_second': 0, 'avg_latency_ms': 0, 'active_subscribers': 0 } # Start background worker and metrics collector asyncio.create_task(self._worker()) asyncio.create_task(self._metrics_collector()) def subscribe(self, channel: str, callback): self.channels[channel].append(callback) self.performance_metrics['active_subscribers'] = sum(len(subs) for subs in self.channels.values()) print(f"Subscribed to {channel} (Total subscribers: {self.performance_metrics['active_subscribers']})") async def publish(self, channel: str, message: str): timestamp = time.time() message_with_timestamp = (channel, message, timestamp) try: self.message_queue.put_nowait(message_with_timestamp) except asyncio.QueueFull: # Backpressure - with uvloop, this is much faster await self.message_queue.put(message_with_timestamp) async def _worker(self): """Background worker - uvloop makes this significantly faster""" while self.running: channel, message, publish_time = await self.message_queue.get() # Measure end-to-end latency processing_start = time.time() if channel in self.channels: # uvloop excels at handling many concurrent tasks tasks = [] for callback in self.channels[channel]: if asyncio.iscoroutinefunction(callback): tasks.append(callback(message)) else: # uvloop's thread pool is much more efficient tasks.append(asyncio.get_event_loop().run_in_executor(None, callback, message)) # uvloop's gather is optimized for many concurrent operations await asyncio.gather(*tasks, return_exceptions=True) # Track latency total_latency = (time.time() - publish_time) * 1000 # Convert to ms self.performance_metrics['avg_latency_ms'] = ( self.performance_metrics['avg_latency_ms'] * 0.9 + total_latency * 0.1 ) self.message_queue.task_done() async def _metrics_collector(self): """Track messages per second - uvloop's timer precision helps here""" last_time = time.time() last_count = 0 while self.running: await asyncio.sleep(1) current_time = time.time() # In uvloop, queue.qsize() is more accurate and faster current_count = getattr(self.message_queue, '_finished', 0) if current_time - last_time >= 1: messages_processed = current_count - last_count self.performance_metrics['messages_per_second'] = messages_processed last_time, last_count = current_time, current_count def get_performance_stats(self): return self.performance_metrics.copy() # Dave's hotel with uvloop superpowers async def benchmark_uvloop_vs_standard(): """Demonstrate uvloop performance improvements""" # Simulate I/O-heavy subscribers (database writes, API calls) async def database_subscriber(msg): # Simulate database write await asyncio.sleep(0.001) # 1ms "database" call return f"DB: {msg}" async def api_subscriber(msg): # Simulate API call await asyncio.sleep(0.002) # 2ms "API" call return f"API: {msg}" def analytics_subscriber(msg): # Simulate CPU-heavy sync work time.sleep(0.0005) # 0.5ms CPU work return f"Analytics: {msg}" hotel = TurboHotelPubSub() # Subscribe multiple handlers to same channel for i in range(10): # 10 database subscribers hotel.subscribe("orders", database_subscriber) for i in range(5): # 5 API subscribers hotel.subscribe("orders", api_subscriber) for i in range(20): # 20 analytics subscribers hotel.subscribe("orders", analytics_subscriber) print("Starting benchmark with uvloop...") start_time = time.time() # Publish lots of messages for i in range(1000): await hotel.publish("orders", f"Order #{i}") # Wait for processing to complete await hotel.message_queue.join() end_time = time.time() stats = hotel.get_performance_stats() print(f"Benchmark complete in {end_time - start_time:.2f} seconds") print(f"Performance stats: {stats}") print(f"Total subscriber callbacks: {stats['active_subscribers'] * 1000:,}") return end_time - start_time # Uncomment to run benchmark # asyncio.run(benchmark_uvloop_vs_standard()) What uvloop supercharges: 1. I/O-Heavy Subscribers (2-4x speedup) ডাটাবেস লিখে, API কল, ফাইল অপারেশন uvloop এর libuv ভিত্তিক বাস্তবায়ন হাজার হাজার একসাথে I / O অপারেশন আরও দক্ষভাবে পরিচালনা করে ডেভ হোটেল এখন মিসেস হেন্ডার্সনের ক্লাউড ডায়েরি আপলোড এবং মিসেস পিটার্সনের ইনস্টাগ্রাম পোস্টগুলি একই সময়ে পরিচালনা করতে পারে 2. Many Concurrent Subscribers (1.5-2x speedup) প্রতিটি চ্যানেলের জন্য শত শত বা হাজার সাবস্ক্রাইবের সাথে সিস্টেম uvloop এর অপ্টিমাইজড কাজ পরিকল্পনা overhead কমায় ডেভের কনফারেন্স সেন্টারের জন্য উপযুক্ত, 500+ রুম নোটিশ সাবস্ক্রাইবের সাথে 3. Thread Pool Operations (30-50% improvement) Sync callbacks যা thread pools এ স্থানান্তরিত হয় uvloop এর thread pool ম্যানেজমেন্ট আরও কার্যকরী ডেভের ঐতিহ্যগত সিস্টেমগুলির জন্য ভাল যা অ্যাসিনাক করা যাবে না 4. Timer and Queue Precision মেট্রিক এবং হার সীমাবদ্ধ করার জন্য আরও সঠিক সময়সূচি উন্নত ক্রেডিট কর্মক্ষমতা নিরীক্ষা ডেভ অনুসন্ধান করতে সাহায্য করে যে তার সিস্টেম চাহিদা অনুসরণ করছে কিনা Real-world uvloop impact for Dave's hotel: # Before uvloop: 15,000 notifications/second # After uvloop: 35,000 notifications/second # Same code, 2.3x faster! সবচেয়ে ভালো অংশ? ডেভ দুর্ভাগ্যবশত আবিষ্কার করে যে কখনও কখনও সেরা অপ্টিমাইজেশনগুলি এমনগুলি হয় যা সর্বনিম্ন কাজের প্রয়োজন। Zero code changes ডেভের প্রতিক্রিয়া: "তাহলে অপেক্ষা করুন, এটা কি? আমি শুধু uvloop আমদানি করি এবং সবকিছু দ্রুত হয়ে যায়? এটা প্রতারণা নয়, ডেভ, এটা শুধু ভাল প্রকৌশল। When uvloop matters most: উচ্চ সাবস্ক্রাইবার সংখ্যা: 100+ সাবস্ক্রাইবার প্রতি চ্যানেল : Database writes, API calls, file operations I/O-heavy callbacks মিশ্র ওয়ার্ক লোড: দ্রুত এবং ধীরে ধীরে সাবস্ক্রাইবের সংমিশ্রণ দীর্ঘস্থায়ী: যখন প্রতিটি মিলিসেকেন্ড হিসাব করে যদিও uvloop asyncio দ্রুত করে তুলতে পারে, কিছু ডেভেলপারগুলি একই সময়ে হাজার হাজার কাজের সাথে জটিল সিস্টেমগুলির জন্য Trio পছন্দ করে। Trio এর গঠনগত সমন্বয় এবং ভিত্তিক পিছনে চাপ হ্যান্ডলিং অত্যন্ত লোডের অধীনে আরও নির্ভরযোগ্য হতে পারে - এটি মজাদারভাবে ব্যর্থ হওয়ার পরিবর্তে রহস্যময়ভাবে আটকে যাওয়ার জন্য ডিজাইন করা হয়েছে যখন আপনার 10,000+ একই সময়ে অপারেশন আছে। A note about Trio: ডেভ uvloop ইনস্টল করার চেষ্টা করে এবং ত্রুটি বার্তা দ্বারা বিভ্রান্ত হয়. এখানে জিনিস - uvloop compilation প্রয়োজন, তাই আপনি ডেভেলপমেন্ট টুল ইনস্টল প্রয়োজন. Ubuntu / Debian: . Homebrew সঙ্গে macOS এ: উইন্ডোজ ... ভাল, ডেভ সিদ্ধান্ত নেয় উইন্ডোজের স্ট্যান্ডার্ড ইভেন্ট লুকের সাথে থাকতে এবং উৎপাদনে লিনাক্সে বিতরণ করতে। The uvloop installation gotcha: apt-get install build-essential brew install python3-dev পারমাণবিক হওয়া: মেমরি-ম্যাপিং Pub/Sub চরম কর্মক্ষমতা বা মাল্টি-প্রসেস পরিস্থিতিতে, আমরা সম্পূর্ণ সাবস্ক্রাইবার ম্যানেজমেন্টের সাথে ভাগ করা মেমরি ব্যবহার করতে পারি: import mmap import struct import multiprocessing import threading import time from collections import defaultdict from typing import Callable, Dict, List class MemoryMappedPubSub: def __init__(self, buffer_size=1024*1024): # 1MB buffer # Create shared memory buffer self.buffer_size = buffer_size self.shared_file = f'/tmp/pubsub_{multiprocessing.current_process().pid}' # Initialize memory-mapped file with open(self.shared_file, 'wb') as f: f.write(b'\x00' * buffer_size) self.mmap = mmap.mmap(open(self.shared_file, 'r+b').fileno(), 0) # Layout: [head_pos][tail_pos][message_data...] self.head_offset = 0 self.tail_offset = 8 self.data_offset = 16 # Subscriber management self.subscribers: Dict[str, List[Callable]] = defaultdict(list) self.listening = False self.listener_thread = None def subscribe(self, channel: str, callback: Callable): """Subscribe to a channel with a callback function""" self.subscribers[channel].append(callback) print(f"Subscribed to channel: {channel}") # Start listener if not already running if not self.listening: self.start_listening() def start_listening(self): """Start background thread to listen for messages""" if self.listening: return self.listening = True self.listener_thread = threading.Thread(target=self._listen_loop, daemon=True) self.listener_thread.start() print("Started listening for messages...") def stop_listening(self): """Stop the message listener""" self.listening = False if self.listener_thread: self.listener_thread.join() def _listen_loop(self): """Background loop that processes incoming messages""" while self.listening: messages = self.read_messages() for message in messages: self._process_message(message) time.sleep(0.001) # Small delay to prevent excessive CPU usage def _process_message(self, message: str): """Process a single message and notify subscribers""" try: if ':' in message: channel, content = message.split(':', 1) if channel in self.subscribers: for callback in self.subscribers[channel]: try: callback(content) except Exception as e: print(f"Error in callback for channel {channel}: {e}") except Exception as e: print(f"Error processing message: {e}") def publish(self, channel: str, message: str): """Ultra-fast direct memory write""" data = f"{channel}:{message}".encode() # Get current tail position tail = struct.unpack('Q', self.mmap[self.tail_offset:self.tail_offset+8])[0] # Check if we have enough space (simple wraparound) available_space = self.buffer_size - self.data_offset - tail if available_space < len(data) + 4: # Reset to beginning if we're near the end tail = 0 # Write message length + data struct.pack_into('I', self.mmap, self.data_offset + tail, len(data)) self.mmap[self.data_offset + tail + 4:self.data_offset + tail + 4 + len(data)] = data # Update tail pointer new_tail = tail + 4 + len(data) struct.pack_into('Q', self.mmap, self.tail_offset, new_tail) def read_messages(self): """Ultra-fast direct memory read""" head = struct.unpack('Q', self.mmap[self.head_offset:self.head_offset+8])[0] tail = struct.unpack('Q', self.mmap[self.tail_offset:self.tail_offset+8])[0] messages = [] current = head while current < tail: try: # Read message length msg_len = struct.unpack('I', self.mmap[self.data_offset + current:self.data_offset + current + 4])[0] # Safety check if msg_len > self.buffer_size or msg_len <= 0: break # Read message data data = self.mmap[self.data_offset + current + 4:self.data_offset + current + 4 + msg_len] messages.append(data.decode()) current += 4 + msg_len except Exception as e: print(f"Error reading message: {e}") break # Update head pointer struct.pack_into('Q', self.mmap, self.head_offset, current) return messages def __del__(self): """Cleanup when object is destroyed""" self.stop_listening() if hasattr(self, 'mmap'): self.mmap.close() # Dave's ultra-fast hotel messaging in action hotel = MemoryMappedPubSub() # Define subscriber callbacks def pool_guest(message): print(f"🏊 Pool Guest received: {message}") def restaurant_guest(message): print(f"🍽️ Restaurant Guest received: {message}") # Subscribe to channels (automatically starts background listener) hotel.subscribe("pool", pool_guest) hotel.subscribe("restaurant", restaurant_guest) # Publish messages (ultra-fast memory writes) hotel.publish("pool", "Pool closing in 10 minutes!") hotel.publish("restaurant", "Happy hour starts now!") What makes this special: অত্যন্ত দ্রুত প্রকাশ: সরাসরি মেমরি লিখে, কোন সিস্টেম কল স্বয়ংক্রিয় বার্তা রুটিং: ব্যাকগ্রাউন্ড থ্রেড বার্তা প্রক্রিয়াকরণ করে এবং সাবস্ক্রাইবকে কল করে চ্যানেলের প্রতি একাধিক সাবস্ক্রাইবার: প্রতিটি চ্যানেলের একাধিক শ্রোতা থাকতে পারে ত্রুটি সনাক্তকরণ: একটি খারাপ কলব্যাগ সিস্টেমটি বিধ্বস্ত করে না পরিষ্কার সম্পদ ব্যবস্থাপনা: সম্পন্ন হলে স্বয়ংক্রিয় পরিষ্কার ডেভ হোটেল এখন প্রতি সেকেন্ডে লক্ষ লক্ষের অতিথি বিজ্ঞপ্তিগুলি পরিচালনা করতে পারে, যখন এটি তার সিস্টেমের বিখ্যাত করার সহজ "সাবস্ক্রাইব এবং ভুলে যান" মডেল বজায় রাখে। কর্মক্ষমতা তুলনা Solution Messages/Second Use Case Complexity Basic Pub/Sub ~50K Small apps, prototypes ⭐ Simple Queued Pub/Sub ~200K Most production systems ⭐⭐ Moderate Memory-Mapped ~5M+ High-frequency trading, multi-process ⭐⭐⭐⭐⭐ Expert External (Redis) ~100K+ Distributed systems ⭐⭐⭐ Moderate Basic Pub/Sub ৫০ ক ছোট অ্যাপস, প্রোটোটাইপ সহজ Queued Pub/Sub ২০০ কেজি অধিকাংশ উৎপাদন ব্যবস্থা ⭐⭐ আধুনিক Memory-Mapped ৫ম + উচ্চ ফ্রিকোয়েন্সি ট্রেডিং, মাল্টি প্রক্রিয়া ✔ ✔ ✔ ✔ ✔ ✔ বিশেষজ্ঞ External (Redis) ১০০ কেজি বিতৃত সিস্টেম ⭐⭐⭐ আধুনিক When to use each: মৌলিক: শিক্ষা, ছোট প্রকল্প, <10K বার্তা / সেকেন্ড লাইন: অধিকাংশ বাস্তব অ্যাপ্লিকেশন, ট্র্যাফিক স্পিক ভালভাবে পরিচালনা করে মেমরি ম্যাপিং: >500K বার্তা / সেকেন্ড, ক্রস-প্রসেস যোগাযোগ, অত্যন্ত কম দেরি বহিরাগত: একাধিক সার্ভার, স্থিতিশীলতা, প্রমাণিত নির্ভরযোগ্যতা ত্রুটি প্রক্রিয়াকরণ, স্থিতিশীলতা এবং স্ক্যালিং যোগ করুন, এবং আপনি এন্টারপ্রাইজ গ্রেড মেসেজিং পাবেন। জনপ্রিয় প্যাব / সাব প্রযুক্তি Redis Pub/Sub: সহজ, দ্রুত, বাস্তব সময় বৈশিষ্ট্য জন্য মহান অ্যাপ্যাচা ক্যাফকা: এন্টারপ্রাইজ গ্রেড, বিশাল পাসপোর্ট পরিচালনা করে RabbitMQ: নমনীয় রুটিং সহ নির্ভরযোগ্য বার্তা রেখা ক্লাউড সমাধান: AWS SNS / SQS, Google Pub / Sub, Azure সার্ভিস বাস ডেভির গল্পের শেষ ছয় মাস পরে, ডেভকে "চিফ কমিউনিকেশন ইন্ডোভেশন এজেন্সি" নামে পুনর্গঠিত করা হয় কারণ অতিথি সন্তুষ্টির স্কোরগুলি ছাদের মাধ্যমে হয়। ডেভ এখনও মনে করেন যে "চ্যানেল" হোটেলের কেবল টেলিভিশন সিস্টেমের সাথে সম্পর্কিত, কিন্তু তার ইন্টারকোম সোনায় বিশ্বব্যাপী কম্পিউটার বিজ্ঞান ছাত্রদের শিক্ষা দেয় মেসেজিং প্যাটার্নগুলি কীভাবে কাজ করে। এমনকি একটি থামানো ঘড়ি দিনে দুইবার সঠিক, এবং এমনকি ডেভ ভাল স্থাপত্যে ঝাঁপিয়ে পড়তে পারে যদি তিনি প্রথমে যথেষ্ট জিনিস ভাঙেন। তাঁর নতুন উদ্যোগটি ‘ডাটা বাস’ দিয়ে চালিত ‘পরিবারের সাবমেরিন’ দিয়ে পানির নিচে পাবলিক ট্রান্সপোর্টে বিপ্লব চালাচ্ছে। And what about Dave? প্রযুক্তিগত আর্কিটেকচার সম্পর্কে জিজ্ঞাসা করা হলে, ডেভ উত্তেজিতভাবে ব্যাখ্যা করে: "প্রতিটি সাব তার অবস্থান প্রকাশ করে যাত্রীদের যারা নির্দিষ্ট রুটের জন্য সাবস্ক্রিপশন। শহরের পরিবহন বিভাগ এখনও খুঁজে বের করার চেষ্টা করছে যে তিনি একজন জিনিয়াস নাকি তাদের তার ব্যবসায়িক লাইসেন্স বাতিল করতে হবে কিনা। ডেভ তার নতুন সিস্টেমের সাফল্য দ্বারা এতটাই উৎসাহিত হয়েছিল যে তিনি একটি নতুন ব্যবসা শুরু করেছিলেন. পরেরবার আপনি আপনার স্থানীয় পানির গর্তে থাকবেন, আপনি নতুন "পুব সাব" দেখতে পাবেন - ডেভের বিপ্লবী "অসিনক্রোনিকভাবে প্রক্রিয়াকৃত শিল্পী সাবমেরিন স্যান্ডউইচগুলি এবং অ-ব্লকিং উপাদানগুলির রেখাগুলি। তিনি হোটেলের অতিথিদের কাছ থেকে ক্রিসমাস কার্ডও পেয়েছেন।