फरवरी में रेरुन की ओपन सोर्सिंग ने उन लोगों के लिए एक महत्वपूर्ण कदम उठाया जो सुलभ लेकिन शक्तिशाली पायथन विज़ुअलाइज़ेशन लाइब्रेरी की तलाश कर रहे थे।
विज़ुअलाइज़ेशन आवश्यक है क्योंकि स्केल.ई, वेट्स एंड बायसेज़ और हगिंग फेस जैसी कंपनियों ने डेटासेट लेबलिंग, प्रयोग ट्रैकिंग और पूर्व-प्रशिक्षित मॉडल को संबोधित करके गहन शिक्षा को सुव्यवस्थित किया है। हालाँकि, तेजी से डेटा कैप्चर और विज़ुअलाइज़ेशन में एक शून्य अभी भी मौजूद है।
कई कंपनियां इन-हाउस डेटा विज़ुअलाइज़ेशन समाधान विकसित करती हैं लेकिन उच्च विकास लागतों के कारण अक्सर उप-इष्टतम उपकरणों के साथ समाप्त हो जाती हैं। इसके अलावा, स्ट्रीमिंग डेटा पर पायथन विज़ुअलाइज़ेशन एक समस्या है जो या तो अच्छी तरह से हल नहीं हुई है, जिससे नोटबुक में JavaScrip टी-आधारित समाधान हो जाता है। रेरन एक उच्च प्रदर्शन वाले रस्ट विज़ुअलाइज़ेशन इंजन (बाइटवैक्स की तरह!) में एक पायथन इंटरफ़ेस का लाभ उठाता है जो स्ट्रीमिंग डेटा का विश्लेषण करना आसान बनाता है।
इस ब्लॉग पोस्ट में, हम यह पता लगाएंगे कि पायथन में रीयल-टाइम स्ट्रीमिंग डेटा की कल्पना करने के लिए बाइटवैक्स और रीरन का उपयोग कैसे करें और वास्तविक समय में विसंगति का पता लगाने वाला विज़ुअलाइज़ेशन बनाएं।
हमने एनोमली डिटेक्शन उर्फ आउटलाइयर डिटेक्शन को चुना क्योंकि यह कई अनुप्रयोगों में एक महत्वपूर्ण घटक है, जैसे साइबर सुरक्षा, धोखाधड़ी का पता लगाना और औद्योगिक प्रक्रियाओं की निगरानी। वास्तविक समय में इन विसंगतियों को देखने से संभावित मुद्दों की शीघ्रता से पहचान करने और उन्हें कम करने के लिए आवश्यक कार्रवाई करने में मदद मिल सकती है।
गोता लगाने के इच्छुक लोगों के लिए, हमारे गिटहब पर हमारे एंड-टू-एंड पायथन समाधान की जांच करें। बाइटवैक्स को तारांकित करना न भूलें!
यहाँ वह है जिसे हम कवर करेंगे:
चल दर!
यह ब्लॉग पोस्ट Bytewax और Rerun के निम्नलिखित संस्करणों पर आधारित है:
bytewax==0.15.1 rerun-sdk==0.4.0
Rerun और Bytewax के रूप में स्थापित करने योग्य हैं
pip install rerun-sdk pip install bytewax
अपडेट के लिए बाइटवैक्स का पालन करें क्योंकि हम एक नया संस्करण तैयार कर रहे हैं जो आगे पायथन में डेटा स्ट्रीमिंग ऐप्स के विकास को आसान करेगा।
समाधान अपेक्षाकृत कॉम्पैक्ट है, इसलिए हम पूरे कोड उदाहरण को यहां कॉपी करते हैं। यदि यह भारी लगता है तो कृपया इस बड़े हिस्से को छोड़ने में संकोच न करें; हम बाद में प्रत्येक समारोह पर चर्चा करेंगे।
import random # pip install rerun-sdk import rerun as rr from time import sleep from datetime import datetime from bytewax.dataflow import Dataflow from bytewax.execution import spawn_cluster from bytewax.inputs import ManualInputConfig, distribute from bytewax.outputs import ManualOutputConfig rr.init("metrics") rr.spawn() start = datetime.now() def generate_random_metrics(worker_index, worker_count, resume_state): assert resume_state is None keys = ["1", "2", "3", "4", "5", "6"] this_workers_keys = distribute(keys, worker_index, worker_count) for _ in range(1000): for key in this_workers_keys: value = random.randrange(0, 10) if random.random() > 0.9: value *= 2.0 yield None, (key, (key, value, (datetime.now() - start).total_seconds())) sleep(random.random() / 10.0) class ZTestDetector: """Anomaly detector. Use with a call to flow.stateful_map(). Looks at how many standard deviations the current item is away from the mean (Z-score) of the last 10 items. Mark as anomalous if over the threshold specified. """ def __init__(self, threshold_z): self.threshold_z = threshold_z self.last_10 = [] self.mu = None self.sigma = None def _push(self, value): self.last_10.insert(0, value) del self.last_10[10:] def _recalc_stats(self): last_len = len(self.last_10) self.mu = sum(self.last_10) / last_len sigma_sq = sum((value - self.mu) ** 2 for value in self.last_10) / last_len self.sigma = sigma_sq**0.5 def push(self, key__value__t): key, value, t = key__value__t is_anomalous = False if self.mu and self.sigma: is_anomalous = abs(value - self.mu) / self.sigma > self.threshold_z self._push(value) self._recalc_stats() rr.log_scalar(f"temp_{key}/data", value, color=[155, 155, 155]) if is_anomalous: rr.log_point(f"3dpoint/anomaly/{key}", [t, value, float(key) * 10], radius=0.3, color=[255,100,100]) rr.log_scalar( f"temp_{key}/data/anomaly", value, scattered=True, radius=3.0, color=[255, 100, 100], ) else: rr.log_point(f"3dpoint/data/{key}", [t, value, float(key) * 10], radius=0.1) return self, (value, self.mu, self.sigma, is_anomalous) def output_builder(worker_index, worker_count): def inspector(input): metric, (value, mu, sigma, is_anomalous) = input print( f"{metric}: " f"value = {value}, " f"mu = {mu:.2f}, " f"sigma = {sigma:.2f}, " f"{is_anomalous}" ) return inspector if __name__ == '__main__': flow = Dataflow() flow.input("input", ManualInputConfig(generate_random_metrics)) # ("metric", value) flow.stateful_map("AnomalyDetector", lambda: ZTestDetector(2.0), ZTestDetector.push) # ("metric", (value, mu, sigma, is_anomalous)) flow.capture(ManualOutputConfig(output_builder)) spawn_cluster(flow)
प्रदान किया गया कोड दर्शाता है कि बाइटवैक्स और रेरुन का उपयोग करके वास्तविक समय की विसंगति का पता लगाने वाली पाइपलाइन कैसे बनाई जाए।
आइए इस कोड के आवश्यक घटकों को तोड़ दें:
gener_random_metrics : यह फ़ंक्शन वास्तविक दुनिया डेटा स्ट्रीम का अनुकरण करने वाले यादृच्छिक मेट्रिक्स उत्पन्न करता है। यह विसंगति (दोगुने मान) होने की एक छोटी सी संभावना के साथ डेटा बिंदु उत्पन्न करता है।
ZTestDetector : यह वर्ग Z-स्कोर पद्धति का उपयोग करके एक विसंगति डिटेक्टर को लागू करता है। यह पिछले 10 मानों के माध्य और मानक विचलन को बनाए रखता है और यदि इसका Z-स्कोर निर्दिष्ट सीमा से अधिक है तो यह मान को विषम के रूप में चिह्नित करता है।
output_builder : इस फ़ंक्शन का उपयोग डेटा पाइपलाइन के आउटपुट व्यवहार को परिभाषित करने के लिए किया जाता है। इस मामले में, यह मीट्रिक नाम, मान, माध्य, मानक विचलन और क्या मान विषम है, प्रिंट करता है।
डेटा प्रवाह : कोड का मुख्य भाग बाइटवैक्स का उपयोग करके डेटा प्रवाह का निर्माण करता है, जो रैंडममैट्रिक इनपुट, जेडटेस्ट डिटेक्टर और आउटपुट बिल्डर को जोड़ता है।
रीरन विज़ुअलाइज़ेशन : रीरन विज़ुअलाइज़ेशन को ZTestDetector वर्ग में एकीकृत किया गया है। rr.log_scalar और rr.log_point फ़ंक्शंस का उपयोग डेटा पॉइंट्स और उनकी संबंधित विसंगति स्थिति को प्लॉट करने के लिए किया जाता है।
अब, कोड के मुख्य घटकों की समझ के साथ, आइए चर्चा करें कि विज़ुअलाइज़ेशन चरण दर चरण कैसे बनाया जाता है।
डेटा प्रवाह पाइपलाइन बनाने के लिए, आपको चाहिए:
flow = Dataflow()
के साथ एक नया डेटा प्रवाह प्रारंभ करें।flow.input("input", ManualInputConfig(generate_random_metrics))
उपयोग करके इनपुट स्रोत को परिभाषित करें।flow.stateful_map("AnomalyDetector", lambda: ZTestDetector(2.0), ZTestDetector.push)
उपयोग करके स्टेटफुल एनोमली डिटेक्टर लागू करें।flow.capture(ManualOutputConfig(output_builder))
के साथ कॉन्फ़िगर करें।spawn_cluster(flow, proc_count=3)
के साथ डेटाफ्लो निष्पादित करने के लिए स्पॉन करें।
परिणामी डेटा प्रवाह input_builder
से बेतरतीब ढंग से उत्पन्न मीट्रिक मानों को पढ़ता है, उन्हें विसंगति का पता लगाने के लिए ZTestDetector
माध्यम से पास करता है, और output_builder
फ़ंक्शन का उपयोग करके परिणामों को आउटपुट करता है। आइए प्रत्येक चरण के विवरण को स्पष्ट करें।
generate_random_metrics
फ़ंक्शन generate_random_metrics
फ़ंक्शन डेटा प्रवाह पाइपलाइन के लिए एक वैकल्पिक इनपुट स्रोत के रूप में कार्य करता है, जो कई श्रमिकों में वितरित तरीके से यादृच्छिक मीट्रिक मान उत्पन्न करता है। यह तीन मापदंडों को स्वीकार करता है: worker_index
, worker_count
और resume_state
।
def generate_random_metrics(worker_index, worker_count, resume_state): assert resume_state is None keys = ["1", "2", "3", "4", "5", "6"] this_workers_keys = distribute(keys, worker_index, worker_count) for _ in range(1000): for key in keys: value = random.randrange(0, 10) if random.random() > 0.9: value *= 2.0 yield None, (key, (key, value, (datetime.now() - start).total_seconds())) sleep(random.random() / 10.0)
worker_index
: डेटाफ्लो पाइपलाइन में वर्तमान वर्कर का इंडेक्स।
worker_count
: डेटा प्रवाह पाइपलाइन में श्रमिकों की कुल संख्या।
resume_state
: इनपुट स्रोत की वह स्थिति जिससे फिर से शुरू करना है। इस मामले में, इसे None
होने का दावा किया जाता है, यह दर्शाता है कि इनपुट स्रोत पिछली स्थिति से फिर से शुरू करने का समर्थन नहीं करता है।
यहां generate_random_metrics
फ़ंक्शन का चरण-दर-चरण विवरण दिया गया है:
resume_state
None
है।0 और 10 के बीच एक यादृच्छिक मान उत्पन्न करें।
10% संभावना के साथ, विसंगति का अनुकरण करने के लिए मान को दोगुना करें।
कोई नहीं (किसी विशिष्ट विभाजन कुंजी को इंगित करने के लिए), कुंजी, उत्पन्न मान, और शुरुआती समय से बीता हुआ समय (कोड स्निपेट में प्रदान नहीं किया गया) युक्त टपल प्राप्त करें।
रीयल-टाइम डेटा जनरेशन को अनुकरण करने के लिए प्रत्येक जेनरेट किए गए मान के बीच स्लीप टाइम का परिचय दें।
generate_random_metrics
फ़ंक्शन का उपयोग डेटाफ़्लो में कोड की निम्न पंक्ति के साथ इनपुट स्रोत के रूप में किया जाता है:
flow.input("input", ManualInputConfig(generate_random_metrics))
यह रेखा डेटा प्रवाह को पाइपलाइन के लिए इनपुट डेटा उत्पन्न करने के लिए RandomMetricInput
वर्ग का उपयोग करने के लिए कहती है।
ZTestDetector
वर्ग ZTestDetector
वर्ग एक विसंगति डिटेक्टर है जो यह पहचानने के लिए Z-स्कोर विधि का उपयोग करता है कि कोई डेटा बिंदु विषम है या नहीं। जेड-स्कोर मानक विचलन की संख्या है जो डेटा बिंदु डेटासेट के माध्यम से होता है। यदि किसी डेटा बिंदु का Z-स्कोर निर्दिष्ट सीमा से अधिक है, तो इसे विषम माना जाता है।
कक्षा में निम्नलिखित विधियाँ हैं:
__init__(self, threshold_z)
: कंस्ट्रक्टर ZTestDetector को थ्रेशोल्ड Z-स्कोर वैल्यू के साथ इनिशियलाइज़ करता है। यह अंतिम 10 मानों की सूची (self.last_10), माध्य (self.mu), और मानक विचलन (self.sigma) को भी प्रारंभ करता है।
_push(self, value)
: इस निजी पद्धति का उपयोग पिछले 10 मानों की सूची को नए मान के साथ अद्यतन करने के लिए किया जाता है। यह सूची की शुरुआत में नया मान सम्मिलित करता है और सबसे पुराना मान हटाता है, सूची की लंबाई 10 पर बनाए रखता है।
_recalc_stats(self)
: यह निजी पद्धति self.last_10 सूची में वर्तमान मानों के आधार पर माध्य और मानक विचलन की पुनर्गणना करती है।
push(self, key__value__t)
: यह सार्वजनिक विधि इनपुट के रूप में एक कुंजी, एक मान और एक टाइमस्टैम्प युक्त टपल लेती है। यह मान के लिए Z-स्कोर की गणना करता है, अंतिम 10 मानों की सूची को अपडेट करता है, और माध्य और मानक विचलन की पुनर्गणना करता है। यह Rerun के विज़ुअलाइज़ेशन फ़ंक्शंस का उपयोग करके डेटा बिंदु और इसकी विसंगति स्थिति को भी लॉग करता है। अंत में, यह ZTestDetector वर्ग का अद्यतन उदाहरण और मूल्य, माध्य, मानक विचलन और विसंगति की स्थिति वाला एक टपल लौटाता है।
ZTestDetector वर्ग का उपयोग डेटाफ्लो पाइपलाइन में निम्नलिखित कोड के साथ स्टेटफुल मैप के रूप में किया जाता है:
flow.stateful_map("AnomalyDetector", lambda: ZTestDetector(2.0), ZTestDetector.push)
यह रेखा डेटा प्रवाह को ZTestDetector
2.0
के Z-स्कोर थ्रेशोल्ड के साथ लागू करने और डेटा बिंदुओं को संसाधित करने के लिए push
विधि का उपयोग करने के लिए कहती है।
विसंगतियों की कल्पना करने के लिए, ZTestDetector
वर्ग डेटा बिंदुओं और उनकी संगत विसंगति स्थिति को रेरन के विज़ुअलाइज़ेशन फ़ंक्शंस का उपयोग करके लॉग करता है। विशेष रूप से, rr.log_scalar
उपयोग स्केलर मान को प्लॉट करने के लिए किया जाता है, जबकि rr.log_point
उपयोग 3D बिंदुओं को प्लॉट करने के लिए किया जाता है।
निम्न कोड स्निपेट दिखाता है कि विज़ुअलाइज़ेशन कैसे बनाया जाता है:
rr.log_scalar(f"temp_{key}/data", value, color=[155, 155, 155]) if is_anomalous: rr.log_point(f"3dpoint/anomaly/{key}", [t, value, float(key) * 10], radius=0.3, color=[255,100,100]) rr.log_scalar( f"temp_{key}/data/anomaly", value, scattered=True, radius=3.0, color=[255, 100, 100], ) else: rr.log_point(f"3dpoint/data/{key}", [t, value, float(key) * 10], radius=0.1)
यहां, हम पहले मीट्रिक का प्रतिनिधित्व करने वाले स्केलर मान को लॉग करते हैं। फिर, इस पर निर्भर करते हुए कि क्या मान विषम है, हम एक अलग त्रिज्या और रंग के साथ एक 3D बिंदु लॉग करते हैं। विषम बिंदुओं को एक बड़े त्रिज्या के साथ लाल रंग में लॉग किया जाता है, जबकि गैर-विषम बिंदुओं को छोटे त्रिज्या के साथ लॉग किया जाता है।
output_builder
फ़ंक्शन output_builder
फ़ंक्शन का उपयोग डेटा पाइपलाइन के आउटपुट व्यवहार को परिभाषित करने के लिए किया जाता है। इस विशिष्ट उदाहरण में, यह मीट्रिक नाम, मान, माध्य, मानक विचलन, और क्या मान असंगत है, को प्रिंट करने के लिए ज़िम्मेदार है।
फ़ंक्शन दो तर्क लेता है: worker_index
और worker_count
। ये तर्क फ़ंक्शन को कार्यकर्ता के सूचकांक और डेटा प्रवाह पाइपलाइन में श्रमिकों की कुल संख्या को समझने में सहायता करते हैं।
यहाँ output_builder
फ़ंक्शन की परिभाषा दी गई है :
def output_builder(worker_index, worker_count): def inspector(input): metric, (value, mu, sigma, is_anomalous) = input print( f"{metric}: " f"value = {value}, " f"mu = {mu:.2f}, " f"sigma = {sigma:.2f}, " f"{is_anomalous}" ) return inspector
यह फ़ंक्शन एक उच्च-क्रम फ़ंक्शन है, जिसका अर्थ है कि यह inspector
नामक एक अन्य फ़ंक्शन देता है। inspector
फ़ंक्शन इनपुट डेटा टपल को संसाधित करने और वांछित आउटपुट को प्रिंट करने के लिए ज़िम्मेदार है।
आउटपुट बिल्डर फ़ंक्शन का उपयोग बाद में आउटपुट व्यवहार को कॉन्फ़िगर करते समय डेटाफ़्लो पाइपलाइन में किया जाता है
flow.capture(ManualOutputConfig(output_builder)).
बाइटवैक्स एकल प्रक्रिया या बहु-प्रक्रिया के रूप में चल सकता है। यह डेटा प्रवाह कई प्रक्रियाओं में स्केल करने के लिए बनाया गया है, लेकिन हम इसे spawn_cluster
निष्पादन मॉड्यूल के साथ एकल प्रक्रिया के रूप में चलाना शुरू कर देंगे।
spawn_cluster(flow)
यदि हम समानता को बढ़ाना चाहते हैं, तो हम केवल तर्कों के रूप में और प्रक्रियाएँ जोड़ेंगे।
उदाहरण के लिए - spawn_cluster(flow, proc_count=3)
।
प्रदान किए गए कोड को चलाने के लिए हम इसे केवल पायथन स्क्रिप्ट के रूप में चला सकते हैं, लेकिन पहले हमें निर्भरताओं को स्थापित करने की आवश्यकता है।
उसी निर्देशिका में dataflow.py के रूप में एक नई फ़ाइल बनाएँ और इसे आवश्यकताएँ.txt नाम दें।
निम्नलिखित सामग्री को आवश्यकताएँ। txt फ़ाइल में जोड़ें:
bytewax==0.15.1 rerun-sdk==0.4.0
आवश्यकताएँ.txt और dataflow.py फ़ाइलों वाली निर्देशिका में एक टर्मिनल खोलें।
निम्नलिखित कमांड का उपयोग करके निर्भरताएँ स्थापित करें:
pip install -r requirements.txt
और डेटाफ्लो चलाएं!
python dataflow.py
जबकि प्रदान किया गया कोड रीयल-टाइम विसंगति पहचान के मूल उदाहरण के रूप में कार्य करता है, आप अधिक जटिल परिदृश्यों को समायोजित करने के लिए इस पाइपलाइन का विस्तार कर सकते हैं।
उदाहरण के लिए:
वास्तविक दुनिया के डेटा स्रोतों को शामिल करें : RandomMetricInput वर्ग को एक कस्टम वर्ग से बदलें जो वास्तविक दुनिया के स्रोत से डेटा पढ़ता है, जैसे कि IoT सेंसर, लॉग फ़ाइलें या स्ट्रीमिंग API।
अधिक परिष्कृत विसंगति का पता लगाने वाली तकनीकों को लागू करें : आप ZTestDetector वर्ग को अन्य स्टेटफुल विसंगति का पता लगाने के तरीकों से बदल सकते हैं, जैसे मूविंग एवरेज, एक्सपोनेंशियल स्मूथिंग या मशीन लर्निंग-आधारित दृष्टिकोण।
विज़ुअलाइज़ेशन को अनुकूलित करें : अधिक डेटा आयाम जोड़कर, रंग योजनाओं को समायोजित करके, या अपनी आवश्यकताओं के अनुरूप प्लॉट शैलियों को संशोधित करके रीरन विज़ुअलाइज़ेशन को बेहतर बनाएं।
चेतावनी और निगरानी प्रणालियों के साथ एकीकृत करें : विसंगति के परिणामों को केवल प्रिंट करने के बजाय, आप किसी विसंगति का पता चलने पर उपयुक्त हितधारकों को सूचित करने के लिए चेतावनी या निगरानी प्रणाली के साथ पाइपलाइन को एकीकृत कर सकते हैं।
डेटा प्रवाह पाइपलाइन को अनुकूलित और विस्तारित करके, आप अपने विशिष्ट उपयोग मामले के अनुरूप एक शक्तिशाली रीयल-टाइम विसंगति पहचान और विज़ुअलाइज़ेशन समाधान बना सकते हैं। Bytewax और Rerun का संयोजन वास्तविक समय डेटा प्रोसेसिंग और विज़ुअलाइज़ेशन सिस्टम के निर्माण के लिए एक बहुमुखी और स्केलेबल नींव प्रदान करता है।
इस ब्लॉग पोस्ट ने प्रदर्शित किया है कि रीयल-टाइम विसंगति पहचान विज़ुअलाइज़ेशन बनाने के लिए बाइटवैक्स और रीरन का उपयोग कैसे करें। Bytewax के साथ एक डेटाफ्लो पाइपलाइन का निर्माण करके और Rerun की शक्तिशाली विज़ुअलाइज़ेशन क्षमताओं को एकीकृत करके, हम अपने डेटा में होने वाली विसंगतियों की निगरानी और पहचान कर सकते हैं।
मूल रूप से ज़ेंडर मैथेसन द्वारा यहाँ लिखा गया है।