paint-brush
Laten we een MLOps-pijplijn bouwen met Databricks en Spark - Deel 2door@neshom
Nieuwe geschiedenis

Laten we een MLOps-pijplijn bouwen met Databricks en Spark - Deel 2

door Mohsen Jadidi42m2024/12/29
Read on Terminal Reader

Te lang; Lezen

In het tweede deel van deze blog zien we hoe Databricks ons in staat stelt om batch-implementatie en online-serving uit te voeren. We besteden wat tijd aan het opzetten van dashboards voor data- en modelmonitoring.
featured image - Laten we een MLOps-pijplijn bouwen met Databricks en Spark - Deel 2
Mohsen Jadidi HackerNoon profile picture
0-item
1-item
2-item

In het eerste deel van deze tutorialserie hebben we de eerste stappen gezet voor het bouwen van een end-to-end MLOps-pipeline met behulp van Databricks en Spark, geleid door de referentiearchitectuur van Databricks. Hier is een samenvatting van de belangrijkste stappen die we hebben behandeld:


  • Het opzetten van de Unity Catalog voor Medallion Architecture : We hebben onze gegevens georganiseerd in bronzen, zilveren en gouden lagen binnen de Unity Catalog, waarmee we een gestructureerd en efficiënt gegevensbeheersysteem hebben opgezet.

  • Gegevens opnemen in Unity Catalog : We hebben laten zien hoe u ruwe gegevens in het systeem kunt importeren, zodat u consistentie en kwaliteit kunt garanderen voor de daaropvolgende verwerkingsfasen.

  • Het model trainen : met behulp van Databricks hebben we een machine learning-model getraind dat is afgestemd op onze dataset. Hierbij hebben we de best practices voor schaalbare en effectieve modelontwikkeling gevolgd.

  • Hyperparameterafstemming met HyperOpt : om de modelprestaties te verbeteren, hebben we HyperOpt ingezet om de zoektocht naar optimale hyperparameters te automatiseren, waardoor de nauwkeurigheid en efficiëntie werden verbeterd.

  • Experimenten bijhouden met Databricks MLflow : We hebben MLflow gebruikt om onze experimenten te loggen en te monitoren. Zo houden we een uitgebreid overzicht bij van modelversies, statistieken en parameters, zodat we deze eenvoudig kunnen vergelijken en reproduceren.


Met deze fundamentele stappen voltooid, is uw model nu klaar voor implementatie. In dit tweede deel richten we ons op het integreren van twee kritieke componenten in ons systeem:


  1. Batch-inferentie : implementatie van batchverwerking om voorspellingen te genereren op basis van grote datasets, geschikt voor toepassingen zoals bulk-scoring en periodieke rapportage.
  2. Online inferentie (modelserveren) : het opzetten van realtime modelserveren om onmiddellijke voorspellingen te doen, essentieel voor interactieve toepassingen en services.
  3. Modelbewaking: om ervoor te zorgen dat uw geïmplementeerde modellen in de loop van de tijd optimale prestaties en betrouwbaarheid behouden.


Laten we beginnen!

Modelimplementatie

Het uitgangspunt van de vorige blog was modelevaluatie. Stel je nu voor dat we de vergelijking hebben gedaan en hebben ontdekt dat ons model een hogere prestatie laat zien in vergelijking met dit productiemodel. Omdat we het model in productie willen (aannemen) gebruiken, willen we profiteren van alle gegevens die we hebben. De volgende stap is om het model te trainen en te testen met behulp van de volledige dataset. Bewaar ons model vervolgens voor later gebruik door het te implementeren als ons kampioensmodel. Omdat dit het laatste model is dat we willen gebruiken voor inferentie, gebruiken we de Feature Engineering-client om het model te trainen. Op deze manier kunnen we niet alleen de modellijn gemakkelijker volgen, maar ook de schemavalidatie en featuretransformatie (indien van toepassing) uitbesteden aan de client.


 with mlflow.start_run(run_name="ALS_best_model") as run: als = ALS() # Now we set the parameters for the method als.setMaxIter(MAX_ITER)\ .setSeed(SEED)\ .setRegParam(best_params["REG_PARAM"])\ .setUserCol(COL_USER)\ .setItemCol(COL_ITEM)\ .setRatingCol(COL_LABEL)\ .setRank(best_params["RANK"]) mlflow.log_param("MAX_ITER", MAX_ITER) mlflow.log_param("RANK", best_params["RANK"]) mlflow.log_param("REG_PARAM", best_params["REG_PARAM"]) # Create the model with these parameters. model = als.fit(df_full_data) #drop predictions where users and products from the test test and didn't make it into the training set. in this case, the prediction is NaN model.setColdStartStrategy('drop') predictions = model.transform(df_full_data) signature = infer_signature(model_input = df_full_data, model_output = predictions.select(COL_LABEL)) #log the model mlflow.spark.log_model(model, model_name, sample_input=df_full_data.limit(3), signature = signature, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name=f"{catalog_name}.{model_schema}.{model_name}") evaluator = RegressionEvaluator(predictionCol=COL_PRED, labelCol=COL_LABEL) rmse = evaluator.setMetricName("rmse").evaluate(predictions) mlflow.log_metric('rmse', rmse)


we kunnen ook de Feature Store of Feature Engineering API's gebruiken om de modellen te trainen en te loggen

 model_info = fe.log_model(model=model, artifact_path = model_name, flavor=mlflow.spark, training_set=fe_full_data, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name= f"{catalog_name}.{model_schema}.{model_name}" )


Wanneer we de feature engineering API gebruiken, kunnen we de afstamming van het model bekijken in Catalog Explorer

gegevenslijn in Dataticks Unity Catalog


Laten we nu de modelbeschrijving bijwerken en er een Champion-label aan toewijzen.

 import time from mlflow.tracking.client import MlflowClient from mlflow.entities.model_registry.model_version_status import ModelVersionStatus client = MlflowClient() #find the latest model version model_name_path = f"{catalog_name}.{model_schema}.{model_name}" model_version_infos = client.search_model_versions(f"name ='{model_name_path}'") new_model_version = max([int(model_version_info.version) for model_version_info in model_version_infos]) #add the model and model version descirption client.update_registered_model( name=model_name_path, description="collaborative filtering using Spark mllib ALS. This model use rating table" ) client.update_model_version( name=model_name_path, version=new_model_version, description="this model is optimized Rank and REG_PARAM with Hyperopt and rmse as a loss function. trained on the full dataset" ) # assign alias client.set_registered_model_alias(model_name_path, "Champion", new_model_version)


Ga nu verder en controleer het schema waarmee u het model hebt geregistreerd. U zou al uw updates als volgt moeten zien

Modelregister in Databricks Unity Catalog

Model stages : Als u workspace gebruikt voor modelregistratie, moet u stages gebruiken om uw modellen te beheren. Het gebruik van aliassen werkt niet. Bekijk hier om te zien hoe het werkt

Modelinferentie

Batch-scoren

