paint-brush
Zbudujmy potok MLOps z Databricks i Spark — część 2przez@neshom
Nowa historia

Zbudujmy potok MLOps z Databricks i Spark — część 2

przez Mohsen Jadidi42m2024/12/29
Read on Terminal Reader

Za długo; Czytać

W drugiej części tego bloga zobaczymy, jak Databricks umożliwia nam wdrażanie wsadowe i obsługę online. Poświęcamy trochę czasu na to, jak skonfigurować pulpity nawigacyjne do monitorowania danych i modeli.
featured image - Zbudujmy potok MLOps z Databricks i Spark — część 2
Mohsen Jadidi HackerNoon profile picture
0-item
1-item
2-item

W pierwszej części tej serii samouczków wykonaliśmy pierwsze kroki w celu zbudowania kompleksowego potoku MLOps przy użyciu Databricks i Spark, kierując się architekturą referencyjną Databricks. Oto podsumowanie kluczowych kroków, które omówiliśmy:


  • Konfigurowanie katalogu Unity dla architektury Medallion : Zorganizowaliśmy nasze dane w warstwach brązowej, srebrnej i złotej w katalogu Unity, tworząc ustrukturyzowany i wydajny system zarządzania danymi.

  • Wprowadzanie danych do katalogu Unity : Pokazaliśmy, jak importować surowe dane do systemu, zapewniając spójność i jakość kolejnych etapów przetwarzania.

  • Szkolenie modelu : Wykorzystując Databricks, wyszkoliliśmy model uczenia maszynowego dostosowany do naszego zestawu danych, postępując zgodnie z najlepszymi praktykami skalowalnego i efektywnego tworzenia modeli.

  • Strojenie hiperparametrów za pomocą HyperOpt : Aby zwiększyć wydajność modelu, zastosowaliśmy HyperOpt w celu zautomatyzowania wyszukiwania optymalnych hiperparametrów, co zwiększyło dokładność i wydajność.

  • Śledzenie eksperymentów za pomocą Databricks MLflow : Wykorzystaliśmy MLflow do rejestrowania i monitorowania naszych eksperymentów, prowadząc kompleksowy rejestr wersji modeli, metryk i parametrów w celu łatwego porównywania i powtarzalności.


Po wykonaniu tych podstawowych kroków Twój model jest teraz gotowy do wdrożenia. W tej drugiej części skupimy się na zintegrowaniu dwóch krytycznych komponentów z naszym systemem:


  1. Wnioskowanie wsadowe : Implementacja przetwarzania wsadowego w celu generowania prognoz na podstawie dużych zbiorów danych, odpowiednia do zastosowań takich jak zbiorcze ocenianie i okresowe raportowanie.
  2. Wnioskowanie online (obsługa modeli) : konfigurowanie obsługi modeli w czasie rzeczywistym w celu zapewnienia natychmiastowych prognoz, co jest niezbędne w przypadku aplikacji i usług interaktywnych.
  3. Monitorowanie modelu: zapewnia, że wdrożone modele utrzymują optymalną wydajność i niezawodność na przestrzeni czasu.


No to do dzieła!

Wdrażanie modelu

Punktem wyjścia ostatniego bloga była ocena modelu. Teraz wyobraźmy sobie, że przeprowadziliśmy porównanie i odkryliśmy, że nasz model wykazuje wyższą wydajność w porównaniu z tym modelem produkcyjnym. Ponieważ chcemy (zakładamy) używać modelu w produkcji, chcemy wykorzystać wszystkie dane, które mamy. Następnym krokiem jest trenowanie i testowanie modelu przy użyciu pełnego zestawu danych. Następnie utrwalamy nasz model do późniejszego użycia, wdrażając go jako nasz model mistrzowski. Ponieważ jest to ostateczny model, którego chcemy użyć do wnioskowania, używamy klienta Feature Engineering do trenowania modelu. W ten sposób nie tylko łatwiej śledzimy pochodzenie modelu, ale także przekazujemy klientowi walidację schematu i transformację funkcji (jeśli taka istnieje).


 with mlflow.start_run(run_name="ALS_best_model") as run: als = ALS() # Now we set the parameters for the method als.setMaxIter(MAX_ITER)\ .setSeed(SEED)\ .setRegParam(best_params["REG_PARAM"])\ .setUserCol(COL_USER)\ .setItemCol(COL_ITEM)\ .setRatingCol(COL_LABEL)\ .setRank(best_params["RANK"]) mlflow.log_param("MAX_ITER", MAX_ITER) mlflow.log_param("RANK", best_params["RANK"]) mlflow.log_param("REG_PARAM", best_params["REG_PARAM"]) # Create the model with these parameters. model = als.fit(df_full_data) #drop predictions where users and products from the test test and didn't make it into the training set. in this case, the prediction is NaN model.setColdStartStrategy('drop') predictions = model.transform(df_full_data) signature = infer_signature(model_input = df_full_data, model_output = predictions.select(COL_LABEL)) #log the model mlflow.spark.log_model(model, model_name, sample_input=df_full_data.limit(3), signature = signature, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name=f"{catalog_name}.{model_schema}.{model_name}") evaluator = RegressionEvaluator(predictionCol=COL_PRED, labelCol=COL_LABEL) rmse = evaluator.setMetricName("rmse").evaluate(predictions) mlflow.log_metric('rmse', rmse)


możemy również użyć interfejsów API Feature Store lub Feature Engineering do trenowania i rejestrowania modeli

 model_info = fe.log_model(model=model, artifact_path = model_name, flavor=mlflow.spark, training_set=fe_full_data, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name= f"{catalog_name}.{model_schema}.{model_name}" )


gdy używamy interfejsu API inżynierii funkcji, możemy przeglądać pochodzenie modelu w Eksploratorze katalogów

pochodzenie danych w katalogu Dataticks Unity


Teraz zaktualizujmy opis modelu i przypisajmy mu etykietę Champion.

 import time from mlflow.tracking.client import MlflowClient from mlflow.entities.model_registry.model_version_status import ModelVersionStatus client = MlflowClient() #find the latest model version model_name_path = f"{catalog_name}.{model_schema}.{model_name}" model_version_infos = client.search_model_versions(f"name ='{model_name_path}'") new_model_version = max([int(model_version_info.version) for model_version_info in model_version_infos]) #add the model and model version descirption client.update_registered_model( name=model_name_path, description="collaborative filtering using Spark mllib ALS. This model use rating table" ) client.update_model_version( name=model_name_path, version=new_model_version, description="this model is optimized Rank and REG_PARAM with Hyperopt and rmse as a loss function. trained on the full dataset" ) # assign alias client.set_registered_model_alias(model_name_path, "Champion", new_model_version)


Teraz sprawdź schemat, w którym zarejestrowałeś model. Powinny zostać wyświetlone wszystkie aktualizacje w następujący sposób

Rejestr modeli w katalogu Databricks Unity

Etapy modelu : Jeśli używasz obszaru roboczego do rejestru modeli, powinieneś zarządzać swoimi modelami za pomocą etapów. Korzystanie z aliasów nie zadziała. Sprawdź tutaj żeby zobaczyć jak to działa

Wnioskowanie z modelu

Ocena partii

