Im Reihe haben wir die ersten Schritte zum Aufbau einer End-to-End-MLOps-Pipeline mit Databricks und Spark unternommen, geleitet von der Referenzarchitektur von Databricks. Hier ist eine Zusammenfassung der wichtigsten Schritte, die wir behandelt haben: ersten Teil dieser Tutorial- : Wir haben unsere Daten im Unity-Katalog in Bronze-, Silber- und Goldebenen organisiert und so ein strukturiertes und effizientes Datenverwaltungssystem eingerichtet. Einrichten des Unity-Katalogs für Medallion Architecture : Wir haben gezeigt, wie Rohdaten in das System importiert werden, um Konsistenz und Qualität für nachfolgende Verarbeitungsphasen sicherzustellen. Daten in den Unity-Katalog aufnehmen : Mithilfe von Databricks haben wir ein auf unseren Datensatz zugeschnittenes maschinelles Lernmodell trainiert und dabei Best Practices für eine skalierbare und effektive Modellentwicklung befolgt. Trainieren des Modells : Um die Modellleistung zu verbessern, haben wir HyperOpt eingesetzt, um die Suche nach optimalen Hyperparametern zu automatisieren und so Genauigkeit und Effizienz zu verbessern. Hyperparameter-Tuning mit HyperOpt : Wir haben MLflow zum Protokollieren und Überwachen unserer Experimente verwendet und eine umfassende Aufzeichnung der Modellversionen, Metriken und Parameter geführt, um einen einfachen Vergleich und die Reproduzierbarkeit zu ermöglichen. Experimentverfolgung mit Databricks MLflow Wenn diese grundlegenden Schritte abgeschlossen sind, ist Ihr Modell nun bereit für die Bereitstellung. In diesem zweiten Teil konzentrieren wir uns auf die Integration zweier wichtiger Komponenten in unser System: : Implementierung der Batch-Verarbeitung zur Generierung von Vorhersagen für große Datensätze, geeignet für Anwendungen wie Massenbewertung und regelmäßige Berichterstattung. Batch-Inferenz : Einrichten der Modellbereitstellung in Echtzeit, um sofortige Vorhersagen zu ermöglichen, die für interaktive Anwendungen und Dienste unerlässlich sind. Online-Inferenz (Modellbereitstellung) um sicherzustellen, dass Ihre bereitgestellten Modelle im Laufe der Zeit optimale Leistung und Zuverlässigkeit beibehalten. Modellüberwachung: Lassen Sie uns loslegen! Modellbereitstellung Der Ausgangspunkt des letzten Blogs war die Modellbewertung. Stellen Sie sich nun vor, wir haben den Vergleich durchgeführt und festgestellt, dass unser Modell im Vergleich zu diesem Produktionsmodell eine höhere Leistung aufweist. Da wir das Modell (angenommen) in der Produktion verwenden möchten, möchten wir alle Daten nutzen, die wir haben. Der nächste Schritt besteht darin, das Modell mit dem vollständigen Datensatz zu trainieren und zu testen. Anschließend speichern wir unser Modell für die spätere Verwendung, indem wir es als unser Champion-Modell bereitstellen. Da dies das endgültige Modell ist, das wir für die Inferenz verwenden möchten, verwenden wir den Feature Engineering-Client, um das Modell zu trainieren. Auf diese Weise können wir nicht nur die Modellherkunft einfacher verfolgen, sondern auch die Schemavalidierung und Featuretransformation (falls vorhanden) an den Client auslagern. with mlflow.start_run(run_name="ALS_best_model") as run: als = ALS() # Now we set the parameters for the method als.setMaxIter(MAX_ITER)\ .setSeed(SEED)\ .setRegParam(best_params["REG_PARAM"])\ .setUserCol(COL_USER)\ .setItemCol(COL_ITEM)\ .setRatingCol(COL_LABEL)\ .setRank(best_params["RANK"]) mlflow.log_param("MAX_ITER", MAX_ITER) mlflow.log_param("RANK", best_params["RANK"]) mlflow.log_param("REG_PARAM", best_params["REG_PARAM"]) # Create the model with these parameters. model = als.fit(df_full_data) #drop predictions where users and products from the test test and didn't make it into the training set. in this case, the prediction is NaN model.setColdStartStrategy('drop') predictions = model.transform(df_full_data) signature = infer_signature(model_input = df_full_data, model_output = predictions.select(COL_LABEL)) #log the model mlflow.spark.log_model(model, model_name, sample_input=df_full_data.limit(3), signature = signature, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name=f"{catalog_name}.{model_schema}.{model_name}") evaluator = RegressionEvaluator(predictionCol=COL_PRED, labelCol=COL_LABEL) rmse = evaluator.setMetricName("rmse").evaluate(predictions) mlflow.log_metric('rmse', rmse) Wir können auch den verwenden, um die Modelle zu trainieren und zu protokollieren Feature Store oder die Feature Engineering APIs model_info = fe.log_model(model=model, artifact_path = model_name, flavor=mlflow.spark, training_set=fe_full_data, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name= f"{catalog_name}.{model_schema}.{model_name}" ) Wenn wir die Feature Engineering API verwenden, können wir die Herkunft des Modells im Catalog Explorer anzeigen Aktualisieren wir nun die Modellbeschreibung und weisen ihr das Label „Champion“ zu. import time from mlflow.tracking.client import MlflowClient from mlflow.entities.model_registry.model_version_status import ModelVersionStatus client = MlflowClient() #find the latest model version model_name_path = f"{catalog_name}.{model_schema}.{model_name}" model_version_infos = client.search_model_versions(f"name ='{model_name_path}'") new_model_version = max([int(model_version_info.version) for model_version_info in model_version_infos]) #add the model and model version descirption client.update_registered_model( name=model_name_path, description="collaborative filtering using Spark mllib ALS. This model use rating table" ) client.update_model_version( name=model_name_path, version=new_model_version, description="this model is optimized Rank and REG_PARAM with Hyperopt and rmse as a loss function. trained on the full dataset" ) # assign alias client.set_registered_model_alias(model_name_path, "Champion", new_model_version) Überprüfen Sie nun das Schema, in dem Sie das Modell registriert haben. Sie sollten alle Ihre Aktualisierungen wie folgt sehen : Wenn Sie den Arbeitsbereich für die Modellregistrierung verwenden, sollten Sie Phasen zur Verwaltung Ihrer Modelle verwenden. Die Verwendung von Aliasnamen funktioniert nicht. Schauen Sie nach um zu sehen, wie es funktioniert Modellphasen hier Modellinferenz Stapelbewertung Stellen Sie sich nun vor, wir möchten unser Modell in der Produktion für Inferenzen verwenden. In diesem Schritt laden wir das Champion-Modell und verwenden es, um 20 Filmempfehlungen für jeden Benutzer zu generieren. from mlflow.spark import load_model as spark_load_model from mlflow.tracking.client import MlflowClient from create_training_set import split_data #-- set UC as model registray mlflow.set_registry_uri("databricks-uc") #-- initate mlflow client client = MlflowClient() # -- read the config file with open('config.json') as config_file: config = json.load(config_file) catalog_name = config["catalog_name"] gold_layer = config["gold_layer_name"] silver_layer = config["silver_layer_name"] user_item_table_name = config["user_item_table_name"] ft_user_item_name = config["ft_user_item_name"] model_name = config["model_name"] model_schema = config["model_schema"] #-- create the model uri model_path = f"{catalog_name}.{model_schema}.{model_name}" # --create the model_uri: there are two ways to do this # 1: using the alias (we use this*) model_version_uri = f"models:/{model_uri}@champion" # 2: using model version #champion_version = client.get_model_version_by_alias(model_uri, "champion") #model_version_uri = f"models:/{model_uri}/{champion_version.version}" # -- load the model pipline and exctract the model model_pipeline = spark_load_model(model_version_uri) model = model_pipeline.stages[0] # -- batch scoring using the the model fe_full_data, df_full_data, df_train, df_test = split_data() df_batch_input = df_full_data.drop("rating") df_scores = model.transform(df_batch_input) # --- in case you used Feature Engineering to train and register model #from databricks.feature_engineering import FeatureEngineeringClient #fe = FeatureEngineeringClient() # fe.score_batch(model_uri=f"{model_version_uri}",df = df_batch_input) und Sie können sehen, dass wir dieselben Trainingsdaten für die Batch-Bewertung verwendet haben. Obwohl dies bei Empfehlungssystemen sinnvoll ist, möchten wir das Modell in den meisten Anwendungen verwenden, um einige unsichtbare Daten zu bewerten. Stellen Sie sich beispielsweise vor, Sie sind Netflix und möchten die Benutzerempfehlungen am Ende des Tages basierend auf ihrer neuen Beobachtungsliste aktualisieren. Wir können einen Job planen, der die Batch-Bewertung zu einem bestimmten Zeitpunkt am Ende des Tages ausführt. Jetzt können wir fortfahren und die Empfehlungen für jeden Benutzer generieren. Dazu finden wir die Top 20 Artikel pro Benutzer from pyspark.sql.window import Window from pyspark.sql.functions import col, split, row_number, collect_list from pyspark.sql.functions import col, collect_list, expr, lit, min, row_number, desc windowSpec = Window.partitionBy("user_id").orderBy(col("prediction").desc()) df_top_20_items = df_scores.withColumn("rank", row_number().over(windowSpec)).filter(col("rank") <= 20) df_user_recs = df_top_20_items.groupBy("user_id") \ .agg(collect_list(col("item_id").cast("double")).alias("top_item_ids")) so sieht das Ergebnis aus Schließlich können wir die Vorhersage als Delta-Label auf unserer UC speichern oder sie in einem nachgelagerten System, Mongo DB oder Azure Cosmos DB, veröffentlichen. Wir entscheiden uns für die erste Option. df_user_recs.write.mode("overwrite").saveAsTable(f"{catalog_name}.{output_schema}.top20_item_recommendations") Streaming/Online-Inferenz Stellen Sie sich nun einen Fall vor, in dem wir unsere Empfehlungen basierend auf Echtzeit-Benutzerinteraktionen aktualisieren möchten. Für diesen Fall können wir Model Serving verwenden. Wenn jemand Ihr Modell verwenden möchte, kann er Daten an den Server senden. Der Server überträgt diese Daten dann an Ihr bereitgestelltes Modell, das in Aktion tritt, die Daten analysiert und eine Vorhersage generiert. Sie können in Webanwendungen, mobilen Apps oder sogar eingebetteten Systemen verwendet werden. Eine der Anwendungen dieses Ansatzes besteht darin, die Verkehrsführung für A/B-Tests zu ermöglichen. Der ALS-Algorithmus kann nicht direkt für Online-Inferenzen verwendet werden, da er ein erneutes Training des Modells mit den gesamten Daten (alt + neu) erfordert, um die Empfehlungen zu aktualisieren. Gradient Descent-Lernalgorithmen sind Beispiele für Modelle, die für Online-Updates verwendet werden können. Wir werden uns einige dieser Algorithmen möglicherweise in einem zukünftigen Beitrag ansehen. Um jedoch zu veranschaulichen, wie ein solches Modell funktionieren würde, erstellen wir einen (nutzlosen) Modell-Serving-Endpunkt, der die Filmbewertung vorhersagt, basierend auf der Bewertung eines Films durch einen Benutzer! import requests model_path = f"{catalog_name}.{model_schema}.{model_name}" champion_version = client.get_model_version_by_alias(model_path, "champion") # Set the name of the MLflow endpoint endpoint_name = config["model_serving_endpoint_name"] # Name of the registered MLflow model model_name = model_path # Specify the type of compute (CPU, GPU_SMALL, GPU_MEDIUM, etc.) workload_type = "CPU" # Specify the scale-out size of compute (Small, Medium, Large, etc.) workload_size = "Small" # Get the latest version of the MLflow model model_version = int(champion_version.version) # Specify Scale to Zero(only supported for CPU endpoints) scale_to_zero = False # Get the API endpoint and token for the current notebook context API_ROOT = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get() API_TOKEN = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get() data = { "name": endpoint_name, "config": { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ] }, } headers = {"Context-Type": "text/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.post( url=f"{API_ROOT}/api/2.0/serving-endpoints", json=data, headers=headers ) Dadurch wird ein Cluster für die Bereitstellung des Lunch-Modells für uns erstellt, was einige Zeit in Anspruch nimmt. Wenn Sie jetzt das Fenster öffnen, sollten Sie Ihren Endpunkt sehen. Serving Wir können einen Endpunkt verwenden, um mehrere Modelle zu bedienen. Dann können wir die Verkehrsführung für Szenarien wie A/B-Tests verwenden oder die Leistung verschiedener Modelle in der Produktion vergleichen. Inferenztabelle Inferenztabellen in Databricks Model Serving fungieren als automatisches Protokoll für unsere bereitgestellten Modelle. Wenn sie aktiviert sind, erfassen sie eingehende Anfragen (zur Vorhersage gesendete Daten), die entsprechenden Modellausgaben (Vorhersagen) und einige andere Metadaten als Delta-Tabelle im Unity Catalog. Wir können Inferenztabellen zum , und als Datenerfassungsverfahren zum oder unserer Modelle verwenden. Überwachen und Debuggen zur Herkunftsverfolgung erneuten Trainieren Feinabstimmen Wir können die auf unserem Serving-Endpunkt aktivieren, um das Modell zu überwachen. Wir können dies tun, indem wir die -Eigenschaften in der Nutzlast angeben, wenn wir den Endpunkt zum ersten Mal erstellen. Oder wir aktualisieren unseren Endpunkt anschließend mit dem -Befehl und der Endpunkt-URL wie folgt (mehr inference table auto_capture_config put config ) hier data = { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ], "auto_capture_config":{ "catalog_name": catalog_name, "schema_name": model_schema, "table_name_prefix": payload_table, } } headers = {"Context-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.put(url=f"{API_ROOT}/api/2.0/serving-endpoints/{endpoint_name}/config", json=data, headers=headers) print(json.dumps(response.json(), indent=4)) Jetzt füttern wir den Endpunkt mit einigen Dummy-Benutzerinteraktionsdaten import random import time all_items = df_full_data.select(col("item_id")).distinct() for user_id in range(50,54): items_not_rated_by_user = df_full_data.where(col("user_id")==user_id).select(col("item_id")).distinct()#collect()[0][0] no_rated_items = [item.item_id for item in all_items.subtract(items_not_rated_by_user).limit(4).collect()] data = { "dataframe_records": [ {"user_id":user_id, "item_id":no_rated_items[0], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[1], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, ] } response = requests.post( url=f"{API_ROOT}/serving-endpoints/{endpoint_name}/invocations", json=data, headers=headers ) # generate the data within the timespan of 1 to 8 minutes time.sleep(random.randint(60*1, 60*8)) Wir können die Endpunktprotokolle in der Tabelle überprüfen. Es dauert etwa 10 Minuten, bis Sie die Daten in der Tabelle sehen können. <catalog>.<schema>.<payload_table> table_path = f"{catalog_name}.{model_schema}.{payload_table}" # Read data from the inference table df_inf_table = spark.read.table(table_path) display(df_inf_table ) Sie sollten in etwa diese Nutzlasttabelle sehen Um das Schema dieser Inferenztabelle zu verstehen, lesen Sie „Unity-Katalog-Inferenztabellenschema==“.== hier Modellüberwachung Modell- und Datenüberwachung ist ein komplexes Thema, dessen Beherrschung viel Zeit in Anspruch nimmt. Databricks Lakehouse Monitoring (DLM) reduziert den Aufwand für den Aufbau eines geeigneten Überwachungssystems, indem es standardmäßige und anpassbare Vorlagen für gängige Anwendungsfälle bereitstellt. Die Beherrschung von DLM und Modellüberwachung im Allgemeinen erfordert jedoch viel Experimentieren. Ich möchte Ihnen hier keinen umfassenden Überblick über die Modellüberwachung geben, sondern Ihnen einen Ausgangspunkt bieten. Vielleicht widme ich diesem Thema in Zukunft einen Blog. Eine kurze Zusammenfassung der DLM-Funktionen und -Features Jetzt, da unser Modell läuft, können wir die von unserem Serving-Endpunkt generierte Inferenztabelle verwenden, um wichtige Kennzahlen wie Modellleistung und Drift zu überwachen und Abweichungen oder Anomalien in unseren Daten oder unserem Modell im Laufe der Zeit zu erkennen. Dieser proaktive Ansatz hilft uns, rechtzeitig Korrekturmaßnahmen zu ergreifen, z. B. das Modell neu zu trainieren oder seine Funktionen zu aktualisieren, um eine optimale Leistung und Ausrichtung auf die Geschäftsziele aufrechtzuerhalten. DLM bietet drei Analyse- bzw. : , und . Da wir an der Analyse unserer Inferenztabelle interessiert sind, konzentrieren wir uns auf Letzteres. Um eine Tabelle zur Überwachung zu verwenden – unsere „ “, sollten wir sicherstellen, dass die Tabelle die richtige Struktur hat. Bei der sollte jede Zeile einer Anfrage mit folgenden Spalten entsprechen: profile type Zeitreihen Snapshot Inferenz Primärtabelle Inferenztabelle Modellmerkmale Modellvorhersage Modell-ID : Zeitstempel der Inferenzanfrage timestamp (optional) Grundwahrheit Die ist wichtig, wenn wir mehrere Modelle bereitstellen und die Leistung jedes Modells in einem Überwachungs-Dashboard verfolgen möchten. Wenn mehr als eine Modell-ID verfügbar ist, verwendet DLM sie, um die Daten aufzuteilen und Metriken und Statistiken für jedes Segment separat zu berechnen. Modell-ID DLM berechnet alle Statistiken und Metriken für ein bestimmtes Zeitintervall. Für die Inferenzanalyse werden die sowie eine benutzerdefinierte Fenstergröße zur Identifizierung der Zeitfenster verwendet. Weitere Informationen finden Sie weiter unten. Zeitstempelspalte DLM unterstützt zwei für Inferenztabellen: „ “ oder „ “. Es berechnet einige der relevanten Metriken und Statistiken basierend auf dieser Spezifikation. problem type Klassifizierung Regression Um DLM zu verwenden, müssen wir einen Monitor erstellen und ihn an eine Tabelle anhängen. Dabei erstellt DLM zwei : metric tables : Diese Tabelle enthält zusammenfassende Statistiken wie Minimum, Maximum, Prozentsatz von Nullen und Nullen. Sie enthält außerdem zusätzliche Metriken basierend auf dem vom Benutzer definierten Problemtyp. Zum Beispiel , und für die Klassifizierungsmodelle und und für Regressionsmodelle. Profilmetriktabelle Präzision Rückruf f1_score mittlerer quadrierter Fehler mittlerer durchschnittlicher Fehler : Sie enthält Statistiken, die messen, wie sich die Verteilung der Daten oder im Verhältnis zu einem geändert hat. Sie berechnet Kennzahlen wie den Chi-Quadrat-Test und den KS-Test. Driftmetriktabelle im Laufe der Zeit Basiswert (sofern angegeben) Um die Liste der vollständigen Metriken für jede Tabelle anzuzeigen, sehen Sie auf der Dokumentationsseite nach. Es ist auch möglich der Monitormetriktabelle zu erstellen. , benutzerdefinierte Metriken Ein wichtiger Aspekt beim Aufbau eines Überwachungssystems besteht darin, sicherzustellen, dass unser Überwachungs-Dashboard Zugriff auf die neuesten Inferenzdaten hat, sobald diese eintreffen. Dazu können wir verwenden, um die verarbeiteten Zeilen in der Inferenztabelle zu verfolgen. Wir verwenden die Inferenztabelle des Modells als unsere Quelltabelle ( ) und die Überwachungstabelle als Senkentabelle ( ). Wir stellen außerdem sicher, dass (CDC) für beide Tabellen aktiviert ist (standardmäßig ist es für die Inferenztabelle aktiviert). Auf diese Weise verarbeiten wir nur Änderungen – Einfügen/Aktualisieren/Löschen – in der Quelltabelle, anstatt die gesamte Tabelle bei jeder Aktualisierung erneut zu verarbeiten. Delta Table Streaming readStream writeStream Change Data Capture Praxisnah Um das Monitoring über unsere Inferenztabelle zu ermöglichen, führen wir folgende Schritte aus: Lesen Sie die Inferenztabelle als Streaming-Tabelle Erstellen Sie eine neue Delta-Tabelle mit dem richtigen Schema, indem Sie die Inferenztabelle entpacken, die von unserem Modell-Serving-Endpunkt generiert wird. Bereiten Sie die Basistabelle vor (sofern vorhanden). Erstellen Sie einen Monitor über die resultierende Tabelle und aktualisieren Sie die Metrik Planen Sie einen Workflow, um die Inferenztabelle in die richtige Struktur zu entpacken und die Metriken zu aktualisieren Zuerst müssen wir die Lakehouse Monitoring API installieren. Sie sollte bereits installiert sein, wenn Sie Databricks Rum Time 15.3 LTS und höher verwenden: %pip install "https://ml-team-public-read.s3.amazonaws.com/wheels/data-monitoring/a4050ef7-b183-47a1-a145-e614628e3146/databricks_lakehouse_monitoring-0.4.14-py3-none-any.whl" dbutils.library.restartPython() Lesen wir die Inferenztabelle als Streaming-Tabelle requests_raw = spark.readStream\ .format("delta")\ .table(inference_table_name) requests_raw.isStreaming #-> True Als nächstes müssen wir die Tabelle wie oben beschrieben in das richtige Format bringen. Diese Tabelle sollte für jede Vorhersage eine Zeile mit den relevanten Merkmalen und Vorhersagewerten enthalten. Die Inferenztabelle, die wir vom Endpunkt des Modells erhalten, speichert die Endpunktanforderungen und -antworten in einem verschachtelten JSON-Format. Hier ist ein Beispiel für die JSON-Nutzlast für die Anforderungs- und Antwortspalte. #requests {"dataframe_records": [ {"user_id": 1, "item_id": 346, "rating": 5}, {"user_id": 1, "item_id": 377, "rating": 2}, {"user_id": 1, "item_id": 302, "rating": 4} ] } #reponse {"predictions": [4.248899936676025, 1.1172138452529907, 4.279165744781494] } # --> what we need | user_id | item_id | rating | prediction | |---------|---------|--------|------------| | 1 | 346 | 5 | 4.248900 | | 1 | 377 | 2 | 1.117214 | | 1 | 302 | 4 | 4.279166 | Um diese Tabelle in das richtige Schema zu entpacken, können wir den folgenden Code verwenden, der aus der Databricks-Dokumentation ( ) angepasst ist. Inferenztabelle, Lakehouse Monitoring-Starter-Notebook # define the schema of the request and reponse fields in the inference tabel REQUEST_FIELDS = [StructField('user_id', IntegerType(), False),\ StructField('item_id', IntegerType(), False),\ StructField('rating', IntegerType(), False)\ ] RESPONSE_FIELD = [T.StructField("predictions", FloatType())] def process_col_requests(json_str:str) -> str: """ to proccess the JSON payload of request column in inference table """ request = json.loads(json_str) dataframe_records = request.get("dataframe_records", []) return dataframe_records def procces_col_response(json_str: str) -> str: """ to proccess the JSON payload of reponse column in inference table """ reponse = json.loads(json_str) output = [{prediction_col: round(prediction,4)} for prediction in reponse["predictions"]] return output def get_model_id(endpoint_name: str) -> str: """ create the model id by concatinating the model name and the model version. note: the assumption is the endpoint serves only one model """ served_models = get_served_models(endpoint_name) model_id = f"{served_models[0]['model_name']}_{served_models[0]['model_version']}" return model_id def process_requests(requests_raw: DataFrame, request_fields: List[T.StructField], response_field: T.StructField, endpoint_name: str) -> DataFrame: """ Takes a stream of raw requests and processes them by: - Unpacking JSON payloads for requests and responses - Exploding batched requests into individual rows - Converting Unix epoch millisecond timestamps to be Spark TimestampType :param requests_raw: DataFrame containing raw requests. Assumed to contain the following columns: - `request` - `response` - `timestamp_ms` :param request_fields: List of StructFields representing the request schema :param response_field: A StructField representing the response schema :return: A DataFrame containing processed requests """ # Convert the timestamp milliseconds to TimestampType for downstream processing. requests_timestamped = requests_raw \ .withColumn(timestamp_col, (F.col("timestamp_ms") / 1000).cast(T.TimestampType())) \ .drop("timestamp_ms") # create the model identifier column model_id = get_model_id(endpoint_name) # Convert the model name and version columns into a model identifier column. requests_identified = requests_timestamped \ .withColumn(model_id_col, F.lit(model_id)) # Rename the date column to avoid collisions with features. requests_dated = requests_identified.withColumnRenamed("date", date_col) # Consolidate and unpack JSON. request_schema = T.ArrayType(T.StructType(request_fields)) response_schema = T.ArrayType(T.StructType(response_field)) # w udf_request = F.udf(process_col_requests, request_schema) udf_reponse = F.udf(procces_col_response, response_schema) requests_unpacked = requests_dated.withColumn("request", udf_request("request")).\ withColumn("response", udf_reponse("response")) # Explode batched requests into individual rows. DB_PREFIX = "__db" requests_exploded = requests_unpacked \ .withColumn(f"{DB_PREFIX}_request_response", F.arrays_zip(F.col("request"), F.col("response"))) \ .withColumn(f"{DB_PREFIX}_request_response", F.explode(F.col(f"{DB_PREFIX}_request_response"))) \ .select(F.col("*"), F.col(f"{DB_PREFIX}_request_response.request.*"), F.col(f"{DB_PREFIX}_request_response.response.*")) \ .drop(f"{DB_PREFIX}_request_response", "request", "response") requests_cleaned = requests_exploded.drop("status_code", "sampling_fraction", "client_request_id", "databricks_request_id", "request_metadata") return requests_cleaned Die resultierende Tabelle würde folgendermaßen aussehen: Als nächstes sollten wir unsere Sink-Tabelle initialisieren dt_builder = DeltaTable.createIfNotExists(spark) \ .tableName(unpacked_requests_table_name) \ .addColumns(schema) \ .partitionedBy(requests_cleaned.schema) \ .property("delta.enableChangeDataFeed", "true") \ dt_builder.execute() und schreibe die Ergebnisse checkpoint_path = f"dbfs:/payload-logging/{endpoint_name}/checkpoint" requests_stream = requests_cleaned.writeStream \ .trigger(once=True) \ .format("delta") \ .partitionBy(date_col) \ .outputMode("append") \ .option("checkpointLocation", checkpoint_path) \ .toTable(unpacked_requests_table_name) \ Schließlich erstellen wir unsere Basistabelle. DLM verwendet diese Tabelle, um die Drifts zu berechnen, indem die Verteilung ähnlicher Spalten von Basis- und Primärmodellen verglichen wird. Die Basistabelle sollte dieselbe Merkmalsspalte wie die Primärspalte sowie dieselbe Modellidentifikationsspalte haben. Für die Basistabelle verwenden wir die Vorhersagetabelle unseres , den wir zuvor gespeichert haben, nachdem wir unser Modell mit dem besten Hyperparameter trainiert haben. Um die Driftmetrik zu berechnen, berechnet Databricks die Profilmetriken sowohl für die Primär- als auch für die Basistabelle. Hier können Sie mehr über die Validierungsdatensatzes lesen. Primärtabelle und die Basistabelle #read the prediction table df_base_table = spark.table(f"{catalog_name}.{model_schema}.predictions") # create the model id and add it to the table model_id = get_model_id(endpoint_name) df_base_table = df_base_table.withColumn(model_id_col, F.lit(model_id)) #write the new table and enable the CDC on it output_base_table_name = f"{catalog_name}.{model_schema}.{base_table_prefix}_{model_name}" df_base_table.write.format("delta").mode("overwrite").saveAsTable(output_base_table_name) spark.sql(f"ALTER TABLE {output_base_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)") Jetzt können wir unser Monitoring-Dashboard erstellen. Dies können wir entweder über die tun. oder die Lakehouse Monitoring API. Hier verwenden wir die zweite Option: Benutzeroberfläche # This is where we store the metric tables. output_schema_name = f"{catalog_name}.{model_schema}" try: info = lm.create_monitor( table_name=unpacked_requests_table_name, profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities,#the aggregation window model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, # We will refresh the metrics on-demand in this notebook baseline_table_name=output_base_table_name, ) print(info) except Exception as e: # Ensure the exception was expected assert "RESOURCE_ALREADY_EXISTS" in str(e), f"Unexpected error: {e}" # Update the monitor if any parameters of this notebook have changed. lm.update_monitor( table_name=unpacked_requests_table_name, updated_params=dict( profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities, model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, baseline_table_name=output_base_table_name, ) ) # Refresh metrics calculated on the requests table. refresh_info = lm.run_refresh(table_name=unpacked_requests_table_name) print(refresh_info) Nachdem wir den Code ausgeführt haben, dauert es einige Zeit, bis Databricks alle Metriken berechnet hat. Um das Dashboard anzuzeigen, gehen Sie zur Registerkarte Ihrer Sink-Tabelle (d. h. ). Sie sollten eine Seite wie die folgende sehen. Quality unpacked_requests_table_name Wenn Sie auf anzeigen“ klicken, werden Ihre laufenden, ausstehenden und vergangenen Aktualisierungen angezeigt. Klicken Sie auf um Ihr Dashboard zu öffnen. refresh history View Dashboard Wir beginnen also mit der Inferenztabelle ( ), verarbeiten sie, speichern das Ergebnis in und übergeben diese Tabelle zusammen mit unserer Basistabelle ( ) an unsere Überwachungs-API. Das DLM berechnet die Profilmetriken für jede Tabelle ( ) und verwendet sie zur Berechnung der Driftmetriken ( ). my_endpoint_payload my_endpoint_payload_unpacked base_table_als my_endpoint_payload_unpacked_profile_metric my_endpoint_payload_unpacked_drift_metrics Fertig! Sie haben alles, was Sie zum Bedienen und Überwachen Ihres Modells benötigen! Im nächsten Teil zeige ich Ihnen, wie Sie diesen Prozess mit und automatisieren können! Databricks Assets Bundle Gitlab Im Reihe haben wir die ersten Schritte zum Aufbau einer End-to-End-MLOps-Pipeline mit Databricks und Spark unternommen, geleitet von der Referenzarchitektur von Databricks. Hier ist eine Zusammenfassung der wichtigsten Schritte, die wir behandelt haben: ersten Teil dieser Tutorial- : Wir haben unsere Daten im Unity-Katalog in Bronze-, Silber- und Goldebenen organisiert und so ein strukturiertes und effizientes Datenverwaltungssystem eingerichtet. Einrichten des Unity-Katalogs für Medallion Architecture : Wir haben gezeigt, wie Rohdaten in das System importiert werden, um Konsistenz und Qualität für nachfolgende Verarbeitungsphasen sicherzustellen. Daten in den Unity-Katalog aufnehmen : Mithilfe von Databricks haben wir ein auf unseren Datensatz zugeschnittenes maschinelles Lernmodell trainiert und dabei Best Practices für eine skalierbare und effektive Modellentwicklung befolgt. Trainieren des Modells : Um die Modellleistung zu verbessern, haben wir HyperOpt eingesetzt, um die Suche nach optimalen Hyperparametern zu automatisieren und so Genauigkeit und Effizienz zu verbessern. Hyperparameter-Tuning mit HyperOpt : Wir haben MLflow zum Protokollieren und Überwachen unserer Experimente verwendet und eine umfassende Aufzeichnung der Modellversionen, Metriken und Parameter geführt, um einen einfachen Vergleich und die Reproduzierbarkeit zu ermöglichen. Experimentverfolgung mit Databricks MLflow Wenn diese grundlegenden Schritte abgeschlossen sind, ist Ihr Modell nun bereit für die Bereitstellung. In diesem zweiten Teil konzentrieren wir uns auf die Integration zweier wichtiger Komponenten in unser System: : Implementierung der Batch-Verarbeitung zur Generierung von Vorhersagen für große Datensätze, geeignet für Anwendungen wie Massenbewertung und regelmäßige Berichterstattung. Batch-Inferenz : Einrichten der Modellbereitstellung in Echtzeit, um sofortige Vorhersagen zu ermöglichen, die für interaktive Anwendungen und Dienste unerlässlich sind. Online-Inferenz (Modellbereitstellung) um sicherzustellen, dass Ihre bereitgestellten Modelle im Laufe der Zeit optimale Leistung und Zuverlässigkeit beibehalten. Modellüberwachung: Lassen Sie uns loslegen! Modellbereitstellung Der Ausgangspunkt des letzten Blogs war die Modellbewertung. Stellen Sie sich nun vor, wir haben den Vergleich durchgeführt und festgestellt, dass unser Modell im Vergleich zu diesem Produktionsmodell eine höhere Leistung aufweist. Da wir das Modell (angenommen) in der Produktion verwenden möchten, möchten wir alle Daten nutzen, die wir haben. Der nächste Schritt besteht darin, das Modell mit dem vollständigen Datensatz zu trainieren und zu testen. Anschließend speichern wir unser Modell für die spätere Verwendung, indem wir es als unser Champion-Modell bereitstellen. Da dies das endgültige Modell ist, das wir für die Inferenz verwenden möchten, verwenden wir den Feature Engineering-Client, um das Modell zu trainieren. Auf diese Weise können wir nicht nur die Modellherkunft einfacher verfolgen, sondern auch die Schemavalidierung und Featuretransformation (falls vorhanden) an den Client auslagern. with mlflow.start_run(run_name="ALS_best_model") as run: als = ALS() # Now we set the parameters for the method als.setMaxIter(MAX_ITER)\ .setSeed(SEED)\ .setRegParam(best_params["REG_PARAM"])\ .setUserCol(COL_USER)\ .setItemCol(COL_ITEM)\ .setRatingCol(COL_LABEL)\ .setRank(best_params["RANK"]) mlflow.log_param("MAX_ITER", MAX_ITER) mlflow.log_param("RANK", best_params["RANK"]) mlflow.log_param("REG_PARAM", best_params["REG_PARAM"]) # Create the model with these parameters. model = als.fit(df_full_data) #drop predictions where users and products from the test test and didn't make it into the training set. in this case, the prediction is NaN model.setColdStartStrategy('drop') predictions = model.transform(df_full_data) signature = infer_signature(model_input = df_full_data, model_output = predictions.select(COL_LABEL)) #log the model mlflow.spark.log_model(model, model_name, sample_input=df_full_data.limit(3), signature = signature, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name=f"{catalog_name}.{model_schema}.{model_name}") evaluator = RegressionEvaluator(predictionCol=COL_PRED, labelCol=COL_LABEL) rmse = evaluator.setMetricName("rmse").evaluate(predictions) mlflow.log_metric('rmse', rmse) Wir können auch den verwenden, um die Modelle zu trainieren und zu protokollieren Feature Store oder die Feature Engineering APIs model_info = fe.log_model(model=model, artifact_path = model_name, flavor=mlflow.spark, training_set=fe_full_data, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name= f"{catalog_name}.{model_schema}.{model_name}" ) Wenn wir die Feature Engineering API verwenden, können wir die Herkunft des Modells im Catalog Explorer anzeigen Aktualisieren wir nun die Modellbeschreibung und weisen ihr das Label „Champion“ zu. import time from mlflow.tracking.client import MlflowClient from mlflow.entities.model_registry.model_version_status import ModelVersionStatus client = MlflowClient() #find the latest model version model_name_path = f"{catalog_name}.{model_schema}.{model_name}" model_version_infos = client.search_model_versions(f"name ='{model_name_path}'") new_model_version = max([int(model_version_info.version) for model_version_info in model_version_infos]) #add the model and model version descirption client.update_registered_model( name=model_name_path, description="collaborative filtering using Spark mllib ALS. This model use rating table" ) client.update_model_version( name=model_name_path, version=new_model_version, description="this model is optimized Rank and REG_PARAM with Hyperopt and rmse as a loss function. trained on the full dataset" ) # assign alias client.set_registered_model_alias(model_name_path, "Champion", new_model_version) Überprüfen Sie nun das Schema, in dem Sie das Modell registriert haben. Sie sollten alle Ihre Aktualisierungen wie folgt sehen : Wenn Sie den Arbeitsbereich für die Modellregistrierung verwenden, sollten Sie Phasen zur Verwaltung Ihrer Modelle verwenden. Die Verwendung von Aliasnamen funktioniert nicht. Schauen Sie nach um zu sehen, wie es funktioniert Modellphasen hier Modellinferenz Stapelbewertung Stellen Sie sich nun vor, wir möchten unser Modell in der Produktion für Inferenzen verwenden. In diesem Schritt laden wir das Champion-Modell und verwenden es, um 20 Filmempfehlungen für jeden Benutzer zu generieren. from mlflow.spark import load_model as spark_load_model from mlflow.tracking.client import MlflowClient from create_training_set import split_data #-- set UC as model registray mlflow.set_registry_uri("databricks-uc") #-- initate mlflow client client = MlflowClient() # -- read the config file with open('config.json') as config_file: config = json.load(config_file) catalog_name = config["catalog_name"] gold_layer = config["gold_layer_name"] silver_layer = config["silver_layer_name"] user_item_table_name = config["user_item_table_name"] ft_user_item_name = config["ft_user_item_name"] model_name = config["model_name"] model_schema = config["model_schema"] #-- create the model uri model_path = f"{catalog_name}.{model_schema}.{model_name}" # --create the model_uri: there are two ways to do this # 1: using the alias (we use this*) model_version_uri = f"models:/{model_uri}@champion" # 2: using model version #champion_version = client.get_model_version_by_alias(model_uri, "champion") #model_version_uri = f"models:/{model_uri}/{champion_version.version}" # -- load the model pipline and exctract the model model_pipeline = spark_load_model(model_version_uri) model = model_pipeline.stages[0] # -- batch scoring using the the model fe_full_data, df_full_data, df_train, df_test = split_data() df_batch_input = df_full_data.drop("rating") df_scores = model.transform(df_batch_input) # --- in case you used Feature Engineering to train and register model #from databricks.feature_engineering import FeatureEngineeringClient #fe = FeatureEngineeringClient() # fe.score_batch(model_uri=f"{model_version_uri}",df = df_batch_input) und Sie können sehen, dass wir dieselben Trainingsdaten für die Batch-Bewertung verwendet haben. Obwohl dies bei Empfehlungssystemen sinnvoll ist, möchten wir das Modell in den meisten Anwendungen verwenden, um einige unsichtbare Daten zu bewerten. Stellen Sie sich beispielsweise vor, Sie sind Netflix und möchten die Benutzerempfehlungen am Ende des Tages basierend auf ihrer neuen Beobachtungsliste aktualisieren. Wir können einen Job planen, der die Batch-Bewertung zu einem bestimmten Zeitpunkt am Ende des Tages ausführt. Jetzt können wir fortfahren und die Empfehlungen für jeden Benutzer generieren. Dazu finden wir die Top 20 Artikel pro Benutzer from pyspark.sql.window import Window from pyspark.sql.functions import col, split, row_number, collect_list from pyspark.sql.functions import col, collect_list, expr, lit, min, row_number, desc windowSpec = Window.partitionBy("user_id").orderBy(col("prediction").desc()) df_top_20_items = df_scores.withColumn("rank", row_number().over(windowSpec)).filter(col("rank") <= 20) df_user_recs = df_top_20_items.groupBy("user_id") \ .agg(collect_list(col("item_id").cast("double")).alias("top_item_ids")) so sieht das Ergebnis aus Schließlich können wir die Vorhersage als Delta-Label auf unserer UC speichern oder sie in einem nachgelagerten System, Mongo DB oder Azure Cosmos DB, veröffentlichen. Wir entscheiden uns für die erste Option. df_user_recs.write.mode("overwrite").saveAsTable(f"{catalog_name}.{output_schema}.top20_item_recommendations") Streaming/Online-Inferenz Stellen Sie sich nun einen Fall vor, in dem wir unsere Empfehlungen basierend auf Echtzeit-Benutzerinteraktionen aktualisieren möchten. Für diesen Fall können wir Model Serving verwenden. Wenn jemand Ihr Modell verwenden möchte, kann er Daten an den Server senden. Der Server überträgt diese Daten dann an Ihr bereitgestelltes Modell, das in Aktion tritt, die Daten analysiert und eine Vorhersage generiert. Sie können in Webanwendungen, mobilen Apps oder sogar eingebetteten Systemen verwendet werden. Eine der Anwendungen dieses Ansatzes besteht darin, die Verkehrsführung für A/B-Tests zu ermöglichen. Der ALS-Algorithmus kann nicht direkt für Online-Inferenzen verwendet werden, da er ein erneutes Training des Modells mit den gesamten Daten (alt + neu) erfordert, um die Empfehlungen zu aktualisieren. Gradient Descent-Lernalgorithmen sind Beispiele für Modelle, die für Online-Updates verwendet werden können. Wir werden uns einige dieser Algorithmen möglicherweise in einem zukünftigen Beitrag ansehen. Um jedoch zu veranschaulichen, wie ein solches Modell funktionieren würde, erstellen wir einen (nutzlosen) Modell-Serving-Endpunkt, der die Filmbewertung vorhersagt, basierend auf der Bewertung eines Films durch einen Benutzer! import requests model_path = f"{catalog_name}.{model_schema}.{model_name}" champion_version = client.get_model_version_by_alias(model_path, "champion") # Set the name of the MLflow endpoint endpoint_name = config["model_serving_endpoint_name"] # Name of the registered MLflow model model_name = model_path # Specify the type of compute (CPU, GPU_SMALL, GPU_MEDIUM, etc.) workload_type = "CPU" # Specify the scale-out size of compute (Small, Medium, Large, etc.) workload_size = "Small" # Get the latest version of the MLflow model model_version = int(champion_version.version) # Specify Scale to Zero(only supported for CPU endpoints) scale_to_zero = False # Get the API endpoint and token for the current notebook context API_ROOT = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get() API_TOKEN = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get() data = { "name": endpoint_name, "config": { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ] }, } headers = {"Context-Type": "text/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.post( url=f"{API_ROOT}/api/2.0/serving-endpoints", json=data, headers=headers ) Dadurch wird ein Cluster für die Bereitstellung des Lunch-Modells für uns erstellt, was einige Zeit in Anspruch nimmt. Wenn Sie jetzt das Fenster öffnen, sollten Sie Ihren Endpunkt sehen. Serving Wir können einen Endpunkt verwenden, um mehrere Modelle zu bedienen. Dann können wir die Verkehrsführung für Szenarien wie A/B-Tests verwenden oder die Leistung verschiedener Modelle in der Produktion vergleichen. Inferenztabelle Inferenztabellen in Databricks Model Serving fungieren als automatisches Protokoll für unsere bereitgestellten Modelle. Wenn sie aktiviert sind, erfassen sie eingehende Anfragen (zur Vorhersage gesendete Daten), die entsprechenden Modellausgaben (Vorhersagen) und einige andere Metadaten als Delta-Tabelle im Unity Catalog. Wir können Inferenztabellen zum , und als Datenerfassungsverfahren zum oder unserer Modelle verwenden. Überwachen und Debuggen zur Herkunftsverfolgung erneuten Trainieren Feinabstimmen Wir können die auf unserem Serving-Endpunkt aktivieren, um das Modell zu überwachen. Wir können dies tun, indem wir die -Eigenschaften in der Nutzlast angeben, wenn wir den Endpunkt zum ersten Mal erstellen. Oder wir aktualisieren unseren Endpunkt anschließend mit dem -Befehl und der Endpunkt-URL wie folgt (mehr inference table auto_capture_config put config ) hier data = { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ], "auto_capture_config":{ "catalog_name": catalog_name, "schema_name": model_schema, "table_name_prefix": payload_table, } } headers = {"Context-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.put(url=f"{API_ROOT}/api/2.0/serving-endpoints/{endpoint_name}/config", json=data, headers=headers) print(json.dumps(response.json(), indent=4)) Jetzt füttern wir den Endpunkt mit einigen Dummy-Benutzerinteraktionsdaten import random import time all_items = df_full_data.select(col("item_id")).distinct() for user_id in range(50,54): items_not_rated_by_user = df_full_data.where(col("user_id")==user_id).select(col("item_id")).distinct()#collect()[0][0] no_rated_items = [item.item_id for item in all_items.subtract(items_not_rated_by_user).limit(4).collect()] data = { "dataframe_records": [ {"user_id":user_id, "item_id":no_rated_items[0], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[1], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, ] } response = requests.post( url=f"{API_ROOT}/serving-endpoints/{endpoint_name}/invocations", json=data, headers=headers ) # generate the data within the timespan of 1 to 8 minutes time.sleep(random.randint(60*1, 60*8)) Wir können die Endpunktprotokolle in der Tabelle überprüfen. Es dauert etwa 10 Minuten, bis Sie die Daten in der Tabelle sehen können. <catalog>.<schema>.<payload_table> table_path = f"{catalog_name}.{model_schema}.{payload_table}" # Read data from the inference table df_inf_table = spark.read.table(table_path) display(df_inf_table ) Sie sollten in etwa diese Nutzlasttabelle sehen Um das Schema dieser Inferenztabelle zu verstehen, lesen Sie „Unity-Katalog-Inferenztabellenschema==“.== hier Modellüberwachung Modell- und Datenüberwachung ist ein komplexes Thema, dessen Beherrschung viel Zeit in Anspruch nimmt. Databricks Lakehouse Monitoring (DLM) reduziert den Aufwand für den Aufbau eines geeigneten Überwachungssystems, indem es standardmäßige und anpassbare Vorlagen für gängige Anwendungsfälle bereitstellt. Die Beherrschung von DLM und Modellüberwachung im Allgemeinen erfordert jedoch viel Experimentieren. Ich möchte Ihnen hier keinen umfassenden Überblick über die Modellüberwachung geben, sondern Ihnen einen Ausgangspunkt bieten. Vielleicht widme ich diesem Thema in Zukunft einen Blog. Eine kurze Zusammenfassung der DLM-Funktionen und -Features Jetzt, da unser Modell läuft, können wir die von unserem Serving-Endpunkt generierte Inferenztabelle verwenden, um wichtige Kennzahlen wie Modellleistung und Drift zu überwachen und Abweichungen oder Anomalien in unseren Daten oder unserem Modell im Laufe der Zeit zu erkennen. Dieser proaktive Ansatz hilft uns, rechtzeitig Korrekturmaßnahmen zu ergreifen, z. B. das Modell neu zu trainieren oder seine Funktionen zu aktualisieren, um eine optimale Leistung und Ausrichtung auf die Geschäftsziele aufrechtzuerhalten. DLM bietet drei Analyse- bzw. : , und . Da wir an der Analyse unserer Inferenztabelle interessiert sind, konzentrieren wir uns auf Letzteres. Um eine Tabelle zur Überwachung zu verwenden – unsere „ “, sollten wir sicherstellen, dass die Tabelle die richtige Struktur hat. Bei der sollte jede Zeile einer Anfrage mit folgenden Spalten entsprechen: profile type Zeitreihen Snapshot Inferenz Primärtabelle Inferenztabelle Modellmerkmale Modellvorhersage Modell-ID : Zeitstempel der Inferenzanfrage timestamp (optional) Grundwahrheit Die ist wichtig, wenn wir mehrere Modelle bereitstellen und die Leistung jedes Modells in einem Überwachungs-Dashboard verfolgen möchten. Wenn mehr als eine Modell-ID verfügbar ist, verwendet DLM sie, um die Daten aufzuteilen und Metriken und Statistiken für jedes Segment separat zu berechnen. Modell-ID DLM berechnet alle Statistiken und Metriken für ein bestimmtes Zeitintervall. Für die Inferenzanalyse werden die sowie eine benutzerdefinierte Fenstergröße zur Identifizierung der Zeitfenster verwendet. Weitere Informationen finden Sie weiter unten. Zeitstempelspalte DLM unterstützt zwei für Inferenztabellen: „ “ oder „ “. Es berechnet einige der relevanten Metriken und Statistiken basierend auf dieser Spezifikation. problem type Klassifizierung Regression Um DLM zu verwenden, müssen wir einen Monitor erstellen und ihn an eine Tabelle anhängen. Dabei erstellt DLM zwei : metric tables : Diese Tabelle enthält zusammenfassende Statistiken wie Minimum, Maximum, Prozentsatz von Nullen und Nullen. Sie enthält außerdem zusätzliche Metriken basierend auf dem vom Benutzer definierten Problemtyp. Zum Beispiel , und für die Klassifizierungsmodelle und und für Regressionsmodelle. Profilmetriktabelle Präzision Rückruf f1_score mittlerer quadrierter Fehler mittlerer durchschnittlicher Fehler : Sie enthält Statistiken, die messen, wie sich die Verteilung der Daten oder im Verhältnis zu einem geändert hat. Sie berechnet Kennzahlen wie den Chi-Quadrat-Test und den KS-Test. Driftmetriktabelle im Laufe der Zeit Basiswert (sofern angegeben) Um die Liste der vollständigen Metriken für jede Tabelle anzuzeigen, sehen Sie auf der Dokumentationsseite nach. Es ist auch möglich der Monitormetriktabelle zu erstellen. , benutzerdefinierte Metriken Ein wichtiger Aspekt beim Aufbau eines Überwachungssystems besteht darin, sicherzustellen, dass unser Überwachungs-Dashboard Zugriff auf die neuesten Inferenzdaten hat, sobald diese eintreffen. Dazu können wir verwenden, um die verarbeiteten Zeilen in der Inferenztabelle zu verfolgen. Wir verwenden die Inferenztabelle des Modells als unsere Quelltabelle ( ) und die Überwachungstabelle als Senkentabelle ( ). Wir stellen außerdem sicher, dass (CDC) für beide Tabellen aktiviert ist (standardmäßig ist es für die Inferenztabelle aktiviert). Auf diese Weise verarbeiten wir nur Änderungen – Einfügen/Aktualisieren/Löschen – in der Quelltabelle, anstatt die gesamte Tabelle bei jeder Aktualisierung erneut zu verarbeiten. Delta Table Streaming readStream writeStream Change Data Capture Praxisnah Um das Monitoring über unsere Inferenztabelle zu ermöglichen, führen wir folgende Schritte aus: Lesen Sie die Inferenztabelle als Streaming-Tabelle Erstellen Sie eine neue Delta-Tabelle mit dem richtigen Schema, indem Sie die Inferenztabelle entpacken, die von unserem Modell-Serving-Endpunkt generiert wird. Bereiten Sie die Basistabelle vor (sofern vorhanden). Erstellen Sie einen Monitor über die resultierende Tabelle und aktualisieren Sie die Metrik Planen Sie einen Workflow, um die Inferenztabelle in die richtige Struktur zu entpacken und die Metriken zu aktualisieren Zuerst müssen wir die Lakehouse Monitoring API installieren. Sie sollte bereits installiert sein, wenn Sie Databricks Rum Time 15.3 LTS und höher verwenden: %pip install "https://ml-team-public-read.s3.amazonaws.com/wheels/data-monitoring/a4050ef7-b183-47a1-a145-e614628e3146/databricks_lakehouse_monitoring-0.4.14-py3-none-any.whl" dbutils.library.restartPython() Lesen wir die Inferenztabelle als Streaming-Tabelle requests_raw = spark.readStream\ .format("delta")\ .table(inference_table_name) requests_raw.isStreaming #-> True Als nächstes müssen wir die Tabelle wie oben beschrieben in das richtige Format bringen. Diese Tabelle sollte für jede Vorhersage eine Zeile mit den relevanten Merkmalen und Vorhersagewerten enthalten. Die Inferenztabelle, die wir vom Endpunkt des Modells erhalten, speichert die Endpunktanforderungen und -antworten in einem verschachtelten JSON-Format. Hier ist ein Beispiel für die JSON-Nutzlast für die Anforderungs- und Antwortspalte. #requests {"dataframe_records": [ {"user_id": 1, "item_id": 346, "rating": 5}, {"user_id": 1, "item_id": 377, "rating": 2}, {"user_id": 1, "item_id": 302, "rating": 4} ] } #reponse {"predictions": [4.248899936676025, 1.1172138452529907, 4.279165744781494] } # --> what we need | user_id | item_id | rating | prediction | |---------|---------|--------|------------| | 1 | 346 | 5 | 4.248900 | | 1 | 377 | 2 | 1.117214 | | 1 | 302 | 4 | 4.279166 | Um diese Tabelle in das richtige Schema zu entpacken, können wir den folgenden Code verwenden, der aus der Databricks-Dokumentation ( ) angepasst ist. Inferenztabelle, Lakehouse Monitoring-Starter-Notebook # define the schema of the request and reponse fields in the inference tabel REQUEST_FIELDS = [StructField('user_id', IntegerType(), False),\ StructField('item_id', IntegerType(), False),\ StructField('rating', IntegerType(), False)\ ] RESPONSE_FIELD = [T.StructField("predictions", FloatType())] def process_col_requests(json_str:str) -> str: """ to proccess the JSON payload of request column in inference table """ request = json.loads(json_str) dataframe_records = request.get("dataframe_records", []) return dataframe_records def procces_col_response(json_str: str) -> str: """ to proccess the JSON payload of reponse column in inference table """ reponse = json.loads(json_str) output = [{prediction_col: round(prediction,4)} for prediction in reponse["predictions"]] return output def get_model_id(endpoint_name: str) -> str: """ create the model id by concatinating the model name and the model version. note: the assumption is the endpoint serves only one model """ served_models = get_served_models(endpoint_name) model_id = f"{served_models[0]['model_name']}_{served_models[0]['model_version']}" return model_id def process_requests(requests_raw: DataFrame, request_fields: List[T.StructField], response_field: T.StructField, endpoint_name: str) -> DataFrame: """ Takes a stream of raw requests and processes them by: - Unpacking JSON payloads for requests and responses - Exploding batched requests into individual rows - Converting Unix epoch millisecond timestamps to be Spark TimestampType :param requests_raw: DataFrame containing raw requests. Assumed to contain the following columns: - `request` - `response` - `timestamp_ms` :param request_fields: List of StructFields representing the request schema :param response_field: A StructField representing the response schema :return: A DataFrame containing processed requests """ # Convert the timestamp milliseconds to TimestampType for downstream processing. requests_timestamped = requests_raw \ .withColumn(timestamp_col, (F.col("timestamp_ms") / 1000).cast(T.TimestampType())) \ .drop("timestamp_ms") # create the model identifier column model_id = get_model_id(endpoint_name) # Convert the model name and version columns into a model identifier column. requests_identified = requests_timestamped \ .withColumn(model_id_col, F.lit(model_id)) # Rename the date column to avoid collisions with features. requests_dated = requests_identified.withColumnRenamed("date", date_col) # Consolidate and unpack JSON. request_schema = T.ArrayType(T.StructType(request_fields)) response_schema = T.ArrayType(T.StructType(response_field)) # w udf_request = F.udf(process_col_requests, request_schema) udf_reponse = F.udf(procces_col_response, response_schema) requests_unpacked = requests_dated.withColumn("request", udf_request("request")).\ withColumn("response", udf_reponse("response")) # Explode batched requests into individual rows. DB_PREFIX = "__db" requests_exploded = requests_unpacked \ .withColumn(f"{DB_PREFIX}_request_response", F.arrays_zip(F.col("request"), F.col("response"))) \ .withColumn(f"{DB_PREFIX}_request_response", F.explode(F.col(f"{DB_PREFIX}_request_response"))) \ .select(F.col("*"), F.col(f"{DB_PREFIX}_request_response.request.*"), F.col(f"{DB_PREFIX}_request_response.response.*")) \ .drop(f"{DB_PREFIX}_request_response", "request", "response") requests_cleaned = requests_exploded.drop("status_code", "sampling_fraction", "client_request_id", "databricks_request_id", "request_metadata") return requests_cleaned Die resultierende Tabelle würde folgendermaßen aussehen: Als nächstes sollten wir unsere Sink-Tabelle initialisieren dt_builder = DeltaTable.createIfNotExists(spark) \ .tableName(unpacked_requests_table_name) \ .addColumns(schema) \ .partitionedBy(requests_cleaned.schema) \ .property("delta.enableChangeDataFeed", "true") \ dt_builder.execute() und schreibe die Ergebnisse checkpoint_path = f"dbfs:/payload-logging/{endpoint_name}/checkpoint" requests_stream = requests_cleaned.writeStream \ .trigger(once=True) \ .format("delta") \ .partitionBy(date_col) \ .outputMode("append") \ .option("checkpointLocation", checkpoint_path) \ .toTable(unpacked_requests_table_name) \ Schließlich erstellen wir unsere Basistabelle. DLM verwendet diese Tabelle, um die Drifts zu berechnen, indem die Verteilung ähnlicher Spalten von Basis- und Primärmodellen verglichen wird. Die Basistabelle sollte dieselbe Merkmalsspalte wie die Primärspalte sowie dieselbe Modellidentifikationsspalte haben. Für die Basistabelle verwenden wir die Vorhersagetabelle unseres , den wir zuvor gespeichert haben, nachdem wir unser Modell mit dem besten Hyperparameter trainiert haben. Um die Driftmetrik zu berechnen, berechnet Databricks die Profilmetriken sowohl für die Primär- als auch für die Basistabelle. Hier können Sie mehr über die Validierungsdatensatzes lesen. Primärtabelle und die Basistabelle #read the prediction table df_base_table = spark.table(f"{catalog_name}.{model_schema}.predictions") # create the model id and add it to the table model_id = get_model_id(endpoint_name) df_base_table = df_base_table.withColumn(model_id_col, F.lit(model_id)) #write the new table and enable the CDC on it output_base_table_name = f"{catalog_name}.{model_schema}.{base_table_prefix}_{model_name}" df_base_table.write.format("delta").mode("overwrite").saveAsTable(output_base_table_name) spark.sql(f"ALTER TABLE {output_base_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)") Jetzt können wir unser Monitoring-Dashboard erstellen. Dies können wir entweder über die tun. oder die Lakehouse Monitoring API. Hier verwenden wir die zweite Option: Benutzeroberfläche # This is where we store the metric tables. output_schema_name = f"{catalog_name}.{model_schema}" try: info = lm.create_monitor( table_name=unpacked_requests_table_name, profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities,#the aggregation window model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, # We will refresh the metrics on-demand in this notebook baseline_table_name=output_base_table_name, ) print(info) except Exception as e: # Ensure the exception was expected assert "RESOURCE_ALREADY_EXISTS" in str(e), f"Unexpected error: {e}" # Update the monitor if any parameters of this notebook have changed. lm.update_monitor( table_name=unpacked_requests_table_name, updated_params=dict( profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities, model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, baseline_table_name=output_base_table_name, ) ) # Refresh metrics calculated on the requests table. refresh_info = lm.run_refresh(table_name=unpacked_requests_table_name) print(refresh_info) Nachdem wir den Code ausgeführt haben, dauert es einige Zeit, bis Databricks alle Metriken berechnet hat. Um das Dashboard anzuzeigen, gehen Sie zur Registerkarte Ihrer Sink-Tabelle (d. h. ). Sie sollten eine Seite wie die folgende sehen. Quality unpacked_requests_table_name Wenn Sie auf anzeigen“ klicken, werden Ihre laufenden, ausstehenden und vergangenen Aktualisierungen angezeigt. Klicken Sie auf um Ihr Dashboard zu öffnen. refresh history View Dashboard Wir beginnen also mit der Inferenztabelle ( ), verarbeiten sie, speichern das Ergebnis in und übergeben diese Tabelle zusammen mit unserer Basistabelle ( ) an unsere Überwachungs-API. Das DLM berechnet die Profilmetriken für jede Tabelle ( ) und verwendet sie zur Berechnung der Driftmetriken ( ). my_endpoint_payload my_endpoint_payload_unpacked base_table_als my_endpoint_payload_unpacked_profile_metric my_endpoint_payload_unpacked_drift_metrics Fertig! Sie haben alles, was Sie zum Bedienen und Überwachen Ihres Modells benötigen! Im nächsten Teil zeige ich Ihnen, wie Sie diesen Prozess mit und automatisieren können! Databricks Assets Bundle Gitlab