paint-brush
Нека изградим MLOps тръбопровод с Databricks и Spark – част 2от@neshom
Нова история

Нека изградим MLOps тръбопровод с Databricks и Spark – част 2

от Mohsen Jadidi42m2024/12/29
Read on Terminal Reader

Твърде дълго; Чета

Във втората част на този блог виждаме как Databricks ни дава възможност за пакетно внедряване и онлайн обслужване. Отделяме известно време за това как да настроим таблата за управление на данни и моделиране на мониторинг.
featured image - Нека изградим MLOps тръбопровод с Databricks и Spark – част 2
Mohsen Jadidi HackerNoon profile picture
0-item
1-item
2-item

В първата част от тази поредица от уроци направихме първите стъпки за изграждане на MLOps тръбопровод от край до край с помощта на Databricks и Spark, ръководени от референтната архитектура на Databricks. Ето обобщение на основните стъпки, които разгледахме:


  • Настройване на каталога Unity за Medallion Architecture : Ние организирахме нашите данни в бронзови, сребърни и златни слоеве в рамките на каталога Unity, създавайки структурирана и ефективна система за управление на данни.

  • Поглъщане на данни в каталога на Unity : Ние демонстрирахме как да импортираме необработени данни в системата, като гарантираме последователност и качество за следващите етапи на обработка.

  • Обучение на модела : Използвайки Databricks, ние обучихме модел за машинно обучение, съобразен с нашия набор от данни, следвайки най-добрите практики за мащабируемо и ефективно разработване на модели.

  • Настройка на хиперпараметър с HyperOpt : За да подобрим производителността на модела, използвахме HyperOpt, за да автоматизираме търсенето на оптимални хиперпараметри, подобрявайки точността и ефективността.

  • Проследяване на експерименти с Databricks MLflow : Използвахме MLflow, за да регистрираме и наблюдаваме нашите експерименти, поддържайки изчерпателен запис на версиите на модела, показателите и параметрите за лесно сравнение и възпроизводимост.


След като тези основни стъпки са завършени, вашият модел вече е готов за внедряване. В тази втора част ще се съсредоточим върху интегрирането на два критични компонента в нашата система:


  1. Пакетно заключение : Внедряване на пакетна обработка за генериране на прогнози за големи масиви от данни, подходящи за приложения като групово оценяване и периодично отчитане.
  2. Онлайн извод (обслужване на модел) : Настройване на обслужване на модел в реално време за предоставяне на незабавни прогнози, от съществено значение за интерактивни приложения и услуги.
  3. Мониторинг на модела: за да гарантирате, че вашите внедрени модели поддържат оптимална производителност и надеждност във времето.


Нека влезем в него!

Внедряване на модела

Отправната точка на последния блог беше оценката на модела. Сега си представете, че направихме сравнението и установихме, че нашият модел показва по-висока производителност в сравнение с този сериен модел. Тъй като искаме (предполагаме) да използваме модела в производството, искаме да се възползваме от всички данни, които имаме. Следващата стъпка е да обучите и тествате модела, като използвате пълния набор от данни. След това запазете нашия модел за по-късна употреба, като го внедрите като наш шампионски модел. Тъй като това е крайният модел, който искаме да използваме за извод, ние използваме клиента Feature Engineering, за да обучим модела. По този начин ние не само проследяваме по-лесно родословието на модела, но и прехвърляме валидирането на схемата и трансформацията на функциите (ако има такива) на клиента.


 with mlflow.start_run(run_name="ALS_best_model") as run: als = ALS() # Now we set the parameters for the method als.setMaxIter(MAX_ITER)\ .setSeed(SEED)\ .setRegParam(best_params["REG_PARAM"])\ .setUserCol(COL_USER)\ .setItemCol(COL_ITEM)\ .setRatingCol(COL_LABEL)\ .setRank(best_params["RANK"]) mlflow.log_param("MAX_ITER", MAX_ITER) mlflow.log_param("RANK", best_params["RANK"]) mlflow.log_param("REG_PARAM", best_params["REG_PARAM"]) # Create the model with these parameters. model = als.fit(df_full_data) #drop predictions where users and products from the test test and didn't make it into the training set. in this case, the prediction is NaN model.setColdStartStrategy('drop') predictions = model.transform(df_full_data) signature = infer_signature(model_input = df_full_data, model_output = predictions.select(COL_LABEL)) #log the model mlflow.spark.log_model(model, model_name, sample_input=df_full_data.limit(3), signature = signature, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name=f"{catalog_name}.{model_schema}.{model_name}") evaluator = RegressionEvaluator(predictionCol=COL_PRED, labelCol=COL_LABEL) rmse = evaluator.setMetricName("rmse").evaluate(predictions) mlflow.log_metric('rmse', rmse)


можем също да използваме API за хранилище на функции или API за инженерство на функции, за да обучим и регистрираме моделите

 model_info = fe.log_model(model=model, artifact_path = model_name, flavor=mlflow.spark, training_set=fe_full_data, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name= f"{catalog_name}.{model_schema}.{model_name}" )


когато използваме API за проектиране на функции, можем да видим родословието на модела в Catalog Explorer

родословие на данни в Dataticks Unity Catalog


Сега нека актуализираме описанието на модела и да му присвоим етикет Champion.

 import time from mlflow.tracking.client import MlflowClient from mlflow.entities.model_registry.model_version_status import ModelVersionStatus client = MlflowClient() #find the latest model version model_name_path = f"{catalog_name}.{model_schema}.{model_name}" model_version_infos = client.search_model_versions(f"name ='{model_name_path}'") new_model_version = max([int(model_version_info.version) for model_version_info in model_version_infos]) #add the model and model version descirption client.update_registered_model( name=model_name_path, description="collaborative filtering using Spark mllib ALS. This model use rating table" ) client.update_model_version( name=model_name_path, version=new_model_version, description="this model is optimized Rank and REG_PARAM with Hyperopt and rmse as a loss function. trained on the full dataset" ) # assign alias client.set_registered_model_alias(model_name_path, "Champion", new_model_version)


Сега продължете напред и проверете схемата, в която сте регистрирали модела. трябва да видите всичките си актуализации, както следва

Регистър на модела в Databricks Unity Catalog

Етапи на модела : Ако използвате работно пространство за регистър на моделите, трябва да извършвате етапи за управление на вашите модели. Използването на псевдоними няма да работи. Вижте тук за да видите как работи

Извод на модела

Партидно точкуване