Teraz wyobraźmy sobie, że chcemy użyć naszego modelu w produkcji do wnioskowania. W tym kroku ładujemy model champion i używamy go do wygenerowania 20 rekomendacji filmowych dla każdego użytkownika.


 from mlflow.spark import load_model as spark_load_model from mlflow.tracking.client import MlflowClient from create_training_set import split_data #-- set UC as model registray mlflow.set_registry_uri("databricks-uc") #-- initate mlflow client client = MlflowClient() # -- read the config file with open('config.json') as config_file: config = json.load(config_file) catalog_name = config["catalog_name"] gold_layer = config["gold_layer_name"] silver_layer = config["silver_layer_name"] user_item_table_name = config["user_item_table_name"] ft_user_item_name = config["ft_user_item_name"] model_name = config["model_name"] model_schema = config["model_schema"] #-- create the model uri model_path = f"{catalog_name}.{model_schema}.{model_name}" # --create the model_uri: there are two ways to do this # 1: using the alias (we use this*) model_version_uri = f"models:/{model_uri}@champion" # 2: using model version #champion_version = client.get_model_version_by_alias(model_uri, "champion") #model_version_uri = f"models:/{model_uri}/{champion_version.version}" # -- load the model pipline and exctract the model model_pipeline = spark_load_model(model_version_uri) model = model_pipeline.stages[0] # -- batch scoring using the the model fe_full_data, df_full_data, df_train, df_test = split_data() df_batch_input = df_full_data.drop("rating") df_scores = model.transform(df_batch_input) # --- in case you used Feature Engineering to train and register model #from databricks.feature_engineering import FeatureEngineeringClient #fe = FeatureEngineeringClient() # fe.score_batch(model_uri=f"{model_version_uri}",df = df_batch_input)


i widać, że użyliśmy tych samych danych treningowych do punktacji wsadowej. Chociaż w przypadku systemów rekomendacji ma to sens, w większości zastosowań chcemy użyć modelu do punktacji niektórych niewidzianych danych. Na przykład, Wyobraź sobie, że jesteś Netflix i chcesz zaktualizować rekomendacje użytkowników pod koniec dnia na podstawie ich nowej listy obserwowanych. Możemy zaplanować zadanie, które uruchomi punktację wsadową o określonej porze pod koniec dnia.


Teraz możemy przejść dalej i wygenerować rekomendacje dla każdego użytkownika. W tym celu znajdujemy 20 najlepszych pozycji na użytkownika

 from pyspark.sql.window import Window from pyspark.sql.functions import col, split, row_number, collect_list from pyspark.sql.functions import col, collect_list, expr, lit, min, row_number, desc windowSpec = Window.partitionBy("user_id").orderBy(col("prediction").desc()) df_top_20_items = df_scores.withColumn("rank", row_number().over(windowSpec)).filter(col("rank") <= 20) df_user_recs = df_top_20_items.groupBy("user_id") \ .agg(collect_list(col("item_id").cast("double")).alias("top_item_ids"))


Oto jak wygląda wynik

Na koniec możemy zapisać prognozę jako etykietę delta w naszym UC lub opublikować ją w systemach podrzędnych Mongo DB lub Azure Cosmos DB. Wybieramy pierwszą opcję


 df_user_recs.write.mode("overwrite").saveAsTable(f"{catalog_name}.{output_schema}.top20_item_recommendations")


Strumieniowanie/wnioskowanie online

Teraz wyobraź sobie przypadek, w którym chcemy zaktualizować nasze rekomendacje na podstawie interakcji użytkowników w czasie rzeczywistym. W tym przypadku możemy użyć obsługi modeli. Gdy ktoś chce użyć Twojego modelu, może wysłać dane do serwera. Następnie serwer przekazuje te dane do wdrożonego modelu, który wchodzi w działanie, analizuje dane i generuje prognozę. Mogą być używane w aplikacjach internetowych, aplikacjach mobilnych, a nawet systemach wbudowanych. Jednym z zastosowań tego podejścia jest umożliwienie kierowania ruchem na potrzeby testów A/B.


Algorytm ALS nie może być używany bezpośrednio do wnioskowania online, ponieważ wymaga ponownego trenowania modelu przy użyciu całych danych (starych + nowych) w celu aktualizacji rekomendacji. Algorytmy uczenia się Gradient Descent są przykładami modeli, które można wykorzystać do aktualizacji online. Możemy przyjrzeć się niektórym z tych algorytmów w przyszłym poście.


Jednak, aby zilustrować, jak taki model miałby działać, tworzymy (bezużyteczny) model obsługujący punkt końcowy, który przewiduje ocenę filmu na podstawie tego, kiedy użytkownik oceni film!


 import requests model_path = f"{catalog_name}.{model_schema}.{model_name}" champion_version = client.get_model_version_by_alias(model_path, "champion") # Set the name of the MLflow endpoint endpoint_name = config["model_serving_endpoint_name"] # Name of the registered MLflow model model_name = model_path # Specify the type of compute (CPU, GPU_SMALL, GPU_MEDIUM, etc.) workload_type = "CPU" # Specify the scale-out size of compute (Small, Medium, Large, etc.) workload_size = "Small" # Get the latest version of the MLflow model model_version = int(champion_version.version) # Specify Scale to Zero(only supported for CPU endpoints) scale_to_zero = False # Get the API endpoint and token for the current notebook context API_ROOT = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get() API_TOKEN = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get() data = { "name": endpoint_name, "config": { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ] }, } headers = {"Context-Type": "text/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.post( url=f"{API_ROOT}/api/2.0/serving-endpoints", json=data, headers=headers )


To utworzy i uruchomi klaster obsługujący model, więc zajmie to trochę czasu. Teraz, jeśli otworzysz okno Serving , powinieneś zobaczyć swój punkt końcowy.


możemy użyć jednego punktu końcowego do obsługi wielu modeli. Następnie możemy użyć routingu ruchu dla scenariuszy, takich jak testowanie A/B lub porównanie wydajności różnych modeli w produkcji.

Tabela wnioskowania

Tabele wnioskowania w Databricks Model Serving działają jako automatyczny dziennik dla naszych wdrożonych modeli. Po włączeniu przechwytują przychodzące żądania (dane wysłane do prognozowania), odpowiadające im dane wyjściowe modelu (prognozy) i niektóre inne metadane jako tabelę Delta w Unity Catalog. Możemy użyć tabeli wnioskowania do monitorowania i debugowania , śledzenia pochodzenia oraz procedury zbierania danych do ponownego trenowania lub dostrajania naszych modeli.


