এই নিবন্ধে আমি Swetrix নামে একটি ওয়েব অ্যানালিটিক্স প্ল্যাটফর্মের জন্য একটি AI পরিষেবা তৈরি করার বিষয়ে আমার অভিজ্ঞতা শেয়ার করতে চাই৷
আমার লক্ষ্য ছিল একটি মেশিন লার্নিং মডেল তৈরি করা যা নিম্নলিখিত স্ক্রিনশটে প্রদর্শিত ডেটার উপর ভিত্তি করে ভবিষ্যতের ওয়েবসাইট ট্র্যাফিকের পূর্বাভাস দেবে
শেষ লক্ষ্য হল গ্রাহকের কাছে ভবিষ্যতে তাদের ওয়েবসাইটে কী ট্র্যাফিক উপস্থিত হবে সে সম্পর্কে একটি পরিষ্কার দৃষ্টিভঙ্গি থাকা, যার ফলে তারা আরও ভাল অন্তর্দৃষ্টি পেতে এবং সাধারণভাবে ব্যবসায়িক পরিকল্পনা উন্নত করতে দেয়।
পরিকল্পনার সময় AI এবং API পরিষেবাগুলির মধ্যে যোগাযোগের জন্য RabbitMQ বার্তা ব্রোকারের সাথে মাইক্রোসার্ভিস আর্কিটেকচারের সাথে এগিয়ে যাওয়ার সিদ্ধান্ত নেওয়া হয়েছিল।
প্রথমত, আমাদের একটি পৃথক ডাটাবেসে প্রতি ঘন্টা ক্রোন টাস্ক সহ ডেটা সংগ্রহ করতে হবে। আমরা একটি ক্লিক হাউস বেছে নেওয়ার সিদ্ধান্ত নিয়েছি, যেহেতু Swetrix-এর ওয়েবসাইটের মূল ডেটা এতে সংরক্ষিত আছে। ফরম্যাট সম্পর্কে বিস্তারিত পরবর্তী বিভাগগুলিতে কভার করা হবে।
RabbitMQ এর সরলতার কারণে একটি বার্তা ব্রোকার হিসেবে বেছে নেওয়া হয়েছে এবং আমাদের AI এবং API পরিষেবার মধ্যে যোগাযোগ স্থাপন করতে হবে। আসুন সবকিছু ভেঙ্গে ফেলি এবং মূল যুক্তি পরীক্ষা করি
Swetrix-AI পরিষেবা ব্যাকএন্ড সাইডের জন্য NestJs ফ্রেমওয়ার্ক এবং ডেটা প্রাক-প্রসেসিং এবং মডেল পূর্বাভাসের জন্য Python স্ক্রিপ্ট ব্যবহার করবে।
আমরা একটি analytics
টেবিলে প্রকল্প সম্পর্কে নিম্নলিখিত তথ্য সংগ্রহ করি। আপনি ইতিমধ্যে নিবন্ধের প্রথম বিভাগে এই ডেটার রেন্ডার করা সংস্করণ দেখেছেন৷
আমি নিম্নলিখিত প্রশ্নের সাথে এই (প্রায় গ্রহণযোগ্য) ফলাফল অর্জন করতে সক্ষম হয়েছি:
@Cron(CronExpression.EVERY_HOUR) async insertHourlyProjectData(): Promise<void> { const gatherProjectsData = ` INSERT INTO analytics.hourly_projects_data (UniqueID, projectID, statisticsGathered, br_keys, br_vals, os_keys, os_vals, lc_keys, lc_vals, ref_keys, ref_vals, so_keys, so_vals, me_keys, me_vals, ca_keys, ca_vals, cc_keys, cc_vals, dv_keys, dv_vals, rg_keys, rg_vals, ct_keys, ct_vals) SELECT generateUUIDv4() as UniqueID, pid as projectID, toStartOfHour(now()) as statisticsGathered, groupArray(br) as br_keys, groupArray(br_count) as br_vals, groupArray(os) as os_keys, groupArray(os_count) as os_vals, ... groupArray(ct) as ct_keys, groupArray(ct_count) as ct_vals FROM ( SELECT pid, br, count(*) as br_count, os, count(*) as os_count, ... ct, count(*) as ct_count FROM analytics.analytics GROUP BY pid, br, os, lc, ref, so, me, ca, cc, dv, rg, ct ) GROUP BY pid; ` try { await clickhouse.query(gatherProjectsData).toPromise() } catch (e) { console.error( `[CRON WORKER] Error whilst gathering hourly data for all projects: ${e}`, )
ফাংশনটি ক্রন জব ব্যবহার করে প্রতি ঘন্টায় চালানোর জন্য নির্ধারিত হয়। এটি একটি ক্লিকহাউস analytics.hourly_projects_data
এ বিশ্লেষণ ডেটা সংগ্রহ করে এবং সন্নিবেশ করে।
আউটপুট
ClickHouse সীমাবদ্ধতার কারণে আমি ডেটার পছন্দসই বিন্যাস অর্জন করতে পারিনি। তাই আমি মডেলের প্রশিক্ষণের জন্য প্রয়োজনীয় প্রিপ্রসেসিং সম্পূর্ণ করতে pandas
ব্যবহার করার সিদ্ধান্ত নিয়েছি।
বিশেষত আমি নিম্নলিখিত কাজ করতে পাইথন ব্যবহার করেছি:
একটি JSON ফিল্ডে একটি বিভাগের সাথে সম্পর্কিত কী এবং মানগুলিকে একত্রিত করুন, যেমন একটি বস্তুতে ডিভাইসগুলির কী এবং মানগুলিকে একত্রিত করা।
os_keys = {“Windows”, ”MacOS”, ”MacOS”, ”MacOS”, ”Linux”} os_values = {1, 2, 2, 1, 5}
এর মধ্যে:
os = {“Windows”: 1, “MacOS”: 5, “Linux”: 5}
কোড এবং আউটপুট সংযুক্ত করা হচ্ছে:
def format_data(keys_list, vals_list, threshold): """ Format data by converting string representations of lists to actual lists, then sums up the counts for each key. Keys with counts below a specified threshold are aggregated into 'Other'. """ counts = defaultdict(int) for keys_str, vals_str in zip(keys_list, vals_list): keys = ast.literal_eval(keys_str) vals = ast.literal_eval(vals_str) for key, val in zip(keys, vals): counts[key] += val final_data = defaultdict(int) for value, count in counts.items(): final_data[value] = count return dict(final_data) def process_group(group): """ Combine specific groups by a group clause, and make a """ result = {} for col in group.columns: if col.endswith('_keys'): prefix = col.split('_')[0] # Extract prefix to identify the category (eg, 'br' for browsers) threshold = other_thresholds.get(prefix, 1) # Get the threshold for this category, default to 1 vals_col = col.replace('_keys', '_vals') keys_list = group[col].tolist() vals_list = group[vals_col].tolist() result[col.replace('_keys', '')] = format_data(keys_list, vals_list, threshold) return pd.Series(result)
ডেটার এই বিন্যাসটি ভবিষ্যদ্বাণীর জন্য ব্যবহার করা হবে না, আমি বলব, এটি ডেটাবেসে সংরক্ষণ করার জন্য এবং ডিবাগিংয়ের উদ্দেশ্যে আরও বেশি কিছু যাচাই করার জন্য যে কোনও অনুপস্থিত মান নেই এবং উপরন্তু, মডেলটি একটি নির্ভুল উত্পাদন করে তা দুবার চেক করার জন্য ফলাফল.
একটি পর্যাপ্ত মডেল প্রশিক্ষণের জন্য আমি বিভিন্ন বিভাগের জন্য অন্যান্য গ্রুপ সংজ্ঞায়িত করার সিদ্ধান্ত নিয়েছি। যার মানে হল যে যদি বিশ্বব্যাপী একটি নির্দিষ্ট বিভাগে একটি গোষ্ঠীর দৃষ্টান্তের সংখ্যা একটি নির্দিষ্ট শতাংশ (%) এর নীচে হয় তবে এটি অন্যটির অংশ হিসাবে যোগ করা হবে।
উদাহরণস্বরূপ, os
বিভাগে আমাদের রয়েছে:
{“MacOS”: 300, “Windows”: 400, “Linux”: 23 and “TempleOS”: 10}
যেহেতু এই ক্ষেত্রে লিনাক্স এবং টেম্পলওএস উভয়ই অত্যন্ত বিরল তারা অন্য গ্রুপে একত্রিত হবে, তাই শেষ ফলাফল হবে:
{“MacOS”: 300, “Windows”: 400, “other”: 33}.
এবং "বিরলতা" বিভাগের উপর নির্ভর করে এবং এই বিভাগের থ্রেশহোল্ডে মনোনীত হওয়ার উপর ভিত্তি করে আলাদাভাবে নির্ধারিত হয়।
এটি গ্রাহকের পছন্দ এবং পছন্দসই ডেটার উপর ভিত্তি করে কনফিগারযোগ্য হতে পারে
other_thresholds = { 'br': 0.06, 'os': 0.04, 'cc': 0.02, 'lc': 0.02, 'ref': 0.02, 'so': 0.03, 'me': 0.03, 'ca': 0.03, 'cc': 0.02, 'dv': 0.02, 'rg': 0.01, 'ct': 0.01 }
এটি অর্জন করার জন্য 2টি ফাংশন বাস্তবায়িত হয়েছিল
def get_groups_by_treshholds(df,column_name): """Calculate total values for all columns""" if column_name in EXCLUDED_COLUMNS: return counter = count_dict_values(df[column_name]) total = sum(counter.values()) list1 = [] for key, value in counter.items(): if not (value / total) < other_thresholds[column_name]: list1.append(key) return list1 def create_group_columns(df): column_values = [] for key in other_thresholds.keys(): groups = get_groups_by_treshholds(df, key) if not groups: continue for group in groups: column_values.append(f"{key}_{group}") column_values.append(f"{key}_other") return column_values column_values = create_group_columns(df) column_values
আউটপুট
['br_Chrome', 'br_Firefox', 'os_Mac OS', 'os_other', 'cc_UA', 'cc_GB', 'cc_other', 'dv_mobile', 'dv_desktop', 'dv_other']
মেশিন লার্নিং মডেলগুলির সাথে কাজ করার সময়, এটি অত্যন্ত গুরুত্বপূর্ণ যে ইনপুট ডেটা এমন একটি বিন্যাসে রয়েছে যা মডেলটি বুঝতে পারে৷ মেশিন লার্নিং মডেলের জন্য সাধারণত JSON-এর মতো জটিল ডেটা স্ট্রাকচারের পরিবর্তে সংখ্যাসূচক মান (পূর্ণসংখ্যা, ফ্লোট) প্রয়োজন।
অতএব, আবার, এই প্রয়োজনীয়তা পূরণ করার জন্য আমাদের ডেটার একটু বেশি প্রিপ্রসেসিং করা পছন্দনীয়।
আমি একটি ফাংশন create_exploded_df
তৈরি করেছি যেখানে প্রতিটি বৈশিষ্ট্য একটি পৃথক কলাম হিসাবে উপস্থাপিত হয় এবং সারিগুলিতে সংশ্লিষ্ট সংখ্যাসূচক মান রয়েছে। (এটি এখনও আদর্শ নয়, তবে এটিই সেরা সমাধান ছিল যা আমি উত্পাদন করতে সক্ষম হয়েছিলাম)
def create_exploded_df(df): """ Function which creates a new data set, iterates through the old one and fill in values according to their belongings (br_other, etc..) """ new_df = df[['projectID', 'statisticsGathered']] for group in column_values: new_df[group] = 0 new_df_cols = new_df.columns df_cols = df.columns for column in df_cols: if column in ['projectID', 'statisticsGathered']: continue for index, row in enumerate(df[column]): if column in EXCLUDED_COLUMNS: continue for key, value in row.items(): total = 0 if (a:=f"{column}_{key}") in new_df_cols: new_df[a][index] = value else: total += value new_df[f"{column}_other"][index] = total return new_df new_df = create_exploded_df(df) new_df.to_csv("2-weeks-exploded.csv") new_df
3.3 ঘন্টা পূরণ করুন
আমাদের কাছে থাকা ডেটার বিন্যাসের সাথে আরেকটি সমস্যা হল যে একটি ফাঁকা সারি তৈরি করার পরিবর্তে একটি নির্দিষ্ট ঘন্টার মধ্যে যদি কোনও প্রকল্পের জন্য কোনও ট্র্যাফিক না থাকে তবে সেখানে কোনও সারি থাকবে না, যা মডেলটি ডিজাইন করা হয়েছে তা বিবেচনা করে অসুবিধাজনক। আসন্ন সময় ফ্রেমের জন্য ডেটা ভবিষ্যদ্বাণী করুন (যেমন, পরবর্তী ঘন্টা)। যাইহোক, প্রাথমিক সময় ফ্রেমের জন্য কোন ডেটা উপলব্ধ না থাকলে ভবিষ্যদ্বাণী করার জন্য মডেলটিকে প্রশিক্ষণ দেওয়া সম্ভব নয়।
তাই আমি একটি স্ক্রিপ্ট লিখেছি যা অনুপস্থিত ঘন্টা খুঁজে পাবে এবং একটি ঘন্টা এড়িয়ে গেলে ফাঁকা সারি সন্নিবেশ করবে
মডেল প্রশিক্ষণের বিষয়ে, প্রাথমিক পদ্ধতিটি ছিল মডেলের লক্ষ্য হিসাবে আগের ঘন্টার ডেটা ব্যবহার করা। এটি মডেলটিকে বর্তমান ডেটার উপর ভিত্তি করে ভবিষ্যতের ট্র্যাফিকের পূর্বাভাস দিতে দেয়।
def sort_df_and_assign_targets(df): df = df.copy() df = df.sort_values(by=['projectID', 'statisticsGathered']) for column_name in df.columns: if not column_name.endswith('target'): continue df[column_name] = df.groupby('projectID')[column_name].shift(-1) return df new_df = sort_df_and_assign_targets(new_df)
আউটপুট
statisticsGathered
পৃথক কলামে সংগৃহীত এই ধরনের পদ্ধতির প্রধান কারণ হল যে statisticsGathered
একটি datetime
বস্তু ছিল, যে মডেলগুলি আমি ব্যবহার করার চেষ্টা করেছি (পরবর্তী বিভাগগুলি পরীক্ষা করুন) এটি প্রক্রিয়া করতে এবং সঠিক প্যাটার্ন সনাক্ত করতে সক্ষম নয়।
এর ফলে ভয়ানক MSE/MRSE
মেট্রিক্স হয়েছে। তাই বিকাশের সময় day
, month
এবং hour
জন্য বৈশিষ্ট্যগুলিকে আলাদা করার সিদ্ধান্ত নেওয়া হয়েছিল যা ফলাফলগুলিকে উল্লেখযোগ্যভাবে উন্নত করেছে।
def split_statistic_gathered(df): df['Month'] = df['statisticsGathered'].dt.month.astype(int) # as int df['Day'] = df['statisticsGathered'].dt.day.astype(int) # as int df['Hour'] = df['statisticsGathered'].dt.hour df = df.drop('statisticsGathered', axis = 1) return df new_df = split_statistic_gathered(new_df) new_df
এবং এটাই! এর প্রশিক্ষণ নিজেই ঝাঁপ দেওয়া যাক! 🎉🎉🎉
ঠিক আছে, আমি অনুমান করি, এই অ্যাপ্লিকেশনটি তৈরি করার সময় প্রকৃত ভবিষ্যদ্বাণীটি ছিল সবচেয়ে চ্যালেঞ্জিং অংশ।
প্রথম জিনিস যা আমি চেষ্টা করতে চেয়েছিলাম তা হল LinearRegression
মডেল ব্যবহার করা:
আমি নিম্নলিখিত ফাংশন বাস্তবায়ন করেছি:
def create_model_for_target(train_df, target_series): X_train, x_test, Y_train, y_test = train_test_split(train_df, target_series, test_size=0.3, shuffle=False) reg = LinearRegression() reg.fit(X_train, Y_train) y_pred = reg.predict(x_test) return {"y_test": y_test, "y_pred": y_pred} def create_models_for_targets(df): models_data = dict() df = df.dropna() train_df = clear_df(df) for target_name in df[[column_name for column_name in df.columns if column_name.endswith("target")]]: models_data[target_name] = create_model_for_target(train_df, df[target_name]) return models_data
প্রতিটি লক্ষ্য কলামের জন্য, আমরা প্রশিক্ষণ এবং পরীক্ষা সেটে ডেটা বিভক্ত করি। তারপরে আমরা প্রশিক্ষণ ডেটার উপর একটি LinearRegression
মডেল প্রশিক্ষিত করি এবং পরীক্ষার ডেটাতে ভবিষ্যদ্বাণী করি।
ফলাফলগুলি সঠিক তা মূল্যায়ন করার জন্য আমি ফাংশন যোগ করেছি যা প্রয়োজনীয় মেট্রিক্স সংগ্রহ করে এবং আউটপুট তৈরি করে
def evaluate_models(data): evaluation = [] for target, results in data.items(): y_test, y_pred = results['y_test'], results['y_pred'] mse = mean_squared_error(y_test, y_pred) rmse = mean_squared_error(y_test, y_pred) ** 0.5 mae = mean_absolute_error(y_test, y_pred) mean_y = y_test.mean() median_y = y_test.median() evaluation.append({'target': target, 'mse': mse, 'rmse': rmse, 'mae': mae, 'mean_y': mean_y, 'median_y': median_y}) return pd.DataFrame(evaluation)
আমি একটি স্ক্রিপ্ট লিখেছি যা আউটপুট তৈরি করেছে এবং এটিকে এক্সেল ফাইল, অ্যাকাউন্টিং mse
, rmse
, mae
এবং mean_y
মানগুলিতে সংরক্ষণ করেছে
আপনি দেখতে পাচ্ছেন যে মেট্রিকগুলি সন্তোষজনক নয় এবং ভবিষ্যদ্বাণী করা ট্র্যাফিক ডেটা সঠিক থেকে অনেক দূরে এবং আমার ট্র্যাফিক পূর্বাভাসের লক্ষ্যগুলির জন্য উপযুক্ত নয়৷
অতএব, আমি প্রতি ঘন্টায় মোট দর্শকের ভবিষ্যদ্বাণী করার সিদ্ধান্ত নিয়েছি, যাতে নিম্নলিখিত ফাংশনগুলি তৈরি করা হয়
def add_target_column(df, by): totals_series = df.apply(lambda x: sum(x[[column for column in df.columns if column.startswith(by)]]), axis=1) df['total'] = totals_series df[f'total_{by}_target'] = totals_series return df def shift_target_column(df, by): df = df.sort_values(by=['projectID', 'statisticsGathered'], ignore_index=True) df['total_target'] = df.groupby('projectID')[f'total_{by}_target'].shift(-1) return df new_df = add_target_column(new_df, 'br') new_df = shift_target_column(new_df, 'br') new_df[['total_br_target']]
এই ফাংশনটি একটি নির্দিষ্ট বিভাগ নেয় এবং এর উপর ভিত্তি করে মোট দর্শক গণনা করে। এটি কাজ করে কারণ ডিভাইসের মানের মোট সংখ্যা OS মানের মোট সংখ্যার সমান হবে।
এই ধরনের পদ্ধতির সাথে, মডেলটি আগের তুলনায় 10 গুণ ভাল ফলাফল দেখিয়েছে ।
যদি আমরা এই ক্ষেত্রে কথা বলছি, এটি প্রায় গ্রহণযোগ্য এবং বৈশিষ্ট্য ব্যবহার করার জন্য প্রস্তুত। গ্রাহকরা এখন এই ভবিষ্যদ্বাণীগুলির ফলাফলের উপর নির্ভর করে তাদের বাজেট বরাদ্দ এবং সার্ভার স্কেলিং পরিকল্পনা করতে পারে
ভবিষ্যদ্বাণীগুলি প্রকৃত মান থেকে প্রায় 2.45 দর্শকদের দ্বারা বিচ্যুত হয় (যেহেতু RMSE = √MSE ) । যা বিপণনের প্রয়োজনের জন্য কোন নেতিবাচক গুরুত্বপূর্ণ প্রভাব ফেলতে পারে না।
যেহেতু এই নিবন্ধটি বেশ বিস্তৃত হয়েছে এবং অ্যাপটি বিকাশের অধীনে রয়েছে, আমরা এখানে বিরতি দেব। আমরা এই পদ্ধতির পরিমার্জন চালিয়ে যাব এবং আমি আপনাকে আপডেট রাখব!
পড়ার এবং আপনার মনোযোগের জন্য ধন্যবাদ! আমি মন্তব্য বিভাগে আপনার প্রতিক্রিয়া এবং চিন্তা শোনার জন্য উন্মুখ. আমি এই তথ্য আপনার উদ্দেশ্য জন্য দরকারী হতে প্রমাণিত আশা করি!
এবং সৌভাগ্য!