paint-brush
Construyamos un pipeline de MLOps con Databricks y Spark (parte 2)por@neshom
Nueva Historia

Construyamos un pipeline de MLOps con Databricks y Spark (parte 2)

por Mohsen Jadidi42m2024/12/29
Read on Terminal Reader

Demasiado Largo; Para Leer

En la segunda parte de este blog, veremos cómo Databricks nos permite realizar implementaciones por lotes y servicios en línea. Dedicaremos un tiempo a configurar paneles de control de monitoreo de datos y modelos.
featured image - Construyamos un pipeline de MLOps con Databricks y Spark (parte 2)
Mohsen Jadidi HackerNoon profile picture
0-item
1-item
2-item

En la primera parte de esta serie de tutoriales , dimos los primeros pasos para crear una canalización de MLOps de extremo a extremo utilizando Databricks y Spark, guiados por la arquitectura de referencia de Databricks. A continuación, se incluye un resumen de los pasos clave que cubrimos:


  • Configuración del Catálogo de Unity para la arquitectura Medallion : organizamos nuestros datos en capas de bronce, plata y oro dentro del Catálogo de Unity, estableciendo un sistema de gestión de datos estructurado y eficiente.

  • Ingesta de datos en el catálogo de Unity : demostramos cómo importar datos sin procesar al sistema, garantizando la consistencia y la calidad para las etapas de procesamiento posteriores.

  • Entrenamiento del modelo : utilizando Databricks, entrenamos un modelo de aprendizaje automático adaptado a nuestro conjunto de datos, siguiendo las mejores prácticas para el desarrollo de modelos escalables y efectivos.

  • Ajuste de hiperparámetros con HyperOpt : para mejorar el rendimiento del modelo, empleamos HyperOpt para automatizar la búsqueda de hiperparámetros óptimos, mejorando la precisión y la eficiencia.

  • Seguimiento de experimentos con Databricks MLflow : utilizamos MLflow para registrar y monitorear nuestros experimentos, manteniendo un registro completo de versiones del modelo, métricas y parámetros para una fácil comparación y reproducibilidad.


Una vez completados estos pasos básicos, su modelo ya está listo para su implementación. En esta segunda parte, nos centraremos en integrar dos componentes críticos en nuestro sistema:


  1. Inferencia por lotes : implementación del procesamiento por lotes para generar predicciones en grandes conjuntos de datos, adecuado para aplicaciones como puntuación masiva e informes periódicos.
  2. Inferencia en línea (servicio de modelos) : configuración del servicio de modelos en tiempo real para proporcionar predicciones inmediatas, esenciales para aplicaciones y servicios interactivos.
  3. Monitoreo de modelos: para garantizar que los modelos implementados mantengan un rendimiento y una confiabilidad óptimos a lo largo del tiempo.


¡Vamos a ello!

Implementación del modelo

El punto de partida del último blog fue la evaluación del modelo. Ahora imaginemos que hicimos la comparación y descubrimos que nuestro modelo muestra un rendimiento superior en comparación con este modelo de producción. Como queremos (suponemos) utilizar el modelo en producción, queremos aprovechar todos los datos que tenemos. El siguiente paso es entrenar y probar el modelo utilizando el conjunto de datos completo. Luego, conservar nuestro modelo para su uso posterior implementándolo como nuestro modelo campeón. Dado que este es el modelo final que queremos utilizar para la inferencia, utilizamos el cliente de ingeniería de características para entrenar el modelo. De esta manera, no solo hacemos un seguimiento más fácil del linaje del modelo, sino que también descargamos la validación del esquema y la transformación de características (si corresponde) al cliente.


 with mlflow.start_run(run_name="ALS_best_model") as run: als = ALS() # Now we set the parameters for the method als.setMaxIter(MAX_ITER)\ .setSeed(SEED)\ .setRegParam(best_params["REG_PARAM"])\ .setUserCol(COL_USER)\ .setItemCol(COL_ITEM)\ .setRatingCol(COL_LABEL)\ .setRank(best_params["RANK"]) mlflow.log_param("MAX_ITER", MAX_ITER) mlflow.log_param("RANK", best_params["RANK"]) mlflow.log_param("REG_PARAM", best_params["REG_PARAM"]) # Create the model with these parameters. model = als.fit(df_full_data) #drop predictions where users and products from the test test and didn't make it into the training set. in this case, the prediction is NaN model.setColdStartStrategy('drop') predictions = model.transform(df_full_data) signature = infer_signature(model_input = df_full_data, model_output = predictions.select(COL_LABEL)) #log the model mlflow.spark.log_model(model, model_name, sample_input=df_full_data.limit(3), signature = signature, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name=f"{catalog_name}.{model_schema}.{model_name}") evaluator = RegressionEvaluator(predictionCol=COL_PRED, labelCol=COL_LABEL) rmse = evaluator.setMetricName("rmse").evaluate(predictions) mlflow.log_metric('rmse', rmse)


También podemos utilizar las API de Feature Store o Feature Engineering para entrenar y registrar los modelos.

 model_info = fe.log_model(model=model, artifact_path = model_name, flavor=mlflow.spark, training_set=fe_full_data, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name= f"{catalog_name}.{model_schema}.{model_name}" )


Cuando usamos la API de ingeniería de características, podemos ver el linaje del modelo en el Explorador de catálogo

Linaje de datos en el catálogo de Unity de Dataticks


Ahora actualicemos la descripción del modelo y asignémosle una etiqueta de Campeón.

 import time from mlflow.tracking.client import MlflowClient from mlflow.entities.model_registry.model_version_status import ModelVersionStatus client = MlflowClient() #find the latest model version model_name_path = f"{catalog_name}.{model_schema}.{model_name}" model_version_infos = client.search_model_versions(f"name ='{model_name_path}'") new_model_version = max([int(model_version_info.version) for model_version_info in model_version_infos]) #add the model and model version descirption client.update_registered_model( name=model_name_path, description="collaborative filtering using Spark mllib ALS. This model use rating table" ) client.update_model_version( name=model_name_path, version=new_model_version, description="this model is optimized Rank and REG_PARAM with Hyperopt and rmse as a loss function. trained on the full dataset" ) # assign alias client.set_registered_model_alias(model_name_path, "Champion", new_model_version)


Ahora siga adelante y verifique el esquema en el que registró el modelo. Debería ver todas sus actualizaciones de la siguiente manera

Registro de modelos en el catálogo de Unity de Databricks

Etapas del modelo : si usa el espacio de trabajo para el registro de modelos, debe usar etapas para administrar sus modelos. El uso de alias no funcionará. Consulte aquí Para ver cómo funciona

Inferencia del modelo

Puntuación por lotes

