paint-brush
Cómo estoy construyendo una IA para el servicio de análisispor@pro1code1hack
619 lecturas
619 lecturas

Cómo estoy construyendo una IA para el servicio de análisis

por Yehor Dremliuha12m2024/05/23
Read on Terminal Reader

Demasiado Largo; Para Leer

En este artículo quiero compartir mi experiencia en el desarrollo de un servicio de inteligencia artificial para una plataforma de análisis web, llamado Swetrix. Mi objetivo era desarrollar un modelo de aprendizaje automático que predecira el tráfico futuro del sitio web en función de los datos que se muestran en la siguiente captura de pantalla. El objetivo final es tener una visión clara para el cliente del tráfico que aparecerá en su sitio web en el futuro.
featured image - Cómo estoy construyendo una IA para el servicio de análisis
Yehor Dremliuha HackerNoon profile picture
0-item
1-item

En este artículo quiero compartir mi experiencia en el desarrollo de un servicio de inteligencia artificial para una plataforma de análisis web, llamado Swetrix.


Mi objetivo era desarrollar un modelo de aprendizaje automático que pudiera predecir el tráfico futuro del sitio web en función de los datos que se muestran en la siguiente captura de pantalla.

Figura 1 - El proyecto

El objetivo final es tener una visión clara para el cliente del tráfico que aparecerá en su sitio web en el futuro, permitiéndole así obtener mejores conocimientos y mejorar la planificación empresarial en general.

2. Requisitos y arquitectura

Durante la planificación, se tomó la decisión de continuar con la arquitectura de microservicio con el intermediario de mensajes RabbitMQ para la comunicación entre los servicios de IA y API.


Figura 2 - Arquitectura


En primer lugar, necesitamos recopilar datos con una tarea cron cada hora en una base de datos separada. Decidimos elegir ClickHouse, ya que en él se almacenan los datos originales de los sitios web de Swetrix. Los detalles sobre el formato se cubrirán en las siguientes secciones.


Se eligió RabbitMQ como intermediario de mensajes debido a su simplicidad y a la necesidad de establecer una comunicación entre los servicios de IA y API. Analicemos todo y verifiquemos la lógica principal.

Servicio Swetrix-API:

  • Recopila estadísticas de datos cada hora a través de Cron Task y envía datos sin procesar al servicio de IA.
  • Inserta y recibe datos preprocesados de ClickHouse.

Servicio Swetrix-AI:

  • Procesa los datos sin procesar y las preferencias seleccionadas (intervalo y subcategoría) para realizar pronósticos.
  • Convierte los datos de pronóstico al formato JSON y los envía de regreso al servicio API a través de RabbitMQ.


El servicio Swetrix-AI utilizará el marco NestJs para el backend y scripts de Python para el preprocesamiento de datos y las predicciones de modelos.

3. Preprocesamiento

Reunimos los siguientes datos sobre proyectos en una tabla analytics . Figura 3: Datos sin procesar en la base de datos Ya has visto la versión renderizada de estos datos en la primera sección del artículo.