Możemy włączyć inference table na naszym punkcie końcowym obsługi, aby monitorować model. Możemy to zrobić, określając właściwości auto_capture_config w ładunku, gdy po raz pierwszy tworzymy punkt końcowy. Albo aktualizujemy nasz punkt końcowy później, używając polecenia put i adresu URL punktu końcowego config w następujący sposób (więcej tutaj )


 data = { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ], "auto_capture_config":{ "catalog_name": catalog_name, "schema_name": model_schema, "table_name_prefix": payload_table, } } headers = {"Context-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.put(url=f"{API_ROOT}/api/2.0/serving-endpoints/{endpoint_name}/config", json=data, headers=headers) print(json.dumps(response.json(), indent=4))


teraz wprowadźmy do punktu końcowego pewne dane dotyczące interakcji użytkownika fikcyjnego

 import random import time all_items = df_full_data.select(col("item_id")).distinct() for user_id in range(50,54): items_not_rated_by_user = df_full_data.where(col("user_id")==user_id).select(col("item_id")).distinct()#collect()[0][0] no_rated_items = [item.item_id for item in all_items.subtract(items_not_rated_by_user).limit(4).collect()] data = { "dataframe_records": [ {"user_id":user_id, "item_id":no_rated_items[0], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[1], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, ] } response = requests.post( url=f"{API_ROOT}/serving-endpoints/{endpoint_name}/invocations", json=data, headers=headers ) # generate the data within the timespan of 1 to 8 minutes time.sleep(random.randint(60*1, 60*8))


Możemy sprawdzić logi punktów końcowych w tabeli <catalog>.<schema>.<payload_table> . Zajmie to około 10 minut, zanim będzie można zobaczyć dane w tabeli.


 table_path = f"{catalog_name}.{model_schema}.{payload_table}" # Read data from the inference table df_inf_table = spark.read.table(table_path) display(df_inf_table )


powinieneś zobaczyć coś takiego: twoja tabela ładunków

Model Databricks obsługujący tabelę ładunków


Aby zrozumieć schemat tej tabeli wnioskowania, sprawdź „Schemat tabeli wnioskowania katalogu Unity==” tutaj .==


Monitorowanie modelu

Monitorowanie modeli i danych to złożony temat, którego opanowanie wymaga dużo czasu. Databricks Lakehouse Monitoring (DLM) zmniejsza narzut związany z budowaniem odpowiedniego systemu monitorowania, zapewniając standardowe i konfigurowalne szablony dla typowych przypadków użycia. Jednak opanowanie DLM i monitorowania modeli ogólnie wymaga wielu eksperymentów. Nie chcę tutaj przedstawiać obszernego przeglądu monitorowania modeli, ale raczej dać punkt wyjścia. Być może w przyszłości poświęcę temu tematowi osobny wpis na blogu.


Krótkie podsumowanie funkcjonalności i cech DLM

Teraz, gdy nasz model jest już uruchomiony, możemy użyć tabeli wnioskowania wygenerowanej przez nasz punkt końcowy obsługujący, aby monitorować kluczowe wskaźniki, takie jak wydajność i dryf modelu, aby wykrywać wszelkie odchylenia lub anomalie w naszych danych lub modelu w czasie. To proaktywne podejście pomaga nam podejmować terminowe działania naprawcze, takie jak ponowne szkolenie modelu lub aktualizacja jego funkcji, aby utrzymać optymalną wydajność i zgodność z celami biznesowymi.


Databricks Lakehouse Monitoring Data Architecture źródło: Databricks


DLM zapewnia trzy typy analizy lub profile type : szereg czasowy , migawka i wnioskowanie . Ponieważ jesteśmy zainteresowani analizą naszej tabeli wnioskowania, skupiamy się na tej drugiej. Aby użyć tabeli do monitorowania - naszej „ tabeli podstawowej ”, powinniśmy upewnić się, że tabela ma właściwą strukturę. W przypadku tabeli wnioskowania każdy wiersz powinien odpowiadać żądaniom z następującymi kolumnami:

  • cechy modelu

  • prognozowanie modelu

  • identyfikator modelu

  • znacznik czasu : znacznik czasu żądania wnioskowania

  • prawda podstawowa (opcjonalnie)


Identyfikator modelu jest ważny w przypadkach, gdy obsługujemy wiele modeli i chcemy śledzić wydajność każdego modelu w jednym pulpicie monitorującym. Jeśli dostępnych jest więcej niż jeden identyfikator modelu, DLM używa go do podziału danych i obliczania metryk i statystyk dla każdego wycinka osobno.


DLM oblicza każdą statystykę i metrykę dla określonego przedziału czasu. Do analizy wnioskowania używa kolumny znacznika czasu oraz zdefiniowanego przez użytkownika rozmiaru okna, aby zidentyfikować okna czasowe. więcej poniżej.


DLM obsługuje dwa problem type dla tabel wnioskowania: „ klasyfikacja ” lub „ regresja ”. Oblicza niektóre istotne metryki i statystyki na podstawie tej specyfikacji.


Aby użyć DLM, powinniśmy utworzyć monitor i dołączyć go do tabeli. Kiedy to zrobimy, DLM utworzy dwie metric tables :

  • tabela metryk profilu : ta tabela zawiera statystyki podsumowujące, takie jak min, maks, procent wartości null i zer. Zawiera również dodatkowe metryki oparte na typie problemu zdefiniowanym przez użytkownika. Na przykład precyzja , odwołanie i f1_score dla modeli klasyfikacji oraz mean_squared_error i mean_average_error dla modeli regresji.

  • tabela metryki dryfu : zawiera statystyki mierzące, jak rozkład danych zmienił się w czasie lub w stosunku do wartości bazowej (jeśli jest podana) . Oblicza miary takie jak test chi-kwadrat, test KS.


aby zobaczyć listę kompletnych metryk dla każdej tabeli sprawdź stronę dokumentacji tabeli metryk Monitor . Możliwe jest również tworzenie niestandardowych metryk .


Ważnym aspektem tworzenia systemu monitorowania jest upewnienie się, że nasz panel monitorowania ma dostęp do najnowszych danych wnioskowania w miarę ich pojawiania się. W tym celu możemy użyć strumieniowania tabeli Delta , aby śledzić przetworzone wiersze w tabeli wnioskowania. Używamy tabeli wnioskowania obsługującej model jako naszej tabeli źródłowej ( readStream ), a tabeli monitorowania jako tabeli odbiorczej ( writeStream ). Upewniamy się również, że funkcja Change Data Capture (CDC) jest włączona w obu tabelach (jest domyślnie włączona w tabeli wnioskowania). W ten sposób przetwarzamy tylko zmiany — wstawianie/aktualizowanie/usuwanie — w tabeli źródłowej, zamiast ponownie przetwarzać całą tabelę przy każdym odświeżeniu.

Praktycznie

Aby umożliwić monitorowanie naszej tabeli wnioskowania, należy wykonać następujące kroki:

  1. Odczytaj tabelę wnioskowania jako tabelę strumieniową
  2. Utwórz nową tabelę delta z odpowiednim schematem, rozpakowując tabelę wnioskowania wygenerowaną przez nasz punkt końcowy obsługujący model.
  3. Przygotuj tabelę bazową (jeśli istnieje)
  4. Utwórz monitor nad tabelą wyników i odśwież metrykę
  5. Zaplanuj przepływ pracy, aby rozpakować tabelę wnioskowań do odpowiedniej struktury i odświeżyć metryki


Najpierw musimy zainstalować Lakehouse Monitoring API. Powinien być już zainstalowany, jeśli używasz Databricks rum time 15.3 LTS i nowszych:


 %pip install "https://ml-team-public-read.s3.amazonaws.com/wheels/data-monitoring/a4050ef7-b183-47a1-a145-e614628e3146/databricks_lakehouse_monitoring-0.4.14-py3-none-any.whl" dbutils.library.restartPython()


Odczytujemy tabelę wnioskowania jako tabelę strumieniową

 requests_raw = spark.readStream\ .format("delta")\ .table(inference_table_name) requests_raw.isStreaming #-> True


Następnie musimy umieścić tabelę w odpowiednim formacie, jak opisano powyżej. Ta tabela powinna mieć jeden wiersz dla każdej prognozy z odpowiednimi cechami i wartością prognozy. Tabela wnioskowania, którą otrzymujemy z punktu końcowego obsługującego model, przechowuje żądania i odpowiedzi punktu końcowego jako zagnieżdżony format JSON. Oto przykład ładunku JSON dla kolumny żądania i odpowiedzi.

 #requests {"dataframe_records": [ {"user_id": 1, "item_id": 346, "rating": 5}, {"user_id": 1, "item_id": 377, "rating": 2}, {"user_id": 1, "item_id": 302, "rating": 4} ] } #reponse {"predictions": [4.248899936676025, 1.1172138452529907, 4.279165744781494] } # --> what we need | user_id | item_id | rating | prediction | |---------|---------|--------|------------| | 1 | 346 | 5 | 4.248900 | | 1 | 377 | 2 | 1.117214 | | 1 | 302 | 4 | 4.279166 |


Aby rozpakować tę tabelę do odpowiedniego schematu, możemy skorzystać z następującego kodu zaczerpniętego z dokumentacji Databricks ( Inference table Lakehouse Monitoring starter notebook ).


 # define the schema of the request and reponse fields in the inference tabel REQUEST_FIELDS = [StructField('user_id', IntegerType(), False),\ StructField('item_id', IntegerType(), False),\ StructField('rating', IntegerType(), False)\ ] RESPONSE_FIELD = [T.StructField("predictions", FloatType())] def process_col_requests(json_str:str) -> str: """ to proccess the JSON payload of request column in inference table """ request = json.loads(json_str) dataframe_records = request.get("dataframe_records", []) return dataframe_records def procces_col_response(json_str: str) -> str: """ to proccess the JSON payload of reponse column in inference table """ reponse = json.loads(json_str) output = [{prediction_col: round(prediction,4)} for prediction in reponse["predictions"]] return output def get_model_id(endpoint_name: str) -> str: """ create the model id by concatinating the model name and the model version. note: the assumption is the endpoint serves only one model """ served_models = get_served_models(endpoint_name) model_id = f"{served_models[0]['model_name']}_{served_models[0]['model_version']}" return model_id def process_requests(requests_raw: DataFrame, request_fields: List[T.StructField], response_field: T.StructField, endpoint_name: str) -> DataFrame: """ Takes a stream of raw requests and processes them by: - Unpacking JSON payloads for requests and responses - Exploding batched requests into individual rows - Converting Unix epoch millisecond timestamps to be Spark TimestampType :param requests_raw: DataFrame containing raw requests. Assumed to contain the following columns: - `request` - `response` - `timestamp_ms` :param request_fields: List of StructFields representing the request schema :param response_field: A StructField representing the response schema :return: A DataFrame containing processed requests """ # Convert the timestamp milliseconds to TimestampType for downstream processing. requests_timestamped = requests_raw \ .withColumn(timestamp_col, (F.col("timestamp_ms") / 1000).cast(T.TimestampType())) \ .drop("timestamp_ms") # create the model identifier column model_id = get_model_id(endpoint_name) # Convert the model name and version columns into a model identifier column. requests_identified = requests_timestamped \ .withColumn(model_id_col, F.lit(model_id)) # Rename the date column to avoid collisions with features. requests_dated = requests_identified.withColumnRenamed("date", date_col) # Consolidate and unpack JSON. request_schema = T.ArrayType(T.StructType(request_fields)) response_schema = T.ArrayType(T.StructType(response_field)) # w udf_request = F.udf(process_col_requests, request_schema) udf_reponse = F.udf(procces_col_response, response_schema) requests_unpacked = requests_dated.withColumn("request", udf_request("request")).\ withColumn("response", udf_reponse("response")) # Explode batched requests into individual rows. DB_PREFIX = "__db" requests_exploded = requests_unpacked \ .withColumn(f"{DB_PREFIX}_request_response", F.arrays_zip(F.col("request"), F.col("response"))) \ .withColumn(f"{DB_PREFIX}_request_response", F.explode(F.col(f"{DB_PREFIX}_request_response"))) \ .select(F.col("*"), F.col(f"{DB_PREFIX}_request_response.request.*"), F.col(f"{DB_PREFIX}_request_response.response.*")) \ .drop(f"{DB_PREFIX}_request_response", "request", "response") requests_cleaned = requests_exploded.drop("status_code", "sampling_fraction", "client_request_id", "databricks_request_id", "request_metadata") return requests_cleaned


Wynikowa tabela wyglądałaby następująco:

Rozpakowano tabelę ładunków

Następnie powinniśmy zainicjować naszą tabelę odbiorczą

 dt_builder = DeltaTable.createIfNotExists(spark) \ .tableName(unpacked_requests_table_name) \ .addColumns(schema) \ .partitionedBy(requests_cleaned.schema) \ .property("delta.enableChangeDataFeed", "true") \ dt_builder.execute()


i napisz wyniki

 checkpoint_path = f"dbfs:/payload-logging/{endpoint_name}/checkpoint" requests_stream = requests_cleaned.writeStream \ .trigger(once=True) \ .format("delta") \ .partitionBy(date_col) \ .outputMode("append") \ .option("checkpointLocation", checkpoint_path) \ .toTable(unpacked_requests_table_name) \


Na koniec tworzymy naszą tabelę bazową. DLM używa tej tabeli do obliczania dryftów poprzez porównanie rozkładu podobnych kolumn modeli bazowych i podstawowych. Tabela bazowa powinna mieć tę samą kolumnę cech co kolumna podstawowa, a także tę samą kolumnę identyfikacji modelu. W przypadku tabeli bazowej używamy tabeli predykcji naszego zestawu danych walidacyjnych , który zapisaliśmy wcześniej po przeszkoleniu naszego modelu przy użyciu najlepszego hiperparametru. Aby obliczyć metrykę dryftu, Databricks oblicza metryki profilu zarówno dla tabeli podstawowej, jak i tabeli bazowej. Tutaj możesz przeczytać o tabeli podstawowej i tabeli bazowej .


 #read the prediction table df_base_table = spark.table(f"{catalog_name}.{model_schema}.predictions") # create the model id and add it to the table model_id = get_model_id(endpoint_name) df_base_table = df_base_table.withColumn(model_id_col, F.lit(model_id)) #write the new table and enable the CDC on it output_base_table_name = f"{catalog_name}.{model_schema}.{base_table_prefix}_{model_name}" df_base_table.write.format("delta").mode("overwrite").saveAsTable(output_base_table_name) spark.sql(f"ALTER TABLE {output_base_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")


Teraz możemy utworzyć nasz panel monitorujący. Możemy to zrobić za pomocą interfejsu użytkownika lub API Lakehouse Monitoring. Tutaj używamy drugiej opcji:

 # This is where we store the metric tables. output_schema_name = f"{catalog_name}.{model_schema}" try: info = lm.create_monitor( table_name=unpacked_requests_table_name, profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities,#the aggregation window model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, # We will refresh the metrics on-demand in this notebook baseline_table_name=output_base_table_name, ) print(info) except Exception as e: # Ensure the exception was expected assert "RESOURCE_ALREADY_EXISTS" in str(e), f"Unexpected error: {e}" # Update the monitor if any parameters of this notebook have changed. lm.update_monitor( table_name=unpacked_requests_table_name, updated_params=dict( profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities, model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, baseline_table_name=output_base_table_name, ) ) # Refresh metrics calculated on the requests table. refresh_info = lm.run_refresh(table_name=unpacked_requests_table_name) print(refresh_info)


po uruchomieniu kodu potrzeba trochę czasu, aby Databricks obliczył wszystkie metryki. Aby zobaczyć pulpit, przejdź do zakładki Quality tabeli sink (tj. unpacked_requests_table_name ). Powinieneś zobaczyć następującą stronę.

Widok monitorowania modelu Databricks


Klikając na „Wyświetl refresh history , zobaczysz trwające, oczekujące i przeszłe odświeżenia. Kliknij „ View Dashboard , aby otworzyć pulpit nawigacyjny.

Panel monitorowania modelu Databricks



więc zaczynamy od tabeli wnioskowania ( my_endpoint_payload ), przetwarzamy ją i zapisujemy wynik w my_endpoint_payload_unpacked i przekazujemy tę tabelę wraz z naszą tabelą bazową ( base_table_als ) do naszego API monitorującego. DLM oblicza metryki profilu dla każdej tabeli ( my_endpoint_payload_unpacked_profile_metric ) i używa ich do obliczenia metryk dryfu ( my_endpoint_payload_unpacked_drift_metrics )


Oto wszystko! Masz wszystko, czego potrzebujesz do obsługi i monitorowania swojego modelu!


W następnej części pokażę Ci, jak zautomatyzować ten proces, używając Databricks Assets Bundle i Gitlab !

W pierwszej części tej serii samouczków wykonaliśmy pierwsze kroki w celu zbudowania kompleksowego potoku MLOps przy użyciu Databricks i Spark, kierując się architekturą referencyjną Databricks. Oto podsumowanie kluczowych kroków, które omówiliśmy:


  • Konfigurowanie katalogu Unity dla architektury Medallion : Zorganizowaliśmy nasze dane w warstwach brązowej, srebrnej i złotej w katalogu Unity, tworząc ustrukturyzowany i wydajny system zarządzania danymi.

  • Wprowadzanie danych do katalogu Unity : Pokazaliśmy, jak importować surowe dane do systemu, zapewniając spójność i jakość kolejnych etapów przetwarzania.

  • Szkolenie modelu : Wykorzystując Databricks, wyszkoliliśmy model uczenia maszynowego dostosowany do naszego zestawu danych, postępując zgodnie z najlepszymi praktykami skalowalnego i efektywnego tworzenia modeli.

  • Strojenie hiperparametrów za pomocą HyperOpt : Aby zwiększyć wydajność modelu, zastosowaliśmy HyperOpt w celu zautomatyzowania wyszukiwania optymalnych hiperparametrów, co zwiększyło dokładność i wydajność.

  • Śledzenie eksperymentów za pomocą Databricks MLflow : Wykorzystaliśmy MLflow do rejestrowania i monitorowania naszych eksperymentów, prowadząc kompleksowy rejestr wersji modeli, metryk i parametrów w celu łatwego porównywania i powtarzalności.


Po wykonaniu tych podstawowych kroków Twój model jest teraz gotowy do wdrożenia. W tej drugiej części skupimy się na zintegrowaniu dwóch krytycznych komponentów z naszym systemem:


  1. Wnioskowanie wsadowe : Implementacja przetwarzania wsadowego w celu generowania prognoz na podstawie dużych zbiorów danych, odpowiednia do zastosowań takich jak zbiorcze ocenianie i okresowe raportowanie.
  2. Wnioskowanie online (obsługa modeli) : konfigurowanie obsługi modeli w czasie rzeczywistym w celu zapewnienia natychmiastowych prognoz, co jest niezbędne w przypadku aplikacji i usług interaktywnych.
  3. Monitorowanie modelu: zapewnia, że wdrożone modele utrzymują optymalną wydajność i niezawodność na przestrzeni czasu.


No to do dzieła!

Wdrażanie modelu

Punktem wyjścia ostatniego bloga była ocena modelu. Teraz wyobraźmy sobie, że przeprowadziliśmy porównanie i odkryliśmy, że nasz model wykazuje wyższą wydajność w porównaniu z tym modelem produkcyjnym. Ponieważ chcemy (zakładamy) używać modelu w produkcji, chcemy wykorzystać wszystkie dane, które mamy. Następnym krokiem jest trenowanie i testowanie modelu przy użyciu pełnego zestawu danych. Następnie utrwalamy nasz model do późniejszego użycia, wdrażając go jako nasz model mistrzowski. Ponieważ jest to ostateczny model, którego chcemy użyć do wnioskowania, używamy klienta Feature Engineering do trenowania modelu. W ten sposób nie tylko łatwiej śledzimy pochodzenie modelu, ale także przekazujemy klientowi walidację schematu i transformację funkcji (jeśli taka istnieje).


 with mlflow.start_run(run_name="ALS_best_model") as run: als = ALS() # Now we set the parameters for the method als.setMaxIter(MAX_ITER)\ .setSeed(SEED)\ .setRegParam(best_params["REG_PARAM"])\ .setUserCol(COL_USER)\ .setItemCol(COL_ITEM)\ .setRatingCol(COL_LABEL)\ .setRank(best_params["RANK"]) mlflow.log_param("MAX_ITER", MAX_ITER) mlflow.log_param("RANK", best_params["RANK"]) mlflow.log_param("REG_PARAM", best_params["REG_PARAM"]) # Create the model with these parameters. model = als.fit(df_full_data) #drop predictions where users and products from the test test and didn't make it into the training set. in this case, the prediction is NaN model.setColdStartStrategy('drop') predictions = model.transform(df_full_data) signature = infer_signature(model_input = df_full_data, model_output = predictions.select(COL_LABEL)) #log the model mlflow.spark.log_model(model, model_name, sample_input=df_full_data.limit(3), signature = signature, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name=f"{catalog_name}.{model_schema}.{model_name}") evaluator = RegressionEvaluator(predictionCol=COL_PRED, labelCol=COL_LABEL) rmse = evaluator.setMetricName("rmse").evaluate(predictions) mlflow.log_metric('rmse', rmse)


możemy również użyć interfejsów API Feature Store lub Feature Engineering do trenowania i rejestrowania modeli

 model_info = fe.log_model(model=model, artifact_path = model_name, flavor=mlflow.spark, training_set=fe_full_data, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name= f"{catalog_name}.{model_schema}.{model_name}" )


gdy używamy interfejsu API inżynierii funkcji, możemy przeglądać pochodzenie modelu w Eksploratorze katalogów

pochodzenie danych w katalogu Dataticks Unity


Teraz zaktualizujmy opis modelu i przypisajmy mu etykietę Champion.

 import time from mlflow.tracking.client import MlflowClient from mlflow.entities.model_registry.model_version_status import ModelVersionStatus client = MlflowClient() #find the latest model version model_name_path = f"{catalog_name}.{model_schema}.{model_name}" model_version_infos = client.search_model_versions(f"name ='{model_name_path}'") new_model_version = max([int(model_version_info.version) for model_version_info in model_version_infos]) #add the model and model version descirption client.update_registered_model( name=model_name_path, description="collaborative filtering using Spark mllib ALS. This model use rating table" ) client.update_model_version( name=model_name_path, version=new_model_version, description="this model is optimized Rank and REG_PARAM with Hyperopt and rmse as a loss function. trained on the full dataset" ) # assign alias client.set_registered_model_alias(model_name_path, "Champion", new_model_version)


Teraz sprawdź schemat, w którym zarejestrowałeś model. Powinny zostać wyświetlone wszystkie aktualizacje w następujący sposób

Rejestr modeli w katalogu Databricks Unity

Etapy modelu : Jeśli używasz obszaru roboczego do rejestru modeli, powinieneś zarządzać swoimi modelami za pomocą etapów. Korzystanie z aliasów nie zadziała. Sprawdź tutaj żeby zobaczyć jak to działa

Wnioskowanie z modelu

Ocena partii

Teraz wyobraźmy sobie, że chcemy użyć naszego modelu w produkcji do wnioskowania. W tym kroku ładujemy model champion i używamy go do wygenerowania 20 rekomendacji filmowych dla każdego użytkownika.


 from mlflow.spark import load_model as spark_load_model from mlflow.tracking.client import MlflowClient from create_training_set import split_data #-- set UC as model registray mlflow.set_registry_uri("databricks-uc") #-- initate mlflow client client = MlflowClient() # -- read the config file with open('config.json') as config_file: config = json.load(config_file) catalog_name = config["catalog_name"] gold_layer = config["gold_layer_name"] silver_layer = config["silver_layer_name"] user_item_table_name = config["user_item_table_name"] ft_user_item_name = config["ft_user_item_name"] model_name = config["model_name"] model_schema = config["model_schema"] #-- create the model uri model_path = f"{catalog_name}.{model_schema}.{model_name}" # --create the model_uri: there are two ways to do this # 1: using the alias (we use this*) model_version_uri = f"models:/{model_uri}@champion" # 2: using model version #champion_version = client.get_model_version_by_alias(model_uri, "champion") #model_version_uri = f"models:/{model_uri}/{champion_version.version}" # -- load the model pipline and exctract the model model_pipeline = spark_load_model(model_version_uri) model = model_pipeline.stages[0] # -- batch scoring using the the model fe_full_data, df_full_data, df_train, df_test = split_data() df_batch_input = df_full_data.drop("rating") df_scores = model.transform(df_batch_input) # --- in case you used Feature Engineering to train and register model #from databricks.feature_engineering import FeatureEngineeringClient #fe = FeatureEngineeringClient() # fe.score_batch(model_uri=f"{model_version_uri}",df = df_batch_input)


i widać, że użyliśmy tych samych danych treningowych do punktacji wsadowej. Chociaż w przypadku systemów rekomendacji ma to sens, w większości zastosowań chcemy użyć modelu do punktacji niektórych niewidzianych danych. Na przykład, Wyobraź sobie, że jesteś Netflix i chcesz zaktualizować rekomendacje użytkowników pod koniec dnia na podstawie ich nowej listy obserwowanych. Możemy zaplanować zadanie, które uruchomi punktację wsadową o określonej porze pod koniec dnia.


Teraz możemy przejść dalej i wygenerować rekomendacje dla każdego użytkownika. W tym celu znajdujemy 20 najlepszych pozycji na użytkownika

 from pyspark.sql.window import Window from pyspark.sql.functions import col, split, row_number, collect_list from pyspark.sql.functions import col, collect_list, expr, lit, min, row_number, desc windowSpec = Window.partitionBy("user_id").orderBy(col("prediction").desc()) df_top_20_items = df_scores.withColumn("rank", row_number().over(windowSpec)).filter(col("rank") <= 20) df_user_recs = df_top_20_items.groupBy("user_id") \ .agg(collect_list(col("item_id").cast("double")).alias("top_item_ids"))


Oto jak wygląda wynik

Na koniec możemy zapisać prognozę jako etykietę delta w naszym UC lub opublikować ją w systemach podrzędnych Mongo DB lub Azure Cosmos DB. Wybieramy pierwszą opcję


 df_user_recs.write.mode("overwrite").saveAsTable(f"{catalog_name}.{output_schema}.top20_item_recommendations")


Strumieniowanie/wnioskowanie online

Teraz wyobraź sobie przypadek, w którym chcemy zaktualizować nasze rekomendacje na podstawie interakcji użytkowników w czasie rzeczywistym. W tym przypadku możemy użyć obsługi modeli. Gdy ktoś chce użyć Twojego modelu, może wysłać dane do serwera. Następnie serwer przekazuje te dane do wdrożonego modelu, który wchodzi w działanie, analizuje dane i generuje prognozę. Mogą być używane w aplikacjach internetowych, aplikacjach mobilnych, a nawet systemach wbudowanych. Jednym z zastosowań tego podejścia jest umożliwienie kierowania ruchem na potrzeby testów A/B.


Algorytm ALS nie może być używany bezpośrednio do wnioskowania online, ponieważ wymaga ponownego trenowania modelu przy użyciu całych danych (starych + nowych) w celu aktualizacji rekomendacji. Algorytmy uczenia się Gradient Descent są przykładami modeli, które można wykorzystać do aktualizacji online. Możemy przyjrzeć się niektórym z tych algorytmów w przyszłym poście.


Jednak, aby zilustrować, jak taki model miałby działać, tworzymy (bezużyteczny) model obsługujący punkt końcowy, który przewiduje ocenę filmu na podstawie tego, kiedy użytkownik oceni film!


 import requests model_path = f"{catalog_name}.{model_schema}.{model_name}" champion_version = client.get_model_version_by_alias(model_path, "champion") # Set the name of the MLflow endpoint endpoint_name = config["model_serving_endpoint_name"] # Name of the registered MLflow model model_name = model_path # Specify the type of compute (CPU, GPU_SMALL, GPU_MEDIUM, etc.) workload_type = "CPU" # Specify the scale-out size of compute (Small, Medium, Large, etc.) workload_size = "Small" # Get the latest version of the MLflow model model_version = int(champion_version.version) # Specify Scale to Zero(only supported for CPU endpoints) scale_to_zero = False # Get the API endpoint and token for the current notebook context API_ROOT = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get() API_TOKEN = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get() data = { "name": endpoint_name, "config": { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ] }, } headers = {"Context-Type": "text/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.post( url=f"{API_ROOT}/api/2.0/serving-endpoints", json=data, headers=headers )


To utworzy i uruchomi klaster obsługujący model, więc zajmie to trochę czasu. Teraz, jeśli otworzysz okno Serving , powinieneś zobaczyć swój punkt końcowy.


możemy użyć jednego punktu końcowego do obsługi wielu modeli. Następnie możemy użyć routingu ruchu dla scenariuszy, takich jak testowanie A/B lub porównanie wydajności różnych modeli w produkcji.

Tabela wnioskowania

Tabele wnioskowania w Databricks Model Serving działają jako automatyczny dziennik dla naszych wdrożonych modeli. Po włączeniu przechwytują przychodzące żądania (dane wysłane do prognozowania), odpowiadające im dane wyjściowe modelu (prognozy) i niektóre inne metadane jako tabelę Delta w Unity Catalog. Możemy użyć tabeli wnioskowania do monitorowania i debugowania , śledzenia pochodzenia oraz procedury zbierania danych do ponownego trenowania lub dostrajania naszych modeli.


Możemy włączyć inference table na naszym punkcie końcowym obsługi, aby monitorować model. Możemy to zrobić, określając właściwości auto_capture_config w ładunku, gdy po raz pierwszy tworzymy punkt końcowy. Albo aktualizujemy nasz punkt końcowy później, używając polecenia put i adresu URL punktu końcowego config w następujący sposób (więcej tutaj )


 data = { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ], "auto_capture_config":{ "catalog_name": catalog_name, "schema_name": model_schema, "table_name_prefix": payload_table, } } headers = {"Context-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.put(url=f"{API_ROOT}/api/2.0/serving-endpoints/{endpoint_name}/config", json=data, headers=headers) print(json.dumps(response.json(), indent=4))


teraz wprowadźmy do punktu końcowego pewne dane dotyczące interakcji użytkownika fikcyjnego

 import random import time all_items = df_full_data.select(col("item_id")).distinct() for user_id in range(50,54): items_not_rated_by_user = df_full_data.where(col("user_id")==user_id).select(col("item_id")).distinct()#collect()[0][0] no_rated_items = [item.item_id for item in all_items.subtract(items_not_rated_by_user).limit(4).collect()] data = { "dataframe_records": [ {"user_id":user_id, "item_id":no_rated_items[0], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[1], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, ] } response = requests.post( url=f"{API_ROOT}/serving-endpoints/{endpoint_name}/invocations", json=data, headers=headers ) # generate the data within the timespan of 1 to 8 minutes time.sleep(random.randint(60*1, 60*8))


Możemy sprawdzić logi punktów końcowych w tabeli <catalog>.<schema>.<payload_table> . Zajmie to około 10 minut, zanim będzie można zobaczyć dane w tabeli.


 table_path = f"{catalog_name}.{model_schema}.{payload_table}" # Read data from the inference table df_inf_table = spark.read.table(table_path) display(df_inf_table )


powinieneś zobaczyć coś takiego: twoja tabela ładunków

Model Databricks obsługujący tabelę ładunków


Aby zrozumieć schemat tej tabeli wnioskowania, sprawdź „Schemat tabeli wnioskowania katalogu Unity==” tutaj .==


Monitorowanie modelu

Monitorowanie modeli i danych to złożony temat, którego opanowanie wymaga dużo czasu. Databricks Lakehouse Monitoring (DLM) zmniejsza narzut związany z budowaniem odpowiedniego systemu monitorowania, zapewniając standardowe i konfigurowalne szablony dla typowych przypadków użycia. Jednak opanowanie DLM i monitorowania modeli ogólnie wymaga wielu eksperymentów. Nie chcę tutaj przedstawiać obszernego przeglądu monitorowania modeli, ale raczej dać punkt wyjścia. Być może w przyszłości poświęcę temu tematowi osobny wpis na blogu.


Krótkie podsumowanie funkcjonalności i cech DLM

Teraz, gdy nasz model jest już uruchomiony, możemy użyć tabeli wnioskowania wygenerowanej przez nasz punkt końcowy obsługujący, aby monitorować kluczowe wskaźniki, takie jak wydajność i dryf modelu, aby wykrywać wszelkie odchylenia lub anomalie w naszych danych lub modelu w czasie. To proaktywne podejście pomaga nam podejmować terminowe działania naprawcze, takie jak ponowne szkolenie modelu lub aktualizacja jego funkcji, aby utrzymać optymalną wydajność i zgodność z celami biznesowymi.


Databricks Lakehouse Monitoring Data Architecture źródło: Databricks


DLM zapewnia trzy typy analizy lub profile type : szereg czasowy , migawka i wnioskowanie . Ponieważ jesteśmy zainteresowani analizą naszej tabeli wnioskowania, skupiamy się na tej drugiej. Aby użyć tabeli do monitorowania - naszej „ tabeli podstawowej ”, powinniśmy upewnić się, że tabela ma właściwą strukturę. W przypadku tabeli wnioskowania każdy wiersz powinien odpowiadać żądaniom z następującymi kolumnami:

  • cechy modelu

  • prognozowanie modelu

  • identyfikator modelu

  • znacznik czasu : znacznik czasu żądania wnioskowania

  • prawda podstawowa (opcjonalnie)


Identyfikator modelu jest ważny w przypadkach, gdy obsługujemy wiele modeli i chcemy śledzić wydajność każdego modelu w jednym pulpicie monitorującym. Jeśli dostępnych jest więcej niż jeden identyfikator modelu, DLM używa go do podziału danych i obliczania metryk i statystyk dla każdego wycinka osobno.


DLM oblicza każdą statystykę i metrykę dla określonego przedziału czasu. Do analizy wnioskowania używa kolumny znacznika czasu oraz zdefiniowanego przez użytkownika rozmiaru okna, aby zidentyfikować okna czasowe. więcej poniżej.


DLM obsługuje dwa problem type dla tabel wnioskowania: „ klasyfikacja ” lub „ regresja ”. Oblicza niektóre istotne metryki i statystyki na podstawie tej specyfikacji.


Aby użyć DLM, powinniśmy utworzyć monitor i dołączyć go do tabeli. Kiedy to zrobimy, DLM utworzy dwie metric tables :

  • tabela metryk profilu : ta tabela zawiera statystyki podsumowujące, takie jak min, maks, procent wartości null i zer. Zawiera również dodatkowe metryki oparte na typie problemu zdefiniowanym przez użytkownika. Na przykład precyzja , odwołanie i f1_score dla modeli klasyfikacji oraz mean_squared_error i mean_average_error dla modeli regresji.

  • tabela metryki dryfu : zawiera statystyki mierzące, jak rozkład danych zmienił się w czasie lub w stosunku do wartości bazowej (jeśli jest podana) . Oblicza miary takie jak test chi-kwadrat, test KS.


aby zobaczyć listę kompletnych metryk dla każdej tabeli sprawdź stronę dokumentacji tabeli metryk Monitor . Możliwe jest również tworzenie niestandardowych metryk .


Ważnym aspektem tworzenia systemu monitorowania jest upewnienie się, że nasz panel monitorowania ma dostęp do najnowszych danych wnioskowania w miarę ich pojawiania się. W tym celu możemy użyć strumieniowania tabeli Delta , aby śledzić przetworzone wiersze w tabeli wnioskowania. Używamy tabeli wnioskowania obsługującej model jako naszej tabeli źródłowej ( readStream ), a tabeli monitorowania jako tabeli odbiorczej ( writeStream ). Upewniamy się również, że funkcja Change Data Capture (CDC) jest włączona w obu tabelach (jest domyślnie włączona w tabeli wnioskowania). W ten sposób przetwarzamy tylko zmiany — wstawianie/aktualizowanie/usuwanie — w tabeli źródłowej, zamiast ponownie przetwarzać całą tabelę przy każdym odświeżeniu.

Praktycznie

Aby umożliwić monitorowanie naszej tabeli wnioskowania, należy wykonać następujące kroki:

  1. Odczytaj tabelę wnioskowania jako tabelę strumieniową
  2. Utwórz nową tabelę delta z odpowiednim schematem, rozpakowując tabelę wnioskowania wygenerowaną przez nasz punkt końcowy obsługujący model.
  3. Przygotuj tabelę bazową (jeśli istnieje)
  4. Utwórz monitor nad tabelą wyników i odśwież metrykę
  5. Zaplanuj przepływ pracy, aby rozpakować tabelę wnioskowań do odpowiedniej struktury i odświeżyć metryki


Najpierw musimy zainstalować Lakehouse Monitoring API. Powinien być już zainstalowany, jeśli używasz Databricks rum time 15.3 LTS i nowszych:


 %pip install "https://ml-team-public-read.s3.amazonaws.com/wheels/data-monitoring/a4050ef7-b183-47a1-a145-e614628e3146/databricks_lakehouse_monitoring-0.4.14-py3-none-any.whl" dbutils.library.restartPython()


Odczytujemy tabelę wnioskowania jako tabelę strumieniową

 requests_raw = spark.readStream\ .format("delta")\ .table(inference_table_name) requests_raw.isStreaming #-> True


Następnie musimy umieścić tabelę w odpowiednim formacie, jak opisano powyżej. Ta tabela powinna mieć jeden wiersz dla każdej prognozy z odpowiednimi cechami i wartością prognozy. Tabela wnioskowania, którą otrzymujemy z punktu końcowego obsługującego model, przechowuje żądania i odpowiedzi punktu końcowego jako zagnieżdżony format JSON. Oto przykład ładunku JSON dla kolumny żądania i odpowiedzi.

 #requests {"dataframe_records": [ {"user_id": 1, "item_id": 346, "rating": 5}, {"user_id": 1, "item_id": 377, "rating": 2}, {"user_id": 1, "item_id": 302, "rating": 4} ] } #reponse {"predictions": [4.248899936676025, 1.1172138452529907, 4.279165744781494] } # --> what we need | user_id | item_id | rating | prediction | |---------|---------|--------|------------| | 1 | 346 | 5 | 4.248900 | | 1 | 377 | 2 | 1.117214 | | 1 | 302 | 4 | 4.279166 |


Aby rozpakować tę tabelę do odpowiedniego schematu, możemy skorzystać z następującego kodu zaczerpniętego z dokumentacji Databricks ( Inference table Lakehouse Monitoring starter notebook ).


 # define the schema of the request and reponse fields in the inference tabel REQUEST_FIELDS = [StructField('user_id', IntegerType(), False),\ StructField('item_id', IntegerType(), False),\ StructField('rating', IntegerType(), False)\ ] RESPONSE_FIELD = [T.StructField("predictions", FloatType())] def process_col_requests(json_str:str) -> str: """ to proccess the JSON payload of request column in inference table """ request = json.loads(json_str) dataframe_records = request.get("dataframe_records", []) return dataframe_records def procces_col_response(json_str: str) -> str: """ to proccess the JSON payload of reponse column in inference table """ reponse = json.loads(json_str) output = [{prediction_col: round(prediction,4)} for prediction in reponse["predictions"]] return output def get_model_id(endpoint_name: str) -> str: """ create the model id by concatinating the model name and the model version. note: the assumption is the endpoint serves only one model """ served_models = get_served_models(endpoint_name) model_id = f"{served_models[0]['model_name']}_{served_models[0]['model_version']}" return model_id def process_requests(requests_raw: DataFrame, request_fields: List[T.StructField], response_field: T.StructField, endpoint_name: str) -> DataFrame: """ Takes a stream of raw requests and processes them by: - Unpacking JSON payloads for requests and responses - Exploding batched requests into individual rows - Converting Unix epoch millisecond timestamps to be Spark TimestampType :param requests_raw: DataFrame containing raw requests. Assumed to contain the following columns: - `request` - `response` - `timestamp_ms` :param request_fields: List of StructFields representing the request schema :param response_field: A StructField representing the response schema :return: A DataFrame containing processed requests """ # Convert the timestamp milliseconds to TimestampType for downstream processing. requests_timestamped = requests_raw \ .withColumn(timestamp_col, (F.col("timestamp_ms") / 1000).cast(T.TimestampType())) \ .drop("timestamp_ms") # create the model identifier column model_id = get_model_id(endpoint_name) # Convert the model name and version columns into a model identifier column. requests_identified = requests_timestamped \ .withColumn(model_id_col, F.lit(model_id)) # Rename the date column to avoid collisions with features. requests_dated = requests_identified.withColumnRenamed("date", date_col) # Consolidate and unpack JSON. request_schema = T.ArrayType(T.StructType(request_fields)) response_schema = T.ArrayType(T.StructType(response_field)) # w udf_request = F.udf(process_col_requests, request_schema) udf_reponse = F.udf(procces_col_response, response_schema) requests_unpacked = requests_dated.withColumn("request", udf_request("request")).\ withColumn("response", udf_reponse("response")) # Explode batched requests into individual rows. DB_PREFIX = "__db" requests_exploded = requests_unpacked \ .withColumn(f"{DB_PREFIX}_request_response", F.arrays_zip(F.col("request"), F.col("response"))) \ .withColumn(f"{DB_PREFIX}_request_response", F.explode(F.col(f"{DB_PREFIX}_request_response"))) \ .select(F.col("*"), F.col(f"{DB_PREFIX}_request_response.request.*"), F.col(f"{DB_PREFIX}_request_response.response.*")) \ .drop(f"{DB_PREFIX}_request_response", "request", "response") requests_cleaned = requests_exploded.drop("status_code", "sampling_fraction", "client_request_id", "databricks_request_id", "request_metadata") return requests_cleaned


Wynikowa tabela wyglądałaby następująco:

Rozpakowano tabelę ładunków

Następnie powinniśmy zainicjować naszą tabelę odbiorczą

 dt_builder = DeltaTable.createIfNotExists(spark) \ .tableName(unpacked_requests_table_name) \ .addColumns(schema) \ .partitionedBy(requests_cleaned.schema) \ .property("delta.enableChangeDataFeed", "true") \ dt_builder.execute()


i napisz wyniki

 checkpoint_path = f"dbfs:/payload-logging/{endpoint_name}/checkpoint" requests_stream = requests_cleaned.writeStream \ .trigger(once=True) \ .format("delta") \ .partitionBy(date_col) \ .outputMode("append") \ .option("checkpointLocation", checkpoint_path) \ .toTable(unpacked_requests_table_name) \


Na koniec tworzymy naszą tabelę bazową. DLM używa tej tabeli do obliczania dryftów poprzez porównanie rozkładu podobnych kolumn modeli bazowych i podstawowych. Tabela bazowa powinna mieć tę samą kolumnę cech co kolumna podstawowa, a także tę samą kolumnę identyfikacji modelu. W przypadku tabeli bazowej używamy tabeli predykcji naszego zestawu danych walidacyjnych , który zapisaliśmy wcześniej po przeszkoleniu naszego modelu przy użyciu najlepszego hiperparametru. Aby obliczyć metrykę dryftu, Databricks oblicza metryki profilu zarówno dla tabeli podstawowej, jak i tabeli bazowej. Tutaj możesz przeczytać o tabeli podstawowej i tabeli bazowej .


 #read the prediction table df_base_table = spark.table(f"{catalog_name}.{model_schema}.predictions") # create the model id and add it to the table model_id = get_model_id(endpoint_name) df_base_table = df_base_table.withColumn(model_id_col, F.lit(model_id)) #write the new table and enable the CDC on it output_base_table_name = f"{catalog_name}.{model_schema}.{base_table_prefix}_{model_name}" df_base_table.write.format("delta").mode("overwrite").saveAsTable(output_base_table_name) spark.sql(f"ALTER TABLE {output_base_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")


Teraz możemy utworzyć nasz panel monitorujący. Możemy to zrobić za pomocą interfejsu użytkownika lub API Lakehouse Monitoring. Tutaj używamy drugiej opcji:

 # This is where we store the metric tables. output_schema_name = f"{catalog_name}.{model_schema}" try: info = lm.create_monitor( table_name=unpacked_requests_table_name, profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities,#the aggregation window model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, # We will refresh the metrics on-demand in this notebook baseline_table_name=output_base_table_name, ) print(info) except Exception as e: # Ensure the exception was expected assert "RESOURCE_ALREADY_EXISTS" in str(e), f"Unexpected error: {e}" # Update the monitor if any parameters of this notebook have changed. lm.update_monitor( table_name=unpacked_requests_table_name, updated_params=dict( profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities, model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, baseline_table_name=output_base_table_name, ) ) # Refresh metrics calculated on the requests table. refresh_info = lm.run_refresh(table_name=unpacked_requests_table_name) print(refresh_info)


po uruchomieniu kodu potrzeba trochę czasu, aby Databricks obliczył wszystkie metryki. Aby zobaczyć pulpit, przejdź do zakładki Quality tabeli sink (tj. unpacked_requests_table_name ). Powinieneś zobaczyć następującą stronę.

Widok monitorowania modelu Databricks


Klikając na „Wyświetl refresh history , zobaczysz trwające, oczekujące i przeszłe odświeżenia. Kliknij „ View Dashboard , aby otworzyć pulpit nawigacyjny.

Panel monitorowania modelu Databricks



więc zaczynamy od tabeli wnioskowania ( my_endpoint_payload ), przetwarzamy ją i zapisujemy wynik w my_endpoint_payload_unpacked i przekazujemy tę tabelę wraz z naszą tabelą bazową ( base_table_als ) do naszego API monitorującego. DLM oblicza metryki profilu dla każdej tabeli ( my_endpoint_payload_unpacked_profile_metric ) i używa ich do obliczenia metryk dryfu ( my_endpoint_payload_unpacked_drift_metrics )


Oto wszystko! Masz wszystko, czego potrzebujesz do obsługi i monitorowania swojego modelu!


W następnej części pokażę Ci, jak zautomatyzować ten proces, używając Databricks Assets Bundle i Gitlab !