paint-brush
Bauen wir eine MLOps-Pipeline mit Databricks und Spark – Teil 2von@neshom
Neue Geschichte

Bauen wir eine MLOps-Pipeline mit Databricks und Spark – Teil 2

von Mohsen Jadidi42m2024/12/29
Read on Terminal Reader

Zu lang; Lesen

Im zweiten Teil dieses Blogs erfahren Sie, wie Databricks uns Batch-Bereitstellung und Online-Serving ermöglicht. Wir beschäftigen uns mit der Einrichtung von Dashboards zur Daten- und Modellüberwachung.
featured image - Bauen wir eine MLOps-Pipeline mit Databricks und Spark – Teil 2
Mohsen Jadidi HackerNoon profile picture
0-item
1-item
2-item

Im ersten Teil dieser Tutorial- Reihe haben wir die ersten Schritte zum Aufbau einer End-to-End-MLOps-Pipeline mit Databricks und Spark unternommen, geleitet von der Referenzarchitektur von Databricks. Hier ist eine Zusammenfassung der wichtigsten Schritte, die wir behandelt haben:


  • Einrichten des Unity-Katalogs für Medallion Architecture : Wir haben unsere Daten im Unity-Katalog in Bronze-, Silber- und Goldebenen organisiert und so ein strukturiertes und effizientes Datenverwaltungssystem eingerichtet.

  • Daten in den Unity-Katalog aufnehmen : Wir haben gezeigt, wie Rohdaten in das System importiert werden, um Konsistenz und Qualität für nachfolgende Verarbeitungsphasen sicherzustellen.

  • Trainieren des Modells : Mithilfe von Databricks haben wir ein auf unseren Datensatz zugeschnittenes maschinelles Lernmodell trainiert und dabei Best Practices für eine skalierbare und effektive Modellentwicklung befolgt.

  • Hyperparameter-Tuning mit HyperOpt : Um die Modellleistung zu verbessern, haben wir HyperOpt eingesetzt, um die Suche nach optimalen Hyperparametern zu automatisieren und so Genauigkeit und Effizienz zu verbessern.

  • Experimentverfolgung mit Databricks MLflow : Wir haben MLflow zum Protokollieren und Überwachen unserer Experimente verwendet und eine umfassende Aufzeichnung der Modellversionen, Metriken und Parameter geführt, um einen einfachen Vergleich und die Reproduzierbarkeit zu ermöglichen.


Wenn diese grundlegenden Schritte abgeschlossen sind, ist Ihr Modell nun bereit für die Bereitstellung. In diesem zweiten Teil konzentrieren wir uns auf die Integration zweier wichtiger Komponenten in unser System:


  1. Batch-Inferenz : Implementierung der Batch-Verarbeitung zur Generierung von Vorhersagen für große Datensätze, geeignet für Anwendungen wie Massenbewertung und regelmäßige Berichterstattung.
  2. Online-Inferenz (Modellbereitstellung) : Einrichten der Modellbereitstellung in Echtzeit, um sofortige Vorhersagen zu ermöglichen, die für interaktive Anwendungen und Dienste unerlässlich sind.
  3. Modellüberwachung: um sicherzustellen, dass Ihre bereitgestellten Modelle im Laufe der Zeit optimale Leistung und Zuverlässigkeit beibehalten.


Lassen Sie uns loslegen!

Modellbereitstellung

Der Ausgangspunkt des letzten Blogs war die Modellbewertung. Stellen Sie sich nun vor, wir haben den Vergleich durchgeführt und festgestellt, dass unser Modell im Vergleich zu diesem Produktionsmodell eine höhere Leistung aufweist. Da wir das Modell (angenommen) in der Produktion verwenden möchten, möchten wir alle Daten nutzen, die wir haben. Der nächste Schritt besteht darin, das Modell mit dem vollständigen Datensatz zu trainieren und zu testen. Anschließend speichern wir unser Modell für die spätere Verwendung, indem wir es als unser Champion-Modell bereitstellen. Da dies das endgültige Modell ist, das wir für die Inferenz verwenden möchten, verwenden wir den Feature Engineering-Client, um das Modell zu trainieren. Auf diese Weise können wir nicht nur die Modellherkunft einfacher verfolgen, sondern auch die Schemavalidierung und Featuretransformation (falls vorhanden) an den Client auslagern.


 with mlflow.start_run(run_name="ALS_best_model") as run: als = ALS() # Now we set the parameters for the method als.setMaxIter(MAX_ITER)\ .setSeed(SEED)\ .setRegParam(best_params["REG_PARAM"])\ .setUserCol(COL_USER)\ .setItemCol(COL_ITEM)\ .setRatingCol(COL_LABEL)\ .setRank(best_params["RANK"]) mlflow.log_param("MAX_ITER", MAX_ITER) mlflow.log_param("RANK", best_params["RANK"]) mlflow.log_param("REG_PARAM", best_params["REG_PARAM"]) # Create the model with these parameters. model = als.fit(df_full_data) #drop predictions where users and products from the test test and didn't make it into the training set. in this case, the prediction is NaN model.setColdStartStrategy('drop') predictions = model.transform(df_full_data) signature = infer_signature(model_input = df_full_data, model_output = predictions.select(COL_LABEL)) #log the model mlflow.spark.log_model(model, model_name, sample_input=df_full_data.limit(3), signature = signature, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name=f"{catalog_name}.{model_schema}.{model_name}") evaluator = RegressionEvaluator(predictionCol=COL_PRED, labelCol=COL_LABEL) rmse = evaluator.setMetricName("rmse").evaluate(predictions) mlflow.log_metric('rmse', rmse)


Wir können auch den Feature Store oder die Feature Engineering APIs verwenden, um die Modelle zu trainieren und zu protokollieren

 model_info = fe.log_model(model=model, artifact_path = model_name, flavor=mlflow.spark, training_set=fe_full_data, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name= f"{catalog_name}.{model_schema}.{model_name}" )


Wenn wir die Feature Engineering API verwenden, können wir die Herkunft des Modells im Catalog Explorer anzeigen

Datenherkunft im Dataticks Unity Catalog


Aktualisieren wir nun die Modellbeschreibung und weisen ihr das Label „Champion“ zu.

 import time from mlflow.tracking.client import MlflowClient from mlflow.entities.model_registry.model_version_status import ModelVersionStatus client = MlflowClient() #find the latest model version model_name_path = f"{catalog_name}.{model_schema}.{model_name}" model_version_infos = client.search_model_versions(f"name ='{model_name_path}'") new_model_version = max([int(model_version_info.version) for model_version_info in model_version_infos]) #add the model and model version descirption client.update_registered_model( name=model_name_path, description="collaborative filtering using Spark mllib ALS. This model use rating table" ) client.update_model_version( name=model_name_path, version=new_model_version, description="this model is optimized Rank and REG_PARAM with Hyperopt and rmse as a loss function. trained on the full dataset" ) # assign alias client.set_registered_model_alias(model_name_path, "Champion", new_model_version)


Überprüfen Sie nun das Schema, in dem Sie das Modell registriert haben. Sie sollten alle Ihre Aktualisierungen wie folgt sehen

Modellregistrierung im Databricks Unity Catalog

Modellphasen : Wenn Sie den Arbeitsbereich für die Modellregistrierung verwenden, sollten Sie Phasen zur Verwaltung Ihrer Modelle verwenden. Die Verwendung von Aliasnamen funktioniert nicht. Schauen Sie hier nach um zu sehen, wie es funktioniert

Modellinferenz

Stapelbewertung

