ໃນ ສ່ວນທໍາອິດຂອງຊຸດການສອນນີ້ , ພວກເຮົາໄດ້ເອົາຂັ້ນຕອນທໍາອິດສໍາລັບການກໍ່ສ້າງທໍ່ສົ່ງ MLOps ໃນຕອນທ້າຍໂດຍໃຊ້ Databricks ແລະ Spark, ນໍາພາໂດຍໂຄງສ້າງອ້າງອີງຂອງ Databricks. ນີ້ແມ່ນສະຫຼຸບຂັ້ນຕອນທີ່ສຳຄັນທີ່ພວກເຮົາໄດ້ກວມເອົາ:
ການຕັ້ງຄ່າ Unity Catalog ສໍາລັບສະຖາປັດຕະຍະກໍາ Medallion : ພວກເຮົາໄດ້ຈັດວາງຂໍ້ມູນຂອງພວກເຮົາເປັນຊັ້ນທອງແດງ, ເງິນ, ແລະຄໍາພາຍໃນ Unity Catalog, ການສ້າງຕັ້ງລະບົບການຄຸ້ມຄອງຂໍ້ມູນທີ່ມີໂຄງສ້າງແລະປະສິດທິພາບ.
Ingesting Data into Unity Catalog : ພວກເຮົາໄດ້ສະແດງໃຫ້ເຫັນວິທີການນໍາເຂົ້າຂໍ້ມູນດິບເຂົ້າໄປໃນລະບົບ, ຮັບປະກັນຄວາມສອດຄ່ອງແລະຄຸນນະພາບສໍາລັບຂັ້ນຕອນການປຸງແຕ່ງຕໍ່ໄປ.
ການຝຶກອົບຮົມແບບຈໍາລອງ : ການນໍາໃຊ້ Databricks, ພວກເຮົາໄດ້ຝຶກອົບຮົມຮູບແບບການຮຽນຮູ້ເຄື່ອງຈັກທີ່ເຫມາະສົມກັບຊຸດຂໍ້ມູນຂອງພວກເຮົາ, ປະຕິບັດຕາມການປະຕິບັດທີ່ດີທີ່ສຸດສໍາລັບການພັດທະນາຕົວແບບທີ່ສາມາດຂະຫຍາຍໄດ້ແລະມີປະສິດທິພາບ.
Hyperparameter Tuning ກັບ HyperOpt : ເພື່ອເພີ່ມປະສິດຕິພາບຂອງຕົວແບບ, ພວກເຮົາໄດ້ຈ້າງ HyperOpt ເພື່ອອັດຕະໂນມັດການຄົ້ນຫາສໍາລັບ hyperparameters ທີ່ດີທີ່ສຸດ, ປັບປຸງຄວາມຖືກຕ້ອງແລະປະສິດທິພາບ.
ການຕິດຕາມການທົດລອງກັບ Databricks MLflow : ພວກເຮົາໄດ້ນໍາໃຊ້ MLflow ເພື່ອບັນທຶກແລະຕິດຕາມການທົດລອງຂອງພວກເຮົາ, ຮັກສາບັນທຶກສະບັບແບບຈໍາລອງ, metrics, ແລະຕົວກໍານົດການສໍາລັບການປຽບທຽບງ່າຍແລະການແຜ່ພັນ.
ດ້ວຍຂັ້ນຕອນພື້ນຖານເຫຼົ່ານີ້ສຳເລັດແລ້ວ, ດຽວນີ້ຕົວແບບຂອງທ່ານໄດ້ຖືກນຳມາໃຊ້ເພື່ອນຳໃຊ້. ໃນພາກທີສອງນີ້, ພວກເຮົາຈະສຸມໃສ່ການລວມເອົາສອງອົງປະກອບທີ່ສໍາຄັນເຂົ້າໄປໃນລະບົບຂອງພວກເຮົາ:
ໃຫ້ເຂົ້າໄປໃນມັນ!
ຈຸດອອກເດີນທາງຂອງ blog ສຸດທ້າຍແມ່ນການປະເມີນແບບຈໍາລອງ. ຕອນນີ້ຈິນຕະນາການວ່າພວກເຮົາໄດ້ເຮັດການປຽບທຽບແລະພົບວ່າຕົວແບບຂອງພວກເຮົາສະແດງໃຫ້ເຫັນການປະຕິບັດທີ່ສູງກວ່າເມື່ອທຽບກັບຮູບແບບການຜະລິດນີ້. ດັ່ງທີ່ພວກເຮົາຕ້ອງການ (ສົມມຸດ) ໃຊ້ຕົວແບບໃນການຜະລິດ, ພວກເຮົາຕ້ອງການໃຊ້ປະໂຫຍດຈາກຂໍ້ມູນທັງຫມົດທີ່ພວກເຮົາມີ. ຂັ້ນຕອນຕໍ່ໄປແມ່ນການຝຶກອົບຮົມແລະການທົດສອບຕົວແບບໂດຍໃຊ້ຊຸດຂໍ້ມູນເຕັມ. ຫຼັງຈາກນັ້ນ, ສືບຕໍ່ຕົວແບບຂອງພວກເຮົາສໍາລັບການນໍາໃຊ້ໃນພາຍຫລັງໂດຍການນໍາໃຊ້ມັນເປັນຕົວແບບແຊ້ມຂອງພວກເຮົາ. ເນື່ອງຈາກນີ້ແມ່ນຕົວແບບສຸດທ້າຍທີ່ເຮົາຕ້ອງການໃຊ້ສໍາລັບການສະຫຼຸບ, ພວກເຮົາໃຊ້ຄຸນສົມບັດວິສະວະກໍາລູກຄ້າເພື່ອຝຶກອົບຮົມແບບຈໍາລອງ. ດ້ວຍວິທີນີ້, ພວກເຮົາບໍ່ພຽງແຕ່ຕິດຕາມສາຍພັນຂອງຕົວແບບງ່າຍຂຶ້ນ, ແຕ່ຍັງເຮັດໃຫ້ການກວດສອບຄວາມຖືກຕ້ອງຂອງ schema ແລະການຫັນປ່ຽນຄຸນສົມບັດ (ຖ້າມີ) ໃຫ້ກັບລູກຄ້າ.
with mlflow.start_run(run_name="ALS_best_model") as run: als = ALS() # Now we set the parameters for the method als.setMaxIter(MAX_ITER)\ .setSeed(SEED)\ .setRegParam(best_params["REG_PARAM"])\ .setUserCol(COL_USER)\ .setItemCol(COL_ITEM)\ .setRatingCol(COL_LABEL)\ .setRank(best_params["RANK"]) mlflow.log_param("MAX_ITER", MAX_ITER) mlflow.log_param("RANK", best_params["RANK"]) mlflow.log_param("REG_PARAM", best_params["REG_PARAM"]) # Create the model with these parameters. model = als.fit(df_full_data) #drop predictions where users and products from the test test and didn't make it into the training set. in this case, the prediction is NaN model.setColdStartStrategy('drop') predictions = model.transform(df_full_data) signature = infer_signature(model_input = df_full_data, model_output = predictions.select(COL_LABEL)) #log the model mlflow.spark.log_model(model, model_name, sample_input=df_full_data.limit(3), signature = signature, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name=f"{catalog_name}.{model_schema}.{model_name}") evaluator = RegressionEvaluator(predictionCol=COL_PRED, labelCol=COL_LABEL) rmse = evaluator.setMetricName("rmse").evaluate(predictions) mlflow.log_metric('rmse', rmse)
ພວກເຮົາຍັງສາມາດໃຊ້ Feature Store ຫຼື Feature Engineering APIs ເພື່ອຝຶກອົບຮົມ ແລະບັນທຶກຕົວແບບ
model_info = fe.log_model(model=model, artifact_path = model_name, flavor=mlflow.spark, training_set=fe_full_data, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name= f"{catalog_name}.{model_schema}.{model_name}" )
ເມື່ອພວກເຮົາໃຊ້ API ວິສະວະກໍາຄຸນນະສົມບັດ, ພວກເຮົາສາມາດເບິ່ງສາຍພັນຂອງຕົວແບບໃນ Catalog Explorer
ຕອນນີ້ໃຫ້ອັບເດດລາຍລະອຽດຂອງຕົວແບບ ແລະມອບໝາຍປ້າຍແຊ້ມໃຫ້ກັບມັນ.
import time from mlflow.tracking.client import MlflowClient from mlflow.entities.model_registry.model_version_status import ModelVersionStatus client = MlflowClient() #find the latest model version model_name_path = f"{catalog_name}.{model_schema}.{model_name}" model_version_infos = client.search_model_versions(f"name ='{model_name_path}'") new_model_version = max([int(model_version_info.version) for model_version_info in model_version_infos]) #add the model and model version descirption client.update_registered_model( name=model_name_path, description="collaborative filtering using Spark mllib ALS. This model use rating table" ) client.update_model_version( name=model_name_path, version=new_model_version, description="this model is optimized Rank and REG_PARAM with Hyperopt and rmse as a loss function. trained on the full dataset" ) # assign alias client.set_registered_model_alias(model_name_path, "Champion", new_model_version)
ໃນປັດຈຸບັນສືບຕໍ່ເດີນຫນ້າແລະກວດເບິ່ງ schema ທີ່ທ່ານລົງທະບຽນຮູບແບບ. ທ່ານຄວນເບິ່ງການປັບປຸງຂອງທ່ານທັງຫມົດດັ່ງຕໍ່ໄປນີ້
ຂັ້ນຕອນຂອງຕົວແບບ : ຖ້າຫາກວ່າທ່ານນໍາໃຊ້ພື້ນທີ່ເຮັດວຽກສໍາລັບການຈົດທະບຽນຕົວແບບທີ່ທ່ານຄວນຈະຂັ້ນຕອນການຄຸ້ມຄອງຕົວແບບຂອງທ່ານ. ການໃຊ້ນາມແຝງຈະບໍ່ເຮັດວຽກ. ກວດເບິ່ງ ທີ່ນີ້ ເພື່ອເບິ່ງວ່າມັນເຮັດວຽກແນວໃດ
ໃນປັດຈຸບັນຈິນຕະນາການພວກເຮົາຕ້ອງການທີ່ຈະນໍາໃຊ້ຕົວແບບຂອງພວກເຮົາໃນການຜະລິດສໍາລັບການ inference. ໃນຂັ້ນຕອນນີ້, ພວກເຮົາໂຫລດຮູບແບບແຊ້ມແລະນໍາໃຊ້ມັນເພື່ອສ້າງ 20 ຄໍາແນະນໍາຮູບເງົາສໍາລັບຜູ້ໃຊ້ແຕ່ລະຄົນ.
from mlflow.spark import load_model as spark_load_model from mlflow.tracking.client import MlflowClient from create_training_set import split_data #-- set UC as model registray mlflow.set_registry_uri("databricks-uc") #-- initate mlflow client client = MlflowClient() # -- read the config file with open('config.json') as config_file: config = json.load(config_file) catalog_name = config["catalog_name"] gold_layer = config["gold_layer_name"] silver_layer = config["silver_layer_name"] user_item_table_name = config["user_item_table_name"] ft_user_item_name = config["ft_user_item_name"] model_name = config["model_name"] model_schema = config["model_schema"] #-- create the model uri model_path = f"{catalog_name}.{model_schema}.{model_name}" # --create the model_uri: there are two ways to do this # 1: using the alias (we use this*) model_version_uri = f"models:/{model_uri}@champion" # 2: using model version #champion_version = client.get_model_version_by_alias(model_uri, "champion") #model_version_uri = f"models:/{model_uri}/{champion_version.version}" # -- load the model pipline and exctract the model model_pipeline = spark_load_model(model_version_uri) model = model_pipeline.stages[0] # -- batch scoring using the the model fe_full_data, df_full_data, df_train, df_test = split_data() df_batch_input = df_full_data.drop("rating") df_scores = model.transform(df_batch_input) # --- in case you used Feature Engineering to train and register model #from databricks.feature_engineering import FeatureEngineeringClient #fe = FeatureEngineeringClient() # fe.score_batch(model_uri=f"{model_version_uri}",df = df_batch_input)
ແລະທ່ານສາມາດເບິ່ງພວກເຮົາໄດ້ນໍາໃຊ້ຂໍ້ມູນການຝຶກອົບຮົມດຽວກັນສໍາລັບການໃຫ້ຄະແນນ batch. ເຖິງແມ່ນວ່າໃນກໍລະນີຂອງລະບົບຄໍາແນະນໍາມັນເຮັດໃຫ້ຄວາມຮູ້ສຶກ, ໃນຄໍາຮ້ອງສະຫມັກສ່ວນໃຫຍ່ພວກເຮົາຕ້ອງການໃຊ້ຕົວແບບເພື່ອຄະແນນບາງຂໍ້ມູນທີ່ບໍ່ເຫັນ. ຕົວຢ່າງ, ການຖ່າຍຮູບຂອງເຈົ້າແມ່ນ Netflix ແລະຕ້ອງການປັບປຸງຄໍາແນະນໍາຂອງຜູ້ໃຊ້ໃນຕອນທ້າຍຂອງມື້ໂດຍອີງໃສ່ບັນຊີລາຍຊື່ທີ່ເບິ່ງໃຫມ່ຂອງພວກເຂົາ. ພວກເຮົາສາມາດຈັດຕາຕະລາງວຽກທີ່ດໍາເນີນການໃຫ້ຄະແນນ batch ໃນເວລາສະເພາະໃນຕອນທ້າຍຂອງມື້.
ໃນປັດຈຸບັນພວກເຮົາສາມາດສືບຕໍ່ເດີນຫນ້າແລະສ້າງຄໍາແນະນໍາສໍາລັບຜູ້ໃຊ້ແຕ່ລະຄົນ. ສໍາລັບນີ້ພວກເຮົາຊອກຫາ 20 ອັນດັບຫນຶ່ງຕໍ່ຜູ້ໃຊ້
from pyspark.sql.window import Window from pyspark.sql.functions import col, split, row_number, collect_list from pyspark.sql.functions import col, collect_list, expr, lit, min, row_number, desc windowSpec = Window.partitionBy("user_id").orderBy(col("prediction").desc()) df_top_20_items = df_scores.withColumn("rank", row_number().over(windowSpec)).filter(col("rank") <= 20) df_user_recs = df_top_20_items.groupBy("user_id") \ .agg(collect_list(col("item_id").cast("double")).alias("top_item_ids"))
ນີ້ແມ່ນວິທີທີ່ຜົນໄດ້ຮັບເບິ່ງຄືວ່າ
ໃນທີ່ສຸດພວກເຮົາສາມາດເກັບຮັກສາການຄາດຄະເນເປັນປ້າຍຊື່ delta ໃນ UC ຂອງພວກເຮົາຫຼືເຜີຍແຜ່ພວກມັນໃຫ້ກັບລະບົບລຸ່ມນ້ໍາ Mongo DB ຫຼື Azure Cosmos DB. ພວກເຮົາໄປກັບທາງເລືອກ firs
df_user_recs.write.mode("overwrite").saveAsTable(f"{catalog_name}.{output_schema}.top20_item_recommendations")
ຕອນນີ້ຈິນຕະນາການກໍລະນີທີ່ພວກເຮົາຕ້ອງການປັບປຸງຄໍາແນະນໍາຂອງພວກເຮົາໂດຍອີງໃສ່ການໂຕ້ຕອບຂອງຜູ້ໃຊ້ໃນເວລາຈິງ. ສໍາລັບກໍລະນີນີ້ພວກເຮົາສາມາດນໍາໃຊ້ການຮັບໃຊ້ແບບຈໍາລອງ. ເມື່ອໃຜຜູ້ຫນຶ່ງຕ້ອງການໃຊ້ຕົວແບບຂອງເຈົ້າ, ພວກເຂົາສາມາດສົ່ງຂໍ້ມູນໄປຫາເຄື່ອງແມ່ຂ່າຍ. ຫຼັງຈາກນັ້ນ, ເຊີບເວີຈະປ້ອນຂໍ້ມູນນັ້ນໄປຫາຕົວແບບທີ່ນຳໃຊ້ຂອງທ່ານ, ເຊິ່ງເຂົ້າສູ່ການປະຕິບັດ, ວິເຄາະຂໍ້ມູນ ແລະສ້າງການຄາດຄະເນ. ພວກເຂົາສາມາດຖືກໃຊ້ໃນແອັບພລິເຄຊັນເວັບ, ແອັບຯມືຖື, ຫຼືແມ້ກະທັ້ງລະບົບຝັງຕົວ. ຫນຶ່ງໃນຄໍາຮ້ອງສະຫມັກຂອງວິທີການນີ້ແມ່ນເພື່ອເຮັດໃຫ້ເສັ້ນທາງການຈະລາຈອນສໍາລັບການທົດສອບ A / B.
ALS algorithm ບໍ່ສາມາດຖືກນໍາໃຊ້ໂດຍກົງສໍາລັບການ inference ອອນໄລນ໌ເນື່ອງຈາກວ່າມັນຮຽກຮ້ອງໃຫ້ມີການຝຶກອົບຮົມ retraining ຮູບແບບການນໍາໃຊ້ຂໍ້ມູນທັງຫມົດ (ເກົ່າ + ໃຫມ່) ເພື່ອປັບປຸງຄໍາແນະນໍາ. Gradient Descent algorithms ແມ່ນຕົວຢ່າງຂອງຕົວແບບທີ່ສາມາດໃຊ້ສໍາລັບການອັບເດດອອນໄລນ໌. ພວກເຮົາອາດຈະເບິ່ງບາງ algorithms ເຫຼົ່ານີ້ໃນການຕອບໃນອະນາຄົດ.
ຢ່າງໃດກໍຕາມ, ພຽງແຕ່ເພື່ອສະແດງໃຫ້ເຫັນວິທີການດັ່ງກ່າວຈະເຮັດວຽກ, ພວກເຮົາສ້າງຮູບແບບ (ບໍ່ມີປະໂຫຍດ) ໃຫ້ບໍລິການຈຸດສິ້ນສຸດທີ່ຄາດຄະເນການຈັດອັນດັບຮູບເງົາໂດຍອີງໃສ່ທຸກຄັ້ງທີ່ຜູ້ໃຊ້ອັດຕາຮູບເງົາ!
import requests model_path = f"{catalog_name}.{model_schema}.{model_name}" champion_version = client.get_model_version_by_alias(model_path, "champion") # Set the name of the MLflow endpoint endpoint_name = config["model_serving_endpoint_name"] # Name of the registered MLflow model model_name = model_path # Specify the type of compute (CPU, GPU_SMALL, GPU_MEDIUM, etc.) workload_type = "CPU" # Specify the scale-out size of compute (Small, Medium, Large, etc.) workload_size = "Small" # Get the latest version of the MLflow model model_version = int(champion_version.version) # Specify Scale to Zero(only supported for CPU endpoints) scale_to_zero = False # Get the API endpoint and token for the current notebook context API_ROOT = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get() API_TOKEN = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get() data = { "name": endpoint_name, "config": { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ] }, } headers = {"Context-Type": "text/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.post( url=f"{API_ROOT}/api/2.0/serving-endpoints", json=data, headers=headers )
ນີ້ຈະສ້າງແລະອາຫານທ່ຽງແບບຈໍາລອງໃຫ້ບໍລິການສໍາລັບພວກເຮົາດັ່ງນັ້ນມັນໃຊ້ເວລາບາງເວລາ. ຕອນນີ້ຖ້າທ່ານເປີດປ່ອງຢ້ຽມ Serving
, ທ່ານຄວນເຫັນຈຸດສິ້ນສຸດຂອງທ່ານ.
ພວກເຮົາສາມາດນໍາໃຊ້ endpoint ຫນຶ່ງເພື່ອຮັບໃຊ້ຫຼາຍຮູບແບບ. ຫຼັງຈາກນັ້ນ, ພວກເຮົາສາມາດນໍາໃຊ້ເສັ້ນທາງການຈະລາຈອນສໍາລັບສະຖານະການເຊັ່ນ: ການທົດສອບ A / B ຫຼືປຽບທຽບການປະຕິບັດຂອງຕົວແບບທີ່ແຕກຕ່າງກັນໃນການຜະລິດ.
ຕາຕະລາງ Inference ໃນ Databricks Model Serving ເຮັດຫນ້າທີ່ເປັນບັນທຶກອັດຕະໂນມັດສໍາລັບແບບຈໍາລອງຂອງພວກເຮົາ. ເມື່ອເປີດໃຊ້, ພວກມັນບັນທຶກຄໍາຮ້ອງຂໍທີ່ເຂົ້າມາ (ຂໍ້ມູນທີ່ສົ່ງສໍາລັບການຄາດຄະເນ), ຜົນໄດ້ຮັບຂອງຕົວແບບທີ່ສອດຄ້ອງກັນ (ການຄາດເດົາ), ແລະບາງ metadata ອື່ນໆເປັນຕາຕະລາງ Delta ພາຍໃນ Unity Catalog. ພວກເຮົາສາມາດນໍາໃຊ້ຕາຕະລາງ inference ສໍາລັບ ການຕິດຕາມແລະ debugging , ການຕິດຕາມເຊື້ອສາຍ , ແລະຂັ້ນຕອນການເກັບກໍາຂໍ້ມູນສໍາລັບ ການ retraining ຫຼື ປັບປັບຕົວ ແບບຂອງພວກເຮົາ.
ພວກເຮົາສາມາດເປີດໃຊ້ inference table
ໃນຈຸດສິ້ນສຸດການຮັບໃຊ້ຂອງພວກເຮົາເພື່ອຕິດຕາມຕົວແບບ. ພວກເຮົາສາມາດເຮັດໄດ້ໂດຍການລະບຸຄຸນສົມບັດ auto_capture_config
ໃນ payload ເມື່ອພວກເຮົາສ້າງຈຸດສິ້ນສຸດຄັ້ງທໍາອິດ. ຫຼືພວກເຮົາປັບປຸງຈຸດສິ້ນສຸດຂອງພວກເຮົາຫຼັງຈາກນັ້ນໂດຍໃຊ້ຄໍາສັ່ງ put
ແລະ config
endpoint URL ດັ່ງຕໍ່ໄປນີ້ (ເພີ່ມເຕີມ ທີ່ນີ້ )
data = { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ], "auto_capture_config":{ "catalog_name": catalog_name, "schema_name": model_schema, "table_name_prefix": payload_table, } } headers = {"Context-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.put(url=f"{API_ROOT}/api/2.0/serving-endpoints/{endpoint_name}/config", json=data, headers=headers) print(json.dumps(response.json(), indent=4))
ຕອນນີ້ໃຫ້ອາຫານຈຸດສິ້ນສຸດດ້ວຍຂໍ້ມູນການໂຕ້ຕອບຂອງຜູ້ໃຊ້ dummy ບາງອັນ
import random import time all_items = df_full_data.select(col("item_id")).distinct() for user_id in range(50,54): items_not_rated_by_user = df_full_data.where(col("user_id")==user_id).select(col("item_id")).distinct()#collect()[0][0] no_rated_items = [item.item_id for item in all_items.subtract(items_not_rated_by_user).limit(4).collect()] data = { "dataframe_records": [ {"user_id":user_id, "item_id":no_rated_items[0], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[1], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, ] } response = requests.post( url=f"{API_ROOT}/serving-endpoints/{endpoint_name}/invocations", json=data, headers=headers ) # generate the data within the timespan of 1 to 8 minutes time.sleep(random.randint(60*1, 60*8))
ພວກເຮົາສາມາດກວດເບິ່ງບັນທຶກຈຸດສິ້ນສຸດໃນຕາຕະລາງ <catalog>.<schema>.<payload_table>
. ມັນໃຊ້ເວລາປະມານ 10 ນາທີຈົນກ່ວາທ່ານສາມາດເບິ່ງຂໍ້ມູນໃນຕາຕະລາງ.
table_path = f"{catalog_name}.{model_schema}.{payload_table}" # Read data from the inference table df_inf_table = spark.read.table(table_path) display(df_inf_table )
ທ່ານຄວນເຫັນບາງສິ່ງບາງຢ່າງເຊັ່ນນີ້ຕາຕະລາງ payload ຂອງທ່ານ
ເພື່ອເຂົ້າໃຈ schema ຂອງຕາຕະລາງ inference ນີ້, ກວດເບິ່ງ "Unity catalog inference table schema ==" ທີ່ນີ້ .==
ແບບຈໍາລອງແລະຂໍ້ມູນການຕິດຕາມຫົວຂໍ້ທີ່ສັບສົນທີ່ຕ້ອງໃຊ້ເວລາຫຼາຍເພື່ອແມ່ບົດ. Databricks Lakehouse Monitoring (DLM) ຫຼຸດຜ່ອນສ່ວນເກີນຂອງການສ້າງລະບົບການຕິດຕາມທີ່ເຫມາະສົມໂດຍການໃຫ້ແມ່ແບບມາດຕະຖານແລະສາມາດປັບແຕ່ງໄດ້ສໍາລັບກໍລະນີທີ່ໃຊ້ທົ່ວໄປ. ແນວໃດກໍ່ຕາມ, ການສ້າງຕົ້ນສະບັບ DLM ແລະການຕິດຕາມແບບຈໍາລອງໂດຍທົ່ວໄປຮຽກຮ້ອງໃຫ້ມີການທົດລອງຫຼາຍ. ຂ້າພະເຈົ້າບໍ່ຕ້ອງການໃຫ້ທ່ານເຫັນພາບລວມຢ່າງກວ້າງຂວາງຂອງການຕິດຕາມຕົວແບບຢູ່ທີ່ນີ້, ແຕ່ແທນທີ່ຈະໃຫ້ທ່ານເປັນຈຸດເລີ່ມຕົ້ນ. ຂ້ອຍອາດຈະອຸທິດ blog ໃຫ້ກັບຫົວຂໍ້ນີ້ໃນອະນາຄົດ.
ສະຫຼຸບຫຍໍ້ຂອງການເຮັດວຽກ ແລະຄຸນສົມບັດຂອງ DLM
ໃນປັດຈຸບັນທີ່ພວກເຮົາມີຕົວແບບຂອງພວກເຮົາແລະດໍາເນີນການ, ພວກເຮົາສາມາດນໍາໃຊ້ຕາຕະລາງ inference ທີ່ສ້າງຂຶ້ນໂດຍຈຸດສິ້ນສຸດການຮັບໃຊ້ຂອງພວກເຮົາເພື່ອຕິດຕາມຕົວຊີ້ບອກທີ່ສໍາຄັນເຊັ່ນການປະຕິບັດຕົວແບບແລະການລອຍຕົວເພື່ອກວດພົບຄວາມແຕກແຍກຫຼືຄວາມຜິດປົກກະຕິໃນຂໍ້ມູນຫຼືຕົວແບບຂອງພວກເຮົາໃນໄລຍະເວລາ. ວິທີການທີ່ຫ້າວຫັນນີ້ຊ່ວຍໃຫ້ພວກເຮົາດໍາເນີນການແກ້ໄຂໄດ້ທັນເວລາ, ເຊັ່ນ: ການຝຶກອົບຮົມແບບຈໍາລອງຫຼືການປັບປຸງລັກສະນະຂອງຕົນ, ເພື່ອຮັກສາປະສິດທິພາບທີ່ດີທີ່ສຸດແລະສອດຄ່ອງກັບຈຸດປະສົງທາງທຸລະກິດ.
DLM ສະຫນອງສາມປະເພດຂອງການວິເຄາະຫຼື profile type
: ຊຸດເວລາ , Snapshot ແລະ Inference . ເນື່ອງຈາກພວກເຮົາມີຄວາມສົນໃຈໃນການວິເຄາະຕາຕະລາງ inference ຂອງພວກເຮົາ, ພວກເຮົາສຸມໃສ່ອັນສຸດທ້າຍ. ການນໍາໃຊ້ຕາຕະລາງສໍາລັບການຕິດຕາມກວດກາ - " ຕາຕະລາງຕົ້ນຕໍ " ຂອງພວກເຮົາ, ພວກເຮົາຄວນໃຫ້ແນ່ໃຈວ່າຕາຕະລາງມີໂຄງສ້າງທີ່ຖືກຕ້ອງ. ສໍາລັບ ຕາຕະລາງ inference , ແຕ່ລະແຖວຄວນສອດຄ່ອງກັບຄໍາຮ້ອງຂໍທີ່ມີຖັນຕໍ່ໄປນີ້:
ລັກສະນະແບບຈໍາລອງ
ການຄາດຄະເນຕົວແບບ
ID ຕົວແບບ
timestamp : timestamp ຂອງຄໍາຮ້ອງຂໍ inference
ຄວາມຈິງພື້ນຖານ (ທາງເລືອກ)
id ຕົວແບບ ແມ່ນສໍາຄັນສໍາລັບກໍລະນີທີ່ພວກເຮົາໃຫ້ບໍລິການຫຼາຍແບບແລະພວກເຮົາຕ້ອງການຕິດຕາມການປະຕິບັດຂອງແຕ່ລະແບບໃນ dashboard ຕິດຕາມກວດກາ. ຖ້າມີຫຼາຍກວ່າຫນຶ່ງ id ຕົວແບບທີ່ມີຢູ່, DLM ໃຊ້ມັນເພື່ອຕັດຂໍ້ມູນ ແລະຄໍານວນ metrics ແລະ statics ສໍາລັບແຕ່ລະ slice ແຍກຕ່າງຫາກ.
DLM ຄິດໄລ່ແຕ່ລະສະຖິຕິ ແລະ metrics ສໍາລັບຊ່ວງເວລາທີ່ກໍານົດໄວ້. ສໍາລັບການວິເຄາະ inference, ມັນໄດ້ນໍາໃຊ້ຖັນ ສະແຕມເວລາ , ບວກກັບຂະຫນາດປ່ອງຢ້ຽມທີ່ຜູ້ໃຊ້ກໍານົດເພື່ອກໍານົດເວລາປ່ອງຢ້ຽມ. ເພີ່ມເຕີມຂ້າງລຸ່ມນີ້.
DLM ສະຫນັບສະຫນູນສອງ problem type
ສໍາລັບຕາຕະລາງ inference: " ການຈັດປະເພດ " ຫຼື " regression ". ມັນຄິດໄລ່ບາງຕົວຊີ້ບອກ ແລະສະຖິຕິທີ່ກ່ຽວຂ້ອງໂດຍອີງໃສ່ຂໍ້ມູນສະເພາະນີ້.
ເພື່ອໃຊ້ DLM, ພວກເຮົາຄວນສ້າງຈໍພາບແລະຕິດມັນກັບຕາຕະລາງ. ເມື່ອພວກເຮົາເຮັດ DLM ນີ້ສ້າງ metric tables
ສອງອັນ:
ຕາຕະລາງການວັດແທກໂປຣໄຟລ໌ : ຕາຕະລາງນີ້ມີສະຖິຕິສະຫຼຸບເຊັ່ນ: min, max, ເປີເຊັນຂອງ null ແລະ zeros. ມັນຍັງປະກອບດ້ວຍການວັດແທກເພີ່ມເຕີມໂດຍອີງໃສ່ປະເພດບັນຫາທີ່ກໍານົດໂດຍຜູ້ໃຊ້. ຕົວຢ່າງ ຄວາມແມ່ນຍໍາ , recall ແລະ f1_score ສໍາລັບຮູບແບບການຈັດປະເພດ, ແລະ mean_squared_error ແລະ mean_average_error ສໍາລັບແບບຈໍາລອງການຖົດຖອຍ.
ຕາຕະລາງ drift metric : ມັນປະກອບດ້ວຍສະຖິຕິທີ່ວັດແທກວິທີການແຈກຢາຍຂໍ້ມູນມີການປ່ຽນແປງ ໃນໄລຍະເວລາ ຫຼືທຽບກັບ ຄ່າພື້ນຖານ (ຖ້າສະຫນອງໃຫ້) . ມັນຄິດໄລ່ມາດຕະການເຊັ່ນການທົດສອບ Chi-square, ການທົດສອບ KS.
ເພື່ອເບິ່ງລາຍຊື່ຕົວວັດແທກທີ່ສົມບູນສໍາລັບແຕ່ລະຕາຕະລາງ, ໃຫ້ກວດເບິ່ງຫນ້າເອກະສານ ຕາຕະລາງ metric . ມັນເປັນໄປໄດ້ທີ່ຈະສ້າງ ຕົວວັດແທກທີ່ກໍາຫນົດເອງ .
ລັກສະນະທີ່ສໍາຄັນຂອງການສ້າງລະບົບການຕິດຕາມແມ່ນເພື່ອໃຫ້ແນ່ໃຈວ່າ dashboard ຕິດຕາມກວດກາຂອງພວກເຮົາມີການເຂົ້າເຖິງຂໍ້ມູນ inference ຫລ້າສຸດເມື່ອພວກເຂົາມາຮອດ. ສໍາລັບວ່າພວກເຮົາສາມາດນໍາໃຊ້ Delta table streaming ຕິດຕາມແຖວທີ່ຖືກປຸງແຕ່ງໃນຕາຕະລາງ inference. ພວກເຮົານໍາໃຊ້ຕາຕະລາງ inference ຂອງຕົວແບບການບໍລິການເປັນຕາຕະລາງແຫຼ່ງຂອງພວກເຮົາ ( readStream
), ແລະຕາຕະລາງການຕິດຕາມກວດກາເປັນຕາຕະລາງ sink ( writeStream
). ພວກເຮົາຍັງໃຫ້ແນ່ໃຈວ່າ ການຈັບຂໍ້ມູນການປ່ຽນແປງ (CDC) ຖືກເປີດໃຊ້ຢູ່ໃນຕາຕະລາງທັງສອງ (ມັນຖືກເປີດໃຊ້ໂດຍຄ່າເລີ່ມຕົ້ນໃນຕາຕະລາງ Inference). ວິທີນີ້ພວກເຮົາປະມວນຜົນພຽງແຕ່ການປ່ຽນແປງ - ໃສ່ / ປັບປຸງ / ລົບ - ໃນຕາຕະລາງແຫຼ່ງແທນທີ່ຈະກ່ວາການປຸງແຕ່ງຕາຕະລາງທັງຫມົດຄືນໃຫມ່ທຸກໆຄັ້ງ.
ເພື່ອເຮັດໃຫ້ການຕິດຕາມກວດກາຕາຕະລາງ inference ຂອງພວກເຮົາ, ພວກເຮົາດໍາເນີນຂັ້ນຕອນດັ່ງຕໍ່ໄປນີ້:
ທໍາອິດພວກເຮົາຈໍາເປັນຕ້ອງໄດ້ຕິດຕັ້ງ Lakehouse Monitoring API. ມັນຄວນຈະຖືກຕິດຕັ້ງແລ້ວຖ້າທ່ານໃຊ້ Databricks rum time 15.3 LTS ແລະຂ້າງເທິງ:
%pip install "https://ml-team-public-read.s3.amazonaws.com/wheels/data-monitoring/a4050ef7-b183-47a1-a145-e614628e3146/databricks_lakehouse_monitoring-0.4.14-py3-none-any.whl" dbutils.library.restartPython()
ໃຫ້ອ່ານຕາຕະລາງ inference ເປັນຕາຕະລາງ streaming
requests_raw = spark.readStream\ .format("delta")\ .table(inference_table_name) requests_raw.isStreaming #-> True
ຕໍ່ໄປພວກເຮົາຕ້ອງວາງຕາຕະລາງໃນຮູບແບບທີ່ຖືກຕ້ອງຕາມທີ່ໄດ້ອະທິບາຍຂ້າງເທິງ. ຕາຕະລາງນີ້ຄວນຈະມີແຖວຫນຶ່ງສໍາລັບແຕ່ລະການຄາດຄະເນທີ່ມີຄຸນສົມບັດທີ່ກ່ຽວຂ້ອງແລະມູນຄ່າການຄາດຄະເນ. ຕາຕະລາງ inference ທີ່ພວກເຮົາໄດ້ຮັບຈາກຮູບແບບການໃຫ້ບໍລິການ endpoint, ເກັບຮັກສາຄໍາຮ້ອງຂໍ endpoint ແລະການຕອບສະຫນອງເປັນຮູບແບບ JSON ຊ້ອນ. ນີ້ແມ່ນຕົວຢ່າງຂອງ JSON payload ສໍາລັບຄໍລໍາການຮ້ອງຂໍແລະການຕອບສະຫນອງ.
#requests {"dataframe_records": [ {"user_id": 1, "item_id": 346, "rating": 5}, {"user_id": 1, "item_id": 377, "rating": 2}, {"user_id": 1, "item_id": 302, "rating": 4} ] } #reponse {"predictions": [4.248899936676025, 1.1172138452529907, 4.279165744781494] } # --> what we need | user_id | item_id | rating | prediction | |---------|---------|--------|------------| | 1 | 346 | 5 | 4.248900 | | 1 | 377 | 2 | 1.117214 | | 1 | 302 | 4 | 4.279166 |
ເພື່ອ unpack ຕາຕະລາງນີ້ໄປຫາ schema ທີ່ຖືກຕ້ອງ, ພວກເຮົາສາມາດນໍາໃຊ້ລະຫັດຕໍ່ໄປນີ້ທີ່ດັດແປງມາຈາກເອກະສານ Databricks ( ຕາຕະລາງການອ້າງອີງ Lakehouse Monitoring starter notebook ).
# define the schema of the request and reponse fields in the inference tabel REQUEST_FIELDS = [StructField('user_id', IntegerType(), False),\ StructField('item_id', IntegerType(), False),\ StructField('rating', IntegerType(), False)\ ] RESPONSE_FIELD = [T.StructField("predictions", FloatType())] def process_col_requests(json_str:str) -> str: """ to proccess the JSON payload of request column in inference table """ request = json.loads(json_str) dataframe_records = request.get("dataframe_records", []) return dataframe_records def procces_col_response(json_str: str) -> str: """ to proccess the JSON payload of reponse column in inference table """ reponse = json.loads(json_str) output = [{prediction_col: round(prediction,4)} for prediction in reponse["predictions"]] return output def get_model_id(endpoint_name: str) -> str: """ create the model id by concatinating the model name and the model version. note: the assumption is the endpoint serves only one model """ served_models = get_served_models(endpoint_name) model_id = f"{served_models[0]['model_name']}_{served_models[0]['model_version']}" return model_id def process_requests(requests_raw: DataFrame, request_fields: List[T.StructField], response_field: T.StructField, endpoint_name: str) -> DataFrame: """ Takes a stream of raw requests and processes them by: - Unpacking JSON payloads for requests and responses - Exploding batched requests into individual rows - Converting Unix epoch millisecond timestamps to be Spark TimestampType :param requests_raw: DataFrame containing raw requests. Assumed to contain the following columns: - `request` - `response` - `timestamp_ms` :param request_fields: List of StructFields representing the request schema :param response_field: A StructField representing the response schema :return: A DataFrame containing processed requests """ # Convert the timestamp milliseconds to TimestampType for downstream processing. requests_timestamped = requests_raw \ .withColumn(timestamp_col, (F.col("timestamp_ms") / 1000).cast(T.TimestampType())) \ .drop("timestamp_ms") # create the model identifier column model_id = get_model_id(endpoint_name) # Convert the model name and version columns into a model identifier column. requests_identified = requests_timestamped \ .withColumn(model_id_col, F.lit(model_id)) # Rename the date column to avoid collisions with features. requests_dated = requests_identified.withColumnRenamed("date", date_col) # Consolidate and unpack JSON. request_schema = T.ArrayType(T.StructType(request_fields)) response_schema = T.ArrayType(T.StructType(response_field)) # w udf_request = F.udf(process_col_requests, request_schema) udf_reponse = F.udf(procces_col_response, response_schema) requests_unpacked = requests_dated.withColumn("request", udf_request("request")).\ withColumn("response", udf_reponse("response")) # Explode batched requests into individual rows. DB_PREFIX = "__db" requests_exploded = requests_unpacked \ .withColumn(f"{DB_PREFIX}_request_response", F.arrays_zip(F.col("request"), F.col("response"))) \ .withColumn(f"{DB_PREFIX}_request_response", F.explode(F.col(f"{DB_PREFIX}_request_response"))) \ .select(F.col("*"), F.col(f"{DB_PREFIX}_request_response.request.*"), F.col(f"{DB_PREFIX}_request_response.response.*")) \ .drop(f"{DB_PREFIX}_request_response", "request", "response") requests_cleaned = requests_exploded.drop("status_code", "sampling_fraction", "client_request_id", "databricks_request_id", "request_metadata") return requests_cleaned
ຕາຕະລາງຜົນໄດ້ຮັບຈະມີລັກສະນະນີ້:
ຕໍ່ໄປ, ພວກເຮົາຄວນຈະເລີ່ມຕົ້ນຕາຕະລາງການຫລົ້ມຈົມຂອງພວກເຮົາ
dt_builder = DeltaTable.createIfNotExists(spark) \ .tableName(unpacked_requests_table_name) \ .addColumns(schema) \ .partitionedBy(requests_cleaned.schema) \ .property("delta.enableChangeDataFeed", "true") \ dt_builder.execute()
ແລະຂຽນຜົນໄດ້ຮັບ
checkpoint_path = f"dbfs:/payload-logging/{endpoint_name}/checkpoint" requests_stream = requests_cleaned.writeStream \ .trigger(once=True) \ .format("delta") \ .partitionBy(date_col) \ .outputMode("append") \ .option("checkpointLocation", checkpoint_path) \ .toTable(unpacked_requests_table_name) \
ສຸດທ້າຍ, ພວກເຮົາສ້າງຕາຕະລາງພື້ນຖານຂອງພວກເຮົາ. DLM ໃຊ້ຕາຕະລາງນີ້ເພື່ອຄິດໄລ່ drifts ໂດຍການປຽບທຽບການແຈກຢາຍຂອງຖັນທີ່ຄ້າຍຄືກັນຂອງພື້ນຖານແລະຕົວແບບຕົ້ນຕໍ. ຕາຕະລາງພື້ນຖານຄວນມີຖັນລັກສະນະດຽວກັນກັບຖັນຕົ້ນຕໍເຊັ່ນດຽວກັນກັບຖັນການກໍານົດຕົວແບບດຽວກັນ. ສໍາລັບຕາຕະລາງພື້ນຖານພວກເຮົາໃຊ້ຕາຕະລາງການຄາດຄະເນຂອງ ຊຸດຂໍ້ມູນການກວດສອບ ຂອງພວກເຮົາທີ່ພວກເຮົາເກັບຮັກສາໄວ້ກ່ອນຫນ້ານີ້ຫຼັງຈາກທີ່ພວກເຮົາຝຶກອົບຮົມຕົວແບບຂອງພວກເຮົາໂດຍໃຊ້ hyperparameter ທີ່ດີທີ່ສຸດ. ເພື່ອຄິດໄລ່ການວັດແທກ drift, Databricks ຄິດໄລ່ຕົວຊີ້ບອກ profile ສໍາລັບທັງຕາຕະລາງຕົ້ນຕໍແລະພື້ນຖານ. ທີ່ນີ້ທ່ານສາມາດອ່ານກ່ຽວກັບ ຕາຕະລາງປະຖົມແລະຕາຕະລາງພື້ນຖານ .
#read the prediction table df_base_table = spark.table(f"{catalog_name}.{model_schema}.predictions") # create the model id and add it to the table model_id = get_model_id(endpoint_name) df_base_table = df_base_table.withColumn(model_id_col, F.lit(model_id)) #write the new table and enable the CDC on it output_base_table_name = f"{catalog_name}.{model_schema}.{base_table_prefix}_{model_name}" df_base_table.write.format("delta").mode("overwrite").saveAsTable(output_base_table_name) spark.sql(f"ALTER TABLE {output_base_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")
ໃນປັດຈຸບັນພວກເຮົາກໍາລັງອ່ານເພື່ອສ້າງ dashboard ຕິດຕາມກວດກາຂອງພວກເຮົາ. ພວກເຮົາສາມາດເຮັດໄດ້ບໍ່ວ່າຈະໂດຍໃຊ້ UI ຫຼື Lakehouse Monitoring API. ນີ້ພວກເຮົາໃຊ້ທາງເລືອກທີສອງ:
# This is where we store the metric tables. output_schema_name = f"{catalog_name}.{model_schema}" try: info = lm.create_monitor( table_name=unpacked_requests_table_name, profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities,#the aggregation window model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, # We will refresh the metrics on-demand in this notebook baseline_table_name=output_base_table_name, ) print(info) except Exception as e: # Ensure the exception was expected assert "RESOURCE_ALREADY_EXISTS" in str(e), f"Unexpected error: {e}" # Update the monitor if any parameters of this notebook have changed. lm.update_monitor( table_name=unpacked_requests_table_name, updated_params=dict( profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities, model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, baseline_table_name=output_base_table_name, ) ) # Refresh metrics calculated on the requests table. refresh_info = lm.run_refresh(table_name=unpacked_requests_table_name) print(refresh_info)
ຫຼັງຈາກທີ່ພວກເຮົາດໍາເນີນການລະຫັດມັນໃຊ້ເວລາບາງເວລາຈົນກ່ວາ Databricks ຄິດໄລ່ metric ທັງຫມົດ. ເພື່ອເບິ່ງ dashboard ໄປທີ່ແຖບ Quality
ຂອງຕາຕະລາງ sink ຂອງທ່ານ (ເຊັ່ນ: unpacked_requests_table_name
). ທ່ານຄວນເບິ່ງຫນ້າດັ່ງຕໍ່ໄປນີ້.
ຖ້າທ່ານຄລິກໃສ່ການເບິ່ງ refresh history
ທ່ານຈະເຫັນການແລ່ນ, ລໍຖ້າ ແລະການໂຫຼດຫນ້າຈໍຄືນທີ່ຜ່ານມາ. ໃຫ້ຄລິກໃສ່ View Dashboard
ເພື່ອເປີດ dashboard ຂອງທ່ານ.
ດັ່ງນັ້ນພວກເຮົາເລີ່ມຕົ້ນດ້ວຍຕາຕະລາງ inference ( my_endpoint_payload
), ປຸງແຕ່ງມັນແລະບັນທຶກຜົນໄດ້ຮັບໃສ່ my_endpoint_payload_unpacked
ແລະສົ່ງຕາຕະລາງນີ້ພ້ອມກັບຕາຕະລາງພື້ນຖານຂອງພວກເຮົາ ( base_table_als
) ໄປຫາ API ຂອງພວກເຮົາ. DLM ຄິດໄລ່ການວັດແທກໂປຣໄຟລ໌ສຳລັບແຕ່ລະຕາຕະລາງ ( my_endpoint_payload_unpacked_profile_metric
) ແລະໃຊ້ພວກມັນເພື່ອຄິດໄລ່ຕົວວັດແທກການລອຍຕົວ ( my_endpoint_payload_unpacked_drift_metrics
)
ເຈົ້າໄປເລີຍ! ເຈົ້າມີທຸກຢ່າງທີ່ເຈົ້າຕ້ອງການເພື່ອຮັບໃຊ້ ແລະຕິດຕາມເຈົ້າແບບ!
ໃນສ່ວນຕໍ່ໄປຂ້ອຍຈະສະແດງວິທີການອັດຕະໂນມັດຂະບວນການນີ້ໂດຍໃຊ້ Databricks Assets Bundle ແລະ Gitlab !
ໃນ ສ່ວນທໍາອິດຂອງຊຸດການສອນນີ້ , ພວກເຮົາໄດ້ເອົາຂັ້ນຕອນທໍາອິດສໍາລັບການກໍ່ສ້າງທໍ່ສົ່ງ MLOps ໃນຕອນທ້າຍໂດຍໃຊ້ Databricks ແລະ Spark, ນໍາພາໂດຍໂຄງສ້າງອ້າງອີງຂອງ Databricks. ນີ້ແມ່ນບົດສະຫຼຸບຂອງຂັ້ນຕອນສຳຄັນທີ່ພວກເຮົາໄດ້ກວມເອົາ:
ການຕັ້ງຄ່າ Unity Catalog ສໍາລັບສະຖາປັດຕະຍະກໍາ Medallion : ພວກເຮົາໄດ້ຈັດວາງຂໍ້ມູນຂອງພວກເຮົາເປັນຊັ້ນທອງແດງ, ເງິນ, ແລະຄໍາພາຍໃນ Unity Catalog, ການສ້າງຕັ້ງລະບົບການຄຸ້ມຄອງຂໍ້ມູນທີ່ມີໂຄງສ້າງແລະປະສິດທິພາບ.
Ingesting Data into Unity Catalog : ພວກເຮົາໄດ້ສະແດງໃຫ້ເຫັນວິທີການນໍາເຂົ້າຂໍ້ມູນດິບເຂົ້າໄປໃນລະບົບ, ຮັບປະກັນຄວາມສອດຄ່ອງແລະຄຸນນະພາບສໍາລັບຂັ້ນຕອນການປຸງແຕ່ງຕໍ່ໄປ.
ການຝຶກອົບຮົມແບບຈໍາລອງ : ການນໍາໃຊ້ Databricks, ພວກເຮົາໄດ້ຝຶກອົບຮົມຮູບແບບການຮຽນຮູ້ເຄື່ອງຈັກທີ່ເຫມາະສົມກັບຊຸດຂໍ້ມູນຂອງພວກເຮົາ, ປະຕິບັດຕາມການປະຕິບັດທີ່ດີທີ່ສຸດສໍາລັບການພັດທະນາຕົວແບບທີ່ສາມາດຂະຫຍາຍໄດ້ແລະມີປະສິດທິພາບ.
Hyperparameter Tuning ກັບ HyperOpt : ເພື່ອເພີ່ມປະສິດຕິພາບຂອງຕົວແບບ, ພວກເຮົາໄດ້ຈ້າງ HyperOpt ເພື່ອອັດຕະໂນມັດການຄົ້ນຫາສໍາລັບ hyperparameters ທີ່ດີທີ່ສຸດ, ປັບປຸງຄວາມຖືກຕ້ອງແລະປະສິດທິພາບ.
ການຕິດຕາມການທົດລອງກັບ Databricks MLflow : ພວກເຮົາໄດ້ນໍາໃຊ້ MLflow ເພື່ອບັນທຶກແລະຕິດຕາມການທົດລອງຂອງພວກເຮົາ, ຮັກສາບັນທຶກສະບັບແບບຈໍາລອງ, metrics, ແລະຕົວກໍານົດການສໍາລັບການປຽບທຽບງ່າຍແລະການແຜ່ພັນ.
ດ້ວຍຂັ້ນຕອນພື້ນຖານເຫຼົ່ານີ້ສຳເລັດແລ້ວ, ດຽວນີ້ຕົວແບບຂອງທ່ານໄດ້ຖືກນຳມາໃຊ້ເພື່ອນຳໃຊ້. ໃນພາກທີສອງນີ້, ພວກເຮົາຈະສຸມໃສ່ການລວມເອົາສອງອົງປະກອບທີ່ສໍາຄັນເຂົ້າໄປໃນລະບົບຂອງພວກເຮົາ:
ໃຫ້ເຂົ້າໄປໃນມັນ!
ຈຸດອອກເດີນທາງຂອງ blog ສຸດທ້າຍແມ່ນການປະເມີນແບບຈໍາລອງ. ຕອນນີ້ຈິນຕະນາການວ່າພວກເຮົາໄດ້ເຮັດການປຽບທຽບແລະພົບວ່າຕົວແບບຂອງພວກເຮົາສະແດງໃຫ້ເຫັນການປະຕິບັດທີ່ສູງກວ່າເມື່ອທຽບກັບຮູບແບບການຜະລິດນີ້. ດັ່ງທີ່ພວກເຮົາຕ້ອງການ (ສົມມຸດ) ໃຊ້ຕົວແບບໃນການຜະລິດ, ພວກເຮົາຕ້ອງການໃຊ້ປະໂຫຍດຈາກຂໍ້ມູນທັງຫມົດທີ່ພວກເຮົາມີ. ຂັ້ນຕອນຕໍ່ໄປແມ່ນການຝຶກອົບຮົມແລະການທົດສອບຕົວແບບໂດຍໃຊ້ຊຸດຂໍ້ມູນເຕັມ. ຫຼັງຈາກນັ້ນ, ສືບຕໍ່ຕົວແບບຂອງພວກເຮົາສໍາລັບການນໍາໃຊ້ໃນພາຍຫລັງໂດຍການນໍາໃຊ້ມັນເປັນຕົວແບບແຊ້ມຂອງພວກເຮົາ. ເນື່ອງຈາກນີ້ແມ່ນຕົວແບບສຸດທ້າຍທີ່ເຮົາຕ້ອງການໃຊ້ສໍາລັບການສະຫຼຸບ, ພວກເຮົາໃຊ້ຄຸນສົມບັດວິສະວະກໍາລູກຄ້າເພື່ອຝຶກອົບຮົມແບບຈໍາລອງ. ດ້ວຍວິທີນີ້, ພວກເຮົາບໍ່ພຽງແຕ່ຕິດຕາມສາຍພັນຂອງຕົວແບບງ່າຍຂຶ້ນ, ແຕ່ຍັງສົ່ງການກວດສອບຄວາມຖືກຕ້ອງຂອງ schema ແລະການຫັນປ່ຽນຄຸນສົມບັດ (ຖ້າມີ) ໃຫ້ກັບລູກຄ້າ.
with mlflow.start_run(run_name="ALS_best_model") as run: als = ALS() # Now we set the parameters for the method als.setMaxIter(MAX_ITER)\ .setSeed(SEED)\ .setRegParam(best_params["REG_PARAM"])\ .setUserCol(COL_USER)\ .setItemCol(COL_ITEM)\ .setRatingCol(COL_LABEL)\ .setRank(best_params["RANK"]) mlflow.log_param("MAX_ITER", MAX_ITER) mlflow.log_param("RANK", best_params["RANK"]) mlflow.log_param("REG_PARAM", best_params["REG_PARAM"]) # Create the model with these parameters. model = als.fit(df_full_data) #drop predictions where users and products from the test test and didn't make it into the training set. in this case, the prediction is NaN model.setColdStartStrategy('drop') predictions = model.transform(df_full_data) signature = infer_signature(model_input = df_full_data, model_output = predictions.select(COL_LABEL)) #log the model mlflow.spark.log_model(model, model_name, sample_input=df_full_data.limit(3), signature = signature, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name=f"{catalog_name}.{model_schema}.{model_name}") evaluator = RegressionEvaluator(predictionCol=COL_PRED, labelCol=COL_LABEL) rmse = evaluator.setMetricName("rmse").evaluate(predictions) mlflow.log_metric('rmse', rmse)
ພວກເຮົາຍັງສາມາດໃຊ້ Feature Store ຫຼື Feature Engineering APIs ເພື່ອຝຶກອົບຮົມ ແລະບັນທຶກຕົວແບບ
model_info = fe.log_model(model=model, artifact_path = model_name, flavor=mlflow.spark, training_set=fe_full_data, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name= f"{catalog_name}.{model_schema}.{model_name}" )
ເມື່ອພວກເຮົາໃຊ້ API ວິສະວະກໍາຄຸນນະສົມບັດ, ພວກເຮົາສາມາດເບິ່ງສາຍພັນຂອງຕົວແບບໃນ Catalog Explorer
ຕອນນີ້ໃຫ້ອັບເດດລາຍລະອຽດຂອງຕົວແບບ ແລະມອບໝາຍປ້າຍແຊ້ມໃຫ້ກັບມັນ.
import time from mlflow.tracking.client import MlflowClient from mlflow.entities.model_registry.model_version_status import ModelVersionStatus client = MlflowClient() #find the latest model version model_name_path = f"{catalog_name}.{model_schema}.{model_name}" model_version_infos = client.search_model_versions(f"name ='{model_name_path}'") new_model_version = max([int(model_version_info.version) for model_version_info in model_version_infos]) #add the model and model version descirption client.update_registered_model( name=model_name_path, description="collaborative filtering using Spark mllib ALS. This model use rating table" ) client.update_model_version( name=model_name_path, version=new_model_version, description="this model is optimized Rank and REG_PARAM with Hyperopt and rmse as a loss function. trained on the full dataset" ) # assign alias client.set_registered_model_alias(model_name_path, "Champion", new_model_version)
ໃນປັດຈຸບັນສືບຕໍ່ເດີນຫນ້າແລະກວດເບິ່ງ schema ທີ່ທ່ານລົງທະບຽນຮູບແບບ. ທ່ານຄວນເບິ່ງການປັບປຸງຂອງທ່ານທັງຫມົດດັ່ງຕໍ່ໄປນີ້
ຂັ້ນຕອນຂອງຕົວແບບ : ຖ້າຫາກວ່າທ່ານນໍາໃຊ້ພື້ນທີ່ເຮັດວຽກສໍາລັບການຈົດທະບຽນຕົວແບບທີ່ທ່ານຄວນຈະຂັ້ນຕອນການຄຸ້ມຄອງຕົວແບບຂອງທ່ານ. ການໃຊ້ນາມແຝງຈະບໍ່ເຮັດວຽກ. ກວດເບິ່ງ ທີ່ນີ້ ເພື່ອເບິ່ງວ່າມັນເຮັດວຽກແນວໃດ
ຕອນນີ້ຈິນຕະນາການວ່າພວກເຮົາຕ້ອງການໃຊ້ຕົວແບບຂອງພວກເຮົາໃນການຜະລິດສໍາລັບການ inference. ໃນຂັ້ນຕອນນີ້, ພວກເຮົາໂຫລດຮູບແບບແຊ້ມແລະນໍາໃຊ້ມັນເພື່ອສ້າງ 20 ຄໍາແນະນໍາຮູບເງົາສໍາລັບຜູ້ໃຊ້ແຕ່ລະຄົນ.
from mlflow.spark import load_model as spark_load_model from mlflow.tracking.client import MlflowClient from create_training_set import split_data #-- set UC as model registray mlflow.set_registry_uri("databricks-uc") #-- initate mlflow client client = MlflowClient() # -- read the config file with open('config.json') as config_file: config = json.load(config_file) catalog_name = config["catalog_name"] gold_layer = config["gold_layer_name"] silver_layer = config["silver_layer_name"] user_item_table_name = config["user_item_table_name"] ft_user_item_name = config["ft_user_item_name"] model_name = config["model_name"] model_schema = config["model_schema"] #-- create the model uri model_path = f"{catalog_name}.{model_schema}.{model_name}" # --create the model_uri: there are two ways to do this # 1: using the alias (we use this*) model_version_uri = f"models:/{model_uri}@champion" # 2: using model version #champion_version = client.get_model_version_by_alias(model_uri, "champion") #model_version_uri = f"models:/{model_uri}/{champion_version.version}" # -- load the model pipline and exctract the model model_pipeline = spark_load_model(model_version_uri) model = model_pipeline.stages[0] # -- batch scoring using the the model fe_full_data, df_full_data, df_train, df_test = split_data() df_batch_input = df_full_data.drop("rating") df_scores = model.transform(df_batch_input) # --- in case you used Feature Engineering to train and register model #from databricks.feature_engineering import FeatureEngineeringClient #fe = FeatureEngineeringClient() # fe.score_batch(model_uri=f"{model_version_uri}",df = df_batch_input)
ແລະທ່ານສາມາດເບິ່ງພວກເຮົາໄດ້ນໍາໃຊ້ຂໍ້ມູນການຝຶກອົບຮົມດຽວກັນສໍາລັບການໃຫ້ຄະແນນ batch. ເຖິງແມ່ນວ່າໃນກໍລະນີຂອງລະບົບຄໍາແນະນໍາມັນເຮັດໃຫ້ຄວາມຮູ້ສຶກ, ໃນຄໍາຮ້ອງສະຫມັກສ່ວນໃຫຍ່ພວກເຮົາຕ້ອງການໃຊ້ຕົວແບບເພື່ອຄະແນນບາງຂໍ້ມູນທີ່ບໍ່ເຫັນ. ຕົວຢ່າງ, ການຖ່າຍຮູບຂອງເຈົ້າແມ່ນ Netflix ແລະຕ້ອງການປັບປຸງຄໍາແນະນໍາຂອງຜູ້ໃຊ້ໃນຕອນທ້າຍຂອງມື້ໂດຍອີງໃສ່ບັນຊີລາຍຊື່ທີ່ເບິ່ງໃຫມ່ຂອງພວກເຂົາ. ພວກເຮົາສາມາດຈັດຕາຕະລາງວຽກທີ່ດໍາເນີນການໃຫ້ຄະແນນ batch ໃນເວລາສະເພາະໃນຕອນທ້າຍຂອງມື້.
ໃນປັດຈຸບັນພວກເຮົາສາມາດສືບຕໍ່ເດີນຫນ້າແລະສ້າງຄໍາແນະນໍາສໍາລັບຜູ້ໃຊ້ແຕ່ລະຄົນ. ສໍາລັບນີ້ພວກເຮົາຊອກຫາ 20 ອັນດັບທໍາອິດຕໍ່ຜູ້ໃຊ້
from pyspark.sql.window import Window from pyspark.sql.functions import col, split, row_number, collect_list from pyspark.sql.functions import col, collect_list, expr, lit, min, row_number, desc windowSpec = Window.partitionBy("user_id").orderBy(col("prediction").desc()) df_top_20_items = df_scores.withColumn("rank", row_number().over(windowSpec)).filter(col("rank") <= 20) df_user_recs = df_top_20_items.groupBy("user_id") \ .agg(collect_list(col("item_id").cast("double")).alias("top_item_ids"))
ນີ້ແມ່ນວິທີທີ່ຜົນໄດ້ຮັບເບິ່ງຄືວ່າ
ໃນທີ່ສຸດພວກເຮົາສາມາດເກັບຮັກສາການຄາດຄະເນເປັນປ້າຍຊື່ delta ໃນ UC ຂອງພວກເຮົາຫຼືເຜີຍແຜ່ພວກມັນໃຫ້ກັບລະບົບລຸ່ມນ້ໍາ Mongo DB ຫຼື Azure Cosmos DB. ພວກເຮົາໄປກັບທາງເລືອກ firs
df_user_recs.write.mode("overwrite").saveAsTable(f"{catalog_name}.{output_schema}.top20_item_recommendations")
ຕອນນີ້ຈິນຕະນາການກໍລະນີທີ່ພວກເຮົາຕ້ອງການປັບປຸງຄໍາແນະນໍາຂອງພວກເຮົາໂດຍອີງໃສ່ການໂຕ້ຕອບຂອງຜູ້ໃຊ້ໃນເວລາຈິງ. ສໍາລັບກໍລະນີນີ້ພວກເຮົາສາມາດນໍາໃຊ້ການຮັບໃຊ້ແບບຈໍາລອງ. ເມື່ອຜູ້ໃດຜູ້ຫນຶ່ງຕ້ອງການໃຊ້ຕົວແບບຂອງເຈົ້າ, ພວກເຂົາສາມາດສົ່ງຂໍ້ມູນໄປຫາເຄື່ອງແມ່ຂ່າຍ. ຫຼັງຈາກນັ້ນ, ເຊີບເວີຈະປ້ອນຂໍ້ມູນນັ້ນໄປຫາຕົວແບບທີ່ນຳໃຊ້ຂອງທ່ານ, ເຊິ່ງເຂົ້າສູ່ການປະຕິບັດ, ວິເຄາະຂໍ້ມູນ ແລະສ້າງການຄາດຄະເນ. ພວກເຂົາສາມາດຖືກນໍາໃຊ້ໃນຄໍາຮ້ອງສະຫມັກເວັບ, ແອັບຯມືຖື, ຫຼືແມ້ກະທັ້ງລະບົບຝັງຕົວ. ຫນຶ່ງໃນຄໍາຮ້ອງສະຫມັກຂອງວິທີການນີ້ແມ່ນເພື່ອເຮັດໃຫ້ເສັ້ນທາງການຈະລາຈອນສໍາລັບການທົດສອບ A / B.
ALS algorithm ບໍ່ສາມາດຖືກນໍາໃຊ້ໂດຍກົງສໍາລັບການ inference ອອນໄລນ໌ເນື່ອງຈາກວ່າມັນຮຽກຮ້ອງໃຫ້ມີການຝຶກອົບຮົມ retraining ຮູບແບບການນໍາໃຊ້ຂໍ້ມູນທັງຫມົດ (ເກົ່າ + ໃຫມ່) ເພື່ອປັບປຸງຄໍາແນະນໍາ. Gradient Descent algorithms ແມ່ນຕົວຢ່າງຂອງຕົວແບບທີ່ສາມາດໃຊ້ສໍາລັບການອັບເດດອອນໄລນ໌. ພວກເຮົາອາດຈະເບິ່ງບາງ algorithms ເຫຼົ່ານີ້ໃນການຕອບໃນອະນາຄົດ.
ແນວໃດກໍ່ຕາມ, ພຽງແຕ່ເພື່ອສະແດງໃຫ້ເຫັນວ່າຮູບແບບດັ່ງກ່າວຈະເຮັດວຽກແນວໃດ, ພວກເຮົາກຳລັງສ້າງແບບຈໍາລອງ (ບໍ່ມີປະໂຫຍດ) ໃຫ້ບໍລິການຈຸດສິ້ນສຸດທີ່ຄາດຄະເນການຈັດອັນດັບຮູບເງົາໂດຍອີງໃສ່ທຸກຄັ້ງທີ່ຜູ້ໃຊ້ໃຫ້ອັດຕາຮູບເງົາ!
import requests model_path = f"{catalog_name}.{model_schema}.{model_name}" champion_version = client.get_model_version_by_alias(model_path, "champion") # Set the name of the MLflow endpoint endpoint_name = config["model_serving_endpoint_name"] # Name of the registered MLflow model model_name = model_path # Specify the type of compute (CPU, GPU_SMALL, GPU_MEDIUM, etc.) workload_type = "CPU" # Specify the scale-out size of compute (Small, Medium, Large, etc.) workload_size = "Small" # Get the latest version of the MLflow model model_version = int(champion_version.version) # Specify Scale to Zero(only supported for CPU endpoints) scale_to_zero = False # Get the API endpoint and token for the current notebook context API_ROOT = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get() API_TOKEN = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get() data = { "name": endpoint_name, "config": { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ] }, } headers = {"Context-Type": "text/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.post( url=f"{API_ROOT}/api/2.0/serving-endpoints", json=data, headers=headers )
ນີ້ຈະສ້າງແລະອາຫານທ່ຽງແບບຈໍາລອງໃຫ້ບໍລິການສໍາລັບພວກເຮົາດັ່ງນັ້ນມັນໃຊ້ເວລາບາງເວລາ. ຕອນນີ້ຖ້າທ່ານເປີດປ່ອງຢ້ຽມ Serving
, ທ່ານຄວນເຫັນຈຸດສິ້ນສຸດຂອງທ່ານ.
ພວກເຮົາສາມາດນໍາໃຊ້ endpoint ຫນຶ່ງເພື່ອຮັບໃຊ້ຫຼາຍຮູບແບບ. ຫຼັງຈາກນັ້ນ, ພວກເຮົາສາມາດນໍາໃຊ້ເສັ້ນທາງການຈະລາຈອນສໍາລັບສະຖານະການເຊັ່ນ: ການທົດສອບ A / B ຫຼືປຽບທຽບການປະຕິບັດຂອງຕົວແບບທີ່ແຕກຕ່າງກັນໃນການຜະລິດ.
ຕາຕະລາງ Inference ໃນ Databricks Model Serving ເຮັດຫນ້າທີ່ເປັນບັນທຶກອັດຕະໂນມັດສໍາລັບແບບຈໍາລອງຂອງພວກເຮົາ. ເມື່ອເປີດໃຊ້, ພວກມັນບັນທຶກຄໍາຮ້ອງຂໍທີ່ເຂົ້າມາ (ຂໍ້ມູນທີ່ສົ່ງສໍາລັບການຄາດຄະເນ), ຜົນໄດ້ຮັບຂອງຕົວແບບທີ່ສອດຄ້ອງກັນ (ການຄາດເດົາ), ແລະບາງ metadata ອື່ນໆເປັນຕາຕະລາງ Delta ພາຍໃນ Unity Catalog. ພວກເຮົາສາມາດນໍາໃຊ້ຕາຕະລາງ inference ສໍາລັບ ການຕິດຕາມແລະ debugging , ການຕິດຕາມເຊື້ອສາຍ , ແລະຂັ້ນຕອນການເກັບກໍາຂໍ້ມູນສໍາລັບ ການ retraining ຫຼື ປັບປັບຕົວ ແບບຂອງພວກເຮົາ.
ພວກເຮົາສາມາດເປີດໃຊ້ inference table
ໃນຈຸດສິ້ນສຸດການຮັບໃຊ້ຂອງພວກເຮົາເພື່ອຕິດຕາມຕົວແບບ. ພວກເຮົາສາມາດເຮັດໄດ້ໂດຍການລະບຸຄຸນສົມບັດ auto_capture_config
ໃນ payload ເມື່ອພວກເຮົາສ້າງຈຸດສິ້ນສຸດຄັ້ງທໍາອິດ. ຫຼືພວກເຮົາປັບປຸງຈຸດສິ້ນສຸດຂອງພວກເຮົາຫຼັງຈາກນັ້ນໂດຍໃຊ້ຄໍາສັ່ງ put
ແລະ config
endpoint URL ດັ່ງຕໍ່ໄປນີ້ (ເພີ່ມເຕີມ ທີ່ນີ້ )
data = { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ], "auto_capture_config":{ "catalog_name": catalog_name, "schema_name": model_schema, "table_name_prefix": payload_table, } } headers = {"Context-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.put(url=f"{API_ROOT}/api/2.0/serving-endpoints/{endpoint_name}/config", json=data, headers=headers) print(json.dumps(response.json(), indent=4))
ຕອນນີ້ໃຫ້ອາຫານຈຸດສິ້ນສຸດດ້ວຍຂໍ້ມູນການໂຕ້ຕອບຂອງຜູ້ໃຊ້ dummy ບາງອັນ
import random import time all_items = df_full_data.select(col("item_id")).distinct() for user_id in range(50,54): items_not_rated_by_user = df_full_data.where(col("user_id")==user_id).select(col("item_id")).distinct()#collect()[0][0] no_rated_items = [item.item_id for item in all_items.subtract(items_not_rated_by_user).limit(4).collect()] data = { "dataframe_records": [ {"user_id":user_id, "item_id":no_rated_items[0], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[1], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, ] } response = requests.post( url=f"{API_ROOT}/serving-endpoints/{endpoint_name}/invocations", json=data, headers=headers ) # generate the data within the timespan of 1 to 8 minutes time.sleep(random.randint(60*1, 60*8))
ພວກເຮົາສາມາດກວດເບິ່ງບັນທຶກຈຸດສິ້ນສຸດໃນຕາຕະລາງ <catalog>.<schema>.<payload_table>
. ມັນໃຊ້ເວລາປະມານ 10 ນາທີຈົນກ່ວາທ່ານສາມາດເບິ່ງຂໍ້ມູນໃນຕາຕະລາງ.
table_path = f"{catalog_name}.{model_schema}.{payload_table}" # Read data from the inference table df_inf_table = spark.read.table(table_path) display(df_inf_table )
ທ່ານຄວນເຫັນບາງສິ່ງບາງຢ່າງເຊັ່ນນີ້ຕາຕະລາງ payload ຂອງທ່ານ
ເພື່ອເຂົ້າໃຈ schema ຂອງຕາຕະລາງ inference ນີ້, ກວດເບິ່ງ "Unity catalog inference table schema ==" ທີ່ນີ້ .==
ແບບຈໍາລອງແລະຂໍ້ມູນການຕິດຕາມຫົວຂໍ້ທີ່ສັບສົນທີ່ຕ້ອງໃຊ້ເວລາຫຼາຍເພື່ອແມ່ບົດ. Databricks Lakehouse Monitoring (DLM) ຫຼຸດຜ່ອນສ່ວນເກີນຂອງການສ້າງລະບົບການຕິດຕາມທີ່ເຫມາະສົມໂດຍການໃຫ້ແມ່ແບບມາດຕະຖານແລະສາມາດປັບແຕ່ງໄດ້ສໍາລັບກໍລະນີທີ່ໃຊ້ທົ່ວໄປ. ແນວໃດກໍ່ຕາມ, ການສ້າງຕົ້ນສະບັບ DLM ແລະການຕິດຕາມແບບຈໍາລອງໂດຍທົ່ວໄປຮຽກຮ້ອງໃຫ້ມີການທົດລອງຫຼາຍ. ຂ້າພະເຈົ້າບໍ່ຕ້ອງການໃຫ້ທ່ານເຫັນພາບລວມຢ່າງກວ້າງຂວາງຂອງການຕິດຕາມຕົວແບບຢູ່ທີ່ນີ້, ແຕ່ແທນທີ່ຈະໃຫ້ທ່ານເປັນຈຸດເລີ່ມຕົ້ນ. ຂ້ອຍອາດຈະອຸທິດ blog ໃຫ້ກັບຫົວຂໍ້ນີ້ໃນອະນາຄົດ.
ສະຫຼຸບຫຍໍ້ຂອງການເຮັດວຽກ ແລະຄຸນສົມບັດຂອງ DLM
ໃນປັດຈຸບັນທີ່ພວກເຮົາມີຕົວແບບຂອງພວກເຮົາແລະດໍາເນີນການ, ພວກເຮົາສາມາດນໍາໃຊ້ຕາຕະລາງ inference ທີ່ສ້າງຂຶ້ນໂດຍຈຸດສິ້ນສຸດການຮັບໃຊ້ຂອງພວກເຮົາເພື່ອຕິດຕາມຕົວຊີ້ບອກທີ່ສໍາຄັນເຊັ່ນການປະຕິບັດຕົວແບບແລະການລອຍຕົວເພື່ອກວດພົບຄວາມແຕກແຍກຫຼືຄວາມຜິດປົກກະຕິໃນຂໍ້ມູນຫຼືຕົວແບບຂອງພວກເຮົາໃນໄລຍະເວລາ. ວິທີການທີ່ຫ້າວຫັນນີ້ຊ່ວຍໃຫ້ພວກເຮົາດໍາເນີນການແກ້ໄຂໄດ້ທັນເວລາ, ເຊັ່ນ: ການຝຶກອົບຮົມແບບຈໍາລອງຫຼືການປັບປຸງລັກສະນະຂອງຕົນ, ເພື່ອຮັກສາປະສິດທິພາບທີ່ດີທີ່ສຸດແລະສອດຄ່ອງກັບຈຸດປະສົງທາງທຸລະກິດ.
DLM ສະຫນອງສາມປະເພດຂອງການວິເຄາະຫຼື profile type
: ຊຸດເວລາ , Snapshot ແລະ Inference . ເນື່ອງຈາກພວກເຮົາມີຄວາມສົນໃຈໃນການວິເຄາະຕາຕະລາງ inference ຂອງພວກເຮົາ, ພວກເຮົາສຸມໃສ່ອັນສຸດທ້າຍ. ການນໍາໃຊ້ຕາຕະລາງສໍາລັບການຕິດຕາມກວດກາ - " ຕາຕະລາງຕົ້ນຕໍ " ຂອງພວກເຮົາ, ພວກເຮົາຄວນໃຫ້ແນ່ໃຈວ່າຕາຕະລາງມີໂຄງສ້າງທີ່ຖືກຕ້ອງ. ສໍາລັບ ຕາຕະລາງ inference , ແຕ່ລະແຖວຄວນສອດຄ່ອງກັບຄໍາຮ້ອງຂໍທີ່ມີຖັນຕໍ່ໄປນີ້:
ລັກສະນະແບບຈໍາລອງ
ການຄາດຄະເນຕົວແບບ
ID ຕົວແບບ
timestamp : timestamp ຂອງຄໍາຮ້ອງຂໍ inference
ຄວາມຈິງພື້ນຖານ (ທາງເລືອກ)
id ຕົວແບບ ແມ່ນສໍາຄັນສໍາລັບກໍລະນີທີ່ພວກເຮົາໃຫ້ບໍລິການຫຼາຍແບບແລະພວກເຮົາຕ້ອງການຕິດຕາມການປະຕິບັດຂອງແຕ່ລະແບບໃນ dashboard ຕິດຕາມກວດກາ. ຖ້າມີຫຼາຍກວ່າຫນຶ່ງ id ຕົວແບບທີ່ມີຢູ່, DLM ໃຊ້ມັນເພື່ອຕັດຂໍ້ມູນ ແລະຄໍານວນ metrics ແລະ statics ສໍາລັບແຕ່ລະ slice ແຍກຕ່າງຫາກ.
DLM ຄິດໄລ່ແຕ່ລະສະຖິຕິ ແລະ metrics ສໍາລັບຊ່ວງເວລາທີ່ກໍານົດໄວ້. ສໍາລັບການວິເຄາະ inference, ມັນໄດ້ນໍາໃຊ້ຖັນ ສະແຕມເວລາ , ບວກກັບຂະຫນາດປ່ອງຢ້ຽມທີ່ຜູ້ໃຊ້ກໍານົດເພື່ອກໍານົດເວລາປ່ອງຢ້ຽມ. ເພີ່ມເຕີມຂ້າງລຸ່ມນີ້.
DLM ສະຫນັບສະຫນູນສອງ problem type
ສໍາລັບຕາຕະລາງ inference: " ການຈັດປະເພດ " ຫຼື " regression ". ມັນຄິດໄລ່ບາງຕົວຊີ້ບອກ ແລະສະຖິຕິທີ່ກ່ຽວຂ້ອງໂດຍອີງໃສ່ຂໍ້ມູນສະເພາະນີ້.
ເພື່ອໃຊ້ DLM, ພວກເຮົາຄວນສ້າງຈໍພາບແລະຕິດມັນກັບຕາຕະລາງ. ເມື່ອພວກເຮົາເຮັດ DLM ນີ້ສ້າງ metric tables
ສອງອັນ:
ຕາຕະລາງການວັດແທກໂປຣໄຟລ໌ : ຕາຕະລາງນີ້ມີສະຖິຕິສະຫຼຸບເຊັ່ນ: min, max, ເປີເຊັນຂອງ null ແລະ zeros. ມັນຍັງປະກອບດ້ວຍການວັດແທກເພີ່ມເຕີມໂດຍອີງໃສ່ປະເພດບັນຫາທີ່ກໍານົດໂດຍຜູ້ໃຊ້. ຕົວຢ່າງ ຄວາມແມ່ນຍໍາ , recall ແລະ f1_score ສໍາລັບຮູບແບບການຈັດປະເພດ, ແລະ mean_squared_error ແລະ mean_average_error ສໍາລັບແບບຈໍາລອງການຖົດຖອຍ.
ຕາຕະລາງ drift metric : ມັນປະກອບດ້ວຍສະຖິຕິທີ່ວັດແທກວິທີການແຈກຢາຍຂໍ້ມູນມີການປ່ຽນແປງ ໃນໄລຍະເວລາ ຫຼືທຽບກັບ ຄ່າພື້ນຖານ (ຖ້າສະຫນອງໃຫ້) . ມັນຄິດໄລ່ມາດຕະການເຊັ່ນການທົດສອບ Chi-square, ການທົດສອບ KS.
ເພື່ອເບິ່ງລາຍຊື່ຕົວວັດແທກທີ່ສົມບູນສໍາລັບແຕ່ລະຕາຕະລາງ, ໃຫ້ກວດເບິ່ງຫນ້າເອກະສານ ຕາຕະລາງ metric . ມັນເປັນໄປໄດ້ທີ່ຈະສ້າງ ຕົວວັດແທກທີ່ກໍາຫນົດເອງ .
ລັກສະນະທີ່ສໍາຄັນຂອງການສ້າງລະບົບການຕິດຕາມແມ່ນເພື່ອໃຫ້ແນ່ໃຈວ່າ dashboard ຕິດຕາມກວດກາຂອງພວກເຮົາມີການເຂົ້າເຖິງຂໍ້ມູນ inference ຫລ້າສຸດເມື່ອພວກເຂົາມາຮອດ. ສໍາລັບວ່າພວກເຮົາສາມາດນໍາໃຊ້ Delta table streaming ຕິດຕາມແຖວທີ່ຖືກປຸງແຕ່ງໃນຕາຕະລາງ inference. ພວກເຮົານໍາໃຊ້ຕາຕະລາງ inference ຂອງຕົວແບບການບໍລິການເປັນຕາຕະລາງແຫຼ່ງຂອງພວກເຮົາ ( readStream
), ແລະຕາຕະລາງການຕິດຕາມກວດກາເປັນຕາຕະລາງ sink ( writeStream
). ພວກເຮົາຍັງໃຫ້ແນ່ໃຈວ່າ ການຈັບຂໍ້ມູນການປ່ຽນແປງ (CDC) ຖືກເປີດໃຊ້ຢູ່ໃນຕາຕະລາງທັງສອງ (ມັນຖືກເປີດໃຊ້ໂດຍຄ່າເລີ່ມຕົ້ນໃນຕາຕະລາງ Inference). ວິທີນີ້ພວກເຮົາປະມວນຜົນພຽງແຕ່ການປ່ຽນແປງ - ໃສ່ / ປັບປຸງ / ລົບ - ໃນຕາຕະລາງແຫຼ່ງແທນທີ່ຈະກ່ວາການປຸງແຕ່ງຕາຕະລາງທັງຫມົດຄືນໃຫມ່ທຸກໆຄັ້ງ.
ເພື່ອເຮັດໃຫ້ການຕິດຕາມກວດກາຕາຕະລາງ inference ຂອງພວກເຮົາ, ພວກເຮົາດໍາເນີນຂັ້ນຕອນດັ່ງຕໍ່ໄປນີ້:
ທໍາອິດພວກເຮົາຈໍາເປັນຕ້ອງໄດ້ຕິດຕັ້ງ Lakehouse Monitoring API. ມັນຄວນຈະຖືກຕິດຕັ້ງແລ້ວຖ້າທ່ານໃຊ້ Databricks rum time 15.3 LTS ແລະຂ້າງເທິງ:
%pip install "https://ml-team-public-read.s3.amazonaws.com/wheels/data-monitoring/a4050ef7-b183-47a1-a145-e614628e3146/databricks_lakehouse_monitoring-0.4.14-py3-none-any.whl" dbutils.library.restartPython()
ໃຫ້ອ່ານຕາຕະລາງ inference ເປັນຕາຕະລາງ streaming
requests_raw = spark.readStream\ .format("delta")\ .table(inference_table_name) requests_raw.isStreaming #-> True
ຕໍ່ໄປພວກເຮົາຕ້ອງວາງຕາຕະລາງໃນຮູບແບບທີ່ຖືກຕ້ອງຕາມທີ່ໄດ້ອະທິບາຍຂ້າງເທິງ. ຕາຕະລາງນີ້ຄວນຈະມີແຖວຫນຶ່ງສໍາລັບແຕ່ລະການຄາດຄະເນທີ່ມີຄຸນສົມບັດທີ່ກ່ຽວຂ້ອງແລະມູນຄ່າການຄາດຄະເນ. ຕາຕະລາງ inference ທີ່ພວກເຮົາໄດ້ຮັບຈາກຮູບແບບການໃຫ້ບໍລິການ endpoint, ເກັບຮັກສາຄໍາຮ້ອງຂໍ endpoint ແລະການຕອບສະຫນອງເປັນຮູບແບບ JSON ຊ້ອນ. ນີ້ແມ່ນຕົວຢ່າງຂອງ JSON payload ສໍາລັບຄໍລໍາການຮ້ອງຂໍແລະການຕອບສະຫນອງ.
#requests {"dataframe_records": [ {"user_id": 1, "item_id": 346, "rating": 5}, {"user_id": 1, "item_id": 377, "rating": 2}, {"user_id": 1, "item_id": 302, "rating": 4} ] } #reponse {"predictions": [4.248899936676025, 1.1172138452529907, 4.279165744781494] } # --> what we need | user_id | item_id | rating | prediction | |---------|---------|--------|------------| | 1 | 346 | 5 | 4.248900 | | 1 | 377 | 2 | 1.117214 | | 1 | 302 | 4 | 4.279166 |
ເພື່ອ unpack ຕາຕະລາງນີ້ໄປຫາ schema ທີ່ຖືກຕ້ອງ, ພວກເຮົາສາມາດນໍາໃຊ້ລະຫັດຕໍ່ໄປນີ້ທີ່ດັດແປງມາຈາກເອກະສານ Databricks ( ຕາຕະລາງການອ້າງອີງ Lakehouse Monitoring starter notebook ).
# define the schema of the request and reponse fields in the inference tabel REQUEST_FIELDS = [StructField('user_id', IntegerType(), False),\ StructField('item_id', IntegerType(), False),\ StructField('rating', IntegerType(), False)\ ] RESPONSE_FIELD = [T.StructField("predictions", FloatType())] def process_col_requests(json_str:str) -> str: """ to proccess the JSON payload of request column in inference table """ request = json.loads(json_str) dataframe_records = request.get("dataframe_records", []) return dataframe_records def procces_col_response(json_str: str) -> str: """ to proccess the JSON payload of reponse column in inference table """ reponse = json.loads(json_str) output = [{prediction_col: round(prediction,4)} for prediction in reponse["predictions"]] return output def get_model_id(endpoint_name: str) -> str: """ create the model id by concatinating the model name and the model version. note: the assumption is the endpoint serves only one model """ served_models = get_served_models(endpoint_name) model_id = f"{served_models[0]['model_name']}_{served_models[0]['model_version']}" return model_id def process_requests(requests_raw: DataFrame, request_fields: List[T.StructField], response_field: T.StructField, endpoint_name: str) -> DataFrame: """ Takes a stream of raw requests and processes them by: - Unpacking JSON payloads for requests and responses - Exploding batched requests into individual rows - Converting Unix epoch millisecond timestamps to be Spark TimestampType :param requests_raw: DataFrame containing raw requests. Assumed to contain the following columns: - `request` - `response` - `timestamp_ms` :param request_fields: List of StructFields representing the request schema :param response_field: A StructField representing the response schema :return: A DataFrame containing processed requests """ # Convert the timestamp milliseconds to TimestampType for downstream processing. requests_timestamped = requests_raw \ .withColumn(timestamp_col, (F.col("timestamp_ms") / 1000).cast(T.TimestampType())) \ .drop("timestamp_ms") # create the model identifier column model_id = get_model_id(endpoint_name) # Convert the model name and version columns into a model identifier column. requests_identified = requests_timestamped \ .withColumn(model_id_col, F.lit(model_id)) # Rename the date column to avoid collisions with features. requests_dated = requests_identified.withColumnRenamed("date", date_col) # Consolidate and unpack JSON. request_schema = T.ArrayType(T.StructType(request_fields)) response_schema = T.ArrayType(T.StructType(response_field)) # w udf_request = F.udf(process_col_requests, request_schema) udf_reponse = F.udf(procces_col_response, response_schema) requests_unpacked = requests_dated.withColumn("request", udf_request("request")).\ withColumn("response", udf_reponse("response")) # Explode batched requests into individual rows. DB_PREFIX = "__db" requests_exploded = requests_unpacked \ .withColumn(f"{DB_PREFIX}_request_response", F.arrays_zip(F.col("request"), F.col("response"))) \ .withColumn(f"{DB_PREFIX}_request_response", F.explode(F.col(f"{DB_PREFIX}_request_response"))) \ .select(F.col("*"), F.col(f"{DB_PREFIX}_request_response.request.*"), F.col(f"{DB_PREFIX}_request_response.response.*")) \ .drop(f"{DB_PREFIX}_request_response", "request", "response") requests_cleaned = requests_exploded.drop("status_code", "sampling_fraction", "client_request_id", "databricks_request_id", "request_metadata") return requests_cleaned
ຕາຕະລາງຜົນໄດ້ຮັບຈະມີລັກສະນະນີ້:
ຕໍ່ໄປ, ພວກເຮົາຄວນຈະເລີ່ມຕົ້ນຕາຕະລາງການຫລົ້ມຈົມຂອງພວກເຮົາ
dt_builder = DeltaTable.createIfNotExists(spark) \ .tableName(unpacked_requests_table_name) \ .addColumns(schema) \ .partitionedBy(requests_cleaned.schema) \ .property("delta.enableChangeDataFeed", "true") \ dt_builder.execute()
ແລະຂຽນຜົນໄດ້ຮັບ
checkpoint_path = f"dbfs:/payload-logging/{endpoint_name}/checkpoint" requests_stream = requests_cleaned.writeStream \ .trigger(once=True) \ .format("delta") \ .partitionBy(date_col) \ .outputMode("append") \ .option("checkpointLocation", checkpoint_path) \ .toTable(unpacked_requests_table_name) \
ສຸດທ້າຍ, ພວກເຮົາສ້າງຕາຕະລາງພື້ນຖານຂອງພວກເຮົາ. DLM ໃຊ້ຕາຕະລາງນີ້ເພື່ອຄິດໄລ່ drifts ໂດຍການປຽບທຽບການແຈກຢາຍຂອງຖັນທີ່ຄ້າຍຄືກັນຂອງພື້ນຖານແລະຕົວແບບຕົ້ນຕໍ. ຕາຕະລາງພື້ນຖານຄວນມີຖັນລັກສະນະດຽວກັນກັບຖັນຕົ້ນຕໍເຊັ່ນດຽວກັນກັບຖັນການກໍານົດຕົວແບບດຽວກັນ. ສໍາລັບຕາຕະລາງພື້ນຖານພວກເຮົາໃຊ້ຕາຕະລາງການຄາດຄະເນຂອງ ຊຸດຂໍ້ມູນການກວດສອບ ຂອງພວກເຮົາທີ່ພວກເຮົາເກັບຮັກສາໄວ້ກ່ອນຫນ້ານີ້ຫຼັງຈາກທີ່ພວກເຮົາຝຶກອົບຮົມຕົວແບບຂອງພວກເຮົາໂດຍໃຊ້ hyperparameter ທີ່ດີທີ່ສຸດ. ເພື່ອຄິດໄລ່ການວັດແທກ drift, Databricks ຄິດໄລ່ຕົວຊີ້ບອກ profile ສໍາລັບທັງຕາຕະລາງຕົ້ນຕໍແລະພື້ນຖານ. ທີ່ນີ້ທ່ານສາມາດອ່ານກ່ຽວກັບ ຕາຕະລາງປະຖົມແລະຕາຕະລາງພື້ນຖານ .
#read the prediction table df_base_table = spark.table(f"{catalog_name}.{model_schema}.predictions") # create the model id and add it to the table model_id = get_model_id(endpoint_name) df_base_table = df_base_table.withColumn(model_id_col, F.lit(model_id)) #write the new table and enable the CDC on it output_base_table_name = f"{catalog_name}.{model_schema}.{base_table_prefix}_{model_name}" df_base_table.write.format("delta").mode("overwrite").saveAsTable(output_base_table_name) spark.sql(f"ALTER TABLE {output_base_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")
ໃນປັດຈຸບັນພວກເຮົາກໍາລັງອ່ານເພື່ອສ້າງ dashboard ຕິດຕາມກວດກາຂອງພວກເຮົາ. ພວກເຮົາສາມາດເຮັດໄດ້ບໍ່ວ່າຈະໂດຍໃຊ້ UI ຫຼື Lakehouse Monitoring API. ນີ້ພວກເຮົາໃຊ້ທາງເລືອກທີສອງ:
# This is where we store the metric tables. output_schema_name = f"{catalog_name}.{model_schema}" try: info = lm.create_monitor( table_name=unpacked_requests_table_name, profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities,#the aggregation window model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, # We will refresh the metrics on-demand in this notebook baseline_table_name=output_base_table_name, ) print(info) except Exception as e: # Ensure the exception was expected assert "RESOURCE_ALREADY_EXISTS" in str(e), f"Unexpected error: {e}" # Update the monitor if any parameters of this notebook have changed. lm.update_monitor( table_name=unpacked_requests_table_name, updated_params=dict( profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities, model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, baseline_table_name=output_base_table_name, ) ) # Refresh metrics calculated on the requests table. refresh_info = lm.run_refresh(table_name=unpacked_requests_table_name) print(refresh_info)
ຫຼັງຈາກທີ່ພວກເຮົາດໍາເນີນການລະຫັດມັນໃຊ້ເວລາບາງເວລາຈົນກ່ວາ Databricks ຄິດໄລ່ metric ທັງຫມົດ. ເພື່ອເບິ່ງ dashboard ໄປທີ່ແຖບ Quality
ຂອງຕາຕະລາງ sink ຂອງທ່ານ (ເຊັ່ນ: unpacked_requests_table_name
). ທ່ານຄວນເບິ່ງຫນ້າດັ່ງຕໍ່ໄປນີ້.
ຖ້າທ່ານຄລິກໃສ່ການເບິ່ງ refresh history
ທ່ານຈະເຫັນການແລ່ນ, ລໍຖ້າ ແລະການໂຫຼດຫນ້າຈໍຄືນທີ່ຜ່ານມາ. ໃຫ້ຄລິກໃສ່ View Dashboard
ເພື່ອເປີດ dashboard ຂອງທ່ານ.
ດັ່ງນັ້ນພວກເຮົາເລີ່ມຕົ້ນດ້ວຍຕາຕະລາງ inference ( my_endpoint_payload
), ປຸງແຕ່ງມັນແລະບັນທຶກຜົນໄດ້ຮັບໃສ່ my_endpoint_payload_unpacked
ແລະສົ່ງຕາຕະລາງນີ້ພ້ອມກັບຕາຕະລາງພື້ນຖານຂອງພວກເຮົາ ( base_table_als
) ໄປຫາ API ຂອງພວກເຮົາ. DLM ຄິດໄລ່ການວັດແທກໂປຣໄຟລ໌ສຳລັບແຕ່ລະຕາຕະລາງ ( my_endpoint_payload_unpacked_profile_metric
) ແລະໃຊ້ພວກມັນເພື່ອຄິດໄລ່ຕົວວັດແທກການລອຍຕົວ ( my_endpoint_payload_unpacked_drift_metrics
)
ເຈົ້າໄປເລີຍ! ເຈົ້າມີທຸກຢ່າງທີ່ເຈົ້າຕ້ອງການເພື່ອຮັບໃຊ້ ແລະຕິດຕາມເຈົ້າແບບ!
ໃນສ່ວນຕໍ່ໄປຂ້ອຍຈະສະແດງວິທີການອັດຕະໂນມັດຂະບວນການນີ້ໂດຍໃຊ້ Databricks Assets Bundle ແລະ Gitlab !