В направихме първите стъпки за изграждане на MLOps тръбопровод от край до край с помощта на Databricks и Spark, ръководени от референтната архитектура на Databricks. Ето обобщение на основните стъпки, които разгледахме: първата част от тази поредица от уроци : Ние организирахме нашите данни в бронзови, сребърни и златни слоеве в рамките на каталога Unity, създавайки структурирана и ефективна система за управление на данни. Настройване на каталога Unity за Medallion Architecture : Ние демонстрирахме как да импортираме необработени данни в системата, като гарантираме последователност и качество за следващите етапи на обработка. Поглъщане на данни в каталога на Unity : Използвайки Databricks, ние обучихме модел за машинно обучение, съобразен с нашия набор от данни, следвайки най-добрите практики за мащабируемо и ефективно разработване на модели. Обучение на модела : За да подобрим производителността на модела, използвахме HyperOpt, за да автоматизираме търсенето на оптимални хиперпараметри, подобрявайки точността и ефективността. Настройка на хиперпараметър с HyperOpt : Използвахме MLflow, за да регистрираме и наблюдаваме нашите експерименти, поддържайки изчерпателен запис на версиите на модела, показателите и параметрите за лесно сравнение и възпроизводимост. Проследяване на експерименти с Databricks MLflow След като тези основни стъпки са завършени, вашият модел вече е готов за внедряване. В тази втора част ще се съсредоточим върху интегрирането на два критични компонента в нашата система: : Внедряване на пакетна обработка за генериране на прогнози за големи масиви от данни, подходящи за приложения като групово оценяване и периодично отчитане. Пакетно заключение : Настройване на обслужване на модел в реално време за предоставяне на незабавни прогнози, от съществено значение за интерактивни приложения и услуги. Онлайн извод (обслужване на модел) за да гарантирате, че вашите внедрени модели поддържат оптимална производителност и надеждност във времето. Мониторинг на модела: Нека влезем в него! Внедряване на модела Отправната точка на последния блог беше оценката на модела. Сега си представете, че направихме сравнението и установихме, че нашият модел показва по-висока производителност в сравнение с този сериен модел. Тъй като искаме (предполагаме) да използваме модела в производството, искаме да се възползваме от всички данни, които имаме. Следващата стъпка е да обучите и тествате модела, като използвате пълния набор от данни. След това запазете нашия модел за по-късна употреба, като го внедрите като наш шампионски модел. Тъй като това е крайният модел, който искаме да използваме за извод, ние използваме клиента Feature Engineering, за да обучим модела. По този начин ние не само проследяваме по-лесно родословието на модела, но и прехвърляме валидирането на схемата и трансформацията на функциите (ако има такива) на клиента. with mlflow.start_run(run_name="ALS_best_model") as run: als = ALS() # Now we set the parameters for the method als.setMaxIter(MAX_ITER)\ .setSeed(SEED)\ .setRegParam(best_params["REG_PARAM"])\ .setUserCol(COL_USER)\ .setItemCol(COL_ITEM)\ .setRatingCol(COL_LABEL)\ .setRank(best_params["RANK"]) mlflow.log_param("MAX_ITER", MAX_ITER) mlflow.log_param("RANK", best_params["RANK"]) mlflow.log_param("REG_PARAM", best_params["REG_PARAM"]) # Create the model with these parameters. model = als.fit(df_full_data) #drop predictions where users and products from the test test and didn't make it into the training set. in this case, the prediction is NaN model.setColdStartStrategy('drop') predictions = model.transform(df_full_data) signature = infer_signature(model_input = df_full_data, model_output = predictions.select(COL_LABEL)) #log the model mlflow.spark.log_model(model, model_name, sample_input=df_full_data.limit(3), signature = signature, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name=f"{catalog_name}.{model_schema}.{model_name}") evaluator = RegressionEvaluator(predictionCol=COL_PRED, labelCol=COL_LABEL) rmse = evaluator.setMetricName("rmse").evaluate(predictions) mlflow.log_metric('rmse', rmse) можем също да използваме за да обучим и регистрираме моделите API за хранилище на функции или API за инженерство на функции, model_info = fe.log_model(model=model, artifact_path = model_name, flavor=mlflow.spark, training_set=fe_full_data, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name= f"{catalog_name}.{model_schema}.{model_name}" ) когато използваме API за проектиране на функции, можем да видим родословието на модела в Catalog Explorer Сега нека актуализираме описанието на модела и да му присвоим етикет Champion. import time from mlflow.tracking.client import MlflowClient from mlflow.entities.model_registry.model_version_status import ModelVersionStatus client = MlflowClient() #find the latest model version model_name_path = f"{catalog_name}.{model_schema}.{model_name}" model_version_infos = client.search_model_versions(f"name ='{model_name_path}'") new_model_version = max([int(model_version_info.version) for model_version_info in model_version_infos]) #add the model and model version descirption client.update_registered_model( name=model_name_path, description="collaborative filtering using Spark mllib ALS. This model use rating table" ) client.update_model_version( name=model_name_path, version=new_model_version, description="this model is optimized Rank and REG_PARAM with Hyperopt and rmse as a loss function. trained on the full dataset" ) # assign alias client.set_registered_model_alias(model_name_path, "Champion", new_model_version) Сега продължете напред и проверете схемата, в която сте регистрирали модела. трябва да видите всичките си актуализации, както следва : Ако използвате работно пространство за регистър на моделите, трябва да извършвате етапи за управление на вашите модели. Използването на псевдоними няма да работи. Вижте за да видите как работи Етапи на модела тук Извод на модела Партидно точкуване Сега си представете, че искаме да използваме нашия модел в производството за извод. В тази стъпка зареждаме шампионския модел и го използваме, за да генерираме 20 препоръки за филми за всеки потребител. from mlflow.spark import load_model as spark_load_model from mlflow.tracking.client import MlflowClient from create_training_set import split_data #-- set UC as model registray mlflow.set_registry_uri("databricks-uc") #-- initate mlflow client client = MlflowClient() # -- read the config file with open('config.json') as config_file: config = json.load(config_file) catalog_name = config["catalog_name"] gold_layer = config["gold_layer_name"] silver_layer = config["silver_layer_name"] user_item_table_name = config["user_item_table_name"] ft_user_item_name = config["ft_user_item_name"] model_name = config["model_name"] model_schema = config["model_schema"] #-- create the model uri model_path = f"{catalog_name}.{model_schema}.{model_name}" # --create the model_uri: there are two ways to do this # 1: using the alias (we use this*) model_version_uri = f"models:/{model_uri}@champion" # 2: using model version #champion_version = client.get_model_version_by_alias(model_uri, "champion") #model_version_uri = f"models:/{model_uri}/{champion_version.version}" # -- load the model pipline and exctract the model model_pipeline = spark_load_model(model_version_uri) model = model_pipeline.stages[0] # -- batch scoring using the the model fe_full_data, df_full_data, df_train, df_test = split_data() df_batch_input = df_full_data.drop("rating") df_scores = model.transform(df_batch_input) # --- in case you used Feature Engineering to train and register model #from databricks.feature_engineering import FeatureEngineeringClient #fe = FeatureEngineeringClient() # fe.score_batch(model_uri=f"{model_version_uri}",df = df_batch_input) и можете да видите, че използвахме едни и същи данни за обучение за партидно оценяване. Въпреки че в случай на препоръчителни системи има смисъл, в повечето приложения искаме да използваме модела, за да оценим някои невиждани данни. Например, Imaging сте Netflix и искате да актуализирате потребителските препоръки в края на деня въз основа на техния нов списък за гледане. Можем да насрочим работа, която изпълнява партидното оценяване в определено време в края на деня. Сега можем да продължим и да генерираме препоръките за всеки потребител. За това намираме топ 20 артикула за всеки потребител from pyspark.sql.window import Window from pyspark.sql.functions import col, split, row_number, collect_list from pyspark.sql.functions import col, collect_list, expr, lit, min, row_number, desc windowSpec = Window.partitionBy("user_id").orderBy(col("prediction").desc()) df_top_20_items = df_scores.withColumn("rank", row_number().over(windowSpec)).filter(col("rank") <= 20) df_user_recs = df_top_20_items.groupBy("user_id") \ .agg(collect_list(col("item_id").cast("double")).alias("top_item_ids")) ето как изглежда резултатът Накрая можем да съхраним прогнозата като делта етикет в нашия UC или да ги публикуваме в системи надолу по веригата Mongo DB или Azure Cosmos DB. Отиваме на първия вариант df_user_recs.write.mode("overwrite").saveAsTable(f"{catalog_name}.{output_schema}.top20_item_recommendations") Поточно предаване/онлайн извод Сега си представете случай, в който искаме да актуализираме нашите препоръки въз основа на потребителски взаимодействия в реално време. За този случай можем да използваме сервиране на модели. Когато някой иска да използва вашия модел, той може да изпрати данни на сървъра. След това сървърът подава тези данни към вашия внедрен модел, който влиза в действие, анализира данните и генерира прогноза. Те могат да се използват в уеб приложения, мобилни приложения или дори вградени системи. Едно от приложенията на този подход е да се даде възможност за маршрутизиране на трафика за A/B тестване. Алгоритъмът ALS не може да се използва директно за онлайн извод, тъй като изисква повторно обучение на модела, като се използват всички данни (стари + нови), за да се актуализират препоръките. Алгоритмите за обучение Gradient Descent са примери за модел, който може да се използва за онлайн актуализации. Може да разгледаме някои от тези алгоритми в бъдеща публикация. Въпреки това, само за да илюстрираме как би работил такъв модел, ние създаваме (безполезен) модел, обслужващ крайна точка, която предвижда рейтинг на филм въз основа на това, когато потребителят оцени филм! import requests model_path = f"{catalog_name}.{model_schema}.{model_name}" champion_version = client.get_model_version_by_alias(model_path, "champion") # Set the name of the MLflow endpoint endpoint_name = config["model_serving_endpoint_name"] # Name of the registered MLflow model model_name = model_path # Specify the type of compute (CPU, GPU_SMALL, GPU_MEDIUM, etc.) workload_type = "CPU" # Specify the scale-out size of compute (Small, Medium, Large, etc.) workload_size = "Small" # Get the latest version of the MLflow model model_version = int(champion_version.version) # Specify Scale to Zero(only supported for CPU endpoints) scale_to_zero = False # Get the API endpoint and token for the current notebook context API_ROOT = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get() API_TOKEN = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get() data = { "name": endpoint_name, "config": { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ] }, } headers = {"Context-Type": "text/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.post( url=f"{API_ROOT}/api/2.0/serving-endpoints", json=data, headers=headers ) Това ще създаде клъстер за сервиране на обяд, така че отнема известно време. Сега, ако отворите прозореца , трябва да видите вашата крайна точка. Serving можем да използваме една крайна точка, за да обслужваме множество модели. След това можем да използваме маршрутизиране на трафика за сценарии като A/B тестване или да сравним производителността на различните модели в производството. Таблица за изводи Таблиците за изводи в Databricks Model Serving действат като автоматичен журнал за нашите внедрени модели. Когато са активирани, те улавят входящи заявки (данни, изпратени за прогнозиране), съответните изходи на модела (прогнози) и някои други метаданни като делта таблица в Unity Catalog. Можем да използваме таблица за изводи за , и процедура за събиране на данни за или на нашите модели. наблюдение и отстраняване на грешки проследяване на родословието повторно обучение фина настройка Можем да активираме на нашата обслужваща крайна точка, за да наблюдаваме модела. Можем да го направим, като посочим свойствата в полезния товар, когато за първи път създадем крайната точка. Или актуализираме нашата крайна точка след това, като използваме командата и URL адреса на крайната точка , както следва (повече inference table auto_capture_config put config ) тук data = { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ], "auto_capture_config":{ "catalog_name": catalog_name, "schema_name": model_schema, "table_name_prefix": payload_table, } } headers = {"Context-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.put(url=f"{API_ROOT}/api/2.0/serving-endpoints/{endpoint_name}/config", json=data, headers=headers) print(json.dumps(response.json(), indent=4)) сега нека захраним крайната точка с някои фиктивни данни за взаимодействие с потребителя import random import time all_items = df_full_data.select(col("item_id")).distinct() for user_id in range(50,54): items_not_rated_by_user = df_full_data.where(col("user_id")==user_id).select(col("item_id")).distinct()#collect()[0][0] no_rated_items = [item.item_id for item in all_items.subtract(items_not_rated_by_user).limit(4).collect()] data = { "dataframe_records": [ {"user_id":user_id, "item_id":no_rated_items[0], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[1], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, ] } response = requests.post( url=f"{API_ROOT}/serving-endpoints/{endpoint_name}/invocations", json=data, headers=headers ) # generate the data within the timespan of 1 to 8 minutes time.sleep(random.randint(60*1, 60*8)) Можем да проверим регистрационните файлове на крайната точка в таблицата . Отнема около 10 минути, докато можете да видите данните в таблицата. <catalog>.<schema>.<payload_table> table_path = f"{catalog_name}.{model_schema}.{payload_table}" # Read data from the inference table df_inf_table = spark.read.table(table_path) display(df_inf_table ) трябва да видите нещо подобно на вашата таблица с полезен товар За да разберете схемата на тази таблица за изводи, проверете „Схема на таблицата за изводи на Unity каталог==“ .== тук Мониторинг на модела Мониторинг на модели и данни - сложна тема, която изисква много време за овладяване. Databricks Lakehouse Monitoring (DLM) намалява режийните разходи за изграждане на подходяща система за наблюдение, като предоставя стандартни и адаптивни шаблони за обичайни случаи на употреба. Въпреки това, овладяването на DLM и мониторинга на модела като цяло изисква много експерименти. Тук не искам да ви давам обширен преглед на мониторинга на модела, а по-скоро да ви дам отправна точка. Може би в бъдеще ще посветя блог на тази тема. Кратко резюме на функциите и характеристиките на DLM Сега, след като нашият модел е готов и работи, можем да използваме таблица за изводи, генерирана от нашата обслужваща крайна точка, за да наблюдаваме ключови показатели като производителност на модела и дрейф, за да открием всякакви отклонения или аномалии в нашите данни или модел с течение на времето. Този проактивен подход ни помага да предприемем навременни коригиращи действия, като преобучение на модела или актуализиране на неговите характеристики, за да поддържаме оптимална производителност и съответствие с бизнес целите. DLM предоставя три типа анализ или : , и . Тъй като се интересуваме от анализа на нашата таблица за изводи, ние се фокусираме върху последната. За да използваме таблица за наблюдение - нашата „ “, трябва да се уверим, че таблицата има правилната структура. За всеки ред трябва да съответства на заявка със следните колони: profile type Времеви серии Моментна снимка Извод основна таблица таблицата за изводи характеристики на модела прогнозиране на модела ID на модела : клеймо за време на заявката за извод клеймо за време (по избор) основна истина е важен за случаите, когато обслужваме множество модели и искаме да проследим ефективността на всеки модел в едно табло за наблюдение. Ако има повече от един наличен идентификатор на модел, DLM го използва, за да раздели данните и да изчисли показатели и статики за всеки сегмент поотделно. Идентификационният номер на модела DLM изчислява всяка статистика и показател за определен интервал от време. За анализ на изводи той използва колоната плюс дефиниран от потребителя размер на прозореца за идентифициране на времевите прозорци. повече по-долу. с времеви клейма DLM поддържа два за таблици за изводи: „ “ или „ “. Той изчислява някои от съответните показатели и статистики въз основа на тази спецификация. problem type класификация регресия За да използваме DLM, трябва да създадем монитор и да го прикрепим към маса. Когато правим това, DLM създава две : metric tables : тази таблица съдържа обобщени статистически данни като мин., макс., процент нула и нули. Той също така съдържа допълнителни показатели въз основа на типа проблем, дефиниран от потребителя. Например , и за класификационните модели и и за регресионните модели. таблица с показатели на профила precision recall f1_score mean_squared_error mean_average_error : тя съдържа статистически данни, които измерват как разпределението на данните се е променило или спрямо . Той изчислява мерки като хи-квадрат тест, KS тест. таблица с показатели за отклонение във времето базова стойност (ако е предоставена) за да видите списъка с пълните показатели за всяка таблица, проверете страницата с документация . Възможно е също така да създадете на таблицата с показатели на монитора . персонализирани показатели Важен аспект от изграждането на система за наблюдение е да се уверим, че нашето табло за наблюдение има достъп до най-новите данни за изводи, когато пристигнат. За това можем да използваме , за да следим обработените редове в таблицата за изводи. Ние използваме таблицата за изводи на модела, обслужваща като наша изходна таблица ( ), а таблицата за наблюдение като таблица приемник ( ). Ние също така се уверяваме, че (CDC) е активирано и в двете таблици (то е активирано по подразбиране в таблицата за изводи). По този начин ние обработваме само промени - вмъкване/актуализиране/изтриване - в изходната таблица, вместо да обработваме отново цялата таблица при всяко опресняване. поточно предаване на делта таблица readStream writeStream записването на данни за промяна Практически За да активираме наблюдението върху нашата таблица за изводи, предприемаме следните стъпки: Прочетете таблицата за изводи като таблица за поточно предаване Създайте нова делта таблица с правилната схема, като разопаковате таблицата за изводи, която е генерирана от нашата крайна точка, обслужваща модела. Подгответе базовата таблица (ако има такава) Създайте монитор върху получената таблица и опреснете показателя Планирайте работен поток, за да разопаковате таблицата за изводи в правилната структура и да опресните показателите Първо трябва да инсталираме API за наблюдение на Lakehouse. Трябва вече да е инсталиран, ако използвате Databricks rum time 15.3 LTS и по-нови: %pip install "https://ml-team-public-read.s3.amazonaws.com/wheels/data-monitoring/a4050ef7-b183-47a1-a145-e614628e3146/databricks_lakehouse_monitoring-0.4.14-py3-none-any.whl" dbutils.library.restartPython() Нека прочетем таблицата за изводи като таблица за поточно предаване requests_raw = spark.readStream\ .format("delta")\ .table(inference_table_name) requests_raw.isStreaming #-> True След това трябва да поставим таблицата в правилния формат, както е описано по-горе. Тази таблица трябва да има един ред за всяка прогноза със съответните характеристики и стойност на прогнозата. Таблицата за изводи, която получаваме от крайната точка, обслужваща модела, съхранява заявките и отговорите на крайната точка като вложен JSON формат. Ето пример за JSON полезен товар за колоната за заявка и отговор. #requests {"dataframe_records": [ {"user_id": 1, "item_id": 346, "rating": 5}, {"user_id": 1, "item_id": 377, "rating": 2}, {"user_id": 1, "item_id": 302, "rating": 4} ] } #reponse {"predictions": [4.248899936676025, 1.1172138452529907, 4.279165744781494] } # --> what we need | user_id | item_id | rating | prediction | |---------|---------|--------|------------| | 1 | 346 | 5 | 4.248900 | | 1 | 377 | 2 | 1.117214 | | 1 | 302 | 4 | 4.279166 | За да разопаковаме тази таблица в правилната схема, можем да използваме следния код, който е адаптиран от документацията на Databricks ( ). Inference table Lakehouse Monitoring starter notebook # define the schema of the request and reponse fields in the inference tabel REQUEST_FIELDS = [StructField('user_id', IntegerType(), False),\ StructField('item_id', IntegerType(), False),\ StructField('rating', IntegerType(), False)\ ] RESPONSE_FIELD = [T.StructField("predictions", FloatType())] def process_col_requests(json_str:str) -> str: """ to proccess the JSON payload of request column in inference table """ request = json.loads(json_str) dataframe_records = request.get("dataframe_records", []) return dataframe_records def procces_col_response(json_str: str) -> str: """ to proccess the JSON payload of reponse column in inference table """ reponse = json.loads(json_str) output = [{prediction_col: round(prediction,4)} for prediction in reponse["predictions"]] return output def get_model_id(endpoint_name: str) -> str: """ create the model id by concatinating the model name and the model version. note: the assumption is the endpoint serves only one model """ served_models = get_served_models(endpoint_name) model_id = f"{served_models[0]['model_name']}_{served_models[0]['model_version']}" return model_id def process_requests(requests_raw: DataFrame, request_fields: List[T.StructField], response_field: T.StructField, endpoint_name: str) -> DataFrame: """ Takes a stream of raw requests and processes them by: - Unpacking JSON payloads for requests and responses - Exploding batched requests into individual rows - Converting Unix epoch millisecond timestamps to be Spark TimestampType :param requests_raw: DataFrame containing raw requests. Assumed to contain the following columns: - `request` - `response` - `timestamp_ms` :param request_fields: List of StructFields representing the request schema :param response_field: A StructField representing the response schema :return: A DataFrame containing processed requests """ # Convert the timestamp milliseconds to TimestampType for downstream processing. requests_timestamped = requests_raw \ .withColumn(timestamp_col, (F.col("timestamp_ms") / 1000).cast(T.TimestampType())) \ .drop("timestamp_ms") # create the model identifier column model_id = get_model_id(endpoint_name) # Convert the model name and version columns into a model identifier column. requests_identified = requests_timestamped \ .withColumn(model_id_col, F.lit(model_id)) # Rename the date column to avoid collisions with features. requests_dated = requests_identified.withColumnRenamed("date", date_col) # Consolidate and unpack JSON. request_schema = T.ArrayType(T.StructType(request_fields)) response_schema = T.ArrayType(T.StructType(response_field)) # w udf_request = F.udf(process_col_requests, request_schema) udf_reponse = F.udf(procces_col_response, response_schema) requests_unpacked = requests_dated.withColumn("request", udf_request("request")).\ withColumn("response", udf_reponse("response")) # Explode batched requests into individual rows. DB_PREFIX = "__db" requests_exploded = requests_unpacked \ .withColumn(f"{DB_PREFIX}_request_response", F.arrays_zip(F.col("request"), F.col("response"))) \ .withColumn(f"{DB_PREFIX}_request_response", F.explode(F.col(f"{DB_PREFIX}_request_response"))) \ .select(F.col("*"), F.col(f"{DB_PREFIX}_request_response.request.*"), F.col(f"{DB_PREFIX}_request_response.response.*")) \ .drop(f"{DB_PREFIX}_request_response", "request", "response") requests_cleaned = requests_exploded.drop("status_code", "sampling_fraction", "client_request_id", "databricks_request_id", "request_metadata") return requests_cleaned Получената таблица ще изглежда така: След това трябва да инициализираме нашата таблица за мивка dt_builder = DeltaTable.createIfNotExists(spark) \ .tableName(unpacked_requests_table_name) \ .addColumns(schema) \ .partitionedBy(requests_cleaned.schema) \ .property("delta.enableChangeDataFeed", "true") \ dt_builder.execute() и напишете резултатите checkpoint_path = f"dbfs:/payload-logging/{endpoint_name}/checkpoint" requests_stream = requests_cleaned.writeStream \ .trigger(once=True) \ .format("delta") \ .partitionBy(date_col) \ .outputMode("append") \ .option("checkpointLocation", checkpoint_path) \ .toTable(unpacked_requests_table_name) \ Накрая създаваме нашата базова таблица. DLM използва тази таблица, за да изчисли отклоненията чрез сравняване на разпределението на подобни колони на базови и първични модели. Базовата таблица трябва да има същата колона с характеристики като основната колона, както и същата колона за идентификация на модела. За базова таблица използваме таблицата за прогнозиране на нашия , който съхраняваме по-рано, след като сме обучили нашия модел, използвайки най-добрия хиперпараметър. За да изчисли показателя за отклонение, Databricks изчислява показателите на профила както за основната, така и за базовата таблица. Тук можете да прочетете за набор от данни за валидиране . основната таблица и базовата таблица #read the prediction table df_base_table = spark.table(f"{catalog_name}.{model_schema}.predictions") # create the model id and add it to the table model_id = get_model_id(endpoint_name) df_base_table = df_base_table.withColumn(model_id_col, F.lit(model_id)) #write the new table and enable the CDC on it output_base_table_name = f"{catalog_name}.{model_schema}.{base_table_prefix}_{model_name}" df_base_table.write.format("delta").mode("overwrite").saveAsTable(output_base_table_name) spark.sql(f"ALTER TABLE {output_base_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)") Сега сме готови да създадем нашето табло за наблюдение. Можем да го направим или с помощта на или API за наблюдение на Lakehouse. Тук използваме втория вариант: потребителския интерфейс # This is where we store the metric tables. output_schema_name = f"{catalog_name}.{model_schema}" try: info = lm.create_monitor( table_name=unpacked_requests_table_name, profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities,#the aggregation window model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, # We will refresh the metrics on-demand in this notebook baseline_table_name=output_base_table_name, ) print(info) except Exception as e: # Ensure the exception was expected assert "RESOURCE_ALREADY_EXISTS" in str(e), f"Unexpected error: {e}" # Update the monitor if any parameters of this notebook have changed. lm.update_monitor( table_name=unpacked_requests_table_name, updated_params=dict( profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities, model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, baseline_table_name=output_base_table_name, ) ) # Refresh metrics calculated on the requests table. refresh_info = lm.run_refresh(table_name=unpacked_requests_table_name) print(refresh_info) след като стартираме кода, отнема известно време, докато Databricks изчисли всички показатели. За да видите таблото за управление, отидете в раздела на вашата таблица за приемане (т.е. ). Трябва да видите страница, както следва. Quality unpacked_requests_table_name Ако щракнете върху ще видите вашите текущи, чакащи и минали опреснявания. щракнете върху за да отворите таблото си за управление. refresh history View Dashboard така че започваме с таблицата за изводи ( ), обработваме я и запазваме резултата в и предаваме тази таблица заедно с нашата базова таблица ( ) към нашия API за наблюдение. DLM изчислява показателите на профила за всяка таблица ( ) и ги използва за изчисляване на показателите за отклонение ( ) my_endpoint_payload my_endpoint_payload_unpacked base_table_als my_endpoint_payload_unpacked_profile_metric my_endpoint_payload_unpacked_drift_metrics Ето го! имате всичко необходимо за обслужване и наблюдение на вашия модел! В следващата част ще ви покажа как да автоматизирате този процес с помощта на и ! Databricks Assets Bundle Gitlab В направихме първите стъпки за изграждане на MLOps тръбопровод от край до край с помощта на Databricks и Spark, ръководени от референтната архитектура на Databricks. Ето обобщение на основните стъпки, които разгледахме: първата част от тази поредица от уроци : Ние организирахме нашите данни в бронзови, сребърни и златни слоеве в рамките на каталога Unity, създавайки структурирана и ефективна система за управление на данни. Настройване на каталога Unity за Medallion Architecture : Ние демонстрирахме как да импортираме необработени данни в системата, като гарантираме последователност и качество за следващите етапи на обработка. Поглъщане на данни в каталога на Unity : Използвайки Databricks, ние обучихме модел за машинно обучение, съобразен с нашия набор от данни, следвайки най-добрите практики за мащабируемо и ефективно разработване на модели. Обучение на модела : За да подобрим производителността на модела, използвахме HyperOpt, за да автоматизираме търсенето на оптимални хиперпараметри, подобрявайки точността и ефективността. Настройка на хиперпараметри с HyperOpt : Използвахме MLflow, за да регистрираме и наблюдаваме нашите експерименти, поддържайки изчерпателен запис на версиите на модела, показателите и параметрите за лесно сравнение и възпроизводимост. Проследяване на експерименти с Databricks MLflow След като тези основни стъпки са завършени, вашият модел вече е готов за внедряване. В тази втора част ще се съсредоточим върху интегрирането на два критични компонента в нашата система: : Внедряване на пакетна обработка за генериране на прогнози за големи масиви от данни, подходящи за приложения като групово оценяване и периодично отчитане. Пакетно заключение : Настройване на обслужване на модел в реално време за предоставяне на незабавни прогнози, от съществено значение за интерактивни приложения и услуги. Онлайн извод (обслужване на модел) за да гарантирате, че вашите внедрени модели поддържат оптимална производителност и надеждност във времето. Мониторинг на модела: Нека влезем в него! Внедряване на модела Отправната точка на последния блог беше оценката на модела. Сега си представете, че направихме сравнението и установихме, че нашият модел показва по-висока производителност в сравнение с този сериен модел. Тъй като искаме (предполагаме) да използваме модела в производството, искаме да се възползваме от всички данни, които имаме. Следващата стъпка е да обучите и тествате модела, като използвате пълния набор от данни. След това запазете нашия модел за по-късна употреба, като го внедрите като наш шампионски модел. Тъй като това е крайният модел, който искаме да използваме за извод, ние използваме клиента Feature Engineering, за да обучим модела. По този начин ние не само проследяваме по-лесно родословието на модела, но и прехвърляме валидирането на схемата и трансформацията на функциите (ако има такива) на клиента. with mlflow.start_run(run_name="ALS_best_model") as run: als = ALS() # Now we set the parameters for the method als.setMaxIter(MAX_ITER)\ .setSeed(SEED)\ .setRegParam(best_params["REG_PARAM"])\ .setUserCol(COL_USER)\ .setItemCol(COL_ITEM)\ .setRatingCol(COL_LABEL)\ .setRank(best_params["RANK"]) mlflow.log_param("MAX_ITER", MAX_ITER) mlflow.log_param("RANK", best_params["RANK"]) mlflow.log_param("REG_PARAM", best_params["REG_PARAM"]) # Create the model with these parameters. model = als.fit(df_full_data) #drop predictions where users and products from the test test and didn't make it into the training set. in this case, the prediction is NaN model.setColdStartStrategy('drop') predictions = model.transform(df_full_data) signature = infer_signature(model_input = df_full_data, model_output = predictions.select(COL_LABEL)) #log the model mlflow.spark.log_model(model, model_name, sample_input=df_full_data.limit(3), signature = signature, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name=f"{catalog_name}.{model_schema}.{model_name}") evaluator = RegressionEvaluator(predictionCol=COL_PRED, labelCol=COL_LABEL) rmse = evaluator.setMetricName("rmse").evaluate(predictions) mlflow.log_metric('rmse', rmse) можем също да използваме за да обучим и регистрираме моделите API за хранилище на функции или API за инженерство на функции, model_info = fe.log_model(model=model, artifact_path = model_name, flavor=mlflow.spark, training_set=fe_full_data, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name= f"{catalog_name}.{model_schema}.{model_name}" ) когато използваме API за проектиране на функции, можем да видим родословието на модела в Catalog Explorer Сега нека актуализираме описанието на модела и да му присвоим етикет Champion. import time from mlflow.tracking.client import MlflowClient from mlflow.entities.model_registry.model_version_status import ModelVersionStatus client = MlflowClient() #find the latest model version model_name_path = f"{catalog_name}.{model_schema}.{model_name}" model_version_infos = client.search_model_versions(f"name ='{model_name_path}'") new_model_version = max([int(model_version_info.version) for model_version_info in model_version_infos]) #add the model and model version descirption client.update_registered_model( name=model_name_path, description="collaborative filtering using Spark mllib ALS. This model use rating table" ) client.update_model_version( name=model_name_path, version=new_model_version, description="this model is optimized Rank and REG_PARAM with Hyperopt and rmse as a loss function. trained on the full dataset" ) # assign alias client.set_registered_model_alias(model_name_path, "Champion", new_model_version) Сега продължете напред и проверете схемата, в която сте регистрирали модела. трябва да видите всичките си актуализации, както следва : Ако използвате работно пространство за регистър на моделите, трябва да извършвате етапи за управление на вашите модели. Използването на псевдоними няма да работи. Вижте за да видите как работи Етапи на модела тук Извод на модела Партидно точкуване Сега си представете, че искаме да използваме нашия модел в производството за извод. В тази стъпка зареждаме шампионския модел и го използваме, за да генерираме 20 препоръки за филми за всеки потребител. from mlflow.spark import load_model as spark_load_model from mlflow.tracking.client import MlflowClient from create_training_set import split_data #-- set UC as model registray mlflow.set_registry_uri("databricks-uc") #-- initate mlflow client client = MlflowClient() # -- read the config file with open('config.json') as config_file: config = json.load(config_file) catalog_name = config["catalog_name"] gold_layer = config["gold_layer_name"] silver_layer = config["silver_layer_name"] user_item_table_name = config["user_item_table_name"] ft_user_item_name = config["ft_user_item_name"] model_name = config["model_name"] model_schema = config["model_schema"] #-- create the model uri model_path = f"{catalog_name}.{model_schema}.{model_name}" # --create the model_uri: there are two ways to do this # 1: using the alias (we use this*) model_version_uri = f"models:/{model_uri}@champion" # 2: using model version #champion_version = client.get_model_version_by_alias(model_uri, "champion") #model_version_uri = f"models:/{model_uri}/{champion_version.version}" # -- load the model pipline and exctract the model model_pipeline = spark_load_model(model_version_uri) model = model_pipeline.stages[0] # -- batch scoring using the the model fe_full_data, df_full_data, df_train, df_test = split_data() df_batch_input = df_full_data.drop("rating") df_scores = model.transform(df_batch_input) # --- in case you used Feature Engineering to train and register model #from databricks.feature_engineering import FeatureEngineeringClient #fe = FeatureEngineeringClient() # fe.score_batch(model_uri=f"{model_version_uri}",df = df_batch_input) и можете да видите, че използвахме едни и същи данни за обучение за партидно оценяване. Въпреки че в случай на препоръчителни системи има смисъл, в повечето приложения искаме да използваме модела, за да оценим някои невиждани данни. Например, Imaging сте Netflix и искате да актуализирате потребителските препоръки в края на деня въз основа на техния нов списък за гледане. Можем да насрочим работа, която изпълнява партидното оценяване в определено време в края на деня. Сега можем да продължим и да генерираме препоръките за всеки потребител. За това намираме топ 20 артикула за всеки потребител from pyspark.sql.window import Window from pyspark.sql.functions import col, split, row_number, collect_list from pyspark.sql.functions import col, collect_list, expr, lit, min, row_number, desc windowSpec = Window.partitionBy("user_id").orderBy(col("prediction").desc()) df_top_20_items = df_scores.withColumn("rank", row_number().over(windowSpec)).filter(col("rank") <= 20) df_user_recs = df_top_20_items.groupBy("user_id") \ .agg(collect_list(col("item_id").cast("double")).alias("top_item_ids")) ето как изглежда резултатът Накрая можем да съхраним прогнозата като делта етикет в нашия UC или да ги публикуваме в системи надолу по веригата Mongo DB или Azure Cosmos DB. Отиваме на първия вариант df_user_recs.write.mode("overwrite").saveAsTable(f"{catalog_name}.{output_schema}.top20_item_recommendations") Поточно предаване/онлайн извод Сега си представете случай, в който искаме да актуализираме нашите препоръки въз основа на потребителски взаимодействия в реално време. За този случай можем да използваме сервиране на модели. Когато някой иска да използва вашия модел, той може да изпрати данни на сървъра. След това сървърът подава тези данни към вашия внедрен модел, който влиза в действие, анализира данните и генерира прогноза. Те могат да се използват в уеб приложения, мобилни приложения или дори вградени системи. Едно от приложенията на този подход е да се даде възможност за маршрутизиране на трафика за A/B тестване. Алгоритъмът ALS не може да се използва директно за онлайн извод, тъй като изисква повторно обучение на модела, като се използват всички данни (стари + нови), за да се актуализират препоръките. Алгоритмите за обучение Gradient Descent са примери за модел, който може да се използва за онлайн актуализации. Може да разгледаме някои от тези алгоритми в бъдеща публикация. Въпреки това, само за да илюстрираме как би работил такъв модел, ние създаваме (безполезен) модел, обслужващ крайна точка, която предвижда рейтинг на филм въз основа на това, когато потребителят оцени филм! import requests model_path = f"{catalog_name}.{model_schema}.{model_name}" champion_version = client.get_model_version_by_alias(model_path, "champion") # Set the name of the MLflow endpoint endpoint_name = config["model_serving_endpoint_name"] # Name of the registered MLflow model model_name = model_path # Specify the type of compute (CPU, GPU_SMALL, GPU_MEDIUM, etc.) workload_type = "CPU" # Specify the scale-out size of compute (Small, Medium, Large, etc.) workload_size = "Small" # Get the latest version of the MLflow model model_version = int(champion_version.version) # Specify Scale to Zero(only supported for CPU endpoints) scale_to_zero = False # Get the API endpoint and token for the current notebook context API_ROOT = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get() API_TOKEN = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get() data = { "name": endpoint_name, "config": { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ] }, } headers = {"Context-Type": "text/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.post( url=f"{API_ROOT}/api/2.0/serving-endpoints", json=data, headers=headers ) Това ще създаде клъстер за сервиране на обяд, така че отнема известно време. Сега, ако отворите прозореца , трябва да видите вашата крайна точка. Serving можем да използваме една крайна точка, за да обслужваме множество модели. След това можем да използваме маршрутизиране на трафика за сценарии като A/B тестване или да сравним производителността на различните модели в производството. Таблица за изводи Таблиците за изводи в Databricks Model Serving действат като автоматичен журнал за нашите внедрени модели. Когато са активирани, те улавят входящи заявки (данни, изпратени за прогнозиране), съответните изходи на модела (прогнози) и някои други метаданни като делта таблица в Unity Catalog. Можем да използваме таблица за изводи за , и процедура за събиране на данни за или на нашите модели. наблюдение и отстраняване на грешки проследяване на родословието повторно обучение фина настройка Можем да активираме на нашата обслужваща крайна точка, за да наблюдаваме модела. Можем да го направим, като посочим свойствата в полезния товар, когато за първи път създадем крайната точка. Или актуализираме нашата крайна точка след това, като използваме командата и URL адреса на крайната точка , както следва (повече inference table auto_capture_config put config ) тук data = { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ], "auto_capture_config":{ "catalog_name": catalog_name, "schema_name": model_schema, "table_name_prefix": payload_table, } } headers = {"Context-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.put(url=f"{API_ROOT}/api/2.0/serving-endpoints/{endpoint_name}/config", json=data, headers=headers) print(json.dumps(response.json(), indent=4)) сега нека захраним крайната точка с някои фиктивни данни за взаимодействие с потребителя import random import time all_items = df_full_data.select(col("item_id")).distinct() for user_id in range(50,54): items_not_rated_by_user = df_full_data.where(col("user_id")==user_id).select(col("item_id")).distinct()#collect()[0][0] no_rated_items = [item.item_id for item in all_items.subtract(items_not_rated_by_user).limit(4).collect()] data = { "dataframe_records": [ {"user_id":user_id, "item_id":no_rated_items[0], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[1], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, ] } response = requests.post( url=f"{API_ROOT}/serving-endpoints/{endpoint_name}/invocations", json=data, headers=headers ) # generate the data within the timespan of 1 to 8 minutes time.sleep(random.randint(60*1, 60*8)) Можем да проверим регистрационните файлове на крайната точка в таблицата . Отнема около 10 минути, докато можете да видите данните в таблицата. <catalog>.<schema>.<payload_table> table_path = f"{catalog_name}.{model_schema}.{payload_table}" # Read data from the inference table df_inf_table = spark.read.table(table_path) display(df_inf_table ) трябва да видите нещо подобно на вашата таблица с полезен товар За да разберете схемата на тази таблица за изводи, проверете „Схема на таблицата за изводи на Unity каталог==“ .== тук Мониторинг на модела Мониторинг на модели и данни - сложна тема, която изисква много време за овладяване. Databricks Lakehouse Monitoring (DLM) намалява режийните разходи за изграждане на подходяща система за наблюдение, като предоставя стандартни и адаптивни шаблони за обичайни случаи на употреба. Въпреки това, овладяването на DLM и мониторинга на модела като цяло изисква много експерименти. Тук не искам да ви давам обширен преглед на мониторинга на модела, а по-скоро да ви дам отправна точка. Може би в бъдеще ще посветя блог на тази тема. Кратко резюме на функциите и характеристиките на DLM Сега, след като нашият модел е готов и работи, можем да използваме таблица за изводи, генерирана от нашата обслужваща крайна точка, за да наблюдаваме ключови показатели като производителност на модела и дрейф, за да открием всякакви отклонения или аномалии в нашите данни или модел с течение на времето. Този проактивен подход ни помага да предприемем навременни коригиращи действия, като преобучение на модела или актуализиране на неговите функции, за да поддържаме оптимална производителност и съответствие с бизнес целите. DLM предоставя три типа анализ или : , и . Тъй като се интересуваме от анализа на нашата таблица за изводи, ние се фокусираме върху последната. За да използваме таблица за наблюдение - нашата „ “, трябва да се уверим, че таблицата има правилната структура. За всеки ред трябва да съответства на заявка със следните колони: profile type Времеви серии Моментна снимка Извод основна таблица таблицата за изводи характеристики на модела прогнозиране на модела ID на модела : клеймо за време на заявката за извод клеймо за време (по избор) основна истина е важен за случаите, когато обслужваме множество модели и искаме да проследим ефективността на всеки модел в едно табло за наблюдение. Ако има повече от един наличен идентификатор на модел, DLM го използва за разделяне на данните и изчисляване на показатели и статики за всеки сегмент поотделно. Идентификационният номер на модела DLM изчислява всяка статистика и показател за определен интервал от време. За анализ на изводи той използва колоната плюс дефиниран от потребителя размер на прозореца за идентифициране на времевите прозорци. повече по-долу. с времеви клейма DLM поддържа два за таблици за изводи: „ “ или „ “. Той изчислява някои от съответните показатели и статистики въз основа на тази спецификация. problem type класификация регресия За да използваме DLM, трябва да създадем монитор и да го прикрепим към маса. Когато правим това, DLM създава две : metric tables : тази таблица съдържа обобщени статистически данни като мин., макс., процент на нула и нули. Той също така съдържа допълнителни показатели въз основа на типа проблем, дефиниран от потребителя. Например , и за класификационните модели и и за регресионните модели. таблица с показатели на профила precision recall f1_score mean_squared_error mean_average_error : тя съдържа статистически данни, които измерват как разпределението на данните се е променило или спрямо . Той изчислява мерки като хи-квадрат тест, KS тест. таблица с показатели за отклонение във времето базова стойност (ако е предоставена) за да видите списъка с пълните показатели за всяка таблица, проверете страницата с документация . Възможно е също така да създадете на таблицата с показатели на монитора . персонализирани показатели Важен аспект от изграждането на система за наблюдение е да се уверим, че нашето табло за наблюдение има достъп до най-новите данни за изводи, когато пристигнат. За това можем да използваме , за да следим обработените редове в таблицата за изводи. Ние използваме таблицата за изводи на модела, обслужваща като нашата изходна таблица ( ), а таблицата за наблюдение като таблица приемник ( ). Ние също така се уверяваме, че (CDC) е активирано и в двете таблици (то е активирано по подразбиране в таблицата за изводи). По този начин ние обработваме само промени - вмъкване/актуализиране/изтриване - в изходната таблица, вместо да обработваме отново цялата таблица при всяко опресняване. поточно предаване на делта таблица readStream writeStream записването на данни за промяна Практически За да активираме наблюдението върху нашата таблица за изводи, предприемаме следните стъпки: Прочетете таблицата за изводи като таблица за поточно предаване Създайте нова делта таблица с правилната схема, като разопаковате таблицата за изводи, която е генерирана от нашата крайна точка, обслужваща модела. Подгответе базовата таблица (ако има такава) Създайте монитор върху получената таблица и опреснете показателя Планирайте работен поток, за да разопаковате таблицата за изводи в правилната структура и да опресните показателите Първо трябва да инсталираме API за наблюдение на Lakehouse. Трябва вече да е инсталиран, ако използвате Databricks rum time 15.3 LTS и по-нови: %pip install "https://ml-team-public-read.s3.amazonaws.com/wheels/data-monitoring/a4050ef7-b183-47a1-a145-e614628e3146/databricks_lakehouse_monitoring-0.4.14-py3-none-any.whl" dbutils.library.restartPython() Нека прочетем таблицата за изводи като таблица за поточно предаване requests_raw = spark.readStream\ .format("delta")\ .table(inference_table_name) requests_raw.isStreaming #-> True След това трябва да поставим таблицата в правилния формат, както е описано по-горе. Тази таблица трябва да има един ред за всяка прогноза със съответните характеристики и стойност на прогнозата. Таблицата за изводи, която получаваме от крайната точка, обслужваща модела, съхранява заявките и отговорите на крайната точка като вложен JSON формат. Ето пример за JSON полезен товар за колоната за заявка и отговор. #requests {"dataframe_records": [ {"user_id": 1, "item_id": 346, "rating": 5}, {"user_id": 1, "item_id": 377, "rating": 2}, {"user_id": 1, "item_id": 302, "rating": 4} ] } #reponse {"predictions": [4.248899936676025, 1.1172138452529907, 4.279165744781494] } # --> what we need | user_id | item_id | rating | prediction | |---------|---------|--------|------------| | 1 | 346 | 5 | 4.248900 | | 1 | 377 | 2 | 1.117214 | | 1 | 302 | 4 | 4.279166 | За да разопаковаме тази таблица в правилната схема, можем да използваме следния код, който е адаптиран от документацията на Databricks ( ). Inference table Lakehouse Monitoring starter notebook # define the schema of the request and reponse fields in the inference tabel REQUEST_FIELDS = [StructField('user_id', IntegerType(), False),\ StructField('item_id', IntegerType(), False),\ StructField('rating', IntegerType(), False)\ ] RESPONSE_FIELD = [T.StructField("predictions", FloatType())] def process_col_requests(json_str:str) -> str: """ to proccess the JSON payload of request column in inference table """ request = json.loads(json_str) dataframe_records = request.get("dataframe_records", []) return dataframe_records def procces_col_response(json_str: str) -> str: """ to proccess the JSON payload of reponse column in inference table """ reponse = json.loads(json_str) output = [{prediction_col: round(prediction,4)} for prediction in reponse["predictions"]] return output def get_model_id(endpoint_name: str) -> str: """ create the model id by concatinating the model name and the model version. note: the assumption is the endpoint serves only one model """ served_models = get_served_models(endpoint_name) model_id = f"{served_models[0]['model_name']}_{served_models[0]['model_version']}" return model_id def process_requests(requests_raw: DataFrame, request_fields: List[T.StructField], response_field: T.StructField, endpoint_name: str) -> DataFrame: """ Takes a stream of raw requests and processes them by: - Unpacking JSON payloads for requests and responses - Exploding batched requests into individual rows - Converting Unix epoch millisecond timestamps to be Spark TimestampType :param requests_raw: DataFrame containing raw requests. Assumed to contain the following columns: - `request` - `response` - `timestamp_ms` :param request_fields: List of StructFields representing the request schema :param response_field: A StructField representing the response schema :return: A DataFrame containing processed requests """ # Convert the timestamp milliseconds to TimestampType for downstream processing. requests_timestamped = requests_raw \ .withColumn(timestamp_col, (F.col("timestamp_ms") / 1000).cast(T.TimestampType())) \ .drop("timestamp_ms") # create the model identifier column model_id = get_model_id(endpoint_name) # Convert the model name and version columns into a model identifier column. requests_identified = requests_timestamped \ .withColumn(model_id_col, F.lit(model_id)) # Rename the date column to avoid collisions with features. requests_dated = requests_identified.withColumnRenamed("date", date_col) # Consolidate and unpack JSON. request_schema = T.ArrayType(T.StructType(request_fields)) response_schema = T.ArrayType(T.StructType(response_field)) # w udf_request = F.udf(process_col_requests, request_schema) udf_reponse = F.udf(procces_col_response, response_schema) requests_unpacked = requests_dated.withColumn("request", udf_request("request")).\ withColumn("response", udf_reponse("response")) # Explode batched requests into individual rows. DB_PREFIX = "__db" requests_exploded = requests_unpacked \ .withColumn(f"{DB_PREFIX}_request_response", F.arrays_zip(F.col("request"), F.col("response"))) \ .withColumn(f"{DB_PREFIX}_request_response", F.explode(F.col(f"{DB_PREFIX}_request_response"))) \ .select(F.col("*"), F.col(f"{DB_PREFIX}_request_response.request.*"), F.col(f"{DB_PREFIX}_request_response.response.*")) \ .drop(f"{DB_PREFIX}_request_response", "request", "response") requests_cleaned = requests_exploded.drop("status_code", "sampling_fraction", "client_request_id", "databricks_request_id", "request_metadata") return requests_cleaned Получената таблица ще изглежда така: След това трябва да инициализираме нашата таблица за мивка dt_builder = DeltaTable.createIfNotExists(spark) \ .tableName(unpacked_requests_table_name) \ .addColumns(schema) \ .partitionedBy(requests_cleaned.schema) \ .property("delta.enableChangeDataFeed", "true") \ dt_builder.execute() и напишете резултатите checkpoint_path = f"dbfs:/payload-logging/{endpoint_name}/checkpoint" requests_stream = requests_cleaned.writeStream \ .trigger(once=True) \ .format("delta") \ .partitionBy(date_col) \ .outputMode("append") \ .option("checkpointLocation", checkpoint_path) \ .toTable(unpacked_requests_table_name) \ Накрая създаваме нашата базова таблица. DLM използва тази таблица, за да изчисли отклоненията чрез сравняване на разпределението на подобни колони на базови и първични модели. Базовата таблица трябва да има същата колона с характеристики като основната колона, както и същата колона за идентификация на модела. За базова таблица използваме таблицата за прогнозиране на нашия , който съхраняваме по-рано, след като сме обучили нашия модел, използвайки най-добрия хиперпараметър. За да изчисли показателя за отклонение, Databricks изчислява показателите на профила както за основната, така и за базовата таблица. Тук можете да прочетете за набор от данни за валидиране . основната таблица и базовата таблица #read the prediction table df_base_table = spark.table(f"{catalog_name}.{model_schema}.predictions") # create the model id and add it to the table model_id = get_model_id(endpoint_name) df_base_table = df_base_table.withColumn(model_id_col, F.lit(model_id)) #write the new table and enable the CDC on it output_base_table_name = f"{catalog_name}.{model_schema}.{base_table_prefix}_{model_name}" df_base_table.write.format("delta").mode("overwrite").saveAsTable(output_base_table_name) spark.sql(f"ALTER TABLE {output_base_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)") Сега сме готови да създадем нашето табло за наблюдение. Можем да го направим или с помощта на или API за наблюдение на Lakehouse. Тук използваме втория вариант: потребителския интерфейс # This is where we store the metric tables. output_schema_name = f"{catalog_name}.{model_schema}" try: info = lm.create_monitor( table_name=unpacked_requests_table_name, profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities,#the aggregation window model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, # We will refresh the metrics on-demand in this notebook baseline_table_name=output_base_table_name, ) print(info) except Exception as e: # Ensure the exception was expected assert "RESOURCE_ALREADY_EXISTS" in str(e), f"Unexpected error: {e}" # Update the monitor if any parameters of this notebook have changed. lm.update_monitor( table_name=unpacked_requests_table_name, updated_params=dict( profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities, model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, baseline_table_name=output_base_table_name, ) ) # Refresh metrics calculated on the requests table. refresh_info = lm.run_refresh(table_name=unpacked_requests_table_name) print(refresh_info) след като стартираме кода, отнема известно време, докато Databricks изчисли всички показатели. За да видите таблото за управление, отидете в раздела на вашата таблица за приемане (т.е. ). Трябва да видите страница, както следва. Quality unpacked_requests_table_name Ако щракнете върху ще видите вашите текущи, чакащи и минали опреснявания. щракнете върху за да отворите таблото си за управление. refresh history View Dashboard така че започваме с таблицата за изводи ( ), обработваме я и запазваме резултата в и предаваме тази таблица заедно с нашата базова таблица ( ) към нашия API за наблюдение. DLM изчислява показателите на профила за всяка таблица ( ) и ги използва за изчисляване на показателите за отклонение ( ) my_endpoint_payload my_endpoint_payload_unpacked base_table_als my_endpoint_payload_unpacked_profile_metric my_endpoint_payload_unpacked_drift_metrics Ето го! имате всичко необходимо за обслужване и наблюдение на вашия модел! В следващата част ще ви покажа как да автоматизирате този процес с помощта на и ! Databricks Assets Bundle Gitlab