In diesem Artikel möchte ich meine Erfahrungen mit der Entwicklung eines KI-Dienstes für eine Webanalyseplattform namens Swetrix teilen.
Mein Ziel war die Entwicklung eines maschinellen Lernmodells, das den zukünftigen Website-Verkehr auf Grundlage der im folgenden Screenshot angezeigten Daten vorhersagt
Das Endziel besteht darin, dem Kunden eine klare Vorstellung davon zu vermitteln, welcher Datenverkehr künftig auf seiner Website erscheinen wird, um ihm dadurch bessere Einblicke zu ermöglichen und seine Geschäftsplanung allgemein zu verbessern.
Während der Planung wurde entschieden, mit einer Microservice-Architektur mit RabbitMQ-Nachrichtenbroker für die Kommunikation zwischen KI- und API-Diensten fortzufahren.
Zunächst müssen wir mit einem stündlichen Cron-Task Daten in einer separaten Datenbank sammeln. Wir haben uns für ClickHouse entschieden, da dort die Originaldaten von Websites auf Swetrix gespeichert sind. Details zum Format werden in den nächsten Abschnitten behandelt.
RabbitMQ wurde aufgrund seiner Einfachheit als Nachrichtenbroker ausgewählt und wir müssen eine Kommunikation zwischen KI- und API-Diensten herstellen. Lassen Sie uns alles aufschlüsseln und die Hauptlogik überprüfen
Der Swetrix-AI-Dienst verwendet das NestJs-Framework für die Backend-Seite und Python-Skripte für die Vorverarbeitung der Daten und Modellvorhersagen.
Wir erfassen die folgenden Daten zu Projekten in einer analytics
. Die gerenderte Version dieser Daten haben Sie bereits im ersten Abschnitt des Artikels gesehen.
Dieses (fast akzeptable) Ergebnis konnte ich mit folgender Abfrage erzielen:
@Cron(CronExpression.EVERY_HOUR) async insertHourlyProjectData(): Promise<void> { const gatherProjectsData = ` INSERT INTO analytics.hourly_projects_data (UniqueID, projectID, statisticsGathered, br_keys, br_vals, os_keys, os_vals, lc_keys, lc_vals, ref_keys, ref_vals, so_keys, so_vals, me_keys, me_vals, ca_keys, ca_vals, cc_keys, cc_vals, dv_keys, dv_vals, rg_keys, rg_vals, ct_keys, ct_vals) SELECT generateUUIDv4() as UniqueID, pid as projectID, toStartOfHour(now()) as statisticsGathered, groupArray(br) as br_keys, groupArray(br_count) as br_vals, groupArray(os) as os_keys, groupArray(os_count) as os_vals, ... groupArray(ct) as ct_keys, groupArray(ct_count) as ct_vals FROM ( SELECT pid, br, count(*) as br_count, os, count(*) as os_count, ... ct, count(*) as ct_count FROM analytics.analytics GROUP BY pid, br, os, lc, ref, so, me, ca, cc, dv, rg, ct ) GROUP BY pid; ` try { await clickhouse.query(gatherProjectsData).toPromise() } catch (e) { console.error( `[CRON WORKER] Error whilst gathering hourly data for all projects: ${e}`, )
Die Funktion wird stündlich über einen Cron-Job ausgeführt. Sie sammelt Analysedaten und fügt sie in ein Clickhouse- analytics.hourly_projects_data
ein.
Ausgabe
Aufgrund von ClickHouse-Einschränkungen konnte ich das gewünschte Datenformat nicht erreichen. Daher habe ich beschlossen, pandas
zu verwenden, um die für das Training des Modells erforderliche Vorverarbeitung durchzuführen.
Insbesondere habe ich Python für Folgendes verwendet:
Kombinieren Sie Schlüssel und Werte, die sich auf eine Kategorie beziehen, in einem JSON-Feld. Kombinieren Sie beispielsweise Schlüssel und Werte von Geräten in einem einzigen Objekt.
os_keys = {“Windows”, ”MacOS”, ”MacOS”, ”MacOS”, ”Linux”} os_values = {1, 2, 2, 1, 5}
Hinein:
os = {“Windows”: 1, “MacOS”: 5, “Linux”: 5}
Anhängen des Codes und der Ausgabe:
def format_data(keys_list, vals_list, threshold): """ Format data by converting string representations of lists to actual lists, then sums up the counts for each key. Keys with counts below a specified threshold are aggregated into 'Other'. """ counts = defaultdict(int) for keys_str, vals_str in zip(keys_list, vals_list): keys = ast.literal_eval(keys_str) vals = ast.literal_eval(vals_str) for key, val in zip(keys, vals): counts[key] += val final_data = defaultdict(int) for value, count in counts.items(): final_data[value] = count return dict(final_data) def process_group(group): """ Combine specific groups by a group clause, and make a """ result = {} for col in group.columns: if col.endswith('_keys'): prefix = col.split('_')[0] # Extract prefix to identify the category (eg, 'br' for browsers) threshold = other_thresholds.get(prefix, 1) # Get the threshold for this category, default to 1 vals_col = col.replace('_keys', '_vals') keys_list = group[col].tolist() vals_list = group[vals_col].tolist() result[col.replace('_keys', '')] = format_data(keys_list, vals_list, threshold) return pd.Series(result)
Dieses Datenformat wird nicht für die Vorhersage selbst verwendet. Ich würde sagen, es dient eher zum Speichern in der Datenbank und zu Debugging-Zwecken, um zu überprüfen, dass keine Werte fehlen, und darüber hinaus, um noch einmal zu kontrollieren, dass das Modell ein genaues Ergebnis liefert.
Um ein geeignetes Modell zu trainieren, habe ich beschlossen, andere Gruppen für verschiedene Kategorien zu definieren. Das bedeutet, dass, wenn die Anzahl der Instanzen einer Gruppe in einer bestimmten Kategorie insgesamt unter einem bestimmten Prozentsatz (%) liegt, diese als Teil der anderen hinzugefügt wird.
Beispielsweise haben wir in der Kategorie os
:
{“MacOS”: 300, “Windows”: 400, “Linux”: 23 and “TempleOS”: 10}
Da sowohl Linux als auch TempleOS in diesem Fall extrem selten sind, werden sie in einer anderen Gruppe zusammengefasst. Das Endergebnis lautet daher:
{“MacOS”: 300, “Windows”: 400, “other”: 33}.
Und die „Seltenheit“ wird je nach Kategorie unterschiedlich bestimmt und basiert auf dem dieser Kategorie zugewiesenen Schwellenwert.
Es kann basierend auf den Präferenzen und gewünschten Daten für den Kunden konfiguriert werden
other_thresholds = { 'br': 0.06, 'os': 0.04, 'cc': 0.02, 'lc': 0.02, 'ref': 0.02, 'so': 0.03, 'me': 0.03, 'ca': 0.03, 'cc': 0.02, 'dv': 0.02, 'rg': 0.01, 'ct': 0.01 }
Um dies zu erreichen, wurden 2 Funktionen implementiert
def get_groups_by_treshholds(df,column_name): """Calculate total values for all columns""" if column_name in EXCLUDED_COLUMNS: return counter = count_dict_values(df[column_name]) total = sum(counter.values()) list1 = [] for key, value in counter.items(): if not (value / total) < other_thresholds[column_name]: list1.append(key) return list1 def create_group_columns(df): column_values = [] for key in other_thresholds.keys(): groups = get_groups_by_treshholds(df, key) if not groups: continue for group in groups: column_values.append(f"{key}_{group}") column_values.append(f"{key}_other") return column_values column_values = create_group_columns(df) column_values
Ausgabe
['br_Chrome', 'br_Firefox', 'os_Mac OS', 'os_other', 'cc_UA', 'cc_GB', 'cc_other', 'dv_mobile', 'dv_desktop', 'dv_other']
Bei der Arbeit mit Modellen für maschinelles Lernen ist es entscheidend, dass die Eingabedaten in einem Format vorliegen, das das Modell verstehen kann. Modelle für maschinelles Lernen erfordern in der Regel numerische Werte (Ganzzahlen, Gleitkommazahlen) und keine komplexen Datenstrukturen wie JSON.
Daher ist es wiederum vorzuziehen, unsere Daten etwas gründlicher vorzuverarbeiten, um dieser Anforderung gerecht zu werden.
Ich habe eine Funktion create_exploded_df
erstellt, bei der jedes Feature als separate Spalte dargestellt wird und die Zeilen die entsprechenden numerischen Werte enthalten. (Das ist noch nicht ideal, aber das war die beste Lösung, die ich finden konnte.)
def create_exploded_df(df): """ Function which creates a new data set, iterates through the old one and fill in values according to their belongings (br_other, etc..) """ new_df = df[['projectID', 'statisticsGathered']] for group in column_values: new_df[group] = 0 new_df_cols = new_df.columns df_cols = df.columns for column in df_cols: if column in ['projectID', 'statisticsGathered']: continue for index, row in enumerate(df[column]): if column in EXCLUDED_COLUMNS: continue for key, value in row.items(): total = 0 if (a:=f"{column}_{key}") in new_df_cols: new_df[a][index] = value else: total += value new_df[f"{column}_other"][index] = total return new_df new_df = create_exploded_df(df) new_df.to_csv("2-weeks-exploded.csv") new_df
3.3 Stunden eintragen
Ein weiteres Problem mit dem Datenformat, das wir hatten, war, dass, wenn in einer bestimmten Stunde kein Verkehr für ein Projekt vorhanden war, statt einer leeren Zeile überhaupt keine Zeile erstellt wurde. Das ist unpraktisch, wenn man bedenkt, dass das Modell darauf ausgelegt ist, Daten für den kommenden Zeitraum (z. B. die nächste Stunde) vorherzusagen. Es ist jedoch nicht möglich, das Modell für Vorhersagen zu trainieren, wenn für den anfänglichen Zeitraum keine Daten verfügbar sind.
Deshalb habe ich ein Skript geschrieben, das fehlende Stunden findet und leere Zeilen einfügt, wenn eine Stunde übersprungen wird
Beim Modelltraining bestand der primäre Ansatz darin, Daten der letzten Stunde als Ziel für das Modell zu verwenden. Dadurch kann das Modell den zukünftigen Verkehr auf Grundlage der aktuellen Daten vorhersagen.
def sort_df_and_assign_targets(df): df = df.copy() df = df.sort_values(by=['projectID', 'statisticsGathered']) for column_name in df.columns: if not column_name.endswith('target'): continue df[column_name] = df.groupby('projectID')[column_name].shift(-1) return df new_df = sort_df_and_assign_targets(new_df)
Ausgabe
statisticsGathered
separaten Spalten zusammengefasst Der Hauptgrund für einen solchen Ansatz besteht darin, dass statisticsGathered
ein datetime
war und die Modelle, die ich zu verwenden versucht habe (siehe die folgenden Abschnitte), nicht in der Lage waren, es zu verarbeiten und das richtige Muster zu erkennen.
Das Ergebnis waren schreckliche MSE/MRSE
-Kennzahlen. Daher wurde während der Entwicklung entschieden, die Funktionen für day
, month
und hour
zu trennen, was die Ergebnisse deutlich verbesserte.
def split_statistic_gathered(df): df['Month'] = df['statisticsGathered'].dt.month.astype(int) # as int df['Day'] = df['statisticsGathered'].dt.day.astype(int) # as int df['Hour'] = df['statisticsGathered'].dt.hour df = df.drop('statisticsGathered', axis = 1) return df new_df = split_statistic_gathered(new_df) new_df
Und das ist es! Kommen wir nun zum Training selbst! 🎉🎉🎉
Nun, ich denke, die eigentliche Vorhersage war der schwierigste Teil beim Erstellen dieser Anwendung.
Als Erstes wollte ich versuchen, LinearRegression
Modell zu verwenden:
Folgende Funktionen habe ich implementiert:
def create_model_for_target(train_df, target_series): X_train, x_test, Y_train, y_test = train_test_split(train_df, target_series, test_size=0.3, shuffle=False) reg = LinearRegression() reg.fit(X_train, Y_train) y_pred = reg.predict(x_test) return {"y_test": y_test, "y_pred": y_pred} def create_models_for_targets(df): models_data = dict() df = df.dropna() train_df = clear_df(df) for target_name in df[[column_name for column_name in df.columns if column_name.endswith("target")]]: models_data[target_name] = create_model_for_target(train_df, df[target_name]) return models_data
Für jede Zielspalte teilen wir die Daten in Trainings- und Testdatensätze auf. Anschließend trainieren wir ein LinearRegression
Modell anhand der Trainingsdaten und treffen Vorhersagen anhand der Testdaten.
Um zu beurteilen, ob die Ergebnisse korrekt sind, habe ich die Funktion hinzugefügt, die die erforderlichen Metriken sammelt und die Ausgabe erzeugt
def evaluate_models(data): evaluation = [] for target, results in data.items(): y_test, y_pred = results['y_test'], results['y_pred'] mse = mean_squared_error(y_test, y_pred) rmse = mean_squared_error(y_test, y_pred) ** 0.5 mae = mean_absolute_error(y_test, y_pred) mean_y = y_test.mean() median_y = y_test.median() evaluation.append({'target': target, 'mse': mse, 'rmse': rmse, 'mae': mae, 'mean_y': mean_y, 'median_y': median_y}) return pd.DataFrame(evaluation)
Ich habe ein Skript geschrieben, das die Ausgabe generiert und in einer Excel-Datei gespeichert hat, wobei die Werte mse
, rmse
, mae
und mean_y
berücksichtigt wurden
Wie Sie sehen, sind die Messwerte nicht zufriedenstellend und die vorhergesagten Verkehrsdaten werden alles andere als genau sein und für meine Ziele der Verkehrsprognosen nicht geeignet sein.
Daher habe ich mich entschieden, die Gesamtzahl der Besucher pro Stunde vorherzusagen, sodass die folgenden Funktionen erstellt wurden
def add_target_column(df, by): totals_series = df.apply(lambda x: sum(x[[column for column in df.columns if column.startswith(by)]]), axis=1) df['total'] = totals_series df[f'total_{by}_target'] = totals_series return df def shift_target_column(df, by): df = df.sort_values(by=['projectID', 'statisticsGathered'], ignore_index=True) df['total_target'] = df.groupby('projectID')[f'total_{by}_target'].shift(-1) return df new_df = add_target_column(new_df, 'br') new_df = shift_target_column(new_df, 'br') new_df[['total_br_target']]
Diese Funktion verwendet eine bestimmte Kategorie und berechnet darauf basierend die Gesamtzahl der Besucher. Dies funktioniert, weil die Gesamtzahl der Gerätewerte mit der Gesamtzahl der Betriebssystemwerte übereinstimmt.
Mit einem solchen Ansatz zeigte das Modell zehnmal bessere Ergebnisse als zuvor .
In diesem Fall handelt es sich um eine fast akzeptable und einsatzbereite Funktion. Kunden können nun ihre Budgetzuweisung und Serverskalierung abhängig vom Ergebnis dieser Vorhersagen planen.
Die Vorhersagen weichen von den tatsächlichen Werten um ca. 2,45 Besucher ab (da RMSE = √MSE ) . Dies kann für die Marketingzwecke keinen wesentlichen negativen Einfluss haben.
Da dieser Artikel ziemlich umfangreich geworden ist und die App noch in der Entwicklung ist, machen wir hier eine Pause. Wir werden diesen Ansatz in Zukunft weiter verfeinern und ich halte Sie auf dem Laufenden!
Vielen Dank fürs Lesen und Ihre Aufmerksamkeit! Ich freue mich auf Ihr Feedback und Ihre Gedanken im Kommentarbereich. Ich hoffe, diese Informationen sind für Ihre Ziele nützlich!
Und viel Glück!