Ahora imaginemos que queremos utilizar nuestro modelo en producción para inferencia. En este paso cargamos el modelo campeón y lo utilizamos para generar 20 recomendaciones de películas para cada usuario.


 from mlflow.spark import load_model as spark_load_model from mlflow.tracking.client import MlflowClient from create_training_set import split_data #-- set UC as model registray mlflow.set_registry_uri("databricks-uc") #-- initate mlflow client client = MlflowClient() # -- read the config file with open('config.json') as config_file: config = json.load(config_file) catalog_name = config["catalog_name"] gold_layer = config["gold_layer_name"] silver_layer = config["silver_layer_name"] user_item_table_name = config["user_item_table_name"] ft_user_item_name = config["ft_user_item_name"] model_name = config["model_name"] model_schema = config["model_schema"] #-- create the model uri model_path = f"{catalog_name}.{model_schema}.{model_name}" # --create the model_uri: there are two ways to do this # 1: using the alias (we use this*) model_version_uri = f"models:/{model_uri}@champion" # 2: using model version #champion_version = client.get_model_version_by_alias(model_uri, "champion") #model_version_uri = f"models:/{model_uri}/{champion_version.version}" # -- load the model pipline and exctract the model model_pipeline = spark_load_model(model_version_uri) model = model_pipeline.stages[0] # -- batch scoring using the the model fe_full_data, df_full_data, df_train, df_test = split_data() df_batch_input = df_full_data.drop("rating") df_scores = model.transform(df_batch_input) # --- in case you used Feature Engineering to train and register model #from databricks.feature_engineering import FeatureEngineeringClient #fe = FeatureEngineeringClient() # fe.score_batch(model_uri=f"{model_version_uri}",df = df_batch_input)


y puedes ver que usamos los mismos datos de entrenamiento para la puntuación por lotes. Aunque en el caso de los sistemas de recomendación tiene sentido, en la mayoría de las aplicaciones queremos usar el modelo para puntuar algunos datos no vistos. Por ejemplo, imagina que estás en Netflix y quieres actualizar las recomendaciones de los usuarios al final del día en función de su nueva lista de seguimiento. Podemos programar un trabajo que ejecute la puntuación por lotes en un momento específico al final del día.


Ahora podemos continuar y generar las recomendaciones para cada usuario. Para ello, buscamos los 20 artículos más populares por usuario.

 from pyspark.sql.window import Window from pyspark.sql.functions import col, split, row_number, collect_list from pyspark.sql.functions import col, collect_list, expr, lit, min, row_number, desc windowSpec = Window.partitionBy("user_id").orderBy(col("prediction").desc()) df_top_20_items = df_scores.withColumn("rank", row_number().over(windowSpec)).filter(col("rank") <= 20) df_user_recs = df_top_20_items.groupBy("user_id") \ .agg(collect_list(col("item_id").cast("double")).alias("top_item_ids"))


Así es como se ve el resultado

Finalmente, podemos almacenar la predicción como una etiqueta delta en nuestro UC o publicarla en un Mongo DB o Azure Cosmos DB de sistemas posteriores. Optamos por la primera opción.


 df_user_recs.write.mode("overwrite").saveAsTable(f"{catalog_name}.{output_schema}.top20_item_recommendations")


Inferencia en línea/transmisión

Ahora imaginemos un caso en el que queremos actualizar nuestras recomendaciones en función de las interacciones de los usuarios en tiempo real. Para este caso, podemos utilizar el servicio de modelos. Cuando alguien quiere utilizar su modelo, puede enviar datos al servidor. El servidor, a su vez, envía esos datos a su modelo implementado, que entra en acción, analiza los datos y genera una predicción. Se pueden utilizar en aplicaciones web, aplicaciones móviles o incluso sistemas integrados. Una de las aplicaciones de este enfoque es habilitar el enrutamiento del tráfico para las pruebas A/B.


El algoritmo ALS no se puede utilizar directamente para la inferencia en línea, ya que requiere volver a entrenar el modelo utilizando todos los datos (antiguos y nuevos) para actualizar las recomendaciones. Los algoritmos de aprendizaje de descenso de gradiente son ejemplos de modelos que se pueden utilizar para actualizaciones en línea. Es posible que analicemos algunos de estos algoritmos en futuras publicaciones.


Sin embargo, solo para ilustrar cómo funcionaría dicho modelo, estamos creando un punto final de servicio de modelo (inútil) que predice la calificación de películas en función del momento en que un usuario califica una película.


 import requests model_path = f"{catalog_name}.{model_schema}.{model_name}" champion_version = client.get_model_version_by_alias(model_path, "champion") # Set the name of the MLflow endpoint endpoint_name = config["model_serving_endpoint_name"] # Name of the registered MLflow model model_name = model_path # Specify the type of compute (CPU, GPU_SMALL, GPU_MEDIUM, etc.) workload_type = "CPU" # Specify the scale-out size of compute (Small, Medium, Large, etc.) workload_size = "Small" # Get the latest version of the MLflow model model_version = int(champion_version.version) # Specify Scale to Zero(only supported for CPU endpoints) scale_to_zero = False # Get the API endpoint and token for the current notebook context API_ROOT = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get() API_TOKEN = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get() data = { "name": endpoint_name, "config": { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ] }, } headers = {"Context-Type": "text/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.post( url=f"{API_ROOT}/api/2.0/serving-endpoints", json=data, headers=headers )


Esto creará y activará el clúster de servicio de modelos para nosotros, por lo que lleva algo de tiempo. Ahora, si abre la ventana Serving , debería ver su punto final.


Podemos usar un punto final para atender a varios modelos. Luego, podemos usar el enrutamiento de tráfico para escenarios como pruebas A/B o comparar el rendimiento de diferentes modelos en la producción.

Tabla de inferencia

Las tablas de inferencia en Databricks Model Serving actúan como un registro automático para nuestros modelos implementados. Cuando están habilitadas, capturan las solicitudes entrantes (datos enviados para predicción), las salidas del modelo correspondientes (predicciones) y otros metadatos como una tabla Delta dentro del Catálogo de Unity. Podemos usar la tabla de inferencia para monitoreo y depuración , seguimiento de linaje y un procedimiento de recopilación de datos para volver a entrenar o ajustar nuestros modelos.


