In the first part of this tutorial series, we took the first steps toward building an end-to-end MLOps pipeline using Databricks and Spark, guided by Databricks' reference architecture. Here is a recap of the key steps we covered:
Setting up Unity Catalog for the Medallion Architecture: We organized our data into bronze, silver, and gold layers within Unity Catalog, establishing a structured and efficient data management system.
Ingesting Data into Unity Catalog: We demonstrated how to ingest raw data into the system, ensuring consistency and quality for the later processing stages.
Model training: Using Databricks, we trained a machine learning model tailored to our dataset, following best practices for scalable and effective model development.
Hyperparameter tuning with HyperOpt: To improve model performance, we used HyperOpt to search for the optimal hyperparameters, improving both accuracy and efficiency.
Experiment tracking with Databricks MLflow: We used MLflow to log and track our experiments, keeping a comprehensive record of model versions, metrics, and parameters for easy comparison and reproducibility.
With these foundational steps complete, the model is now primed for deployment. In this second part, we cover two critical components that complete the system: model serving and monitoring.
Let's get into it!
The last step of the previous blog was model evaluation. Now imagine we have done the comparison and found that our model performs better than the current production model. Since we (assume we) want to use this model in production, we want to take advantage of all the data we have. The next step is therefore to train and test the model on the full dataset and then persist it for later use by registering it as our champion model. Since this is the final model we want to use for inference, we use the Feature Engineering client to train it. That way we not only track the model lineage more easily, but we also offload schema validation and feature transformation (if any) to the client.
import mlflow
from mlflow.models import infer_signature
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

with mlflow.start_run(run_name="ALS_best_model") as run:
    als = ALS()
    # Now we set the parameters for the method
    als.setMaxIter(MAX_ITER)\
        .setSeed(SEED)\
        .setRegParam(best_params["REG_PARAM"])\
        .setUserCol(COL_USER)\
        .setItemCol(COL_ITEM)\
        .setRatingCol(COL_LABEL)\
        .setRank(best_params["RANK"])

    mlflow.log_param("MAX_ITER", MAX_ITER)
    mlflow.log_param("RANK", best_params["RANK"])
    mlflow.log_param("REG_PARAM", best_params["REG_PARAM"])

    # Create the model with these parameters.
    model = als.fit(df_full_data)

    # Drop predictions for users and products that appear in the test set but not
    # in the training set; in that case the prediction is NaN.
    model.setColdStartStrategy('drop')
    predictions = model.transform(df_full_data)

    signature = infer_signature(model_input=df_full_data, model_output=predictions.select(COL_LABEL))

    # Log the model
    mlflow.spark.log_model(model, model_name,
                           sample_input=df_full_data.limit(3),
                           signature=signature,
                           conda_env=mlflow.spark.get_default_conda_env(),
                           registered_model_name=f"{catalog_name}.{model_schema}.{model_name}")

    evaluator = RegressionEvaluator(predictionCol=COL_PRED, labelCol=COL_LABEL)
    rmse = evaluator.setMetricName("rmse").evaluate(predictions)
    mlflow.log_metric('rmse', rmse)
Alternatively, we could also use the Feature Store or Feature Engineering APIs to train and log models:
model_info = fe.log_model(model=model,
                          artifact_path=model_name,
                          flavor=mlflow.spark,
                          training_set=fe_full_data,
                          conda_env=mlflow.spark.get_default_conda_env(),
                          registered_model_name=f"{catalog_name}.{model_schema}.{model_name}")
When we use the Feature Engineering API, we can view the model's lineage in Catalog Explorer.
Now let's update the model description and assign a Champion alias to it.
import time
from mlflow.tracking.client import MlflowClient
from mlflow.entities.model_registry.model_version_status import ModelVersionStatus

client = MlflowClient()

# Find the latest model version
model_name_path = f"{catalog_name}.{model_schema}.{model_name}"
model_version_infos = client.search_model_versions(f"name ='{model_name_path}'")
new_model_version = max([int(model_version_info.version) for model_version_info in model_version_infos])

# Add the model and model version descriptions
client.update_registered_model(
    name=model_name_path,
    description="collaborative filtering using Spark mllib ALS. This model uses the rating table"
)

client.update_model_version(
    name=model_name_path,
    version=new_model_version,
    description="this model has Rank and REG_PARAM optimized with Hyperopt, using rmse as the loss function. Trained on the full dataset"
)

# Assign the alias
client.set_registered_model_alias(model_name_path, "Champion", new_model_version)
Now you can go ahead and check the schema in which you registered the model. You should see all your updates, as shown below.
Model stages: If you use the workspace model registry, you have to use stages to manage your models; working with aliases won't work there. Check out here to see how it works.
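As a hedged illustration (assuming the legacy workspace registry rather than Unity Catalog), promoting a model version with stages would look roughly like the sketch below; the model name and version are the ones we registered above.

from mlflow.tracking.client import MlflowClient

client = MlflowClient()  # assumes the registry URI points to the workspace registry

# Promote the newly registered version to Production and archive the previous one.
client.transition_model_version_stage(
    name=model_name,            # registered model name in the workspace registry
    version=new_model_version,  # the version number found above
    stage="Production",         # valid stages: "None", "Staging", "Production", "Archived"
    archive_existing_versions=True,
)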
Now imagine we want to use our model in production for inference. In this step we load the champion model and use it to generate 20 movie recommendations for each user.
from mlflow.spark import load_model as spark_load_model
from mlflow.tracking.client import MlflowClient
from create_training_set import split_data

#-- set UC as the model registry
mlflow.set_registry_uri("databricks-uc")

#-- initiate the mlflow client
client = MlflowClient()

# -- read the config file
with open('config.json') as config_file:
    config = json.load(config_file)

catalog_name = config["catalog_name"]
gold_layer = config["gold_layer_name"]
silver_layer = config["silver_layer_name"]
user_item_table_name = config["user_item_table_name"]
ft_user_item_name = config["ft_user_item_name"]
model_name = config["model_name"]
model_schema = config["model_schema"]

#-- create the model path
model_path = f"{catalog_name}.{model_schema}.{model_name}"

# -- create the model_uri: there are two ways to do this
# 1: using the alias (we use this*)
model_version_uri = f"models:/{model_path}@champion"

# 2: using the model version
#champion_version = client.get_model_version_by_alias(model_path, "champion")
#model_version_uri = f"models:/{model_path}/{champion_version.version}"

# -- load the model pipeline and extract the model
model_pipeline = spark_load_model(model_version_uri)
model = model_pipeline.stages[0]

# -- batch scoring using the model
fe_full_data, df_full_data, df_train, df_test = split_data()
df_batch_input = df_full_data.drop("rating")
df_scores = model.transform(df_batch_input)

# --- in case you used Feature Engineering to train and register the model
#from databricks.feature_engineering import FeatureEngineeringClient
#fe = FeatureEngineeringClient()
#fe.score_batch(model_uri=f"{model_version_uri}", df=df_batch_input)
Note that we used the same training data for batch scoring. Although that can make sense for recommender systems, in most applications we want to use the model to score unseen data. For example, imagine you are Netflix and want to update the user recommendations at the end of each day based on their newly watched list. We can schedule a job that runs the batch scoring at a specific time at the end of the day, as sketched below.
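As a hedged sketch, the nightly batch-scoring run could be scheduled with the Databricks Jobs API. The notebook path, cluster id, and cron expression below are placeholders for illustration; API_ROOT and API_TOKEN are the workspace URL and a token, obtained the same way as in the serving-endpoint snippet later in this post.

import requests

job_config = {
    "name": "nightly-batch-scoring",  # hypothetical job name
    "tasks": [
        {
            "task_key": "score_users",
            "notebook_task": {"notebook_path": "/Repos/mlops/batch_scoring"},  # placeholder path
            "existing_cluster_id": "<cluster-id>",  # placeholder cluster
        }
    ],
    # Quartz cron: run every day at 23:30 UTC
    "schedule": {
        "quartz_cron_expression": "0 30 23 * * ?",
        "timezone_id": "UTC",
        "pause_status": "UNPAUSED",
    },
}

response = requests.post(
    url=f"{API_ROOT}/api/2.1/jobs/create",
    json=job_config,
    headers={"Authorization": f"Bearer {API_TOKEN}"},
)
print(response.json())  # returns the new job_id on success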
Now we can go ahead and generate the recommendations for each user. For that we keep the top 20 items per user:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number, collect_list

windowSpec = Window.partitionBy("user_id").orderBy(col("prediction").desc())
df_top_20_items = df_scores.withColumn("rank", row_number().over(windowSpec)).filter(col("rank") <= 20)

df_user_recs = df_top_20_items.groupBy("user_id") \
    .agg(collect_list(col("item_id").cast("double")).alias("top_item_ids"))
Here is how the result looks.
Finally, we can either store the predictions as a Delta table in our UC or publish them to downstream systems such as MongoDB or Azure Cosmos DB. We go with the first option:
df_user_recs.write.mode("overwrite").saveAsTable(f"{catalog_name}.{output_schema}.top20_item_recommendations")
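For the second option, a hedged sketch of publishing the same DataFrame to MongoDB is shown below. It assumes the MongoDB Spark Connector 10.x is installed on the cluster; the connection URI, database, and collection names are placeholders, not part of the pipeline above.

# Illustrative only: write the recommendations to a downstream MongoDB collection.
(df_user_recs.write
    .format("mongodb")                                                           # MongoDB Spark Connector 10.x
    .mode("overwrite")
    .option("connection.uri", "mongodb+srv://<user>:<password>@<cluster-host>")  # placeholder URI
    .option("database", "recommendations")                                       # hypothetical database
    .option("collection", "top20_item_recommendations")                          # hypothetical collection
    .save())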
Now imagine a case where we want to update our recommendations in real time based on user interactions. For that we can use model serving. When someone wants to use the model, they send data to the server. The server feeds that data to the deployed model, which goes into action, analyzes the data, and generates a prediction. This can be used in web applications, mobile apps, or embedded systems. One application of this approach is enabling traffic routing for A/B testing.
The ALS algorithm cannot be used directly for online inference, since updating the recommendations requires retraining the model on the whole dataset (old + new data). Gradient-descent-style learning algorithms are examples of models that can be updated online as new observations arrive; a minimal sketch of such an update follows. We may look at some of those algorithms in a future post.
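To make the contrast concrete, here is a minimal, purely illustrative sketch (not part of the pipeline, plain NumPy, hypothetical latent dimension) of why SGD-style matrix factorization supports online updates: a single new rating only nudges the affected user and item factor vectors, so no full retraining is needed.

import numpy as np

def sgd_update(user_vec, item_vec, rating, lr=0.01, reg=0.1):
    """One online gradient step for a single (user, item, rating) event."""
    err = rating - user_vec @ item_vec                      # prediction error
    new_user = user_vec + lr * (err * item_vec - reg * user_vec)
    new_item = item_vec + lr * (err * user_vec - reg * item_vec)
    return new_user, new_item

# Usage with random factors (hypothetical latent dimension of 8)
u = np.random.rand(8)
v = np.random.rand(8)
u, v = sgd_update(u, v, rating=4.0)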
However, just to illustrate how serving such a model would work, we create a (useless) model serving endpoint that predicts a movie rating whenever a user rates a movie!
import requests

model_path = f"{catalog_name}.{model_schema}.{model_name}"
champion_version = client.get_model_version_by_alias(model_path, "champion")

# Set the name of the MLflow endpoint
endpoint_name = config["model_serving_endpoint_name"]

# Name of the registered MLflow model
model_name = model_path

# Specify the type of compute (CPU, GPU_SMALL, GPU_MEDIUM, etc.)
workload_type = "CPU"

# Specify the scale-out size of compute (Small, Medium, Large, etc.)
workload_size = "Small"

# Get the latest version of the MLflow model
model_version = int(champion_version.version)

# Specify Scale to Zero (only supported for CPU endpoints)
scale_to_zero = False

# Get the API endpoint and token for the current notebook context
API_ROOT = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get()
API_TOKEN = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()

data = {
    "name": endpoint_name,
    "config": {
        "served_models": [
            {
                "model_name": model_name,
                "model_version": int(model_version),
                "workload_size": workload_size,
                "scale_to_zero_enabled": scale_to_zero,
                "workload_type": workload_type,
            }
        ]
    },
}

headers = {"Content-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}"}

response = requests.post(url=f"{API_ROOT}/api/2.0/serving-endpoints", json=data, headers=headers)
This will create and launch the model serving cluster for us, so it takes some time. Now if you open the Serving window you should see your endpoint.
We can use one endpoint to serve multiple models and then use traffic routing for scenarios such as A/B testing, or to compare the performance of different models in production; a hedged sketch of such a configuration follows.
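As an illustration only, the endpoint config below serves two versions of the model and splits traffic 80/20 between them, e.g. for an A/B test. The served-model names ("als-champion", "als-challenger") and the version numbers are assumed placeholders, not values produced earlier in this post.

data = {
    "served_models": [
        {"name": "als-champion", "model_name": model_name, "model_version": "1",
         "workload_size": "Small", "scale_to_zero_enabled": True, "workload_type": "CPU"},
        {"name": "als-challenger", "model_name": model_name, "model_version": "2",
         "workload_size": "Small", "scale_to_zero_enabled": True, "workload_type": "CPU"},
    ],
    "traffic_config": {
        "routes": [
            {"served_model_name": "als-champion", "traffic_percentage": 80},
            {"served_model_name": "als-challenger", "traffic_percentage": 20},
        ]
    },
}

response = requests.put(
    url=f"{API_ROOT}/api/2.0/serving-endpoints/{endpoint_name}/config",
    json=data,
    headers={"Content-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}"},
)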
Inference tables in Databricks Model Serving act as an automatic log for our deployed models. When enabled, they capture the incoming requests (data sent for prediction), the corresponding model outputs (predictions), and some other metadata as a Delta table within Unity Catalog. We can use the inference table for monitoring and debugging, lineage tracking, and as a data collection procedure for retraining or fine-tuning our models.
We can enable the inference table on our serving endpoint to monitor the model. We can do so by specifying the auto_capture_config properties in the payload when we create the endpoint for the first time, or we can update the endpoint afterwards using the put command and the config endpoint URL, as follows (more here):
data = {
    "served_models": [
        {
            "model_name": model_name,
            "model_version": int(model_version),
            "workload_size": workload_size,
            "scale_to_zero_enabled": scale_to_zero,
            "workload_type": workload_type,
        }
    ],
    "auto_capture_config": {
        "catalog_name": catalog_name,
        "schema_name": model_schema,
        "table_name_prefix": payload_table,
    }
}

headers = {"Content-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}"}

response = requests.put(url=f"{API_ROOT}/api/2.0/serving-endpoints/{endpoint_name}/config", json=data, headers=headers)

print(json.dumps(response.json(), indent=4))
Now let's feed the endpoint with some simulated user interaction data:
import random
import time

all_items = df_full_data.select(col("item_id")).distinct()

for user_id in range(50, 54):
    # items the user has already rated
    items_rated_by_user = df_full_data.where(col("user_id") == user_id).select(col("item_id")).distinct()
    # pick four items the user has not rated yet
    no_rated_items = [item.item_id for item in all_items.subtract(items_rated_by_user).limit(4).collect()]
    data = {
        "dataframe_records": [
            {"user_id": user_id, "item_id": no_rated_items[0], "rating": random.randint(1, 5)},
            {"user_id": user_id, "item_id": no_rated_items[1], "rating": random.randint(1, 5)},
            {"user_id": user_id, "item_id": no_rated_items[2], "rating": random.randint(1, 5)},
            {"user_id": user_id, "item_id": no_rated_items[3], "rating": random.randint(1, 5)},
        ]
    }
    response = requests.post(
        url=f"{API_ROOT}/serving-endpoints/{endpoint_name}/invocations",
        json=data,
        headers=headers
    )
    # spread the requests out over a timespan of 1 to 8 minutes
    time.sleep(random.randint(60 * 1, 60 * 8))
We can check the endpoint logs in the <catalog>.<schema>.<payload_table> table. It takes around 10 minutes until the data shows up in the table.
table_path = f"{catalog_name}.{model_schema}.{payload_table}"

# Read data from the inference table
df_inf_table = spark.read.table(table_path)
display(df_inf_table)
You should see something like this in your payload table.
To understand the schema of this inference table, check "Unity Catalog inference table schema" here.
Model and data monitoring is a complex topic that takes a lot of time to master. Databricks Lakehouse Monitoring (DLM) reduces the overhead of building a proper monitoring system by providing standard and customizable templates for common use cases. However, mastering DLM, and model monitoring in general, requires a lot of experimentation. I don't want to give you an extensive overview of model monitoring here, but rather a starting point. I might dedicate a blog to this topic in the future.
A short summary of DLM functionalities and features:
Now that our model is up and running, we can use the inference table generated by our serving endpoint to monitor key metrics such as model performance and drift, detecting any deviations or anomalies in our data or model over time. This proactive approach helps us take timely corrective actions, such as retraining the model or updating its features, to maintain optimal performance and alignment with business objectives.
DLM provides three types of analysis, or profile type: Time Series, Snapshot, and Inference. Since we are interested in analyzing our inference table, we focus on the latter. To use a table for monitoring (our "primary table"), we should make sure the table has the right structure. For an inference table, each row should correspond to a request, with the following columns:
model features
model prediction
model id
timestamp: the timestamp of the inference request
ground truth (optional)
The model id is useful when we serve multiple models and want to track the performance of each model in one monitoring dashboard. If more than one model id is present, DLM uses it to slice the data and computes the metrics and statistics for each slice separately.
DLM computes each statistic and metric for a specified time interval. For inference analysis, it uses the timestamp column plus a user-defined window size to identify the time windows. More on that below.
DLM supports two problem type values for inference tables: "classification" or "regression". It computes some of the relevant metrics and statistics based on this specification.
To use DLM, we create a monitor and attach it to a table. When we do this, DLM creates two metric tables:
profile metric table: this table contains summary statistics such as the min, max, and percentage of nulls and zeros. It also contains additional metrics depending on the problem type defined by the user, for example precision, recall, and f1_score for classification models, and mean_squared_error and mean_average_error for regression models.
drift metric table: it contains statistics that measure how the distribution of the data has changed over time or relative to a baseline value (if provided). It computes measures such as the Chi-square test and the KS test.
To see the complete list of metrics for each table, check the Monitor metric table documentation page. It is also possible to create custom metrics.
An important aspect of building a monitoring system is making sure our monitoring dashboard has access to the latest inference data as it arrives. For that we can use Delta table streaming to keep track of the processed rows in the inference table. We use the model serving's inference table as our source table (readStream) and the monitoring table as the sink table (writeStream). We also make sure Change Data Capture (CDC) is enabled on both tables (it is enabled by default on the inference table). This way we process only the changes (insert/update/delete) in our source table rather than re-processing the entire table at every refresh.
To enable monitoring over our inference table, we take the following steps:
First we need to install the Lakehouse Monitoring API. It is already installed if you use Databricks Runtime 15.3 LTS or above:
%pip install "https://ml-team-public-read.s3.amazonaws.com/wheels/data-monitoring/a4050ef7-b183-47a1-a145-e614628e3146/databricks_lakehouse_monitoring-0.4.14-py3-none-any.whl"

dbutils.library.restartPython()
Let's read the inference table as a streaming table:
requests_raw = spark.readStream \
    .format("delta") \
    .table(inference_table_name)

requests_raw.isStreaming  # -> True
Next, we need to put the table into the right format, as described earlier. This table should have one row per prediction, with the relevant features and the prediction value. The inference table we get from the model serving endpoint stores the endpoint requests and responses in a nested JSON format. Here is an example of the JSON payload for the request and response columns.
#request
{"dataframe_records": [
    {"user_id": 1, "item_id": 346, "rating": 5},
    {"user_id": 1, "item_id": 377, "rating": 2},
    {"user_id": 1, "item_id": 302, "rating": 4}
  ]
}

#response
{"predictions": [4.248899936676025, 1.1172138452529907, 4.279165744781494]}

# --> what we need
| user_id | item_id | rating | prediction |
|---------|---------|--------|------------|
| 1       | 346     | 5      | 4.248900   |
| 1       | 377     | 2      | 1.117214   |
| 1       | 302     | 4      | 4.279166   |
To unpack this table into the right schema, we can use the following code, adapted from the Databricks documentation (Inference table Lakehouse Monitoring starter notebook).
# Define the schema of the request and response fields in the inference table
REQUEST_FIELDS = [StructField('user_id', IntegerType(), False),
                  StructField('item_id', IntegerType(), False),
                  StructField('rating', IntegerType(), False)]

RESPONSE_FIELD = [T.StructField("predictions", FloatType())]

def process_col_requests(json_str: str) -> str:
    """
    Process the JSON payload of the request column in the inference table
    """
    request = json.loads(json_str)
    dataframe_records = request.get("dataframe_records", [])
    return dataframe_records

def process_col_response(json_str: str) -> str:
    """
    Process the JSON payload of the response column in the inference table
    """
    response = json.loads(json_str)
    output = [{prediction_col: round(prediction, 4)} for prediction in response["predictions"]]
    return output

def get_model_id(endpoint_name: str) -> str:
    """
    Create the model id by concatenating the model name and the model version.
    Note: the assumption is that the endpoint serves only one model
    """
    served_models = get_served_models(endpoint_name)
    model_id = f"{served_models[0]['model_name']}_{served_models[0]['model_version']}"
    return model_id

def process_requests(requests_raw: DataFrame, request_fields: List[T.StructField], response_field: T.StructField, endpoint_name: str) -> DataFrame:
    """
    Takes a stream of raw requests and processes them by:
        - Unpacking JSON payloads for requests and responses
        - Exploding batched requests into individual rows
        - Converting Unix epoch millisecond timestamps to Spark TimestampType

    :param requests_raw: DataFrame containing raw requests. Assumed to contain the following columns:
                            - `request`
                            - `response`
                            - `timestamp_ms`
    :param request_fields: List of StructFields representing the request schema
    :param response_field: A StructField representing the response schema
    :return: A DataFrame containing processed requests
    """
    # Convert the timestamp milliseconds to TimestampType for downstream processing.
    requests_timestamped = requests_raw \
        .withColumn(timestamp_col, (F.col("timestamp_ms") / 1000).cast(T.TimestampType())) \
        .drop("timestamp_ms")

    # Create the model identifier column
    model_id = get_model_id(endpoint_name)

    # Convert the model name and version columns into a model identifier column.
    requests_identified = requests_timestamped \
        .withColumn(model_id_col, F.lit(model_id))

    # Rename the date column to avoid collisions with features.
    requests_dated = requests_identified.withColumnRenamed("date", date_col)

    # Consolidate and unpack JSON.
    request_schema = T.ArrayType(T.StructType(request_fields))
    response_schema = T.ArrayType(T.StructType(response_field))

    udf_request = F.udf(process_col_requests, request_schema)
    udf_response = F.udf(process_col_response, response_schema)

    requests_unpacked = requests_dated \
        .withColumn("request", udf_request("request")) \
        .withColumn("response", udf_response("response"))

    # Explode batched requests into individual rows.
    DB_PREFIX = "__db"
    requests_exploded = requests_unpacked \
        .withColumn(f"{DB_PREFIX}_request_response", F.arrays_zip(F.col("request"), F.col("response"))) \
        .withColumn(f"{DB_PREFIX}_request_response", F.explode(F.col(f"{DB_PREFIX}_request_response"))) \
        .select(F.col("*"),
                F.col(f"{DB_PREFIX}_request_response.request.*"),
                F.col(f"{DB_PREFIX}_request_response.response.*")) \
        .drop(f"{DB_PREFIX}_request_response", "request", "response")

    requests_cleaned = requests_exploded.drop("status_code", "sampling_fraction", "client_request_id",
                                              "databricks_request_id", "request_metadata")

    return requests_cleaned
The unpacked table would look like this:
Next, we initialize our sink table:
dt_builder = DeltaTable.createIfNotExists(spark) \
    .tableName(unpacked_requests_table_name) \
    .addColumns(requests_cleaned.schema) \
    .partitionedBy(date_col) \
    .property("delta.enableChangeDataFeed", "true")

dt_builder.execute()
and then write the stream into it:
checkpoint_path = f"dbfs:/payload-logging/{endpoint_name}/checkpoint"

requests_stream = requests_cleaned.writeStream \
    .trigger(once=True) \
    .format("delta") \
    .partitionBy(date_col) \
    .outputMode("append") \
    .option("checkpointLocation", checkpoint_path) \
    .toTable(unpacked_requests_table_name)
Finally, we create our baseline table. DLM uses this table to compute the drifts by comparing the distribution of similar columns in the baseline and primary tables. The baseline table should have the same feature columns as the primary table, as well as the same model identification column. For the baseline table we use the prediction table of our validation dataset that we stored earlier, after we trained our model with the best hyperparameters. To compute the drift metrics, Databricks computes the profile metrics for both the primary and the baseline tables. Here you can read more about the primary table and baseline table.
# Read the prediction table
df_base_table = spark.table(f"{catalog_name}.{model_schema}.predictions")

# Create the model id and add it to the table
model_id = get_model_id(endpoint_name)
df_base_table = df_base_table.withColumn(model_id_col, F.lit(model_id))

# Write the new table and enable CDC on it
output_base_table_name = f"{catalog_name}.{model_schema}.{base_table_prefix}_{model_name}"
df_base_table.write.format("delta").mode("overwrite").saveAsTable(output_base_table_name)

spark.sql(f"ALTER TABLE {output_base_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")
Now we are ready to create our monitoring dashboard. We can do it either through the UI or with the Lakehouse Monitoring API. Here we use the second option:
# This is where we store the metric tables.
output_schema_name = f"{catalog_name}.{model_schema}"

try:
    info = lm.create_monitor(
        table_name=unpacked_requests_table_name,
        profile_type=lm.InferenceLog(
            timestamp_col=timestamp_col,
            granularities=granularities,  # the aggregation window
            model_id_col=model_id_col,
            prediction_col=prediction_col,
            label_col=label_col,
            problem_type=problem_type,
        ),
        output_schema_name=output_schema_name,
        schedule=None,  # We will refresh the metrics on-demand in this notebook
        baseline_table_name=output_base_table_name,
    )
    print(info)
except Exception as e:
    # Ensure the exception was expected
    assert "RESOURCE_ALREADY_EXISTS" in str(e), f"Unexpected error: {e}"

    # Update the monitor if any parameters of this notebook have changed.
    lm.update_monitor(
        table_name=unpacked_requests_table_name,
        updated_params=dict(
            profile_type=lm.InferenceLog(
                timestamp_col=timestamp_col,
                granularities=granularities,
                model_id_col=model_id_col,
                prediction_col=prediction_col,
                label_col=label_col,
                problem_type=problem_type,
            ),
            output_schema_name=output_schema_name,
            schedule=None,
            baseline_table_name=output_base_table_name,
        )
    )

# Refresh the metrics calculated on the requests table.
refresh_info = lm.run_refresh(table_name=unpacked_requests_table_name)
print(refresh_info)
After we run the code, it takes some time until Databricks has computed all the metrics. To see the dashboard, go to the Quality tab of your sink table (i.e. unpacked_requests_table_name). You should see a page like this.
If you click on refresh history, you can see the running, pending, and past refreshes. Then click on View Dashboard to open your dashboard.
So we started with the inference table (my_endpoint_payload), processed it and saved the result to my_endpoint_payload_unpacked, then passed this table along with our baseline table (base_table_als) to our monitoring API. DLM computes the profile metrics for each table (my_endpoint_payload_unpacked_profile_metric) and uses them to compute the drift metrics (my_endpoint_payload_unpacked_drift_metrics). A quick way to peek at these metric tables is sketched below.
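As a small, hedged example (the table names simply follow the naming pattern described above, and no particular columns are assumed), we can inspect the generated metric tables directly with Spark:

# Peek at the metric tables produced by the monitor (names follow the pattern above).
profile_table = f"{unpacked_requests_table_name}_profile_metric"
drift_table = f"{unpacked_requests_table_name}_drift_metrics"

display(spark.table(profile_table).limit(10))
display(spark.table(drift_table).limit(10))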
And there you go! You now have everything you need to serve and monitor your model!
In the next part, I'll show you how to automate these steps using Databricks Asset Bundles and Gitlab!