paint-brush
Wie ich einen KI-Dienst für Analysen aufbauevon@pro1code1hack
617 Lesungen
617 Lesungen

Wie ich einen KI-Dienst für Analysen aufbaue

von Yehor Dremliuha12m2024/05/23
Read on Terminal Reader

Zu lang; Lesen

In diesem Artikel möchte ich meine Erfahrungen mit der Entwicklung eines KI-Dienstes für eine Webanalyseplattform namens Swetrix teilen. Mein Ziel war es, ein maschinelles Lernmodell zu entwickeln, das den zukünftigen Website-Verkehr auf der Grundlage der im folgenden Screenshot angezeigten Daten vorhersagt. Das Endziel besteht darin, dem Kunden eine klare Vorstellung davon zu geben, welcher Verkehr in Zukunft auf seiner Website erscheinen wird.
featured image - Wie ich einen KI-Dienst für Analysen aufbaue
Yehor Dremliuha HackerNoon profile picture
0-item
1-item

In diesem Artikel möchte ich meine Erfahrungen mit der Entwicklung eines KI-Dienstes für eine Webanalyseplattform namens Swetrix teilen.


Mein Ziel war die Entwicklung eines maschinellen Lernmodells, das den zukünftigen Website-Verkehr auf Grundlage der im folgenden Screenshot angezeigten Daten vorhersagt

Abbildung 1 - Das Projekt

Das Endziel besteht darin, dem Kunden eine klare Vorstellung davon zu vermitteln, welcher Datenverkehr künftig auf seiner Website erscheinen wird, um ihm dadurch bessere Einblicke zu ermöglichen und seine Geschäftsplanung allgemein zu verbessern.

2. Anforderungen und Architektur

Während der Planung wurde entschieden, mit einer Microservice-Architektur mit RabbitMQ-Nachrichtenbroker für die Kommunikation zwischen KI- und API-Diensten fortzufahren.


Abbildung 2 - Architektur


Zunächst müssen wir mit einem stündlichen Cron-Task Daten in einer separaten Datenbank sammeln. Wir haben uns für ClickHouse entschieden, da dort die Originaldaten von Websites auf Swetrix gespeichert sind. Details zum Format werden in den nächsten Abschnitten behandelt.


RabbitMQ wurde aufgrund seiner Einfachheit als Nachrichtenbroker ausgewählt und wir müssen eine Kommunikation zwischen KI- und API-Diensten herstellen. Lassen Sie uns alles aufschlüsseln und die Hauptlogik überprüfen

Swetrix-API-Dienst:

  • Sammelt stündlich Datenstatistiken per Crontask und sendet Rohdaten an den KI-Dienst.
  • Fügt vorverarbeitete Daten von ClickHouse ein und empfängt diese.

Swetrix-AI-Dienst:

  • Verarbeitet die Rohdaten und ausgewählten Präferenzen (Intervall und Unterkategorie) für die Prognose.
  • Konvertiert die Prognosedaten in das JSON-Format und sendet sie über RabbitMQ an den API-Dienst zurück.


Der Swetrix-AI-Dienst verwendet das NestJs-Framework für die Backend-Seite und Python-Skripte für die Vorverarbeitung der Daten und Modellvorhersagen.

3. Vorverarbeitung

Wir erfassen die folgenden Daten zu Projekten in einer analytics . Abbildung 3 - Rohdaten in der Datenbank Die gerenderte Version dieser Daten haben Sie bereits im ersten Abschnitt des Artikels gesehen.

