在本文中,我想分享我为名为 Swetrix 的网络分析平台开发 AI 服务的经验。
我的目标是开发一个机器学习模型,根据以下屏幕截图显示的数据预测未来的网站流量
最终目标是让客户清楚地了解未来他们的网站上会出现什么流量,从而使他们获得更好的洞察力并增强整体业务规划。
在规划期间,我们决定采用微服务架构,并使用 RabbitMQ 消息代理来实现 AI 和 API 服务之间的通信。
首先,我们需要使用每小时的 cron 任务将数据收集到单独的数据库中。我们决定选择 ClickHouse,因为 Swetrix 上的网站的原始数据存储在其中。有关格式的详细信息将在下一节中介绍。
选择 RabbitMQ 作为消息代理是因为它的简单性,我们需要在 AI 和 API 服务之间建立通信。让我们分解所有内容并检查主要逻辑
Swetrix-AI 服务将使用 NestJs 框架作为后端,并使用 Python 脚本进行数据预处理和模型预测。
我们将有关项目的以下数据收集到analytics
表中。 您已经在文章的第一部分看到了该数据的渲染版本。
我能够通过以下查询获得这个(几乎可以接受的)结果:
@Cron(CronExpression.EVERY_HOUR) async insertHourlyProjectData(): Promise<void> { const gatherProjectsData = ` INSERT INTO analytics.hourly_projects_data (UniqueID, projectID, statisticsGathered, br_keys, br_vals, os_keys, os_vals, lc_keys, lc_vals, ref_keys, ref_vals, so_keys, so_vals, me_keys, me_vals, ca_keys, ca_vals, cc_keys, cc_vals, dv_keys, dv_vals, rg_keys, rg_vals, ct_keys, ct_vals) SELECT generateUUIDv4() as UniqueID, pid as projectID, toStartOfHour(now()) as statisticsGathered, groupArray(br) as br_keys, groupArray(br_count) as br_vals, groupArray(os) as os_keys, groupArray(os_count) as os_vals, ... groupArray(ct) as ct_keys, groupArray(ct_count) as ct_vals FROM ( SELECT pid, br, count(*) as br_count, os, count(*) as os_count, ... ct, count(*) as ct_count FROM analytics.analytics GROUP BY pid, br, os, lc, ref, so, me, ca, cc, dv, rg, ct ) GROUP BY pid; ` try { await clickhouse.query(gatherProjectsData).toPromise() } catch (e) { console.error( `[CRON WORKER] Error whilst gathering hourly data for all projects: ${e}`, )
该函数使用 Cron Job 安排每小时运行一次。它收集分析数据并将其插入到 clickhouse analytics.hourly_projects_data
中。
输出
由于 ClickHouse 的限制,我无法实现所需的数据格式。因此我决定使用pandas
来完成模型训练所需的预处理。
具体来说,我使用 Python 执行以下操作:
将与一个类别相关的键和值组合成一个 JSON 字段,例如将设备的键和值组合成一个对象。
os_keys = {“Windows”, ”MacOS”, ”MacOS”, ”MacOS”, ”Linux”} os_values = {1, 2, 2, 1, 5}
进入:
os = {“Windows”: 1, “MacOS”: 5, “Linux”: 5}
附加代码和输出:
def format_data(keys_list, vals_list, threshold): """ Format data by converting string representations of lists to actual lists, then sums up the counts for each key. Keys with counts below a specified threshold are aggregated into 'Other'. """ counts = defaultdict(int) for keys_str, vals_str in zip(keys_list, vals_list): keys = ast.literal_eval(keys_str) vals = ast.literal_eval(vals_str) for key, val in zip(keys, vals): counts[key] += val final_data = defaultdict(int) for value, count in counts.items(): final_data[value] = count return dict(final_data) def process_group(group): """ Combine specific groups by a group clause, and make a """ result = {} for col in group.columns: if col.endswith('_keys'): prefix = col.split('_')[0] # Extract prefix to identify the category (eg, 'br' for browsers) threshold = other_thresholds.get(prefix, 1) # Get the threshold for this category, default to 1 vals_col = col.replace('_keys', '_vals') keys_list = group[col].tolist() vals_list = group[vals_col].tolist() result[col.replace('_keys', '')] = format_data(keys_list, vals_list, threshold) return pd.Series(result)
我想说,这种格式的数据不会用于预测本身,它更多地用于将其存储在数据库中并用于调试目的,以验证没有缺失值,此外,还要仔细检查模型是否产生准确的结果。
为了训练一个合适的模型,我决定为不同的类别定义其他组。这意味着,如果某个特定类别中某个组的实例数量在全球范围内低于某个百分比 (%),它将被添加为其他组的一部分。
例如,在os
类别中,我们有:
{“MacOS”: 300, “Windows”: 400, “Linux”: 23 and “TempleOS”: 10}
由于 Linux 和 TempleOS 都极为罕见,因此它们将被合并到其他组中,因此最终结果将是:
{“MacOS”: 300, “Windows”: 400, “other”: 33}.
并且“稀有度”是根据类别不同而不同地确定的,并且基于为该类别指定的阈值。
它可以根据客户的偏好和所需数据进行配置
other_thresholds = { 'br': 0.06, 'os': 0.04, 'cc': 0.02, 'lc': 0.02, 'ref': 0.02, 'so': 0.03, 'me': 0.03, 'ca': 0.03, 'cc': 0.02, 'dv': 0.02, 'rg': 0.01, 'ct': 0.01 }
为了实现这一点,我们实现了两个函数
def get_groups_by_treshholds(df,column_name): """Calculate total values for all columns""" if column_name in EXCLUDED_COLUMNS: return counter = count_dict_values(df[column_name]) total = sum(counter.values()) list1 = [] for key, value in counter.items(): if not (value / total) < other_thresholds[column_name]: list1.append(key) return list1 def create_group_columns(df): column_values = [] for key in other_thresholds.keys(): groups = get_groups_by_treshholds(df, key) if not groups: continue for group in groups: column_values.append(f"{key}_{group}") column_values.append(f"{key}_other") return column_values column_values = create_group_columns(df) column_values
输出
['br_Chrome', 'br_Firefox', 'os_Mac OS', 'os_other', 'cc_UA', 'cc_GB', 'cc_other', 'dv_mobile', 'dv_desktop', 'dv_other']
使用机器学习模型时,输入数据必须采用模型可以理解的格式,这一点至关重要。机器学习模型通常需要数值(整数、浮点数),而不是像 JSON 这样的复杂数据结构。
因此,再次强调,最好对我们的数据进行一些预处理以满足这一要求。
我创建了一个函数create_exploded_df
,其中每个特征都表示为单独的列,行包含相应的数值。(虽然还不理想,但这是我能想到的最佳解决方案)
def create_exploded_df(df): """ Function which creates a new data set, iterates through the old one and fill in values according to their belongings (br_other, etc..) """ new_df = df[['projectID', 'statisticsGathered']] for group in column_values: new_df[group] = 0 new_df_cols = new_df.columns df_cols = df.columns for column in df_cols: if column in ['projectID', 'statisticsGathered']: continue for index, row in enumerate(df[column]): if column in EXCLUDED_COLUMNS: continue for key, value in row.items(): total = 0 if (a:=f"{column}_{key}") in new_df_cols: new_df[a][index] = value else: total += value new_df[f"{column}_other"][index] = total return new_df new_df = create_exploded_df(df) new_df.to_csv("2-weeks-exploded.csv") new_df
3.3 填写小时数
我们拥有的数据格式的另一个问题是,如果某个项目在特定小时内没有流量,那么就不会创建一个空白行,而会根本没有行,考虑到该模型旨在预测即将到来的时间范围(例如,下一个小时)的数据,这很不方便。但是,如果初始时间范围内没有可用的数据,则无法训练模型进行预测。
因此我编写了一个脚本,可以查找缺失的小时数,并在跳过一个小时时插入空白行
在模型训练方面,主要方法是使用前一小时的数据作为模型的目标。这使得模型能够根据当前数据预测未来的流量。
def sort_df_and_assign_targets(df): df = df.copy() df = df.sort_values(by=['projectID', 'statisticsGathered']) for column_name in df.columns: if not column_name.endswith('target'): continue df[column_name] = df.groupby('projectID')[column_name].shift(-1) return df new_df = sort_df_and_assign_targets(new_df)
输出
statisticsGathered
收集到单独的列中采用这种方法的主要原因是statisticsGathered
是一个datetime
对象,我尝试使用的模型(查看后续部分)无法处理它并识别正确的模式。
这导致MSE/MRSE
指标非常糟糕。因此,在开发过程中,我们决定将特征分为day
、 month
和hour
,这显著提高了结果。
def split_statistic_gathered(df): df['Month'] = df['statisticsGathered'].dt.month.astype(int) # as int df['Day'] = df['statisticsGathered'].dt.day.astype(int) # as int df['Hour'] = df['statisticsGathered'].dt.hour df = df.drop('statisticsGathered', axis = 1) return df new_df = split_statistic_gathered(new_df) new_df
就这样!让我们开始训练吧!🎉🎉🎉
嗯,我猜,实际预测是构建此应用程序过程中最具挑战性的部分。
我想尝试的第一件事是使用LinearRegression
模型:
我实现了以下功能:
def create_model_for_target(train_df, target_series): X_train, x_test, Y_train, y_test = train_test_split(train_df, target_series, test_size=0.3, shuffle=False) reg = LinearRegression() reg.fit(X_train, Y_train) y_pred = reg.predict(x_test) return {"y_test": y_test, "y_pred": y_pred} def create_models_for_targets(df): models_data = dict() df = df.dropna() train_df = clear_df(df) for target_name in df[[column_name for column_name in df.columns if column_name.endswith("target")]]: models_data[target_name] = create_model_for_target(train_df, df[target_name]) return models_data
对于每个目标列,我们将数据分为训练集和测试集。然后我们在训练数据上训练LinearRegression
模型,并对测试数据进行预测。
为了评估结果是否正确,我添加了收集所需指标并生成输出的函数
def evaluate_models(data): evaluation = [] for target, results in data.items(): y_test, y_pred = results['y_test'], results['y_pred'] mse = mean_squared_error(y_test, y_pred) rmse = mean_squared_error(y_test, y_pred) ** 0.5 mae = mean_absolute_error(y_test, y_pred) mean_y = y_test.mean() median_y = y_test.median() evaluation.append({'target': target, 'mse': mse, 'rmse': rmse, 'mae': mae, 'mean_y': mean_y, 'median_y': median_y}) return pd.DataFrame(evaluation)
我编写了一个脚本,生成输出并将其保存到 excel 文件中,计算mse
、 rmse
、 mae
和mean_y
值
正如您所看到的,这些指标并不令人满意,预测的交通数据远非准确,并且不适合我的交通预测目标。
因此,我决定预测每小时的访客总数,因此创建了以下函数
def add_target_column(df, by): totals_series = df.apply(lambda x: sum(x[[column for column in df.columns if column.startswith(by)]]), axis=1) df['total'] = totals_series df[f'total_{by}_target'] = totals_series return df def shift_target_column(df, by): df = df.sort_values(by=['projectID', 'statisticsGathered'], ignore_index=True) df['total_target'] = df.groupby('projectID')[f'total_{by}_target'].shift(-1) return df new_df = add_target_column(new_df, 'br') new_df = shift_target_column(new_df, 'br') new_df[['total_br_target']]
此函数采用特定类别并据此计算访客总数。此方法有效,因为设备值的总数与操作系统值的总数相同。
通过这种方法,模型表现出的结果比以前好了 10 倍。
如果我们谈论的是这种情况,那么它几乎是可以接受的,并且可以立即使用的功能。客户现在可以根据这些预测的结果来规划预算分配和服务器扩展
预测值与实际值相差约 2.45 名访客(因为 RMSE = √MSE ) 。这对营销需求不会产生任何负面的关键影响。
由于本文内容已经非常丰富,并且应用程序仍在开发中,我们将在此暂停。我们将继续改进此方法,并随时向您更新最新情况!
感谢您的阅读和关注!我期待在评论部分听到您的反馈和想法。我希望这些信息对您的目标有用!
还有祝你好运!