Stel je nu voor dat we ons model in productie willen gebruiken voor inferentie. In deze stap laden we het championmodel en gebruiken het om 20 filmaanbevelingen voor elke gebruiker te genereren.


 from mlflow.spark import load_model as spark_load_model from mlflow.tracking.client import MlflowClient from create_training_set import split_data #-- set UC as model registray mlflow.set_registry_uri("databricks-uc") #-- initate mlflow client client = MlflowClient() # -- read the config file with open('config.json') as config_file: config = json.load(config_file) catalog_name = config["catalog_name"] gold_layer = config["gold_layer_name"] silver_layer = config["silver_layer_name"] user_item_table_name = config["user_item_table_name"] ft_user_item_name = config["ft_user_item_name"] model_name = config["model_name"] model_schema = config["model_schema"] #-- create the model uri model_path = f"{catalog_name}.{model_schema}.{model_name}" # --create the model_uri: there are two ways to do this # 1: using the alias (we use this*) model_version_uri = f"models:/{model_uri}@champion" # 2: using model version #champion_version = client.get_model_version_by_alias(model_uri, "champion") #model_version_uri = f"models:/{model_uri}/{champion_version.version}" # -- load the model pipline and exctract the model model_pipeline = spark_load_model(model_version_uri) model = model_pipeline.stages[0] # -- batch scoring using the the model fe_full_data, df_full_data, df_train, df_test = split_data() df_batch_input = df_full_data.drop("rating") df_scores = model.transform(df_batch_input) # --- in case you used Feature Engineering to train and register model #from databricks.feature_engineering import FeatureEngineeringClient #fe = FeatureEngineeringClient() # fe.score_batch(model_uri=f"{model_version_uri}",df = df_batch_input)


en u kunt zien dat we dezelfde trainingsgegevens hebben gebruikt voor batch-scoring. Hoewel het in het geval van aanbevelingssystemen logisch is, willen we in de meeste toepassingen het model gebruiken om wat ongeziene gegevens te scoren. Stel u bijvoorbeeld voor dat u Netflix bent en de aanbevelingen van de gebruiker aan het einde van de dag wilt bijwerken op basis van hun nieuwe kijklijst. We kunnen een taak plannen die de batch-scoring op een specifiek tijdstip aan het einde van de dag uitvoert.


Nu kunnen we doorgaan en de aanbevelingen voor elke gebruiker genereren. Hiervoor vinden we de top 20 items per gebruiker

 from pyspark.sql.window import Window from pyspark.sql.functions import col, split, row_number, collect_list from pyspark.sql.functions import col, collect_list, expr, lit, min, row_number, desc windowSpec = Window.partitionBy("user_id").orderBy(col("prediction").desc()) df_top_20_items = df_scores.withColumn("rank", row_number().over(windowSpec)).filter(col("rank") <= 20) df_user_recs = df_top_20_items.groupBy("user_id") \ .agg(collect_list(col("item_id").cast("double")).alias("top_item_ids"))


zo ziet het resultaat eruit

Ten slotte kunnen we de voorspelling opslaan als een deltalabel op onze UC of ze publiceren naar een downstream-systeem Mongo DB of Azure Cosmos DB. We gaan met de eerste optie


 df_user_recs.write.mode("overwrite").saveAsTable(f"{catalog_name}.{output_schema}.top20_item_recommendations")


Streaming/online-inferentie

Stel je nu eens een geval voor waarin we onze aanbevelingen willen updaten op basis van realtime gebruikersinteracties. Voor dit geval kunnen we model serving gebruiken. Wanneer iemand jouw model wil gebruiken, kan hij/zij data naar de server sturen. De server stuurt die data vervolgens naar jouw geïmplementeerde model, dat in actie komt, de data analyseert en een voorspelling genereert. Ze kunnen worden gebruikt in webapplicaties, mobiele apps of zelfs embedded systemen. Een van de toepassingen van deze aanpak is om traffic routing mogelijk te maken voor A/B-testen.


Het ALS-algoritme kan niet direct worden gebruikt voor online inferentie, omdat het vereist dat het model opnieuw wordt getraind met behulp van de volledige data (oud + nieuw) om de aanbevelingen bij te werken. Gradient Descent-leeralgoritmen zijn voorbeelden van modellen die kunnen worden gebruikt voor online updates. We kunnen in toekomstige berichten naar enkele van deze algoritmen kijken.


Om te illustreren hoe zo'n model zou werken, maken we een (nutteloos) model dat als eindpunt fungeert en filmbeoordelingen voorspelt op basis van de beoordeling door een gebruiker!


 import requests model_path = f"{catalog_name}.{model_schema}.{model_name}" champion_version = client.get_model_version_by_alias(model_path, "champion") # Set the name of the MLflow endpoint endpoint_name = config["model_serving_endpoint_name"] # Name of the registered MLflow model model_name = model_path # Specify the type of compute (CPU, GPU_SMALL, GPU_MEDIUM, etc.) workload_type = "CPU" # Specify the scale-out size of compute (Small, Medium, Large, etc.) workload_size = "Small" # Get the latest version of the MLflow model model_version = int(champion_version.version) # Specify Scale to Zero(only supported for CPU endpoints) scale_to_zero = False # Get the API endpoint and token for the current notebook context API_ROOT = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get() API_TOKEN = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get() data = { "name": endpoint_name, "config": { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ] }, } headers = {"Context-Type": "text/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.post( url=f"{API_ROOT}/api/2.0/serving-endpoints", json=data, headers=headers )


Dit zal een lunchmodelservingcluster voor ons creëren, dus het kost wat tijd. Als u nu het Serving -venster opent, zou u uw eindpunt moeten zien.


we kunnen één eindpunt gebruiken om meerdere modellen te bedienen. Vervolgens kunnen we verkeersroutering gebruiken voor scenario's zoals A/B-testen of de prestaties van verschillende modellen in de productie vergelijken.

Inferentietabel

Inferentietabellen in Databricks Model Serving fungeren als een automatisch logboek voor onze geïmplementeerde modellen. Wanneer ingeschakeld, leggen ze inkomende verzoeken (gegevens verzonden voor voorspelling), de bijbehorende modeluitvoer (voorspellingen) en enkele andere metagegevens vast als een Delta-tabel binnen Unity Catalog. We kunnen de inferentietabel gebruiken voor monitoring en debugging , lineage tracking en een gegevensverzamelingsprocedure voor het opnieuw trainen of verfijnen van onze modellen.