Stellen Sie sich nun vor, wir möchten unser Modell in der Produktion für Inferenzen verwenden. In diesem Schritt laden wir das Champion-Modell und verwenden es, um 20 Filmempfehlungen für jeden Benutzer zu generieren.


 from mlflow.spark import load_model as spark_load_model from mlflow.tracking.client import MlflowClient from create_training_set import split_data #-- set UC as model registray mlflow.set_registry_uri("databricks-uc") #-- initate mlflow client client = MlflowClient() # -- read the config file with open('config.json') as config_file: config = json.load(config_file) catalog_name = config["catalog_name"] gold_layer = config["gold_layer_name"] silver_layer = config["silver_layer_name"] user_item_table_name = config["user_item_table_name"] ft_user_item_name = config["ft_user_item_name"] model_name = config["model_name"] model_schema = config["model_schema"] #-- create the model uri model_path = f"{catalog_name}.{model_schema}.{model_name}" # --create the model_uri: there are two ways to do this # 1: using the alias (we use this*) model_version_uri = f"models:/{model_uri}@champion" # 2: using model version #champion_version = client.get_model_version_by_alias(model_uri, "champion") #model_version_uri = f"models:/{model_uri}/{champion_version.version}" # -- load the model pipline and exctract the model model_pipeline = spark_load_model(model_version_uri) model = model_pipeline.stages[0] # -- batch scoring using the the model fe_full_data, df_full_data, df_train, df_test = split_data() df_batch_input = df_full_data.drop("rating") df_scores = model.transform(df_batch_input) # --- in case you used Feature Engineering to train and register model #from databricks.feature_engineering import FeatureEngineeringClient #fe = FeatureEngineeringClient() # fe.score_batch(model_uri=f"{model_version_uri}",df = df_batch_input)


und Sie können sehen, dass wir dieselben Trainingsdaten für die Batch-Bewertung verwendet haben. Obwohl dies bei Empfehlungssystemen sinnvoll ist, möchten wir das Modell in den meisten Anwendungen verwenden, um einige unsichtbare Daten zu bewerten. Stellen Sie sich beispielsweise vor, Sie sind Netflix und möchten die Benutzerempfehlungen am Ende des Tages basierend auf ihrer neuen Beobachtungsliste aktualisieren. Wir können einen Job planen, der die Batch-Bewertung zu einem bestimmten Zeitpunkt am Ende des Tages ausführt.


Jetzt können wir fortfahren und die Empfehlungen für jeden Benutzer generieren. Dazu finden wir die Top 20 Artikel pro Benutzer

 from pyspark.sql.window import Window from pyspark.sql.functions import col, split, row_number, collect_list from pyspark.sql.functions import col, collect_list, expr, lit, min, row_number, desc windowSpec = Window.partitionBy("user_id").orderBy(col("prediction").desc()) df_top_20_items = df_scores.withColumn("rank", row_number().over(windowSpec)).filter(col("rank") <= 20) df_user_recs = df_top_20_items.groupBy("user_id") \ .agg(collect_list(col("item_id").cast("double")).alias("top_item_ids"))


so sieht das Ergebnis aus

Schließlich können wir die Vorhersage als Delta-Label auf unserer UC speichern oder sie in einem nachgelagerten System, Mongo DB oder Azure Cosmos DB, veröffentlichen. Wir entscheiden uns für die erste Option.


 df_user_recs.write.mode("overwrite").saveAsTable(f"{catalog_name}.{output_schema}.top20_item_recommendations")


Streaming/Online-Inferenz

Stellen Sie sich nun einen Fall vor, in dem wir unsere Empfehlungen basierend auf Echtzeit-Benutzerinteraktionen aktualisieren möchten. Für diesen Fall können wir Model Serving verwenden. Wenn jemand Ihr Modell verwenden möchte, kann er Daten an den Server senden. Der Server überträgt diese Daten dann an Ihr bereitgestelltes Modell, das in Aktion tritt, die Daten analysiert und eine Vorhersage generiert. Sie können in Webanwendungen, mobilen Apps oder sogar eingebetteten Systemen verwendet werden. Eine der Anwendungen dieses Ansatzes besteht darin, die Verkehrsführung für A/B-Tests zu ermöglichen.


Der ALS-Algorithmus kann nicht direkt für Online-Inferenzen verwendet werden, da er ein erneutes Training des Modells mit den gesamten Daten (alt + neu) erfordert, um die Empfehlungen zu aktualisieren. Gradient Descent-Lernalgorithmen sind Beispiele für Modelle, die für Online-Updates verwendet werden können. Wir werden uns einige dieser Algorithmen möglicherweise in einem zukünftigen Beitrag ansehen.


Um jedoch zu veranschaulichen, wie ein solches Modell funktionieren würde, erstellen wir einen (nutzlosen) Modell-Serving-Endpunkt, der die Filmbewertung vorhersagt, basierend auf der Bewertung eines Films durch einen Benutzer!


 import requests model_path = f"{catalog_name}.{model_schema}.{model_name}" champion_version = client.get_model_version_by_alias(model_path, "champion") # Set the name of the MLflow endpoint endpoint_name = config["model_serving_endpoint_name"] # Name of the registered MLflow model model_name = model_path # Specify the type of compute (CPU, GPU_SMALL, GPU_MEDIUM, etc.) workload_type = "CPU" # Specify the scale-out size of compute (Small, Medium, Large, etc.) workload_size = "Small" # Get the latest version of the MLflow model model_version = int(champion_version.version) # Specify Scale to Zero(only supported for CPU endpoints) scale_to_zero = False # Get the API endpoint and token for the current notebook context API_ROOT = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get() API_TOKEN = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get() data = { "name": endpoint_name, "config": { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ] }, } headers = {"Context-Type": "text/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.post( url=f"{API_ROOT}/api/2.0/serving-endpoints", json=data, headers=headers )


Dadurch wird ein Cluster für die Bereitstellung des Lunch-Modells für uns erstellt, was einige Zeit in Anspruch nimmt. Wenn Sie jetzt das Fenster Serving öffnen, sollten Sie Ihren Endpunkt sehen.


Wir können einen Endpunkt verwenden, um mehrere Modelle zu bedienen. Dann können wir die Verkehrsführung für Szenarien wie A/B-Tests verwenden oder die Leistung verschiedener Modelle in der Produktion vergleichen.

Inferenztabelle

Inferenztabellen in Databricks Model Serving fungieren als automatisches Protokoll für unsere bereitgestellten Modelle. Wenn sie aktiviert sind, erfassen sie eingehende Anfragen (zur Vorhersage gesendete Daten), die entsprechenden Modellausgaben (Vorhersagen) und einige andere Metadaten als Delta-Tabelle im Unity Catalog. Wir können Inferenztabellen zum Überwachen und Debuggen , zur Herkunftsverfolgung und als Datenerfassungsverfahren zum erneuten Trainieren oder Feinabstimmen unserer Modelle verwenden.


