In het hebben we de eerste stappen gezet voor het bouwen van een end-to-end MLOps-pipeline met behulp van Databricks en Spark, geleid door de referentiearchitectuur van Databricks. Hier is een samenvatting van de belangrijkste stappen die we hebben behandeld: eerste deel van deze tutorialserie : We hebben onze gegevens georganiseerd in bronzen, zilveren en gouden lagen binnen de Unity Catalog, waarmee we een gestructureerd en efficiënt gegevensbeheersysteem hebben opgezet. Het opzetten van de Unity Catalog voor Medallion Architecture : We hebben laten zien hoe u ruwe gegevens in het systeem kunt importeren, zodat u consistentie en kwaliteit kunt garanderen voor de daaropvolgende verwerkingsfasen. Gegevens opnemen in Unity Catalog : met behulp van Databricks hebben we een machine learning-model getraind dat is afgestemd op onze dataset. Hierbij hebben we de best practices voor schaalbare en effectieve modelontwikkeling gevolgd. Het model trainen : om de modelprestaties te verbeteren, hebben we HyperOpt ingezet om de zoektocht naar optimale hyperparameters te automatiseren, waardoor de nauwkeurigheid en efficiëntie werden verbeterd. Hyperparameterafstemming met HyperOpt : We hebben MLflow gebruikt om onze experimenten te loggen en te monitoren. Zo houden we een uitgebreid overzicht bij van modelversies, statistieken en parameters, zodat we deze eenvoudig kunnen vergelijken en reproduceren. Experimenten bijhouden met Databricks MLflow Met deze fundamentele stappen voltooid, is uw model nu klaar voor implementatie. In dit tweede deel richten we ons op het integreren van twee kritieke componenten in ons systeem: : implementatie van batchverwerking om voorspellingen te genereren op basis van grote datasets, geschikt voor toepassingen zoals bulk-scoring en periodieke rapportage. Batch-inferentie : het opzetten van realtime modelserveren om onmiddellijke voorspellingen te doen, essentieel voor interactieve toepassingen en services. Online inferentie (modelserveren) om ervoor te zorgen dat uw geïmplementeerde modellen in de loop van de tijd optimale prestaties en betrouwbaarheid behouden. Modelbewaking: Laten we beginnen! Modelimplementatie Het uitgangspunt van de vorige blog was modelevaluatie. Stel je nu voor dat we de vergelijking hebben gedaan en hebben ontdekt dat ons model een hogere prestatie laat zien in vergelijking met dit productiemodel. Omdat we het model in productie willen (aannemen) gebruiken, willen we profiteren van alle gegevens die we hebben. De volgende stap is om het model te trainen en te testen met behulp van de volledige dataset. Bewaar ons model vervolgens voor later gebruik door het te implementeren als ons kampioensmodel. Omdat dit het laatste model is dat we willen gebruiken voor inferentie, gebruiken we de Feature Engineering-client om het model te trainen. Op deze manier kunnen we niet alleen de modellijn gemakkelijker volgen, maar ook de schemavalidatie en featuretransformatie (indien van toepassing) uitbesteden aan de client. with mlflow.start_run(run_name="ALS_best_model") as run: als = ALS() # Now we set the parameters for the method als.setMaxIter(MAX_ITER)\ .setSeed(SEED)\ .setRegParam(best_params["REG_PARAM"])\ .setUserCol(COL_USER)\ .setItemCol(COL_ITEM)\ .setRatingCol(COL_LABEL)\ .setRank(best_params["RANK"]) mlflow.log_param("MAX_ITER", MAX_ITER) mlflow.log_param("RANK", best_params["RANK"]) mlflow.log_param("REG_PARAM", best_params["REG_PARAM"]) # Create the model with these parameters. model = als.fit(df_full_data) #drop predictions where users and products from the test test and didn't make it into the training set. in this case, the prediction is NaN model.setColdStartStrategy('drop') predictions = model.transform(df_full_data) signature = infer_signature(model_input = df_full_data, model_output = predictions.select(COL_LABEL)) #log the model mlflow.spark.log_model(model, model_name, sample_input=df_full_data.limit(3), signature = signature, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name=f"{catalog_name}.{model_schema}.{model_name}") evaluator = RegressionEvaluator(predictionCol=COL_PRED, labelCol=COL_LABEL) rmse = evaluator.setMetricName("rmse").evaluate(predictions) mlflow.log_metric('rmse', rmse) we kunnen ook de gebruiken om de modellen te trainen en te loggen Feature Store of Feature Engineering API's model_info = fe.log_model(model=model, artifact_path = model_name, flavor=mlflow.spark, training_set=fe_full_data, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name= f"{catalog_name}.{model_schema}.{model_name}" ) Wanneer we de feature engineering API gebruiken, kunnen we de afstamming van het model bekijken in Catalog Explorer Laten we nu de modelbeschrijving bijwerken en er een Champion-label aan toewijzen. import time from mlflow.tracking.client import MlflowClient from mlflow.entities.model_registry.model_version_status import ModelVersionStatus client = MlflowClient() #find the latest model version model_name_path = f"{catalog_name}.{model_schema}.{model_name}" model_version_infos = client.search_model_versions(f"name ='{model_name_path}'") new_model_version = max([int(model_version_info.version) for model_version_info in model_version_infos]) #add the model and model version descirption client.update_registered_model( name=model_name_path, description="collaborative filtering using Spark mllib ALS. This model use rating table" ) client.update_model_version( name=model_name_path, version=new_model_version, description="this model is optimized Rank and REG_PARAM with Hyperopt and rmse as a loss function. trained on the full dataset" ) # assign alias client.set_registered_model_alias(model_name_path, "Champion", new_model_version) Ga nu verder en controleer het schema waarmee u het model hebt geregistreerd. U zou al uw updates als volgt moeten zien : Als u workspace gebruikt voor modelregistratie, moet u stages gebruiken om uw modellen te beheren. Het gebruik van aliassen werkt niet. Bekijk om te zien hoe het werkt Model stages hier Modelinferentie Batch-scoren Stel je nu voor dat we ons model in productie willen gebruiken voor inferentie. In deze stap laden we het championmodel en gebruiken het om 20 filmaanbevelingen voor elke gebruiker te genereren. from mlflow.spark import load_model as spark_load_model from mlflow.tracking.client import MlflowClient from create_training_set import split_data #-- set UC as model registray mlflow.set_registry_uri("databricks-uc") #-- initate mlflow client client = MlflowClient() # -- read the config file with open('config.json') as config_file: config = json.load(config_file) catalog_name = config["catalog_name"] gold_layer = config["gold_layer_name"] silver_layer = config["silver_layer_name"] user_item_table_name = config["user_item_table_name"] ft_user_item_name = config["ft_user_item_name"] model_name = config["model_name"] model_schema = config["model_schema"] #-- create the model uri model_path = f"{catalog_name}.{model_schema}.{model_name}" # --create the model_uri: there are two ways to do this # 1: using the alias (we use this*) model_version_uri = f"models:/{model_uri}@champion" # 2: using model version #champion_version = client.get_model_version_by_alias(model_uri, "champion") #model_version_uri = f"models:/{model_uri}/{champion_version.version}" # -- load the model pipline and exctract the model model_pipeline = spark_load_model(model_version_uri) model = model_pipeline.stages[0] # -- batch scoring using the the model fe_full_data, df_full_data, df_train, df_test = split_data() df_batch_input = df_full_data.drop("rating") df_scores = model.transform(df_batch_input) # --- in case you used Feature Engineering to train and register model #from databricks.feature_engineering import FeatureEngineeringClient #fe = FeatureEngineeringClient() # fe.score_batch(model_uri=f"{model_version_uri}",df = df_batch_input) en u kunt zien dat we dezelfde trainingsgegevens hebben gebruikt voor batch-scoring. Hoewel het in het geval van aanbevelingssystemen logisch is, willen we in de meeste toepassingen het model gebruiken om wat ongeziene gegevens te scoren. Stel u bijvoorbeeld voor dat u Netflix bent en de aanbevelingen van de gebruiker aan het einde van de dag wilt bijwerken op basis van hun nieuwe kijklijst. We kunnen een taak plannen die de batch-scoring op een specifiek tijdstip aan het einde van de dag uitvoert. Nu kunnen we doorgaan en de aanbevelingen voor elke gebruiker genereren. Hiervoor vinden we de top 20 items per gebruiker from pyspark.sql.window import Window from pyspark.sql.functions import col, split, row_number, collect_list from pyspark.sql.functions import col, collect_list, expr, lit, min, row_number, desc windowSpec = Window.partitionBy("user_id").orderBy(col("prediction").desc()) df_top_20_items = df_scores.withColumn("rank", row_number().over(windowSpec)).filter(col("rank") <= 20) df_user_recs = df_top_20_items.groupBy("user_id") \ .agg(collect_list(col("item_id").cast("double")).alias("top_item_ids")) zo ziet het resultaat eruit Ten slotte kunnen we de voorspelling opslaan als een deltalabel op onze UC of ze publiceren naar een downstream-systeem Mongo DB of Azure Cosmos DB. We gaan met de eerste optie df_user_recs.write.mode("overwrite").saveAsTable(f"{catalog_name}.{output_schema}.top20_item_recommendations") Streaming/online-inferentie Stel je nu eens een geval voor waarin we onze aanbevelingen willen updaten op basis van realtime gebruikersinteracties. Voor dit geval kunnen we model serving gebruiken. Wanneer iemand jouw model wil gebruiken, kan hij/zij data naar de server sturen. De server stuurt die data vervolgens naar jouw geïmplementeerde model, dat in actie komt, de data analyseert en een voorspelling genereert. Ze kunnen worden gebruikt in webapplicaties, mobiele apps of zelfs embedded systemen. Een van de toepassingen van deze aanpak is om traffic routing mogelijk te maken voor A/B-testen. Het ALS-algoritme kan niet direct worden gebruikt voor online inferentie, omdat het vereist dat het model opnieuw wordt getraind met behulp van de volledige data (oud + nieuw) om de aanbevelingen bij te werken. Gradient Descent-leeralgoritmen zijn voorbeelden van modellen die kunnen worden gebruikt voor online updates. We kunnen in toekomstige berichten naar enkele van deze algoritmen kijken. Om te illustreren hoe zo'n model zou werken, maken we een (nutteloos) model dat als eindpunt fungeert en filmbeoordelingen voorspelt op basis van de beoordeling door een gebruiker! import requests model_path = f"{catalog_name}.{model_schema}.{model_name}" champion_version = client.get_model_version_by_alias(model_path, "champion") # Set the name of the MLflow endpoint endpoint_name = config["model_serving_endpoint_name"] # Name of the registered MLflow model model_name = model_path # Specify the type of compute (CPU, GPU_SMALL, GPU_MEDIUM, etc.) workload_type = "CPU" # Specify the scale-out size of compute (Small, Medium, Large, etc.) workload_size = "Small" # Get the latest version of the MLflow model model_version = int(champion_version.version) # Specify Scale to Zero(only supported for CPU endpoints) scale_to_zero = False # Get the API endpoint and token for the current notebook context API_ROOT = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get() API_TOKEN = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get() data = { "name": endpoint_name, "config": { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ] }, } headers = {"Context-Type": "text/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.post( url=f"{API_ROOT}/api/2.0/serving-endpoints", json=data, headers=headers ) Dit zal een lunchmodelservingcluster voor ons creëren, dus het kost wat tijd. Als u nu het -venster opent, zou u uw eindpunt moeten zien. Serving we kunnen één eindpunt gebruiken om meerdere modellen te bedienen. Vervolgens kunnen we verkeersroutering gebruiken voor scenario's zoals A/B-testen of de prestaties van verschillende modellen in de productie vergelijken. Inferentietabel Inferentietabellen in Databricks Model Serving fungeren als een automatisch logboek voor onze geïmplementeerde modellen. Wanneer ingeschakeld, leggen ze inkomende verzoeken (gegevens verzonden voor voorspelling), de bijbehorende modeluitvoer (voorspellingen) en enkele andere metagegevens vast als een Delta-tabel binnen Unity Catalog. We kunnen de inferentietabel gebruiken voor , en een gegevensverzamelingsprocedure voor of van onze modellen. monitoring en debugging lineage tracking het opnieuw trainen verfijnen We kunnen de op ons serving endpoint inschakelen om het model te monitoren. We kunnen dit doen door de eigenschappen in de payload te specificeren wanneer we het endpoint voor het eerst aanmaken. Of we updaten ons endpoint daarna met de opdracht en de endpoint URL als volgt (meer inference table auto_capture_config put config ) hier data = { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ], "auto_capture_config":{ "catalog_name": catalog_name, "schema_name": model_schema, "table_name_prefix": payload_table, } } headers = {"Context-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.put(url=f"{API_ROOT}/api/2.0/serving-endpoints/{endpoint_name}/config", json=data, headers=headers) print(json.dumps(response.json(), indent=4)) Laten we nu het eindpunt voeden met wat dummy-gebruikersinteractiegegevens import random import time all_items = df_full_data.select(col("item_id")).distinct() for user_id in range(50,54): items_not_rated_by_user = df_full_data.where(col("user_id")==user_id).select(col("item_id")).distinct()#collect()[0][0] no_rated_items = [item.item_id for item in all_items.subtract(items_not_rated_by_user).limit(4).collect()] data = { "dataframe_records": [ {"user_id":user_id, "item_id":no_rated_items[0], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[1], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, ] } response = requests.post( url=f"{API_ROOT}/serving-endpoints/{endpoint_name}/invocations", json=data, headers=headers ) # generate the data within the timespan of 1 to 8 minutes time.sleep(random.randint(60*1, 60*8)) We kunnen de endpoint logs controleren in de tabel. Het duurt ongeveer 10 minuten voordat u de data in de tabel kunt zien. <catalog>.<schema>.<payload_table> table_path = f"{catalog_name}.{model_schema}.{payload_table}" # Read data from the inference table df_inf_table = spark.read.table(table_path) display(df_inf_table ) je zou zoiets moeten zien als je payloadtabel Om het schema van deze inferentietabel te begrijpen, kijk naar “Unity catalogus inferentietabel schema==” .== hier Modelbewaking Model- en datamonitoring zijn een complex onderwerp dat veel tijd kost om onder de knie te krijgen. Databricks Lakehouse Monitoring (DLM) vermindert de overhead van het bouwen van een goed monitoringsysteem door standaard en aanpasbare sjablonen te bieden voor veelvoorkomende use cases. Het onder de knie krijgen van DLM en modelmonitoring in het algemeen vereist echter veel experimenten. Ik wil u hier geen uitgebreid overzicht van modelmonitoring geven, maar u eerder een startpunt geven. Ik wijd misschien in de toekomst een blog aan dit onderwerp. Een korte samenvatting van DLM-functionaliteiten en -functies Nu ons model up and running is, kunnen we de inferentietabel die door ons serving endpoint is gegenereerd gebruiken om belangrijke statistieken te monitoren, zoals modelprestaties en drift, om afwijkingen of anomalieën in onze data of model in de loop van de tijd te detecteren. Deze proactieve aanpak helpt ons om tijdig corrigerende maatregelen te nemen, zoals het opnieuw trainen van het model of het updaten van de functies, om optimale prestaties en afstemming op bedrijfsdoelstellingen te behouden. DLM biedt drie typen analyse of : , en . Omdat we geïnteresseerd zijn in het analyseren van onze inferencetabel, richten we ons op de laatste. Om een tabel te gebruiken voor monitoring - onze " ", moeten we ervoor zorgen dat de tabel de juiste structuur heeft. Voor de moet elke rij overeenkomen met een verzoek met de volgende kolommen: profile type Time Series Snapshot Inference primaire tabel inferencetabel modelkenmerken modelvoorspelling model-id : tijdstempel van het inferentieverzoek timestamp (optioneel) grondwaarheid De is belangrijk voor gevallen waarin we meerdere modellen bedienen en we de prestaties van elk model in één monitoringdashboard willen volgen. Als er meer dan één model-id beschikbaar is, gebruikt DLM deze om de gegevens te slicen en metrische gegevens en statistische gegevens voor elke slice afzonderlijk te berekenen. model-id DLM berekent elke statistiek en metriek voor een bepaald tijdsinterval. Voor inferentieanalyse gebruikte het de , plus een door de gebruiker gedefinieerde venstergrootte om de tijdsvensters te identificeren. meer hieronder. timestampkolom DLM ondersteunt twee voor inferentietabellen: " " of " ". Het berekent enkele van de relevante statistieken en statistieken op basis van deze specificatie. problem type classificatie regressie Om DLM te gebruiken, moeten we een monitor maken en deze aan een tabel koppelen. Wanneer we dit doen, maakt DLM twee : metric tables : deze tabel bevat samenvattende statistieken zoals min, max, percentage nullen en nullen. Het bevat ook aanvullende metrieken op basis van het door de gebruiker gedefinieerde probleemtype. Bijvoorbeeld , en voor de classificatiemodellen, en en voor regressiemodellen. profielmetriektabel precisie recall f1_score mean_squared_error mean_average_error : bevat statistieken die meten hoe de distributie van data is veranderd of ten opzichte van een . Het berekent metingen zoals Chi-kwadraattest, KS-test. drift metrische tabel in de loop van de tijd basislijnwaarde (indien verstrekt) om de lijst met complete metrics voor elke tabel te bekijken, controleer documentation page. Het is ook mogelijk om Monitor metric table te maken. aangepaste metrics Een belangrijk aspect van het bouwen van een monitoringsysteem is om ervoor te zorgen dat ons monitoringdashboard toegang heeft tot de nieuwste inferentiegegevens zodra ze binnenkomen. Hiervoor kunnen we gebruiken om verwerkte rijen in de inferentietabel bij te houden. We gebruiken de inferentietabel van de modelserver als onze brontabel ( ) en de monitoringtabel als de sinktabel ( ). We zorgen er ook voor dat de (CDC) is ingeschakeld op beide tabellen (het is standaard ingeschakeld op de inferentietabel). Op deze manier verwerken we alleen wijzigingen - invoegen/bijwerken/verwijderen - in de brontabel in plaats van de hele tabel opnieuw te verwerken bij elke vernieuwing. Delta-tabelstreaming readStream writeStream Change Data Capture Praktisch Om de monitoring van onze inferentietabel mogelijk te maken, nemen we de volgende stappen: Lees de inferentietabel als een streamingtabel Maak een nieuwe deltatabel met het juiste schema door de inferentietabel uit te pakken die is gegenereerd door ons modelserverend eindpunt. Maak de basislijntabel (indien van toepassing) klaar Maak een monitor over de resulterende tabel en vernieuw de metriek Plan een workflow om de inferentietabel uit te pakken naar de juiste structuur en de statistieken te vernieuwen Eerst moeten we de Lakehouse Monitoring API installeren. Deze zou al geïnstalleerd moeten zijn als u Databricks rum time 15.3 LTS en hoger gebruikt: %pip install "https://ml-team-public-read.s3.amazonaws.com/wheels/data-monitoring/a4050ef7-b183-47a1-a145-e614628e3146/databricks_lakehouse_monitoring-0.4.14-py3-none-any.whl" dbutils.library.restartPython() Laten we de inferentietabel lezen als een streamingtabel requests_raw = spark.readStream\ .format("delta")\ .table(inference_table_name) requests_raw.isStreaming #-> True Vervolgens moeten we de tabel in het juiste formaat zetten zoals hierboven beschreven. Deze tabel moet één rij hebben voor elke voorspelling met relevante kenmerken en voorspellingswaarde. De inferentietabel die we krijgen van het model dat het eindpunt bedient, slaat de eindpuntverzoeken en -reacties op als een geneste JSON-indeling. Hier is een voorbeeld van de JSON-payload voor de aanvraag- en reactiekolom. #requests {"dataframe_records": [ {"user_id": 1, "item_id": 346, "rating": 5}, {"user_id": 1, "item_id": 377, "rating": 2}, {"user_id": 1, "item_id": 302, "rating": 4} ] } #reponse {"predictions": [4.248899936676025, 1.1172138452529907, 4.279165744781494] } # --> what we need | user_id | item_id | rating | prediction | |---------|---------|--------|------------| | 1 | 346 | 5 | 4.248900 | | 1 | 377 | 2 | 1.117214 | | 1 | 302 | 4 | 4.279166 | Om deze tabel uit te pakken naar het juiste schema kunnen we de volgende code gebruiken die is aangepast uit de Databricks-documentatie ( ). Inference table Lakehouse Monitoring starter notebook # define the schema of the request and reponse fields in the inference tabel REQUEST_FIELDS = [StructField('user_id', IntegerType(), False),\ StructField('item_id', IntegerType(), False),\ StructField('rating', IntegerType(), False)\ ] RESPONSE_FIELD = [T.StructField("predictions", FloatType())] def process_col_requests(json_str:str) -> str: """ to proccess the JSON payload of request column in inference table """ request = json.loads(json_str) dataframe_records = request.get("dataframe_records", []) return dataframe_records def procces_col_response(json_str: str) -> str: """ to proccess the JSON payload of reponse column in inference table """ reponse = json.loads(json_str) output = [{prediction_col: round(prediction,4)} for prediction in reponse["predictions"]] return output def get_model_id(endpoint_name: str) -> str: """ create the model id by concatinating the model name and the model version. note: the assumption is the endpoint serves only one model """ served_models = get_served_models(endpoint_name) model_id = f"{served_models[0]['model_name']}_{served_models[0]['model_version']}" return model_id def process_requests(requests_raw: DataFrame, request_fields: List[T.StructField], response_field: T.StructField, endpoint_name: str) -> DataFrame: """ Takes a stream of raw requests and processes them by: - Unpacking JSON payloads for requests and responses - Exploding batched requests into individual rows - Converting Unix epoch millisecond timestamps to be Spark TimestampType :param requests_raw: DataFrame containing raw requests. Assumed to contain the following columns: - `request` - `response` - `timestamp_ms` :param request_fields: List of StructFields representing the request schema :param response_field: A StructField representing the response schema :return: A DataFrame containing processed requests """ # Convert the timestamp milliseconds to TimestampType for downstream processing. requests_timestamped = requests_raw \ .withColumn(timestamp_col, (F.col("timestamp_ms") / 1000).cast(T.TimestampType())) \ .drop("timestamp_ms") # create the model identifier column model_id = get_model_id(endpoint_name) # Convert the model name and version columns into a model identifier column. requests_identified = requests_timestamped \ .withColumn(model_id_col, F.lit(model_id)) # Rename the date column to avoid collisions with features. requests_dated = requests_identified.withColumnRenamed("date", date_col) # Consolidate and unpack JSON. request_schema = T.ArrayType(T.StructType(request_fields)) response_schema = T.ArrayType(T.StructType(response_field)) # w udf_request = F.udf(process_col_requests, request_schema) udf_reponse = F.udf(procces_col_response, response_schema) requests_unpacked = requests_dated.withColumn("request", udf_request("request")).\ withColumn("response", udf_reponse("response")) # Explode batched requests into individual rows. DB_PREFIX = "__db" requests_exploded = requests_unpacked \ .withColumn(f"{DB_PREFIX}_request_response", F.arrays_zip(F.col("request"), F.col("response"))) \ .withColumn(f"{DB_PREFIX}_request_response", F.explode(F.col(f"{DB_PREFIX}_request_response"))) \ .select(F.col("*"), F.col(f"{DB_PREFIX}_request_response.request.*"), F.col(f"{DB_PREFIX}_request_response.response.*")) \ .drop(f"{DB_PREFIX}_request_response", "request", "response") requests_cleaned = requests_exploded.drop("status_code", "sampling_fraction", "client_request_id", "databricks_request_id", "request_metadata") return requests_cleaned De resulterende tabel ziet er als volgt uit: Vervolgens moeten we onze sink-tabel initialiseren dt_builder = DeltaTable.createIfNotExists(spark) \ .tableName(unpacked_requests_table_name) \ .addColumns(schema) \ .partitionedBy(requests_cleaned.schema) \ .property("delta.enableChangeDataFeed", "true") \ dt_builder.execute() en schrijf de resultaten checkpoint_path = f"dbfs:/payload-logging/{endpoint_name}/checkpoint" requests_stream = requests_cleaned.writeStream \ .trigger(once=True) \ .format("delta") \ .partitionBy(date_col) \ .outputMode("append") \ .option("checkpointLocation", checkpoint_path) \ .toTable(unpacked_requests_table_name) \ Ten slotte maken we onze baselinetabel. DLM gebruikt deze tabel om de drifts te berekenen door de distributie van vergelijkbare kolommen van baseline- en primaire modellen te vergelijken. De baselinetabel moet dezelfde featurekolom hebben als de primaire kolom en dezelfde modelidentificatiekolom. Voor de baselinetabel gebruiken we de voorspellingstabel van onze die we eerder hebben opgeslagen nadat we ons model hebben getraind met de beste hyperparameter. Om de driftmetriek te berekenen, berekent Databricks de profielmetrieken voor zowel de primaire als de baselinetabel. Hier kunt u meer lezen over de validatiedataset . primaire tabel en baselinetabel #read the prediction table df_base_table = spark.table(f"{catalog_name}.{model_schema}.predictions") # create the model id and add it to the table model_id = get_model_id(endpoint_name) df_base_table = df_base_table.withColumn(model_id_col, F.lit(model_id)) #write the new table and enable the CDC on it output_base_table_name = f"{catalog_name}.{model_schema}.{base_table_prefix}_{model_name}" df_base_table.write.format("delta").mode("overwrite").saveAsTable(output_base_table_name) spark.sql(f"ALTER TABLE {output_base_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)") Nu zijn we klaar om ons monitoring dashboard te creëren. We kunnen dit doen met behulp van de of de Lakehouse Monitoring API. Hier gebruiken we de tweede optie: UI # This is where we store the metric tables. output_schema_name = f"{catalog_name}.{model_schema}" try: info = lm.create_monitor( table_name=unpacked_requests_table_name, profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities,#the aggregation window model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, # We will refresh the metrics on-demand in this notebook baseline_table_name=output_base_table_name, ) print(info) except Exception as e: # Ensure the exception was expected assert "RESOURCE_ALREADY_EXISTS" in str(e), f"Unexpected error: {e}" # Update the monitor if any parameters of this notebook have changed. lm.update_monitor( table_name=unpacked_requests_table_name, updated_params=dict( profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities, model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, baseline_table_name=output_base_table_name, ) ) # Refresh metrics calculated on the requests table. refresh_info = lm.run_refresh(table_name=unpacked_requests_table_name) print(refresh_info) nadat we de code hebben uitgevoerd duurt het even voordat Databricks alle metriek heeft berekend. Om het dashboard te zien ga je naar het tabblad van je sinktabel (d.w.z. ). Je zou een pagina als volgt moeten zien. Quality unpacked_requests_table_name Als u op de klikt, ziet u de lopende, in behandeling zijnde en eerdere vernieuwingen. Klik op het om uw dashboard te openen. refresh history View Dashboard dus we beginnen met de inferentietabel ( ), verwerken deze en slaan het resultaat op in en geven deze tabel samen met onze basislijntabel ( ) door aan onze monitoring API. De DLM berekent de profielmetrieken voor elke tabel ( ) en gebruikt deze om de driftmetrieken te berekenen ( ) my_endpoint_payload my_endpoint_payload_unpacked base_table_als my_endpoint_payload_unpacked_profile_metric my_endpoint_payload_unpacked_drift_metrics Daar heb je het! Je hebt alles wat je nodig hebt om je model te bedienen en te monitoren! In het volgende deel laat ik je zien hoe je dit proces kunt automatiseren met behulp van en ! Databricks Assets Bundle Gitlab In het hebben we de eerste stappen gezet voor het bouwen van een end-to-end MLOps-pipeline met behulp van Databricks en Spark, geleid door de referentiearchitectuur van Databricks. Hier is een samenvatting van de belangrijkste stappen die we hebben behandeld: eerste deel van deze tutorialserie : We hebben onze gegevens georganiseerd in bronzen, zilveren en gouden lagen binnen de Unity Catalog, waarmee we een gestructureerd en efficiënt gegevensbeheersysteem hebben opgezet. Het opzetten van de Unity Catalog voor Medallion Architecture : We hebben laten zien hoe u ruwe gegevens in het systeem kunt importeren, zodat u consistentie en kwaliteit kunt garanderen voor de daaropvolgende verwerkingsfasen. Gegevens opnemen in Unity Catalog : met behulp van Databricks hebben we een machine learning-model getraind dat is afgestemd op onze dataset. Hierbij hebben we de best practices voor schaalbare en effectieve modelontwikkeling gevolgd. Het model trainen : om de modelprestaties te verbeteren, hebben we HyperOpt ingezet om de zoektocht naar optimale hyperparameters te automatiseren, waardoor de nauwkeurigheid en efficiëntie werden verbeterd. Hyperparameterafstemming met HyperOpt : We hebben MLflow gebruikt om onze experimenten te loggen en te monitoren. Zo houden we een uitgebreid overzicht bij van modelversies, statistieken en parameters, zodat we deze eenvoudig kunnen vergelijken en reproduceren. Experimenten bijhouden met Databricks MLflow Met deze fundamentele stappen voltooid, is uw model nu klaar voor implementatie. In dit tweede deel richten we ons op het integreren van twee kritieke componenten in ons systeem: : implementatie van batchverwerking om voorspellingen te genereren op basis van grote datasets, geschikt voor toepassingen zoals bulk-scoring en periodieke rapportage. Batch-inferentie : het opzetten van realtime modelserveren om onmiddellijke voorspellingen te doen, essentieel voor interactieve toepassingen en services. Online inferentie (modelserveren) om ervoor te zorgen dat uw geïmplementeerde modellen in de loop van de tijd optimale prestaties en betrouwbaarheid behouden. Modelbewaking: Laten we beginnen! Modelimplementatie Het uitgangspunt van de vorige blog was modelevaluatie. Stel je nu voor dat we de vergelijking hebben gedaan en hebben ontdekt dat ons model een hogere prestatie laat zien in vergelijking met dit productiemodel. Omdat we het model in productie willen (aannemen) gebruiken, willen we profiteren van alle gegevens die we hebben. De volgende stap is om het model te trainen en te testen met behulp van de volledige dataset. Bewaar ons model vervolgens voor later gebruik door het te implementeren als ons kampioensmodel. Omdat dit het laatste model is dat we willen gebruiken voor inferentie, gebruiken we de Feature Engineering-client om het model te trainen. Op deze manier kunnen we niet alleen de modellijn gemakkelijker volgen, maar ook de schemavalidatie en featuretransformatie (indien van toepassing) uitbesteden aan de client. with mlflow.start_run(run_name="ALS_best_model") as run: als = ALS() # Now we set the parameters for the method als.setMaxIter(MAX_ITER)\ .setSeed(SEED)\ .setRegParam(best_params["REG_PARAM"])\ .setUserCol(COL_USER)\ .setItemCol(COL_ITEM)\ .setRatingCol(COL_LABEL)\ .setRank(best_params["RANK"]) mlflow.log_param("MAX_ITER", MAX_ITER) mlflow.log_param("RANK", best_params["RANK"]) mlflow.log_param("REG_PARAM", best_params["REG_PARAM"]) # Create the model with these parameters. model = als.fit(df_full_data) #drop predictions where users and products from the test test and didn't make it into the training set. in this case, the prediction is NaN model.setColdStartStrategy('drop') predictions = model.transform(df_full_data) signature = infer_signature(model_input = df_full_data, model_output = predictions.select(COL_LABEL)) #log the model mlflow.spark.log_model(model, model_name, sample_input=df_full_data.limit(3), signature = signature, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name=f"{catalog_name}.{model_schema}.{model_name}") evaluator = RegressionEvaluator(predictionCol=COL_PRED, labelCol=COL_LABEL) rmse = evaluator.setMetricName("rmse").evaluate(predictions) mlflow.log_metric('rmse', rmse) we kunnen ook de gebruiken om de modellen te trainen en te loggen Feature Store of Feature Engineering API's model_info = fe.log_model(model=model, artifact_path = model_name, flavor=mlflow.spark, training_set=fe_full_data, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name= f"{catalog_name}.{model_schema}.{model_name}" ) Wanneer we de feature engineering API gebruiken, kunnen we de afstamming van het model bekijken in Catalog Explorer Laten we nu de modelbeschrijving bijwerken en er een Champion-label aan toewijzen. import time from mlflow.tracking.client import MlflowClient from mlflow.entities.model_registry.model_version_status import ModelVersionStatus client = MlflowClient() #find the latest model version model_name_path = f"{catalog_name}.{model_schema}.{model_name}" model_version_infos = client.search_model_versions(f"name ='{model_name_path}'") new_model_version = max([int(model_version_info.version) for model_version_info in model_version_infos]) #add the model and model version descirption client.update_registered_model( name=model_name_path, description="collaborative filtering using Spark mllib ALS. This model use rating table" ) client.update_model_version( name=model_name_path, version=new_model_version, description="this model is optimized Rank and REG_PARAM with Hyperopt and rmse as a loss function. trained on the full dataset" ) # assign alias client.set_registered_model_alias(model_name_path, "Champion", new_model_version) Ga nu verder en controleer het schema waarmee u het model hebt geregistreerd. U zou al uw updates als volgt moeten zien : Als u workspace gebruikt voor modelregistratie, moet u stages gebruiken om uw modellen te beheren. Het gebruik van aliassen werkt niet. Bekijk om te zien hoe het werkt Model stages hier Modelinferentie Batch-scoren Stel je nu voor dat we ons model in productie willen gebruiken voor inferentie. In deze stap laden we het championmodel en gebruiken het om 20 filmaanbevelingen voor elke gebruiker te genereren. from mlflow.spark import load_model as spark_load_model from mlflow.tracking.client import MlflowClient from create_training_set import split_data #-- set UC as model registray mlflow.set_registry_uri("databricks-uc") #-- initate mlflow client client = MlflowClient() # -- read the config file with open('config.json') as config_file: config = json.load(config_file) catalog_name = config["catalog_name"] gold_layer = config["gold_layer_name"] silver_layer = config["silver_layer_name"] user_item_table_name = config["user_item_table_name"] ft_user_item_name = config["ft_user_item_name"] model_name = config["model_name"] model_schema = config["model_schema"] #-- create the model uri model_path = f"{catalog_name}.{model_schema}.{model_name}" # --create the model_uri: there are two ways to do this # 1: using the alias (we use this*) model_version_uri = f"models:/{model_uri}@champion" # 2: using model version #champion_version = client.get_model_version_by_alias(model_uri, "champion") #model_version_uri = f"models:/{model_uri}/{champion_version.version}" # -- load the model pipline and exctract the model model_pipeline = spark_load_model(model_version_uri) model = model_pipeline.stages[0] # -- batch scoring using the the model fe_full_data, df_full_data, df_train, df_test = split_data() df_batch_input = df_full_data.drop("rating") df_scores = model.transform(df_batch_input) # --- in case you used Feature Engineering to train and register model #from databricks.feature_engineering import FeatureEngineeringClient #fe = FeatureEngineeringClient() # fe.score_batch(model_uri=f"{model_version_uri}",df = df_batch_input) en u kunt zien dat we dezelfde trainingsgegevens hebben gebruikt voor batch-scoring. Hoewel het in het geval van aanbevelingssystemen logisch is, willen we in de meeste toepassingen het model gebruiken om wat ongeziene gegevens te scoren. Stel u bijvoorbeeld voor dat u Netflix bent en de aanbevelingen van de gebruiker aan het einde van de dag wilt bijwerken op basis van hun nieuwe kijklijst. We kunnen een taak plannen die de batch-scoring op een specifiek tijdstip aan het einde van de dag uitvoert. Nu kunnen we doorgaan en de aanbevelingen voor elke gebruiker genereren. Hiervoor vinden we de top 20 items per gebruiker from pyspark.sql.window import Window from pyspark.sql.functions import col, split, row_number, collect_list from pyspark.sql.functions import col, collect_list, expr, lit, min, row_number, desc windowSpec = Window.partitionBy("user_id").orderBy(col("prediction").desc()) df_top_20_items = df_scores.withColumn("rank", row_number().over(windowSpec)).filter(col("rank") <= 20) df_user_recs = df_top_20_items.groupBy("user_id") \ .agg(collect_list(col("item_id").cast("double")).alias("top_item_ids")) zo ziet het resultaat eruit Ten slotte kunnen we de voorspelling opslaan als een deltalabel op onze UC of ze publiceren naar een downstream-systeem Mongo DB of Azure Cosmos DB. We gaan met de eerste optie df_user_recs.write.mode("overwrite").saveAsTable(f"{catalog_name}.{output_schema}.top20_item_recommendations") Streaming/online-inferentie Stel je nu eens een geval voor waarin we onze aanbevelingen willen updaten op basis van realtime gebruikersinteracties. Voor dit geval kunnen we model serving gebruiken. Wanneer iemand jouw model wil gebruiken, kan hij/zij data naar de server sturen. De server stuurt die data vervolgens naar jouw geïmplementeerde model, dat in actie komt, de data analyseert en een voorspelling genereert. Ze kunnen worden gebruikt in webapplicaties, mobiele apps of zelfs embedded systemen. Een van de toepassingen van deze aanpak is om traffic routing mogelijk te maken voor A/B-testen. Het ALS-algoritme kan niet direct worden gebruikt voor online inferentie, omdat het vereist dat het model opnieuw wordt getraind met behulp van de volledige data (oud + nieuw) om de aanbevelingen bij te werken. Gradient Descent-leeralgoritmen zijn voorbeelden van modellen die kunnen worden gebruikt voor online updates. We kunnen in toekomstige berichten naar enkele van deze algoritmen kijken. Om te illustreren hoe zo'n model zou werken, maken we een (nutteloos) model dat als eindpunt fungeert en filmbeoordelingen voorspelt op basis van de beoordeling door een gebruiker! import requests model_path = f"{catalog_name}.{model_schema}.{model_name}" champion_version = client.get_model_version_by_alias(model_path, "champion") # Set the name of the MLflow endpoint endpoint_name = config["model_serving_endpoint_name"] # Name of the registered MLflow model model_name = model_path # Specify the type of compute (CPU, GPU_SMALL, GPU_MEDIUM, etc.) workload_type = "CPU" # Specify the scale-out size of compute (Small, Medium, Large, etc.) workload_size = "Small" # Get the latest version of the MLflow model model_version = int(champion_version.version) # Specify Scale to Zero(only supported for CPU endpoints) scale_to_zero = False # Get the API endpoint and token for the current notebook context API_ROOT = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get() API_TOKEN = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get() data = { "name": endpoint_name, "config": { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ] }, } headers = {"Context-Type": "text/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.post( url=f"{API_ROOT}/api/2.0/serving-endpoints", json=data, headers=headers ) Dit zal een lunchmodelservingcluster voor ons creëren, dus het kost wat tijd. Als u nu het -venster opent, zou u uw eindpunt moeten zien. Serving we kunnen één eindpunt gebruiken om meerdere modellen te bedienen. Vervolgens kunnen we verkeersroutering gebruiken voor scenario's zoals A/B-testen of de prestaties van verschillende modellen in de productie vergelijken. Inferentietabel Inferentietabellen in Databricks Model Serving fungeren als een automatisch logboek voor onze geïmplementeerde modellen. Wanneer ingeschakeld, leggen ze inkomende verzoeken (gegevens verzonden voor voorspelling), de bijbehorende modeluitvoer (voorspellingen) en enkele andere metagegevens vast als een Delta-tabel binnen Unity Catalog. We kunnen de inferentietabel gebruiken voor , en een gegevensverzamelingsprocedure voor of van onze modellen. monitoring en debugging lineage tracking het opnieuw trainen verfijnen We kunnen de op ons serving endpoint inschakelen om het model te monitoren. We kunnen dit doen door de eigenschappen in de payload te specificeren wanneer we het endpoint voor het eerst aanmaken. Of we updaten ons endpoint daarna met de opdracht en de endpoint URL als volgt (meer inference table auto_capture_config put config ) hier data = { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ], "auto_capture_config":{ "catalog_name": catalog_name, "schema_name": model_schema, "table_name_prefix": payload_table, } } headers = {"Context-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.put(url=f"{API_ROOT}/api/2.0/serving-endpoints/{endpoint_name}/config", json=data, headers=headers) print(json.dumps(response.json(), indent=4)) Laten we nu het eindpunt voeden met wat dummy-gebruikersinteractiegegevens import random import time all_items = df_full_data.select(col("item_id")).distinct() for user_id in range(50,54): items_not_rated_by_user = df_full_data.where(col("user_id")==user_id).select(col("item_id")).distinct()#collect()[0][0] no_rated_items = [item.item_id for item in all_items.subtract(items_not_rated_by_user).limit(4).collect()] data = { "dataframe_records": [ {"user_id":user_id, "item_id":no_rated_items[0], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[1], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, ] } response = requests.post( url=f"{API_ROOT}/serving-endpoints/{endpoint_name}/invocations", json=data, headers=headers ) # generate the data within the timespan of 1 to 8 minutes time.sleep(random.randint(60*1, 60*8)) We kunnen de endpoint logs controleren in de tabel. Het duurt ongeveer 10 minuten voordat u de data in de tabel kunt zien. <catalog>.<schema>.<payload_table> table_path = f"{catalog_name}.{model_schema}.{payload_table}" # Read data from the inference table df_inf_table = spark.read.table(table_path) display(df_inf_table ) je zou zoiets moeten zien als je payloadtabel Om het schema van deze inferentietabel te begrijpen, kijk naar “Unity catalogus inferentietabel schema==” .== hier Modelbewaking Model- en datamonitoring zijn een complex onderwerp dat veel tijd kost om onder de knie te krijgen. Databricks Lakehouse Monitoring (DLM) vermindert de overhead van het bouwen van een goed monitoringsysteem door standaard en aanpasbare sjablonen te bieden voor veelvoorkomende use cases. Het onder de knie krijgen van DLM en modelmonitoring in het algemeen vereist echter veel experimenten. Ik wil u hier geen uitgebreid overzicht van modelmonitoring geven, maar u eerder een startpunt geven. Ik wijd misschien in de toekomst een blog aan dit onderwerp. Een korte samenvatting van DLM-functionaliteiten en -functies Nu ons model up and running is, kunnen we de inferentietabel die door ons serving endpoint is gegenereerd gebruiken om belangrijke statistieken te monitoren, zoals modelprestaties en drift, om afwijkingen of anomalieën in onze data of model in de loop van de tijd te detecteren. Deze proactieve aanpak helpt ons om tijdig corrigerende maatregelen te nemen, zoals het opnieuw trainen van het model of het updaten van de functies, om optimale prestaties en afstemming op bedrijfsdoelstellingen te behouden. DLM biedt drie typen analyse of : , en . Omdat we geïnteresseerd zijn in het analyseren van onze inferencetabel, richten we ons op de laatste. Om een tabel te gebruiken voor monitoring - onze " ", moeten we ervoor zorgen dat de tabel de juiste structuur heeft. Voor de moet elke rij overeenkomen met een verzoek met de volgende kolommen: profile type Time Series Snapshot Inference primaire tabel inferencetabel modelkenmerken modelvoorspelling model-id : tijdstempel van het inferentieverzoek timestamp (optioneel) grondwaarheid De is belangrijk voor gevallen waarin we meerdere modellen bedienen en we de prestaties van elk model in één monitoringdashboard willen volgen. Als er meer dan één model-id beschikbaar is, gebruikt DLM deze om de gegevens te slicen en metrische gegevens en statistische gegevens voor elke slice afzonderlijk te berekenen. model-id DLM berekent elke statistiek en metriek voor een bepaald tijdsinterval. Voor inferentieanalyse gebruikte het de , plus een door de gebruiker gedefinieerde venstergrootte om de tijdsvensters te identificeren. meer hieronder. timestampkolom DLM ondersteunt twee voor inferentietabellen: " " of " ". Het berekent enkele van de relevante statistieken en statistieken op basis van deze specificatie. problem type classificatie regressie Om DLM te gebruiken, moeten we een monitor maken en deze aan een tabel koppelen. Wanneer we dit doen, maakt DLM twee : metric tables : deze tabel bevat samenvattende statistieken zoals min, max, percentage nullen en nullen. Het bevat ook aanvullende metrieken op basis van het door de gebruiker gedefinieerde probleemtype. Bijvoorbeeld , en voor de classificatiemodellen, en en voor regressiemodellen. profielmetriektabel precisie recall f1_score mean_squared_error mean_average_error : bevat statistieken die meten hoe de distributie van data is veranderd of ten opzichte van een . Het berekent metingen zoals Chi-kwadraattest, KS-test. drift metrische tabel in de loop van de tijd basislijnwaarde (indien verstrekt) om de lijst met complete metrics voor elke tabel te bekijken, controleer documentation page. Het is ook mogelijk om Monitor metric table te maken. aangepaste metrics Een belangrijk aspect van het bouwen van een monitoringsysteem is om ervoor te zorgen dat ons monitoringdashboard toegang heeft tot de nieuwste inferentiegegevens zodra ze binnenkomen. Hiervoor kunnen we gebruiken om verwerkte rijen in de inferentietabel bij te houden. We gebruiken de inferentietabel van de modelserver als onze brontabel ( ) en de monitoringtabel als de sinktabel ( ). We zorgen er ook voor dat de (CDC) is ingeschakeld op beide tabellen (het is standaard ingeschakeld op de inferentietabel). Op deze manier verwerken we alleen wijzigingen - invoegen/bijwerken/verwijderen - in de brontabel in plaats van de hele tabel opnieuw te verwerken bij elke vernieuwing. Delta-tabelstreaming readStream writeStream Change Data Capture Praktisch Om de monitoring van onze inferentietabel mogelijk te maken, nemen we de volgende stappen: Lees de inferentietabel als een streamingtabel Maak een nieuwe deltatabel met het juiste schema door de inferentietabel uit te pakken die is gegenereerd door ons modelserverend eindpunt. Maak de basislijntabel (indien van toepassing) klaar Maak een monitor over de resulterende tabel en vernieuw de metriek Plan een workflow om de inferentietabel uit te pakken naar de juiste structuur en de statistieken te vernieuwen Eerst moeten we de Lakehouse Monitoring API installeren. Deze zou al geïnstalleerd moeten zijn als u Databricks rum time 15.3 LTS en hoger gebruikt: %pip install "https://ml-team-public-read.s3.amazonaws.com/wheels/data-monitoring/a4050ef7-b183-47a1-a145-e614628e3146/databricks_lakehouse_monitoring-0.4.14-py3-none-any.whl" dbutils.library.restartPython() Laten we de inferentietabel lezen als een streamingtabel requests_raw = spark.readStream\ .format("delta")\ .table(inference_table_name) requests_raw.isStreaming #-> True Vervolgens moeten we de tabel in het juiste formaat zetten zoals hierboven beschreven. Deze tabel moet één rij hebben voor elke voorspelling met relevante kenmerken en voorspellingswaarde. De inferentietabel die we krijgen van het model dat het eindpunt bedient, slaat de eindpuntverzoeken en -reacties op als een geneste JSON-indeling. Hier is een voorbeeld van de JSON-payload voor de aanvraag- en reactiekolom. #requests {"dataframe_records": [ {"user_id": 1, "item_id": 346, "rating": 5}, {"user_id": 1, "item_id": 377, "rating": 2}, {"user_id": 1, "item_id": 302, "rating": 4} ] } #reponse {"predictions": [4.248899936676025, 1.1172138452529907, 4.279165744781494] } # --> what we need | user_id | item_id | rating | prediction | |---------|---------|--------|------------| | 1 | 346 | 5 | 4.248900 | | 1 | 377 | 2 | 1.117214 | | 1 | 302 | 4 | 4.279166 | Om deze tabel uit te pakken naar het juiste schema kunnen we de volgende code gebruiken die is aangepast uit de Databricks-documentatie ( ). Inference table Lakehouse Monitoring starter notebook # define the schema of the request and reponse fields in the inference tabel REQUEST_FIELDS = [StructField('user_id', IntegerType(), False),\ StructField('item_id', IntegerType(), False),\ StructField('rating', IntegerType(), False)\ ] RESPONSE_FIELD = [T.StructField("predictions", FloatType())] def process_col_requests(json_str:str) -> str: """ to proccess the JSON payload of request column in inference table """ request = json.loads(json_str) dataframe_records = request.get("dataframe_records", []) return dataframe_records def procces_col_response(json_str: str) -> str: """ to proccess the JSON payload of reponse column in inference table """ reponse = json.loads(json_str) output = [{prediction_col: round(prediction,4)} for prediction in reponse["predictions"]] return output def get_model_id(endpoint_name: str) -> str: """ create the model id by concatinating the model name and the model version. note: the assumption is the endpoint serves only one model """ served_models = get_served_models(endpoint_name) model_id = f"{served_models[0]['model_name']}_{served_models[0]['model_version']}" return model_id def process_requests(requests_raw: DataFrame, request_fields: List[T.StructField], response_field: T.StructField, endpoint_name: str) -> DataFrame: """ Takes a stream of raw requests and processes them by: - Unpacking JSON payloads for requests and responses - Exploding batched requests into individual rows - Converting Unix epoch millisecond timestamps to be Spark TimestampType :param requests_raw: DataFrame containing raw requests. Assumed to contain the following columns: - `request` - `response` - `timestamp_ms` :param request_fields: List of StructFields representing the request schema :param response_field: A StructField representing the response schema :return: A DataFrame containing processed requests """ # Convert the timestamp milliseconds to TimestampType for downstream processing. requests_timestamped = requests_raw \ .withColumn(timestamp_col, (F.col("timestamp_ms") / 1000).cast(T.TimestampType())) \ .drop("timestamp_ms") # create the model identifier column model_id = get_model_id(endpoint_name) # Convert the model name and version columns into a model identifier column. requests_identified = requests_timestamped \ .withColumn(model_id_col, F.lit(model_id)) # Rename the date column to avoid collisions with features. requests_dated = requests_identified.withColumnRenamed("date", date_col) # Consolidate and unpack JSON. request_schema = T.ArrayType(T.StructType(request_fields)) response_schema = T.ArrayType(T.StructType(response_field)) # w udf_request = F.udf(process_col_requests, request_schema) udf_reponse = F.udf(procces_col_response, response_schema) requests_unpacked = requests_dated.withColumn("request", udf_request("request")).\ withColumn("response", udf_reponse("response")) # Explode batched requests into individual rows. DB_PREFIX = "__db" requests_exploded = requests_unpacked \ .withColumn(f"{DB_PREFIX}_request_response", F.arrays_zip(F.col("request"), F.col("response"))) \ .withColumn(f"{DB_PREFIX}_request_response", F.explode(F.col(f"{DB_PREFIX}_request_response"))) \ .select(F.col("*"), F.col(f"{DB_PREFIX}_request_response.request.*"), F.col(f"{DB_PREFIX}_request_response.response.*")) \ .drop(f"{DB_PREFIX}_request_response", "request", "response") requests_cleaned = requests_exploded.drop("status_code", "sampling_fraction", "client_request_id", "databricks_request_id", "request_metadata") return requests_cleaned De resulterende tabel ziet er als volgt uit: Vervolgens moeten we onze sink-tabel initialiseren dt_builder = DeltaTable.createIfNotExists(spark) \ .tableName(unpacked_requests_table_name) \ .addColumns(schema) \ .partitionedBy(requests_cleaned.schema) \ .property("delta.enableChangeDataFeed", "true") \ dt_builder.execute() en schrijf de resultaten checkpoint_path = f"dbfs:/payload-logging/{endpoint_name}/checkpoint" requests_stream = requests_cleaned.writeStream \ .trigger(once=True) \ .format("delta") \ .partitionBy(date_col) \ .outputMode("append") \ .option("checkpointLocation", checkpoint_path) \ .toTable(unpacked_requests_table_name) \ Ten slotte maken we onze baselinetabel. DLM gebruikt deze tabel om de drifts te berekenen door de distributie van vergelijkbare kolommen van baseline- en primaire modellen te vergelijken. De baselinetabel moet dezelfde featurekolom hebben als de primaire kolom en dezelfde modelidentificatiekolom. Voor de baselinetabel gebruiken we de voorspellingstabel van onze die we eerder hebben opgeslagen nadat we ons model hebben getraind met de beste hyperparameter. Om de driftmetriek te berekenen, berekent Databricks de profielmetrieken voor zowel de primaire als de baselinetabel. Hier kunt u meer lezen over de validatiedataset . primaire tabel en baselinetabel #read the prediction table df_base_table = spark.table(f"{catalog_name}.{model_schema}.predictions") # create the model id and add it to the table model_id = get_model_id(endpoint_name) df_base_table = df_base_table.withColumn(model_id_col, F.lit(model_id)) #write the new table and enable the CDC on it output_base_table_name = f"{catalog_name}.{model_schema}.{base_table_prefix}_{model_name}" df_base_table.write.format("delta").mode("overwrite").saveAsTable(output_base_table_name) spark.sql(f"ALTER TABLE {output_base_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)") Nu zijn we klaar om ons monitoring dashboard te creëren. We kunnen dit doen met behulp van de of de Lakehouse Monitoring API. Hier gebruiken we de tweede optie: UI # This is where we store the metric tables. output_schema_name = f"{catalog_name}.{model_schema}" try: info = lm.create_monitor( table_name=unpacked_requests_table_name, profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities,#the aggregation window model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, # We will refresh the metrics on-demand in this notebook baseline_table_name=output_base_table_name, ) print(info) except Exception as e: # Ensure the exception was expected assert "RESOURCE_ALREADY_EXISTS" in str(e), f"Unexpected error: {e}" # Update the monitor if any parameters of this notebook have changed. lm.update_monitor( table_name=unpacked_requests_table_name, updated_params=dict( profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities, model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, baseline_table_name=output_base_table_name, ) ) # Refresh metrics calculated on the requests table. refresh_info = lm.run_refresh(table_name=unpacked_requests_table_name) print(refresh_info) nadat we de code hebben uitgevoerd duurt het even voordat Databricks alle metriek heeft berekend. Om het dashboard te zien ga je naar het tabblad van je sinktabel (d.w.z. ). Je zou een pagina als volgt moeten zien. Quality unpacked_requests_table_name Als u op de klikt, ziet u de lopende, in behandeling zijnde en eerdere vernieuwingen. Klik op het om uw dashboard te openen. refresh history View Dashboard dus we beginnen met de inferentietabel ( ), verwerken deze en slaan het resultaat op in en geven deze tabel samen met onze basislijntabel ( ) door aan onze monitoring API. De DLM berekent de profielmetrieken voor elke tabel ( ) en gebruikt deze om de driftmetrieken te berekenen ( ) my_endpoint_payload my_endpoint_payload_unpacked base_table_als my_endpoint_payload_unpacked_profile_metric my_endpoint_payload_unpacked_drift_metrics Daar heb je het! Je hebt alles wat je nodig hebt om je model te bedienen en te monitoren! In het volgende deel laat ik je zien hoe je dit proces kunt automatiseren met behulp van en ! Databricks Assets Bundle Gitlab