इस लेख में मैं स्वेट्रिक्स नामक एक वेब एनालिटिक्स प्लेटफॉर्म के लिए एआई सेवा विकसित करने के अपने अनुभव को साझा करना चाहता हूं।
मेरा उद्देश्य एक मशीन लर्निंग मॉडल विकसित करना था जो निम्नलिखित स्क्रीनशॉट पर प्रदर्शित डेटा के आधार पर भविष्य की वेबसाइट ट्रैफ़िक की भविष्यवाणी कर सके
अंतिम लक्ष्य यह है कि ग्राहकों को यह स्पष्ट रूप से बताया जा सके कि भविष्य में उनकी वेबसाइट पर किस प्रकार का ट्रैफिक आएगा, जिससे उन्हें बेहतर जानकारी मिल सके और सामान्य रूप से व्यवसाय नियोजन में सुधार हो सके।
योजना के दौरान एआई और एपीआई सेवाओं के बीच संचार के लिए रैबिटएमक्यू मैसेज ब्रोकर के साथ माइक्रोसर्विस आर्किटेक्चर के साथ आगे बढ़ने का निर्णय लिया गया।
सबसे पहले, हमें हर घंटे क्रॉन टास्क के साथ डेटा को एक अलग डेटाबेस में इकट्ठा करना होगा। हमने ClickHouse को चुनने का फैसला किया, क्योंकि स्वेट्रिक्स पर वेबसाइटों से मूल डेटा इसमें संग्रहीत है। प्रारूप के बारे में विवरण अगले अनुभागों के दौरान कवर किया जाएगा।
रैबिटएमक्यू को इसकी सरलता के कारण संदेश ब्रोकर के रूप में चुना गया था और हमें एआई और एपीआई सेवाओं के बीच संचार स्थापित करने की आवश्यकता है। आइए सब कुछ तोड़ें और मुख्य तर्क की जाँच करें
स्वेट्रिक्स-एआई सेवा बैकएंड पक्ष के लिए नेस्टजेएस फ्रेमवर्क और डेटा प्री-प्रोसेसिंग और मॉडल भविष्यवाणियों के लिए पायथन स्क्रिप्ट का उपयोग करेगी।
हम परियोजनाओं के बारे में निम्नलिखित डेटा को एक analytics
तालिका में एकत्रित करते हैं। आप इस डेटा का प्रस्तुत संस्करण पहले ही लेख के प्रथम भाग में देख चुके हैं।
मैं निम्नलिखित क्वेरी के साथ यह (लगभग स्वीकार्य) परिणाम प्राप्त करने में सक्षम था:
@Cron(CronExpression.EVERY_HOUR) async insertHourlyProjectData(): Promise<void> { const gatherProjectsData = ` INSERT INTO analytics.hourly_projects_data (UniqueID, projectID, statisticsGathered, br_keys, br_vals, os_keys, os_vals, lc_keys, lc_vals, ref_keys, ref_vals, so_keys, so_vals, me_keys, me_vals, ca_keys, ca_vals, cc_keys, cc_vals, dv_keys, dv_vals, rg_keys, rg_vals, ct_keys, ct_vals) SELECT generateUUIDv4() as UniqueID, pid as projectID, toStartOfHour(now()) as statisticsGathered, groupArray(br) as br_keys, groupArray(br_count) as br_vals, groupArray(os) as os_keys, groupArray(os_count) as os_vals, ... groupArray(ct) as ct_keys, groupArray(ct_count) as ct_vals FROM ( SELECT pid, br, count(*) as br_count, os, count(*) as os_count, ... ct, count(*) as ct_count FROM analytics.analytics GROUP BY pid, br, os, lc, ref, so, me, ca, cc, dv, rg, ct ) GROUP BY pid; ` try { await clickhouse.query(gatherProjectsData).toPromise() } catch (e) { console.error( `[CRON WORKER] Error whilst gathering hourly data for all projects: ${e}`, )
यह फ़ंक्शन क्रॉन जॉब का उपयोग करके हर घंटे चलने के लिए शेड्यूल किया गया है। यह एनालिटिक्स डेटा को इकट्ठा करता है और उसे क्लिकहाउस analytics.hourly_projects_data
में डालता है।
उत्पादन
ClickHouse की सीमाओं के कारण मैं डेटा का वांछित प्रारूप प्राप्त करने में सक्षम नहीं था। इसलिए मैंने मॉडल के प्रशिक्षण के लिए आवश्यक प्रीप्रोसेसिंग को पूरा करने के लिए pandas
उपयोग करने का निर्णय लिया।
विशेष रूप से मैंने निम्नलिखित कार्य करने के लिए पायथन का उपयोग किया:
एक श्रेणी से संबंधित कुंजियों और मानों को एक JSON फ़ील्ड में संयोजित करें, उदाहरण के लिए डिवाइसों की कुंजियों और मानों को एक ऑब्जेक्ट में संयोजित करें।
os_keys = {“Windows”, ”MacOS”, ”MacOS”, ”MacOS”, ”Linux”} os_values = {1, 2, 2, 1, 5}
में:
os = {“Windows”: 1, “MacOS”: 5, “Linux”: 5}
कोड और आउटपुट संलग्न कर रहा हूँ:
def format_data(keys_list, vals_list, threshold): """ Format data by converting string representations of lists to actual lists, then sums up the counts for each key. Keys with counts below a specified threshold are aggregated into 'Other'. """ counts = defaultdict(int) for keys_str, vals_str in zip(keys_list, vals_list): keys = ast.literal_eval(keys_str) vals = ast.literal_eval(vals_str) for key, val in zip(keys, vals): counts[key] += val final_data = defaultdict(int) for value, count in counts.items(): final_data[value] = count return dict(final_data) def process_group(group): """ Combine specific groups by a group clause, and make a """ result = {} for col in group.columns: if col.endswith('_keys'): prefix = col.split('_')[0] # Extract prefix to identify the category (eg, 'br' for browsers) threshold = other_thresholds.get(prefix, 1) # Get the threshold for this category, default to 1 vals_col = col.replace('_keys', '_vals') keys_list = group[col].tolist() vals_list = group[vals_col].tolist() result[col.replace('_keys', '')] = format_data(keys_list, vals_list, threshold) return pd.Series(result)
मैं कहूंगा कि डेटा का यह प्रारूप भविष्यवाणी के लिए उपयोग नहीं किया जाएगा, बल्कि इसका उपयोग डेटाबेस में भंडारण और डिबगिंग उद्देश्यों के लिए किया जाएगा ताकि यह सत्यापित किया जा सके कि कोई मूल्य गायब नहीं है और इसके अलावा, यह दोबारा जांचने के लिए कि मॉडल सटीक परिणाम देता है।
एक उपयुक्त मॉडल को प्रशिक्षित करने के लिए मैंने विभिन्न श्रेणियों के लिए अन्य समूहों को परिभाषित करने का निर्णय लिया। जिसका अर्थ है कि यदि वैश्विक स्तर पर किसी विशिष्ट श्रेणी में किसी समूह के उदाहरणों की संख्या एक निश्चित प्रतिशत (%) से कम है, तो उसे अन्य के हिस्से के रूप में जोड़ा जाएगा।
उदाहरण के लिए, os
श्रेणी में हमारे पास है:
{“MacOS”: 300, “Windows”: 400, “Linux”: 23 and “TempleOS”: 10}
चूंकि इस मामले में लिनक्स और टेम्पलओएस दोनों अत्यंत दुर्लभ हैं, इसलिए उन्हें अन्य समूह में जोड़ा जाएगा, इसलिए अंतिम परिणाम होगा:
{“MacOS”: 300, “Windows”: 400, “other”: 33}.
और "दुर्लभता" को श्रेणी के आधार पर और इस श्रेणी के लिए निर्धारित सीमा के आधार पर अलग-अलग तरीके से निर्धारित किया जाता है।
इसे ग्राहक की प्राथमिकताओं और वांछित डेटा के आधार पर कॉन्फ़िगर किया जा सकता है
other_thresholds = { 'br': 0.06, 'os': 0.04, 'cc': 0.02, 'lc': 0.02, 'ref': 0.02, 'so': 0.03, 'me': 0.03, 'ca': 0.03, 'cc': 0.02, 'dv': 0.02, 'rg': 0.01, 'ct': 0.01 }
इस लक्ष्य को प्राप्त करने के लिए 2 कार्य कार्यान्वित किए गए
def get_groups_by_treshholds(df,column_name): """Calculate total values for all columns""" if column_name in EXCLUDED_COLUMNS: return counter = count_dict_values(df[column_name]) total = sum(counter.values()) list1 = [] for key, value in counter.items(): if not (value / total) < other_thresholds[column_name]: list1.append(key) return list1 def create_group_columns(df): column_values = [] for key in other_thresholds.keys(): groups = get_groups_by_treshholds(df, key) if not groups: continue for group in groups: column_values.append(f"{key}_{group}") column_values.append(f"{key}_other") return column_values column_values = create_group_columns(df) column_values
उत्पादन
['br_Chrome', 'br_Firefox', 'os_Mac OS', 'os_other', 'cc_UA', 'cc_GB', 'cc_other', 'dv_mobile', 'dv_desktop', 'dv_other']
मशीन लर्निंग मॉडल के साथ काम करते समय, यह महत्वपूर्ण है कि इनपुट डेटा ऐसे प्रारूप में हो जिसे मॉडल समझ सके। मशीन लर्निंग मॉडल को आमतौर पर JSON जैसी जटिल डेटा संरचनाओं के बजाय संख्यात्मक मानों (पूर्णांक, फ़्लोट) की आवश्यकता होती है।
इसलिए, पुनः, इस आवश्यकता को पूरा करने के लिए हमारे डेटा का थोड़ा और पूर्व-प्रसंस्करण करना बेहतर है।
मैंने एक फ़ंक्शन create_exploded_df
बनाया है जहाँ प्रत्येक विशेषता को एक अलग कॉलम के रूप में दर्शाया गया है, और पंक्तियों में संबंधित संख्यात्मक मान शामिल हैं। (यह अभी तक आदर्श नहीं है, लेकिन यह सबसे अच्छा समाधान था जो मैं बनाने में सक्षम था)
def create_exploded_df(df): """ Function which creates a new data set, iterates through the old one and fill in values according to their belongings (br_other, etc..) """ new_df = df[['projectID', 'statisticsGathered']] for group in column_values: new_df[group] = 0 new_df_cols = new_df.columns df_cols = df.columns for column in df_cols: if column in ['projectID', 'statisticsGathered']: continue for index, row in enumerate(df[column]): if column in EXCLUDED_COLUMNS: continue for key, value in row.items(): total = 0 if (a:=f"{column}_{key}") in new_df_cols: new_df[a][index] = value else: total += value new_df[f"{column}_other"][index] = total return new_df new_df = create_exploded_df(df) new_df.to_csv("2-weeks-exploded.csv") new_df
3.3 घंटे भरें
हमारे पास मौजूद डेटा के प्रारूप के साथ एक और समस्या यह है कि यदि किसी विशिष्ट घंटे में किसी प्रोजेक्ट के लिए कोई ट्रैफ़िक नहीं था, तो रिक्त पंक्ति बनाने के बजाय, कोई पंक्ति ही नहीं होगी, जो इस तथ्य को देखते हुए असुविधाजनक है कि मॉडल को आगामी समय सीमा (उदाहरण के लिए, अगले घंटे) के लिए डेटा की भविष्यवाणी करने के लिए डिज़ाइन किया गया है। हालाँकि, यदि प्रारंभिक समय सीमा के लिए कोई डेटा उपलब्ध नहीं है, तो मॉडल को पूर्वानुमान लगाने के लिए प्रशिक्षित करना संभव नहीं है।
इसलिए मैंने एक स्क्रिप्ट लिखी जो गायब घंटों को ढूंढ लेगी और जब कोई घंटा छूट जाए तो रिक्त पंक्तियाँ डाल देगी
मॉडल प्रशिक्षण के संबंध में, प्राथमिक दृष्टिकोण मॉडल के लिए लक्ष्य के रूप में पिछले घंटे के डेटा का उपयोग करना था। इससे मॉडल को वर्तमान डेटा के आधार पर भविष्य के ट्रैफ़िक की भविष्यवाणी करने की अनुमति मिलती है।
def sort_df_and_assign_targets(df): df = df.copy() df = df.sort_values(by=['projectID', 'statisticsGathered']) for column_name in df.columns: if not column_name.endswith('target'): continue df[column_name] = df.groupby('projectID')[column_name].shift(-1) return df new_df = sort_df_and_assign_targets(new_df)
उत्पादन
statisticsGathered
इस दृष्टिकोण का मुख्य कारण यह है कि statisticsGathered
एक datetime
ऑब्जेक्ट था, जिसे मैंने जिन मॉडलों का उपयोग करने की कोशिश की है (बाद के अनुभागों की जांच करें) वे इसे संसाधित करने और सही पैटर्न की पहचान करने में सक्षम नहीं हैं।
इसका परिणाम भयानक MSE/MRSE
मेट्रिक्स में हुआ। इसलिए विकास के दौरान day
, month
और hour
के लिए अलग-अलग सुविधाओं का निर्णय लिया गया, जिससे परिणाम काफी बेहतर हुए।
def split_statistic_gathered(df): df['Month'] = df['statisticsGathered'].dt.month.astype(int) # as int df['Day'] = df['statisticsGathered'].dt.day.astype(int) # as int df['Hour'] = df['statisticsGathered'].dt.hour df = df.drop('statisticsGathered', axis = 1) return df new_df = split_statistic_gathered(new_df) new_df
और बस इतना ही! चलिए सीधे प्रशिक्षण पर चलते हैं! 🎉🎉🎉
खैर, मेरा अनुमान है कि इस एप्लिकेशन के निर्माण के दौरान वास्तविक भविष्यवाणी सबसे चुनौतीपूर्ण हिस्सा थी।
पहली चीज़ जो मैं आज़माना चाहता था वह है LinearRegression
मॉडल का उपयोग करना:
मैंने निम्नलिखित कार्य कार्यान्वित किये:
def create_model_for_target(train_df, target_series): X_train, x_test, Y_train, y_test = train_test_split(train_df, target_series, test_size=0.3, shuffle=False) reg = LinearRegression() reg.fit(X_train, Y_train) y_pred = reg.predict(x_test) return {"y_test": y_test, "y_pred": y_pred} def create_models_for_targets(df): models_data = dict() df = df.dropna() train_df = clear_df(df) for target_name in df[[column_name for column_name in df.columns if column_name.endswith("target")]]: models_data[target_name] = create_model_for_target(train_df, df[target_name]) return models_data
प्रत्येक लक्ष्य कॉलम के लिए, हम डेटा को प्रशिक्षण और परीक्षण सेट में विभाजित करते हैं। फिर हम प्रशिक्षण डेटा पर एक LinearRegression
मॉडल को प्रशिक्षित करते हैं और परीक्षण डेटा पर पूर्वानुमान लगाते हैं।
यह मूल्यांकन करने के लिए कि परिणाम सही हैं, मैंने वह फ़ंक्शन जोड़ा जो आवश्यक मेट्रिक्स एकत्र करता है और आउटपुट तैयार करता है
def evaluate_models(data): evaluation = [] for target, results in data.items(): y_test, y_pred = results['y_test'], results['y_pred'] mse = mean_squared_error(y_test, y_pred) rmse = mean_squared_error(y_test, y_pred) ** 0.5 mae = mean_absolute_error(y_test, y_pred) mean_y = y_test.mean() median_y = y_test.median() evaluation.append({'target': target, 'mse': mse, 'rmse': rmse, 'mae': mae, 'mean_y': mean_y, 'median_y': median_y}) return pd.DataFrame(evaluation)
मैंने एक स्क्रिप्ट लिखी जो आउटपुट उत्पन्न करती है और इसे एक्सेल फ़ाइल में सहेजती है, mse
, rmse
, mae
और mean_y
मानों को ध्यान में रखती है
जैसा कि आप देख सकते हैं, मेट्रिक्स संतोषजनक नहीं हैं और अनुमानित यातायात डेटा सटीक नहीं होगा और यातायात पूर्वानुमान के मेरे लक्ष्यों के लिए उपयुक्त नहीं होगा।
इसलिए, मैंने प्रति घंटे आगंतुकों की कुल संख्या की भविष्यवाणी करने का निर्णय लिया, ताकि निम्नलिखित कार्य बनाए जा सकें
def add_target_column(df, by): totals_series = df.apply(lambda x: sum(x[[column for column in df.columns if column.startswith(by)]]), axis=1) df['total'] = totals_series df[f'total_{by}_target'] = totals_series return df def shift_target_column(df, by): df = df.sort_values(by=['projectID', 'statisticsGathered'], ignore_index=True) df['total_target'] = df.groupby('projectID')[f'total_{by}_target'].shift(-1) return df new_df = add_target_column(new_df, 'br') new_df = shift_target_column(new_df, 'br') new_df[['total_br_target']]
यह फ़ंक्शन एक विशिष्ट श्रेणी लेता है और उसके आधार पर कुल आगंतुकों की गणना करता है। यह इसलिए काम करता है क्योंकि डिवाइस मानों की कुल संख्या OS मानों की कुल संख्या के समान होगी।
इस दृष्टिकोण से, मॉडल ने पहले की तुलना में 10 गुना बेहतर परिणाम दिखाए ।
अगर हम इस मामले की बात करें तो यह लगभग स्वीकार्य और उपयोग के लिए तैयार सुविधा है। ग्राहक अब इन पूर्वानुमानों के परिणाम के आधार पर अपने बजट आवंटन और सर्वर स्केलिंग की योजना बना सकते हैं
पूर्वानुमान वास्तविक मूल्यों से लगभग 2.45 विज़िटर तक विचलित होते हैं (चूंकि RMSE = √MSE ) । जिसका विपणन आवश्यकताओं पर कोई नकारात्मक महत्वपूर्ण प्रभाव नहीं हो सकता है।
चूंकि यह लेख काफी विस्तृत हो चुका है और ऐप अभी भी विकास के अधीन है, इसलिए हम यहां रुकेंगे। हम आगे बढ़ते हुए इस दृष्टिकोण को और बेहतर बनाते रहेंगे और मैं आपको अपडेट करता रहूंगा!
पढ़ने और ध्यान देने के लिए धन्यवाद! मैं टिप्पणी अनुभाग में आपकी प्रतिक्रिया और विचार सुनने के लिए उत्सुक हूँ। मुझे उम्मीद है कि यह जानकारी आपके उद्देश्यों के लिए उपयोगी साबित होगी!
और सौभाग्य!