Pude lograr este resultado (casi aceptable) con la siguiente consulta:

 @Cron(CronExpression.EVERY_HOUR) async insertHourlyProjectData(): Promise<void> { const gatherProjectsData = ` INSERT INTO analytics.hourly_projects_data (UniqueID, projectID, statisticsGathered, br_keys, br_vals, os_keys, os_vals, lc_keys, lc_vals, ref_keys, ref_vals, so_keys, so_vals, me_keys, me_vals, ca_keys, ca_vals, cc_keys, cc_vals, dv_keys, dv_vals, rg_keys, rg_vals, ct_keys, ct_vals) SELECT generateUUIDv4() as UniqueID, pid as projectID, toStartOfHour(now()) as statisticsGathered, groupArray(br) as br_keys, groupArray(br_count) as br_vals, groupArray(os) as os_keys, groupArray(os_count) as os_vals, ... groupArray(ct) as ct_keys, groupArray(ct_count) as ct_vals FROM ( SELECT pid, br, count(*) as br_count, os, count(*) as os_count, ... ct, count(*) as ct_count FROM analytics.analytics GROUP BY pid, br, os, lc, ref, so, me, ca, cc, dv, rg, ct ) GROUP BY pid; ` try { await clickhouse.query(gatherProjectsData).toPromise() } catch (e) { console.error( `[CRON WORKER] Error whilst gathering hourly data for all projects: ${e}`, )

La función está programada para ejecutarse cada hora mediante un trabajo cron. Recopila e inserta datos analíticos en un clickhouse analytics.hourly_projects_data .

Producción

Figura 4 - Datos procesados
Debido a las limitaciones de ClickHouse, no pude lograr el formato deseado de los datos. Por lo tanto, decidí utilizar pandas para completar el preprocesamiento necesario para el entrenamiento del modelo.


Específicamente usé Python para hacer lo siguiente:

3.1 Combinar claves y valores

Combine claves y valores relacionados con una categoría en un campo JSON, por ejemplo combinando claves y valores de dispositivos en un objeto como tal.

 os_keys = {“Windows”, ”MacOS”, ”MacOS”, ”MacOS”, ”Linux”} os_values = {1, 2, 2, 1, 5}

En:

 os = {“Windows”: 1, “MacOS”: 5, “Linux”: 5}

Adjuntando el código y la salida:

 def format_data(keys_list, vals_list, threshold): """ Format data by converting string representations of lists to actual lists, then sums up the counts for each key. Keys with counts below a specified threshold are aggregated into 'Other'. """ counts = defaultdict(int) for keys_str, vals_str in zip(keys_list, vals_list): keys = ast.literal_eval(keys_str) vals = ast.literal_eval(vals_str) for key, val in zip(keys, vals): counts[key] += val final_data = defaultdict(int) for value, count in counts.items(): final_data[value] = count return dict(final_data) def process_group(group): """ Combine specific groups by a group clause, and make a """ result = {} for col in group.columns: if col.endswith('_keys'): prefix = col.split('_')[0] # Extract prefix to identify the category (eg, 'br' for browsers) threshold = other_thresholds.get(prefix, 1) # Get the threshold for this category, default to 1 vals_col = col.replace('_keys', '_vals') keys_list = group[col].tolist() vals_list = group[vals_col].tolist() result[col.replace('_keys', '')] = format_data(keys_list, vals_list, threshold) return pd.Series(result)


Este formato de datos no se usará para la predicción en sí, yo diría, es más para almacenarlos en la base de datos y con fines de depuración para verificar que no haya valores faltantes y, además, para verificar que el modelo produzca una predicción precisa. resultado.

Producción
Figura 5: Representación de Pandas de datos almacenados 3.2 Combinar claves y valores

Para entrenar un modelo adecuado decidí definir otros grupos para varias categorías. Lo que significa que si globalmente el número de instancias de un grupo en una categoría específica está por debajo de cierto porcentaje (%), se agregará como parte de la otra.


Por ejemplo, en la categoría os tenemos:

 {“MacOS”: 300, “Windows”: 400, “Linux”: 23 and “TempleOS”: 10}

Dado que tanto Linux como TempleOS en este caso son extremadamente raros, se combinarán en otro grupo , por lo que el resultado final será:

 {“MacOS”: 300, “Windows”: 400, “other”: 33}.

Y la "rareza" se determina de forma diferente según la categoría y en función del umbral designado para esta categoría.

Puede ser configurable en función de las preferencias y datos deseados por el cliente.

 other_thresholds = { 'br': 0.06, 'os': 0.04, 'cc': 0.02, 'lc': 0.02, 'ref': 0.02, 'so': 0.03, 'me': 0.03, 'ca': 0.03, 'cc': 0.02, 'dv': 0.02, 'rg': 0.01, 'ct': 0.01 }

Se implementaron 2 funciones para lograr esto.

 def get_groups_by_treshholds(df,column_name): """Calculate total values for all columns""" if column_name in EXCLUDED_COLUMNS: return counter = count_dict_values(df[column_name]) total = sum(counter.values()) list1 = [] for key, value in counter.items(): if not (value / total) < other_thresholds[column_name]: list1.append(key) return list1 def create_group_columns(df): column_values = [] for key in other_thresholds.keys(): groups = get_groups_by_treshholds(df, key) if not groups: continue for group in groups: column_values.append(f"{key}_{group}") column_values.append(f"{key}_other") return column_values column_values = create_group_columns(df) column_values

Producción

 ['br_Chrome', 'br_Firefox', 'os_Mac OS', 'os_other', 'cc_UA', 'cc_GB', 'cc_other', 'dv_mobile', 'dv_desktop', 'dv_other']

Cuando se trabaja con modelos de aprendizaje automático, es fundamental que los datos de entrada estén en un formato que el modelo pueda comprender. Los modelos de aprendizaje automático normalmente requieren valores numéricos (enteros, flotantes) en lugar de estructuras de datos complejas como JSON.


Por lo tanto, nuevamente, es preferible realizar un poco más de preprocesamiento de nuestros datos para cumplir con este requisito.


He creado una función create_exploded_df donde cada característica se representa como una columna separada y las filas contienen los valores numéricos correspondientes. (Aún no es ideal, pero fue la mejor solución que pude producir)


 def create_exploded_df(df): """ Function which creates a new data set, iterates through the old one and fill in values according to their belongings (br_other, etc..) """ new_df = df[['projectID', 'statisticsGathered']] for group in column_values: new_df[group] = 0 new_df_cols = new_df.columns df_cols = df.columns for column in df_cols: if column in ['projectID', 'statisticsGathered']: continue for index, row in enumerate(df[column]): if column in EXCLUDED_COLUMNS: continue for key, value in row.items(): total = 0 if (a:=f"{column}_{key}") in new_df_cols: new_df[a][index] = value else: total += value new_df[f"{column}_other"][index] = total return new_df new_df = create_exploded_df(df) new_df.to_csv("2-weeks-exploded.csv") new_df

Producción

Figura 6 - Características del modelo 3.3 Rellenar horas

Otro problema con el formato de datos que tuvimos es que si no hubiera tráfico para un proyecto en una hora específica en lugar de crear una fila en blanco, no habría ninguna fila, lo cual es inconveniente considerando el hecho de que el modelo está diseñado para predecir datos para el próximo período de tiempo (por ejemplo, la próxima hora). Sin embargo, no es factible entrenar el modelo para que haga predicciones si no hay datos disponibles para el período de tiempo inicial.


Por lo tanto, escribí un script que encontraría las horas faltantes e insertaría filas en blanco cuando se omitiera una hora.

Figura 7: Horas completadas

3.4 Agregar y cambiar columnas de destino

Con respecto al entrenamiento del modelo, el enfoque principal fue utilizar datos de la hora anterior como objetivo del modelo. Esto permite que el modelo prediga el tráfico futuro basándose en los datos actuales.

 def sort_df_and_assign_targets(df): df = df.copy() df = df.sort_values(by=['projectID', 'statisticsGathered']) for column_name in df.columns: if not column_name.endswith('target'): continue df[column_name] = df.groupby('projectID')[column_name].shift(-1) return df new_df = sort_df_and_assign_targets(new_df)

Producción

Figure 8 - Model Predictions









3.5 Dividir statisticsGathered reunidas en columnas separadas

La razón principal de este enfoque es que statisticsGathered era un objeto datetime , cuyos modelos que intenté usar (consulte las secciones siguientes) no pueden procesarlo ni identificar el patrón correcto.


Eso resultó en métricas terribles MSE/MRSE . Entonces, durante el desarrollo, se tomó la decisión de separar las funciones por day , month y hour , lo que mejoró significativamente los resultados.

 def split_statistic_gathered(df): df['Month'] = df['statisticsGathered'].dt.month.astype(int) # as int df['Day'] = df['statisticsGathered'].dt.day.astype(int) # as int df['Hour'] = df['statisticsGathered'].dt.hour df = df.drop('statisticsGathered', axis = 1) return df new_df = split_statistic_gathered(new_df) new_df

Producción
Figure 9 - Converted statisticsGathered


¡Y eso es! ¡Pasemos al entrenamiento en sí! 🎉🎉🎉






4. Regresión lineal

Bueno, supongo que la predicción real fue la parte más desafiante durante la creación de esta aplicación.

Lo primero que quería probar es utilizar el modelo LinearRegression :


Implementé las siguientes funciones:

 def create_model_for_target(train_df, target_series):    X_train, x_test, Y_train, y_test = train_test_split(train_df, target_series, test_size=0.3, shuffle=False)    reg = LinearRegression()    reg.fit(X_train, Y_train)    y_pred = reg.predict(x_test)    return {"y_test": y_test, "y_pred": y_pred} def create_models_for_targets(df):    models_data = dict()    df = df.dropna()    train_df = clear_df(df)    for target_name in df[[column_name for column_name in df.columns if column_name.endswith("target")]]:        models_data[target_name] = create_model_for_target(train_df, df[target_name])    return models_data


Explicación

Para cada columna de destino, dividimos los datos en conjuntos de entrenamiento y prueba. Luego entrenamos un modelo LinearRegression con los datos de entrenamiento y hacemos predicciones sobre los datos de prueba.

Para evaluar que los resultados son correctos, agregué la función que recopila las métricas requeridas y produce el resultado.

 def evaluate_models(data):    evaluation = []    for target, results in data.items():        y_test, y_pred = results['y_test'], results['y_pred']        mse = mean_squared_error(y_test, y_pred)        rmse = mean_squared_error(y_test, y_pred) ** 0.5        mae = mean_absolute_error(y_test, y_pred)        mean_y = y_test.mean()        median_y = y_test.median()        evaluation.append({'target': target, 'mse': mse, 'rmse': rmse, 'mae': mae, 'mean_y': mean_y, 'median_y': median_y})    return pd.DataFrame(evaluation)

Producción

Escribí un script que generó el resultado y lo guardé en un archivo de Excel, contabilizando los valores mse , rmse , mae y mean_y

Figura 10 - Resultados iniciales (sin total)


Como puede ver, las métricas no son satisfactorias y los datos de tráfico previstos estarán lejos de ser precisos y no serán adecuados para mis objetivos de pronósticos de tráfico.

Por lo tanto, tomé la decisión de predecir los totales de visitantes por hora, de modo que se crearon las siguientes funciones


 def add_target_column(df, by):  totals_series = df.apply(lambda x: sum(x[[column for column in df.columns if column.startswith(by)]]), axis=1)  df['total'] = totals_series  df[f'total_{by}_target'] = totals_series  return df def shift_target_column(df, by):  df = df.sort_values(by=['projectID', 'statisticsGathered'], ignore_index=True)  df['total_target'] = df.groupby('projectID')[f'total_{by}_target'].shift(-1)  return df new_df = add_target_column(new_df, 'br') new_df = shift_target_column(new_df, 'br') new_df[['total_br_target']]


Producción

Figure 11 - Total Target Esta función toma una categoría específica y calcula el total de visitantes en función de ella. Esto funciona porque la cantidad total de valores del dispositivo sería la misma que la cantidad total de valores del sistema operativo.


Con este enfoque, el modelo mostró resultados 10 veces mejores que antes .



5. Conclusión

Si hablamos de este caso, es una característica casi aceptable y lista para usar. Los clientes ahora pueden planificar la asignación de su presupuesto y la ampliación del servidor en función del resultado de estas predicciones.

Figure 12 -Total Results Las predicciones se desvían de los valores reales en aproximadamente 2,45 visitantes (ya que RMSE = √MSE ) . Lo cual no puede tener ningún impacto negativo y crucial para las necesidades de marketing.


Como este artículo se ha vuelto bastante extenso y la aplicación aún está en desarrollo, haremos una pausa aquí. ¡Continuaremos perfeccionando este enfoque en el futuro y los mantendremos informados!


¡Gracias por leer y tu atención! Espero escuchar sus comentarios y opiniones en la sección de comentarios. ¡Espero que esta información te resulte útil para tus objetivos!


¡Y buena suerte!