Сега си представете, че искаме да използваме нашия модел в производството за извод. В тази стъпка зареждаме шампионския модел и го използваме, за да генерираме 20 препоръки за филми за всеки потребител.


 from mlflow.spark import load_model as spark_load_model from mlflow.tracking.client import MlflowClient from create_training_set import split_data #-- set UC as model registray mlflow.set_registry_uri("databricks-uc") #-- initate mlflow client client = MlflowClient() # -- read the config file with open('config.json') as config_file: config = json.load(config_file) catalog_name = config["catalog_name"] gold_layer = config["gold_layer_name"] silver_layer = config["silver_layer_name"] user_item_table_name = config["user_item_table_name"] ft_user_item_name = config["ft_user_item_name"] model_name = config["model_name"] model_schema = config["model_schema"] #-- create the model uri model_path = f"{catalog_name}.{model_schema}.{model_name}" # --create the model_uri: there are two ways to do this # 1: using the alias (we use this*) model_version_uri = f"models:/{model_uri}@champion" # 2: using model version #champion_version = client.get_model_version_by_alias(model_uri, "champion") #model_version_uri = f"models:/{model_uri}/{champion_version.version}" # -- load the model pipline and exctract the model model_pipeline = spark_load_model(model_version_uri) model = model_pipeline.stages[0] # -- batch scoring using the the model fe_full_data, df_full_data, df_train, df_test = split_data() df_batch_input = df_full_data.drop("rating") df_scores = model.transform(df_batch_input) # --- in case you used Feature Engineering to train and register model #from databricks.feature_engineering import FeatureEngineeringClient #fe = FeatureEngineeringClient() # fe.score_batch(model_uri=f"{model_version_uri}",df = df_batch_input)


и можете да видите, че използвахме едни и същи данни за обучение за партидно оценяване. Въпреки че в случай на препоръчителни системи има смисъл, в повечето приложения искаме да използваме модела, за да оценим някои невиждани данни. Например, Imaging сте Netflix и искате да актуализирате потребителските препоръки в края на деня въз основа на техния нов списък за гледане. Можем да насрочим работа, която изпълнява партидното оценяване в определено време в края на деня.


Сега можем да продължим и да генерираме препоръките за всеки потребител. За това намираме топ 20 артикула за всеки потребител

 from pyspark.sql.window import Window from pyspark.sql.functions import col, split, row_number, collect_list from pyspark.sql.functions import col, collect_list, expr, lit, min, row_number, desc windowSpec = Window.partitionBy("user_id").orderBy(col("prediction").desc()) df_top_20_items = df_scores.withColumn("rank", row_number().over(windowSpec)).filter(col("rank") <= 20) df_user_recs = df_top_20_items.groupBy("user_id") \ .agg(collect_list(col("item_id").cast("double")).alias("top_item_ids"))


ето как изглежда резултатът

Накрая можем да съхраним прогнозата като делта етикет в нашия UC или да ги публикуваме в системи надолу по веригата Mongo DB или Azure Cosmos DB. Отиваме на първия вариант


 df_user_recs.write.mode("overwrite").saveAsTable(f"{catalog_name}.{output_schema}.top20_item_recommendations")


Поточно предаване/онлайн извод

Сега си представете случай, в който искаме да актуализираме нашите препоръки въз основа на потребителски взаимодействия в реално време. За този случай можем да използваме сервиране на модели. Когато някой иска да използва вашия модел, той може да изпрати данни на сървъра. След това сървърът подава тези данни към вашия внедрен модел, който влиза в действие, анализира данните и генерира прогноза. Те могат да се използват в уеб приложения, мобилни приложения или дори вградени системи. Едно от приложенията на този подход е да се даде възможност за маршрутизиране на трафика за A/B тестване.


Алгоритъмът ALS не може да се използва директно за онлайн извод, тъй като изисква повторно обучение на модела, като се използват всички данни (стари + нови), за да се актуализират препоръките. Алгоритмите за обучение Gradient Descent са примери за модел, който може да се използва за онлайн актуализации. Може да разгледаме някои от тези алгоритми в бъдеща публикация.


Въпреки това, само за да илюстрираме как би работил такъв модел, ние създаваме (безполезен) модел, обслужващ крайна точка, която предвижда рейтинг на филм въз основа на това, когато потребителят оцени филм!


 import requests model_path = f"{catalog_name}.{model_schema}.{model_name}" champion_version = client.get_model_version_by_alias(model_path, "champion") # Set the name of the MLflow endpoint endpoint_name = config["model_serving_endpoint_name"] # Name of the registered MLflow model model_name = model_path # Specify the type of compute (CPU, GPU_SMALL, GPU_MEDIUM, etc.) workload_type = "CPU" # Specify the scale-out size of compute (Small, Medium, Large, etc.) workload_size = "Small" # Get the latest version of the MLflow model model_version = int(champion_version.version) # Specify Scale to Zero(only supported for CPU endpoints) scale_to_zero = False # Get the API endpoint and token for the current notebook context API_ROOT = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get() API_TOKEN = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get() data = { "name": endpoint_name, "config": { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ] }, } headers = {"Context-Type": "text/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.post( url=f"{API_ROOT}/api/2.0/serving-endpoints", json=data, headers=headers )


Това ще създаде клъстер за сервиране на обяд, така че отнема известно време. Сега, ако отворите прозореца Serving , трябва да видите вашата крайна точка.


можем да използваме една крайна точка, за да обслужваме множество модели. След това можем да използваме маршрутизиране на трафика за сценарии като A/B тестване или да сравним производителността на различните модели в производството.

Таблица за изводи

Таблиците за изводи в Databricks Model Serving действат като автоматичен журнал за нашите внедрени модели. Когато са активирани, те улавят входящи заявки (данни, изпратени за прогнозиране), съответните изходи на модела (прогнози) и някои други метаданни като делта таблица в Unity Catalog. Можем да използваме таблица за изводи за наблюдение и отстраняване на грешки , проследяване на родословието и процедура за събиране на данни за повторно обучение или фина настройка на нашите модели.