Dieses (fast akzeptable) Ergebnis konnte ich mit folgender Abfrage erzielen:

 @Cron(CronExpression.EVERY_HOUR) async insertHourlyProjectData(): Promise<void> { const gatherProjectsData = ` INSERT INTO analytics.hourly_projects_data (UniqueID, projectID, statisticsGathered, br_keys, br_vals, os_keys, os_vals, lc_keys, lc_vals, ref_keys, ref_vals, so_keys, so_vals, me_keys, me_vals, ca_keys, ca_vals, cc_keys, cc_vals, dv_keys, dv_vals, rg_keys, rg_vals, ct_keys, ct_vals) SELECT generateUUIDv4() as UniqueID, pid as projectID, toStartOfHour(now()) as statisticsGathered, groupArray(br) as br_keys, groupArray(br_count) as br_vals, groupArray(os) as os_keys, groupArray(os_count) as os_vals, ... groupArray(ct) as ct_keys, groupArray(ct_count) as ct_vals FROM ( SELECT pid, br, count(*) as br_count, os, count(*) as os_count, ... ct, count(*) as ct_count FROM analytics.analytics GROUP BY pid, br, os, lc, ref, so, me, ca, cc, dv, rg, ct ) GROUP BY pid; ` try { await clickhouse.query(gatherProjectsData).toPromise() } catch (e) { console.error( `[CRON WORKER] Error whilst gathering hourly data for all projects: ${e}`, )

Die Funktion wird stündlich über einen Cron-Job ausgeführt. Sie sammelt Analysedaten und fügt sie in ein Clickhouse- analytics.hourly_projects_data ein.

Ausgabe

Abbildung 4 - Verarbeitete Daten
Aufgrund von ClickHouse-Einschränkungen konnte ich das gewünschte Datenformat nicht erreichen. Daher habe ich beschlossen, pandas zu verwenden, um die für das Training des Modells erforderliche Vorverarbeitung durchzuführen.


Insbesondere habe ich Python für Folgendes verwendet:

3.1 Schlüssel und Werte kombinieren

Kombinieren Sie Schlüssel und Werte, die sich auf eine Kategorie beziehen, in einem JSON-Feld. Kombinieren Sie beispielsweise Schlüssel und Werte von Geräten in einem einzigen Objekt.

 os_keys = {“Windows”, ”MacOS”, ”MacOS”, ”MacOS”, ”Linux”} os_values = {1, 2, 2, 1, 5}

Hinein:

 os = {“Windows”: 1, “MacOS”: 5, “Linux”: 5}

Anhängen des Codes und der Ausgabe:

 def format_data(keys_list, vals_list, threshold): """ Format data by converting string representations of lists to actual lists, then sums up the counts for each key. Keys with counts below a specified threshold are aggregated into 'Other'. """ counts = defaultdict(int) for keys_str, vals_str in zip(keys_list, vals_list): keys = ast.literal_eval(keys_str) vals = ast.literal_eval(vals_str) for key, val in zip(keys, vals): counts[key] += val final_data = defaultdict(int) for value, count in counts.items(): final_data[value] = count return dict(final_data) def process_group(group): """ Combine specific groups by a group clause, and make a """ result = {} for col in group.columns: if col.endswith('_keys'): prefix = col.split('_')[0] # Extract prefix to identify the category (eg, 'br' for browsers) threshold = other_thresholds.get(prefix, 1) # Get the threshold for this category, default to 1 vals_col = col.replace('_keys', '_vals') keys_list = group[col].tolist() vals_list = group[vals_col].tolist() result[col.replace('_keys', '')] = format_data(keys_list, vals_list, threshold) return pd.Series(result)


Dieses Datenformat wird nicht für die Vorhersage selbst verwendet. Ich würde sagen, es dient eher zum Speichern in der Datenbank und zu Debugging-Zwecken, um zu überprüfen, dass keine Werte fehlen, und darüber hinaus, um noch einmal zu kontrollieren, dass das Modell ein genaues Ergebnis liefert.

Ausgabe
Abbildung 5 - Gespeicherte Daten Pandas Darstellung 3.2 Schlüssel und Werte kombinieren

Um ein geeignetes Modell zu trainieren, habe ich beschlossen, andere Gruppen für verschiedene Kategorien zu definieren. Das bedeutet, dass, wenn die Anzahl der Instanzen einer Gruppe in einer bestimmten Kategorie insgesamt unter einem bestimmten Prozentsatz (%) liegt, diese als Teil der anderen hinzugefügt wird.


Beispielsweise haben wir in der Kategorie os :

 {“MacOS”: 300, “Windows”: 400, “Linux”: 23 and “TempleOS”: 10}

Da sowohl Linux als auch TempleOS in diesem Fall extrem selten sind, werden sie in einer anderen Gruppe zusammengefasst. Das Endergebnis lautet daher:

 {“MacOS”: 300, “Windows”: 400, “other”: 33}.

Und die „Seltenheit“ wird je nach Kategorie unterschiedlich bestimmt und basiert auf dem dieser Kategorie zugewiesenen Schwellenwert.

Es kann basierend auf den Präferenzen und gewünschten Daten für den Kunden konfiguriert werden

 other_thresholds = { 'br': 0.06, 'os': 0.04, 'cc': 0.02, 'lc': 0.02, 'ref': 0.02, 'so': 0.03, 'me': 0.03, 'ca': 0.03, 'cc': 0.02, 'dv': 0.02, 'rg': 0.01, 'ct': 0.01 }

Um dies zu erreichen, wurden 2 Funktionen implementiert

 def get_groups_by_treshholds(df,column_name): """Calculate total values for all columns""" if column_name in EXCLUDED_COLUMNS: return counter = count_dict_values(df[column_name]) total = sum(counter.values()) list1 = [] for key, value in counter.items(): if not (value / total) < other_thresholds[column_name]: list1.append(key) return list1 def create_group_columns(df): column_values = [] for key in other_thresholds.keys(): groups = get_groups_by_treshholds(df, key) if not groups: continue for group in groups: column_values.append(f"{key}_{group}") column_values.append(f"{key}_other") return column_values column_values = create_group_columns(df) column_values

Ausgabe

 ['br_Chrome', 'br_Firefox', 'os_Mac OS', 'os_other', 'cc_UA', 'cc_GB', 'cc_other', 'dv_mobile', 'dv_desktop', 'dv_other']

Bei der Arbeit mit Modellen für maschinelles Lernen ist es entscheidend, dass die Eingabedaten in einem Format vorliegen, das das Modell verstehen kann. Modelle für maschinelles Lernen erfordern in der Regel numerische Werte (Ganzzahlen, Gleitkommazahlen) und keine komplexen Datenstrukturen wie JSON.


Daher ist es wiederum vorzuziehen, unsere Daten etwas gründlicher vorzuverarbeiten, um dieser Anforderung gerecht zu werden.


Ich habe eine Funktion create_exploded_df erstellt, bei der jedes Feature als separate Spalte dargestellt wird und die Zeilen die entsprechenden numerischen Werte enthalten. (Das ist noch nicht ideal, aber das war die beste Lösung, die ich finden konnte.)


 def create_exploded_df(df): """ Function which creates a new data set, iterates through the old one and fill in values according to their belongings (br_other, etc..) """ new_df = df[['projectID', 'statisticsGathered']] for group in column_values: new_df[group] = 0 new_df_cols = new_df.columns df_cols = df.columns for column in df_cols: if column in ['projectID', 'statisticsGathered']: continue for index, row in enumerate(df[column]): if column in EXCLUDED_COLUMNS: continue for key, value in row.items(): total = 0 if (a:=f"{column}_{key}") in new_df_cols: new_df[a][index] = value else: total += value new_df[f"{column}_other"][index] = total return new_df new_df = create_exploded_df(df) new_df.to_csv("2-weeks-exploded.csv") new_df

Ausgabe

Abbildung 6 - Modellfunktionen 3.3 Stunden eintragen

Ein weiteres Problem mit dem Datenformat, das wir hatten, war, dass, wenn in einer bestimmten Stunde kein Verkehr für ein Projekt vorhanden war, statt einer leeren Zeile überhaupt keine Zeile erstellt wurde. Das ist unpraktisch, wenn man bedenkt, dass das Modell darauf ausgelegt ist, Daten für den kommenden Zeitraum (z. B. die nächste Stunde) vorherzusagen. Es ist jedoch nicht möglich, das Modell für Vorhersagen zu trainieren, wenn für den anfänglichen Zeitraum keine Daten verfügbar sind.


Deshalb habe ich ein Skript geschrieben, das fehlende Stunden findet und leere Zeilen einfügt, wenn eine Stunde übersprungen wird

Abbildung 7 - Eingetragene Stunden

3.4 Zielspalten hinzufügen und verschieben

Beim Modelltraining bestand der primäre Ansatz darin, Daten der letzten Stunde als Ziel für das Modell zu verwenden. Dadurch kann das Modell den zukünftigen Verkehr auf Grundlage der aktuellen Daten vorhersagen.

 def sort_df_and_assign_targets(df): df = df.copy() df = df.sort_values(by=['projectID', 'statisticsGathered']) for column_name in df.columns: if not column_name.endswith('target'): continue df[column_name] = df.groupby('projectID')[column_name].shift(-1) return df new_df = sort_df_and_assign_targets(new_df)

Ausgabe

Figure 8 - Model Predictions









3.5 Aufgeteilte statisticsGathered separaten Spalten zusammengefasst

Der Hauptgrund für einen solchen Ansatz besteht darin, dass statisticsGathered ein datetime war und die Modelle, die ich zu verwenden versucht habe (siehe die folgenden Abschnitte), nicht in der Lage waren, es zu verarbeiten und das richtige Muster zu erkennen.


Das Ergebnis waren schreckliche MSE/MRSE -Kennzahlen. Daher wurde während der Entwicklung entschieden, die Funktionen für day , month und hour zu trennen, was die Ergebnisse deutlich verbesserte.

 def split_statistic_gathered(df): df['Month'] = df['statisticsGathered'].dt.month.astype(int) # as int df['Day'] = df['statisticsGathered'].dt.day.astype(int) # as int df['Hour'] = df['statisticsGathered'].dt.hour df = df.drop('statisticsGathered', axis = 1) return df new_df = split_statistic_gathered(new_df) new_df

Ausgabe
Figure 9 - Converted statisticsGathered


Und das ist es! Kommen wir nun zum Training selbst! 🎉🎉🎉






4. Lineare Regression

Nun, ich denke, die eigentliche Vorhersage war der schwierigste Teil beim Erstellen dieser Anwendung.

Als Erstes wollte ich versuchen, LinearRegression Modell zu verwenden:


Folgende Funktionen habe ich implementiert:

 def create_model_for_target(train_df, target_series):    X_train, x_test, Y_train, y_test = train_test_split(train_df, target_series, test_size=0.3, shuffle=False)    reg = LinearRegression()    reg.fit(X_train, Y_train)    y_pred = reg.predict(x_test)    return {"y_test": y_test, "y_pred": y_pred} def create_models_for_targets(df):    models_data = dict()    df = df.dropna()    train_df = clear_df(df)    for target_name in df[[column_name for column_name in df.columns if column_name.endswith("target")]]:        models_data[target_name] = create_model_for_target(train_df, df[target_name])    return models_data


Erläuterung

Für jede Zielspalte teilen wir die Daten in Trainings- und Testdatensätze auf. Anschließend trainieren wir ein LinearRegression Modell anhand der Trainingsdaten und treffen Vorhersagen anhand der Testdaten.

Um zu beurteilen, ob die Ergebnisse korrekt sind, habe ich die Funktion hinzugefügt, die die erforderlichen Metriken sammelt und die Ausgabe erzeugt

 def evaluate_models(data):    evaluation = []    for target, results in data.items():        y_test, y_pred = results['y_test'], results['y_pred']        mse = mean_squared_error(y_test, y_pred)        rmse = mean_squared_error(y_test, y_pred) ** 0.5        mae = mean_absolute_error(y_test, y_pred)        mean_y = y_test.mean()        median_y = y_test.median()        evaluation.append({'target': target, 'mse': mse, 'rmse': rmse, 'mae': mae, 'mean_y': mean_y, 'median_y': median_y})    return pd.DataFrame(evaluation)

Ausgabe

Ich habe ein Skript geschrieben, das die Ausgabe generiert und in einer Excel-Datei gespeichert hat, wobei die Werte mse , rmse , mae und mean_y berücksichtigt wurden

Abbildung 10 - Erste Ergebnisse (ohne Gesamtergebnis)


Wie Sie sehen, sind die Messwerte nicht zufriedenstellend und die vorhergesagten Verkehrsdaten werden alles andere als genau sein und für meine Ziele der Verkehrsprognosen nicht geeignet sein.

Daher habe ich mich entschieden, die Gesamtzahl der Besucher pro Stunde vorherzusagen, sodass die folgenden Funktionen erstellt wurden


 def add_target_column(df, by):  totals_series = df.apply(lambda x: sum(x[[column for column in df.columns if column.startswith(by)]]), axis=1)  df['total'] = totals_series  df[f'total_{by}_target'] = totals_series  return df def shift_target_column(df, by):  df = df.sort_values(by=['projectID', 'statisticsGathered'], ignore_index=True)  df['total_target'] = df.groupby('projectID')[f'total_{by}_target'].shift(-1)  return df new_df = add_target_column(new_df, 'br') new_df = shift_target_column(new_df, 'br') new_df[['total_br_target']]


Ausgabe

Figure 11 - Total Target Diese Funktion verwendet eine bestimmte Kategorie und berechnet darauf basierend die Gesamtzahl der Besucher. Dies funktioniert, weil die Gesamtzahl der Gerätewerte mit der Gesamtzahl der Betriebssystemwerte übereinstimmt.


Mit einem solchen Ansatz zeigte das Modell zehnmal bessere Ergebnisse als zuvor .



5. Schlussfolgerung

In diesem Fall handelt es sich um eine fast akzeptable und einsatzbereite Funktion. Kunden können nun ihre Budgetzuweisung und Serverskalierung abhängig vom Ergebnis dieser Vorhersagen planen.

Figure 12 -Total Results Die Vorhersagen weichen von den tatsächlichen Werten um ca. 2,45 Besucher ab (da RMSE = √MSE ) . Dies kann für die Marketingzwecke keinen wesentlichen negativen Einfluss haben.


Da dieser Artikel ziemlich umfangreich geworden ist und die App noch in der Entwicklung ist, machen wir hier eine Pause. Wir werden diesen Ansatz in Zukunft weiter verfeinern und ich halte Sie auf dem Laufenden!


Vielen Dank fürs Lesen und Ihre Aufmerksamkeit! Ich freue mich auf Ihr Feedback und Ihre Gedanken im Kommentarbereich. Ich hoffe, diese Informationen sind für Ihre Ziele nützlich!


Und viel Glück!