Dans cet article, je souhaite partager mon expérience dans le développement d'un service d'IA pour une plateforme d'analyse Web, appelée Swetrix. Mon objectif était de développer un modèle d'apprentissage automatique qui permettrait de prédire le trafic futur d'un site Web sur la base des données affichées sur la capture d'écran suivante. L'objectif final est d'avoir une vision claire pour le client du trafic qui apparaîtra sur son site Web à l'avenir, lui permettant ainsi d'obtenir de meilleures informations et d'améliorer la planification commerciale en général. 2. Exigences et architecture Au cours de la planification, il a été décidé de poursuivre l'architecture de microservices avec le courtier de messages RabbitMQ pour la communication entre les services IA et API. Tout d'abord, nous devons rassembler les données avec une tâche périodique horaire dans une base de données distincte. Nous avons décidé de choisir un ClickHouse, car les données originales des sites Web sur Swetrix y sont stockées. Les détails sur le format seront abordés dans les prochaines sections. RabbitMQ a été choisi comme courtier de messages en raison de sa simplicité et nous devons établir une communication entre les services IA et API. Décomposons tout et vérifions la logique principale Service API Swetrix : Rassemble des statistiques de données toutes les heures via Cron Task et envoie des données brutes au service AI. Insère et reçoit des données prétraitées de ClickHouse. Service Swetrix-AI : Traite les données brutes et les préférences sélectionnées (intervalle et sous-catégorie) à des fins de prévision. Convertit les données de prévision au format JSON et les renvoie au service API via RabbitMQ. Le service Swetrix-AI utilisera le framework NestJs pour le backend et des scripts Python pour le prétraitement des données et les prédictions de modèles. 3. Prétraitement Nous rassemblons les données suivantes sur les projets dans un tableau . Vous avez déjà vu la version rendue de ces données dans la première section de l'article. analytics J'ai pu obtenir ce résultat (presque acceptable) avec la requête suivante : @Cron(CronExpression.EVERY_HOUR) async insertHourlyProjectData(): Promise<void> { const gatherProjectsData = ` INSERT INTO analytics.hourly_projects_data (UniqueID, projectID, statisticsGathered, br_keys, br_vals, os_keys, os_vals, lc_keys, lc_vals, ref_keys, ref_vals, so_keys, so_vals, me_keys, me_vals, ca_keys, ca_vals, cc_keys, cc_vals, dv_keys, dv_vals, rg_keys, rg_vals, ct_keys, ct_vals) SELECT generateUUIDv4() as UniqueID, pid as projectID, toStartOfHour(now()) as statisticsGathered, groupArray(br) as br_keys, groupArray(br_count) as br_vals, groupArray(os) as os_keys, groupArray(os_count) as os_vals, ... groupArray(ct) as ct_keys, groupArray(ct_count) as ct_vals FROM ( SELECT pid, br, count(*) as br_count, os, count(*) as os_count, ... ct, count(*) as ct_count FROM analytics.analytics GROUP BY pid, br, os, lc, ref, so, me, ca, cc, dv, rg, ct ) GROUP BY pid; ` try { await clickhouse.query(gatherProjectsData).toPromise() } catch (e) { console.error( `[CRON WORKER] Error whilst gathering hourly data for all projects: ${e}`, ) La fonction est programmée pour s'exécuter toutes les heures à l'aide d'une tâche Cron. Il rassemble et insère des données analytiques dans un clickhouse . analytics.hourly_projects_data Sortir En raison des limitations de ClickHouse, je n'ai pas pu obtenir le format souhaité pour les données. J'ai donc décidé d'utiliser pour compléter le prétraitement, nécessaire à la formation du modèle. pandas Plus précisément, j'ai utilisé Python pour effectuer les opérations suivantes : 3.1 Combiner les clés et les valeurs Combinez les clés et les valeurs liées à une catégorie dans un seul champ JSON, par exemple en combinant les clés et les valeurs des appareils en un seul objet en tant que tel. os_keys = {“Windows”, ”MacOS”, ”MacOS”, ”MacOS”, ”Linux”} os_values = {1, 2, 2, 1, 5} Dans: os = {“Windows”: 1, “MacOS”: 5, “Linux”: 5} Joindre le code et la sortie : def format_data(keys_list, vals_list, threshold): """ Format data by converting string representations of lists to actual lists, then sums up the counts for each key. Keys with counts below a specified threshold are aggregated into 'Other'. """ counts = defaultdict(int) for keys_str, vals_str in zip(keys_list, vals_list): keys = ast.literal_eval(keys_str) vals = ast.literal_eval(vals_str) for key, val in zip(keys, vals): counts[key] += val final_data = defaultdict(int) for value, count in counts.items(): final_data[value] = count return dict(final_data) def process_group(group): """ Combine specific groups by a group clause, and make a """ result = {} for col in group.columns: if col.endswith('_keys'): prefix = col.split('_')[0] # Extract prefix to identify the category (eg, 'br' for browsers) threshold = other_thresholds.get(prefix, 1) # Get the threshold for this category, default to 1 vals_col = col.replace('_keys', '_vals') keys_list = group[col].tolist() vals_list = group[vals_col].tolist() result[col.replace('_keys', '')] = format_data(keys_list, vals_list, threshold) return pd.Series(result) Ce format de données ne sera pas utilisé pour la prédiction elle-même, je dirais, mais plutôt pour le stocker dans la base de données et à des fins de débogage afin de vérifier qu'il n'y a pas de valeurs manquantes et, en outre, de vérifier que le modèle produit un résultat précis. résultat. Sortir 3.2 Combiner les clés et les valeurs Pour former un modèle adéquat, j'ai décidé de définir d'autres groupes pour différentes catégories. Ce qui signifie que si globalement le nombre d'instances d'un groupe dans une catégorie spécifique est inférieur à un certain pourcentage (%), il sera ajouté dans l'autre. Par exemple, dans la catégorie nous avons : os {“MacOS”: 300, “Windows”: 400, “Linux”: 23 and “TempleOS”: 10} Étant donné que Linux et TempleOS sont extrêmement rares dans ce cas, ils seront combinés dans , le résultat final sera donc : un autre groupe {“MacOS”: 300, “Windows”: 400, “other”: 33}. Et la « rareté » est déterminée différemment selon la catégorie et en fonction du seuil désigné pour cette catégorie. Il peut être configurable en fonction des préférences et des données souhaitées pour le client other_thresholds = { 'br': 0.06, 'os': 0.04, 'cc': 0.02, 'lc': 0.02, 'ref': 0.02, 'so': 0.03, 'me': 0.03, 'ca': 0.03, 'cc': 0.02, 'dv': 0.02, 'rg': 0.01, 'ct': 0.01 } Deux fonctions ont été implémentées pour y parvenir def get_groups_by_treshholds(df,column_name): """Calculate total values for all columns""" if column_name in EXCLUDED_COLUMNS: return counter = count_dict_values(df[column_name]) total = sum(counter.values()) list1 = [] for key, value in counter.items(): if not (value / total) < other_thresholds[column_name]: list1.append(key) return list1 def create_group_columns(df): column_values = [] for key in other_thresholds.keys(): groups = get_groups_by_treshholds(df, key) if not groups: continue for group in groups: column_values.append(f"{key}_{group}") column_values.append(f"{key}_other") return column_values column_values = create_group_columns(df) column_values Sortir ['br_Chrome', 'br_Firefox', 'os_Mac OS', 'os_other', 'cc_UA', 'cc_GB', 'cc_other', 'dv_mobile', 'dv_desktop', 'dv_other'] Lorsque vous travaillez avec des modèles d'apprentissage automatique, il est essentiel que les données d'entrée soient dans un format que le modèle peut comprendre. Les modèles d'apprentissage automatique nécessitent généralement des valeurs numériques (entiers, flottants) plutôt que des structures de données complexes comme JSON. Par conséquent, encore une fois, il est préférable d’effectuer un peu plus de prétraitement de nos données pour répondre à cette exigence. J'ai créé une fonction où chaque fonctionnalité est représentée sous forme de colonne distincte et les lignes contiennent les valeurs numériques correspondantes. (Ce n'est pas encore idéal, mais c'est la meilleure solution que j'ai pu produire) create_exploded_df def create_exploded_df(df): """ Function which creates a new data set, iterates through the old one and fill in values according to their belongings (br_other, etc..) """ new_df = df[['projectID', 'statisticsGathered']] for group in column_values: new_df[group] = 0 new_df_cols = new_df.columns df_cols = df.columns for column in df_cols: if column in ['projectID', 'statisticsGathered']: continue for index, row in enumerate(df[column]): if column in EXCLUDED_COLUMNS: continue for key, value in row.items(): total = 0 if (a:=f"{column}_{key}") in new_df_cols: new_df[a][index] = value else: total += value new_df[f"{column}_other"][index] = total return new_df new_df = create_exploded_df(df) new_df.to_csv("2-weeks-exploded.csv") new_df Sortir 3.3 Remplir les heures Un autre problème avec le format des données que nous avions est que s'il n'y avait pas de trafic pour un projet au cours d'une heure spécifique au lieu de créer une ligne vide, il n'y aurait aucune ligne du tout, ce qui est gênant compte tenu du fait que le modèle est conçu pour prédire les données pour la période à venir (par exemple, l'heure suivante). Cependant, il n’est pas possible d’entraîner le modèle pour faire des prédictions si aucune donnée n’est disponible pour la période initiale. C'est pourquoi j'ai écrit un script qui trouverait les heures manquantes et insérerait des lignes vides lorsqu'une heure est sautée 3.4 Ajouter et déplacer des colonnes cibles Concernant la formation du modèle, l'approche principale consistait à utiliser les données de l'heure précédente comme cible du modèle. Cela permet au modèle de prédire le trafic futur sur la base des données actuelles. def sort_df_and_assign_targets(df): df = df.copy() df = df.sort_values(by=['projectID', 'statisticsGathered']) for column_name in df.columns: if not column_name.endswith('target'): continue df[column_name] = df.groupby('projectID')[column_name].shift(-1) return df new_df = sort_df_and_assign_targets(new_df) Sortir diviséesRassemblées 3.5 statisticsGathered en colonnes séparées La raison principale d'une telle approche est que était un objet , que les modèles que j'ai essayé d'utiliser (consultez les sections suivantes) ne sont pas capables de le traiter et d'identifier le modèle correct. statisticsGathered datetime Cela a abouti à de terribles mesures . Ainsi, au cours du développement, la décision a été prise de séparer les fonctionnalités pour , et , ce qui a considérablement amélioré les résultats. MSE/MRSE day month hour def split_statistic_gathered(df): df['Month'] = df['statisticsGathered'].dt.month.astype(int) # as int df['Day'] = df['statisticsGathered'].dt.day.astype(int) # as int df['Hour'] = df['statisticsGathered'].dt.hour df = df.drop('statisticsGathered', axis = 1) return df new_df = split_statistic_gathered(new_df) new_df Sortir Et c'est tout! Passons à la formation elle-même ! 🎉🎉🎉 4. Régression linéaire Eh bien, je suppose que la prédiction réelle a été la partie la plus difficile de la création de cette application. La première chose que je voulais essayer est d'utiliser le modèle : LinearRegression J'ai implémenté les fonctions suivantes : def create_model_for_target(train_df, target_series): X_train, x_test, Y_train, y_test = train_test_split(train_df, target_series, test_size=0.3, shuffle=False) reg = LinearRegression() reg.fit(X_train, Y_train) y_pred = reg.predict(x_test) return {"y_test": y_test, "y_pred": y_pred} def create_models_for_targets(df): models_data = dict() df = df.dropna() train_df = clear_df(df) for target_name in df[[column_name for column_name in df.columns if column_name.endswith("target")]]: models_data[target_name] = create_model_for_target(train_df, df[target_name]) return models_data Explication Pour chaque colonne cible, nous divisons les données en ensembles de formation et de test. Nous formons ensuite un modèle sur les données d'entraînement et effectuons des prédictions sur les données de test. LinearRegression Afin d'évaluer que les résultats sont corrects, j'ai ajouté la fonction qui rassemble les métriques requises et produit le résultat. def evaluate_models(data): evaluation = [] for target, results in data.items(): y_test, y_pred = results['y_test'], results['y_pred'] mse = mean_squared_error(y_test, y_pred) rmse = mean_squared_error(y_test, y_pred) ** 0.5 mae = mean_absolute_error(y_test, y_pred) mean_y = y_test.mean() median_y = y_test.median() evaluation.append({'target': target, 'mse': mse, 'rmse': rmse, 'mae': mae, 'mean_y': mean_y, 'median_y': median_y}) return pd.DataFrame(evaluation) Sortir J'ai écrit un script qui a généré la sortie et l'a enregistré dans un fichier Excel, comptabilisant les valeurs , , et . mse rmse mae mean_y Comme vous pouvez le constater, les mesures ne sont pas satisfaisantes et les données de trafic prévues seront loin d'être exactes et ne correspondront pas à mes objectifs de prévisions de trafic. Par conséquent, j'ai pris la décision de prédire le nombre total de visiteurs par heure, afin que les fonctions suivantes soient créées def add_target_column(df, by): totals_series = df.apply(lambda x: sum(x[[column for column in df.columns if column.startswith(by)]]), axis=1) df['total'] = totals_series df[f'total_{by}_target'] = totals_series return df def shift_target_column(df, by): df = df.sort_values(by=['projectID', 'statisticsGathered'], ignore_index=True) df['total_target'] = df.groupby('projectID')[f'total_{by}_target'].shift(-1) return df new_df = add_target_column(new_df, 'br') new_df = shift_target_column(new_df, 'br') new_df[['total_br_target']] Sortir Cette fonction prend une catégorie spécifique et calcule le nombre total de visiteurs en fonction de celle-ci. Cela fonctionne car le nombre total de valeurs de périphérique serait le même que le nombre total de valeurs de système d'exploitation. . Avec une telle approche, le modèle a montré des résultats 10 fois meilleurs qu'auparavant 5. Conclusion Si nous parlons de ce cas, c'est une fonctionnalité presque acceptable et prête à l'emploi. Les clients peuvent désormais planifier leur allocation budgétaire et la mise à l'échelle de leur serveur en fonction du résultat de ces prévisions. Les prévisions . s'écartent des valeurs réelles d'environ 2,45 visiteurs (puisque RMSE = √MSE ) Ce qui ne peut avoir aucun impact négatif crucial sur les besoins marketing. Comme cet article est devenu assez long et que l'application est toujours en cours de développement, nous ferons une pause ici. Nous continuerons d’affiner cette approche à l’avenir et je vous tiendrai au courant ! Merci d'avoir lu et de votre attention! J'ai hâte d'entendre vos commentaires et vos réflexions dans la section commentaires. J'espère que ces informations s'avéreront utiles pour vos objectifs ! Et bonne chance!