इस लेख में मैं स्वेट्रिक्स नामक एक वेब एनालिटिक्स प्लेटफॉर्म के लिए एआई सेवा विकसित करने के अपने अनुभव को साझा करना चाहता हूं। मेरा उद्देश्य एक मशीन लर्निंग मॉडल विकसित करना था जो निम्नलिखित स्क्रीनशॉट पर प्रदर्शित डेटा के आधार पर भविष्य की वेबसाइट ट्रैफ़िक की भविष्यवाणी कर सके अंतिम लक्ष्य यह है कि ग्राहकों को यह स्पष्ट रूप से बताया जा सके कि भविष्य में उनकी वेबसाइट पर किस प्रकार का ट्रैफिक आएगा, जिससे उन्हें बेहतर जानकारी मिल सके और सामान्य रूप से व्यवसाय नियोजन में सुधार हो सके। 2. आवश्यकताएँ और वास्तुकला योजना के दौरान एआई और एपीआई सेवाओं के बीच संचार के लिए रैबिटएमक्यू मैसेज ब्रोकर के साथ माइक्रोसर्विस आर्किटेक्चर के साथ आगे बढ़ने का निर्णय लिया गया। सबसे पहले, हमें हर घंटे क्रॉन टास्क के साथ डेटा को एक अलग डेटाबेस में इकट्ठा करना होगा। हमने ClickHouse को चुनने का फैसला किया, क्योंकि स्वेट्रिक्स पर वेबसाइटों से मूल डेटा इसमें संग्रहीत है। प्रारूप के बारे में विवरण अगले अनुभागों के दौरान कवर किया जाएगा। रैबिटएमक्यू को इसकी सरलता के कारण संदेश ब्रोकर के रूप में चुना गया था और हमें एआई और एपीआई सेवाओं के बीच संचार स्थापित करने की आवश्यकता है। आइए सब कुछ तोड़ें और मुख्य तर्क की जाँच करें स्वेट्रिक्स-एपीआई सेवा: क्रॉन टास्क के माध्यम से प्रति घंटे डेटा आंकड़े एकत्र करता है और एआई सेवा को कच्चा डेटा भेजता है। ClickHouse से पूर्व-संसाधित डेटा सम्मिलित करता है और प्राप्त करता है। स्वेट्रिक्स-एआई सेवा: पूर्वानुमान के लिए कच्चे डेटा और चयनित प्राथमिकताओं (अंतराल और उपश्रेणी) को संसाधित करता है। पूर्वानुमान डेटा को JSON प्रारूप में परिवर्तित करता है और इसे RabbitMQ के माध्यम से API सेवा पर वापस भेजता है। स्वेट्रिक्स-एआई सेवा बैकएंड पक्ष के लिए नेस्टजेएस फ्रेमवर्क और डेटा प्री-प्रोसेसिंग और मॉडल भविष्यवाणियों के लिए पायथन स्क्रिप्ट का उपयोग करेगी। 3. पूर्व प्रसंस्करण हम परियोजनाओं के बारे में निम्नलिखित डेटा को एक तालिका में एकत्रित करते हैं। आप इस डेटा का प्रस्तुत संस्करण पहले ही लेख के प्रथम भाग में देख चुके हैं। analytics मैं निम्नलिखित क्वेरी के साथ यह (लगभग स्वीकार्य) परिणाम प्राप्त करने में सक्षम था: @Cron(CronExpression.EVERY_HOUR) async insertHourlyProjectData(): Promise<void> { const gatherProjectsData = ` INSERT INTO analytics.hourly_projects_data (UniqueID, projectID, statisticsGathered, br_keys, br_vals, os_keys, os_vals, lc_keys, lc_vals, ref_keys, ref_vals, so_keys, so_vals, me_keys, me_vals, ca_keys, ca_vals, cc_keys, cc_vals, dv_keys, dv_vals, rg_keys, rg_vals, ct_keys, ct_vals) SELECT generateUUIDv4() as UniqueID, pid as projectID, toStartOfHour(now()) as statisticsGathered, groupArray(br) as br_keys, groupArray(br_count) as br_vals, groupArray(os) as os_keys, groupArray(os_count) as os_vals, ... groupArray(ct) as ct_keys, groupArray(ct_count) as ct_vals FROM ( SELECT pid, br, count(*) as br_count, os, count(*) as os_count, ... ct, count(*) as ct_count FROM analytics.analytics GROUP BY pid, br, os, lc, ref, so, me, ca, cc, dv, rg, ct ) GROUP BY pid; ` try { await clickhouse.query(gatherProjectsData).toPromise() } catch (e) { console.error( `[CRON WORKER] Error whilst gathering hourly data for all projects: ${e}`, ) यह फ़ंक्शन क्रॉन जॉब का उपयोग करके हर घंटे चलने के लिए शेड्यूल किया गया है। यह एनालिटिक्स डेटा को इकट्ठा करता है और उसे क्लिकहाउस में डालता है। analytics.hourly_projects_data उत्पादन ClickHouse की सीमाओं के कारण मैं डेटा का वांछित प्रारूप प्राप्त करने में सक्षम नहीं था। इसलिए मैंने मॉडल के प्रशिक्षण के लिए आवश्यक प्रीप्रोसेसिंग को पूरा करने के लिए उपयोग करने का निर्णय लिया। pandas विशेष रूप से मैंने निम्नलिखित कार्य करने के लिए पायथन का उपयोग किया: 3.1 कुंजियाँ और मान संयोजित करें एक श्रेणी से संबंधित कुंजियों और मानों को एक JSON फ़ील्ड में संयोजित करें, उदाहरण के लिए डिवाइसों की कुंजियों और मानों को एक ऑब्जेक्ट में संयोजित करें। os_keys = {“Windows”, ”MacOS”, ”MacOS”, ”MacOS”, ”Linux”} os_values = {1, 2, 2, 1, 5} में: os = {“Windows”: 1, “MacOS”: 5, “Linux”: 5} कोड और आउटपुट संलग्न कर रहा हूँ: def format_data(keys_list, vals_list, threshold): """ Format data by converting string representations of lists to actual lists, then sums up the counts for each key. Keys with counts below a specified threshold are aggregated into 'Other'. """ counts = defaultdict(int) for keys_str, vals_str in zip(keys_list, vals_list): keys = ast.literal_eval(keys_str) vals = ast.literal_eval(vals_str) for key, val in zip(keys, vals): counts[key] += val final_data = defaultdict(int) for value, count in counts.items(): final_data[value] = count return dict(final_data) def process_group(group): """ Combine specific groups by a group clause, and make a """ result = {} for col in group.columns: if col.endswith('_keys'): prefix = col.split('_')[0] # Extract prefix to identify the category (eg, 'br' for browsers) threshold = other_thresholds.get(prefix, 1) # Get the threshold for this category, default to 1 vals_col = col.replace('_keys', '_vals') keys_list = group[col].tolist() vals_list = group[vals_col].tolist() result[col.replace('_keys', '')] = format_data(keys_list, vals_list, threshold) return pd.Series(result) मैं कहूंगा कि डेटा का यह प्रारूप भविष्यवाणी के लिए उपयोग नहीं किया जाएगा, बल्कि इसका उपयोग डेटाबेस में भंडारण और डिबगिंग उद्देश्यों के लिए किया जाएगा ताकि यह सत्यापित किया जा सके कि कोई मूल्य गायब नहीं है और इसके अलावा, यह दोबारा जांचने के लिए कि मॉडल सटीक परिणाम देता है। उत्पादन 3.2 कुंजियाँ और मान संयोजित करें एक उपयुक्त मॉडल को प्रशिक्षित करने के लिए मैंने विभिन्न श्रेणियों के लिए अन्य समूहों को परिभाषित करने का निर्णय लिया। जिसका अर्थ है कि यदि वैश्विक स्तर पर किसी विशिष्ट श्रेणी में किसी समूह के उदाहरणों की संख्या एक निश्चित प्रतिशत (%) से कम है, तो उसे अन्य के हिस्से के रूप में जोड़ा जाएगा। उदाहरण के लिए, श्रेणी में हमारे पास है: os {“MacOS”: 300, “Windows”: 400, “Linux”: 23 and “TempleOS”: 10} चूंकि इस मामले में लिनक्स और टेम्पलओएस दोनों अत्यंत दुर्लभ हैं, इसलिए उन्हें में जोड़ा जाएगा, इसलिए अंतिम परिणाम होगा: अन्य समूह {“MacOS”: 300, “Windows”: 400, “other”: 33}. और "दुर्लभता" को श्रेणी के आधार पर और इस श्रेणी के लिए निर्धारित सीमा के आधार पर अलग-अलग तरीके से निर्धारित किया जाता है। इसे ग्राहक की प्राथमिकताओं और वांछित डेटा के आधार पर कॉन्फ़िगर किया जा सकता है other_thresholds = { 'br': 0.06, 'os': 0.04, 'cc': 0.02, 'lc': 0.02, 'ref': 0.02, 'so': 0.03, 'me': 0.03, 'ca': 0.03, 'cc': 0.02, 'dv': 0.02, 'rg': 0.01, 'ct': 0.01 } इस लक्ष्य को प्राप्त करने के लिए 2 कार्य कार्यान्वित किए गए def get_groups_by_treshholds(df,column_name): """Calculate total values for all columns""" if column_name in EXCLUDED_COLUMNS: return counter = count_dict_values(df[column_name]) total = sum(counter.values()) list1 = [] for key, value in counter.items(): if not (value / total) < other_thresholds[column_name]: list1.append(key) return list1 def create_group_columns(df): column_values = [] for key in other_thresholds.keys(): groups = get_groups_by_treshholds(df, key) if not groups: continue for group in groups: column_values.append(f"{key}_{group}") column_values.append(f"{key}_other") return column_values column_values = create_group_columns(df) column_values उत्पादन ['br_Chrome', 'br_Firefox', 'os_Mac OS', 'os_other', 'cc_UA', 'cc_GB', 'cc_other', 'dv_mobile', 'dv_desktop', 'dv_other'] मशीन लर्निंग मॉडल के साथ काम करते समय, यह महत्वपूर्ण है कि इनपुट डेटा ऐसे प्रारूप में हो जिसे मॉडल समझ सके। मशीन लर्निंग मॉडल को आमतौर पर JSON जैसी जटिल डेटा संरचनाओं के बजाय संख्यात्मक मानों (पूर्णांक, फ़्लोट) की आवश्यकता होती है। इसलिए, पुनः, इस आवश्यकता को पूरा करने के लिए हमारे डेटा का थोड़ा और पूर्व-प्रसंस्करण करना बेहतर है। मैंने एक फ़ंक्शन बनाया है जहाँ प्रत्येक विशेषता को एक अलग कॉलम के रूप में दर्शाया गया है, और पंक्तियों में संबंधित संख्यात्मक मान शामिल हैं। (यह अभी तक आदर्श नहीं है, लेकिन यह सबसे अच्छा समाधान था जो मैं बनाने में सक्षम था) create_exploded_df def create_exploded_df(df): """ Function which creates a new data set, iterates through the old one and fill in values according to their belongings (br_other, etc..) """ new_df = df[['projectID', 'statisticsGathered']] for group in column_values: new_df[group] = 0 new_df_cols = new_df.columns df_cols = df.columns for column in df_cols: if column in ['projectID', 'statisticsGathered']: continue for index, row in enumerate(df[column]): if column in EXCLUDED_COLUMNS: continue for key, value in row.items(): total = 0 if (a:=f"{column}_{key}") in new_df_cols: new_df[a][index] = value else: total += value new_df[f"{column}_other"][index] = total return new_df new_df = create_exploded_df(df) new_df.to_csv("2-weeks-exploded.csv") new_df उत्पादन 3.3 घंटे भरें हमारे पास मौजूद डेटा के प्रारूप के साथ एक और समस्या यह है कि यदि किसी विशिष्ट घंटे में किसी प्रोजेक्ट के लिए कोई ट्रैफ़िक नहीं था, तो रिक्त पंक्ति बनाने के बजाय, कोई पंक्ति ही नहीं होगी, जो इस तथ्य को देखते हुए असुविधाजनक है कि मॉडल को आगामी समय सीमा (उदाहरण के लिए, अगले घंटे) के लिए डेटा की भविष्यवाणी करने के लिए डिज़ाइन किया गया है। हालाँकि, यदि प्रारंभिक समय सीमा के लिए कोई डेटा उपलब्ध नहीं है, तो मॉडल को पूर्वानुमान लगाने के लिए प्रशिक्षित करना संभव नहीं है। इसलिए मैंने एक स्क्रिप्ट लिखी जो गायब घंटों को ढूंढ लेगी और जब कोई घंटा छूट जाए तो रिक्त पंक्तियाँ डाल देगी 3.4 लक्ष्य कॉलम जोड़ें और स्थानांतरित करें मॉडल प्रशिक्षण के संबंध में, प्राथमिक दृष्टिकोण मॉडल के लिए लक्ष्य के रूप में पिछले घंटे के डेटा का उपयोग करना था। इससे मॉडल को वर्तमान डेटा के आधार पर भविष्य के ट्रैफ़िक की भविष्यवाणी करने की अनुमति मिलती है। def sort_df_and_assign_targets(df): df = df.copy() df = df.sort_values(by=['projectID', 'statisticsGathered']) for column_name in df.columns: if not column_name.endswith('target'): continue df[column_name] = df.groupby('projectID')[column_name].shift(-1) return df new_df = sort_df_and_assign_targets(new_df) उत्पादन 3.5 विभाजित आँकड़ेअलग-अलग कॉलम में statisticsGathered इस दृष्टिकोण का मुख्य कारण यह है कि एक ऑब्जेक्ट था, जिसे मैंने जिन मॉडलों का उपयोग करने की कोशिश की है (बाद के अनुभागों की जांच करें) वे इसे संसाधित करने और सही पैटर्न की पहचान करने में सक्षम नहीं हैं। statisticsGathered datetime इसका परिणाम भयानक मेट्रिक्स में हुआ। इसलिए विकास के दौरान , और के लिए अलग-अलग सुविधाओं का निर्णय लिया गया, जिससे परिणाम काफी बेहतर हुए। MSE/MRSE day month hour def split_statistic_gathered(df): df['Month'] = df['statisticsGathered'].dt.month.astype(int) # as int df['Day'] = df['statisticsGathered'].dt.day.astype(int) # as int df['Hour'] = df['statisticsGathered'].dt.hour df = df.drop('statisticsGathered', axis = 1) return df new_df = split_statistic_gathered(new_df) new_df उत्पादन और बस इतना ही! चलिए सीधे प्रशिक्षण पर चलते हैं! 🎉🎉🎉 4. रैखिक प्रतिगमन खैर, मेरा अनुमान है कि इस एप्लिकेशन के निर्माण के दौरान वास्तविक भविष्यवाणी सबसे चुनौतीपूर्ण हिस्सा थी। पहली चीज़ जो मैं आज़माना चाहता था वह है मॉडल का उपयोग करना: LinearRegression मैंने निम्नलिखित कार्य कार्यान्वित किये: def create_model_for_target(train_df, target_series): X_train, x_test, Y_train, y_test = train_test_split(train_df, target_series, test_size=0.3, shuffle=False) reg = LinearRegression() reg.fit(X_train, Y_train) y_pred = reg.predict(x_test) return {"y_test": y_test, "y_pred": y_pred} def create_models_for_targets(df): models_data = dict() df = df.dropna() train_df = clear_df(df) for target_name in df[[column_name for column_name in df.columns if column_name.endswith("target")]]: models_data[target_name] = create_model_for_target(train_df, df[target_name]) return models_data स्पष्टीकरण प्रत्येक लक्ष्य कॉलम के लिए, हम डेटा को प्रशिक्षण और परीक्षण सेट में विभाजित करते हैं। फिर हम प्रशिक्षण डेटा पर एक मॉडल को प्रशिक्षित करते हैं और परीक्षण डेटा पर पूर्वानुमान लगाते हैं। LinearRegression यह मूल्यांकन करने के लिए कि परिणाम सही हैं, मैंने वह फ़ंक्शन जोड़ा जो आवश्यक मेट्रिक्स एकत्र करता है और आउटपुट तैयार करता है def evaluate_models(data): evaluation = [] for target, results in data.items(): y_test, y_pred = results['y_test'], results['y_pred'] mse = mean_squared_error(y_test, y_pred) rmse = mean_squared_error(y_test, y_pred) ** 0.5 mae = mean_absolute_error(y_test, y_pred) mean_y = y_test.mean() median_y = y_test.median() evaluation.append({'target': target, 'mse': mse, 'rmse': rmse, 'mae': mae, 'mean_y': mean_y, 'median_y': median_y}) return pd.DataFrame(evaluation) उत्पादन मैंने एक स्क्रिप्ट लिखी जो आउटपुट उत्पन्न करती है और इसे एक्सेल फ़ाइल में सहेजती है, , , और मानों को ध्यान में रखती है mse rmse mae mean_y जैसा कि आप देख सकते हैं, मेट्रिक्स संतोषजनक नहीं हैं और अनुमानित यातायात डेटा सटीक नहीं होगा और यातायात पूर्वानुमान के मेरे लक्ष्यों के लिए उपयुक्त नहीं होगा। इसलिए, मैंने प्रति घंटे आगंतुकों की कुल संख्या की भविष्यवाणी करने का निर्णय लिया, ताकि निम्नलिखित कार्य बनाए जा सकें def add_target_column(df, by): totals_series = df.apply(lambda x: sum(x[[column for column in df.columns if column.startswith(by)]]), axis=1) df['total'] = totals_series df[f'total_{by}_target'] = totals_series return df def shift_target_column(df, by): df = df.sort_values(by=['projectID', 'statisticsGathered'], ignore_index=True) df['total_target'] = df.groupby('projectID')[f'total_{by}_target'].shift(-1) return df new_df = add_target_column(new_df, 'br') new_df = shift_target_column(new_df, 'br') new_df[['total_br_target']] उत्पादन यह फ़ंक्शन एक विशिष्ट श्रेणी लेता है और उसके आधार पर कुल आगंतुकों की गणना करता है। यह इसलिए काम करता है क्योंकि डिवाइस मानों की कुल संख्या OS मानों की कुल संख्या के समान होगी। । इस दृष्टिकोण से, मॉडल ने पहले की तुलना में 10 गुना बेहतर परिणाम दिखाए 5। उपसंहार अगर हम इस मामले की बात करें तो यह लगभग स्वीकार्य और उपयोग के लिए तैयार सुविधा है। ग्राहक अब इन पूर्वानुमानों के परिणाम के आधार पर अपने बजट आवंटन और सर्वर स्केलिंग की योजना बना सकते हैं पूर्वानुमान । वास्तविक मूल्यों से लगभग 2.45 विज़िटर तक विचलित होते हैं (चूंकि RMSE = √MSE ) जिसका विपणन आवश्यकताओं पर कोई नकारात्मक महत्वपूर्ण प्रभाव नहीं हो सकता है। चूंकि यह लेख काफी विस्तृत हो चुका है और ऐप अभी भी विकास के अधीन है, इसलिए हम यहां रुकेंगे। हम आगे बढ़ते हुए इस दृष्टिकोण को और बेहतर बनाते रहेंगे और मैं आपको अपडेट करता रहूंगा! पढ़ने और ध्यान देने के लिए धन्यवाद! मैं टिप्पणी अनुभाग में आपकी प्रतिक्रिया और विचार सुनने के लिए उत्सुक हूँ। मुझे उम्मीद है कि यह जानकारी आपके उद्देश्यों के लिए उपयोगी साबित होगी! और सौभाग्य!