Podemos habilitar la inference table en nuestro punto final de servicio para monitorear el modelo. Podemos hacerlo especificando las propiedades auto_capture_config en la carga útil cuando creamos el punto final por primera vez. O actualizamos nuestro punto final después usando el comando put y la URL del punto final config de la siguiente manera (más información aquí )


 data = { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ], "auto_capture_config":{ "catalog_name": catalog_name, "schema_name": model_schema, "table_name_prefix": payload_table, } } headers = {"Context-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.put(url=f"{API_ROOT}/api/2.0/serving-endpoints/{endpoint_name}/config", json=data, headers=headers) print(json.dumps(response.json(), indent=4))


Ahora, vamos a alimentar el punto final con algunos datos ficticios de interacción del usuario.

 import random import time all_items = df_full_data.select(col("item_id")).distinct() for user_id in range(50,54): items_not_rated_by_user = df_full_data.where(col("user_id")==user_id).select(col("item_id")).distinct()#collect()[0][0] no_rated_items = [item.item_id for item in all_items.subtract(items_not_rated_by_user).limit(4).collect()] data = { "dataframe_records": [ {"user_id":user_id, "item_id":no_rated_items[0], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[1], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, ] } response = requests.post( url=f"{API_ROOT}/serving-endpoints/{endpoint_name}/invocations", json=data, headers=headers ) # generate the data within the timespan of 1 to 8 minutes time.sleep(random.randint(60*1, 60*8))


Podemos consultar los registros de los puntos finales en la tabla <catalog>.<schema>.<payload_table> . Se necesitan aproximadamente 10 minutos hasta que se puedan ver los datos en la tabla.


 table_path = f"{catalog_name}.{model_schema}.{payload_table}" # Read data from the inference table df_inf_table = spark.read.table(table_path) display(df_inf_table )


Deberías ver algo como esto en tu tabla de carga útil.

Tabla de carga útil de servicio del modelo Databricks


Para comprender el esquema de esta tabla de inferencia, consulte “Esquema de la tabla de inferencia del catálogo de Unity==” aquí .==


Monitoreo de modelos

El monitoreo de modelos y datos es un tema complejo que requiere mucho tiempo para dominarlo. Databricks Lakehouse Monitoring (DLM) reduce los gastos generales que implica construir un sistema de monitoreo adecuado al proporcionar plantillas estándar y personalizables para casos de uso comunes. Sin embargo, dominar DLM y el monitoreo de modelos en general requiere mucha experimentación. No quiero darle una descripción general extensa del monitoreo de modelos aquí, sino más bien darle un punto de partida. Es posible que dedique un blog a este tema en el futuro.


Un breve resumen de las funciones y características de DLM

Ahora que tenemos nuestro modelo en funcionamiento, podemos usar la tabla de inferencia generada por nuestro punto de conexión de servicio para monitorear métricas clave, como el rendimiento y la desviación del modelo, para detectar cualquier desviación o anomalía en nuestros datos o en el modelo a lo largo del tiempo. Este enfoque proactivo nos ayuda a tomar medidas correctivas oportunas, como volver a entrenar el modelo o actualizar sus funciones, para mantener un rendimiento óptimo y la alineación con los objetivos comerciales.


Arquitectura de datos de monitoreo de Databricks Lakehouse Fuente: Databricks


DLM ofrece tres tipos de análisis o profile type : series temporales , instantáneas e inferencia . Dado que nos interesa analizar nuestra tabla de inferencia, nos centraremos en esta última. Para utilizar una tabla para el seguimiento (nuestra “ tabla principal ”), debemos asegurarnos de que la tabla tenga la estructura adecuada. Para la tabla de inferencia , cada fila debe corresponder a una solicitud con las siguientes columnas:

  • Características del modelo

  • predicción del modelo

  • Identificación del modelo

  • timestamp : marca de tiempo de la solicitud de inferencia

  • verdad fundamental (opcional)


El ID del modelo es importante en los casos en los que ofrecemos varios modelos y queremos hacer un seguimiento del rendimiento de cada uno de ellos en un único panel de control. Si hay más de un ID de modelo disponible, DLM lo utiliza para segmentar los datos y calcular métricas y estadísticas para cada segmento por separado.


DLM calcula cada estadística y métrica para un intervalo de tiempo específico. Para el análisis de inferencia, utilizó la columna de marca de tiempo , además de un tamaño de ventana definido por el usuario para identificar las ventanas de tiempo. Más información a continuación.


DLM admite dos problem type para las tablas de inferencia: “ clasificación ” o “ regresión ”. Calcula algunas de las métricas y estadísticas relevantes en función de esta especificación.


Para utilizar DLM, debemos crear un monitor y adjuntarlo a una tabla. Al hacer esto, DLM crea dos metric tables :

  • Tabla de métricas de perfil : esta tabla contiene estadísticas resumidas como mínimo, máximo, porcentaje de valores nulos y ceros. También contiene métricas adicionales basadas en el tipo de problema definido por el usuario. Por ejemplo, precisión , recuperación y f1_score para los modelos de clasificación, y error_cuadrado_medio y error_promedio_medio para los modelos de regresión.

  • Tabla de métricas de deriva : contiene estadísticas que miden cómo ha cambiado la distribución de datos a lo largo del tiempo o en relación con un valor de referencia (si se proporciona) . Calcula medidas como la prueba de Chi-cuadrado y la prueba KS.


Para ver la lista completa de métricas de cada tabla, consulte la página de documentación de la tabla de métricas del monitor . También es posible crear métricas personalizadas .


Un aspecto importante de la creación de un sistema de monitoreo es asegurarse de que nuestro panel de monitoreo tenga acceso a los últimos datos de inferencia a medida que llegan. Para eso, podemos usar la transmisión de tablas Delta para realizar un seguimiento de las filas procesadas en la tabla de inferencia. Usamos la tabla de inferencia del servidor de modelos como nuestra tabla de origen ( readStream ), y la tabla de monitoreo como la tabla de destino ( writeStream ). También nos aseguramos de que la captura de datos de cambios (CDC) esté habilitada en ambas tablas (está habilitada de manera predeterminada en la tabla de inferencia). De esta manera, procesamos solo los cambios (insertar/actualizar/eliminar) en la tabla de origen en lugar de volver a procesar la tabla completa en cada actualización.

Práctica

Para habilitar el monitoreo sobre nuestra tabla de inferencia realizamos los siguientes pasos:

  1. Lea la tabla de inferencia como una tabla de transmisión
  2. Cree una nueva tabla delta con el esquema correcto descomprimiendo la tabla de inferencia generada por nuestro punto final de servicio del modelo.
  3. Preparar la tabla de referencia (si la hay)
  4. Crea un monitor sobre la tabla resultante y actualiza la métrica
  5. Programe un flujo de trabajo para descomprimir la tabla de inferencia en la estructura correcta y actualizar las métricas


Primero, debemos instalar la API de monitoreo de Lakehouse. Debería estar instalada si usa Databricks rum time 15.3 LTS o superior:


 %pip install "https://ml-team-public-read.s3.amazonaws.com/wheels/data-monitoring/a4050ef7-b183-47a1-a145-e614628e3146/databricks_lakehouse_monitoring-0.4.14-py3-none-any.whl" dbutils.library.restartPython()


Leamos la tabla de inferencia como una tabla de transmisión

 requests_raw = spark.readStream\ .format("delta")\ .table(inference_table_name) requests_raw.isStreaming #-> True


A continuación, tenemos que poner la tabla en el formato correcto, como se describe arriba. Esta tabla debe tener una fila para cada predicción con las características y el valor de predicción correspondientes. La tabla de inferencia que obtenemos del punto de conexión que sirve al modelo almacena las solicitudes y respuestas del punto de conexión como un formato JSON anidado. Aquí hay un ejemplo de la carga útil JSON para la columna de solicitud y respuesta.

 #requests {"dataframe_records": [ {"user_id": 1, "item_id": 346, "rating": 5}, {"user_id": 1, "item_id": 377, "rating": 2}, {"user_id": 1, "item_id": 302, "rating": 4} ] } #reponse {"predictions": [4.248899936676025, 1.1172138452529907, 4.279165744781494] } # --> what we need | user_id | item_id | rating | prediction | |---------|---------|--------|------------| | 1 | 346 | 5 | 4.248900 | | 1 | 377 | 2 | 1.117214 | | 1 | 302 | 4 | 4.279166 |


Para descomprimir esta tabla en el esquema correcto, podemos utilizar el siguiente código que está adaptado de la documentación de Databricks ( Cuaderno de inicio de Inference table Lakehouse Monitoring ).


 # define the schema of the request and reponse fields in the inference tabel REQUEST_FIELDS = [StructField('user_id', IntegerType(), False),\ StructField('item_id', IntegerType(), False),\ StructField('rating', IntegerType(), False)\ ] RESPONSE_FIELD = [T.StructField("predictions", FloatType())] def process_col_requests(json_str:str) -> str: """ to proccess the JSON payload of request column in inference table """ request = json.loads(json_str) dataframe_records = request.get("dataframe_records", []) return dataframe_records def procces_col_response(json_str: str) -> str: """ to proccess the JSON payload of reponse column in inference table """ reponse = json.loads(json_str) output = [{prediction_col: round(prediction,4)} for prediction in reponse["predictions"]] return output def get_model_id(endpoint_name: str) -> str: """ create the model id by concatinating the model name and the model version. note: the assumption is the endpoint serves only one model """ served_models = get_served_models(endpoint_name) model_id = f"{served_models[0]['model_name']}_{served_models[0]['model_version']}" return model_id def process_requests(requests_raw: DataFrame, request_fields: List[T.StructField], response_field: T.StructField, endpoint_name: str) -> DataFrame: """ Takes a stream of raw requests and processes them by: - Unpacking JSON payloads for requests and responses - Exploding batched requests into individual rows - Converting Unix epoch millisecond timestamps to be Spark TimestampType :param requests_raw: DataFrame containing raw requests. Assumed to contain the following columns: - `request` - `response` - `timestamp_ms` :param request_fields: List of StructFields representing the request schema :param response_field: A StructField representing the response schema :return: A DataFrame containing processed requests """ # Convert the timestamp milliseconds to TimestampType for downstream processing. requests_timestamped = requests_raw \ .withColumn(timestamp_col, (F.col("timestamp_ms") / 1000).cast(T.TimestampType())) \ .drop("timestamp_ms") # create the model identifier column model_id = get_model_id(endpoint_name) # Convert the model name and version columns into a model identifier column. requests_identified = requests_timestamped \ .withColumn(model_id_col, F.lit(model_id)) # Rename the date column to avoid collisions with features. requests_dated = requests_identified.withColumnRenamed("date", date_col) # Consolidate and unpack JSON. request_schema = T.ArrayType(T.StructType(request_fields)) response_schema = T.ArrayType(T.StructType(response_field)) # w udf_request = F.udf(process_col_requests, request_schema) udf_reponse = F.udf(procces_col_response, response_schema) requests_unpacked = requests_dated.withColumn("request", udf_request("request")).\ withColumn("response", udf_reponse("response")) # Explode batched requests into individual rows. DB_PREFIX = "__db" requests_exploded = requests_unpacked \ .withColumn(f"{DB_PREFIX}_request_response", F.arrays_zip(F.col("request"), F.col("response"))) \ .withColumn(f"{DB_PREFIX}_request_response", F.explode(F.col(f"{DB_PREFIX}_request_response"))) \ .select(F.col("*"), F.col(f"{DB_PREFIX}_request_response.request.*"), F.col(f"{DB_PREFIX}_request_response.response.*")) \ .drop(f"{DB_PREFIX}_request_response", "request", "response") requests_cleaned = requests_exploded.drop("status_code", "sampling_fraction", "client_request_id", "databricks_request_id", "request_metadata") return requests_cleaned


La tabla resultante se vería así:

Tabla de carga útil descomprimida

A continuación debemos inicializar nuestra tabla de receptores

 dt_builder = DeltaTable.createIfNotExists(spark) \ .tableName(unpacked_requests_table_name) \ .addColumns(schema) \ .partitionedBy(requests_cleaned.schema) \ .property("delta.enableChangeDataFeed", "true") \ dt_builder.execute()


y escribe los resultados

 checkpoint_path = f"dbfs:/payload-logging/{endpoint_name}/checkpoint" requests_stream = requests_cleaned.writeStream \ .trigger(once=True) \ .format("delta") \ .partitionBy(date_col) \ .outputMode("append") \ .option("checkpointLocation", checkpoint_path) \ .toTable(unpacked_requests_table_name) \


Por último, creamos nuestra tabla de referencia. DLM utiliza esta tabla para calcular las desviaciones comparando la distribución de columnas similares de los modelos de referencia y primario. La tabla de referencia debe tener la misma columna de características que la columna primaria, así como la misma columna de identificación del modelo. Para la tabla de referencia, utilizamos la tabla de predicción de nuestro conjunto de datos de validación que almacenamos antes, después de entrenar nuestro modelo utilizando el mejor hiperparámetro. Para calcular la métrica de desviaciones, Databricks calcula las métricas de perfil tanto para la tabla primaria como para la de referencia. Aquí puede leer sobre la tabla primaria y la tabla de referencia .


 #read the prediction table df_base_table = spark.table(f"{catalog_name}.{model_schema}.predictions") # create the model id and add it to the table model_id = get_model_id(endpoint_name) df_base_table = df_base_table.withColumn(model_id_col, F.lit(model_id)) #write the new table and enable the CDC on it output_base_table_name = f"{catalog_name}.{model_schema}.{base_table_prefix}_{model_name}" df_base_table.write.format("delta").mode("overwrite").saveAsTable(output_base_table_name) spark.sql(f"ALTER TABLE {output_base_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")


Ahora estamos listos para crear nuestro panel de monitoreo. Podemos hacerlo usando la interfaz de usuario o la API de monitoreo de Lakehouse. Aquí usamos la segunda opción:

 # This is where we store the metric tables. output_schema_name = f"{catalog_name}.{model_schema}" try: info = lm.create_monitor( table_name=unpacked_requests_table_name, profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities,#the aggregation window model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, # We will refresh the metrics on-demand in this notebook baseline_table_name=output_base_table_name, ) print(info) except Exception as e: # Ensure the exception was expected assert "RESOURCE_ALREADY_EXISTS" in str(e), f"Unexpected error: {e}" # Update the monitor if any parameters of this notebook have changed. lm.update_monitor( table_name=unpacked_requests_table_name, updated_params=dict( profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities, model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, baseline_table_name=output_base_table_name, ) ) # Refresh metrics calculated on the requests table. refresh_info = lm.run_refresh(table_name=unpacked_requests_table_name) print(refresh_info)


Después de ejecutar el código, Databricks tarda un tiempo en calcular todas las métricas. Para ver el panel, vaya a la pestaña Quality de la tabla de receptores (es decir, unpacked_requests_table_name ). Debería ver una página como la siguiente.

Vista de monitoreo de modelos de Databricks


Si hace clic en Ver refresh history verá las actualizaciones en ejecución, pendientes y pasadas. Haga clic en View Dashboard para abrir su panel.

Panel de control de supervisión de modelos de Databricks



Comenzamos con la tabla de inferencia ( my_endpoint_payload ), la procesamos y guardamos el resultado en my_endpoint_payload_unpacked y pasamos esta tabla junto con nuestra tabla de referencia ( base_table_als ) a nuestra API de monitoreo. El DLM calcula las métricas de perfil para cada tabla ( my_endpoint_payload_unpacked_profile_metric ) y las usa para calcular las métricas de desviación ( my_endpoint_payload_unpacked_drift_metrics ).


¡Ahí lo tienes! ¡Tienes todo lo que necesitas para servir y monitorear tu modelo!


¡En la siguiente parte te mostraré cómo automatizar este proceso usando Databricks Assets Bundle y Gitlab !

En la primera parte de esta serie de tutoriales , dimos los primeros pasos para crear una canalización de MLOps de extremo a extremo utilizando Databricks y Spark, guiados por la arquitectura de referencia de Databricks. A continuación, se incluye un resumen de los pasos clave que cubrimos:


  • Configuración del Catálogo de Unity para la arquitectura Medallion : organizamos nuestros datos en capas de bronce, plata y oro dentro del Catálogo de Unity, estableciendo un sistema de gestión de datos estructurado y eficiente.

  • Ingesta de datos en el catálogo de Unity : demostramos cómo importar datos sin procesar al sistema, garantizando la consistencia y la calidad para las etapas de procesamiento posteriores.

  • Entrenamiento del modelo : utilizando Databricks, entrenamos un modelo de aprendizaje automático adaptado a nuestro conjunto de datos, siguiendo las mejores prácticas para el desarrollo de modelos escalables y efectivos.

  • Ajuste de hiperparámetros con HyperOpt : para mejorar el rendimiento del modelo, empleamos HyperOpt para automatizar la búsqueda de hiperparámetros óptimos, mejorando la precisión y la eficiencia.

  • Seguimiento de experimentos con Databricks MLflow : utilizamos MLflow para registrar y monitorear nuestros experimentos, manteniendo un registro completo de versiones del modelo, métricas y parámetros para una fácil comparación y reproducibilidad.


Una vez completados estos pasos básicos, su modelo ya está listo para su implementación. En esta segunda parte, nos centraremos en integrar dos componentes críticos en nuestro sistema:


  1. Inferencia por lotes : implementación del procesamiento por lotes para generar predicciones en grandes conjuntos de datos, adecuado para aplicaciones como puntuación masiva e informes periódicos.
  2. Inferencia en línea (servicio de modelos) : configuración del servicio de modelos en tiempo real para proporcionar predicciones inmediatas, esenciales para aplicaciones y servicios interactivos.
  3. Monitoreo de modelos: para garantizar que los modelos implementados mantengan un rendimiento y una confiabilidad óptimos a lo largo del tiempo.


¡Vamos a ello!

Implementación del modelo

El punto de partida del último blog fue la evaluación del modelo. Ahora imaginemos que hicimos la comparación y descubrimos que nuestro modelo muestra un rendimiento superior en comparación con este modelo de producción. Como queremos (suponemos) utilizar el modelo en producción, queremos aprovechar todos los datos que tenemos. El siguiente paso es entrenar y probar el modelo utilizando el conjunto de datos completo. Luego, conservar nuestro modelo para su uso posterior implementándolo como nuestro modelo campeón. Dado que este es el modelo final que queremos utilizar para la inferencia, utilizamos el cliente de ingeniería de características para entrenar el modelo. De esta manera, no solo hacemos un seguimiento más fácil del linaje del modelo, sino que también descargamos la validación del esquema y la transformación de características (si corresponde) al cliente.


 with mlflow.start_run(run_name="ALS_best_model") as run: als = ALS() # Now we set the parameters for the method als.setMaxIter(MAX_ITER)\ .setSeed(SEED)\ .setRegParam(best_params["REG_PARAM"])\ .setUserCol(COL_USER)\ .setItemCol(COL_ITEM)\ .setRatingCol(COL_LABEL)\ .setRank(best_params["RANK"]) mlflow.log_param("MAX_ITER", MAX_ITER) mlflow.log_param("RANK", best_params["RANK"]) mlflow.log_param("REG_PARAM", best_params["REG_PARAM"]) # Create the model with these parameters. model = als.fit(df_full_data) #drop predictions where users and products from the test test and didn't make it into the training set. in this case, the prediction is NaN model.setColdStartStrategy('drop') predictions = model.transform(df_full_data) signature = infer_signature(model_input = df_full_data, model_output = predictions.select(COL_LABEL)) #log the model mlflow.spark.log_model(model, model_name, sample_input=df_full_data.limit(3), signature = signature, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name=f"{catalog_name}.{model_schema}.{model_name}") evaluator = RegressionEvaluator(predictionCol=COL_PRED, labelCol=COL_LABEL) rmse = evaluator.setMetricName("rmse").evaluate(predictions) mlflow.log_metric('rmse', rmse)


También podemos utilizar las API de Feature Store o Feature Engineering para entrenar y registrar los modelos.

 model_info = fe.log_model(model=model, artifact_path = model_name, flavor=mlflow.spark, training_set=fe_full_data, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name= f"{catalog_name}.{model_schema}.{model_name}" )


Cuando usamos la API de ingeniería de características, podemos ver el linaje del modelo en el Explorador de catálogo

Linaje de datos en el catálogo de Unity de Dataticks


Ahora actualicemos la descripción del modelo y asignémosle una etiqueta de Campeón.

 import time from mlflow.tracking.client import MlflowClient from mlflow.entities.model_registry.model_version_status import ModelVersionStatus client = MlflowClient() #find the latest model version model_name_path = f"{catalog_name}.{model_schema}.{model_name}" model_version_infos = client.search_model_versions(f"name ='{model_name_path}'") new_model_version = max([int(model_version_info.version) for model_version_info in model_version_infos]) #add the model and model version descirption client.update_registered_model( name=model_name_path, description="collaborative filtering using Spark mllib ALS. This model use rating table" ) client.update_model_version( name=model_name_path, version=new_model_version, description="this model is optimized Rank and REG_PARAM with Hyperopt and rmse as a loss function. trained on the full dataset" ) # assign alias client.set_registered_model_alias(model_name_path, "Champion", new_model_version)


Ahora siga adelante y verifique el esquema en el que registró el modelo. Debería ver todas sus actualizaciones de la siguiente manera

Registro de modelos en el catálogo de Unity de Databricks

Etapas del modelo : si usa el espacio de trabajo para el registro de modelos, debe usar etapas para administrar sus modelos. El uso de alias no funcionará. Consulte aquí Para ver cómo funciona

Inferencia del modelo

Puntuación por lotes

Ahora imaginemos que queremos utilizar nuestro modelo en producción para inferencia. En este paso cargamos el modelo campeón y lo utilizamos para generar 20 recomendaciones de películas para cada usuario.


 from mlflow.spark import load_model as spark_load_model from mlflow.tracking.client import MlflowClient from create_training_set import split_data #-- set UC as model registray mlflow.set_registry_uri("databricks-uc") #-- initate mlflow client client = MlflowClient() # -- read the config file with open('config.json') as config_file: config = json.load(config_file) catalog_name = config["catalog_name"] gold_layer = config["gold_layer_name"] silver_layer = config["silver_layer_name"] user_item_table_name = config["user_item_table_name"] ft_user_item_name = config["ft_user_item_name"] model_name = config["model_name"] model_schema = config["model_schema"] #-- create the model uri model_path = f"{catalog_name}.{model_schema}.{model_name}" # --create the model_uri: there are two ways to do this # 1: using the alias (we use this*) model_version_uri = f"models:/{model_uri}@champion" # 2: using model version #champion_version = client.get_model_version_by_alias(model_uri, "champion") #model_version_uri = f"models:/{model_uri}/{champion_version.version}" # -- load the model pipline and exctract the model model_pipeline = spark_load_model(model_version_uri) model = model_pipeline.stages[0] # -- batch scoring using the the model fe_full_data, df_full_data, df_train, df_test = split_data() df_batch_input = df_full_data.drop("rating") df_scores = model.transform(df_batch_input) # --- in case you used Feature Engineering to train and register model #from databricks.feature_engineering import FeatureEngineeringClient #fe = FeatureEngineeringClient() # fe.score_batch(model_uri=f"{model_version_uri}",df = df_batch_input)


y puedes ver que usamos los mismos datos de entrenamiento para la puntuación por lotes. Aunque en el caso de los sistemas de recomendación tiene sentido, en la mayoría de las aplicaciones queremos usar el modelo para puntuar algunos datos no vistos. Por ejemplo, imagina que estás en Netflix y quieres actualizar las recomendaciones de los usuarios al final del día en función de su nueva lista de seguimiento. Podemos programar un trabajo que ejecute la puntuación por lotes en un momento específico al final del día.


Ahora podemos continuar y generar las recomendaciones para cada usuario. Para ello, buscamos los 20 artículos más populares por usuario.

 from pyspark.sql.window import Window from pyspark.sql.functions import col, split, row_number, collect_list from pyspark.sql.functions import col, collect_list, expr, lit, min, row_number, desc windowSpec = Window.partitionBy("user_id").orderBy(col("prediction").desc()) df_top_20_items = df_scores.withColumn("rank", row_number().over(windowSpec)).filter(col("rank") <= 20) df_user_recs = df_top_20_items.groupBy("user_id") \ .agg(collect_list(col("item_id").cast("double")).alias("top_item_ids"))


Así es como se ve el resultado

Finalmente, podemos almacenar la predicción como una etiqueta delta en nuestro UC o publicarla en un Mongo DB o Azure Cosmos DB de sistemas posteriores. Optamos por la primera opción.


 df_user_recs.write.mode("overwrite").saveAsTable(f"{catalog_name}.{output_schema}.top20_item_recommendations")


Inferencia en línea/transmisión

Ahora imaginemos un caso en el que queremos actualizar nuestras recomendaciones en función de las interacciones de los usuarios en tiempo real. Para este caso, podemos utilizar el servicio de modelos. Cuando alguien quiere utilizar su modelo, puede enviar datos al servidor. El servidor, a su vez, envía esos datos a su modelo implementado, que entra en acción, analiza los datos y genera una predicción. Se pueden utilizar en aplicaciones web, aplicaciones móviles o incluso sistemas integrados. Una de las aplicaciones de este enfoque es habilitar el enrutamiento del tráfico para las pruebas A/B.


El algoritmo ALS no se puede utilizar directamente para la inferencia en línea, ya que requiere volver a entrenar el modelo utilizando todos los datos (antiguos y nuevos) para actualizar las recomendaciones. Los algoritmos de aprendizaje de descenso de gradiente son ejemplos de modelos que se pueden utilizar para actualizaciones en línea. Es posible que analicemos algunos de estos algoritmos en futuras publicaciones.


Sin embargo, solo para ilustrar cómo funcionaría dicho modelo, estamos creando un punto final de servicio de modelo (inútil) que predice la calificación de películas en función del momento en que un usuario califica una película.


 import requests model_path = f"{catalog_name}.{model_schema}.{model_name}" champion_version = client.get_model_version_by_alias(model_path, "champion") # Set the name of the MLflow endpoint endpoint_name = config["model_serving_endpoint_name"] # Name of the registered MLflow model model_name = model_path # Specify the type of compute (CPU, GPU_SMALL, GPU_MEDIUM, etc.) workload_type = "CPU" # Specify the scale-out size of compute (Small, Medium, Large, etc.) workload_size = "Small" # Get the latest version of the MLflow model model_version = int(champion_version.version) # Specify Scale to Zero(only supported for CPU endpoints) scale_to_zero = False # Get the API endpoint and token for the current notebook context API_ROOT = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get() API_TOKEN = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get() data = { "name": endpoint_name, "config": { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ] }, } headers = {"Context-Type": "text/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.post( url=f"{API_ROOT}/api/2.0/serving-endpoints", json=data, headers=headers )


Esto creará y activará el clúster de servicio de modelos para nosotros, por lo que lleva algo de tiempo. Ahora, si abre la ventana Serving , debería ver su punto final.


Podemos usar un punto final para atender a varios modelos. Luego, podemos usar el enrutamiento de tráfico para escenarios como pruebas A/B o comparar el rendimiento de diferentes modelos en la producción.

Tabla de inferencia

Las tablas de inferencia en Databricks Model Serving actúan como un registro automático para nuestros modelos implementados. Cuando están habilitadas, capturan las solicitudes entrantes (datos enviados para predicción), las salidas del modelo correspondientes (predicciones) y otros metadatos como una tabla Delta dentro del Catálogo de Unity. Podemos usar la tabla de inferencia para monitoreo y depuración , seguimiento de linaje y un procedimiento de recopilación de datos para volver a entrenar o ajustar nuestros modelos.


Podemos habilitar la inference table en nuestro punto final de servicio para monitorear el modelo. Podemos hacerlo especificando las propiedades auto_capture_config en la carga útil cuando creamos el punto final por primera vez. O actualizamos nuestro punto final después usando el comando put y la URL del punto final config de la siguiente manera (más información aquí )


 data = { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ], "auto_capture_config":{ "catalog_name": catalog_name, "schema_name": model_schema, "table_name_prefix": payload_table, } } headers = {"Context-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.put(url=f"{API_ROOT}/api/2.0/serving-endpoints/{endpoint_name}/config", json=data, headers=headers) print(json.dumps(response.json(), indent=4))


Ahora, vamos a alimentar el punto final con algunos datos ficticios de interacción del usuario.

 import random import time all_items = df_full_data.select(col("item_id")).distinct() for user_id in range(50,54): items_not_rated_by_user = df_full_data.where(col("user_id")==user_id).select(col("item_id")).distinct()#collect()[0][0] no_rated_items = [item.item_id for item in all_items.subtract(items_not_rated_by_user).limit(4).collect()] data = { "dataframe_records": [ {"user_id":user_id, "item_id":no_rated_items[0], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[1], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, ] } response = requests.post( url=f"{API_ROOT}/serving-endpoints/{endpoint_name}/invocations", json=data, headers=headers ) # generate the data within the timespan of 1 to 8 minutes time.sleep(random.randint(60*1, 60*8))


Podemos consultar los registros de los puntos finales en la tabla <catalog>.<schema>.<payload_table> . Se necesitan aproximadamente 10 minutos hasta que se puedan ver los datos en la tabla.


 table_path = f"{catalog_name}.{model_schema}.{payload_table}" # Read data from the inference table df_inf_table = spark.read.table(table_path) display(df_inf_table )


Deberías ver algo como esto en tu tabla de carga útil.

Tabla de carga útil de servicio del modelo Databricks


Para comprender el esquema de esta tabla de inferencia, consulte “Esquema de tabla de inferencia del catálogo de Unity==” aquí .==


Monitoreo de modelos

El monitoreo de modelos y datos es un tema complejo que requiere mucho tiempo para dominarlo. Databricks Lakehouse Monitoring (DLM) reduce los gastos generales que implica construir un sistema de monitoreo adecuado al proporcionar plantillas estándar y personalizables para casos de uso comunes. Sin embargo, dominar DLM y el monitoreo de modelos en general requiere mucha experimentación. No quiero darle una descripción general extensa del monitoreo de modelos aquí, sino más bien darle un punto de partida. Es posible que dedique un blog a este tema en el futuro.


Un breve resumen de las funciones y características de DLM

Ahora que tenemos nuestro modelo en funcionamiento, podemos usar la tabla de inferencia generada por nuestro punto de conexión de servicio para monitorear métricas clave, como el rendimiento y la desviación del modelo, para detectar cualquier desviación o anomalía en nuestros datos o en el modelo a lo largo del tiempo. Este enfoque proactivo nos ayuda a tomar medidas correctivas oportunas, como volver a entrenar el modelo o actualizar sus funciones, para mantener un rendimiento óptimo y la alineación con los objetivos comerciales.


Arquitectura de datos de monitoreo de Databricks Lakehouse Fuente: Databricks


DLM ofrece tres tipos de análisis o profile type : series temporales , instantáneas e inferencia . Dado que nos interesa analizar nuestra tabla de inferencia, nos centraremos en esta última. Para utilizar una tabla para el seguimiento (nuestra “ tabla principal ”), debemos asegurarnos de que la tabla tenga la estructura adecuada. Para la tabla de inferencia , cada fila debe corresponder a una solicitud con las siguientes columnas:

  • Características del modelo

  • predicción del modelo

  • Identificación del modelo

  • timestamp : marca de tiempo de la solicitud de inferencia

  • verdad fundamental (opcional)


El ID del modelo es importante en los casos en los que ofrecemos varios modelos y queremos hacer un seguimiento del rendimiento de cada uno de ellos en un único panel de control. Si hay más de un ID de modelo disponible, DLM lo utiliza para segmentar los datos y calcular métricas y estadísticas para cada segmento por separado.


DLM calcula cada estadística y métrica para un intervalo de tiempo específico. Para el análisis de inferencia, utilizó la columna de marca de tiempo , además de un tamaño de ventana definido por el usuario para identificar las ventanas de tiempo. Más información a continuación.


DLM admite dos problem type para las tablas de inferencia: “ clasificación ” o “ regresión ”. Calcula algunas de las métricas y estadísticas relevantes en función de esta especificación.


Para utilizar DLM, debemos crear un monitor y adjuntarlo a una tabla. Al hacer esto, DLM crea dos metric tables :

  • Tabla de métricas de perfil : esta tabla contiene estadísticas resumidas como mínimo, máximo, porcentaje de valores nulos y ceros. También contiene métricas adicionales basadas en el tipo de problema definido por el usuario. Por ejemplo, precisión , recuperación y f1_score para los modelos de clasificación, y error_cuadrado_medio y error_promedio_medio para los modelos de regresión.

  • Tabla de métricas de deriva : contiene estadísticas que miden cómo ha cambiado la distribución de datos a lo largo del tiempo o en relación con un valor de referencia (si se proporciona) . Calcula medidas como la prueba de Chi-cuadrado y la prueba KS.


Para ver la lista completa de métricas de cada tabla, consulte la página de documentación de la tabla de métricas del monitor . También es posible crear métricas personalizadas .


Un aspecto importante de la creación de un sistema de monitoreo es asegurarse de que nuestro panel de monitoreo tenga acceso a los últimos datos de inferencia a medida que llegan. Para eso, podemos usar la transmisión de tablas Delta para realizar un seguimiento de las filas procesadas en la tabla de inferencia. Usamos la tabla de inferencia del servidor de modelos como nuestra tabla de origen ( readStream ), y la tabla de monitoreo como la tabla de destino ( writeStream ). También nos aseguramos de que la captura de datos de cambios (CDC) esté habilitada en ambas tablas (está habilitada de manera predeterminada en la tabla de inferencia). De esta manera, procesamos solo los cambios (insertar/actualizar/eliminar) en la tabla de origen en lugar de volver a procesar la tabla completa en cada actualización.

Práctica

Para habilitar el monitoreo sobre nuestra tabla de inferencia realizamos los siguientes pasos:

  1. Lea la tabla de inferencia como una tabla de transmisión
  2. Cree una nueva tabla delta con el esquema correcto descomprimiendo la tabla de inferencia generada por nuestro punto final de servicio del modelo.
  3. Preparar la tabla de referencia (si la hay)
  4. Crea un monitor sobre la tabla resultante y actualiza la métrica
  5. Programe un flujo de trabajo para descomprimir la tabla de inferencia en la estructura correcta y actualizar las métricas


Primero, debemos instalar la API de monitoreo de Lakehouse. Debería estar instalada si usa Databricks rum time 15.3 LTS o superior:


 %pip install "https://ml-team-public-read.s3.amazonaws.com/wheels/data-monitoring/a4050ef7-b183-47a1-a145-e614628e3146/databricks_lakehouse_monitoring-0.4.14-py3-none-any.whl" dbutils.library.restartPython()


Leamos la tabla de inferencia como una tabla de transmisión

 requests_raw = spark.readStream\ .format("delta")\ .table(inference_table_name) requests_raw.isStreaming #-> True


A continuación, tenemos que poner la tabla en el formato correcto, como se describe arriba. Esta tabla debe tener una fila para cada predicción con las características y el valor de predicción correspondientes. La tabla de inferencia que obtenemos del punto de conexión que sirve al modelo almacena las solicitudes y respuestas del punto de conexión como un formato JSON anidado. Aquí hay un ejemplo de la carga útil JSON para la columna de solicitud y respuesta.

 #requests {"dataframe_records": [ {"user_id": 1, "item_id": 346, "rating": 5}, {"user_id": 1, "item_id": 377, "rating": 2}, {"user_id": 1, "item_id": 302, "rating": 4} ] } #reponse {"predictions": [4.248899936676025, 1.1172138452529907, 4.279165744781494] } # --> what we need | user_id | item_id | rating | prediction | |---------|---------|--------|------------| | 1 | 346 | 5 | 4.248900 | | 1 | 377 | 2 | 1.117214 | | 1 | 302 | 4 | 4.279166 |


Para descomprimir esta tabla en el esquema correcto, podemos utilizar el siguiente código que está adaptado de la documentación de Databricks ( Cuaderno de inicio de Inference table Lakehouse Monitoring ).


 # define the schema of the request and reponse fields in the inference tabel REQUEST_FIELDS = [StructField('user_id', IntegerType(), False),\ StructField('item_id', IntegerType(), False),\ StructField('rating', IntegerType(), False)\ ] RESPONSE_FIELD = [T.StructField("predictions", FloatType())] def process_col_requests(json_str:str) -> str: """ to proccess the JSON payload of request column in inference table """ request = json.loads(json_str) dataframe_records = request.get("dataframe_records", []) return dataframe_records def procces_col_response(json_str: str) -> str: """ to proccess the JSON payload of reponse column in inference table """ reponse = json.loads(json_str) output = [{prediction_col: round(prediction,4)} for prediction in reponse["predictions"]] return output def get_model_id(endpoint_name: str) -> str: """ create the model id by concatinating the model name and the model version. note: the assumption is the endpoint serves only one model """ served_models = get_served_models(endpoint_name) model_id = f"{served_models[0]['model_name']}_{served_models[0]['model_version']}" return model_id def process_requests(requests_raw: DataFrame, request_fields: List[T.StructField], response_field: T.StructField, endpoint_name: str) -> DataFrame: """ Takes a stream of raw requests and processes them by: - Unpacking JSON payloads for requests and responses - Exploding batched requests into individual rows - Converting Unix epoch millisecond timestamps to be Spark TimestampType :param requests_raw: DataFrame containing raw requests. Assumed to contain the following columns: - `request` - `response` - `timestamp_ms` :param request_fields: List of StructFields representing the request schema :param response_field: A StructField representing the response schema :return: A DataFrame containing processed requests """ # Convert the timestamp milliseconds to TimestampType for downstream processing. requests_timestamped = requests_raw \ .withColumn(timestamp_col, (F.col("timestamp_ms") / 1000).cast(T.TimestampType())) \ .drop("timestamp_ms") # create the model identifier column model_id = get_model_id(endpoint_name) # Convert the model name and version columns into a model identifier column. requests_identified = requests_timestamped \ .withColumn(model_id_col, F.lit(model_id)) # Rename the date column to avoid collisions with features. requests_dated = requests_identified.withColumnRenamed("date", date_col) # Consolidate and unpack JSON. request_schema = T.ArrayType(T.StructType(request_fields)) response_schema = T.ArrayType(T.StructType(response_field)) # w udf_request = F.udf(process_col_requests, request_schema) udf_reponse = F.udf(procces_col_response, response_schema) requests_unpacked = requests_dated.withColumn("request", udf_request("request")).\ withColumn("response", udf_reponse("response")) # Explode batched requests into individual rows. DB_PREFIX = "__db" requests_exploded = requests_unpacked \ .withColumn(f"{DB_PREFIX}_request_response", F.arrays_zip(F.col("request"), F.col("response"))) \ .withColumn(f"{DB_PREFIX}_request_response", F.explode(F.col(f"{DB_PREFIX}_request_response"))) \ .select(F.col("*"), F.col(f"{DB_PREFIX}_request_response.request.*"), F.col(f"{DB_PREFIX}_request_response.response.*")) \ .drop(f"{DB_PREFIX}_request_response", "request", "response") requests_cleaned = requests_exploded.drop("status_code", "sampling_fraction", "client_request_id", "databricks_request_id", "request_metadata") return requests_cleaned


La tabla resultante se vería así:

Tabla de carga útil descomprimida

A continuación debemos inicializar nuestra tabla de receptores

 dt_builder = DeltaTable.createIfNotExists(spark) \ .tableName(unpacked_requests_table_name) \ .addColumns(schema) \ .partitionedBy(requests_cleaned.schema) \ .property("delta.enableChangeDataFeed", "true") \ dt_builder.execute()


y escribe los resultados

 checkpoint_path = f"dbfs:/payload-logging/{endpoint_name}/checkpoint" requests_stream = requests_cleaned.writeStream \ .trigger(once=True) \ .format("delta") \ .partitionBy(date_col) \ .outputMode("append") \ .option("checkpointLocation", checkpoint_path) \ .toTable(unpacked_requests_table_name) \


Por último, creamos nuestra tabla de referencia. DLM utiliza esta tabla para calcular las desviaciones comparando la distribución de columnas similares de los modelos de referencia y primario. La tabla de referencia debe tener la misma columna de características que la columna primaria, así como la misma columna de identificación del modelo. Para la tabla de referencia, utilizamos la tabla de predicción de nuestro conjunto de datos de validación que almacenamos antes, después de entrenar nuestro modelo utilizando el mejor hiperparámetro. Para calcular la métrica de desviaciones, Databricks calcula las métricas de perfil tanto para la tabla primaria como para la de referencia. Aquí puede leer sobre la tabla primaria y la tabla de referencia .


 #read the prediction table df_base_table = spark.table(f"{catalog_name}.{model_schema}.predictions") # create the model id and add it to the table model_id = get_model_id(endpoint_name) df_base_table = df_base_table.withColumn(model_id_col, F.lit(model_id)) #write the new table and enable the CDC on it output_base_table_name = f"{catalog_name}.{model_schema}.{base_table_prefix}_{model_name}" df_base_table.write.format("delta").mode("overwrite").saveAsTable(output_base_table_name) spark.sql(f"ALTER TABLE {output_base_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")


Ahora estamos listos para crear nuestro panel de monitoreo. Podemos hacerlo usando la interfaz de usuario o la API de monitoreo de Lakehouse. Aquí usamos la segunda opción:

 # This is where we store the metric tables. output_schema_name = f"{catalog_name}.{model_schema}" try: info = lm.create_monitor( table_name=unpacked_requests_table_name, profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities,#the aggregation window model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, # We will refresh the metrics on-demand in this notebook baseline_table_name=output_base_table_name, ) print(info) except Exception as e: # Ensure the exception was expected assert "RESOURCE_ALREADY_EXISTS" in str(e), f"Unexpected error: {e}" # Update the monitor if any parameters of this notebook have changed. lm.update_monitor( table_name=unpacked_requests_table_name, updated_params=dict( profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities, model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, baseline_table_name=output_base_table_name, ) ) # Refresh metrics calculated on the requests table. refresh_info = lm.run_refresh(table_name=unpacked_requests_table_name) print(refresh_info)


Después de ejecutar el código, Databricks tarda un tiempo en calcular todas las métricas. Para ver el panel, vaya a la pestaña Quality de la tabla de receptores (es decir, unpacked_requests_table_name ). Debería ver una página como la siguiente.

Vista de monitoreo de modelos de Databricks


Si hace clic en Ver refresh history verá las actualizaciones en ejecución, pendientes y pasadas. Haga clic en View Dashboard para abrir su panel.

Panel de control de supervisión de modelos de Databricks



Comenzamos con la tabla de inferencia ( my_endpoint_payload ), la procesamos y guardamos el resultado en my_endpoint_payload_unpacked y pasamos esta tabla junto con nuestra tabla de referencia ( base_table_als ) a nuestra API de monitoreo. El DLM calcula las métricas de perfil para cada tabla ( my_endpoint_payload_unpacked_profile_metric ) y las usa para calcular las métricas de desviación ( my_endpoint_payload_unpacked_drift_metrics ).


¡Ahí lo tienes! ¡Tienes todo lo que necesitas para servir y monitorear tu modelo!


¡En la siguiente parte te mostraré cómo automatizar este proceso usando Databricks Assets Bundle y Gitlab !