W wykonaliśmy pierwsze kroki w celu zbudowania kompleksowego potoku MLOps przy użyciu Databricks i Spark, kierując się architekturą referencyjną Databricks. Oto podsumowanie kluczowych kroków, które omówiliśmy: pierwszej części tej serii samouczków : Zorganizowaliśmy nasze dane w warstwach brązowej, srebrnej i złotej w katalogu Unity, tworząc ustrukturyzowany i wydajny system zarządzania danymi. Konfigurowanie katalogu Unity dla architektury Medallion : Pokazaliśmy, jak importować surowe dane do systemu, zapewniając spójność i jakość kolejnych etapów przetwarzania. Wprowadzanie danych do katalogu Unity : Wykorzystując Databricks, wyszkoliliśmy model uczenia maszynowego dostosowany do naszego zestawu danych, postępując zgodnie z najlepszymi praktykami skalowalnego i efektywnego tworzenia modeli. Szkolenie modelu : Aby zwiększyć wydajność modelu, zastosowaliśmy HyperOpt w celu zautomatyzowania wyszukiwania optymalnych hiperparametrów, co zwiększyło dokładność i wydajność. Strojenie hiperparametrów za pomocą HyperOpt : Wykorzystaliśmy MLflow do rejestrowania i monitorowania naszych eksperymentów, prowadząc kompleksowy rejestr wersji modeli, metryk i parametrów w celu łatwego porównywania i powtarzalności. Śledzenie eksperymentów za pomocą Databricks MLflow Po wykonaniu tych podstawowych kroków Twój model jest teraz gotowy do wdrożenia. W tej drugiej części skupimy się na zintegrowaniu dwóch krytycznych komponentów z naszym systemem: : Implementacja przetwarzania wsadowego w celu generowania prognoz na podstawie dużych zbiorów danych, odpowiednia do zastosowań takich jak zbiorcze ocenianie i okresowe raportowanie. Wnioskowanie wsadowe : konfigurowanie obsługi modeli w czasie rzeczywistym w celu zapewnienia natychmiastowych prognoz, co jest niezbędne w przypadku aplikacji i usług interaktywnych. Wnioskowanie online (obsługa modeli) zapewnia, że wdrożone modele utrzymują optymalną wydajność i niezawodność na przestrzeni czasu. Monitorowanie modelu: No to do dzieła! Wdrażanie modelu Punktem wyjścia ostatniego bloga była ocena modelu. Teraz wyobraźmy sobie, że przeprowadziliśmy porównanie i odkryliśmy, że nasz model wykazuje wyższą wydajność w porównaniu z tym modelem produkcyjnym. Ponieważ chcemy (zakładamy) używać modelu w produkcji, chcemy wykorzystać wszystkie dane, które mamy. Następnym krokiem jest trenowanie i testowanie modelu przy użyciu pełnego zestawu danych. Następnie utrwalamy nasz model do późniejszego użycia, wdrażając go jako nasz model mistrzowski. Ponieważ jest to ostateczny model, którego chcemy użyć do wnioskowania, używamy klienta Feature Engineering do trenowania modelu. W ten sposób nie tylko łatwiej śledzimy pochodzenie modelu, ale także przekazujemy klientowi walidację schematu i transformację funkcji (jeśli taka istnieje). with mlflow.start_run(run_name="ALS_best_model") as run: als = ALS() # Now we set the parameters for the method als.setMaxIter(MAX_ITER)\ .setSeed(SEED)\ .setRegParam(best_params["REG_PARAM"])\ .setUserCol(COL_USER)\ .setItemCol(COL_ITEM)\ .setRatingCol(COL_LABEL)\ .setRank(best_params["RANK"]) mlflow.log_param("MAX_ITER", MAX_ITER) mlflow.log_param("RANK", best_params["RANK"]) mlflow.log_param("REG_PARAM", best_params["REG_PARAM"]) # Create the model with these parameters. model = als.fit(df_full_data) #drop predictions where users and products from the test test and didn't make it into the training set. in this case, the prediction is NaN model.setColdStartStrategy('drop') predictions = model.transform(df_full_data) signature = infer_signature(model_input = df_full_data, model_output = predictions.select(COL_LABEL)) #log the model mlflow.spark.log_model(model, model_name, sample_input=df_full_data.limit(3), signature = signature, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name=f"{catalog_name}.{model_schema}.{model_name}") evaluator = RegressionEvaluator(predictionCol=COL_PRED, labelCol=COL_LABEL) rmse = evaluator.setMetricName("rmse").evaluate(predictions) mlflow.log_metric('rmse', rmse) możemy również użyć do trenowania i rejestrowania modeli interfejsów API Feature Store lub Feature Engineering model_info = fe.log_model(model=model, artifact_path = model_name, flavor=mlflow.spark, training_set=fe_full_data, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name= f"{catalog_name}.{model_schema}.{model_name}" ) gdy używamy interfejsu API inżynierii funkcji, możemy przeglądać pochodzenie modelu w Eksploratorze katalogów Teraz zaktualizujmy opis modelu i przypisajmy mu etykietę Champion. import time from mlflow.tracking.client import MlflowClient from mlflow.entities.model_registry.model_version_status import ModelVersionStatus client = MlflowClient() #find the latest model version model_name_path = f"{catalog_name}.{model_schema}.{model_name}" model_version_infos = client.search_model_versions(f"name ='{model_name_path}'") new_model_version = max([int(model_version_info.version) for model_version_info in model_version_infos]) #add the model and model version descirption client.update_registered_model( name=model_name_path, description="collaborative filtering using Spark mllib ALS. This model use rating table" ) client.update_model_version( name=model_name_path, version=new_model_version, description="this model is optimized Rank and REG_PARAM with Hyperopt and rmse as a loss function. trained on the full dataset" ) # assign alias client.set_registered_model_alias(model_name_path, "Champion", new_model_version) Teraz sprawdź schemat, w którym zarejestrowałeś model. Powinny zostać wyświetlone wszystkie aktualizacje w następujący sposób : Jeśli używasz obszaru roboczego do rejestru modeli, powinieneś zarządzać swoimi modelami za pomocą etapów. Korzystanie z aliasów nie zadziała. Sprawdź żeby zobaczyć jak to działa Etapy modelu tutaj Wnioskowanie z modelu Ocena partii Teraz wyobraźmy sobie, że chcemy użyć naszego modelu w produkcji do wnioskowania. W tym kroku ładujemy model champion i używamy go do wygenerowania 20 rekomendacji filmowych dla każdego użytkownika. from mlflow.spark import load_model as spark_load_model from mlflow.tracking.client import MlflowClient from create_training_set import split_data #-- set UC as model registray mlflow.set_registry_uri("databricks-uc") #-- initate mlflow client client = MlflowClient() # -- read the config file with open('config.json') as config_file: config = json.load(config_file) catalog_name = config["catalog_name"] gold_layer = config["gold_layer_name"] silver_layer = config["silver_layer_name"] user_item_table_name = config["user_item_table_name"] ft_user_item_name = config["ft_user_item_name"] model_name = config["model_name"] model_schema = config["model_schema"] #-- create the model uri model_path = f"{catalog_name}.{model_schema}.{model_name}" # --create the model_uri: there are two ways to do this # 1: using the alias (we use this*) model_version_uri = f"models:/{model_uri}@champion" # 2: using model version #champion_version = client.get_model_version_by_alias(model_uri, "champion") #model_version_uri = f"models:/{model_uri}/{champion_version.version}" # -- load the model pipline and exctract the model model_pipeline = spark_load_model(model_version_uri) model = model_pipeline.stages[0] # -- batch scoring using the the model fe_full_data, df_full_data, df_train, df_test = split_data() df_batch_input = df_full_data.drop("rating") df_scores = model.transform(df_batch_input) # --- in case you used Feature Engineering to train and register model #from databricks.feature_engineering import FeatureEngineeringClient #fe = FeatureEngineeringClient() # fe.score_batch(model_uri=f"{model_version_uri}",df = df_batch_input) i widać, że użyliśmy tych samych danych treningowych do punktacji wsadowej. Chociaż w przypadku systemów rekomendacji ma to sens, w większości zastosowań chcemy użyć modelu do punktacji niektórych niewidzianych danych. Na przykład, Wyobraź sobie, że jesteś Netflix i chcesz zaktualizować rekomendacje użytkowników pod koniec dnia na podstawie ich nowej listy obserwowanych. Możemy zaplanować zadanie, które uruchomi punktację wsadową o określonej porze pod koniec dnia. Teraz możemy przejść dalej i wygenerować rekomendacje dla każdego użytkownika. W tym celu znajdujemy 20 najlepszych pozycji na użytkownika from pyspark.sql.window import Window from pyspark.sql.functions import col, split, row_number, collect_list from pyspark.sql.functions import col, collect_list, expr, lit, min, row_number, desc windowSpec = Window.partitionBy("user_id").orderBy(col("prediction").desc()) df_top_20_items = df_scores.withColumn("rank", row_number().over(windowSpec)).filter(col("rank") <= 20) df_user_recs = df_top_20_items.groupBy("user_id") \ .agg(collect_list(col("item_id").cast("double")).alias("top_item_ids")) Oto jak wygląda wynik Na koniec możemy zapisać prognozę jako etykietę delta w naszym UC lub opublikować ją w systemach podrzędnych Mongo DB lub Azure Cosmos DB. Wybieramy pierwszą opcję df_user_recs.write.mode("overwrite").saveAsTable(f"{catalog_name}.{output_schema}.top20_item_recommendations") Strumieniowanie/wnioskowanie online Teraz wyobraź sobie przypadek, w którym chcemy zaktualizować nasze rekomendacje na podstawie interakcji użytkowników w czasie rzeczywistym. W tym przypadku możemy użyć obsługi modeli. Gdy ktoś chce użyć Twojego modelu, może wysłać dane do serwera. Następnie serwer przekazuje te dane do wdrożonego modelu, który wchodzi w działanie, analizuje dane i generuje prognozę. Mogą być używane w aplikacjach internetowych, aplikacjach mobilnych, a nawet systemach wbudowanych. Jednym z zastosowań tego podejścia jest umożliwienie kierowania ruchem na potrzeby testów A/B. Algorytm ALS nie może być używany bezpośrednio do wnioskowania online, ponieważ wymaga ponownego trenowania modelu przy użyciu całych danych (starych + nowych) w celu aktualizacji rekomendacji. Algorytmy uczenia się Gradient Descent są przykładami modeli, które można wykorzystać do aktualizacji online. Możemy przyjrzeć się niektórym z tych algorytmów w przyszłym poście. Jednak, aby zilustrować, jak taki model miałby działać, tworzymy (bezużyteczny) model obsługujący punkt końcowy, który przewiduje ocenę filmu na podstawie tego, kiedy użytkownik oceni film! import requests model_path = f"{catalog_name}.{model_schema}.{model_name}" champion_version = client.get_model_version_by_alias(model_path, "champion") # Set the name of the MLflow endpoint endpoint_name = config["model_serving_endpoint_name"] # Name of the registered MLflow model model_name = model_path # Specify the type of compute (CPU, GPU_SMALL, GPU_MEDIUM, etc.) workload_type = "CPU" # Specify the scale-out size of compute (Small, Medium, Large, etc.) workload_size = "Small" # Get the latest version of the MLflow model model_version = int(champion_version.version) # Specify Scale to Zero(only supported for CPU endpoints) scale_to_zero = False # Get the API endpoint and token for the current notebook context API_ROOT = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get() API_TOKEN = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get() data = { "name": endpoint_name, "config": { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ] }, } headers = {"Context-Type": "text/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.post( url=f"{API_ROOT}/api/2.0/serving-endpoints", json=data, headers=headers ) To utworzy i uruchomi klaster obsługujący model, więc zajmie to trochę czasu. Teraz, jeśli otworzysz okno , powinieneś zobaczyć swój punkt końcowy. Serving możemy użyć jednego punktu końcowego do obsługi wielu modeli. Następnie możemy użyć routingu ruchu dla scenariuszy, takich jak testowanie A/B lub porównanie wydajności różnych modeli w produkcji. Tabela wnioskowania Tabele wnioskowania w Databricks Model Serving działają jako automatyczny dziennik dla naszych wdrożonych modeli. Po włączeniu przechwytują przychodzące żądania (dane wysłane do prognozowania), odpowiadające im dane wyjściowe modelu (prognozy) i niektóre inne metadane jako tabelę Delta w Unity Catalog. Możemy użyć tabeli wnioskowania do , oraz procedury zbierania danych do lub naszych modeli. monitorowania i debugowania śledzenia pochodzenia ponownego trenowania dostrajania Możemy włączyć na naszym punkcie końcowym obsługi, aby monitorować model. Możemy to zrobić, określając właściwości w ładunku, gdy po raz pierwszy tworzymy punkt końcowy. Albo aktualizujemy nasz punkt końcowy później, używając polecenia i adresu URL punktu końcowego w następujący sposób (więcej inference table auto_capture_config put config ) tutaj data = { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ], "auto_capture_config":{ "catalog_name": catalog_name, "schema_name": model_schema, "table_name_prefix": payload_table, } } headers = {"Context-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.put(url=f"{API_ROOT}/api/2.0/serving-endpoints/{endpoint_name}/config", json=data, headers=headers) print(json.dumps(response.json(), indent=4)) teraz wprowadźmy do punktu końcowego pewne dane dotyczące interakcji użytkownika fikcyjnego import random import time all_items = df_full_data.select(col("item_id")).distinct() for user_id in range(50,54): items_not_rated_by_user = df_full_data.where(col("user_id")==user_id).select(col("item_id")).distinct()#collect()[0][0] no_rated_items = [item.item_id for item in all_items.subtract(items_not_rated_by_user).limit(4).collect()] data = { "dataframe_records": [ {"user_id":user_id, "item_id":no_rated_items[0], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[1], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, ] } response = requests.post( url=f"{API_ROOT}/serving-endpoints/{endpoint_name}/invocations", json=data, headers=headers ) # generate the data within the timespan of 1 to 8 minutes time.sleep(random.randint(60*1, 60*8)) Możemy sprawdzić logi punktów końcowych w tabeli . Zajmie to około 10 minut, zanim będzie można zobaczyć dane w tabeli. <catalog>.<schema>.<payload_table> table_path = f"{catalog_name}.{model_schema}.{payload_table}" # Read data from the inference table df_inf_table = spark.read.table(table_path) display(df_inf_table ) powinieneś zobaczyć coś takiego: twoja tabela ładunków Aby zrozumieć schemat tej tabeli wnioskowania, sprawdź „Schemat tabeli wnioskowania katalogu Unity==” .== tutaj Monitorowanie modelu Monitorowanie modeli i danych to złożony temat, którego opanowanie wymaga dużo czasu. Databricks Lakehouse Monitoring (DLM) zmniejsza narzut związany z budowaniem odpowiedniego systemu monitorowania, zapewniając standardowe i konfigurowalne szablony dla typowych przypadków użycia. Jednak opanowanie DLM i monitorowania modeli ogólnie wymaga wielu eksperymentów. Nie chcę tutaj przedstawiać obszernego przeglądu monitorowania modeli, ale raczej dać punkt wyjścia. Być może w przyszłości poświęcę temu tematowi osobny wpis na blogu. Krótkie podsumowanie funkcjonalności i cech DLM Teraz, gdy nasz model jest już uruchomiony, możemy użyć tabeli wnioskowania wygenerowanej przez nasz punkt końcowy obsługujący, aby monitorować kluczowe wskaźniki, takie jak wydajność i dryf modelu, aby wykrywać wszelkie odchylenia lub anomalie w naszych danych lub modelu w czasie. To proaktywne podejście pomaga nam podejmować terminowe działania naprawcze, takie jak ponowne szkolenie modelu lub aktualizacja jego funkcji, aby utrzymać optymalną wydajność i zgodność z celami biznesowymi. DLM zapewnia trzy typy analizy lub : , i . Ponieważ jesteśmy zainteresowani analizą naszej tabeli wnioskowania, skupiamy się na tej drugiej. Aby użyć tabeli do monitorowania - naszej „ ”, powinniśmy upewnić się, że tabela ma właściwą strukturę. W przypadku każdy wiersz powinien odpowiadać żądaniom z następującymi kolumnami: profile type szereg czasowy migawka wnioskowanie tabeli podstawowej tabeli wnioskowania cechy modelu prognozowanie modelu identyfikator modelu : znacznik czasu żądania wnioskowania znacznik czasu (opcjonalnie) prawda podstawowa jest ważny w przypadkach, gdy obsługujemy wiele modeli i chcemy śledzić wydajność każdego modelu w jednym pulpicie monitorującym. Jeśli dostępnych jest więcej niż jeden identyfikator modelu, DLM używa go do podziału danych i obliczania metryk i statystyk dla każdego wycinka osobno. Identyfikator modelu DLM oblicza każdą statystykę i metrykę dla określonego przedziału czasu. Do analizy wnioskowania używa kolumny oraz zdefiniowanego przez użytkownika rozmiaru okna, aby zidentyfikować okna czasowe. więcej poniżej. znacznika czasu DLM obsługuje dwa dla tabel wnioskowania: „ ” lub „ ”. Oblicza niektóre istotne metryki i statystyki na podstawie tej specyfikacji. problem type klasyfikacja regresja Aby użyć DLM, powinniśmy utworzyć monitor i dołączyć go do tabeli. Kiedy to zrobimy, DLM utworzy dwie : metric tables : ta tabela zawiera statystyki podsumowujące, takie jak min, maks, procent wartości null i zer. Zawiera również dodatkowe metryki oparte na typie problemu zdefiniowanym przez użytkownika. Na przykład , i dla modeli klasyfikacji oraz i dla modeli regresji. tabela metryk profilu precyzja odwołanie f1_score mean_squared_error mean_average_error : zawiera statystyki mierzące, jak rozkład danych zmienił się lub w stosunku do . Oblicza miary takie jak test chi-kwadrat, test KS. tabela metryki dryfu w czasie wartości bazowej (jeśli jest podana) aby zobaczyć listę kompletnych metryk dla każdej tabeli sprawdź stronę dokumentacji . Możliwe jest również tworzenie tabeli metryk Monitor . niestandardowych metryk Ważnym aspektem tworzenia systemu monitorowania jest upewnienie się, że nasz panel monitorowania ma dostęp do najnowszych danych wnioskowania w miarę ich pojawiania się. W tym celu możemy użyć , aby śledzić przetworzone wiersze w tabeli wnioskowania. Używamy tabeli wnioskowania obsługującej model jako naszej tabeli źródłowej ( ), a tabeli monitorowania jako tabeli odbiorczej ( ). Upewniamy się również, że funkcja (CDC) jest włączona w obu tabelach (jest domyślnie włączona w tabeli wnioskowania). W ten sposób przetwarzamy tylko zmiany — wstawianie/aktualizowanie/usuwanie — w tabeli źródłowej, zamiast ponownie przetwarzać całą tabelę przy każdym odświeżeniu. strumieniowania tabeli Delta readStream writeStream Change Data Capture Praktycznie Aby umożliwić monitorowanie naszej tabeli wnioskowania, należy wykonać następujące kroki: Odczytaj tabelę wnioskowania jako tabelę strumieniową Utwórz nową tabelę delta z odpowiednim schematem, rozpakowując tabelę wnioskowania wygenerowaną przez nasz punkt końcowy obsługujący model. Przygotuj tabelę bazową (jeśli istnieje) Utwórz monitor nad tabelą wyników i odśwież metrykę Zaplanuj przepływ pracy, aby rozpakować tabelę wnioskowań do odpowiedniej struktury i odświeżyć metryki Najpierw musimy zainstalować Lakehouse Monitoring API. Powinien być już zainstalowany, jeśli używasz Databricks rum time 15.3 LTS i nowszych: %pip install "https://ml-team-public-read.s3.amazonaws.com/wheels/data-monitoring/a4050ef7-b183-47a1-a145-e614628e3146/databricks_lakehouse_monitoring-0.4.14-py3-none-any.whl" dbutils.library.restartPython() Odczytujemy tabelę wnioskowania jako tabelę strumieniową requests_raw = spark.readStream\ .format("delta")\ .table(inference_table_name) requests_raw.isStreaming #-> True Następnie musimy umieścić tabelę w odpowiednim formacie, jak opisano powyżej. Ta tabela powinna mieć jeden wiersz dla każdej prognozy z odpowiednimi cechami i wartością prognozy. Tabela wnioskowania, którą otrzymujemy z punktu końcowego obsługującego model, przechowuje żądania i odpowiedzi punktu końcowego jako zagnieżdżony format JSON. Oto przykład ładunku JSON dla kolumny żądania i odpowiedzi. #requests {"dataframe_records": [ {"user_id": 1, "item_id": 346, "rating": 5}, {"user_id": 1, "item_id": 377, "rating": 2}, {"user_id": 1, "item_id": 302, "rating": 4} ] } #reponse {"predictions": [4.248899936676025, 1.1172138452529907, 4.279165744781494] } # --> what we need | user_id | item_id | rating | prediction | |---------|---------|--------|------------| | 1 | 346 | 5 | 4.248900 | | 1 | 377 | 2 | 1.117214 | | 1 | 302 | 4 | 4.279166 | Aby rozpakować tę tabelę do odpowiedniego schematu, możemy skorzystać z następującego kodu zaczerpniętego z dokumentacji Databricks ( ). Inference table Lakehouse Monitoring starter notebook # define the schema of the request and reponse fields in the inference tabel REQUEST_FIELDS = [StructField('user_id', IntegerType(), False),\ StructField('item_id', IntegerType(), False),\ StructField('rating', IntegerType(), False)\ ] RESPONSE_FIELD = [T.StructField("predictions", FloatType())] def process_col_requests(json_str:str) -> str: """ to proccess the JSON payload of request column in inference table """ request = json.loads(json_str) dataframe_records = request.get("dataframe_records", []) return dataframe_records def procces_col_response(json_str: str) -> str: """ to proccess the JSON payload of reponse column in inference table """ reponse = json.loads(json_str) output = [{prediction_col: round(prediction,4)} for prediction in reponse["predictions"]] return output def get_model_id(endpoint_name: str) -> str: """ create the model id by concatinating the model name and the model version. note: the assumption is the endpoint serves only one model """ served_models = get_served_models(endpoint_name) model_id = f"{served_models[0]['model_name']}_{served_models[0]['model_version']}" return model_id def process_requests(requests_raw: DataFrame, request_fields: List[T.StructField], response_field: T.StructField, endpoint_name: str) -> DataFrame: """ Takes a stream of raw requests and processes them by: - Unpacking JSON payloads for requests and responses - Exploding batched requests into individual rows - Converting Unix epoch millisecond timestamps to be Spark TimestampType :param requests_raw: DataFrame containing raw requests. Assumed to contain the following columns: - `request` - `response` - `timestamp_ms` :param request_fields: List of StructFields representing the request schema :param response_field: A StructField representing the response schema :return: A DataFrame containing processed requests """ # Convert the timestamp milliseconds to TimestampType for downstream processing. requests_timestamped = requests_raw \ .withColumn(timestamp_col, (F.col("timestamp_ms") / 1000).cast(T.TimestampType())) \ .drop("timestamp_ms") # create the model identifier column model_id = get_model_id(endpoint_name) # Convert the model name and version columns into a model identifier column. requests_identified = requests_timestamped \ .withColumn(model_id_col, F.lit(model_id)) # Rename the date column to avoid collisions with features. requests_dated = requests_identified.withColumnRenamed("date", date_col) # Consolidate and unpack JSON. request_schema = T.ArrayType(T.StructType(request_fields)) response_schema = T.ArrayType(T.StructType(response_field)) # w udf_request = F.udf(process_col_requests, request_schema) udf_reponse = F.udf(procces_col_response, response_schema) requests_unpacked = requests_dated.withColumn("request", udf_request("request")).\ withColumn("response", udf_reponse("response")) # Explode batched requests into individual rows. DB_PREFIX = "__db" requests_exploded = requests_unpacked \ .withColumn(f"{DB_PREFIX}_request_response", F.arrays_zip(F.col("request"), F.col("response"))) \ .withColumn(f"{DB_PREFIX}_request_response", F.explode(F.col(f"{DB_PREFIX}_request_response"))) \ .select(F.col("*"), F.col(f"{DB_PREFIX}_request_response.request.*"), F.col(f"{DB_PREFIX}_request_response.response.*")) \ .drop(f"{DB_PREFIX}_request_response", "request", "response") requests_cleaned = requests_exploded.drop("status_code", "sampling_fraction", "client_request_id", "databricks_request_id", "request_metadata") return requests_cleaned Wynikowa tabela wyglądałaby następująco: Następnie powinniśmy zainicjować naszą tabelę odbiorczą dt_builder = DeltaTable.createIfNotExists(spark) \ .tableName(unpacked_requests_table_name) \ .addColumns(schema) \ .partitionedBy(requests_cleaned.schema) \ .property("delta.enableChangeDataFeed", "true") \ dt_builder.execute() i napisz wyniki checkpoint_path = f"dbfs:/payload-logging/{endpoint_name}/checkpoint" requests_stream = requests_cleaned.writeStream \ .trigger(once=True) \ .format("delta") \ .partitionBy(date_col) \ .outputMode("append") \ .option("checkpointLocation", checkpoint_path) \ .toTable(unpacked_requests_table_name) \ Na koniec tworzymy naszą tabelę bazową. DLM używa tej tabeli do obliczania dryftów poprzez porównanie rozkładu podobnych kolumn modeli bazowych i podstawowych. Tabela bazowa powinna mieć tę samą kolumnę cech co kolumna podstawowa, a także tę samą kolumnę identyfikacji modelu. W przypadku tabeli bazowej używamy tabeli predykcji naszego , który zapisaliśmy wcześniej po przeszkoleniu naszego modelu przy użyciu najlepszego hiperparametru. Aby obliczyć metrykę dryftu, Databricks oblicza metryki profilu zarówno dla tabeli podstawowej, jak i tabeli bazowej. Tutaj możesz przeczytać o zestawu danych walidacyjnych . tabeli podstawowej i tabeli bazowej #read the prediction table df_base_table = spark.table(f"{catalog_name}.{model_schema}.predictions") # create the model id and add it to the table model_id = get_model_id(endpoint_name) df_base_table = df_base_table.withColumn(model_id_col, F.lit(model_id)) #write the new table and enable the CDC on it output_base_table_name = f"{catalog_name}.{model_schema}.{base_table_prefix}_{model_name}" df_base_table.write.format("delta").mode("overwrite").saveAsTable(output_base_table_name) spark.sql(f"ALTER TABLE {output_base_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)") Teraz możemy utworzyć nasz panel monitorujący. Możemy to zrobić za pomocą lub API Lakehouse Monitoring. Tutaj używamy drugiej opcji: interfejsu użytkownika # This is where we store the metric tables. output_schema_name = f"{catalog_name}.{model_schema}" try: info = lm.create_monitor( table_name=unpacked_requests_table_name, profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities,#the aggregation window model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, # We will refresh the metrics on-demand in this notebook baseline_table_name=output_base_table_name, ) print(info) except Exception as e: # Ensure the exception was expected assert "RESOURCE_ALREADY_EXISTS" in str(e), f"Unexpected error: {e}" # Update the monitor if any parameters of this notebook have changed. lm.update_monitor( table_name=unpacked_requests_table_name, updated_params=dict( profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities, model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, baseline_table_name=output_base_table_name, ) ) # Refresh metrics calculated on the requests table. refresh_info = lm.run_refresh(table_name=unpacked_requests_table_name) print(refresh_info) po uruchomieniu kodu potrzeba trochę czasu, aby Databricks obliczył wszystkie metryki. Aby zobaczyć pulpit, przejdź do zakładki tabeli sink (tj. ). Powinieneś zobaczyć następującą stronę. Quality unpacked_requests_table_name Klikając na „Wyświetl , zobaczysz trwające, oczekujące i przeszłe odświeżenia. Kliknij „ , aby otworzyć pulpit nawigacyjny. refresh history View Dashboard więc zaczynamy od tabeli wnioskowania ( ), przetwarzamy ją i zapisujemy wynik w i przekazujemy tę tabelę wraz z naszą tabelą bazową ( ) do naszego API monitorującego. DLM oblicza metryki profilu dla każdej tabeli ( ) i używa ich do obliczenia metryk dryfu ( ) my_endpoint_payload my_endpoint_payload_unpacked base_table_als my_endpoint_payload_unpacked_profile_metric my_endpoint_payload_unpacked_drift_metrics Oto wszystko! Masz wszystko, czego potrzebujesz do obsługi i monitorowania swojego modelu! W następnej części pokażę Ci, jak zautomatyzować ten proces, używając i ! Databricks Assets Bundle Gitlab W wykonaliśmy pierwsze kroki w celu zbudowania kompleksowego potoku MLOps przy użyciu Databricks i Spark, kierując się architekturą referencyjną Databricks. Oto podsumowanie kluczowych kroków, które omówiliśmy: pierwszej części tej serii samouczków : Zorganizowaliśmy nasze dane w warstwach brązowej, srebrnej i złotej w katalogu Unity, tworząc ustrukturyzowany i wydajny system zarządzania danymi. Konfigurowanie katalogu Unity dla architektury Medallion : Pokazaliśmy, jak importować surowe dane do systemu, zapewniając spójność i jakość kolejnych etapów przetwarzania. Wprowadzanie danych do katalogu Unity : Wykorzystując Databricks, wyszkoliliśmy model uczenia maszynowego dostosowany do naszego zestawu danych, postępując zgodnie z najlepszymi praktykami skalowalnego i efektywnego tworzenia modeli. Szkolenie modelu : Aby zwiększyć wydajność modelu, zastosowaliśmy HyperOpt w celu zautomatyzowania wyszukiwania optymalnych hiperparametrów, co zwiększyło dokładność i wydajność. Strojenie hiperparametrów za pomocą HyperOpt : Wykorzystaliśmy MLflow do rejestrowania i monitorowania naszych eksperymentów, prowadząc kompleksowy rejestr wersji modeli, metryk i parametrów w celu łatwego porównywania i powtarzalności. Śledzenie eksperymentów za pomocą Databricks MLflow Po wykonaniu tych podstawowych kroków Twój model jest teraz gotowy do wdrożenia. W tej drugiej części skupimy się na zintegrowaniu dwóch krytycznych komponentów z naszym systemem: : Implementacja przetwarzania wsadowego w celu generowania prognoz na podstawie dużych zbiorów danych, odpowiednia do zastosowań takich jak zbiorcze ocenianie i okresowe raportowanie. Wnioskowanie wsadowe : konfigurowanie obsługi modeli w czasie rzeczywistym w celu zapewnienia natychmiastowych prognoz, co jest niezbędne w przypadku aplikacji i usług interaktywnych. Wnioskowanie online (obsługa modeli) zapewnia, że wdrożone modele utrzymują optymalną wydajność i niezawodność na przestrzeni czasu. Monitorowanie modelu: No to do dzieła! Wdrażanie modelu Punktem wyjścia ostatniego bloga była ocena modelu. Teraz wyobraźmy sobie, że przeprowadziliśmy porównanie i odkryliśmy, że nasz model wykazuje wyższą wydajność w porównaniu z tym modelem produkcyjnym. Ponieważ chcemy (zakładamy) używać modelu w produkcji, chcemy wykorzystać wszystkie dane, które mamy. Następnym krokiem jest trenowanie i testowanie modelu przy użyciu pełnego zestawu danych. Następnie utrwalamy nasz model do późniejszego użycia, wdrażając go jako nasz model mistrzowski. Ponieważ jest to ostateczny model, którego chcemy użyć do wnioskowania, używamy klienta Feature Engineering do trenowania modelu. W ten sposób nie tylko łatwiej śledzimy pochodzenie modelu, ale także przekazujemy klientowi walidację schematu i transformację funkcji (jeśli taka istnieje). with mlflow.start_run(run_name="ALS_best_model") as run: als = ALS() # Now we set the parameters for the method als.setMaxIter(MAX_ITER)\ .setSeed(SEED)\ .setRegParam(best_params["REG_PARAM"])\ .setUserCol(COL_USER)\ .setItemCol(COL_ITEM)\ .setRatingCol(COL_LABEL)\ .setRank(best_params["RANK"]) mlflow.log_param("MAX_ITER", MAX_ITER) mlflow.log_param("RANK", best_params["RANK"]) mlflow.log_param("REG_PARAM", best_params["REG_PARAM"]) # Create the model with these parameters. model = als.fit(df_full_data) #drop predictions where users and products from the test test and didn't make it into the training set. in this case, the prediction is NaN model.setColdStartStrategy('drop') predictions = model.transform(df_full_data) signature = infer_signature(model_input = df_full_data, model_output = predictions.select(COL_LABEL)) #log the model mlflow.spark.log_model(model, model_name, sample_input=df_full_data.limit(3), signature = signature, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name=f"{catalog_name}.{model_schema}.{model_name}") evaluator = RegressionEvaluator(predictionCol=COL_PRED, labelCol=COL_LABEL) rmse = evaluator.setMetricName("rmse").evaluate(predictions) mlflow.log_metric('rmse', rmse) możemy również użyć do trenowania i rejestrowania modeli interfejsów API Feature Store lub Feature Engineering model_info = fe.log_model(model=model, artifact_path = model_name, flavor=mlflow.spark, training_set=fe_full_data, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name= f"{catalog_name}.{model_schema}.{model_name}" ) gdy używamy interfejsu API inżynierii funkcji, możemy przeglądać pochodzenie modelu w Eksploratorze katalogów Teraz zaktualizujmy opis modelu i przypisajmy mu etykietę Champion. import time from mlflow.tracking.client import MlflowClient from mlflow.entities.model_registry.model_version_status import ModelVersionStatus client = MlflowClient() #find the latest model version model_name_path = f"{catalog_name}.{model_schema}.{model_name}" model_version_infos = client.search_model_versions(f"name ='{model_name_path}'") new_model_version = max([int(model_version_info.version) for model_version_info in model_version_infos]) #add the model and model version descirption client.update_registered_model( name=model_name_path, description="collaborative filtering using Spark mllib ALS. This model use rating table" ) client.update_model_version( name=model_name_path, version=new_model_version, description="this model is optimized Rank and REG_PARAM with Hyperopt and rmse as a loss function. trained on the full dataset" ) # assign alias client.set_registered_model_alias(model_name_path, "Champion", new_model_version) Teraz sprawdź schemat, w którym zarejestrowałeś model. Powinny zostać wyświetlone wszystkie aktualizacje w następujący sposób : Jeśli używasz obszaru roboczego do rejestru modeli, powinieneś zarządzać swoimi modelami za pomocą etapów. Korzystanie z aliasów nie zadziała. Sprawdź żeby zobaczyć jak to działa Etapy modelu tutaj Wnioskowanie z modelu Ocena partii Teraz wyobraźmy sobie, że chcemy użyć naszego modelu w produkcji do wnioskowania. W tym kroku ładujemy model champion i używamy go do wygenerowania 20 rekomendacji filmowych dla każdego użytkownika. from mlflow.spark import load_model as spark_load_model from mlflow.tracking.client import MlflowClient from create_training_set import split_data #-- set UC as model registray mlflow.set_registry_uri("databricks-uc") #-- initate mlflow client client = MlflowClient() # -- read the config file with open('config.json') as config_file: config = json.load(config_file) catalog_name = config["catalog_name"] gold_layer = config["gold_layer_name"] silver_layer = config["silver_layer_name"] user_item_table_name = config["user_item_table_name"] ft_user_item_name = config["ft_user_item_name"] model_name = config["model_name"] model_schema = config["model_schema"] #-- create the model uri model_path = f"{catalog_name}.{model_schema}.{model_name}" # --create the model_uri: there are two ways to do this # 1: using the alias (we use this*) model_version_uri = f"models:/{model_uri}@champion" # 2: using model version #champion_version = client.get_model_version_by_alias(model_uri, "champion") #model_version_uri = f"models:/{model_uri}/{champion_version.version}" # -- load the model pipline and exctract the model model_pipeline = spark_load_model(model_version_uri) model = model_pipeline.stages[0] # -- batch scoring using the the model fe_full_data, df_full_data, df_train, df_test = split_data() df_batch_input = df_full_data.drop("rating") df_scores = model.transform(df_batch_input) # --- in case you used Feature Engineering to train and register model #from databricks.feature_engineering import FeatureEngineeringClient #fe = FeatureEngineeringClient() # fe.score_batch(model_uri=f"{model_version_uri}",df = df_batch_input) i widać, że użyliśmy tych samych danych treningowych do punktacji wsadowej. Chociaż w przypadku systemów rekomendacji ma to sens, w większości zastosowań chcemy użyć modelu do punktacji niektórych niewidzianych danych. Na przykład, Wyobraź sobie, że jesteś Netflix i chcesz zaktualizować rekomendacje użytkowników pod koniec dnia na podstawie ich nowej listy obserwowanych. Możemy zaplanować zadanie, które uruchomi punktację wsadową o określonej porze pod koniec dnia. Teraz możemy przejść dalej i wygenerować rekomendacje dla każdego użytkownika. W tym celu znajdujemy 20 najlepszych pozycji na użytkownika from pyspark.sql.window import Window from pyspark.sql.functions import col, split, row_number, collect_list from pyspark.sql.functions import col, collect_list, expr, lit, min, row_number, desc windowSpec = Window.partitionBy("user_id").orderBy(col("prediction").desc()) df_top_20_items = df_scores.withColumn("rank", row_number().over(windowSpec)).filter(col("rank") <= 20) df_user_recs = df_top_20_items.groupBy("user_id") \ .agg(collect_list(col("item_id").cast("double")).alias("top_item_ids")) Oto jak wygląda wynik Na koniec możemy zapisać prognozę jako etykietę delta w naszym UC lub opublikować ją w systemach podrzędnych Mongo DB lub Azure Cosmos DB. Wybieramy pierwszą opcję df_user_recs.write.mode("overwrite").saveAsTable(f"{catalog_name}.{output_schema}.top20_item_recommendations") Strumieniowanie/wnioskowanie online Teraz wyobraź sobie przypadek, w którym chcemy zaktualizować nasze rekomendacje na podstawie interakcji użytkowników w czasie rzeczywistym. W tym przypadku możemy użyć obsługi modeli. Gdy ktoś chce użyć Twojego modelu, może wysłać dane do serwera. Następnie serwer przekazuje te dane do wdrożonego modelu, który wchodzi w działanie, analizuje dane i generuje prognozę. Mogą być używane w aplikacjach internetowych, aplikacjach mobilnych, a nawet systemach wbudowanych. Jednym z zastosowań tego podejścia jest umożliwienie kierowania ruchem na potrzeby testów A/B. Algorytm ALS nie może być używany bezpośrednio do wnioskowania online, ponieważ wymaga ponownego trenowania modelu przy użyciu całych danych (starych + nowych) w celu aktualizacji rekomendacji. Algorytmy uczenia się Gradient Descent są przykładami modeli, które można wykorzystać do aktualizacji online. Możemy przyjrzeć się niektórym z tych algorytmów w przyszłym poście. Jednak, aby zilustrować, jak taki model miałby działać, tworzymy (bezużyteczny) model obsługujący punkt końcowy, który przewiduje ocenę filmu na podstawie tego, kiedy użytkownik oceni film! import requests model_path = f"{catalog_name}.{model_schema}.{model_name}" champion_version = client.get_model_version_by_alias(model_path, "champion") # Set the name of the MLflow endpoint endpoint_name = config["model_serving_endpoint_name"] # Name of the registered MLflow model model_name = model_path # Specify the type of compute (CPU, GPU_SMALL, GPU_MEDIUM, etc.) workload_type = "CPU" # Specify the scale-out size of compute (Small, Medium, Large, etc.) workload_size = "Small" # Get the latest version of the MLflow model model_version = int(champion_version.version) # Specify Scale to Zero(only supported for CPU endpoints) scale_to_zero = False # Get the API endpoint and token for the current notebook context API_ROOT = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get() API_TOKEN = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get() data = { "name": endpoint_name, "config": { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ] }, } headers = {"Context-Type": "text/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.post( url=f"{API_ROOT}/api/2.0/serving-endpoints", json=data, headers=headers ) To utworzy i uruchomi klaster obsługujący model, więc zajmie to trochę czasu. Teraz, jeśli otworzysz okno , powinieneś zobaczyć swój punkt końcowy. Serving możemy użyć jednego punktu końcowego do obsługi wielu modeli. Następnie możemy użyć routingu ruchu dla scenariuszy, takich jak testowanie A/B lub porównanie wydajności różnych modeli w produkcji. Tabela wnioskowania Tabele wnioskowania w Databricks Model Serving działają jako automatyczny dziennik dla naszych wdrożonych modeli. Po włączeniu przechwytują przychodzące żądania (dane wysłane do prognozowania), odpowiadające im dane wyjściowe modelu (prognozy) i niektóre inne metadane jako tabelę Delta w Unity Catalog. Możemy użyć tabeli wnioskowania do , oraz procedury zbierania danych do lub naszych modeli. monitorowania i debugowania śledzenia pochodzenia ponownego trenowania dostrajania Możemy włączyć na naszym punkcie końcowym obsługi, aby monitorować model. Możemy to zrobić, określając właściwości w ładunku, gdy po raz pierwszy tworzymy punkt końcowy. Albo aktualizujemy nasz punkt końcowy później, używając polecenia i adresu URL punktu końcowego w następujący sposób (więcej inference table auto_capture_config put config ) tutaj data = { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ], "auto_capture_config":{ "catalog_name": catalog_name, "schema_name": model_schema, "table_name_prefix": payload_table, } } headers = {"Context-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.put(url=f"{API_ROOT}/api/2.0/serving-endpoints/{endpoint_name}/config", json=data, headers=headers) print(json.dumps(response.json(), indent=4)) teraz wprowadźmy do punktu końcowego pewne dane dotyczące interakcji użytkownika fikcyjnego import random import time all_items = df_full_data.select(col("item_id")).distinct() for user_id in range(50,54): items_not_rated_by_user = df_full_data.where(col("user_id")==user_id).select(col("item_id")).distinct()#collect()[0][0] no_rated_items = [item.item_id for item in all_items.subtract(items_not_rated_by_user).limit(4).collect()] data = { "dataframe_records": [ {"user_id":user_id, "item_id":no_rated_items[0], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[1], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, ] } response = requests.post( url=f"{API_ROOT}/serving-endpoints/{endpoint_name}/invocations", json=data, headers=headers ) # generate the data within the timespan of 1 to 8 minutes time.sleep(random.randint(60*1, 60*8)) Możemy sprawdzić logi punktów końcowych w tabeli . Zajmie to około 10 minut, zanim będzie można zobaczyć dane w tabeli. <catalog>.<schema>.<payload_table> table_path = f"{catalog_name}.{model_schema}.{payload_table}" # Read data from the inference table df_inf_table = spark.read.table(table_path) display(df_inf_table ) powinieneś zobaczyć coś takiego: twoja tabela ładunków Aby zrozumieć schemat tej tabeli wnioskowania, sprawdź „Schemat tabeli wnioskowania katalogu Unity==” .== tutaj Monitorowanie modelu Monitorowanie modeli i danych to złożony temat, którego opanowanie wymaga dużo czasu. Databricks Lakehouse Monitoring (DLM) zmniejsza narzut związany z budowaniem odpowiedniego systemu monitorowania, zapewniając standardowe i konfigurowalne szablony dla typowych przypadków użycia. Jednak opanowanie DLM i monitorowania modeli ogólnie wymaga wielu eksperymentów. Nie chcę tutaj przedstawiać obszernego przeglądu monitorowania modeli, ale raczej dać punkt wyjścia. Być może w przyszłości poświęcę temu tematowi osobny wpis na blogu. Krótkie podsumowanie funkcjonalności i cech DLM Teraz, gdy nasz model jest już uruchomiony, możemy użyć tabeli wnioskowania wygenerowanej przez nasz punkt końcowy obsługujący, aby monitorować kluczowe wskaźniki, takie jak wydajność i dryf modelu, aby wykrywać wszelkie odchylenia lub anomalie w naszych danych lub modelu w czasie. To proaktywne podejście pomaga nam podejmować terminowe działania naprawcze, takie jak ponowne szkolenie modelu lub aktualizacja jego funkcji, aby utrzymać optymalną wydajność i zgodność z celami biznesowymi. DLM zapewnia trzy typy analizy lub : , i . Ponieważ jesteśmy zainteresowani analizą naszej tabeli wnioskowania, skupiamy się na tej drugiej. Aby użyć tabeli do monitorowania - naszej „ ”, powinniśmy upewnić się, że tabela ma właściwą strukturę. W przypadku każdy wiersz powinien odpowiadać żądaniom z następującymi kolumnami: profile type szereg czasowy migawka wnioskowanie tabeli podstawowej tabeli wnioskowania cechy modelu prognozowanie modelu identyfikator modelu : znacznik czasu żądania wnioskowania znacznik czasu (opcjonalnie) prawda podstawowa jest ważny w przypadkach, gdy obsługujemy wiele modeli i chcemy śledzić wydajność każdego modelu w jednym pulpicie monitorującym. Jeśli dostępnych jest więcej niż jeden identyfikator modelu, DLM używa go do podziału danych i obliczania metryk i statystyk dla każdego wycinka osobno. Identyfikator modelu DLM oblicza każdą statystykę i metrykę dla określonego przedziału czasu. Do analizy wnioskowania używa kolumny oraz zdefiniowanego przez użytkownika rozmiaru okna, aby zidentyfikować okna czasowe. więcej poniżej. znacznika czasu DLM obsługuje dwa dla tabel wnioskowania: „ ” lub „ ”. Oblicza niektóre istotne metryki i statystyki na podstawie tej specyfikacji. problem type klasyfikacja regresja Aby użyć DLM, powinniśmy utworzyć monitor i dołączyć go do tabeli. Kiedy to zrobimy, DLM utworzy dwie : metric tables : ta tabela zawiera statystyki podsumowujące, takie jak min, maks, procent wartości null i zer. Zawiera również dodatkowe metryki oparte na typie problemu zdefiniowanym przez użytkownika. Na przykład , i dla modeli klasyfikacji oraz i dla modeli regresji. tabela metryk profilu precyzja odwołanie f1_score mean_squared_error mean_average_error : zawiera statystyki mierzące, jak rozkład danych zmienił się lub w stosunku do . Oblicza miary takie jak test chi-kwadrat, test KS. tabela metryki dryfu w czasie wartości bazowej (jeśli jest podana) aby zobaczyć listę kompletnych metryk dla każdej tabeli sprawdź stronę dokumentacji . Możliwe jest również tworzenie tabeli metryk Monitor . niestandardowych metryk Ważnym aspektem tworzenia systemu monitorowania jest upewnienie się, że nasz panel monitorowania ma dostęp do najnowszych danych wnioskowania w miarę ich pojawiania się. W tym celu możemy użyć , aby śledzić przetworzone wiersze w tabeli wnioskowania. Używamy tabeli wnioskowania obsługującej model jako naszej tabeli źródłowej ( ), a tabeli monitorowania jako tabeli odbiorczej ( ). Upewniamy się również, że funkcja (CDC) jest włączona w obu tabelach (jest domyślnie włączona w tabeli wnioskowania). W ten sposób przetwarzamy tylko zmiany — wstawianie/aktualizowanie/usuwanie — w tabeli źródłowej, zamiast ponownie przetwarzać całą tabelę przy każdym odświeżeniu. strumieniowania tabeli Delta readStream writeStream Change Data Capture Praktycznie Aby umożliwić monitorowanie naszej tabeli wnioskowania, należy wykonać następujące kroki: Odczytaj tabelę wnioskowania jako tabelę strumieniową Utwórz nową tabelę delta z odpowiednim schematem, rozpakowując tabelę wnioskowania wygenerowaną przez nasz punkt końcowy obsługujący model. Przygotuj tabelę bazową (jeśli istnieje) Utwórz monitor nad tabelą wyników i odśwież metrykę Zaplanuj przepływ pracy, aby rozpakować tabelę wnioskowań do odpowiedniej struktury i odświeżyć metryki Najpierw musimy zainstalować Lakehouse Monitoring API. Powinien być już zainstalowany, jeśli używasz Databricks rum time 15.3 LTS i nowszych: %pip install "https://ml-team-public-read.s3.amazonaws.com/wheels/data-monitoring/a4050ef7-b183-47a1-a145-e614628e3146/databricks_lakehouse_monitoring-0.4.14-py3-none-any.whl" dbutils.library.restartPython() Odczytujemy tabelę wnioskowania jako tabelę strumieniową requests_raw = spark.readStream\ .format("delta")\ .table(inference_table_name) requests_raw.isStreaming #-> True Następnie musimy umieścić tabelę w odpowiednim formacie, jak opisano powyżej. Ta tabela powinna mieć jeden wiersz dla każdej prognozy z odpowiednimi cechami i wartością prognozy. Tabela wnioskowania, którą otrzymujemy z punktu końcowego obsługującego model, przechowuje żądania i odpowiedzi punktu końcowego jako zagnieżdżony format JSON. Oto przykład ładunku JSON dla kolumny żądania i odpowiedzi. #requests {"dataframe_records": [ {"user_id": 1, "item_id": 346, "rating": 5}, {"user_id": 1, "item_id": 377, "rating": 2}, {"user_id": 1, "item_id": 302, "rating": 4} ] } #reponse {"predictions": [4.248899936676025, 1.1172138452529907, 4.279165744781494] } # --> what we need | user_id | item_id | rating | prediction | |---------|---------|--------|------------| | 1 | 346 | 5 | 4.248900 | | 1 | 377 | 2 | 1.117214 | | 1 | 302 | 4 | 4.279166 | Aby rozpakować tę tabelę do odpowiedniego schematu, możemy skorzystać z następującego kodu zaczerpniętego z dokumentacji Databricks ( ). Inference table Lakehouse Monitoring starter notebook # define the schema of the request and reponse fields in the inference tabel REQUEST_FIELDS = [StructField('user_id', IntegerType(), False),\ StructField('item_id', IntegerType(), False),\ StructField('rating', IntegerType(), False)\ ] RESPONSE_FIELD = [T.StructField("predictions", FloatType())] def process_col_requests(json_str:str) -> str: """ to proccess the JSON payload of request column in inference table """ request = json.loads(json_str) dataframe_records = request.get("dataframe_records", []) return dataframe_records def procces_col_response(json_str: str) -> str: """ to proccess the JSON payload of reponse column in inference table """ reponse = json.loads(json_str) output = [{prediction_col: round(prediction,4)} for prediction in reponse["predictions"]] return output def get_model_id(endpoint_name: str) -> str: """ create the model id by concatinating the model name and the model version. note: the assumption is the endpoint serves only one model """ served_models = get_served_models(endpoint_name) model_id = f"{served_models[0]['model_name']}_{served_models[0]['model_version']}" return model_id def process_requests(requests_raw: DataFrame, request_fields: List[T.StructField], response_field: T.StructField, endpoint_name: str) -> DataFrame: """ Takes a stream of raw requests and processes them by: - Unpacking JSON payloads for requests and responses - Exploding batched requests into individual rows - Converting Unix epoch millisecond timestamps to be Spark TimestampType :param requests_raw: DataFrame containing raw requests. Assumed to contain the following columns: - `request` - `response` - `timestamp_ms` :param request_fields: List of StructFields representing the request schema :param response_field: A StructField representing the response schema :return: A DataFrame containing processed requests """ # Convert the timestamp milliseconds to TimestampType for downstream processing. requests_timestamped = requests_raw \ .withColumn(timestamp_col, (F.col("timestamp_ms") / 1000).cast(T.TimestampType())) \ .drop("timestamp_ms") # create the model identifier column model_id = get_model_id(endpoint_name) # Convert the model name and version columns into a model identifier column. requests_identified = requests_timestamped \ .withColumn(model_id_col, F.lit(model_id)) # Rename the date column to avoid collisions with features. requests_dated = requests_identified.withColumnRenamed("date", date_col) # Consolidate and unpack JSON. request_schema = T.ArrayType(T.StructType(request_fields)) response_schema = T.ArrayType(T.StructType(response_field)) # w udf_request = F.udf(process_col_requests, request_schema) udf_reponse = F.udf(procces_col_response, response_schema) requests_unpacked = requests_dated.withColumn("request", udf_request("request")).\ withColumn("response", udf_reponse("response")) # Explode batched requests into individual rows. DB_PREFIX = "__db" requests_exploded = requests_unpacked \ .withColumn(f"{DB_PREFIX}_request_response", F.arrays_zip(F.col("request"), F.col("response"))) \ .withColumn(f"{DB_PREFIX}_request_response", F.explode(F.col(f"{DB_PREFIX}_request_response"))) \ .select(F.col("*"), F.col(f"{DB_PREFIX}_request_response.request.*"), F.col(f"{DB_PREFIX}_request_response.response.*")) \ .drop(f"{DB_PREFIX}_request_response", "request", "response") requests_cleaned = requests_exploded.drop("status_code", "sampling_fraction", "client_request_id", "databricks_request_id", "request_metadata") return requests_cleaned Wynikowa tabela wyglądałaby następująco: Następnie powinniśmy zainicjować naszą tabelę odbiorczą dt_builder = DeltaTable.createIfNotExists(spark) \ .tableName(unpacked_requests_table_name) \ .addColumns(schema) \ .partitionedBy(requests_cleaned.schema) \ .property("delta.enableChangeDataFeed", "true") \ dt_builder.execute() i napisz wyniki checkpoint_path = f"dbfs:/payload-logging/{endpoint_name}/checkpoint" requests_stream = requests_cleaned.writeStream \ .trigger(once=True) \ .format("delta") \ .partitionBy(date_col) \ .outputMode("append") \ .option("checkpointLocation", checkpoint_path) \ .toTable(unpacked_requests_table_name) \ Na koniec tworzymy naszą tabelę bazową. DLM używa tej tabeli do obliczania dryftów poprzez porównanie rozkładu podobnych kolumn modeli bazowych i podstawowych. Tabela bazowa powinna mieć tę samą kolumnę cech co kolumna podstawowa, a także tę samą kolumnę identyfikacji modelu. W przypadku tabeli bazowej używamy tabeli predykcji naszego , który zapisaliśmy wcześniej po przeszkoleniu naszego modelu przy użyciu najlepszego hiperparametru. Aby obliczyć metrykę dryftu, Databricks oblicza metryki profilu zarówno dla tabeli podstawowej, jak i tabeli bazowej. Tutaj możesz przeczytać o zestawu danych walidacyjnych . tabeli podstawowej i tabeli bazowej #read the prediction table df_base_table = spark.table(f"{catalog_name}.{model_schema}.predictions") # create the model id and add it to the table model_id = get_model_id(endpoint_name) df_base_table = df_base_table.withColumn(model_id_col, F.lit(model_id)) #write the new table and enable the CDC on it output_base_table_name = f"{catalog_name}.{model_schema}.{base_table_prefix}_{model_name}" df_base_table.write.format("delta").mode("overwrite").saveAsTable(output_base_table_name) spark.sql(f"ALTER TABLE {output_base_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)") Teraz możemy utworzyć nasz panel monitorujący. Możemy to zrobić za pomocą lub API Lakehouse Monitoring. Tutaj używamy drugiej opcji: interfejsu użytkownika # This is where we store the metric tables. output_schema_name = f"{catalog_name}.{model_schema}" try: info = lm.create_monitor( table_name=unpacked_requests_table_name, profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities,#the aggregation window model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, # We will refresh the metrics on-demand in this notebook baseline_table_name=output_base_table_name, ) print(info) except Exception as e: # Ensure the exception was expected assert "RESOURCE_ALREADY_EXISTS" in str(e), f"Unexpected error: {e}" # Update the monitor if any parameters of this notebook have changed. lm.update_monitor( table_name=unpacked_requests_table_name, updated_params=dict( profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities, model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, baseline_table_name=output_base_table_name, ) ) # Refresh metrics calculated on the requests table. refresh_info = lm.run_refresh(table_name=unpacked_requests_table_name) print(refresh_info) po uruchomieniu kodu potrzeba trochę czasu, aby Databricks obliczył wszystkie metryki. Aby zobaczyć pulpit, przejdź do zakładki tabeli sink (tj. ). Powinieneś zobaczyć następującą stronę. Quality unpacked_requests_table_name Klikając na „Wyświetl , zobaczysz trwające, oczekujące i przeszłe odświeżenia. Kliknij „ , aby otworzyć pulpit nawigacyjny. refresh history View Dashboard więc zaczynamy od tabeli wnioskowania ( ), przetwarzamy ją i zapisujemy wynik w i przekazujemy tę tabelę wraz z naszą tabelą bazową ( ) do naszego API monitorującego. DLM oblicza metryki profilu dla każdej tabeli ( ) i używa ich do obliczenia metryk dryfu ( ) my_endpoint_payload my_endpoint_payload_unpacked base_table_als my_endpoint_payload_unpacked_profile_metric my_endpoint_payload_unpacked_drift_metrics Oto wszystko! Masz wszystko, czego potrzebujesz do obsługi i monitorowania swojego modelu! W następnej części pokażę Ci, jak zautomatyzować ten proces, używając i ! Databricks Assets Bundle Gitlab