Можем да активираме inference table на нашата обслужваща крайна точка, за да наблюдаваме модела. Можем да го направим, като посочим свойствата auto_capture_config в полезния товар, когато за първи път създадем крайната точка. Или актуализираме нашата крайна точка след това, като използваме командата put и URL адреса на крайната точка config , както следва (повече тук )


 data = { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ], "auto_capture_config":{ "catalog_name": catalog_name, "schema_name": model_schema, "table_name_prefix": payload_table, } } headers = {"Context-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.put(url=f"{API_ROOT}/api/2.0/serving-endpoints/{endpoint_name}/config", json=data, headers=headers) print(json.dumps(response.json(), indent=4))


сега нека захраним крайната точка с някои фиктивни данни за взаимодействие с потребителя

 import random import time all_items = df_full_data.select(col("item_id")).distinct() for user_id in range(50,54): items_not_rated_by_user = df_full_data.where(col("user_id")==user_id).select(col("item_id")).distinct()#collect()[0][0] no_rated_items = [item.item_id for item in all_items.subtract(items_not_rated_by_user).limit(4).collect()] data = { "dataframe_records": [ {"user_id":user_id, "item_id":no_rated_items[0], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[1], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, ] } response = requests.post( url=f"{API_ROOT}/serving-endpoints/{endpoint_name}/invocations", json=data, headers=headers ) # generate the data within the timespan of 1 to 8 minutes time.sleep(random.randint(60*1, 60*8))


Можем да проверим регистрационните файлове на крайната точка в таблицата <catalog>.<schema>.<payload_table> . Отнема около 10 минути, докато можете да видите данните в таблицата.


 table_path = f"{catalog_name}.{model_schema}.{payload_table}" # Read data from the inference table df_inf_table = spark.read.table(table_path) display(df_inf_table )


трябва да видите нещо подобно на вашата таблица с полезен товар

Databricks модел, обслужващ таблица с полезен товар


За да разберете схемата на тази таблица за изводи, проверете „Схема на таблицата за изводи на Unity каталог==“ тук .==


Мониторинг на модела

Мониторинг на модели и данни - сложна тема, която изисква много време за овладяване. Databricks Lakehouse Monitoring (DLM) намалява режийните разходи за изграждане на подходяща система за наблюдение, като предоставя стандартни и адаптивни шаблони за обичайни случаи на употреба. Въпреки това, овладяването на DLM и мониторинга на модела като цяло изисква много експерименти. Тук не искам да ви давам обширен преглед на мониторинга на модела, а по-скоро да ви дам отправна точка. Може би в бъдеще ще посветя блог на тази тема.


Кратко резюме на функциите и характеристиките на DLM

Сега, след като нашият модел е готов и работи, можем да използваме таблица за изводи, генерирана от нашата обслужваща крайна точка, за да наблюдаваме ключови показатели като производителност на модела и дрейф, за да открием всякакви отклонения или аномалии в нашите данни или модел с течение на времето. Този проактивен подход ни помага да предприемем навременни коригиращи действия, като преобучение на модела или актуализиране на неговите характеристики, за да поддържаме оптимална производителност и съответствие с бизнес целите.


Databricks Lakehouse Monitoring Data Architecture източник: Databricks


DLM предоставя три типа анализ или profile type : Времеви серии , Моментна снимка и Извод . Тъй като се интересуваме от анализа на нашата таблица за изводи, ние се фокусираме върху последната. За да използваме таблица за наблюдение - нашата „ основна таблица “, трябва да се уверим, че таблицата има правилната структура. За таблицата за изводи всеки ред трябва да съответства на заявка със следните колони:

  • характеристики на модела

  • прогнозиране на модела

  • ID на модела

  • клеймо за време : клеймо за време на заявката за извод

  • основна истина (по избор)


Идентификационният номер на модела е важен за случаите, когато обслужваме множество модели и искаме да проследим ефективността на всеки модел в едно табло за наблюдение. Ако има повече от един наличен идентификатор на модел, DLM го използва, за да раздели данните и да изчисли показатели и статики за всеки сегмент поотделно.


DLM изчислява всяка статистика и показател за определен интервал от време. За анализ на изводи той използва колоната с времеви клейма плюс дефиниран от потребителя размер на прозореца за идентифициране на времевите прозорци. повече по-долу.


DLM поддържа два problem type за таблици за изводи: „ класификация “ или „ регресия “. Той изчислява някои от съответните показатели и статистики въз основа на тази спецификация.


За да използваме DLM, трябва да създадем монитор и да го прикрепим към маса. Когато правим това, DLM създава две metric tables :

  • таблица с показатели на профила : тази таблица съдържа обобщени статистически данни като мин., макс., процент нула и нули. Той също така съдържа допълнителни показатели въз основа на типа проблем, дефиниран от потребителя. Например precision , recall и f1_score за класификационните модели и mean_squared_error и mean_average_error за регресионните модели.

  • таблица с показатели за отклонение : тя съдържа статистически данни, които измерват как разпределението на данните се е променило във времето или спрямо базова стойност (ако е предоставена) . Той изчислява мерки като хи-квадрат тест, KS тест.


за да видите списъка с пълните показатели за всяка таблица, проверете страницата с документация на таблицата с показатели на монитора . Възможно е също така да създадете персонализирани показатели .


Важен аспект от изграждането на система за наблюдение е да се уверим, че нашето табло за наблюдение има достъп до най-новите данни за изводи, когато пристигнат. За това можем да използваме поточно предаване на делта таблица , за да следим обработените редове в таблицата за изводи. Ние използваме таблицата за изводи на модела, обслужваща като наша изходна таблица ( readStream ), а таблицата за наблюдение като таблица приемник ( writeStream ). Ние също така се уверяваме, че записването на данни за промяна (CDC) е активирано и в двете таблици (то е активирано по подразбиране в таблицата за изводи). По този начин ние обработваме само промени - вмъкване/актуализиране/изтриване - в изходната таблица, вместо да обработваме отново цялата таблица при всяко опресняване.

Практически

За да активираме наблюдението върху нашата таблица за изводи, предприемаме следните стъпки:

  1. Прочетете таблицата за изводи като таблица за поточно предаване
  2. Създайте нова делта таблица с правилната схема, като разопаковате таблицата за изводи, която е генерирана от нашата крайна точка, обслужваща модела.
  3. Подгответе базовата таблица (ако има такава)
  4. Създайте монитор върху получената таблица и опреснете показателя
  5. Планирайте работен поток, за да разопаковате таблицата за изводи в правилната структура и да опресните показателите


Първо трябва да инсталираме API за наблюдение на Lakehouse. Трябва вече да е инсталиран, ако използвате Databricks rum time 15.3 LTS и по-нови:


 %pip install "https://ml-team-public-read.s3.amazonaws.com/wheels/data-monitoring/a4050ef7-b183-47a1-a145-e614628e3146/databricks_lakehouse_monitoring-0.4.14-py3-none-any.whl" dbutils.library.restartPython()


Нека прочетем таблицата за изводи като таблица за поточно предаване

 requests_raw = spark.readStream\ .format("delta")\ .table(inference_table_name) requests_raw.isStreaming #-> True


След това трябва да поставим таблицата в правилния формат, както е описано по-горе. Тази таблица трябва да има един ред за всяка прогноза със съответните характеристики и стойност на прогнозата. Таблицата за изводи, която получаваме от крайната точка, обслужваща модела, съхранява заявките и отговорите на крайната точка като вложен JSON формат. Ето пример за JSON полезен товар за колоната за заявка и отговор.

 #requests {"dataframe_records": [ {"user_id": 1, "item_id": 346, "rating": 5}, {"user_id": 1, "item_id": 377, "rating": 2}, {"user_id": 1, "item_id": 302, "rating": 4} ] } #reponse {"predictions": [4.248899936676025, 1.1172138452529907, 4.279165744781494] } # --> what we need | user_id | item_id | rating | prediction | |---------|---------|--------|------------| | 1 | 346 | 5 | 4.248900 | | 1 | 377 | 2 | 1.117214 | | 1 | 302 | 4 | 4.279166 |


За да разопаковаме тази таблица в правилната схема, можем да използваме следния код, който е адаптиран от документацията на Databricks ( Inference table Lakehouse Monitoring starter notebook ).


 # define the schema of the request and reponse fields in the inference tabel REQUEST_FIELDS = [StructField('user_id', IntegerType(), False),\ StructField('item_id', IntegerType(), False),\ StructField('rating', IntegerType(), False)\ ] RESPONSE_FIELD = [T.StructField("predictions", FloatType())] def process_col_requests(json_str:str) -> str: """ to proccess the JSON payload of request column in inference table """ request = json.loads(json_str) dataframe_records = request.get("dataframe_records", []) return dataframe_records def procces_col_response(json_str: str) -> str: """ to proccess the JSON payload of reponse column in inference table """ reponse = json.loads(json_str) output = [{prediction_col: round(prediction,4)} for prediction in reponse["predictions"]] return output def get_model_id(endpoint_name: str) -> str: """ create the model id by concatinating the model name and the model version. note: the assumption is the endpoint serves only one model """ served_models = get_served_models(endpoint_name) model_id = f"{served_models[0]['model_name']}_{served_models[0]['model_version']}" return model_id def process_requests(requests_raw: DataFrame, request_fields: List[T.StructField], response_field: T.StructField, endpoint_name: str) -> DataFrame: """ Takes a stream of raw requests and processes them by: - Unpacking JSON payloads for requests and responses - Exploding batched requests into individual rows - Converting Unix epoch millisecond timestamps to be Spark TimestampType :param requests_raw: DataFrame containing raw requests. Assumed to contain the following columns: - `request` - `response` - `timestamp_ms` :param request_fields: List of StructFields representing the request schema :param response_field: A StructField representing the response schema :return: A DataFrame containing processed requests """ # Convert the timestamp milliseconds to TimestampType for downstream processing. requests_timestamped = requests_raw \ .withColumn(timestamp_col, (F.col("timestamp_ms") / 1000).cast(T.TimestampType())) \ .drop("timestamp_ms") # create the model identifier column model_id = get_model_id(endpoint_name) # Convert the model name and version columns into a model identifier column. requests_identified = requests_timestamped \ .withColumn(model_id_col, F.lit(model_id)) # Rename the date column to avoid collisions with features. requests_dated = requests_identified.withColumnRenamed("date", date_col) # Consolidate and unpack JSON. request_schema = T.ArrayType(T.StructType(request_fields)) response_schema = T.ArrayType(T.StructType(response_field)) # w udf_request = F.udf(process_col_requests, request_schema) udf_reponse = F.udf(procces_col_response, response_schema) requests_unpacked = requests_dated.withColumn("request", udf_request("request")).\ withColumn("response", udf_reponse("response")) # Explode batched requests into individual rows. DB_PREFIX = "__db" requests_exploded = requests_unpacked \ .withColumn(f"{DB_PREFIX}_request_response", F.arrays_zip(F.col("request"), F.col("response"))) \ .withColumn(f"{DB_PREFIX}_request_response", F.explode(F.col(f"{DB_PREFIX}_request_response"))) \ .select(F.col("*"), F.col(f"{DB_PREFIX}_request_response.request.*"), F.col(f"{DB_PREFIX}_request_response.response.*")) \ .drop(f"{DB_PREFIX}_request_response", "request", "response") requests_cleaned = requests_exploded.drop("status_code", "sampling_fraction", "client_request_id", "databricks_request_id", "request_metadata") return requests_cleaned


Получената таблица ще изглежда така:

Таблицата с полезен товар е разопакована

След това трябва да инициализираме нашата таблица за мивка

 dt_builder = DeltaTable.createIfNotExists(spark) \ .tableName(unpacked_requests_table_name) \ .addColumns(schema) \ .partitionedBy(requests_cleaned.schema) \ .property("delta.enableChangeDataFeed", "true") \ dt_builder.execute()


и напишете резултатите

 checkpoint_path = f"dbfs:/payload-logging/{endpoint_name}/checkpoint" requests_stream = requests_cleaned.writeStream \ .trigger(once=True) \ .format("delta") \ .partitionBy(date_col) \ .outputMode("append") \ .option("checkpointLocation", checkpoint_path) \ .toTable(unpacked_requests_table_name) \


Накрая създаваме нашата базова таблица. DLM използва тази таблица, за да изчисли отклоненията чрез сравняване на разпределението на подобни колони на базови и първични модели. Базовата таблица трябва да има същата колона с характеристики като основната колона, както и същата колона за идентификация на модела. За базова таблица използваме таблицата за прогнозиране на нашия набор от данни за валидиране , който съхраняваме по-рано, след като сме обучили нашия модел, използвайки най-добрия хиперпараметър. За да изчисли показателя за отклонение, Databricks изчислява показателите на профила както за основната, така и за базовата таблица. Тук можете да прочетете за основната таблица и базовата таблица .


 #read the prediction table df_base_table = spark.table(f"{catalog_name}.{model_schema}.predictions") # create the model id and add it to the table model_id = get_model_id(endpoint_name) df_base_table = df_base_table.withColumn(model_id_col, F.lit(model_id)) #write the new table and enable the CDC on it output_base_table_name = f"{catalog_name}.{model_schema}.{base_table_prefix}_{model_name}" df_base_table.write.format("delta").mode("overwrite").saveAsTable(output_base_table_name) spark.sql(f"ALTER TABLE {output_base_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")


Сега сме готови да създадем нашето табло за наблюдение. Можем да го направим или с помощта на потребителския интерфейс или API за наблюдение на Lakehouse. Тук използваме втория вариант:

 # This is where we store the metric tables. output_schema_name = f"{catalog_name}.{model_schema}" try: info = lm.create_monitor( table_name=unpacked_requests_table_name, profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities,#the aggregation window model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, # We will refresh the metrics on-demand in this notebook baseline_table_name=output_base_table_name, ) print(info) except Exception as e: # Ensure the exception was expected assert "RESOURCE_ALREADY_EXISTS" in str(e), f"Unexpected error: {e}" # Update the monitor if any parameters of this notebook have changed. lm.update_monitor( table_name=unpacked_requests_table_name, updated_params=dict( profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities, model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, baseline_table_name=output_base_table_name, ) ) # Refresh metrics calculated on the requests table. refresh_info = lm.run_refresh(table_name=unpacked_requests_table_name) print(refresh_info)


след като стартираме кода, отнема известно време, докато Databricks изчисли всички показатели. За да видите таблото за управление, отидете в раздела Quality на вашата таблица за приемане (т.е. unpacked_requests_table_name ). Трябва да видите страница, както следва.

Изглед за наблюдение на модел Databricks


Ако щракнете върху refresh history ще видите вашите текущи, чакащи и минали опреснявания. щракнете върху View Dashboard за да отворите таблото си за управление.

Табло за наблюдение на модел Databricks



така че започваме с таблицата за изводи ( my_endpoint_payload ), обработваме я и запазваме резултата в my_endpoint_payload_unpacked и предаваме тази таблица заедно с нашата базова таблица ( base_table_als ) към нашия API за наблюдение. DLM изчислява показателите на профила за всяка таблица ( my_endpoint_payload_unpacked_profile_metric ) и ги използва за изчисляване на показателите за отклонение ( my_endpoint_payload_unpacked_drift_metrics )


Ето го! имате всичко необходимо за обслужване и наблюдение на вашия модел!


В следващата част ще ви покажа как да автоматизирате този процес с помощта на Databricks Assets Bundle и Gitlab !

В първата част от тази поредица от уроци направихме първите стъпки за изграждане на MLOps тръбопровод от край до край с помощта на Databricks и Spark, ръководени от референтната архитектура на Databricks. Ето обобщение на основните стъпки, които разгледахме:


  • Настройване на каталога Unity за Medallion Architecture : Ние организирахме нашите данни в бронзови, сребърни и златни слоеве в рамките на каталога Unity, създавайки структурирана и ефективна система за управление на данни.

  • Поглъщане на данни в каталога на Unity : Ние демонстрирахме как да импортираме необработени данни в системата, като гарантираме последователност и качество за следващите етапи на обработка.

  • Обучение на модела : Използвайки Databricks, ние обучихме модел за машинно обучение, съобразен с нашия набор от данни, следвайки най-добрите практики за мащабируемо и ефективно разработване на модели.

  • Настройка на хиперпараметри с HyperOpt : За да подобрим производителността на модела, използвахме HyperOpt, за да автоматизираме търсенето на оптимални хиперпараметри, подобрявайки точността и ефективността.

  • Проследяване на експерименти с Databricks MLflow : Използвахме MLflow, за да регистрираме и наблюдаваме нашите експерименти, поддържайки изчерпателен запис на версиите на модела, показателите и параметрите за лесно сравнение и възпроизводимост.


След като тези основни стъпки са завършени, вашият модел вече е готов за внедряване. В тази втора част ще се съсредоточим върху интегрирането на два критични компонента в нашата система:


  1. Пакетно заключение : Внедряване на пакетна обработка за генериране на прогнози за големи масиви от данни, подходящи за приложения като групово оценяване и периодично отчитане.
  2. Онлайн извод (обслужване на модел) : Настройване на обслужване на модел в реално време за предоставяне на незабавни прогнози, от съществено значение за интерактивни приложения и услуги.
  3. Мониторинг на модела: за да гарантирате, че вашите внедрени модели поддържат оптимална производителност и надеждност във времето.


Нека влезем в него!

Внедряване на модела

Отправната точка на последния блог беше оценката на модела. Сега си представете, че направихме сравнението и установихме, че нашият модел показва по-висока производителност в сравнение с този сериен модел. Тъй като искаме (предполагаме) да използваме модела в производството, искаме да се възползваме от всички данни, които имаме. Следващата стъпка е да обучите и тествате модела, като използвате пълния набор от данни. След това запазете нашия модел за по-късна употреба, като го внедрите като наш шампионски модел. Тъй като това е крайният модел, който искаме да използваме за извод, ние използваме клиента Feature Engineering, за да обучим модела. По този начин ние не само проследяваме по-лесно родословието на модела, но и прехвърляме валидирането на схемата и трансформацията на функциите (ако има такива) на клиента.


 with mlflow.start_run(run_name="ALS_best_model") as run: als = ALS() # Now we set the parameters for the method als.setMaxIter(MAX_ITER)\ .setSeed(SEED)\ .setRegParam(best_params["REG_PARAM"])\ .setUserCol(COL_USER)\ .setItemCol(COL_ITEM)\ .setRatingCol(COL_LABEL)\ .setRank(best_params["RANK"]) mlflow.log_param("MAX_ITER", MAX_ITER) mlflow.log_param("RANK", best_params["RANK"]) mlflow.log_param("REG_PARAM", best_params["REG_PARAM"]) # Create the model with these parameters. model = als.fit(df_full_data) #drop predictions where users and products from the test test and didn't make it into the training set. in this case, the prediction is NaN model.setColdStartStrategy('drop') predictions = model.transform(df_full_data) signature = infer_signature(model_input = df_full_data, model_output = predictions.select(COL_LABEL)) #log the model mlflow.spark.log_model(model, model_name, sample_input=df_full_data.limit(3), signature = signature, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name=f"{catalog_name}.{model_schema}.{model_name}") evaluator = RegressionEvaluator(predictionCol=COL_PRED, labelCol=COL_LABEL) rmse = evaluator.setMetricName("rmse").evaluate(predictions) mlflow.log_metric('rmse', rmse)


можем също да използваме API за хранилище на функции или API за инженерство на функции, за да обучим и регистрираме моделите

 model_info = fe.log_model(model=model, artifact_path = model_name, flavor=mlflow.spark, training_set=fe_full_data, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name= f"{catalog_name}.{model_schema}.{model_name}" )


когато използваме API за проектиране на функции, можем да видим родословието на модела в Catalog Explorer

родословие на данни в Dataticks Unity Catalog


Сега нека актуализираме описанието на модела и да му присвоим етикет Champion.

 import time from mlflow.tracking.client import MlflowClient from mlflow.entities.model_registry.model_version_status import ModelVersionStatus client = MlflowClient() #find the latest model version model_name_path = f"{catalog_name}.{model_schema}.{model_name}" model_version_infos = client.search_model_versions(f"name ='{model_name_path}'") new_model_version = max([int(model_version_info.version) for model_version_info in model_version_infos]) #add the model and model version descirption client.update_registered_model( name=model_name_path, description="collaborative filtering using Spark mllib ALS. This model use rating table" ) client.update_model_version( name=model_name_path, version=new_model_version, description="this model is optimized Rank and REG_PARAM with Hyperopt and rmse as a loss function. trained on the full dataset" ) # assign alias client.set_registered_model_alias(model_name_path, "Champion", new_model_version)


Сега продължете напред и проверете схемата, в която сте регистрирали модела. трябва да видите всичките си актуализации, както следва

Регистър на модела в Databricks Unity Catalog

Етапи на модела : Ако използвате работно пространство за регистър на моделите, трябва да извършвате етапи за управление на вашите модели. Използването на псевдоними няма да работи. Вижте тук за да видите как работи

Извод на модела

Партидно точкуване

Сега си представете, че искаме да използваме нашия модел в производството за извод. В тази стъпка зареждаме шампионския модел и го използваме, за да генерираме 20 препоръки за филми за всеки потребител.


 from mlflow.spark import load_model as spark_load_model from mlflow.tracking.client import MlflowClient from create_training_set import split_data #-- set UC as model registray mlflow.set_registry_uri("databricks-uc") #-- initate mlflow client client = MlflowClient() # -- read the config file with open('config.json') as config_file: config = json.load(config_file) catalog_name = config["catalog_name"] gold_layer = config["gold_layer_name"] silver_layer = config["silver_layer_name"] user_item_table_name = config["user_item_table_name"] ft_user_item_name = config["ft_user_item_name"] model_name = config["model_name"] model_schema = config["model_schema"] #-- create the model uri model_path = f"{catalog_name}.{model_schema}.{model_name}" # --create the model_uri: there are two ways to do this # 1: using the alias (we use this*) model_version_uri = f"models:/{model_uri}@champion" # 2: using model version #champion_version = client.get_model_version_by_alias(model_uri, "champion") #model_version_uri = f"models:/{model_uri}/{champion_version.version}" # -- load the model pipline and exctract the model model_pipeline = spark_load_model(model_version_uri) model = model_pipeline.stages[0] # -- batch scoring using the the model fe_full_data, df_full_data, df_train, df_test = split_data() df_batch_input = df_full_data.drop("rating") df_scores = model.transform(df_batch_input) # --- in case you used Feature Engineering to train and register model #from databricks.feature_engineering import FeatureEngineeringClient #fe = FeatureEngineeringClient() # fe.score_batch(model_uri=f"{model_version_uri}",df = df_batch_input)


и можете да видите, че използвахме едни и същи данни за обучение за партидно оценяване. Въпреки че в случай на препоръчителни системи има смисъл, в повечето приложения искаме да използваме модела, за да оценим някои невиждани данни. Например, Imaging сте Netflix и искате да актуализирате потребителските препоръки в края на деня въз основа на техния нов списък за гледане. Можем да насрочим работа, която изпълнява партидното оценяване в определено време в края на деня.


Сега можем да продължим и да генерираме препоръките за всеки потребител. За това намираме топ 20 артикула за всеки потребител

 from pyspark.sql.window import Window from pyspark.sql.functions import col, split, row_number, collect_list from pyspark.sql.functions import col, collect_list, expr, lit, min, row_number, desc windowSpec = Window.partitionBy("user_id").orderBy(col("prediction").desc()) df_top_20_items = df_scores.withColumn("rank", row_number().over(windowSpec)).filter(col("rank") <= 20) df_user_recs = df_top_20_items.groupBy("user_id") \ .agg(collect_list(col("item_id").cast("double")).alias("top_item_ids"))


ето как изглежда резултатът

Накрая можем да съхраним прогнозата като делта етикет в нашия UC или да ги публикуваме в системи надолу по веригата Mongo DB или Azure Cosmos DB. Отиваме на първия вариант


 df_user_recs.write.mode("overwrite").saveAsTable(f"{catalog_name}.{output_schema}.top20_item_recommendations")


Поточно предаване/онлайн извод

Сега си представете случай, в който искаме да актуализираме нашите препоръки въз основа на потребителски взаимодействия в реално време. За този случай можем да използваме сервиране на модели. Когато някой иска да използва вашия модел, той може да изпрати данни на сървъра. След това сървърът подава тези данни към вашия внедрен модел, който влиза в действие, анализира данните и генерира прогноза. Те могат да се използват в уеб приложения, мобилни приложения или дори вградени системи. Едно от приложенията на този подход е да се даде възможност за маршрутизиране на трафика за A/B тестване.


Алгоритъмът ALS не може да се използва директно за онлайн извод, тъй като изисква повторно обучение на модела, като се използват всички данни (стари + нови), за да се актуализират препоръките. Алгоритмите за обучение Gradient Descent са примери за модел, който може да се използва за онлайн актуализации. Може да разгледаме някои от тези алгоритми в бъдеща публикация.


Въпреки това, само за да илюстрираме как би работил такъв модел, ние създаваме (безполезен) модел, обслужващ крайна точка, която предвижда рейтинг на филм въз основа на това, когато потребителят оцени филм!


 import requests model_path = f"{catalog_name}.{model_schema}.{model_name}" champion_version = client.get_model_version_by_alias(model_path, "champion") # Set the name of the MLflow endpoint endpoint_name = config["model_serving_endpoint_name"] # Name of the registered MLflow model model_name = model_path # Specify the type of compute (CPU, GPU_SMALL, GPU_MEDIUM, etc.) workload_type = "CPU" # Specify the scale-out size of compute (Small, Medium, Large, etc.) workload_size = "Small" # Get the latest version of the MLflow model model_version = int(champion_version.version) # Specify Scale to Zero(only supported for CPU endpoints) scale_to_zero = False # Get the API endpoint and token for the current notebook context API_ROOT = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get() API_TOKEN = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get() data = { "name": endpoint_name, "config": { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ] }, } headers = {"Context-Type": "text/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.post( url=f"{API_ROOT}/api/2.0/serving-endpoints", json=data, headers=headers )


Това ще създаде клъстер за сервиране на обяд, така че отнема известно време. Сега, ако отворите прозореца Serving , трябва да видите вашата крайна точка.


можем да използваме една крайна точка, за да обслужваме множество модели. След това можем да използваме маршрутизиране на трафика за сценарии като A/B тестване или да сравним производителността на различните модели в производството.

Таблица за изводи

Таблиците за изводи в Databricks Model Serving действат като автоматичен журнал за нашите внедрени модели. Когато са активирани, те улавят входящи заявки (данни, изпратени за прогнозиране), съответните изходи на модела (прогнози) и някои други метаданни като делта таблица в Unity Catalog. Можем да използваме таблица за изводи за наблюдение и отстраняване на грешки , проследяване на родословието и процедура за събиране на данни за повторно обучение или фина настройка на нашите модели.


Можем да активираме inference table на нашата обслужваща крайна точка, за да наблюдаваме модела. Можем да го направим, като посочим свойствата auto_capture_config в полезния товар, когато за първи път създадем крайната точка. Или актуализираме нашата крайна точка след това, като използваме командата put и URL адреса на крайната точка config , както следва (повече тук )


 data = { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ], "auto_capture_config":{ "catalog_name": catalog_name, "schema_name": model_schema, "table_name_prefix": payload_table, } } headers = {"Context-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.put(url=f"{API_ROOT}/api/2.0/serving-endpoints/{endpoint_name}/config", json=data, headers=headers) print(json.dumps(response.json(), indent=4))


сега нека захраним крайната точка с някои фиктивни данни за взаимодействие с потребителя

 import random import time all_items = df_full_data.select(col("item_id")).distinct() for user_id in range(50,54): items_not_rated_by_user = df_full_data.where(col("user_id")==user_id).select(col("item_id")).distinct()#collect()[0][0] no_rated_items = [item.item_id for item in all_items.subtract(items_not_rated_by_user).limit(4).collect()] data = { "dataframe_records": [ {"user_id":user_id, "item_id":no_rated_items[0], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[1], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, ] } response = requests.post( url=f"{API_ROOT}/serving-endpoints/{endpoint_name}/invocations", json=data, headers=headers ) # generate the data within the timespan of 1 to 8 minutes time.sleep(random.randint(60*1, 60*8))


Можем да проверим регистрационните файлове на крайната точка в таблицата <catalog>.<schema>.<payload_table> . Отнема около 10 минути, докато можете да видите данните в таблицата.


 table_path = f"{catalog_name}.{model_schema}.{payload_table}" # Read data from the inference table df_inf_table = spark.read.table(table_path) display(df_inf_table )


трябва да видите нещо подобно на вашата таблица с полезен товар

Databricks модел, обслужващ таблица с полезен товар


За да разберете схемата на тази таблица за изводи, проверете „Схема на таблицата за изводи на Unity каталог==“ тук .==


Мониторинг на модела

Мониторинг на модели и данни - сложна тема, която изисква много време за овладяване. Databricks Lakehouse Monitoring (DLM) намалява режийните разходи за изграждане на подходяща система за наблюдение, като предоставя стандартни и адаптивни шаблони за обичайни случаи на употреба. Въпреки това, овладяването на DLM и мониторинга на модела като цяло изисква много експерименти. Тук не искам да ви давам обширен преглед на мониторинга на модела, а по-скоро да ви дам отправна точка. Може би в бъдеще ще посветя блог на тази тема.


Кратко резюме на функциите и характеристиките на DLM

Сега, след като нашият модел е готов и работи, можем да използваме таблица за изводи, генерирана от нашата обслужваща крайна точка, за да наблюдаваме ключови показатели като производителност на модела и дрейф, за да открием всякакви отклонения или аномалии в нашите данни или модел с течение на времето. Този проактивен подход ни помага да предприемем навременни коригиращи действия, като преобучение на модела или актуализиране на неговите функции, за да поддържаме оптимална производителност и съответствие с бизнес целите.


Databricks Lakehouse Monitoring Data Architecture източник: Databricks


DLM предоставя три типа анализ или profile type : Времеви серии , Моментна снимка и Извод . Тъй като се интересуваме от анализа на нашата таблица за изводи, ние се фокусираме върху последната. За да използваме таблица за наблюдение - нашата „ основна таблица “, трябва да се уверим, че таблицата има правилната структура. За таблицата за изводи всеки ред трябва да съответства на заявка със следните колони:

  • характеристики на модела

  • прогнозиране на модела

  • ID на модела

  • клеймо за време : клеймо за време на заявката за извод

  • основна истина (по избор)


Идентификационният номер на модела е важен за случаите, когато обслужваме множество модели и искаме да проследим ефективността на всеки модел в едно табло за наблюдение. Ако има повече от един наличен идентификатор на модел, DLM го използва за разделяне на данните и изчисляване на показатели и статики за всеки сегмент поотделно.


DLM изчислява всяка статистика и показател за определен интервал от време. За анализ на изводи той използва колоната с времеви клейма плюс дефиниран от потребителя размер на прозореца за идентифициране на времевите прозорци. повече по-долу.


DLM поддържа два problem type за таблици за изводи: „ класификация “ или „ регресия “. Той изчислява някои от съответните показатели и статистики въз основа на тази спецификация.


За да използваме DLM, трябва да създадем монитор и да го прикрепим към маса. Когато правим това, DLM създава две metric tables :

  • таблица с показатели на профила : тази таблица съдържа обобщени статистически данни като мин., макс., процент на нула и нули. Той също така съдържа допълнителни показатели въз основа на типа проблем, дефиниран от потребителя. Например precision , recall и f1_score за класификационните модели и mean_squared_error и mean_average_error за регресионните модели.

  • таблица с показатели за отклонение : тя съдържа статистически данни, които измерват как разпределението на данните се е променило във времето или спрямо базова стойност (ако е предоставена) . Той изчислява мерки като хи-квадрат тест, KS тест.


за да видите списъка с пълните показатели за всяка таблица, проверете страницата с документация на таблицата с показатели на монитора . Възможно е също така да създадете персонализирани показатели .


Важен аспект от изграждането на система за наблюдение е да се уверим, че нашето табло за наблюдение има достъп до най-новите данни за изводи, когато пристигнат. За това можем да използваме поточно предаване на делта таблица , за да следим обработените редове в таблицата за изводи. Ние използваме таблицата за изводи на модела, обслужваща като нашата изходна таблица ( readStream ), а таблицата за наблюдение като таблица приемник ( writeStream ). Ние също така се уверяваме, че записването на данни за промяна (CDC) е активирано и в двете таблици (то е активирано по подразбиране в таблицата за изводи). По този начин ние обработваме само промени - вмъкване/актуализиране/изтриване - в изходната таблица, вместо да обработваме отново цялата таблица при всяко опресняване.

Практически

За да активираме наблюдението върху нашата таблица за изводи, предприемаме следните стъпки:

  1. Прочетете таблицата за изводи като таблица за поточно предаване
  2. Създайте нова делта таблица с правилната схема, като разопаковате таблицата за изводи, която е генерирана от нашата крайна точка, обслужваща модела.
  3. Подгответе базовата таблица (ако има такава)
  4. Създайте монитор върху получената таблица и опреснете показателя
  5. Планирайте работен поток, за да разопаковате таблицата за изводи в правилната структура и да опресните показателите


Първо трябва да инсталираме API за наблюдение на Lakehouse. Трябва вече да е инсталиран, ако използвате Databricks rum time 15.3 LTS и по-нови:


 %pip install "https://ml-team-public-read.s3.amazonaws.com/wheels/data-monitoring/a4050ef7-b183-47a1-a145-e614628e3146/databricks_lakehouse_monitoring-0.4.14-py3-none-any.whl" dbutils.library.restartPython()


Нека прочетем таблицата за изводи като таблица за поточно предаване

 requests_raw = spark.readStream\ .format("delta")\ .table(inference_table_name) requests_raw.isStreaming #-> True


След това трябва да поставим таблицата в правилния формат, както е описано по-горе. Тази таблица трябва да има един ред за всяка прогноза със съответните характеристики и стойност на прогнозата. Таблицата за изводи, която получаваме от крайната точка, обслужваща модела, съхранява заявките и отговорите на крайната точка като вложен JSON формат. Ето пример за JSON полезен товар за колоната за заявка и отговор.

 #requests {"dataframe_records": [ {"user_id": 1, "item_id": 346, "rating": 5}, {"user_id": 1, "item_id": 377, "rating": 2}, {"user_id": 1, "item_id": 302, "rating": 4} ] } #reponse {"predictions": [4.248899936676025, 1.1172138452529907, 4.279165744781494] } # --> what we need | user_id | item_id | rating | prediction | |---------|---------|--------|------------| | 1 | 346 | 5 | 4.248900 | | 1 | 377 | 2 | 1.117214 | | 1 | 302 | 4 | 4.279166 |


За да разопаковаме тази таблица в правилната схема, можем да използваме следния код, който е адаптиран от документацията на Databricks ( Inference table Lakehouse Monitoring starter notebook ).


 # define the schema of the request and reponse fields in the inference tabel REQUEST_FIELDS = [StructField('user_id', IntegerType(), False),\ StructField('item_id', IntegerType(), False),\ StructField('rating', IntegerType(), False)\ ] RESPONSE_FIELD = [T.StructField("predictions", FloatType())] def process_col_requests(json_str:str) -> str: """ to proccess the JSON payload of request column in inference table """ request = json.loads(json_str) dataframe_records = request.get("dataframe_records", []) return dataframe_records def procces_col_response(json_str: str) -> str: """ to proccess the JSON payload of reponse column in inference table """ reponse = json.loads(json_str) output = [{prediction_col: round(prediction,4)} for prediction in reponse["predictions"]] return output def get_model_id(endpoint_name: str) -> str: """ create the model id by concatinating the model name and the model version. note: the assumption is the endpoint serves only one model """ served_models = get_served_models(endpoint_name) model_id = f"{served_models[0]['model_name']}_{served_models[0]['model_version']}" return model_id def process_requests(requests_raw: DataFrame, request_fields: List[T.StructField], response_field: T.StructField, endpoint_name: str) -> DataFrame: """ Takes a stream of raw requests and processes them by: - Unpacking JSON payloads for requests and responses - Exploding batched requests into individual rows - Converting Unix epoch millisecond timestamps to be Spark TimestampType :param requests_raw: DataFrame containing raw requests. Assumed to contain the following columns: - `request` - `response` - `timestamp_ms` :param request_fields: List of StructFields representing the request schema :param response_field: A StructField representing the response schema :return: A DataFrame containing processed requests """ # Convert the timestamp milliseconds to TimestampType for downstream processing. requests_timestamped = requests_raw \ .withColumn(timestamp_col, (F.col("timestamp_ms") / 1000).cast(T.TimestampType())) \ .drop("timestamp_ms") # create the model identifier column model_id = get_model_id(endpoint_name) # Convert the model name and version columns into a model identifier column. requests_identified = requests_timestamped \ .withColumn(model_id_col, F.lit(model_id)) # Rename the date column to avoid collisions with features. requests_dated = requests_identified.withColumnRenamed("date", date_col) # Consolidate and unpack JSON. request_schema = T.ArrayType(T.StructType(request_fields)) response_schema = T.ArrayType(T.StructType(response_field)) # w udf_request = F.udf(process_col_requests, request_schema) udf_reponse = F.udf(procces_col_response, response_schema) requests_unpacked = requests_dated.withColumn("request", udf_request("request")).\ withColumn("response", udf_reponse("response")) # Explode batched requests into individual rows. DB_PREFIX = "__db" requests_exploded = requests_unpacked \ .withColumn(f"{DB_PREFIX}_request_response", F.arrays_zip(F.col("request"), F.col("response"))) \ .withColumn(f"{DB_PREFIX}_request_response", F.explode(F.col(f"{DB_PREFIX}_request_response"))) \ .select(F.col("*"), F.col(f"{DB_PREFIX}_request_response.request.*"), F.col(f"{DB_PREFIX}_request_response.response.*")) \ .drop(f"{DB_PREFIX}_request_response", "request", "response") requests_cleaned = requests_exploded.drop("status_code", "sampling_fraction", "client_request_id", "databricks_request_id", "request_metadata") return requests_cleaned


Получената таблица ще изглежда така:

Таблицата с полезен товар е разопакована

След това трябва да инициализираме нашата таблица за мивка

 dt_builder = DeltaTable.createIfNotExists(spark) \ .tableName(unpacked_requests_table_name) \ .addColumns(schema) \ .partitionedBy(requests_cleaned.schema) \ .property("delta.enableChangeDataFeed", "true") \ dt_builder.execute()


и напишете резултатите

 checkpoint_path = f"dbfs:/payload-logging/{endpoint_name}/checkpoint" requests_stream = requests_cleaned.writeStream \ .trigger(once=True) \ .format("delta") \ .partitionBy(date_col) \ .outputMode("append") \ .option("checkpointLocation", checkpoint_path) \ .toTable(unpacked_requests_table_name) \


Накрая създаваме нашата базова таблица. DLM използва тази таблица, за да изчисли отклоненията чрез сравняване на разпределението на подобни колони на базови и първични модели. Базовата таблица трябва да има същата колона с характеристики като основната колона, както и същата колона за идентификация на модела. За базова таблица използваме таблицата за прогнозиране на нашия набор от данни за валидиране , който съхраняваме по-рано, след като сме обучили нашия модел, използвайки най-добрия хиперпараметър. За да изчисли показателя за отклонение, Databricks изчислява показателите на профила както за основната, така и за базовата таблица. Тук можете да прочетете за основната таблица и базовата таблица .


 #read the prediction table df_base_table = spark.table(f"{catalog_name}.{model_schema}.predictions") # create the model id and add it to the table model_id = get_model_id(endpoint_name) df_base_table = df_base_table.withColumn(model_id_col, F.lit(model_id)) #write the new table and enable the CDC on it output_base_table_name = f"{catalog_name}.{model_schema}.{base_table_prefix}_{model_name}" df_base_table.write.format("delta").mode("overwrite").saveAsTable(output_base_table_name) spark.sql(f"ALTER TABLE {output_base_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")


Сега сме готови да създадем нашето табло за наблюдение. Можем да го направим или с помощта на потребителския интерфейс или API за наблюдение на Lakehouse. Тук използваме втория вариант:

 # This is where we store the metric tables. output_schema_name = f"{catalog_name}.{model_schema}" try: info = lm.create_monitor( table_name=unpacked_requests_table_name, profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities,#the aggregation window model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, # We will refresh the metrics on-demand in this notebook baseline_table_name=output_base_table_name, ) print(info) except Exception as e: # Ensure the exception was expected assert "RESOURCE_ALREADY_EXISTS" in str(e), f"Unexpected error: {e}" # Update the monitor if any parameters of this notebook have changed. lm.update_monitor( table_name=unpacked_requests_table_name, updated_params=dict( profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities, model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, baseline_table_name=output_base_table_name, ) ) # Refresh metrics calculated on the requests table. refresh_info = lm.run_refresh(table_name=unpacked_requests_table_name) print(refresh_info)


след като стартираме кода, отнема известно време, докато Databricks изчисли всички показатели. За да видите таблото за управление, отидете в раздела Quality на вашата таблица за приемане (т.е. unpacked_requests_table_name ). Трябва да видите страница, както следва.

Изглед за наблюдение на модел Databricks


Ако щракнете върху refresh history ще видите вашите текущи, чакащи и минали опреснявания. щракнете върху View Dashboard за да отворите таблото си за управление.

Табло за наблюдение на модел Databricks



така че започваме с таблицата за изводи ( my_endpoint_payload ), обработваме я и запазваме резултата в my_endpoint_payload_unpacked и предаваме тази таблица заедно с нашата базова таблица ( base_table_als ) към нашия API за наблюдение. DLM изчислява показателите на профила за всяка таблица ( my_endpoint_payload_unpacked_profile_metric ) и ги използва за изчисляване на показателите за отклонение ( my_endpoint_payload_unpacked_drift_metrics )


Ето го! имате всичко необходимо за обслужване и наблюдение на вашия модел!


В следващата част ще ви покажа как да автоматизирате този процес с помощта на Databricks Assets Bundle и Gitlab !

L O A D I N G
. . . comments & more!

About Author

Mohsen Jadidi HackerNoon profile picture
Mohsen Jadidi@neshom
tech, culture, economy and collective work

ЗАКАЧВАЙТЕ ЕТИКЕТИ

ТАЗИ СТАТИЯ Е ПРЕДСТАВЕНА В...