この記事では、Swetrix と呼ばれる Web 分析プラットフォーム向けの AI サービスの開発経験を共有したいと思います。 私の目的は、次のスクリーンショットに表示されているデータに基づいて将来のウェブサイトトラフィックを予測する機械学習モデルを開発することでした。 最終目標は、将来的にウェブサイトにどのようなトラフィックが発生するかについて顧客に明確なビジョンを示し、それによって顧客がより優れた洞察を得て、ビジネス計画全体を強化できるようにすることです。 2. 要件とアーキテクチャ 計画中に、AI と API サービス間の通信に RabbitMQ メッセージ ブローカーを使用したマイクロサービス アーキテクチャを採用することが決定されました。 まず、1 時間ごとの cron タスクを使用してデータを別のデータベースに収集する必要があります。Swetrix の Web サイトの元のデータが ClickHouse に保存されているため、ClickHouse を選択することにしました。形式の詳細については、次のセクションで説明します。 RabbitMQはシンプルさからメッセージブローカーとして選ばれ、AIとAPIサービス間の通信を確立する必要があります。すべてを分解して、主なロジックを確認しましょう。 Swetrix-API サービス: Cron タスクを介して 1 時間ごとにデータ統計を収集し、生データを AI サービスに送信します。 ClickHouse から前処理されたデータを挿入し、受信します。 Swetrix-AI サービス: 予測のために生データと選択した設定 (間隔とサブカテゴリ) を処理します。 予測データを JSON 形式に変換し、RabbitMQ 経由で API サービスに送り返します。 Swetrix-AI サービスでは、バックエンド側に NestJs フレームワークを使用し、データの前処理とモデル予測に Python スクリプトを使用します。 3. 前処理 プロジェクトに関する次のデータを テーブルに収集します。 このデータのレンダリングされたバージョンは、この記事の最初のセクションですでにご覧になっています。 analytics 次のクエリで、この (ほぼ許容できる) 結果を達成できました。 @Cron(CronExpression.EVERY_HOUR) async insertHourlyProjectData(): Promise<void> { const gatherProjectsData = ` INSERT INTO analytics.hourly_projects_data (UniqueID, projectID, statisticsGathered, br_keys, br_vals, os_keys, os_vals, lc_keys, lc_vals, ref_keys, ref_vals, so_keys, so_vals, me_keys, me_vals, ca_keys, ca_vals, cc_keys, cc_vals, dv_keys, dv_vals, rg_keys, rg_vals, ct_keys, ct_vals) SELECT generateUUIDv4() as UniqueID, pid as projectID, toStartOfHour(now()) as statisticsGathered, groupArray(br) as br_keys, groupArray(br_count) as br_vals, groupArray(os) as os_keys, groupArray(os_count) as os_vals, ... groupArray(ct) as ct_keys, groupArray(ct_count) as ct_vals FROM ( SELECT pid, br, count(*) as br_count, os, count(*) as os_count, ... ct, count(*) as ct_count FROM analytics.analytics GROUP BY pid, br, os, lc, ref, so, me, ca, cc, dv, rg, ct ) GROUP BY pid; ` try { await clickhouse.query(gatherProjectsData).toPromise() } catch (e) { console.error( `[CRON WORKER] Error whilst gathering hourly data for all projects: ${e}`, ) この関数は、Cron ジョブを使用して 1 時間ごとに実行されるようにスケジュールされています。分析データを収集し、clickhouse に挿入します。 analytics.hourly_projects_data 出力 ClickHouse の制限により、データの希望する形式を実現できませんでした。そのため、モデルのトレーニングに必要な前処理を完了するために、 を使用することにしました。 pandas 具体的には、Python を使用して次の操作を実行しました。 3.1 キーと値を組み合わせる 1 つのカテゴリに関連するキーと値を 1 つの JSON フィールドに結合します。たとえば、デバイスのキーと値を 1 つのオブジェクトに結合します。 os_keys = {“Windows”, ”MacOS”, ”MacOS”, ”MacOS”, ”Linux”} os_values = {1, 2, 2, 1, 5} の中へ: os = {“Windows”: 1, “MacOS”: 5, “Linux”: 5} コードと出力を添付します: def format_data(keys_list, vals_list, threshold): """ Format data by converting string representations of lists to actual lists, then sums up the counts for each key. Keys with counts below a specified threshold are aggregated into 'Other'. """ counts = defaultdict(int) for keys_str, vals_str in zip(keys_list, vals_list): keys = ast.literal_eval(keys_str) vals = ast.literal_eval(vals_str) for key, val in zip(keys, vals): counts[key] += val final_data = defaultdict(int) for value, count in counts.items(): final_data[value] = count return dict(final_data) def process_group(group): """ Combine specific groups by a group clause, and make a """ result = {} for col in group.columns: if col.endswith('_keys'): prefix = col.split('_')[0] # Extract prefix to identify the category (eg, 'br' for browsers) threshold = other_thresholds.get(prefix, 1) # Get the threshold for this category, default to 1 vals_col = col.replace('_keys', '_vals') keys_list = group[col].tolist() vals_list = group[vals_col].tolist() result[col.replace('_keys', '')] = format_data(keys_list, vals_list, threshold) return pd.Series(result) この形式のデータは予測自体には使用されず、むしろデータベースに保存してデバッグし、欠損値がないことを確認し、さらにモデルが正確な結果を生成していることを再確認するために使用されます。 出力 3.2 キーと値を組み合わせる 適切なモデルをトレーニングするために、さまざまなカテゴリに他のグループを定義することにしました。つまり、特定のカテゴリ内のグループのインスタンス数が全体的に特定のパーセント (%) を下回る場合、そのグループは他のグループの一部として追加されます。 たとえば、 カテゴリには次のものがあります。 os {“MacOS”: 300, “Windows”: 400, “Linux”: 23 and “TempleOS”: 10} この場合、Linux と TempleOS はどちらも非常にまれであるため、 に結合され、最終結果は次のようになります。 他の グループ {“MacOS”: 300, “Windows”: 400, “other”: 33}. そして、「希少性」はカテゴリーによって異なり、そのカテゴリーに指定されたしきい値に基づいて決定されます。 顧客の好みや希望するデータに基づいて設定できます other_thresholds = { 'br': 0.06, 'os': 0.04, 'cc': 0.02, 'lc': 0.02, 'ref': 0.02, 'so': 0.03, 'me': 0.03, 'ca': 0.03, 'cc': 0.02, 'dv': 0.02, 'rg': 0.01, 'ct': 0.01 } これを実現するために2つの機能が実装されました def get_groups_by_treshholds(df,column_name): """Calculate total values for all columns""" if column_name in EXCLUDED_COLUMNS: return counter = count_dict_values(df[column_name]) total = sum(counter.values()) list1 = [] for key, value in counter.items(): if not (value / total) < other_thresholds[column_name]: list1.append(key) return list1 def create_group_columns(df): column_values = [] for key in other_thresholds.keys(): groups = get_groups_by_treshholds(df, key) if not groups: continue for group in groups: column_values.append(f"{key}_{group}") column_values.append(f"{key}_other") return column_values column_values = create_group_columns(df) column_values 出力 ['br_Chrome', 'br_Firefox', 'os_Mac OS', 'os_other', 'cc_UA', 'cc_GB', 'cc_other', 'dv_mobile', 'dv_desktop', 'dv_other'] 機械学習モデルを使用する場合、入力データがモデルが理解できる形式であることが重要です。機械学習モデルでは通常、JSON のような複雑なデータ構造ではなく、数値 (整数、浮動小数点数) が必要です。 したがって、この要件を満たすには、もう一度、データをもう少し前処理することが望ましいです。 私は、各特徴が別々の列として表され、行に対応する数値が含まれる関数 を作成しました。(まだ理想的ではありませんが、これが私が生み出すことができた最善の解決策でした) create_exploded_df def create_exploded_df(df): """ Function which creates a new data set, iterates through the old one and fill in values according to their belongings (br_other, etc..) """ new_df = df[['projectID', 'statisticsGathered']] for group in column_values: new_df[group] = 0 new_df_cols = new_df.columns df_cols = df.columns for column in df_cols: if column in ['projectID', 'statisticsGathered']: continue for index, row in enumerate(df[column]): if column in EXCLUDED_COLUMNS: continue for key, value in row.items(): total = 0 if (a:=f"{column}_{key}") in new_df_cols: new_df[a][index] = value else: total += value new_df[f"{column}_other"][index] = total return new_df new_df = create_exploded_df(df) new_df.to_csv("2-weeks-exploded.csv") new_df 出力 3.3 勤務時間を記入する 私たちが持っていたデータの形式に関するもう 1 つの問題は、特定の時間にプロジェクトのトラフィックがなかった場合、空白の行が作成されるのではなく、行がまったく存在しないことです。これ は、モデルが今後の時間枠 (次の 1 時間など) のデータを予測するように設計されているという事実を考慮すると不便です。ただし、最初の時間枠に利用可能なデータがない場合、予測を行うようにモデルをトレーニングすることは現実的ではありません。 そこで、私は、欠落している時間を見つけて、1時間がスキップされたときに空白行を挿入するスクリプトを作成しました。 3.4 ターゲット列の追加と移動 モデルのトレーニングに関しては、 主なアプローチは、前の 1 時間のデータをモデルのターゲットとして使用することでした。これにより、モデルは現在のデータに基づいて将来のトラフィックを予測できます。 def sort_df_and_assign_targets(df): df = df.copy() df = df.sort_values(by=['projectID', 'statisticsGathered']) for column_name in df.columns: if not column_name.endswith('target'): continue df[column_name] = df.groupby('projectID')[column_name].shift(-1) return df new_df = sort_df_and_assign_targets(new_df) 出力 を分割する 3.5 statisticsGathered 別々の列に収集する このようなアプローチの主な理由は、 が オブジェクトであり、私が使用しようとしたモデル (後続のセクションを参照) ではそれを処理できず、正しいパターンを識別できないことです。 statisticsGathered datetime その結果、 メトリックはひどいものになりました。そのため、開発中に、 、 、 ごとに機能を分離するという決定が下され、結果が大幅に向上しました。 MSE/MRSE day month hour def split_statistic_gathered(df): df['Month'] = df['statisticsGathered'].dt.month.astype(int) # as int df['Day'] = df['statisticsGathered'].dt.day.astype(int) # as int df['Hour'] = df['statisticsGathered'].dt.hour df = df.drop('statisticsGathered', axis = 1) return df new_df = split_statistic_gathered(new_df) new_df 出力 以上です!それではトレーニングそのものに移りましょう!🎉🎉🎉 4. 線形回帰 そうですね、このアプリケーションを構築する上で最も困難だったのは、実際の予測だったと思います。 最初に試してみたかったのは、 モデルを使用することです。 LinearRegression 以下の機能を実装しました。 def create_model_for_target(train_df, target_series): X_train, x_test, Y_train, y_test = train_test_split(train_df, target_series, test_size=0.3, shuffle=False) reg = LinearRegression() reg.fit(X_train, Y_train) y_pred = reg.predict(x_test) return {"y_test": y_test, "y_pred": y_pred} def create_models_for_targets(df): models_data = dict() df = df.dropna() train_df = clear_df(df) for target_name in df[[column_name for column_name in df.columns if column_name.endswith("target")]]: models_data[target_name] = create_model_for_target(train_df, df[target_name]) return models_data 説明 各ターゲット列について、データをトレーニング セットとテスト セットに分割します。次に、トレーニング データで モデルをトレーニングし、テスト データで予測を行います。 LinearRegression 結果が正しいかどうかを評価するために、必要なメトリックを収集して出力を生成する関数を追加しました。 def evaluate_models(data): evaluation = [] for target, results in data.items(): y_test, y_pred = results['y_test'], results['y_pred'] mse = mean_squared_error(y_test, y_pred) rmse = mean_squared_error(y_test, y_pred) ** 0.5 mae = mean_absolute_error(y_test, y_pred) mean_y = y_test.mean() median_y = y_test.median() evaluation.append({'target': target, 'mse': mse, 'rmse': rmse, 'mae': mae, 'mean_y': mean_y, 'median_y': median_y}) return pd.DataFrame(evaluation) 出力 私は出力を生成し、それをExcelファイルに保存するスクリプトを書きました 、 、 、 値を計算します。 mse rmse mae mean_y ご覧のとおり、メトリックは満足できるものではなく、予測されるトラフィック データは正確とはほど遠く、トラフィック予測の目的には適していません。 そこで、1時間あたりの訪問者数を予測することにし、以下の関数を作成しました。 def add_target_column(df, by): totals_series = df.apply(lambda x: sum(x[[column for column in df.columns if column.startswith(by)]]), axis=1) df['total'] = totals_series df[f'total_{by}_target'] = totals_series return df def shift_target_column(df, by): df = df.sort_values(by=['projectID', 'statisticsGathered'], ignore_index=True) df['total_target'] = df.groupby('projectID')[f'total_{by}_target'].shift(-1) return df new_df = add_target_column(new_df, 'br') new_df = shift_target_column(new_df, 'br') new_df[['total_br_target']] 出力 この関数は特定のカテゴリを取得し、それに基づいて合計訪問者数を計算します。デバイス値の合計数が OS 値の合計数と同じになるため、これが機能します。 。 このようなアプローチにより、モデルは以前よりも 10 倍優れた結果を示しました 5。結論 このケースについて言えば、それはほぼ受け入れられ、すぐに使用できる機能です。顧客はこれらの予測の結果に応じて予算配分とサーバーの拡張を計画できるようになりました。 予測値は 。 実際の値から約 2.45 人の訪問者数だけずれています (RMSE = √MSE であるため ) これはマーケティング ニーズに重大な悪影響を及ぼすことはありません。 この記事はかなり長くなり、アプリはまだ開発中なので、ここで一旦中断します。今後もこのアプローチを改良し続けますので、引き続き最新情報をお伝えします。 読んでいただき、また注目していただき、ありがとうございます。コメント欄で皆さんのフィードバックやご意見をお待ちしております。この情報が皆さんの目的に役立つことを願っています。 そして成功を祈る!