Wir können die inference table auf unserem Serving-Endpunkt aktivieren, um das Modell zu überwachen. Wir können dies tun, indem wir die auto_capture_config -Eigenschaften in der Nutzlast angeben, wenn wir den Endpunkt zum ersten Mal erstellen. Oder wir aktualisieren unseren Endpunkt anschließend mit dem put -Befehl und der config Endpunkt-URL wie folgt (mehr hier )


 data = { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ], "auto_capture_config":{ "catalog_name": catalog_name, "schema_name": model_schema, "table_name_prefix": payload_table, } } headers = {"Context-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.put(url=f"{API_ROOT}/api/2.0/serving-endpoints/{endpoint_name}/config", json=data, headers=headers) print(json.dumps(response.json(), indent=4))


Jetzt füttern wir den Endpunkt mit einigen Dummy-Benutzerinteraktionsdaten

 import random import time all_items = df_full_data.select(col("item_id")).distinct() for user_id in range(50,54): items_not_rated_by_user = df_full_data.where(col("user_id")==user_id).select(col("item_id")).distinct()#collect()[0][0] no_rated_items = [item.item_id for item in all_items.subtract(items_not_rated_by_user).limit(4).collect()] data = { "dataframe_records": [ {"user_id":user_id, "item_id":no_rated_items[0], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[1], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, ] } response = requests.post( url=f"{API_ROOT}/serving-endpoints/{endpoint_name}/invocations", json=data, headers=headers ) # generate the data within the timespan of 1 to 8 minutes time.sleep(random.randint(60*1, 60*8))


Wir können die Endpunktprotokolle in der Tabelle <catalog>.<schema>.<payload_table> überprüfen. Es dauert etwa 10 Minuten, bis Sie die Daten in der Tabelle sehen können.


 table_path = f"{catalog_name}.{model_schema}.{payload_table}" # Read data from the inference table df_inf_table = spark.read.table(table_path) display(df_inf_table )


Sie sollten in etwa diese Nutzlasttabelle sehen

Nutzlasttabelle für das Databricks-Modell


Um das Schema dieser Inferenztabelle zu verstehen, lesen Sie hier „Unity-Katalog-Inferenztabellenschema==“.==


Modellüberwachung

Modell- und Datenüberwachung ist ein komplexes Thema, dessen Beherrschung viel Zeit in Anspruch nimmt. Databricks Lakehouse Monitoring (DLM) reduziert den Aufwand für den Aufbau eines geeigneten Überwachungssystems, indem es standardmäßige und anpassbare Vorlagen für gängige Anwendungsfälle bereitstellt. Die Beherrschung von DLM und Modellüberwachung im Allgemeinen erfordert jedoch viel Experimentieren. Ich möchte Ihnen hier keinen umfassenden Überblick über die Modellüberwachung geben, sondern Ihnen einen Ausgangspunkt bieten. Vielleicht widme ich diesem Thema in Zukunft einen Blog.


Eine kurze Zusammenfassung der DLM-Funktionen und -Features

Jetzt, da unser Modell läuft, können wir die von unserem Serving-Endpunkt generierte Inferenztabelle verwenden, um wichtige Kennzahlen wie Modellleistung und Drift zu überwachen und Abweichungen oder Anomalien in unseren Daten oder unserem Modell im Laufe der Zeit zu erkennen. Dieser proaktive Ansatz hilft uns, rechtzeitig Korrekturmaßnahmen zu ergreifen, z. B. das Modell neu zu trainieren oder seine Funktionen zu aktualisieren, um eine optimale Leistung und Ausrichtung auf die Geschäftsziele aufrechtzuerhalten.


Databricks Lakehouse Monitoring-Datenarchitektur Quelle: Databricks


DLM bietet drei Analyse- bzw. profile type : Zeitreihen , Snapshot und Inferenz . Da wir an der Analyse unserer Inferenztabelle interessiert sind, konzentrieren wir uns auf Letzteres. Um eine Tabelle zur Überwachung zu verwenden – unsere „ Primärtabelle “, sollten wir sicherstellen, dass die Tabelle die richtige Struktur hat. Bei der Inferenztabelle sollte jede Zeile einer Anfrage mit folgenden Spalten entsprechen:

  • Modellmerkmale

  • Modellvorhersage

  • Modell-ID

  • timestamp : Zeitstempel der Inferenzanfrage

  • Grundwahrheit (optional)


Die Modell-ID ist wichtig, wenn wir mehrere Modelle bereitstellen und die Leistung jedes Modells in einem Überwachungs-Dashboard verfolgen möchten. Wenn mehr als eine Modell-ID verfügbar ist, verwendet DLM sie, um die Daten aufzuteilen und Metriken und Statistiken für jedes Segment separat zu berechnen.


DLM berechnet alle Statistiken und Metriken für ein bestimmtes Zeitintervall. Für die Inferenzanalyse werden die Zeitstempelspalte sowie eine benutzerdefinierte Fenstergröße zur Identifizierung der Zeitfenster verwendet. Weitere Informationen finden Sie weiter unten.


DLM unterstützt zwei problem type für Inferenztabellen: „ Klassifizierung “ oder „ Regression “. Es berechnet einige der relevanten Metriken und Statistiken basierend auf dieser Spezifikation.


Um DLM zu verwenden, müssen wir einen Monitor erstellen und ihn an eine Tabelle anhängen. Dabei erstellt DLM zwei metric tables :

  • Profilmetriktabelle : Diese Tabelle enthält zusammenfassende Statistiken wie Minimum, Maximum, Prozentsatz von Nullen und Nullen. Sie enthält außerdem zusätzliche Metriken basierend auf dem vom Benutzer definierten Problemtyp. Zum Beispiel Präzision , Rückruf und f1_score für die Klassifizierungsmodelle und mittlerer quadrierter Fehler und mittlerer durchschnittlicher Fehler für Regressionsmodelle.

  • Driftmetriktabelle : Sie enthält Statistiken, die messen, wie sich die Verteilung der Daten im Laufe der Zeit oder im Verhältnis zu einem Basiswert (sofern angegeben) geändert hat. Sie berechnet Kennzahlen wie den Chi-Quadrat-Test und den KS-Test.


Um die Liste der vollständigen Metriken für jede Tabelle anzuzeigen, sehen Sie auf der Dokumentationsseite der Monitormetriktabelle nach. Es ist auch möglich , benutzerdefinierte Metriken zu erstellen.


Ein wichtiger Aspekt beim Aufbau eines Überwachungssystems besteht darin, sicherzustellen, dass unser Überwachungs-Dashboard Zugriff auf die neuesten Inferenzdaten hat, sobald diese eintreffen. Dazu können wir Delta Table Streaming verwenden, um die verarbeiteten Zeilen in der Inferenztabelle zu verfolgen. Wir verwenden die Inferenztabelle des Modells als unsere Quelltabelle ( readStream ) und die Überwachungstabelle als Senkentabelle ( writeStream ). Wir stellen außerdem sicher, dass Change Data Capture (CDC) für beide Tabellen aktiviert ist (standardmäßig ist es für die Inferenztabelle aktiviert). Auf diese Weise verarbeiten wir nur Änderungen – Einfügen/Aktualisieren/Löschen – in der Quelltabelle, anstatt die gesamte Tabelle bei jeder Aktualisierung erneut zu verarbeiten.

Praxisnah

Um das Monitoring über unsere Inferenztabelle zu ermöglichen, führen wir folgende Schritte aus:

  1. Lesen Sie die Inferenztabelle als Streaming-Tabelle
  2. Erstellen Sie eine neue Delta-Tabelle mit dem richtigen Schema, indem Sie die Inferenztabelle entpacken, die von unserem Modell-Serving-Endpunkt generiert wird.
  3. Bereiten Sie die Basistabelle vor (sofern vorhanden).
  4. Erstellen Sie einen Monitor über die resultierende Tabelle und aktualisieren Sie die Metrik
  5. Planen Sie einen Workflow, um die Inferenztabelle in die richtige Struktur zu entpacken und die Metriken zu aktualisieren


Zuerst müssen wir die Lakehouse Monitoring API installieren. Sie sollte bereits installiert sein, wenn Sie Databricks Rum Time 15.3 LTS und höher verwenden:


 %pip install "https://ml-team-public-read.s3.amazonaws.com/wheels/data-monitoring/a4050ef7-b183-47a1-a145-e614628e3146/databricks_lakehouse_monitoring-0.4.14-py3-none-any.whl" dbutils.library.restartPython()


Lesen wir die Inferenztabelle als Streaming-Tabelle

 requests_raw = spark.readStream\ .format("delta")\ .table(inference_table_name) requests_raw.isStreaming #-> True


Als nächstes müssen wir die Tabelle wie oben beschrieben in das richtige Format bringen. Diese Tabelle sollte für jede Vorhersage eine Zeile mit den relevanten Merkmalen und Vorhersagewerten enthalten. Die Inferenztabelle, die wir vom Endpunkt des Modells erhalten, speichert die Endpunktanforderungen und -antworten in einem verschachtelten JSON-Format. Hier ist ein Beispiel für die JSON-Nutzlast für die Anforderungs- und Antwortspalte.

 #requests {"dataframe_records": [ {"user_id": 1, "item_id": 346, "rating": 5}, {"user_id": 1, "item_id": 377, "rating": 2}, {"user_id": 1, "item_id": 302, "rating": 4} ] } #reponse {"predictions": [4.248899936676025, 1.1172138452529907, 4.279165744781494] } # --> what we need | user_id | item_id | rating | prediction | |---------|---------|--------|------------| | 1 | 346 | 5 | 4.248900 | | 1 | 377 | 2 | 1.117214 | | 1 | 302 | 4 | 4.279166 |


Um diese Tabelle in das richtige Schema zu entpacken, können wir den folgenden Code verwenden, der aus der Databricks-Dokumentation ( Inferenztabelle, Lakehouse Monitoring-Starter-Notebook ) angepasst ist.


 # define the schema of the request and reponse fields in the inference tabel REQUEST_FIELDS = [StructField('user_id', IntegerType(), False),\ StructField('item_id', IntegerType(), False),\ StructField('rating', IntegerType(), False)\ ] RESPONSE_FIELD = [T.StructField("predictions", FloatType())] def process_col_requests(json_str:str) -> str: """ to proccess the JSON payload of request column in inference table """ request = json.loads(json_str) dataframe_records = request.get("dataframe_records", []) return dataframe_records def procces_col_response(json_str: str) -> str: """ to proccess the JSON payload of reponse column in inference table """ reponse = json.loads(json_str) output = [{prediction_col: round(prediction,4)} for prediction in reponse["predictions"]] return output def get_model_id(endpoint_name: str) -> str: """ create the model id by concatinating the model name and the model version. note: the assumption is the endpoint serves only one model """ served_models = get_served_models(endpoint_name) model_id = f"{served_models[0]['model_name']}_{served_models[0]['model_version']}" return model_id def process_requests(requests_raw: DataFrame, request_fields: List[T.StructField], response_field: T.StructField, endpoint_name: str) -> DataFrame: """ Takes a stream of raw requests and processes them by: - Unpacking JSON payloads for requests and responses - Exploding batched requests into individual rows - Converting Unix epoch millisecond timestamps to be Spark TimestampType :param requests_raw: DataFrame containing raw requests. Assumed to contain the following columns: - `request` - `response` - `timestamp_ms` :param request_fields: List of StructFields representing the request schema :param response_field: A StructField representing the response schema :return: A DataFrame containing processed requests """ # Convert the timestamp milliseconds to TimestampType for downstream processing. requests_timestamped = requests_raw \ .withColumn(timestamp_col, (F.col("timestamp_ms") / 1000).cast(T.TimestampType())) \ .drop("timestamp_ms") # create the model identifier column model_id = get_model_id(endpoint_name) # Convert the model name and version columns into a model identifier column. requests_identified = requests_timestamped \ .withColumn(model_id_col, F.lit(model_id)) # Rename the date column to avoid collisions with features. requests_dated = requests_identified.withColumnRenamed("date", date_col) # Consolidate and unpack JSON. request_schema = T.ArrayType(T.StructType(request_fields)) response_schema = T.ArrayType(T.StructType(response_field)) # w udf_request = F.udf(process_col_requests, request_schema) udf_reponse = F.udf(procces_col_response, response_schema) requests_unpacked = requests_dated.withColumn("request", udf_request("request")).\ withColumn("response", udf_reponse("response")) # Explode batched requests into individual rows. DB_PREFIX = "__db" requests_exploded = requests_unpacked \ .withColumn(f"{DB_PREFIX}_request_response", F.arrays_zip(F.col("request"), F.col("response"))) \ .withColumn(f"{DB_PREFIX}_request_response", F.explode(F.col(f"{DB_PREFIX}_request_response"))) \ .select(F.col("*"), F.col(f"{DB_PREFIX}_request_response.request.*"), F.col(f"{DB_PREFIX}_request_response.response.*")) \ .drop(f"{DB_PREFIX}_request_response", "request", "response") requests_cleaned = requests_exploded.drop("status_code", "sampling_fraction", "client_request_id", "databricks_request_id", "request_metadata") return requests_cleaned


Die resultierende Tabelle würde folgendermaßen aussehen:

Payload-Tabelle entpackt

Als nächstes sollten wir unsere Sink-Tabelle initialisieren

 dt_builder = DeltaTable.createIfNotExists(spark) \ .tableName(unpacked_requests_table_name) \ .addColumns(schema) \ .partitionedBy(requests_cleaned.schema) \ .property("delta.enableChangeDataFeed", "true") \ dt_builder.execute()


und schreibe die Ergebnisse

 checkpoint_path = f"dbfs:/payload-logging/{endpoint_name}/checkpoint" requests_stream = requests_cleaned.writeStream \ .trigger(once=True) \ .format("delta") \ .partitionBy(date_col) \ .outputMode("append") \ .option("checkpointLocation", checkpoint_path) \ .toTable(unpacked_requests_table_name) \


Schließlich erstellen wir unsere Basistabelle. DLM verwendet diese Tabelle, um die Drifts zu berechnen, indem die Verteilung ähnlicher Spalten von Basis- und Primärmodellen verglichen wird. Die Basistabelle sollte dieselbe Merkmalsspalte wie die Primärspalte sowie dieselbe Modellidentifikationsspalte haben. Für die Basistabelle verwenden wir die Vorhersagetabelle unseres Validierungsdatensatzes , den wir zuvor gespeichert haben, nachdem wir unser Modell mit dem besten Hyperparameter trainiert haben. Um die Driftmetrik zu berechnen, berechnet Databricks die Profilmetriken sowohl für die Primär- als auch für die Basistabelle. Hier können Sie mehr über die Primärtabelle und die Basistabelle lesen.


 #read the prediction table df_base_table = spark.table(f"{catalog_name}.{model_schema}.predictions") # create the model id and add it to the table model_id = get_model_id(endpoint_name) df_base_table = df_base_table.withColumn(model_id_col, F.lit(model_id)) #write the new table and enable the CDC on it output_base_table_name = f"{catalog_name}.{model_schema}.{base_table_prefix}_{model_name}" df_base_table.write.format("delta").mode("overwrite").saveAsTable(output_base_table_name) spark.sql(f"ALTER TABLE {output_base_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")


Jetzt können wir unser Monitoring-Dashboard erstellen. Dies können wir entweder über die Benutzeroberfläche tun. oder die Lakehouse Monitoring API. Hier verwenden wir die zweite Option:

 # This is where we store the metric tables. output_schema_name = f"{catalog_name}.{model_schema}" try: info = lm.create_monitor( table_name=unpacked_requests_table_name, profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities,#the aggregation window model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, # We will refresh the metrics on-demand in this notebook baseline_table_name=output_base_table_name, ) print(info) except Exception as e: # Ensure the exception was expected assert "RESOURCE_ALREADY_EXISTS" in str(e), f"Unexpected error: {e}" # Update the monitor if any parameters of this notebook have changed. lm.update_monitor( table_name=unpacked_requests_table_name, updated_params=dict( profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities, model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, baseline_table_name=output_base_table_name, ) ) # Refresh metrics calculated on the requests table. refresh_info = lm.run_refresh(table_name=unpacked_requests_table_name) print(refresh_info)


Nachdem wir den Code ausgeführt haben, dauert es einige Zeit, bis Databricks alle Metriken berechnet hat. Um das Dashboard anzuzeigen, gehen Sie zur Registerkarte Quality Ihrer Sink-Tabelle (d. h. unpacked_requests_table_name ). Sie sollten eine Seite wie die folgende sehen.

Databricks-Modellüberwachungsansicht


Wenn Sie auf refresh history anzeigen“ klicken, werden Ihre laufenden, ausstehenden und vergangenen Aktualisierungen angezeigt. Klicken Sie auf View Dashboard um Ihr Dashboard zu öffnen.

Dashboard zur Databricks-Modellüberwachung



Wir beginnen also mit der Inferenztabelle ( my_endpoint_payload ), verarbeiten sie, speichern das Ergebnis in my_endpoint_payload_unpacked und übergeben diese Tabelle zusammen mit unserer Basistabelle ( base_table_als ) an unsere Überwachungs-API. Das DLM berechnet die Profilmetriken für jede Tabelle ( my_endpoint_payload_unpacked_profile_metric ) und verwendet sie zur Berechnung der Driftmetriken ( my_endpoint_payload_unpacked_drift_metrics ).


Fertig! Sie haben alles, was Sie zum Bedienen und Überwachen Ihres Modells benötigen!


Im nächsten Teil zeige ich Ihnen, wie Sie diesen Prozess mit Databricks Assets Bundle und Gitlab automatisieren können!

Im ersten Teil dieser Tutorial- Reihe haben wir die ersten Schritte zum Aufbau einer End-to-End-MLOps-Pipeline mit Databricks und Spark unternommen, geleitet von der Referenzarchitektur von Databricks. Hier ist eine Zusammenfassung der wichtigsten Schritte, die wir behandelt haben:


  • Einrichten des Unity-Katalogs für Medallion Architecture : Wir haben unsere Daten im Unity-Katalog in Bronze-, Silber- und Goldebenen organisiert und so ein strukturiertes und effizientes Datenverwaltungssystem eingerichtet.

  • Daten in den Unity-Katalog aufnehmen : Wir haben gezeigt, wie Rohdaten in das System importiert werden, um Konsistenz und Qualität für nachfolgende Verarbeitungsphasen sicherzustellen.

  • Trainieren des Modells : Mithilfe von Databricks haben wir ein auf unseren Datensatz zugeschnittenes maschinelles Lernmodell trainiert und dabei Best Practices für eine skalierbare und effektive Modellentwicklung befolgt.

  • Hyperparameter-Tuning mit HyperOpt : Um die Modellleistung zu verbessern, haben wir HyperOpt eingesetzt, um die Suche nach optimalen Hyperparametern zu automatisieren und so Genauigkeit und Effizienz zu verbessern.

  • Experimentverfolgung mit Databricks MLflow : Wir haben MLflow zum Protokollieren und Überwachen unserer Experimente verwendet und eine umfassende Aufzeichnung der Modellversionen, Metriken und Parameter geführt, um einen einfachen Vergleich und die Reproduzierbarkeit zu ermöglichen.


Wenn diese grundlegenden Schritte abgeschlossen sind, ist Ihr Modell nun bereit für die Bereitstellung. In diesem zweiten Teil konzentrieren wir uns auf die Integration zweier wichtiger Komponenten in unser System:


  1. Batch-Inferenz : Implementierung der Batch-Verarbeitung zur Generierung von Vorhersagen für große Datensätze, geeignet für Anwendungen wie Massenbewertung und regelmäßige Berichterstattung.
  2. Online-Inferenz (Modellbereitstellung) : Einrichten der Modellbereitstellung in Echtzeit, um sofortige Vorhersagen zu ermöglichen, die für interaktive Anwendungen und Dienste unerlässlich sind.
  3. Modellüberwachung: um sicherzustellen, dass Ihre bereitgestellten Modelle im Laufe der Zeit optimale Leistung und Zuverlässigkeit beibehalten.


Lassen Sie uns loslegen!

Modellbereitstellung

Der Ausgangspunkt des letzten Blogs war die Modellbewertung. Stellen Sie sich nun vor, wir haben den Vergleich durchgeführt und festgestellt, dass unser Modell im Vergleich zu diesem Produktionsmodell eine höhere Leistung aufweist. Da wir das Modell (angenommen) in der Produktion verwenden möchten, möchten wir alle Daten nutzen, die wir haben. Der nächste Schritt besteht darin, das Modell mit dem vollständigen Datensatz zu trainieren und zu testen. Anschließend speichern wir unser Modell für die spätere Verwendung, indem wir es als unser Champion-Modell bereitstellen. Da dies das endgültige Modell ist, das wir für die Inferenz verwenden möchten, verwenden wir den Feature Engineering-Client, um das Modell zu trainieren. Auf diese Weise können wir nicht nur die Modellherkunft einfacher verfolgen, sondern auch die Schemavalidierung und Featuretransformation (falls vorhanden) an den Client auslagern.


 with mlflow.start_run(run_name="ALS_best_model") as run: als = ALS() # Now we set the parameters for the method als.setMaxIter(MAX_ITER)\ .setSeed(SEED)\ .setRegParam(best_params["REG_PARAM"])\ .setUserCol(COL_USER)\ .setItemCol(COL_ITEM)\ .setRatingCol(COL_LABEL)\ .setRank(best_params["RANK"]) mlflow.log_param("MAX_ITER", MAX_ITER) mlflow.log_param("RANK", best_params["RANK"]) mlflow.log_param("REG_PARAM", best_params["REG_PARAM"]) # Create the model with these parameters. model = als.fit(df_full_data) #drop predictions where users and products from the test test and didn't make it into the training set. in this case, the prediction is NaN model.setColdStartStrategy('drop') predictions = model.transform(df_full_data) signature = infer_signature(model_input = df_full_data, model_output = predictions.select(COL_LABEL)) #log the model mlflow.spark.log_model(model, model_name, sample_input=df_full_data.limit(3), signature = signature, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name=f"{catalog_name}.{model_schema}.{model_name}") evaluator = RegressionEvaluator(predictionCol=COL_PRED, labelCol=COL_LABEL) rmse = evaluator.setMetricName("rmse").evaluate(predictions) mlflow.log_metric('rmse', rmse)


Wir können auch den Feature Store oder die Feature Engineering APIs verwenden, um die Modelle zu trainieren und zu protokollieren

 model_info = fe.log_model(model=model, artifact_path = model_name, flavor=mlflow.spark, training_set=fe_full_data, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name= f"{catalog_name}.{model_schema}.{model_name}" )


Wenn wir die Feature Engineering API verwenden, können wir die Herkunft des Modells im Catalog Explorer anzeigen

Datenherkunft im Dataticks Unity Catalog


Aktualisieren wir nun die Modellbeschreibung und weisen ihr das Label „Champion“ zu.

 import time from mlflow.tracking.client import MlflowClient from mlflow.entities.model_registry.model_version_status import ModelVersionStatus client = MlflowClient() #find the latest model version model_name_path = f"{catalog_name}.{model_schema}.{model_name}" model_version_infos = client.search_model_versions(f"name ='{model_name_path}'") new_model_version = max([int(model_version_info.version) for model_version_info in model_version_infos]) #add the model and model version descirption client.update_registered_model( name=model_name_path, description="collaborative filtering using Spark mllib ALS. This model use rating table" ) client.update_model_version( name=model_name_path, version=new_model_version, description="this model is optimized Rank and REG_PARAM with Hyperopt and rmse as a loss function. trained on the full dataset" ) # assign alias client.set_registered_model_alias(model_name_path, "Champion", new_model_version)


Überprüfen Sie nun das Schema, in dem Sie das Modell registriert haben. Sie sollten alle Ihre Aktualisierungen wie folgt sehen

Modellregistrierung im Databricks Unity Catalog

Modellphasen : Wenn Sie den Arbeitsbereich für die Modellregistrierung verwenden, sollten Sie Phasen zur Verwaltung Ihrer Modelle verwenden. Die Verwendung von Aliasnamen funktioniert nicht. Schauen Sie hier nach um zu sehen, wie es funktioniert

Modellinferenz

Stapelbewertung

Stellen Sie sich nun vor, wir möchten unser Modell in der Produktion für Inferenzen verwenden. In diesem Schritt laden wir das Champion-Modell und verwenden es, um 20 Filmempfehlungen für jeden Benutzer zu generieren.


 from mlflow.spark import load_model as spark_load_model from mlflow.tracking.client import MlflowClient from create_training_set import split_data #-- set UC as model registray mlflow.set_registry_uri("databricks-uc") #-- initate mlflow client client = MlflowClient() # -- read the config file with open('config.json') as config_file: config = json.load(config_file) catalog_name = config["catalog_name"] gold_layer = config["gold_layer_name"] silver_layer = config["silver_layer_name"] user_item_table_name = config["user_item_table_name"] ft_user_item_name = config["ft_user_item_name"] model_name = config["model_name"] model_schema = config["model_schema"] #-- create the model uri model_path = f"{catalog_name}.{model_schema}.{model_name}" # --create the model_uri: there are two ways to do this # 1: using the alias (we use this*) model_version_uri = f"models:/{model_uri}@champion" # 2: using model version #champion_version = client.get_model_version_by_alias(model_uri, "champion") #model_version_uri = f"models:/{model_uri}/{champion_version.version}" # -- load the model pipline and exctract the model model_pipeline = spark_load_model(model_version_uri) model = model_pipeline.stages[0] # -- batch scoring using the the model fe_full_data, df_full_data, df_train, df_test = split_data() df_batch_input = df_full_data.drop("rating") df_scores = model.transform(df_batch_input) # --- in case you used Feature Engineering to train and register model #from databricks.feature_engineering import FeatureEngineeringClient #fe = FeatureEngineeringClient() # fe.score_batch(model_uri=f"{model_version_uri}",df = df_batch_input)


und Sie können sehen, dass wir dieselben Trainingsdaten für die Batch-Bewertung verwendet haben. Obwohl dies bei Empfehlungssystemen sinnvoll ist, möchten wir das Modell in den meisten Anwendungen verwenden, um einige unsichtbare Daten zu bewerten. Stellen Sie sich beispielsweise vor, Sie sind Netflix und möchten die Benutzerempfehlungen am Ende des Tages basierend auf ihrer neuen Beobachtungsliste aktualisieren. Wir können einen Job planen, der die Batch-Bewertung zu einem bestimmten Zeitpunkt am Ende des Tages ausführt.


Jetzt können wir fortfahren und die Empfehlungen für jeden Benutzer generieren. Dazu finden wir die Top 20 Artikel pro Benutzer

 from pyspark.sql.window import Window from pyspark.sql.functions import col, split, row_number, collect_list from pyspark.sql.functions import col, collect_list, expr, lit, min, row_number, desc windowSpec = Window.partitionBy("user_id").orderBy(col("prediction").desc()) df_top_20_items = df_scores.withColumn("rank", row_number().over(windowSpec)).filter(col("rank") <= 20) df_user_recs = df_top_20_items.groupBy("user_id") \ .agg(collect_list(col("item_id").cast("double")).alias("top_item_ids"))


so sieht das Ergebnis aus

Schließlich können wir die Vorhersage als Delta-Label auf unserer UC speichern oder sie in einem nachgelagerten System, Mongo DB oder Azure Cosmos DB, veröffentlichen. Wir entscheiden uns für die erste Option.


 df_user_recs.write.mode("overwrite").saveAsTable(f"{catalog_name}.{output_schema}.top20_item_recommendations")


Streaming/Online-Inferenz

Stellen Sie sich nun einen Fall vor, in dem wir unsere Empfehlungen basierend auf Echtzeit-Benutzerinteraktionen aktualisieren möchten. Für diesen Fall können wir Model Serving verwenden. Wenn jemand Ihr Modell verwenden möchte, kann er Daten an den Server senden. Der Server überträgt diese Daten dann an Ihr bereitgestelltes Modell, das in Aktion tritt, die Daten analysiert und eine Vorhersage generiert. Sie können in Webanwendungen, mobilen Apps oder sogar eingebetteten Systemen verwendet werden. Eine der Anwendungen dieses Ansatzes besteht darin, die Verkehrsführung für A/B-Tests zu ermöglichen.


Der ALS-Algorithmus kann nicht direkt für Online-Inferenzen verwendet werden, da er ein erneutes Training des Modells mit den gesamten Daten (alt + neu) erfordert, um die Empfehlungen zu aktualisieren. Gradient Descent-Lernalgorithmen sind Beispiele für Modelle, die für Online-Updates verwendet werden können. Wir werden uns einige dieser Algorithmen möglicherweise in einem zukünftigen Beitrag ansehen.


Um jedoch zu veranschaulichen, wie ein solches Modell funktionieren würde, erstellen wir einen (nutzlosen) Modell-Serving-Endpunkt, der die Filmbewertung vorhersagt, basierend auf der Bewertung eines Films durch einen Benutzer!


 import requests model_path = f"{catalog_name}.{model_schema}.{model_name}" champion_version = client.get_model_version_by_alias(model_path, "champion") # Set the name of the MLflow endpoint endpoint_name = config["model_serving_endpoint_name"] # Name of the registered MLflow model model_name = model_path # Specify the type of compute (CPU, GPU_SMALL, GPU_MEDIUM, etc.) workload_type = "CPU" # Specify the scale-out size of compute (Small, Medium, Large, etc.) workload_size = "Small" # Get the latest version of the MLflow model model_version = int(champion_version.version) # Specify Scale to Zero(only supported for CPU endpoints) scale_to_zero = False # Get the API endpoint and token for the current notebook context API_ROOT = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get() API_TOKEN = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get() data = { "name": endpoint_name, "config": { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ] }, } headers = {"Context-Type": "text/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.post( url=f"{API_ROOT}/api/2.0/serving-endpoints", json=data, headers=headers )


Dadurch wird ein Cluster für die Bereitstellung des Lunch-Modells für uns erstellt, was einige Zeit in Anspruch nimmt. Wenn Sie jetzt das Fenster Serving öffnen, sollten Sie Ihren Endpunkt sehen.


Wir können einen Endpunkt verwenden, um mehrere Modelle zu bedienen. Dann können wir die Verkehrsführung für Szenarien wie A/B-Tests verwenden oder die Leistung verschiedener Modelle in der Produktion vergleichen.

Inferenztabelle

Inferenztabellen in Databricks Model Serving fungieren als automatisches Protokoll für unsere bereitgestellten Modelle. Wenn sie aktiviert sind, erfassen sie eingehende Anfragen (zur Vorhersage gesendete Daten), die entsprechenden Modellausgaben (Vorhersagen) und einige andere Metadaten als Delta-Tabelle im Unity Catalog. Wir können Inferenztabellen zum Überwachen und Debuggen , zur Herkunftsverfolgung und als Datenerfassungsverfahren zum erneuten Trainieren oder Feinabstimmen unserer Modelle verwenden.


Wir können die inference table auf unserem Serving-Endpunkt aktivieren, um das Modell zu überwachen. Wir können dies tun, indem wir die auto_capture_config -Eigenschaften in der Nutzlast angeben, wenn wir den Endpunkt zum ersten Mal erstellen. Oder wir aktualisieren unseren Endpunkt anschließend mit dem put -Befehl und der config Endpunkt-URL wie folgt (mehr hier )


 data = { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ], "auto_capture_config":{ "catalog_name": catalog_name, "schema_name": model_schema, "table_name_prefix": payload_table, } } headers = {"Context-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.put(url=f"{API_ROOT}/api/2.0/serving-endpoints/{endpoint_name}/config", json=data, headers=headers) print(json.dumps(response.json(), indent=4))


Jetzt füttern wir den Endpunkt mit einigen Dummy-Benutzerinteraktionsdaten

 import random import time all_items = df_full_data.select(col("item_id")).distinct() for user_id in range(50,54): items_not_rated_by_user = df_full_data.where(col("user_id")==user_id).select(col("item_id")).distinct()#collect()[0][0] no_rated_items = [item.item_id for item in all_items.subtract(items_not_rated_by_user).limit(4).collect()] data = { "dataframe_records": [ {"user_id":user_id, "item_id":no_rated_items[0], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[1], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, ] } response = requests.post( url=f"{API_ROOT}/serving-endpoints/{endpoint_name}/invocations", json=data, headers=headers ) # generate the data within the timespan of 1 to 8 minutes time.sleep(random.randint(60*1, 60*8))


Wir können die Endpunktprotokolle in der Tabelle <catalog>.<schema>.<payload_table> überprüfen. Es dauert etwa 10 Minuten, bis Sie die Daten in der Tabelle sehen können.


 table_path = f"{catalog_name}.{model_schema}.{payload_table}" # Read data from the inference table df_inf_table = spark.read.table(table_path) display(df_inf_table )


Sie sollten in etwa diese Nutzlasttabelle sehen

Nutzlasttabelle für das Databricks-Modell


Um das Schema dieser Inferenztabelle zu verstehen, lesen Sie hier „Unity-Katalog-Inferenztabellenschema==“.==


Modellüberwachung

Modell- und Datenüberwachung ist ein komplexes Thema, dessen Beherrschung viel Zeit in Anspruch nimmt. Databricks Lakehouse Monitoring (DLM) reduziert den Aufwand für den Aufbau eines geeigneten Überwachungssystems, indem es standardmäßige und anpassbare Vorlagen für gängige Anwendungsfälle bereitstellt. Die Beherrschung von DLM und Modellüberwachung im Allgemeinen erfordert jedoch viel Experimentieren. Ich möchte Ihnen hier keinen umfassenden Überblick über die Modellüberwachung geben, sondern Ihnen einen Ausgangspunkt bieten. Vielleicht widme ich diesem Thema in Zukunft einen Blog.


Eine kurze Zusammenfassung der DLM-Funktionen und -Features

Jetzt, da unser Modell läuft, können wir die von unserem Serving-Endpunkt generierte Inferenztabelle verwenden, um wichtige Kennzahlen wie Modellleistung und Drift zu überwachen und Abweichungen oder Anomalien in unseren Daten oder unserem Modell im Laufe der Zeit zu erkennen. Dieser proaktive Ansatz hilft uns, rechtzeitig Korrekturmaßnahmen zu ergreifen, z. B. das Modell neu zu trainieren oder seine Funktionen zu aktualisieren, um eine optimale Leistung und Ausrichtung auf die Geschäftsziele aufrechtzuerhalten.


Databricks Lakehouse Monitoring-Datenarchitektur Quelle: Databricks


DLM bietet drei Analyse- bzw. profile type : Zeitreihen , Snapshot und Inferenz . Da wir an der Analyse unserer Inferenztabelle interessiert sind, konzentrieren wir uns auf Letzteres. Um eine Tabelle zur Überwachung zu verwenden – unsere „ Primärtabelle “, sollten wir sicherstellen, dass die Tabelle die richtige Struktur hat. Bei der Inferenztabelle sollte jede Zeile einer Anfrage mit folgenden Spalten entsprechen:

  • Modellmerkmale

  • Modellvorhersage

  • Modell-ID

  • timestamp : Zeitstempel der Inferenzanfrage

  • Grundwahrheit (optional)


Die Modell-ID ist wichtig, wenn wir mehrere Modelle bereitstellen und die Leistung jedes Modells in einem Überwachungs-Dashboard verfolgen möchten. Wenn mehr als eine Modell-ID verfügbar ist, verwendet DLM sie, um die Daten aufzuteilen und Metriken und Statistiken für jedes Segment separat zu berechnen.


DLM berechnet alle Statistiken und Metriken für ein bestimmtes Zeitintervall. Für die Inferenzanalyse werden die Zeitstempelspalte sowie eine benutzerdefinierte Fenstergröße zur Identifizierung der Zeitfenster verwendet. Weitere Informationen finden Sie weiter unten.


DLM unterstützt zwei problem type für Inferenztabellen: „ Klassifizierung “ oder „ Regression “. Es berechnet einige der relevanten Metriken und Statistiken basierend auf dieser Spezifikation.


Um DLM zu verwenden, müssen wir einen Monitor erstellen und ihn an eine Tabelle anhängen. Dabei erstellt DLM zwei metric tables :

  • Profilmetriktabelle : Diese Tabelle enthält zusammenfassende Statistiken wie Minimum, Maximum, Prozentsatz von Nullen und Nullen. Sie enthält außerdem zusätzliche Metriken basierend auf dem vom Benutzer definierten Problemtyp. Zum Beispiel Präzision , Rückruf und f1_score für die Klassifizierungsmodelle und mittlerer quadrierter Fehler und mittlerer durchschnittlicher Fehler für Regressionsmodelle.

  • Driftmetriktabelle : Sie enthält Statistiken, die messen, wie sich die Verteilung der Daten im Laufe der Zeit oder im Verhältnis zu einem Basiswert (sofern angegeben) geändert hat. Sie berechnet Kennzahlen wie den Chi-Quadrat-Test und den KS-Test.


Um die Liste der vollständigen Metriken für jede Tabelle anzuzeigen, sehen Sie auf der Dokumentationsseite der Monitormetriktabelle nach. Es ist auch möglich , benutzerdefinierte Metriken zu erstellen.


Ein wichtiger Aspekt beim Aufbau eines Überwachungssystems besteht darin, sicherzustellen, dass unser Überwachungs-Dashboard Zugriff auf die neuesten Inferenzdaten hat, sobald diese eintreffen. Dazu können wir Delta Table Streaming verwenden, um die verarbeiteten Zeilen in der Inferenztabelle zu verfolgen. Wir verwenden die Inferenztabelle des Modells als unsere Quelltabelle ( readStream ) und die Überwachungstabelle als Senkentabelle ( writeStream ). Wir stellen außerdem sicher, dass Change Data Capture (CDC) für beide Tabellen aktiviert ist (standardmäßig ist es für die Inferenztabelle aktiviert). Auf diese Weise verarbeiten wir nur Änderungen – Einfügen/Aktualisieren/Löschen – in der Quelltabelle, anstatt die gesamte Tabelle bei jeder Aktualisierung erneut zu verarbeiten.

Praxisnah

Um das Monitoring über unsere Inferenztabelle zu ermöglichen, führen wir folgende Schritte aus:

  1. Lesen Sie die Inferenztabelle als Streaming-Tabelle
  2. Erstellen Sie eine neue Delta-Tabelle mit dem richtigen Schema, indem Sie die Inferenztabelle entpacken, die von unserem Modell-Serving-Endpunkt generiert wird.
  3. Bereiten Sie die Basistabelle vor (sofern vorhanden).
  4. Erstellen Sie einen Monitor über die resultierende Tabelle und aktualisieren Sie die Metrik
  5. Planen Sie einen Workflow, um die Inferenztabelle in die richtige Struktur zu entpacken und die Metriken zu aktualisieren


Zuerst müssen wir die Lakehouse Monitoring API installieren. Sie sollte bereits installiert sein, wenn Sie Databricks Rum Time 15.3 LTS und höher verwenden:


 %pip install "https://ml-team-public-read.s3.amazonaws.com/wheels/data-monitoring/a4050ef7-b183-47a1-a145-e614628e3146/databricks_lakehouse_monitoring-0.4.14-py3-none-any.whl" dbutils.library.restartPython()


Lesen wir die Inferenztabelle als Streaming-Tabelle

 requests_raw = spark.readStream\ .format("delta")\ .table(inference_table_name) requests_raw.isStreaming #-> True


Als nächstes müssen wir die Tabelle wie oben beschrieben in das richtige Format bringen. Diese Tabelle sollte für jede Vorhersage eine Zeile mit den relevanten Merkmalen und Vorhersagewerten enthalten. Die Inferenztabelle, die wir vom Endpunkt des Modells erhalten, speichert die Endpunktanforderungen und -antworten in einem verschachtelten JSON-Format. Hier ist ein Beispiel für die JSON-Nutzlast für die Anforderungs- und Antwortspalte.

 #requests {"dataframe_records": [ {"user_id": 1, "item_id": 346, "rating": 5}, {"user_id": 1, "item_id": 377, "rating": 2}, {"user_id": 1, "item_id": 302, "rating": 4} ] } #reponse {"predictions": [4.248899936676025, 1.1172138452529907, 4.279165744781494] } # --> what we need | user_id | item_id | rating | prediction | |---------|---------|--------|------------| | 1 | 346 | 5 | 4.248900 | | 1 | 377 | 2 | 1.117214 | | 1 | 302 | 4 | 4.279166 |


Um diese Tabelle in das richtige Schema zu entpacken, können wir den folgenden Code verwenden, der aus der Databricks-Dokumentation ( Inferenztabelle, Lakehouse Monitoring-Starter-Notebook ) angepasst ist.


 # define the schema of the request and reponse fields in the inference tabel REQUEST_FIELDS = [StructField('user_id', IntegerType(), False),\ StructField('item_id', IntegerType(), False),\ StructField('rating', IntegerType(), False)\ ] RESPONSE_FIELD = [T.StructField("predictions", FloatType())] def process_col_requests(json_str:str) -> str: """ to proccess the JSON payload of request column in inference table """ request = json.loads(json_str) dataframe_records = request.get("dataframe_records", []) return dataframe_records def procces_col_response(json_str: str) -> str: """ to proccess the JSON payload of reponse column in inference table """ reponse = json.loads(json_str) output = [{prediction_col: round(prediction,4)} for prediction in reponse["predictions"]] return output def get_model_id(endpoint_name: str) -> str: """ create the model id by concatinating the model name and the model version. note: the assumption is the endpoint serves only one model """ served_models = get_served_models(endpoint_name) model_id = f"{served_models[0]['model_name']}_{served_models[0]['model_version']}" return model_id def process_requests(requests_raw: DataFrame, request_fields: List[T.StructField], response_field: T.StructField, endpoint_name: str) -> DataFrame: """ Takes a stream of raw requests and processes them by: - Unpacking JSON payloads for requests and responses - Exploding batched requests into individual rows - Converting Unix epoch millisecond timestamps to be Spark TimestampType :param requests_raw: DataFrame containing raw requests. Assumed to contain the following columns: - `request` - `response` - `timestamp_ms` :param request_fields: List of StructFields representing the request schema :param response_field: A StructField representing the response schema :return: A DataFrame containing processed requests """ # Convert the timestamp milliseconds to TimestampType for downstream processing. requests_timestamped = requests_raw \ .withColumn(timestamp_col, (F.col("timestamp_ms") / 1000).cast(T.TimestampType())) \ .drop("timestamp_ms") # create the model identifier column model_id = get_model_id(endpoint_name) # Convert the model name and version columns into a model identifier column. requests_identified = requests_timestamped \ .withColumn(model_id_col, F.lit(model_id)) # Rename the date column to avoid collisions with features. requests_dated = requests_identified.withColumnRenamed("date", date_col) # Consolidate and unpack JSON. request_schema = T.ArrayType(T.StructType(request_fields)) response_schema = T.ArrayType(T.StructType(response_field)) # w udf_request = F.udf(process_col_requests, request_schema) udf_reponse = F.udf(procces_col_response, response_schema) requests_unpacked = requests_dated.withColumn("request", udf_request("request")).\ withColumn("response", udf_reponse("response")) # Explode batched requests into individual rows. DB_PREFIX = "__db" requests_exploded = requests_unpacked \ .withColumn(f"{DB_PREFIX}_request_response", F.arrays_zip(F.col("request"), F.col("response"))) \ .withColumn(f"{DB_PREFIX}_request_response", F.explode(F.col(f"{DB_PREFIX}_request_response"))) \ .select(F.col("*"), F.col(f"{DB_PREFIX}_request_response.request.*"), F.col(f"{DB_PREFIX}_request_response.response.*")) \ .drop(f"{DB_PREFIX}_request_response", "request", "response") requests_cleaned = requests_exploded.drop("status_code", "sampling_fraction", "client_request_id", "databricks_request_id", "request_metadata") return requests_cleaned


Die resultierende Tabelle würde folgendermaßen aussehen:

Payload-Tabelle entpackt

Als nächstes sollten wir unsere Sink-Tabelle initialisieren

 dt_builder = DeltaTable.createIfNotExists(spark) \ .tableName(unpacked_requests_table_name) \ .addColumns(schema) \ .partitionedBy(requests_cleaned.schema) \ .property("delta.enableChangeDataFeed", "true") \ dt_builder.execute()


und schreibe die Ergebnisse

 checkpoint_path = f"dbfs:/payload-logging/{endpoint_name}/checkpoint" requests_stream = requests_cleaned.writeStream \ .trigger(once=True) \ .format("delta") \ .partitionBy(date_col) \ .outputMode("append") \ .option("checkpointLocation", checkpoint_path) \ .toTable(unpacked_requests_table_name) \


Schließlich erstellen wir unsere Basistabelle. DLM verwendet diese Tabelle, um die Drifts zu berechnen, indem die Verteilung ähnlicher Spalten von Basis- und Primärmodellen verglichen wird. Die Basistabelle sollte dieselbe Merkmalsspalte wie die Primärspalte sowie dieselbe Modellidentifikationsspalte haben. Für die Basistabelle verwenden wir die Vorhersagetabelle unseres Validierungsdatensatzes , den wir zuvor gespeichert haben, nachdem wir unser Modell mit dem besten Hyperparameter trainiert haben. Um die Driftmetrik zu berechnen, berechnet Databricks die Profilmetriken sowohl für die Primär- als auch für die Basistabelle. Hier können Sie mehr über die Primärtabelle und die Basistabelle lesen.


 #read the prediction table df_base_table = spark.table(f"{catalog_name}.{model_schema}.predictions") # create the model id and add it to the table model_id = get_model_id(endpoint_name) df_base_table = df_base_table.withColumn(model_id_col, F.lit(model_id)) #write the new table and enable the CDC on it output_base_table_name = f"{catalog_name}.{model_schema}.{base_table_prefix}_{model_name}" df_base_table.write.format("delta").mode("overwrite").saveAsTable(output_base_table_name) spark.sql(f"ALTER TABLE {output_base_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")


Jetzt können wir unser Monitoring-Dashboard erstellen. Dies können wir entweder über die Benutzeroberfläche tun. oder die Lakehouse Monitoring API. Hier verwenden wir die zweite Option:

 # This is where we store the metric tables. output_schema_name = f"{catalog_name}.{model_schema}" try: info = lm.create_monitor( table_name=unpacked_requests_table_name, profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities,#the aggregation window model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, # We will refresh the metrics on-demand in this notebook baseline_table_name=output_base_table_name, ) print(info) except Exception as e: # Ensure the exception was expected assert "RESOURCE_ALREADY_EXISTS" in str(e), f"Unexpected error: {e}" # Update the monitor if any parameters of this notebook have changed. lm.update_monitor( table_name=unpacked_requests_table_name, updated_params=dict( profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities, model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, baseline_table_name=output_base_table_name, ) ) # Refresh metrics calculated on the requests table. refresh_info = lm.run_refresh(table_name=unpacked_requests_table_name) print(refresh_info)


Nachdem wir den Code ausgeführt haben, dauert es einige Zeit, bis Databricks alle Metriken berechnet hat. Um das Dashboard anzuzeigen, gehen Sie zur Registerkarte Quality Ihrer Sink-Tabelle (d. h. unpacked_requests_table_name ). Sie sollten eine Seite wie die folgende sehen.

Databricks-Modellüberwachungsansicht


Wenn Sie auf refresh history anzeigen“ klicken, werden Ihre laufenden, ausstehenden und vergangenen Aktualisierungen angezeigt. Klicken Sie auf View Dashboard um Ihr Dashboard zu öffnen.

Dashboard zur Databricks-Modellüberwachung



Wir beginnen also mit der Inferenztabelle ( my_endpoint_payload ), verarbeiten sie, speichern das Ergebnis in my_endpoint_payload_unpacked und übergeben diese Tabelle zusammen mit unserer Basistabelle ( base_table_als ) an unsere Überwachungs-API. Das DLM berechnet die Profilmetriken für jede Tabelle ( my_endpoint_payload_unpacked_profile_metric ) und verwendet sie zur Berechnung der Driftmetriken ( my_endpoint_payload_unpacked_drift_metrics ).


Fertig! Sie haben alles, was Sie zum Bedienen und Überwachen Ihres Modells benötigen!


Im nächsten Teil zeige ich Ihnen, wie Sie diesen Prozess mit Databricks Assets Bundle und Gitlab automatisieren können!