We kunnen de inference table op ons serving endpoint inschakelen om het model te monitoren. We kunnen dit doen door de auto_capture_config eigenschappen in de payload te specificeren wanneer we het endpoint voor het eerst aanmaken. Of we updaten ons endpoint daarna met de put opdracht en de config endpoint URL als volgt (meer hier )


 data = { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ], "auto_capture_config":{ "catalog_name": catalog_name, "schema_name": model_schema, "table_name_prefix": payload_table, } } headers = {"Context-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.put(url=f"{API_ROOT}/api/2.0/serving-endpoints/{endpoint_name}/config", json=data, headers=headers) print(json.dumps(response.json(), indent=4))


Laten we nu het eindpunt voeden met wat dummy-gebruikersinteractiegegevens

 import random import time all_items = df_full_data.select(col("item_id")).distinct() for user_id in range(50,54): items_not_rated_by_user = df_full_data.where(col("user_id")==user_id).select(col("item_id")).distinct()#collect()[0][0] no_rated_items = [item.item_id for item in all_items.subtract(items_not_rated_by_user).limit(4).collect()] data = { "dataframe_records": [ {"user_id":user_id, "item_id":no_rated_items[0], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[1], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, ] } response = requests.post( url=f"{API_ROOT}/serving-endpoints/{endpoint_name}/invocations", json=data, headers=headers ) # generate the data within the timespan of 1 to 8 minutes time.sleep(random.randint(60*1, 60*8))


We kunnen de endpoint logs controleren in de <catalog>.<schema>.<payload_table> tabel. Het duurt ongeveer 10 minuten voordat u de data in de tabel kunt zien.


 table_path = f"{catalog_name}.{model_schema}.{payload_table}" # Read data from the inference table df_inf_table = spark.read.table(table_path) display(df_inf_table )


je zou zoiets moeten zien als je payloadtabel

Databricks-model dat payloadtabel serveert


Om het schema van deze inferentietabel te begrijpen, kijk hier naar “Unity catalogus inferentietabel schema==” .==


Modelbewaking

Model- en datamonitoring zijn een complex onderwerp dat veel tijd kost om onder de knie te krijgen. Databricks Lakehouse Monitoring (DLM) vermindert de overhead van het bouwen van een goed monitoringsysteem door standaard en aanpasbare sjablonen te bieden voor veelvoorkomende use cases. Het onder de knie krijgen van DLM en modelmonitoring in het algemeen vereist echter veel experimenten. Ik wil u hier geen uitgebreid overzicht van modelmonitoring geven, maar u eerder een startpunt geven. Ik wijd misschien in de toekomst een blog aan dit onderwerp.


Een korte samenvatting van DLM-functionaliteiten en -functies

Nu ons model up and running is, kunnen we de inferentietabel die door ons serving endpoint is gegenereerd gebruiken om belangrijke statistieken te monitoren, zoals modelprestaties en drift, om afwijkingen of anomalieën in onze data of model in de loop van de tijd te detecteren. Deze proactieve aanpak helpt ons om tijdig corrigerende maatregelen te nemen, zoals het opnieuw trainen van het model of het updaten van de functies, om optimale prestaties en afstemming op bedrijfsdoelstellingen te behouden.


Databricks Lakehouse Monitoring Data Architectuurbron: Databricks


DLM biedt drie typen analyse of profile type : Time Series , Snapshot en Inference . Omdat we geïnteresseerd zijn in het analyseren van onze inferencetabel, richten we ons op de laatste. Om een tabel te gebruiken voor monitoring - onze " primaire tabel ", moeten we ervoor zorgen dat de tabel de juiste structuur heeft. Voor de inferencetabel moet elke rij overeenkomen met een verzoek met de volgende kolommen:

  • modelkenmerken

  • modelvoorspelling

  • model-id

  • timestamp : tijdstempel van het inferentieverzoek

  • grondwaarheid (optioneel)


De model-id is belangrijk voor gevallen waarin we meerdere modellen bedienen en we de prestaties van elk model in één monitoringdashboard willen volgen. Als er meer dan één model-id beschikbaar is, gebruikt DLM deze om de gegevens te slicen en metrische gegevens en statistische gegevens voor elke slice afzonderlijk te berekenen.


DLM berekent elke statistiek en metriek voor een bepaald tijdsinterval. Voor inferentieanalyse gebruikte het de timestampkolom , plus een door de gebruiker gedefinieerde venstergrootte om de tijdsvensters te identificeren. meer hieronder.


DLM ondersteunt twee problem type voor inferentietabellen: " classificatie " of " regressie ". Het berekent enkele van de relevante statistieken en statistieken op basis van deze specificatie.


Om DLM te gebruiken, moeten we een monitor maken en deze aan een tabel koppelen. Wanneer we dit doen, maakt DLM twee metric tables :

  • profielmetriektabel : deze tabel bevat samenvattende statistieken zoals min, max, percentage nullen en nullen. Het bevat ook aanvullende metrieken op basis van het door de gebruiker gedefinieerde probleemtype. Bijvoorbeeld precisie , recall en f1_score voor de classificatiemodellen, en mean_squared_error en mean_average_error voor regressiemodellen.

  • drift metrische tabel : bevat statistieken die meten hoe de distributie van data is veranderd in de loop van de tijd of ten opzichte van een basislijnwaarde (indien verstrekt) . Het berekent metingen zoals Chi-kwadraattest, KS-test.


om de lijst met complete metrics voor elke tabel te bekijken, controleer Monitor metric table documentation page. Het is ook mogelijk om aangepaste metrics te maken.


Een belangrijk aspect van het bouwen van een monitoringsysteem is om ervoor te zorgen dat ons monitoringdashboard toegang heeft tot de nieuwste inferentiegegevens zodra ze binnenkomen. Hiervoor kunnen we Delta-tabelstreaming gebruiken om verwerkte rijen in de inferentietabel bij te houden. We gebruiken de inferentietabel van de modelserver als onze brontabel ( readStream ) en de monitoringtabel als de sinktabel ( writeStream ). We zorgen er ook voor dat de Change Data Capture (CDC) is ingeschakeld op beide tabellen (het is standaard ingeschakeld op de inferentietabel). Op deze manier verwerken we alleen wijzigingen - invoegen/bijwerken/verwijderen - in de brontabel in plaats van de hele tabel opnieuw te verwerken bij elke vernieuwing.

Praktisch

Om de monitoring van onze inferentietabel mogelijk te maken, nemen we de volgende stappen:

  1. Lees de inferentietabel als een streamingtabel
  2. Maak een nieuwe deltatabel met het juiste schema door de inferentietabel uit te pakken die is gegenereerd door ons modelserverend eindpunt.
  3. Maak de basislijntabel (indien van toepassing) klaar
  4. Maak een monitor over de resulterende tabel en vernieuw de metriek
  5. Plan een workflow om de inferentietabel uit te pakken naar de juiste structuur en de statistieken te vernieuwen


Eerst moeten we de Lakehouse Monitoring API installeren. Deze zou al geïnstalleerd moeten zijn als u Databricks rum time 15.3 LTS en hoger gebruikt:


 %pip install "https://ml-team-public-read.s3.amazonaws.com/wheels/data-monitoring/a4050ef7-b183-47a1-a145-e614628e3146/databricks_lakehouse_monitoring-0.4.14-py3-none-any.whl" dbutils.library.restartPython()


Laten we de inferentietabel lezen als een streamingtabel

 requests_raw = spark.readStream\ .format("delta")\ .table(inference_table_name) requests_raw.isStreaming #-> True


Vervolgens moeten we de tabel in het juiste formaat zetten zoals hierboven beschreven. Deze tabel moet één rij hebben voor elke voorspelling met relevante kenmerken en voorspellingswaarde. De inferentietabel die we krijgen van het model dat het eindpunt bedient, slaat de eindpuntverzoeken en -reacties op als een geneste JSON-indeling. Hier is een voorbeeld van de JSON-payload voor de aanvraag- en reactiekolom.

 #requests {"dataframe_records": [ {"user_id": 1, "item_id": 346, "rating": 5}, {"user_id": 1, "item_id": 377, "rating": 2}, {"user_id": 1, "item_id": 302, "rating": 4} ] } #reponse {"predictions": [4.248899936676025, 1.1172138452529907, 4.279165744781494] } # --> what we need | user_id | item_id | rating | prediction | |---------|---------|--------|------------| | 1 | 346 | 5 | 4.248900 | | 1 | 377 | 2 | 1.117214 | | 1 | 302 | 4 | 4.279166 |


Om deze tabel uit te pakken naar het juiste schema kunnen we de volgende code gebruiken die is aangepast uit de Databricks-documentatie ( Inference table Lakehouse Monitoring starter notebook ).


 # define the schema of the request and reponse fields in the inference tabel REQUEST_FIELDS = [StructField('user_id', IntegerType(), False),\ StructField('item_id', IntegerType(), False),\ StructField('rating', IntegerType(), False)\ ] RESPONSE_FIELD = [T.StructField("predictions", FloatType())] def process_col_requests(json_str:str) -> str: """ to proccess the JSON payload of request column in inference table """ request = json.loads(json_str) dataframe_records = request.get("dataframe_records", []) return dataframe_records def procces_col_response(json_str: str) -> str: """ to proccess the JSON payload of reponse column in inference table """ reponse = json.loads(json_str) output = [{prediction_col: round(prediction,4)} for prediction in reponse["predictions"]] return output def get_model_id(endpoint_name: str) -> str: """ create the model id by concatinating the model name and the model version. note: the assumption is the endpoint serves only one model """ served_models = get_served_models(endpoint_name) model_id = f"{served_models[0]['model_name']}_{served_models[0]['model_version']}" return model_id def process_requests(requests_raw: DataFrame, request_fields: List[T.StructField], response_field: T.StructField, endpoint_name: str) -> DataFrame: """ Takes a stream of raw requests and processes them by: - Unpacking JSON payloads for requests and responses - Exploding batched requests into individual rows - Converting Unix epoch millisecond timestamps to be Spark TimestampType :param requests_raw: DataFrame containing raw requests. Assumed to contain the following columns: - `request` - `response` - `timestamp_ms` :param request_fields: List of StructFields representing the request schema :param response_field: A StructField representing the response schema :return: A DataFrame containing processed requests """ # Convert the timestamp milliseconds to TimestampType for downstream processing. requests_timestamped = requests_raw \ .withColumn(timestamp_col, (F.col("timestamp_ms") / 1000).cast(T.TimestampType())) \ .drop("timestamp_ms") # create the model identifier column model_id = get_model_id(endpoint_name) # Convert the model name and version columns into a model identifier column. requests_identified = requests_timestamped \ .withColumn(model_id_col, F.lit(model_id)) # Rename the date column to avoid collisions with features. requests_dated = requests_identified.withColumnRenamed("date", date_col) # Consolidate and unpack JSON. request_schema = T.ArrayType(T.StructType(request_fields)) response_schema = T.ArrayType(T.StructType(response_field)) # w udf_request = F.udf(process_col_requests, request_schema) udf_reponse = F.udf(procces_col_response, response_schema) requests_unpacked = requests_dated.withColumn("request", udf_request("request")).\ withColumn("response", udf_reponse("response")) # Explode batched requests into individual rows. DB_PREFIX = "__db" requests_exploded = requests_unpacked \ .withColumn(f"{DB_PREFIX}_request_response", F.arrays_zip(F.col("request"), F.col("response"))) \ .withColumn(f"{DB_PREFIX}_request_response", F.explode(F.col(f"{DB_PREFIX}_request_response"))) \ .select(F.col("*"), F.col(f"{DB_PREFIX}_request_response.request.*"), F.col(f"{DB_PREFIX}_request_response.response.*")) \ .drop(f"{DB_PREFIX}_request_response", "request", "response") requests_cleaned = requests_exploded.drop("status_code", "sampling_fraction", "client_request_id", "databricks_request_id", "request_metadata") return requests_cleaned


De resulterende tabel ziet er als volgt uit:

Payload tabel uitgepakt

Vervolgens moeten we onze sink-tabel initialiseren

 dt_builder = DeltaTable.createIfNotExists(spark) \ .tableName(unpacked_requests_table_name) \ .addColumns(schema) \ .partitionedBy(requests_cleaned.schema) \ .property("delta.enableChangeDataFeed", "true") \ dt_builder.execute()


en schrijf de resultaten

 checkpoint_path = f"dbfs:/payload-logging/{endpoint_name}/checkpoint" requests_stream = requests_cleaned.writeStream \ .trigger(once=True) \ .format("delta") \ .partitionBy(date_col) \ .outputMode("append") \ .option("checkpointLocation", checkpoint_path) \ .toTable(unpacked_requests_table_name) \


Ten slotte maken we onze baselinetabel. DLM gebruikt deze tabel om de drifts te berekenen door de distributie van vergelijkbare kolommen van baseline- en primaire modellen te vergelijken. De baselinetabel moet dezelfde featurekolom hebben als de primaire kolom en dezelfde modelidentificatiekolom. Voor de baselinetabel gebruiken we de voorspellingstabel van onze validatiedataset die we eerder hebben opgeslagen nadat we ons model hebben getraind met de beste hyperparameter. Om de driftmetriek te berekenen, berekent Databricks de profielmetrieken voor zowel de primaire als de baselinetabel. Hier kunt u meer lezen over de primaire tabel en baselinetabel .


 #read the prediction table df_base_table = spark.table(f"{catalog_name}.{model_schema}.predictions") # create the model id and add it to the table model_id = get_model_id(endpoint_name) df_base_table = df_base_table.withColumn(model_id_col, F.lit(model_id)) #write the new table and enable the CDC on it output_base_table_name = f"{catalog_name}.{model_schema}.{base_table_prefix}_{model_name}" df_base_table.write.format("delta").mode("overwrite").saveAsTable(output_base_table_name) spark.sql(f"ALTER TABLE {output_base_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")


Nu zijn we klaar om ons monitoring dashboard te creëren. We kunnen dit doen met behulp van de UI of de Lakehouse Monitoring API. Hier gebruiken we de tweede optie:

 # This is where we store the metric tables. output_schema_name = f"{catalog_name}.{model_schema}" try: info = lm.create_monitor( table_name=unpacked_requests_table_name, profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities,#the aggregation window model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, # We will refresh the metrics on-demand in this notebook baseline_table_name=output_base_table_name, ) print(info) except Exception as e: # Ensure the exception was expected assert "RESOURCE_ALREADY_EXISTS" in str(e), f"Unexpected error: {e}" # Update the monitor if any parameters of this notebook have changed. lm.update_monitor( table_name=unpacked_requests_table_name, updated_params=dict( profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities, model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, baseline_table_name=output_base_table_name, ) ) # Refresh metrics calculated on the requests table. refresh_info = lm.run_refresh(table_name=unpacked_requests_table_name) print(refresh_info)


nadat we de code hebben uitgevoerd duurt het even voordat Databricks alle metriek heeft berekend. Om het dashboard te zien ga je naar het tabblad Quality van je sinktabel (d.w.z. unpacked_requests_table_name ). Je zou een pagina als volgt moeten zien.

Databricks Model Monitoring-weergave


Als u op de refresh history klikt, ziet u de lopende, in behandeling zijnde en eerdere vernieuwingen. Klik op het View Dashboard om uw dashboard te openen.

Databricks Model Monitoring Dashboard



dus we beginnen met de inferentietabel ( my_endpoint_payload ), verwerken deze en slaan het resultaat op in my_endpoint_payload_unpacked en geven deze tabel samen met onze basislijntabel ( base_table_als ) door aan onze monitoring API. De DLM berekent de profielmetrieken voor elke tabel ( my_endpoint_payload_unpacked_profile_metric ) en gebruikt deze om de driftmetrieken te berekenen ( my_endpoint_payload_unpacked_drift_metrics )


Daar heb je het! Je hebt alles wat je nodig hebt om je model te bedienen en te monitoren!


In het volgende deel laat ik je zien hoe je dit proces kunt automatiseren met behulp van Databricks Assets Bundle en Gitlab !

In het eerste deel van deze tutorialserie hebben we de eerste stappen gezet voor het bouwen van een end-to-end MLOps-pipeline met behulp van Databricks en Spark, geleid door de referentiearchitectuur van Databricks. Hier is een samenvatting van de belangrijkste stappen die we hebben behandeld:


  • Het opzetten van de Unity Catalog voor Medallion Architecture : We hebben onze gegevens georganiseerd in bronzen, zilveren en gouden lagen binnen de Unity Catalog, waarmee we een gestructureerd en efficiënt gegevensbeheersysteem hebben opgezet.

  • Gegevens opnemen in Unity Catalog : We hebben laten zien hoe u ruwe gegevens in het systeem kunt importeren, zodat u consistentie en kwaliteit kunt garanderen voor de daaropvolgende verwerkingsfasen.

  • Het model trainen : met behulp van Databricks hebben we een machine learning-model getraind dat is afgestemd op onze dataset. Hierbij hebben we de best practices voor schaalbare en effectieve modelontwikkeling gevolgd.

  • Hyperparameterafstemming met HyperOpt : om de modelprestaties te verbeteren, hebben we HyperOpt ingezet om de zoektocht naar optimale hyperparameters te automatiseren, waardoor de nauwkeurigheid en efficiëntie werden verbeterd.

  • Experimenten bijhouden met Databricks MLflow : We hebben MLflow gebruikt om onze experimenten te loggen en te monitoren. Zo houden we een uitgebreid overzicht bij van modelversies, statistieken en parameters, zodat we deze eenvoudig kunnen vergelijken en reproduceren.


Met deze fundamentele stappen voltooid, is uw model nu klaar voor implementatie. In dit tweede deel richten we ons op het integreren van twee kritieke componenten in ons systeem:


  1. Batch-inferentie : implementatie van batchverwerking om voorspellingen te genereren op basis van grote datasets, geschikt voor toepassingen zoals bulk-scoring en periodieke rapportage.
  2. Online inferentie (modelserveren) : het opzetten van realtime modelserveren om onmiddellijke voorspellingen te doen, essentieel voor interactieve toepassingen en services.
  3. Modelbewaking: om ervoor te zorgen dat uw geïmplementeerde modellen in de loop van de tijd optimale prestaties en betrouwbaarheid behouden.


Laten we beginnen!

Modelimplementatie

Het uitgangspunt van de vorige blog was modelevaluatie. Stel je nu voor dat we de vergelijking hebben gedaan en hebben ontdekt dat ons model een hogere prestatie laat zien in vergelijking met dit productiemodel. Omdat we het model in productie willen (aannemen) gebruiken, willen we profiteren van alle gegevens die we hebben. De volgende stap is om het model te trainen en te testen met behulp van de volledige dataset. Bewaar ons model vervolgens voor later gebruik door het te implementeren als ons kampioensmodel. Omdat dit het laatste model is dat we willen gebruiken voor inferentie, gebruiken we de Feature Engineering-client om het model te trainen. Op deze manier kunnen we niet alleen de modellijn gemakkelijker volgen, maar ook de schemavalidatie en featuretransformatie (indien van toepassing) uitbesteden aan de client.


 with mlflow.start_run(run_name="ALS_best_model") as run: als = ALS() # Now we set the parameters for the method als.setMaxIter(MAX_ITER)\ .setSeed(SEED)\ .setRegParam(best_params["REG_PARAM"])\ .setUserCol(COL_USER)\ .setItemCol(COL_ITEM)\ .setRatingCol(COL_LABEL)\ .setRank(best_params["RANK"]) mlflow.log_param("MAX_ITER", MAX_ITER) mlflow.log_param("RANK", best_params["RANK"]) mlflow.log_param("REG_PARAM", best_params["REG_PARAM"]) # Create the model with these parameters. model = als.fit(df_full_data) #drop predictions where users and products from the test test and didn't make it into the training set. in this case, the prediction is NaN model.setColdStartStrategy('drop') predictions = model.transform(df_full_data) signature = infer_signature(model_input = df_full_data, model_output = predictions.select(COL_LABEL)) #log the model mlflow.spark.log_model(model, model_name, sample_input=df_full_data.limit(3), signature = signature, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name=f"{catalog_name}.{model_schema}.{model_name}") evaluator = RegressionEvaluator(predictionCol=COL_PRED, labelCol=COL_LABEL) rmse = evaluator.setMetricName("rmse").evaluate(predictions) mlflow.log_metric('rmse', rmse)


we kunnen ook de Feature Store of Feature Engineering API's gebruiken om de modellen te trainen en te loggen

 model_info = fe.log_model(model=model, artifact_path = model_name, flavor=mlflow.spark, training_set=fe_full_data, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name= f"{catalog_name}.{model_schema}.{model_name}" )


Wanneer we de feature engineering API gebruiken, kunnen we de afstamming van het model bekijken in Catalog Explorer

gegevenslijn in Dataticks Unity Catalog


Laten we nu de modelbeschrijving bijwerken en er een Champion-label aan toewijzen.

 import time from mlflow.tracking.client import MlflowClient from mlflow.entities.model_registry.model_version_status import ModelVersionStatus client = MlflowClient() #find the latest model version model_name_path = f"{catalog_name}.{model_schema}.{model_name}" model_version_infos = client.search_model_versions(f"name ='{model_name_path}'") new_model_version = max([int(model_version_info.version) for model_version_info in model_version_infos]) #add the model and model version descirption client.update_registered_model( name=model_name_path, description="collaborative filtering using Spark mllib ALS. This model use rating table" ) client.update_model_version( name=model_name_path, version=new_model_version, description="this model is optimized Rank and REG_PARAM with Hyperopt and rmse as a loss function. trained on the full dataset" ) # assign alias client.set_registered_model_alias(model_name_path, "Champion", new_model_version)


Ga nu verder en controleer het schema waarmee u het model hebt geregistreerd. U zou al uw updates als volgt moeten zien

Modelregister in Databricks Unity Catalog

Model stages : Als u workspace gebruikt voor modelregistratie, moet u stages gebruiken om uw modellen te beheren. Het gebruik van aliassen werkt niet. Bekijk hier om te zien hoe het werkt

Modelinferentie

Batch-scoren

Stel je nu voor dat we ons model in productie willen gebruiken voor inferentie. In deze stap laden we het championmodel en gebruiken het om 20 filmaanbevelingen voor elke gebruiker te genereren.


 from mlflow.spark import load_model as spark_load_model from mlflow.tracking.client import MlflowClient from create_training_set import split_data #-- set UC as model registray mlflow.set_registry_uri("databricks-uc") #-- initate mlflow client client = MlflowClient() # -- read the config file with open('config.json') as config_file: config = json.load(config_file) catalog_name = config["catalog_name"] gold_layer = config["gold_layer_name"] silver_layer = config["silver_layer_name"] user_item_table_name = config["user_item_table_name"] ft_user_item_name = config["ft_user_item_name"] model_name = config["model_name"] model_schema = config["model_schema"] #-- create the model uri model_path = f"{catalog_name}.{model_schema}.{model_name}" # --create the model_uri: there are two ways to do this # 1: using the alias (we use this*) model_version_uri = f"models:/{model_uri}@champion" # 2: using model version #champion_version = client.get_model_version_by_alias(model_uri, "champion") #model_version_uri = f"models:/{model_uri}/{champion_version.version}" # -- load the model pipline and exctract the model model_pipeline = spark_load_model(model_version_uri) model = model_pipeline.stages[0] # -- batch scoring using the the model fe_full_data, df_full_data, df_train, df_test = split_data() df_batch_input = df_full_data.drop("rating") df_scores = model.transform(df_batch_input) # --- in case you used Feature Engineering to train and register model #from databricks.feature_engineering import FeatureEngineeringClient #fe = FeatureEngineeringClient() # fe.score_batch(model_uri=f"{model_version_uri}",df = df_batch_input)


en u kunt zien dat we dezelfde trainingsgegevens hebben gebruikt voor batch-scoring. Hoewel het in het geval van aanbevelingssystemen logisch is, willen we in de meeste toepassingen het model gebruiken om wat ongeziene gegevens te scoren. Stel u bijvoorbeeld voor dat u Netflix bent en de aanbevelingen van de gebruiker aan het einde van de dag wilt bijwerken op basis van hun nieuwe kijklijst. We kunnen een taak plannen die de batch-scoring op een specifiek tijdstip aan het einde van de dag uitvoert.


Nu kunnen we doorgaan en de aanbevelingen voor elke gebruiker genereren. Hiervoor vinden we de top 20 items per gebruiker

 from pyspark.sql.window import Window from pyspark.sql.functions import col, split, row_number, collect_list from pyspark.sql.functions import col, collect_list, expr, lit, min, row_number, desc windowSpec = Window.partitionBy("user_id").orderBy(col("prediction").desc()) df_top_20_items = df_scores.withColumn("rank", row_number().over(windowSpec)).filter(col("rank") <= 20) df_user_recs = df_top_20_items.groupBy("user_id") \ .agg(collect_list(col("item_id").cast("double")).alias("top_item_ids"))


zo ziet het resultaat eruit

Ten slotte kunnen we de voorspelling opslaan als een deltalabel op onze UC of ze publiceren naar een downstream-systeem Mongo DB of Azure Cosmos DB. We gaan met de eerste optie


 df_user_recs.write.mode("overwrite").saveAsTable(f"{catalog_name}.{output_schema}.top20_item_recommendations")


Streaming/online-inferentie

Stel je nu eens een geval voor waarin we onze aanbevelingen willen updaten op basis van realtime gebruikersinteracties. Voor dit geval kunnen we model serving gebruiken. Wanneer iemand jouw model wil gebruiken, kan hij/zij data naar de server sturen. De server stuurt die data vervolgens naar jouw geïmplementeerde model, dat in actie komt, de data analyseert en een voorspelling genereert. Ze kunnen worden gebruikt in webapplicaties, mobiele apps of zelfs embedded systemen. Een van de toepassingen van deze aanpak is om traffic routing mogelijk te maken voor A/B-testen.


Het ALS-algoritme kan niet direct worden gebruikt voor online inferentie, omdat het vereist dat het model opnieuw wordt getraind met behulp van de volledige data (oud + nieuw) om de aanbevelingen bij te werken. Gradient Descent-leeralgoritmen zijn voorbeelden van modellen die kunnen worden gebruikt voor online updates. We kunnen in toekomstige berichten naar enkele van deze algoritmen kijken.


Om te illustreren hoe zo'n model zou werken, maken we een (nutteloos) model dat als eindpunt fungeert en filmbeoordelingen voorspelt op basis van de beoordeling door een gebruiker!


 import requests model_path = f"{catalog_name}.{model_schema}.{model_name}" champion_version = client.get_model_version_by_alias(model_path, "champion") # Set the name of the MLflow endpoint endpoint_name = config["model_serving_endpoint_name"] # Name of the registered MLflow model model_name = model_path # Specify the type of compute (CPU, GPU_SMALL, GPU_MEDIUM, etc.) workload_type = "CPU" # Specify the scale-out size of compute (Small, Medium, Large, etc.) workload_size = "Small" # Get the latest version of the MLflow model model_version = int(champion_version.version) # Specify Scale to Zero(only supported for CPU endpoints) scale_to_zero = False # Get the API endpoint and token for the current notebook context API_ROOT = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get() API_TOKEN = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get() data = { "name": endpoint_name, "config": { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ] }, } headers = {"Context-Type": "text/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.post( url=f"{API_ROOT}/api/2.0/serving-endpoints", json=data, headers=headers )


Dit zal een lunchmodelservingcluster voor ons creëren, dus het kost wat tijd. Als u nu het Serving -venster opent, zou u uw eindpunt moeten zien.


we kunnen één eindpunt gebruiken om meerdere modellen te bedienen. Vervolgens kunnen we verkeersroutering gebruiken voor scenario's zoals A/B-testen of de prestaties van verschillende modellen in de productie vergelijken.

Inferentietabel

Inferentietabellen in Databricks Model Serving fungeren als een automatisch logboek voor onze geïmplementeerde modellen. Wanneer ingeschakeld, leggen ze inkomende verzoeken (gegevens verzonden voor voorspelling), de bijbehorende modeluitvoer (voorspellingen) en enkele andere metagegevens vast als een Delta-tabel binnen Unity Catalog. We kunnen de inferentietabel gebruiken voor monitoring en debugging , lineage tracking en een gegevensverzamelingsprocedure voor het opnieuw trainen of verfijnen van onze modellen.


We kunnen de inference table op ons serving endpoint inschakelen om het model te monitoren. We kunnen dit doen door de auto_capture_config eigenschappen in de payload te specificeren wanneer we het endpoint voor het eerst aanmaken. Of we updaten ons endpoint daarna met de put opdracht en de config endpoint URL als volgt (meer hier )


 data = { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ], "auto_capture_config":{ "catalog_name": catalog_name, "schema_name": model_schema, "table_name_prefix": payload_table, } } headers = {"Context-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.put(url=f"{API_ROOT}/api/2.0/serving-endpoints/{endpoint_name}/config", json=data, headers=headers) print(json.dumps(response.json(), indent=4))


Laten we nu het eindpunt voeden met wat dummy-gebruikersinteractiegegevens

 import random import time all_items = df_full_data.select(col("item_id")).distinct() for user_id in range(50,54): items_not_rated_by_user = df_full_data.where(col("user_id")==user_id).select(col("item_id")).distinct()#collect()[0][0] no_rated_items = [item.item_id for item in all_items.subtract(items_not_rated_by_user).limit(4).collect()] data = { "dataframe_records": [ {"user_id":user_id, "item_id":no_rated_items[0], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[1], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, ] } response = requests.post( url=f"{API_ROOT}/serving-endpoints/{endpoint_name}/invocations", json=data, headers=headers ) # generate the data within the timespan of 1 to 8 minutes time.sleep(random.randint(60*1, 60*8))


We kunnen de endpoint logs controleren in de <catalog>.<schema>.<payload_table> tabel. Het duurt ongeveer 10 minuten voordat u de data in de tabel kunt zien.


 table_path = f"{catalog_name}.{model_schema}.{payload_table}" # Read data from the inference table df_inf_table = spark.read.table(table_path) display(df_inf_table )


je zou zoiets moeten zien als je payloadtabel

Databricks-model dat payloadtabel serveert


Om het schema van deze inferentietabel te begrijpen, kijk hier naar “Unity catalogus inferentietabel schema==” .==


Modelbewaking

Model- en datamonitoring zijn een complex onderwerp dat veel tijd kost om onder de knie te krijgen. Databricks Lakehouse Monitoring (DLM) vermindert de overhead van het bouwen van een goed monitoringsysteem door standaard en aanpasbare sjablonen te bieden voor veelvoorkomende use cases. Het onder de knie krijgen van DLM en modelmonitoring in het algemeen vereist echter veel experimenten. Ik wil u hier geen uitgebreid overzicht van modelmonitoring geven, maar u eerder een startpunt geven. Ik wijd misschien in de toekomst een blog aan dit onderwerp.


Een korte samenvatting van DLM-functionaliteiten en -functies

Nu ons model up and running is, kunnen we de inferentietabel die door ons serving endpoint is gegenereerd gebruiken om belangrijke statistieken te monitoren, zoals modelprestaties en drift, om afwijkingen of anomalieën in onze data of model in de loop van de tijd te detecteren. Deze proactieve aanpak helpt ons om tijdig corrigerende maatregelen te nemen, zoals het opnieuw trainen van het model of het updaten van de functies, om optimale prestaties en afstemming op bedrijfsdoelstellingen te behouden.


Databricks Lakehouse Monitoring Data Architectuurbron: Databricks


DLM biedt drie typen analyse of profile type : Time Series , Snapshot en Inference . Omdat we geïnteresseerd zijn in het analyseren van onze inferencetabel, richten we ons op de laatste. Om een tabel te gebruiken voor monitoring - onze " primaire tabel ", moeten we ervoor zorgen dat de tabel de juiste structuur heeft. Voor de inferencetabel moet elke rij overeenkomen met een verzoek met de volgende kolommen:

  • modelkenmerken

  • modelvoorspelling

  • model-id

  • timestamp : tijdstempel van het inferentieverzoek

  • grondwaarheid (optioneel)


De model-id is belangrijk voor gevallen waarin we meerdere modellen bedienen en we de prestaties van elk model in één monitoringdashboard willen volgen. Als er meer dan één model-id beschikbaar is, gebruikt DLM deze om de gegevens te slicen en metrische gegevens en statistische gegevens voor elke slice afzonderlijk te berekenen.


DLM berekent elke statistiek en metriek voor een bepaald tijdsinterval. Voor inferentieanalyse gebruikte het de timestampkolom , plus een door de gebruiker gedefinieerde venstergrootte om de tijdsvensters te identificeren. meer hieronder.


DLM ondersteunt twee problem type voor inferentietabellen: " classificatie " of " regressie ". Het berekent enkele van de relevante statistieken en statistieken op basis van deze specificatie.


Om DLM te gebruiken, moeten we een monitor maken en deze aan een tabel koppelen. Wanneer we dit doen, maakt DLM twee metric tables :

  • profielmetriektabel : deze tabel bevat samenvattende statistieken zoals min, max, percentage nullen en nullen. Het bevat ook aanvullende metrieken op basis van het door de gebruiker gedefinieerde probleemtype. Bijvoorbeeld precisie , recall en f1_score voor de classificatiemodellen, en mean_squared_error en mean_average_error voor regressiemodellen.

  • drift metrische tabel : bevat statistieken die meten hoe de distributie van data is veranderd in de loop van de tijd of ten opzichte van een basislijnwaarde (indien verstrekt) . Het berekent metingen zoals Chi-kwadraattest, KS-test.


om de lijst met complete metrics voor elke tabel te bekijken, controleer Monitor metric table documentation page. Het is ook mogelijk om aangepaste metrics te maken.


Een belangrijk aspect van het bouwen van een monitoringsysteem is om ervoor te zorgen dat ons monitoringdashboard toegang heeft tot de nieuwste inferentiegegevens zodra ze binnenkomen. Hiervoor kunnen we Delta-tabelstreaming gebruiken om verwerkte rijen in de inferentietabel bij te houden. We gebruiken de inferentietabel van de modelserver als onze brontabel ( readStream ) en de monitoringtabel als de sinktabel ( writeStream ). We zorgen er ook voor dat de Change Data Capture (CDC) is ingeschakeld op beide tabellen (het is standaard ingeschakeld op de inferentietabel). Op deze manier verwerken we alleen wijzigingen - invoegen/bijwerken/verwijderen - in de brontabel in plaats van de hele tabel opnieuw te verwerken bij elke vernieuwing.

Praktisch

Om de monitoring van onze inferentietabel mogelijk te maken, nemen we de volgende stappen:

  1. Lees de inferentietabel als een streamingtabel
  2. Maak een nieuwe deltatabel met het juiste schema door de inferentietabel uit te pakken die is gegenereerd door ons modelserverend eindpunt.
  3. Maak de basislijntabel (indien van toepassing) klaar
  4. Maak een monitor over de resulterende tabel en vernieuw de metriek
  5. Plan een workflow om de inferentietabel uit te pakken naar de juiste structuur en de statistieken te vernieuwen


Eerst moeten we de Lakehouse Monitoring API installeren. Deze zou al geïnstalleerd moeten zijn als u Databricks rum time 15.3 LTS en hoger gebruikt:


 %pip install "https://ml-team-public-read.s3.amazonaws.com/wheels/data-monitoring/a4050ef7-b183-47a1-a145-e614628e3146/databricks_lakehouse_monitoring-0.4.14-py3-none-any.whl" dbutils.library.restartPython()


Laten we de inferentietabel lezen als een streamingtabel

 requests_raw = spark.readStream\ .format("delta")\ .table(inference_table_name) requests_raw.isStreaming #-> True


Vervolgens moeten we de tabel in het juiste formaat zetten zoals hierboven beschreven. Deze tabel moet één rij hebben voor elke voorspelling met relevante kenmerken en voorspellingswaarde. De inferentietabel die we krijgen van het model dat het eindpunt bedient, slaat de eindpuntverzoeken en -reacties op als een geneste JSON-indeling. Hier is een voorbeeld van de JSON-payload voor de aanvraag- en reactiekolom.

 #requests {"dataframe_records": [ {"user_id": 1, "item_id": 346, "rating": 5}, {"user_id": 1, "item_id": 377, "rating": 2}, {"user_id": 1, "item_id": 302, "rating": 4} ] } #reponse {"predictions": [4.248899936676025, 1.1172138452529907, 4.279165744781494] } # --> what we need | user_id | item_id | rating | prediction | |---------|---------|--------|------------| | 1 | 346 | 5 | 4.248900 | | 1 | 377 | 2 | 1.117214 | | 1 | 302 | 4 | 4.279166 |


Om deze tabel uit te pakken naar het juiste schema kunnen we de volgende code gebruiken die is aangepast uit de Databricks-documentatie ( Inference table Lakehouse Monitoring starter notebook ).


 # define the schema of the request and reponse fields in the inference tabel REQUEST_FIELDS = [StructField('user_id', IntegerType(), False),\ StructField('item_id', IntegerType(), False),\ StructField('rating', IntegerType(), False)\ ] RESPONSE_FIELD = [T.StructField("predictions", FloatType())] def process_col_requests(json_str:str) -> str: """ to proccess the JSON payload of request column in inference table """ request = json.loads(json_str) dataframe_records = request.get("dataframe_records", []) return dataframe_records def procces_col_response(json_str: str) -> str: """ to proccess the JSON payload of reponse column in inference table """ reponse = json.loads(json_str) output = [{prediction_col: round(prediction,4)} for prediction in reponse["predictions"]] return output def get_model_id(endpoint_name: str) -> str: """ create the model id by concatinating the model name and the model version. note: the assumption is the endpoint serves only one model """ served_models = get_served_models(endpoint_name) model_id = f"{served_models[0]['model_name']}_{served_models[0]['model_version']}" return model_id def process_requests(requests_raw: DataFrame, request_fields: List[T.StructField], response_field: T.StructField, endpoint_name: str) -> DataFrame: """ Takes a stream of raw requests and processes them by: - Unpacking JSON payloads for requests and responses - Exploding batched requests into individual rows - Converting Unix epoch millisecond timestamps to be Spark TimestampType :param requests_raw: DataFrame containing raw requests. Assumed to contain the following columns: - `request` - `response` - `timestamp_ms` :param request_fields: List of StructFields representing the request schema :param response_field: A StructField representing the response schema :return: A DataFrame containing processed requests """ # Convert the timestamp milliseconds to TimestampType for downstream processing. requests_timestamped = requests_raw \ .withColumn(timestamp_col, (F.col("timestamp_ms") / 1000).cast(T.TimestampType())) \ .drop("timestamp_ms") # create the model identifier column model_id = get_model_id(endpoint_name) # Convert the model name and version columns into a model identifier column. requests_identified = requests_timestamped \ .withColumn(model_id_col, F.lit(model_id)) # Rename the date column to avoid collisions with features. requests_dated = requests_identified.withColumnRenamed("date", date_col) # Consolidate and unpack JSON. request_schema = T.ArrayType(T.StructType(request_fields)) response_schema = T.ArrayType(T.StructType(response_field)) # w udf_request = F.udf(process_col_requests, request_schema) udf_reponse = F.udf(procces_col_response, response_schema) requests_unpacked = requests_dated.withColumn("request", udf_request("request")).\ withColumn("response", udf_reponse("response")) # Explode batched requests into individual rows. DB_PREFIX = "__db" requests_exploded = requests_unpacked \ .withColumn(f"{DB_PREFIX}_request_response", F.arrays_zip(F.col("request"), F.col("response"))) \ .withColumn(f"{DB_PREFIX}_request_response", F.explode(F.col(f"{DB_PREFIX}_request_response"))) \ .select(F.col("*"), F.col(f"{DB_PREFIX}_request_response.request.*"), F.col(f"{DB_PREFIX}_request_response.response.*")) \ .drop(f"{DB_PREFIX}_request_response", "request", "response") requests_cleaned = requests_exploded.drop("status_code", "sampling_fraction", "client_request_id", "databricks_request_id", "request_metadata") return requests_cleaned


De resulterende tabel ziet er als volgt uit:

Payload tabel uitgepakt

Vervolgens moeten we onze sink-tabel initialiseren

 dt_builder = DeltaTable.createIfNotExists(spark) \ .tableName(unpacked_requests_table_name) \ .addColumns(schema) \ .partitionedBy(requests_cleaned.schema) \ .property("delta.enableChangeDataFeed", "true") \ dt_builder.execute()


en schrijf de resultaten

 checkpoint_path = f"dbfs:/payload-logging/{endpoint_name}/checkpoint" requests_stream = requests_cleaned.writeStream \ .trigger(once=True) \ .format("delta") \ .partitionBy(date_col) \ .outputMode("append") \ .option("checkpointLocation", checkpoint_path) \ .toTable(unpacked_requests_table_name) \


Ten slotte maken we onze baselinetabel. DLM gebruikt deze tabel om de drifts te berekenen door de distributie van vergelijkbare kolommen van baseline- en primaire modellen te vergelijken. De baselinetabel moet dezelfde featurekolom hebben als de primaire kolom en dezelfde modelidentificatiekolom. Voor de baselinetabel gebruiken we de voorspellingstabel van onze validatiedataset die we eerder hebben opgeslagen nadat we ons model hebben getraind met de beste hyperparameter. Om de driftmetriek te berekenen, berekent Databricks de profielmetrieken voor zowel de primaire als de baselinetabel. Hier kunt u meer lezen over de primaire tabel en baselinetabel .


 #read the prediction table df_base_table = spark.table(f"{catalog_name}.{model_schema}.predictions") # create the model id and add it to the table model_id = get_model_id(endpoint_name) df_base_table = df_base_table.withColumn(model_id_col, F.lit(model_id)) #write the new table and enable the CDC on it output_base_table_name = f"{catalog_name}.{model_schema}.{base_table_prefix}_{model_name}" df_base_table.write.format("delta").mode("overwrite").saveAsTable(output_base_table_name) spark.sql(f"ALTER TABLE {output_base_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")


Nu zijn we klaar om ons monitoring dashboard te creëren. We kunnen dit doen met behulp van de UI of de Lakehouse Monitoring API. Hier gebruiken we de tweede optie:

 # This is where we store the metric tables. output_schema_name = f"{catalog_name}.{model_schema}" try: info = lm.create_monitor( table_name=unpacked_requests_table_name, profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities,#the aggregation window model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, # We will refresh the metrics on-demand in this notebook baseline_table_name=output_base_table_name, ) print(info) except Exception as e: # Ensure the exception was expected assert "RESOURCE_ALREADY_EXISTS" in str(e), f"Unexpected error: {e}" # Update the monitor if any parameters of this notebook have changed. lm.update_monitor( table_name=unpacked_requests_table_name, updated_params=dict( profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities, model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, baseline_table_name=output_base_table_name, ) ) # Refresh metrics calculated on the requests table. refresh_info = lm.run_refresh(table_name=unpacked_requests_table_name) print(refresh_info)


nadat we de code hebben uitgevoerd duurt het even voordat Databricks alle metriek heeft berekend. Om het dashboard te zien ga je naar het tabblad Quality van je sinktabel (d.w.z. unpacked_requests_table_name ). Je zou een pagina als volgt moeten zien.

Databricks Model Monitoring-weergave


Als u op de refresh history klikt, ziet u de lopende, in behandeling zijnde en eerdere vernieuwingen. Klik op het View Dashboard om uw dashboard te openen.

Databricks Model Monitoring Dashboard



dus we beginnen met de inferentietabel ( my_endpoint_payload ), verwerken deze en slaan het resultaat op in my_endpoint_payload_unpacked en geven deze tabel samen met onze basislijntabel ( base_table_als ) door aan onze monitoring API. De DLM berekent de profielmetrieken voor elke tabel ( my_endpoint_payload_unpacked_profile_metric ) en gebruikt deze om de driftmetrieken te berekenen ( my_endpoint_payload_unpacked_drift_metrics )


Daar heb je het! Je hebt alles wat je nodig hebt om je model te bedienen en te monitoren!


In het volgende deel laat ik je zien hoe je dit proces kunt automatiseren met behulp van Databricks Assets Bundle en Gitlab !