ໃນ , ພວກເຮົາໄດ້ເອົາຂັ້ນຕອນທໍາອິດສໍາລັບການກໍ່ສ້າງທໍ່ສົ່ງ MLOps ໃນຕອນທ້າຍໂດຍໃຊ້ Databricks ແລະ Spark, ນໍາພາໂດຍໂຄງສ້າງອ້າງອີງຂອງ Databricks. ນີ້ແມ່ນສະຫຼຸບຂັ້ນຕອນທີ່ສຳຄັນທີ່ພວກເຮົາໄດ້ກວມເອົາ: ສ່ວນທໍາອິດຂອງຊຸດການສອນນີ້ : ພວກເຮົາໄດ້ຈັດວາງຂໍ້ມູນຂອງພວກເຮົາເປັນຊັ້ນທອງແດງ, ເງິນ, ແລະຄໍາພາຍໃນ Unity Catalog, ການສ້າງຕັ້ງລະບົບການຄຸ້ມຄອງຂໍ້ມູນທີ່ມີໂຄງສ້າງແລະປະສິດທິພາບ. ການຕັ້ງຄ່າ Unity Catalog ສໍາລັບສະຖາປັດຕະຍະກໍາ Medallion : ພວກເຮົາໄດ້ສະແດງໃຫ້ເຫັນວິທີການນໍາເຂົ້າຂໍ້ມູນດິບເຂົ້າໄປໃນລະບົບ, ຮັບປະກັນຄວາມສອດຄ່ອງແລະຄຸນນະພາບສໍາລັບຂັ້ນຕອນການປຸງແຕ່ງຕໍ່ໄປ. Ingesting Data into Unity Catalog : ການນໍາໃຊ້ Databricks, ພວກເຮົາໄດ້ຝຶກອົບຮົມຮູບແບບການຮຽນຮູ້ເຄື່ອງຈັກທີ່ເຫມາະສົມກັບຊຸດຂໍ້ມູນຂອງພວກເຮົາ, ປະຕິບັດຕາມການປະຕິບັດທີ່ດີທີ່ສຸດສໍາລັບການພັດທະນາຕົວແບບທີ່ສາມາດຂະຫຍາຍໄດ້ແລະມີປະສິດທິພາບ. ການຝຶກອົບຮົມແບບຈໍາລອງ : ເພື່ອເພີ່ມປະສິດຕິພາບຂອງຕົວແບບ, ພວກເຮົາໄດ້ຈ້າງ HyperOpt ເພື່ອອັດຕະໂນມັດການຄົ້ນຫາສໍາລັບ hyperparameters ທີ່ດີທີ່ສຸດ, ປັບປຸງຄວາມຖືກຕ້ອງແລະປະສິດທິພາບ. Hyperparameter Tuning ກັບ HyperOpt : ພວກເຮົາໄດ້ນໍາໃຊ້ MLflow ເພື່ອບັນທຶກແລະຕິດຕາມການທົດລອງຂອງພວກເຮົາ, ຮັກສາບັນທຶກສະບັບແບບຈໍາລອງ, metrics, ແລະຕົວກໍານົດການສໍາລັບການປຽບທຽບງ່າຍແລະການແຜ່ພັນ. ການຕິດຕາມການທົດລອງກັບ Databricks MLflow ດ້ວຍຂັ້ນຕອນພື້ນຖານເຫຼົ່ານີ້ສຳເລັດແລ້ວ, ດຽວນີ້ຕົວແບບຂອງທ່ານໄດ້ຖືກນຳມາໃຊ້ເພື່ອນຳໃຊ້. ໃນພາກທີສອງນີ້, ພວກເຮົາຈະສຸມໃສ່ການລວມເອົາສອງອົງປະກອບທີ່ສໍາຄັນເຂົ້າໄປໃນລະບົບຂອງພວກເຮົາ: : ການປະຕິບັດການປະມວນຜົນ batch ເພື່ອສ້າງການຄາດຄະເນກ່ຽວກັບຊຸດຂໍ້ມູນຂະຫນາດໃຫຍ່, ເຫມາະສົມສໍາລັບຄໍາຮ້ອງສະຫມັກເຊັ່ນການໃຫ້ຄະແນນຫຼາຍແລະການລາຍງານແຕ່ລະໄລຍະ. Batch Inference : ການຕັ້ງຄ່າຕົວແບບທີ່ໃຊ້ເວລາທີ່ແທ້ຈິງເພື່ອໃຫ້ການຄາດຄະເນທັນທີທັນໃດ, ຈໍາເປັນສໍາລັບການໂຕ້ຕອບການນໍາໃຊ້ແລະການບໍລິການ. Online Inference (Model Serving) ເພື່ອຮັບປະກັນຕົວແບບທີ່ນໍາໃຊ້ຂອງທ່ານຮັກສາການປະຕິບັດທີ່ດີທີ່ສຸດແລະຄວາມຫນ້າເຊື່ອຖືໃນໄລຍະ. ການຕິດຕາມຕົວແບບ: ໃຫ້ເຂົ້າໄປໃນມັນ! ການນຳໃຊ້ຕົວແບບ ຈຸດອອກເດີນທາງຂອງ blog ສຸດທ້າຍແມ່ນການປະເມີນແບບຈໍາລອງ. ຕອນນີ້ຈິນຕະນາການວ່າພວກເຮົາໄດ້ເຮັດການປຽບທຽບແລະພົບວ່າຕົວແບບຂອງພວກເຮົາສະແດງໃຫ້ເຫັນການປະຕິບັດທີ່ສູງກວ່າເມື່ອທຽບກັບຮູບແບບການຜະລິດນີ້. ດັ່ງທີ່ພວກເຮົາຕ້ອງການ (ສົມມຸດ) ໃຊ້ຕົວແບບໃນການຜະລິດ, ພວກເຮົາຕ້ອງການໃຊ້ປະໂຫຍດຈາກຂໍ້ມູນທັງຫມົດທີ່ພວກເຮົາມີ. ຂັ້ນຕອນຕໍ່ໄປແມ່ນການຝຶກອົບຮົມແລະການທົດສອບຕົວແບບໂດຍໃຊ້ຊຸດຂໍ້ມູນເຕັມ. ຫຼັງຈາກນັ້ນ, ສືບຕໍ່ຕົວແບບຂອງພວກເຮົາສໍາລັບການນໍາໃຊ້ໃນພາຍຫລັງໂດຍການນໍາໃຊ້ມັນເປັນຕົວແບບແຊ້ມຂອງພວກເຮົາ. ເນື່ອງຈາກນີ້ແມ່ນຕົວແບບສຸດທ້າຍທີ່ເຮົາຕ້ອງການໃຊ້ສໍາລັບການສະຫຼຸບ, ພວກເຮົາໃຊ້ຄຸນສົມບັດວິສະວະກໍາລູກຄ້າເພື່ອຝຶກອົບຮົມແບບຈໍາລອງ. ດ້ວຍວິທີນີ້, ພວກເຮົາບໍ່ພຽງແຕ່ຕິດຕາມສາຍພັນຂອງຕົວແບບງ່າຍຂຶ້ນ, ແຕ່ຍັງເຮັດໃຫ້ການກວດສອບຄວາມຖືກຕ້ອງຂອງ schema ແລະການຫັນປ່ຽນຄຸນສົມບັດ (ຖ້າມີ) ໃຫ້ກັບລູກຄ້າ. with mlflow.start_run(run_name="ALS_best_model") as run: als = ALS() # Now we set the parameters for the method als.setMaxIter(MAX_ITER)\ .setSeed(SEED)\ .setRegParam(best_params["REG_PARAM"])\ .setUserCol(COL_USER)\ .setItemCol(COL_ITEM)\ .setRatingCol(COL_LABEL)\ .setRank(best_params["RANK"]) mlflow.log_param("MAX_ITER", MAX_ITER) mlflow.log_param("RANK", best_params["RANK"]) mlflow.log_param("REG_PARAM", best_params["REG_PARAM"]) # Create the model with these parameters. model = als.fit(df_full_data) #drop predictions where users and products from the test test and didn't make it into the training set. in this case, the prediction is NaN model.setColdStartStrategy('drop') predictions = model.transform(df_full_data) signature = infer_signature(model_input = df_full_data, model_output = predictions.select(COL_LABEL)) #log the model mlflow.spark.log_model(model, model_name, sample_input=df_full_data.limit(3), signature = signature, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name=f"{catalog_name}.{model_schema}.{model_name}") evaluator = RegressionEvaluator(predictionCol=COL_PRED, labelCol=COL_LABEL) rmse = evaluator.setMetricName("rmse").evaluate(predictions) mlflow.log_metric('rmse', rmse) ພວກເຮົາຍັງສາມາດໃຊ້ ເພື່ອຝຶກອົບຮົມ ແລະບັນທຶກຕົວແບບ Feature Store ຫຼື Feature Engineering APIs model_info = fe.log_model(model=model, artifact_path = model_name, flavor=mlflow.spark, training_set=fe_full_data, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name= f"{catalog_name}.{model_schema}.{model_name}" ) ເມື່ອພວກເຮົາໃຊ້ API ວິສະວະກໍາຄຸນນະສົມບັດ, ພວກເຮົາສາມາດເບິ່ງສາຍພັນຂອງຕົວແບບໃນ Catalog Explorer ຕອນນີ້ໃຫ້ອັບເດດລາຍລະອຽດຂອງຕົວແບບ ແລະມອບໝາຍປ້າຍແຊ້ມໃຫ້ກັບມັນ. import time from mlflow.tracking.client import MlflowClient from mlflow.entities.model_registry.model_version_status import ModelVersionStatus client = MlflowClient() #find the latest model version model_name_path = f"{catalog_name}.{model_schema}.{model_name}" model_version_infos = client.search_model_versions(f"name ='{model_name_path}'") new_model_version = max([int(model_version_info.version) for model_version_info in model_version_infos]) #add the model and model version descirption client.update_registered_model( name=model_name_path, description="collaborative filtering using Spark mllib ALS. This model use rating table" ) client.update_model_version( name=model_name_path, version=new_model_version, description="this model is optimized Rank and REG_PARAM with Hyperopt and rmse as a loss function. trained on the full dataset" ) # assign alias client.set_registered_model_alias(model_name_path, "Champion", new_model_version) ໃນປັດຈຸບັນສືບຕໍ່ເດີນຫນ້າແລະກວດເບິ່ງ schema ທີ່ທ່ານລົງທະບຽນຮູບແບບ. ທ່ານຄວນເບິ່ງການປັບປຸງຂອງທ່ານທັງຫມົດດັ່ງຕໍ່ໄປນີ້ : ຖ້າຫາກວ່າທ່ານນໍາໃຊ້ພື້ນທີ່ເຮັດວຽກສໍາລັບການຈົດທະບຽນຕົວແບບທີ່ທ່ານຄວນຈະຂັ້ນຕອນການຄຸ້ມຄອງຕົວແບບຂອງທ່ານ. ການໃຊ້ນາມແຝງຈະບໍ່ເຮັດວຽກ. ກວດເບິ່ງ ເພື່ອເບິ່ງວ່າມັນເຮັດວຽກແນວໃດ ຂັ້ນຕອນຂອງຕົວແບບ ທີ່ນີ້ ຕົວແບບ Inference ຄະແນນ Batch ໃນປັດຈຸບັນຈິນຕະນາການພວກເຮົາຕ້ອງການທີ່ຈະນໍາໃຊ້ຕົວແບບຂອງພວກເຮົາໃນການຜະລິດສໍາລັບການ inference. ໃນຂັ້ນຕອນນີ້, ພວກເຮົາໂຫລດຮູບແບບແຊ້ມແລະນໍາໃຊ້ມັນເພື່ອສ້າງ 20 ຄໍາແນະນໍາຮູບເງົາສໍາລັບຜູ້ໃຊ້ແຕ່ລະຄົນ. from mlflow.spark import load_model as spark_load_model from mlflow.tracking.client import MlflowClient from create_training_set import split_data #-- set UC as model registray mlflow.set_registry_uri("databricks-uc") #-- initate mlflow client client = MlflowClient() # -- read the config file with open('config.json') as config_file: config = json.load(config_file) catalog_name = config["catalog_name"] gold_layer = config["gold_layer_name"] silver_layer = config["silver_layer_name"] user_item_table_name = config["user_item_table_name"] ft_user_item_name = config["ft_user_item_name"] model_name = config["model_name"] model_schema = config["model_schema"] #-- create the model uri model_path = f"{catalog_name}.{model_schema}.{model_name}" # --create the model_uri: there are two ways to do this # 1: using the alias (we use this*) model_version_uri = f"models:/{model_uri}@champion" # 2: using model version #champion_version = client.get_model_version_by_alias(model_uri, "champion") #model_version_uri = f"models:/{model_uri}/{champion_version.version}" # -- load the model pipline and exctract the model model_pipeline = spark_load_model(model_version_uri) model = model_pipeline.stages[0] # -- batch scoring using the the model fe_full_data, df_full_data, df_train, df_test = split_data() df_batch_input = df_full_data.drop("rating") df_scores = model.transform(df_batch_input) # --- in case you used Feature Engineering to train and register model #from databricks.feature_engineering import FeatureEngineeringClient #fe = FeatureEngineeringClient() # fe.score_batch(model_uri=f"{model_version_uri}",df = df_batch_input) ແລະທ່ານສາມາດເບິ່ງພວກເຮົາໄດ້ນໍາໃຊ້ຂໍ້ມູນການຝຶກອົບຮົມດຽວກັນສໍາລັບການໃຫ້ຄະແນນ batch. ເຖິງແມ່ນວ່າໃນກໍລະນີຂອງລະບົບຄໍາແນະນໍາມັນເຮັດໃຫ້ຄວາມຮູ້ສຶກ, ໃນຄໍາຮ້ອງສະຫມັກສ່ວນໃຫຍ່ພວກເຮົາຕ້ອງການໃຊ້ຕົວແບບເພື່ອຄະແນນບາງຂໍ້ມູນທີ່ບໍ່ເຫັນ. ຕົວຢ່າງ, ການຖ່າຍຮູບຂອງເຈົ້າແມ່ນ Netflix ແລະຕ້ອງການປັບປຸງຄໍາແນະນໍາຂອງຜູ້ໃຊ້ໃນຕອນທ້າຍຂອງມື້ໂດຍອີງໃສ່ບັນຊີລາຍຊື່ທີ່ເບິ່ງໃຫມ່ຂອງພວກເຂົາ. ພວກເຮົາສາມາດຈັດຕາຕະລາງວຽກທີ່ດໍາເນີນການໃຫ້ຄະແນນ batch ໃນເວລາສະເພາະໃນຕອນທ້າຍຂອງມື້. ໃນປັດຈຸບັນພວກເຮົາສາມາດສືບຕໍ່ເດີນຫນ້າແລະສ້າງຄໍາແນະນໍາສໍາລັບຜູ້ໃຊ້ແຕ່ລະຄົນ. ສໍາລັບນີ້ພວກເຮົາຊອກຫາ 20 ອັນດັບຫນຶ່ງຕໍ່ຜູ້ໃຊ້ from pyspark.sql.window import Window from pyspark.sql.functions import col, split, row_number, collect_list from pyspark.sql.functions import col, collect_list, expr, lit, min, row_number, desc windowSpec = Window.partitionBy("user_id").orderBy(col("prediction").desc()) df_top_20_items = df_scores.withColumn("rank", row_number().over(windowSpec)).filter(col("rank") <= 20) df_user_recs = df_top_20_items.groupBy("user_id") \ .agg(collect_list(col("item_id").cast("double")).alias("top_item_ids")) ນີ້ແມ່ນວິທີທີ່ຜົນໄດ້ຮັບເບິ່ງຄືວ່າ ໃນທີ່ສຸດພວກເຮົາສາມາດເກັບຮັກສາການຄາດຄະເນເປັນປ້າຍຊື່ delta ໃນ UC ຂອງພວກເຮົາຫຼືເຜີຍແຜ່ພວກມັນໃຫ້ກັບລະບົບລຸ່ມນ້ໍາ Mongo DB ຫຼື Azure Cosmos DB. ພວກເຮົາໄປກັບທາງເລືອກ firs df_user_recs.write.mode("overwrite").saveAsTable(f"{catalog_name}.{output_schema}.top20_item_recommendations") ສະຕຣີມ/ອິນເຕີແນັດ ຕອນນີ້ຈິນຕະນາການກໍລະນີທີ່ພວກເຮົາຕ້ອງການປັບປຸງຄໍາແນະນໍາຂອງພວກເຮົາໂດຍອີງໃສ່ການໂຕ້ຕອບຂອງຜູ້ໃຊ້ໃນເວລາຈິງ. ສໍາລັບກໍລະນີນີ້ພວກເຮົາສາມາດນໍາໃຊ້ການຮັບໃຊ້ແບບຈໍາລອງ. ເມື່ອໃຜຜູ້ຫນຶ່ງຕ້ອງການໃຊ້ຕົວແບບຂອງເຈົ້າ, ພວກເຂົາສາມາດສົ່ງຂໍ້ມູນໄປຫາເຄື່ອງແມ່ຂ່າຍ. ຫຼັງຈາກນັ້ນ, ເຊີບເວີຈະປ້ອນຂໍ້ມູນນັ້ນໄປຫາຕົວແບບທີ່ນຳໃຊ້ຂອງທ່ານ, ເຊິ່ງເຂົ້າສູ່ການປະຕິບັດ, ວິເຄາະຂໍ້ມູນ ແລະສ້າງການຄາດຄະເນ. ພວກເຂົາສາມາດຖືກໃຊ້ໃນແອັບພລິເຄຊັນເວັບ, ແອັບຯມືຖື, ຫຼືແມ້ກະທັ້ງລະບົບຝັງຕົວ. ຫນຶ່ງໃນຄໍາຮ້ອງສະຫມັກຂອງວິທີການນີ້ແມ່ນເພື່ອເຮັດໃຫ້ເສັ້ນທາງການຈະລາຈອນສໍາລັບການທົດສອບ A / B. ALS algorithm ບໍ່ສາມາດຖືກນໍາໃຊ້ໂດຍກົງສໍາລັບການ inference ອອນໄລນ໌ເນື່ອງຈາກວ່າມັນຮຽກຮ້ອງໃຫ້ມີການຝຶກອົບຮົມ retraining ຮູບແບບການນໍາໃຊ້ຂໍ້ມູນທັງຫມົດ (ເກົ່າ + ໃຫມ່) ເພື່ອປັບປຸງຄໍາແນະນໍາ. Gradient Descent algorithms ແມ່ນຕົວຢ່າງຂອງຕົວແບບທີ່ສາມາດໃຊ້ສໍາລັບການອັບເດດອອນໄລນ໌. ພວກເຮົາອາດຈະເບິ່ງບາງ algorithms ເຫຼົ່ານີ້ໃນການຕອບໃນອະນາຄົດ. ຢ່າງໃດກໍຕາມ, ພຽງແຕ່ເພື່ອສະແດງໃຫ້ເຫັນວິທີການດັ່ງກ່າວຈະເຮັດວຽກ, ພວກເຮົາສ້າງຮູບແບບ (ບໍ່ມີປະໂຫຍດ) ໃຫ້ບໍລິການຈຸດສິ້ນສຸດທີ່ຄາດຄະເນການຈັດອັນດັບຮູບເງົາໂດຍອີງໃສ່ທຸກຄັ້ງທີ່ຜູ້ໃຊ້ອັດຕາຮູບເງົາ! import requests model_path = f"{catalog_name}.{model_schema}.{model_name}" champion_version = client.get_model_version_by_alias(model_path, "champion") # Set the name of the MLflow endpoint endpoint_name = config["model_serving_endpoint_name"] # Name of the registered MLflow model model_name = model_path # Specify the type of compute (CPU, GPU_SMALL, GPU_MEDIUM, etc.) workload_type = "CPU" # Specify the scale-out size of compute (Small, Medium, Large, etc.) workload_size = "Small" # Get the latest version of the MLflow model model_version = int(champion_version.version) # Specify Scale to Zero(only supported for CPU endpoints) scale_to_zero = False # Get the API endpoint and token for the current notebook context API_ROOT = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get() API_TOKEN = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get() data = { "name": endpoint_name, "config": { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ] }, } headers = {"Context-Type": "text/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.post( url=f"{API_ROOT}/api/2.0/serving-endpoints", json=data, headers=headers ) ນີ້ຈະສ້າງແລະອາຫານທ່ຽງແບບຈໍາລອງໃຫ້ບໍລິການສໍາລັບພວກເຮົາດັ່ງນັ້ນມັນໃຊ້ເວລາບາງເວລາ. ຕອນນີ້ຖ້າທ່ານເປີດປ່ອງຢ້ຽມ , ທ່ານຄວນເຫັນຈຸດສິ້ນສຸດຂອງທ່ານ. Serving ພວກເຮົາສາມາດນໍາໃຊ້ endpoint ຫນຶ່ງເພື່ອຮັບໃຊ້ຫຼາຍຮູບແບບ. ຫຼັງຈາກນັ້ນ, ພວກເຮົາສາມາດນໍາໃຊ້ເສັ້ນທາງການຈະລາຈອນສໍາລັບສະຖານະການເຊັ່ນ: ການທົດສອບ A / B ຫຼືປຽບທຽບການປະຕິບັດຂອງຕົວແບບທີ່ແຕກຕ່າງກັນໃນການຜະລິດ. ຕາຕະລາງ Inference ຕາຕະລາງ Inference ໃນ Databricks Model Serving ເຮັດຫນ້າທີ່ເປັນບັນທຶກອັດຕະໂນມັດສໍາລັບແບບຈໍາລອງຂອງພວກເຮົາ. ເມື່ອເປີດໃຊ້, ພວກມັນບັນທຶກຄໍາຮ້ອງຂໍທີ່ເຂົ້າມາ (ຂໍ້ມູນທີ່ສົ່ງສໍາລັບການຄາດຄະເນ), ຜົນໄດ້ຮັບຂອງຕົວແບບທີ່ສອດຄ້ອງກັນ (ການຄາດເດົາ), ແລະບາງ metadata ອື່ນໆເປັນຕາຕະລາງ Delta ພາຍໃນ Unity Catalog. ພວກເຮົາສາມາດນໍາໃຊ້ຕາຕະລາງ inference ສໍາລັບ , , ແລະຂັ້ນຕອນການເກັບກໍາຂໍ້ມູນສໍາລັບ ຫຼື ແບບຂອງພວກເຮົາ. ການຕິດຕາມແລະ debugging ການຕິດຕາມເຊື້ອສາຍ ການ retraining ປັບປັບຕົວ ພວກເຮົາສາມາດເປີດໃຊ້ ໃນຈຸດສິ້ນສຸດການຮັບໃຊ້ຂອງພວກເຮົາເພື່ອຕິດຕາມຕົວແບບ. ພວກເຮົາສາມາດເຮັດໄດ້ໂດຍການລະບຸຄຸນສົມບັດ ໃນ payload ເມື່ອພວກເຮົາສ້າງຈຸດສິ້ນສຸດຄັ້ງທໍາອິດ. ຫຼືພວກເຮົາປັບປຸງຈຸດສິ້ນສຸດຂອງພວກເຮົາຫຼັງຈາກນັ້ນໂດຍໃຊ້ຄໍາສັ່ງ ແລະ endpoint URL ດັ່ງຕໍ່ໄປນີ້ (ເພີ່ມເຕີມ inference table auto_capture_config put config ) ທີ່ນີ້ data = { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ], "auto_capture_config":{ "catalog_name": catalog_name, "schema_name": model_schema, "table_name_prefix": payload_table, } } headers = {"Context-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.put(url=f"{API_ROOT}/api/2.0/serving-endpoints/{endpoint_name}/config", json=data, headers=headers) print(json.dumps(response.json(), indent=4)) ຕອນນີ້ໃຫ້ອາຫານຈຸດສິ້ນສຸດດ້ວຍຂໍ້ມູນການໂຕ້ຕອບຂອງຜູ້ໃຊ້ dummy ບາງອັນ import random import time all_items = df_full_data.select(col("item_id")).distinct() for user_id in range(50,54): items_not_rated_by_user = df_full_data.where(col("user_id")==user_id).select(col("item_id")).distinct()#collect()[0][0] no_rated_items = [item.item_id for item in all_items.subtract(items_not_rated_by_user).limit(4).collect()] data = { "dataframe_records": [ {"user_id":user_id, "item_id":no_rated_items[0], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[1], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, ] } response = requests.post( url=f"{API_ROOT}/serving-endpoints/{endpoint_name}/invocations", json=data, headers=headers ) # generate the data within the timespan of 1 to 8 minutes time.sleep(random.randint(60*1, 60*8)) ພວກເຮົາສາມາດກວດເບິ່ງບັນທຶກຈຸດສິ້ນສຸດໃນຕາຕະລາງ . ມັນໃຊ້ເວລາປະມານ 10 ນາທີຈົນກ່ວາທ່ານສາມາດເບິ່ງຂໍ້ມູນໃນຕາຕະລາງ. <catalog>.<schema>.<payload_table> table_path = f"{catalog_name}.{model_schema}.{payload_table}" # Read data from the inference table df_inf_table = spark.read.table(table_path) display(df_inf_table ) ທ່ານຄວນເຫັນບາງສິ່ງບາງຢ່າງເຊັ່ນນີ້ຕາຕະລາງ payload ຂອງທ່ານ ເພື່ອເຂົ້າໃຈ schema ຂອງຕາຕະລາງ inference ນີ້, ກວດເບິ່ງ "Unity catalog inference table schema ==" .== ທີ່ນີ້ ການຕິດຕາມແບບຈໍາລອງ ແບບຈໍາລອງແລະຂໍ້ມູນການຕິດຕາມຫົວຂໍ້ທີ່ສັບສົນທີ່ຕ້ອງໃຊ້ເວລາຫຼາຍເພື່ອແມ່ບົດ. Databricks Lakehouse Monitoring (DLM) ຫຼຸດຜ່ອນສ່ວນເກີນຂອງການສ້າງລະບົບການຕິດຕາມທີ່ເຫມາະສົມໂດຍການໃຫ້ແມ່ແບບມາດຕະຖານແລະສາມາດປັບແຕ່ງໄດ້ສໍາລັບກໍລະນີທີ່ໃຊ້ທົ່ວໄປ. ແນວໃດກໍ່ຕາມ, ການສ້າງຕົ້ນສະບັບ DLM ແລະການຕິດຕາມແບບຈໍາລອງໂດຍທົ່ວໄປຮຽກຮ້ອງໃຫ້ມີການທົດລອງຫຼາຍ. ຂ້າພະເຈົ້າບໍ່ຕ້ອງການໃຫ້ທ່ານເຫັນພາບລວມຢ່າງກວ້າງຂວາງຂອງການຕິດຕາມຕົວແບບຢູ່ທີ່ນີ້, ແຕ່ແທນທີ່ຈະໃຫ້ທ່ານເປັນຈຸດເລີ່ມຕົ້ນ. ຂ້ອຍອາດຈະອຸທິດ blog ໃຫ້ກັບຫົວຂໍ້ນີ້ໃນອະນາຄົດ. ສະຫຼຸບຫຍໍ້ຂອງການເຮັດວຽກ ແລະຄຸນສົມບັດຂອງ DLM ໃນປັດຈຸບັນທີ່ພວກເຮົາມີຕົວແບບຂອງພວກເຮົາແລະດໍາເນີນການ, ພວກເຮົາສາມາດນໍາໃຊ້ຕາຕະລາງ inference ທີ່ສ້າງຂຶ້ນໂດຍຈຸດສິ້ນສຸດການຮັບໃຊ້ຂອງພວກເຮົາເພື່ອຕິດຕາມຕົວຊີ້ບອກທີ່ສໍາຄັນເຊັ່ນການປະຕິບັດຕົວແບບແລະການລອຍຕົວເພື່ອກວດພົບຄວາມແຕກແຍກຫຼືຄວາມຜິດປົກກະຕິໃນຂໍ້ມູນຫຼືຕົວແບບຂອງພວກເຮົາໃນໄລຍະເວລາ. ວິທີການທີ່ຫ້າວຫັນນີ້ຊ່ວຍໃຫ້ພວກເຮົາດໍາເນີນການແກ້ໄຂໄດ້ທັນເວລາ, ເຊັ່ນ: ການຝຶກອົບຮົມແບບຈໍາລອງຫຼືການປັບປຸງລັກສະນະຂອງຕົນ, ເພື່ອຮັກສາປະສິດທິພາບທີ່ດີທີ່ສຸດແລະສອດຄ່ອງກັບຈຸດປະສົງທາງທຸລະກິດ. DLM ສະຫນອງສາມປະເພດຂອງການວິເຄາະຫຼື : , ແລະ . ເນື່ອງຈາກພວກເຮົາມີຄວາມສົນໃຈໃນການວິເຄາະຕາຕະລາງ inference ຂອງພວກເຮົາ, ພວກເຮົາສຸມໃສ່ອັນສຸດທ້າຍ. ການນໍາໃຊ້ຕາຕະລາງສໍາລັບການຕິດຕາມກວດກາ - " " ຂອງພວກເຮົາ, ພວກເຮົາຄວນໃຫ້ແນ່ໃຈວ່າຕາຕະລາງມີໂຄງສ້າງທີ່ຖືກຕ້ອງ. ສໍາລັບ ແຕ່ລະແຖວຄວນສອດຄ່ອງກັບຄໍາຮ້ອງຂໍທີ່ມີຖັນຕໍ່ໄປນີ້: profile type ຊຸດເວລາ Snapshot Inference ຕາຕະລາງຕົ້ນຕໍ , ຕາຕະລາງ inference ລັກສະນະແບບຈໍາລອງ ການຄາດຄະເນຕົວແບບ ID ຕົວແບບ : timestamp ຂອງຄໍາຮ້ອງຂໍ inference timestamp (ທາງເລືອກ) ຄວາມຈິງພື້ນຖານ ແມ່ນສໍາຄັນສໍາລັບກໍລະນີທີ່ພວກເຮົາໃຫ້ບໍລິການຫຼາຍແບບແລະພວກເຮົາຕ້ອງການຕິດຕາມການປະຕິບັດຂອງແຕ່ລະແບບໃນ dashboard ຕິດຕາມກວດກາ. ຖ້າມີຫຼາຍກວ່າຫນຶ່ງ id ຕົວແບບທີ່ມີຢູ່, DLM ໃຊ້ມັນເພື່ອຕັດຂໍ້ມູນ ແລະຄໍານວນ metrics ແລະ statics ສໍາລັບແຕ່ລະ slice ແຍກຕ່າງຫາກ. id ຕົວແບບ DLM ຄິດໄລ່ແຕ່ລະສະຖິຕິ ແລະ metrics ສໍາລັບຊ່ວງເວລາທີ່ກໍານົດໄວ້. ສໍາລັບການວິເຄາະ inference, ມັນໄດ້ນໍາໃຊ້ຖັນ , ບວກກັບຂະຫນາດປ່ອງຢ້ຽມທີ່ຜູ້ໃຊ້ກໍານົດເພື່ອກໍານົດເວລາປ່ອງຢ້ຽມ. ເພີ່ມເຕີມຂ້າງລຸ່ມນີ້. ສະແຕມເວລາ DLM ສະຫນັບສະຫນູນສອງ ສໍາລັບຕາຕະລາງ inference: " " ຫຼື " ". ມັນຄິດໄລ່ບາງຕົວຊີ້ບອກ ແລະສະຖິຕິທີ່ກ່ຽວຂ້ອງໂດຍອີງໃສ່ຂໍ້ມູນສະເພາະນີ້. problem type ການຈັດປະເພດ regression ເພື່ອໃຊ້ DLM, ພວກເຮົາຄວນສ້າງຈໍພາບແລະຕິດມັນກັບຕາຕະລາງ. ເມື່ອພວກເຮົາເຮັດ DLM ນີ້ສ້າງ ສອງອັນ: metric tables : ຕາຕະລາງນີ້ມີສະຖິຕິສະຫຼຸບເຊັ່ນ: min, max, ເປີເຊັນຂອງ null ແລະ zeros. ມັນຍັງປະກອບດ້ວຍການວັດແທກເພີ່ມເຕີມໂດຍອີງໃສ່ປະເພດບັນຫາທີ່ກໍານົດໂດຍຜູ້ໃຊ້. ຕົວຢ່າງ , ແລະ ສໍາລັບຮູບແບບການຈັດປະເພດ, ແລະ ແລະ ສໍາລັບແບບຈໍາລອງການຖົດຖອຍ. ຕາຕະລາງການວັດແທກໂປຣໄຟລ໌ ຄວາມແມ່ນຍໍາ recall f1_score mean_squared_error mean_average_error : ມັນປະກອບດ້ວຍສະຖິຕິທີ່ວັດແທກວິທີການແຈກຢາຍຂໍ້ມູນມີການປ່ຽນແປງ ຫຼືທຽບກັບ . ມັນຄິດໄລ່ມາດຕະການເຊັ່ນການທົດສອບ Chi-square, ການທົດສອບ KS. ຕາຕະລາງ drift metric ໃນໄລຍະເວລາ ຄ່າພື້ນຖານ (ຖ້າສະຫນອງໃຫ້) ເພື່ອເບິ່ງລາຍຊື່ຕົວວັດແທກທີ່ສົມບູນສໍາລັບແຕ່ລະຕາຕະລາງ, ໃຫ້ກວດເບິ່ງຫນ້າເອກະສານ . ມັນເປັນໄປໄດ້ທີ່ຈະສ້າງ ຕາຕະລາງ metric . ຕົວວັດແທກທີ່ກໍາຫນົດເອງ ລັກສະນະທີ່ສໍາຄັນຂອງການສ້າງລະບົບການຕິດຕາມແມ່ນເພື່ອໃຫ້ແນ່ໃຈວ່າ dashboard ຕິດຕາມກວດກາຂອງພວກເຮົາມີການເຂົ້າເຖິງຂໍ້ມູນ inference ຫລ້າສຸດເມື່ອພວກເຂົາມາຮອດ. ສໍາລັບວ່າພວກເຮົາສາມາດນໍາໃຊ້ ຕິດຕາມແຖວທີ່ຖືກປຸງແຕ່ງໃນຕາຕະລາງ inference. ພວກເຮົານໍາໃຊ້ຕາຕະລາງ inference ຂອງຕົວແບບການບໍລິການເປັນຕາຕະລາງແຫຼ່ງຂອງພວກເຮົາ ( ), ແລະຕາຕະລາງການຕິດຕາມກວດກາເປັນຕາຕະລາງ sink ( ). ພວກເຮົາຍັງໃຫ້ແນ່ໃຈວ່າ (CDC) ຖືກເປີດໃຊ້ຢູ່ໃນຕາຕະລາງທັງສອງ (ມັນຖືກເປີດໃຊ້ໂດຍຄ່າເລີ່ມຕົ້ນໃນຕາຕະລາງ Inference). ວິທີນີ້ພວກເຮົາປະມວນຜົນພຽງແຕ່ການປ່ຽນແປງ - ໃສ່ / ປັບປຸງ / ລົບ - ໃນຕາຕະລາງແຫຼ່ງແທນທີ່ຈະກ່ວາການປຸງແຕ່ງຕາຕະລາງທັງຫມົດຄືນໃຫມ່ທຸກໆຄັ້ງ. Delta table streaming readStream writeStream ການຈັບຂໍ້ມູນການປ່ຽນແປງ ມື ເພື່ອເຮັດໃຫ້ການຕິດຕາມກວດກາຕາຕະລາງ inference ຂອງພວກເຮົາ, ພວກເຮົາດໍາເນີນຂັ້ນຕອນດັ່ງຕໍ່ໄປນີ້: ອ່ານຕາຕະລາງ inference ເປັນຕາຕະລາງການຖ່າຍທອດ ສ້າງຕາຕະລາງ delta ໃໝ່ ທີ່ມີ schema ທີ່ຖືກຕ້ອງໂດຍ unpacking ຕາຕະລາງ inference ທີ່ສ້າງຂຶ້ນໂດຍຕົວແບບຂອງພວກເຮົາໃຫ້ບໍລິການ endpoint. ກະກຽມຕາຕະລາງພື້ນຖານ (ຖ້າມີ) ສ້າງຈໍພາບເທິງຕາຕະລາງຜົນໄດ້ຮັບແລະໂຫຼດຫນ້າຈໍຄືນ metric ຈັດຕາຕະລາງຂັ້ນຕອນການເຮັດວຽກເພື່ອແຍກຕາຕະລາງການອ້າງອີງໃສ່ໂຄງສ້າງທີ່ຖືກຕ້ອງແລະປັບປຸງການວັດແທກໃຫມ່ ທໍາອິດພວກເຮົາຈໍາເປັນຕ້ອງໄດ້ຕິດຕັ້ງ Lakehouse Monitoring API. ມັນຄວນຈະຖືກຕິດຕັ້ງແລ້ວຖ້າທ່ານໃຊ້ Databricks rum time 15.3 LTS ແລະຂ້າງເທິງ: %pip install "https://ml-team-public-read.s3.amazonaws.com/wheels/data-monitoring/a4050ef7-b183-47a1-a145-e614628e3146/databricks_lakehouse_monitoring-0.4.14-py3-none-any.whl" dbutils.library.restartPython() ໃຫ້ອ່ານຕາຕະລາງ inference ເປັນຕາຕະລາງ streaming requests_raw = spark.readStream\ .format("delta")\ .table(inference_table_name) requests_raw.isStreaming #-> True ຕໍ່ໄປພວກເຮົາຕ້ອງວາງຕາຕະລາງໃນຮູບແບບທີ່ຖືກຕ້ອງຕາມທີ່ໄດ້ອະທິບາຍຂ້າງເທິງ. ຕາຕະລາງນີ້ຄວນຈະມີແຖວຫນຶ່ງສໍາລັບແຕ່ລະການຄາດຄະເນທີ່ມີຄຸນສົມບັດທີ່ກ່ຽວຂ້ອງແລະມູນຄ່າການຄາດຄະເນ. ຕາຕະລາງ inference ທີ່ພວກເຮົາໄດ້ຮັບຈາກຮູບແບບການໃຫ້ບໍລິການ endpoint, ເກັບຮັກສາຄໍາຮ້ອງຂໍ endpoint ແລະການຕອບສະຫນອງເປັນຮູບແບບ JSON ຊ້ອນ. ນີ້ແມ່ນຕົວຢ່າງຂອງ JSON payload ສໍາລັບຄໍລໍາການຮ້ອງຂໍແລະການຕອບສະຫນອງ. #requests {"dataframe_records": [ {"user_id": 1, "item_id": 346, "rating": 5}, {"user_id": 1, "item_id": 377, "rating": 2}, {"user_id": 1, "item_id": 302, "rating": 4} ] } #reponse {"predictions": [4.248899936676025, 1.1172138452529907, 4.279165744781494] } # --> what we need | user_id | item_id | rating | prediction | |---------|---------|--------|------------| | 1 | 346 | 5 | 4.248900 | | 1 | 377 | 2 | 1.117214 | | 1 | 302 | 4 | 4.279166 | ເພື່ອ unpack ຕາຕະລາງນີ້ໄປຫາ schema ທີ່ຖືກຕ້ອງ, ພວກເຮົາສາມາດນໍາໃຊ້ລະຫັດຕໍ່ໄປນີ້ທີ່ດັດແປງມາຈາກເອກະສານ Databricks ( ). ຕາຕະລາງການອ້າງອີງ Lakehouse Monitoring starter notebook # define the schema of the request and reponse fields in the inference tabel REQUEST_FIELDS = [StructField('user_id', IntegerType(), False),\ StructField('item_id', IntegerType(), False),\ StructField('rating', IntegerType(), False)\ ] RESPONSE_FIELD = [T.StructField("predictions", FloatType())] def process_col_requests(json_str:str) -> str: """ to proccess the JSON payload of request column in inference table """ request = json.loads(json_str) dataframe_records = request.get("dataframe_records", []) return dataframe_records def procces_col_response(json_str: str) -> str: """ to proccess the JSON payload of reponse column in inference table """ reponse = json.loads(json_str) output = [{prediction_col: round(prediction,4)} for prediction in reponse["predictions"]] return output def get_model_id(endpoint_name: str) -> str: """ create the model id by concatinating the model name and the model version. note: the assumption is the endpoint serves only one model """ served_models = get_served_models(endpoint_name) model_id = f"{served_models[0]['model_name']}_{served_models[0]['model_version']}" return model_id def process_requests(requests_raw: DataFrame, request_fields: List[T.StructField], response_field: T.StructField, endpoint_name: str) -> DataFrame: """ Takes a stream of raw requests and processes them by: - Unpacking JSON payloads for requests and responses - Exploding batched requests into individual rows - Converting Unix epoch millisecond timestamps to be Spark TimestampType :param requests_raw: DataFrame containing raw requests. Assumed to contain the following columns: - `request` - `response` - `timestamp_ms` :param request_fields: List of StructFields representing the request schema :param response_field: A StructField representing the response schema :return: A DataFrame containing processed requests """ # Convert the timestamp milliseconds to TimestampType for downstream processing. requests_timestamped = requests_raw \ .withColumn(timestamp_col, (F.col("timestamp_ms") / 1000).cast(T.TimestampType())) \ .drop("timestamp_ms") # create the model identifier column model_id = get_model_id(endpoint_name) # Convert the model name and version columns into a model identifier column. requests_identified = requests_timestamped \ .withColumn(model_id_col, F.lit(model_id)) # Rename the date column to avoid collisions with features. requests_dated = requests_identified.withColumnRenamed("date", date_col) # Consolidate and unpack JSON. request_schema = T.ArrayType(T.StructType(request_fields)) response_schema = T.ArrayType(T.StructType(response_field)) # w udf_request = F.udf(process_col_requests, request_schema) udf_reponse = F.udf(procces_col_response, response_schema) requests_unpacked = requests_dated.withColumn("request", udf_request("request")).\ withColumn("response", udf_reponse("response")) # Explode batched requests into individual rows. DB_PREFIX = "__db" requests_exploded = requests_unpacked \ .withColumn(f"{DB_PREFIX}_request_response", F.arrays_zip(F.col("request"), F.col("response"))) \ .withColumn(f"{DB_PREFIX}_request_response", F.explode(F.col(f"{DB_PREFIX}_request_response"))) \ .select(F.col("*"), F.col(f"{DB_PREFIX}_request_response.request.*"), F.col(f"{DB_PREFIX}_request_response.response.*")) \ .drop(f"{DB_PREFIX}_request_response", "request", "response") requests_cleaned = requests_exploded.drop("status_code", "sampling_fraction", "client_request_id", "databricks_request_id", "request_metadata") return requests_cleaned ຕາຕະລາງຜົນໄດ້ຮັບຈະມີລັກສະນະນີ້: ຕໍ່ໄປ, ພວກເຮົາຄວນຈະເລີ່ມຕົ້ນຕາຕະລາງການຫລົ້ມຈົມຂອງພວກເຮົາ dt_builder = DeltaTable.createIfNotExists(spark) \ .tableName(unpacked_requests_table_name) \ .addColumns(schema) \ .partitionedBy(requests_cleaned.schema) \ .property("delta.enableChangeDataFeed", "true") \ dt_builder.execute() ແລະຂຽນຜົນໄດ້ຮັບ checkpoint_path = f"dbfs:/payload-logging/{endpoint_name}/checkpoint" requests_stream = requests_cleaned.writeStream \ .trigger(once=True) \ .format("delta") \ .partitionBy(date_col) \ .outputMode("append") \ .option("checkpointLocation", checkpoint_path) \ .toTable(unpacked_requests_table_name) \ ສຸດທ້າຍ, ພວກເຮົາສ້າງຕາຕະລາງພື້ນຖານຂອງພວກເຮົາ. DLM ໃຊ້ຕາຕະລາງນີ້ເພື່ອຄິດໄລ່ drifts ໂດຍການປຽບທຽບການແຈກຢາຍຂອງຖັນທີ່ຄ້າຍຄືກັນຂອງພື້ນຖານແລະຕົວແບບຕົ້ນຕໍ. ຕາຕະລາງພື້ນຖານຄວນມີຖັນລັກສະນະດຽວກັນກັບຖັນຕົ້ນຕໍເຊັ່ນດຽວກັນກັບຖັນການກໍານົດຕົວແບບດຽວກັນ. ສໍາລັບຕາຕະລາງພື້ນຖານພວກເຮົາໃຊ້ຕາຕະລາງການຄາດຄະເນຂອງ ຂອງພວກເຮົາທີ່ພວກເຮົາເກັບຮັກສາໄວ້ກ່ອນຫນ້ານີ້ຫຼັງຈາກທີ່ພວກເຮົາຝຶກອົບຮົມຕົວແບບຂອງພວກເຮົາໂດຍໃຊ້ hyperparameter ທີ່ດີທີ່ສຸດ. ເພື່ອຄິດໄລ່ການວັດແທກ drift, Databricks ຄິດໄລ່ຕົວຊີ້ບອກ profile ສໍາລັບທັງຕາຕະລາງຕົ້ນຕໍແລະພື້ນຖານ. ທີ່ນີ້ທ່ານສາມາດອ່ານກ່ຽວກັບ ຊຸດຂໍ້ມູນການກວດສອບ . ຕາຕະລາງປະຖົມແລະຕາຕະລາງພື້ນຖານ #read the prediction table df_base_table = spark.table(f"{catalog_name}.{model_schema}.predictions") # create the model id and add it to the table model_id = get_model_id(endpoint_name) df_base_table = df_base_table.withColumn(model_id_col, F.lit(model_id)) #write the new table and enable the CDC on it output_base_table_name = f"{catalog_name}.{model_schema}.{base_table_prefix}_{model_name}" df_base_table.write.format("delta").mode("overwrite").saveAsTable(output_base_table_name) spark.sql(f"ALTER TABLE {output_base_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)") ໃນປັດຈຸບັນພວກເຮົາກໍາລັງອ່ານເພື່ອສ້າງ dashboard ຕິດຕາມກວດກາຂອງພວກເຮົາ. ພວກເຮົາສາມາດເຮັດໄດ້ບໍ່ວ່າຈະໂດຍໃຊ້ ຫຼື Lakehouse Monitoring API. ນີ້ພວກເຮົາໃຊ້ທາງເລືອກທີສອງ: UI # This is where we store the metric tables. output_schema_name = f"{catalog_name}.{model_schema}" try: info = lm.create_monitor( table_name=unpacked_requests_table_name, profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities,#the aggregation window model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, # We will refresh the metrics on-demand in this notebook baseline_table_name=output_base_table_name, ) print(info) except Exception as e: # Ensure the exception was expected assert "RESOURCE_ALREADY_EXISTS" in str(e), f"Unexpected error: {e}" # Update the monitor if any parameters of this notebook have changed. lm.update_monitor( table_name=unpacked_requests_table_name, updated_params=dict( profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities, model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, baseline_table_name=output_base_table_name, ) ) # Refresh metrics calculated on the requests table. refresh_info = lm.run_refresh(table_name=unpacked_requests_table_name) print(refresh_info) ຫຼັງຈາກທີ່ພວກເຮົາດໍາເນີນການລະຫັດມັນໃຊ້ເວລາບາງເວລາຈົນກ່ວາ Databricks ຄິດໄລ່ metric ທັງຫມົດ. ເພື່ອເບິ່ງ dashboard ໄປທີ່ແຖບ ຂອງຕາຕະລາງ sink ຂອງທ່ານ (ເຊັ່ນ: ). ທ່ານຄວນເບິ່ງຫນ້າດັ່ງຕໍ່ໄປນີ້. Quality unpacked_requests_table_name ຖ້າທ່ານຄລິກໃສ່ການເບິ່ງ ທ່ານຈະເຫັນການແລ່ນ, ລໍຖ້າ ແລະການໂຫຼດຫນ້າຈໍຄືນທີ່ຜ່ານມາ. ໃຫ້ຄລິກໃສ່ ເພື່ອເປີດ dashboard ຂອງທ່ານ. refresh history View Dashboard ດັ່ງນັ້ນພວກເຮົາເລີ່ມຕົ້ນດ້ວຍຕາຕະລາງ inference ( ), ປຸງແຕ່ງມັນແລະບັນທຶກຜົນໄດ້ຮັບໃສ່ ແລະສົ່ງຕາຕະລາງນີ້ພ້ອມກັບຕາຕະລາງພື້ນຖານຂອງພວກເຮົາ ( ) ໄປຫາ API ຂອງພວກເຮົາ. DLM ຄິດໄລ່ການວັດແທກໂປຣໄຟລ໌ສຳລັບແຕ່ລະຕາຕະລາງ ( ) ແລະໃຊ້ພວກມັນເພື່ອຄິດໄລ່ຕົວວັດແທກການລອຍຕົວ ( ) my_endpoint_payload my_endpoint_payload_unpacked base_table_als my_endpoint_payload_unpacked_profile_metric my_endpoint_payload_unpacked_drift_metrics ເຈົ້າໄປເລີຍ! ເຈົ້າມີທຸກຢ່າງທີ່ເຈົ້າຕ້ອງການເພື່ອຮັບໃຊ້ ແລະຕິດຕາມເຈົ້າແບບ! ໃນສ່ວນຕໍ່ໄປຂ້ອຍຈະສະແດງວິທີການອັດຕະໂນມັດຂະບວນການນີ້ໂດຍໃຊ້ ແລະ ! Databricks Assets Bundle Gitlab ໃນ , ພວກເຮົາໄດ້ເອົາຂັ້ນຕອນທໍາອິດສໍາລັບການກໍ່ສ້າງທໍ່ສົ່ງ MLOps ໃນຕອນທ້າຍໂດຍໃຊ້ Databricks ແລະ Spark, ນໍາພາໂດຍໂຄງສ້າງອ້າງອີງຂອງ Databricks. ນີ້ແມ່ນບົດສະຫຼຸບຂອງຂັ້ນຕອນສຳຄັນທີ່ພວກເຮົາໄດ້ກວມເອົາ: ສ່ວນທໍາອິດຂອງຊຸດການສອນນີ້ : ພວກເຮົາໄດ້ຈັດວາງຂໍ້ມູນຂອງພວກເຮົາເປັນຊັ້ນທອງແດງ, ເງິນ, ແລະຄໍາພາຍໃນ Unity Catalog, ການສ້າງຕັ້ງລະບົບການຄຸ້ມຄອງຂໍ້ມູນທີ່ມີໂຄງສ້າງແລະປະສິດທິພາບ. ການຕັ້ງຄ່າ Unity Catalog ສໍາລັບສະຖາປັດຕະຍະກໍາ Medallion : ພວກເຮົາໄດ້ສະແດງໃຫ້ເຫັນວິທີການນໍາເຂົ້າຂໍ້ມູນດິບເຂົ້າໄປໃນລະບົບ, ຮັບປະກັນຄວາມສອດຄ່ອງແລະຄຸນນະພາບສໍາລັບຂັ້ນຕອນການປຸງແຕ່ງຕໍ່ໄປ. Ingesting Data into Unity Catalog : ການນໍາໃຊ້ Databricks, ພວກເຮົາໄດ້ຝຶກອົບຮົມຮູບແບບການຮຽນຮູ້ເຄື່ອງຈັກທີ່ເຫມາະສົມກັບຊຸດຂໍ້ມູນຂອງພວກເຮົາ, ປະຕິບັດຕາມການປະຕິບັດທີ່ດີທີ່ສຸດສໍາລັບການພັດທະນາຕົວແບບທີ່ສາມາດຂະຫຍາຍໄດ້ແລະມີປະສິດທິພາບ. ການຝຶກອົບຮົມແບບຈໍາລອງ : ເພື່ອເພີ່ມປະສິດຕິພາບຂອງຕົວແບບ, ພວກເຮົາໄດ້ຈ້າງ HyperOpt ເພື່ອອັດຕະໂນມັດການຄົ້ນຫາສໍາລັບ hyperparameters ທີ່ດີທີ່ສຸດ, ປັບປຸງຄວາມຖືກຕ້ອງແລະປະສິດທິພາບ. Hyperparameter Tuning ກັບ HyperOpt : ພວກເຮົາໄດ້ນໍາໃຊ້ MLflow ເພື່ອບັນທຶກແລະຕິດຕາມການທົດລອງຂອງພວກເຮົາ, ຮັກສາບັນທຶກສະບັບແບບຈໍາລອງ, metrics, ແລະຕົວກໍານົດການສໍາລັບການປຽບທຽບງ່າຍແລະການແຜ່ພັນ. ການຕິດຕາມການທົດລອງກັບ Databricks MLflow ດ້ວຍຂັ້ນຕອນພື້ນຖານເຫຼົ່ານີ້ສຳເລັດແລ້ວ, ດຽວນີ້ຕົວແບບຂອງທ່ານໄດ້ຖືກນຳມາໃຊ້ເພື່ອນຳໃຊ້. ໃນພາກທີສອງນີ້, ພວກເຮົາຈະສຸມໃສ່ການລວມເອົາສອງອົງປະກອບທີ່ສໍາຄັນເຂົ້າໄປໃນລະບົບຂອງພວກເຮົາ: : ການປະຕິບັດການປະມວນຜົນ batch ເພື່ອສ້າງການຄາດຄະເນກ່ຽວກັບຊຸດຂໍ້ມູນຂະຫນາດໃຫຍ່, ເຫມາະສົມສໍາລັບຄໍາຮ້ອງສະຫມັກເຊັ່ນການໃຫ້ຄະແນນຫຼາຍແລະການລາຍງານແຕ່ລະໄລຍະ. Batch Inference : ການຕັ້ງຄ່າຕົວແບບທີ່ໃຊ້ເວລາທີ່ແທ້ຈິງເພື່ອໃຫ້ການຄາດຄະເນທັນທີທັນໃດ, ຈໍາເປັນສໍາລັບການໂຕ້ຕອບການນໍາໃຊ້ແລະການບໍລິການ. Online Inference (Model Serving) ເພື່ອຮັບປະກັນຕົວແບບທີ່ນໍາໃຊ້ຂອງທ່ານຮັກສາການປະຕິບັດທີ່ດີທີ່ສຸດແລະຄວາມຫນ້າເຊື່ອຖືໃນໄລຍະ. ການຕິດຕາມຕົວແບບ: ໃຫ້ເຂົ້າໄປໃນມັນ! ການນຳໃຊ້ຕົວແບບ ຈຸດອອກເດີນທາງຂອງ blog ສຸດທ້າຍແມ່ນການປະເມີນແບບຈໍາລອງ. ຕອນນີ້ຈິນຕະນາການວ່າພວກເຮົາໄດ້ເຮັດການປຽບທຽບແລະພົບວ່າຕົວແບບຂອງພວກເຮົາສະແດງໃຫ້ເຫັນການປະຕິບັດທີ່ສູງກວ່າເມື່ອທຽບກັບຮູບແບບການຜະລິດນີ້. ດັ່ງທີ່ພວກເຮົາຕ້ອງການ (ສົມມຸດ) ໃຊ້ຕົວແບບໃນການຜະລິດ, ພວກເຮົາຕ້ອງການໃຊ້ປະໂຫຍດຈາກຂໍ້ມູນທັງຫມົດທີ່ພວກເຮົາມີ. ຂັ້ນຕອນຕໍ່ໄປແມ່ນການຝຶກອົບຮົມແລະການທົດສອບຕົວແບບໂດຍໃຊ້ຊຸດຂໍ້ມູນເຕັມ. ຫຼັງຈາກນັ້ນ, ສືບຕໍ່ຕົວແບບຂອງພວກເຮົາສໍາລັບການນໍາໃຊ້ໃນພາຍຫລັງໂດຍການນໍາໃຊ້ມັນເປັນຕົວແບບແຊ້ມຂອງພວກເຮົາ. ເນື່ອງຈາກນີ້ແມ່ນຕົວແບບສຸດທ້າຍທີ່ເຮົາຕ້ອງການໃຊ້ສໍາລັບການສະຫຼຸບ, ພວກເຮົາໃຊ້ຄຸນສົມບັດວິສະວະກໍາລູກຄ້າເພື່ອຝຶກອົບຮົມແບບຈໍາລອງ. ດ້ວຍວິທີນີ້, ພວກເຮົາບໍ່ພຽງແຕ່ຕິດຕາມສາຍພັນຂອງຕົວແບບງ່າຍຂຶ້ນ, ແຕ່ຍັງສົ່ງການກວດສອບຄວາມຖືກຕ້ອງຂອງ schema ແລະການຫັນປ່ຽນຄຸນສົມບັດ (ຖ້າມີ) ໃຫ້ກັບລູກຄ້າ. with mlflow.start_run(run_name="ALS_best_model") as run: als = ALS() # Now we set the parameters for the method als.setMaxIter(MAX_ITER)\ .setSeed(SEED)\ .setRegParam(best_params["REG_PARAM"])\ .setUserCol(COL_USER)\ .setItemCol(COL_ITEM)\ .setRatingCol(COL_LABEL)\ .setRank(best_params["RANK"]) mlflow.log_param("MAX_ITER", MAX_ITER) mlflow.log_param("RANK", best_params["RANK"]) mlflow.log_param("REG_PARAM", best_params["REG_PARAM"]) # Create the model with these parameters. model = als.fit(df_full_data) #drop predictions where users and products from the test test and didn't make it into the training set. in this case, the prediction is NaN model.setColdStartStrategy('drop') predictions = model.transform(df_full_data) signature = infer_signature(model_input = df_full_data, model_output = predictions.select(COL_LABEL)) #log the model mlflow.spark.log_model(model, model_name, sample_input=df_full_data.limit(3), signature = signature, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name=f"{catalog_name}.{model_schema}.{model_name}") evaluator = RegressionEvaluator(predictionCol=COL_PRED, labelCol=COL_LABEL) rmse = evaluator.setMetricName("rmse").evaluate(predictions) mlflow.log_metric('rmse', rmse) ພວກເຮົາຍັງສາມາດໃຊ້ ເພື່ອຝຶກອົບຮົມ ແລະບັນທຶກຕົວແບບ Feature Store ຫຼື Feature Engineering APIs model_info = fe.log_model(model=model, artifact_path = model_name, flavor=mlflow.spark, training_set=fe_full_data, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name= f"{catalog_name}.{model_schema}.{model_name}" ) ເມື່ອພວກເຮົາໃຊ້ API ວິສະວະກໍາຄຸນນະສົມບັດ, ພວກເຮົາສາມາດເບິ່ງສາຍພັນຂອງຕົວແບບໃນ Catalog Explorer ຕອນນີ້ໃຫ້ອັບເດດລາຍລະອຽດຂອງຕົວແບບ ແລະມອບໝາຍປ້າຍແຊ້ມໃຫ້ກັບມັນ. import time from mlflow.tracking.client import MlflowClient from mlflow.entities.model_registry.model_version_status import ModelVersionStatus client = MlflowClient() #find the latest model version model_name_path = f"{catalog_name}.{model_schema}.{model_name}" model_version_infos = client.search_model_versions(f"name ='{model_name_path}'") new_model_version = max([int(model_version_info.version) for model_version_info in model_version_infos]) #add the model and model version descirption client.update_registered_model( name=model_name_path, description="collaborative filtering using Spark mllib ALS. This model use rating table" ) client.update_model_version( name=model_name_path, version=new_model_version, description="this model is optimized Rank and REG_PARAM with Hyperopt and rmse as a loss function. trained on the full dataset" ) # assign alias client.set_registered_model_alias(model_name_path, "Champion", new_model_version) ໃນປັດຈຸບັນສືບຕໍ່ເດີນຫນ້າແລະກວດເບິ່ງ schema ທີ່ທ່ານລົງທະບຽນຮູບແບບ. ທ່ານຄວນເບິ່ງການປັບປຸງຂອງທ່ານທັງຫມົດດັ່ງຕໍ່ໄປນີ້ : ຖ້າຫາກວ່າທ່ານນໍາໃຊ້ພື້ນທີ່ເຮັດວຽກສໍາລັບການຈົດທະບຽນຕົວແບບທີ່ທ່ານຄວນຈະຂັ້ນຕອນການຄຸ້ມຄອງຕົວແບບຂອງທ່ານ. ການໃຊ້ນາມແຝງຈະບໍ່ເຮັດວຽກ. ກວດເບິ່ງ ເພື່ອເບິ່ງວ່າມັນເຮັດວຽກແນວໃດ ຂັ້ນຕອນຂອງຕົວແບບ ທີ່ນີ້ ການອະພິປາຍຕົວແບບ ຄະແນນ Batch ຕອນນີ້ຈິນຕະນາການວ່າພວກເຮົາຕ້ອງການໃຊ້ຕົວແບບຂອງພວກເຮົາໃນການຜະລິດສໍາລັບການ inference. ໃນຂັ້ນຕອນນີ້, ພວກເຮົາໂຫລດຮູບແບບແຊ້ມແລະນໍາໃຊ້ມັນເພື່ອສ້າງ 20 ຄໍາແນະນໍາຮູບເງົາສໍາລັບຜູ້ໃຊ້ແຕ່ລະຄົນ. from mlflow.spark import load_model as spark_load_model from mlflow.tracking.client import MlflowClient from create_training_set import split_data #-- set UC as model registray mlflow.set_registry_uri("databricks-uc") #-- initate mlflow client client = MlflowClient() # -- read the config file with open('config.json') as config_file: config = json.load(config_file) catalog_name = config["catalog_name"] gold_layer = config["gold_layer_name"] silver_layer = config["silver_layer_name"] user_item_table_name = config["user_item_table_name"] ft_user_item_name = config["ft_user_item_name"] model_name = config["model_name"] model_schema = config["model_schema"] #-- create the model uri model_path = f"{catalog_name}.{model_schema}.{model_name}" # --create the model_uri: there are two ways to do this # 1: using the alias (we use this*) model_version_uri = f"models:/{model_uri}@champion" # 2: using model version #champion_version = client.get_model_version_by_alias(model_uri, "champion") #model_version_uri = f"models:/{model_uri}/{champion_version.version}" # -- load the model pipline and exctract the model model_pipeline = spark_load_model(model_version_uri) model = model_pipeline.stages[0] # -- batch scoring using the the model fe_full_data, df_full_data, df_train, df_test = split_data() df_batch_input = df_full_data.drop("rating") df_scores = model.transform(df_batch_input) # --- in case you used Feature Engineering to train and register model #from databricks.feature_engineering import FeatureEngineeringClient #fe = FeatureEngineeringClient() # fe.score_batch(model_uri=f"{model_version_uri}",df = df_batch_input) ແລະທ່ານສາມາດເບິ່ງພວກເຮົາໄດ້ນໍາໃຊ້ຂໍ້ມູນການຝຶກອົບຮົມດຽວກັນສໍາລັບການໃຫ້ຄະແນນ batch. ເຖິງແມ່ນວ່າໃນກໍລະນີຂອງລະບົບຄໍາແນະນໍາມັນເຮັດໃຫ້ຄວາມຮູ້ສຶກ, ໃນຄໍາຮ້ອງສະຫມັກສ່ວນໃຫຍ່ພວກເຮົາຕ້ອງການໃຊ້ຕົວແບບເພື່ອຄະແນນບາງຂໍ້ມູນທີ່ບໍ່ເຫັນ. ຕົວຢ່າງ, ການຖ່າຍຮູບຂອງເຈົ້າແມ່ນ Netflix ແລະຕ້ອງການປັບປຸງຄໍາແນະນໍາຂອງຜູ້ໃຊ້ໃນຕອນທ້າຍຂອງມື້ໂດຍອີງໃສ່ບັນຊີລາຍຊື່ທີ່ເບິ່ງໃຫມ່ຂອງພວກເຂົາ. ພວກເຮົາສາມາດຈັດຕາຕະລາງວຽກທີ່ດໍາເນີນການໃຫ້ຄະແນນ batch ໃນເວລາສະເພາະໃນຕອນທ້າຍຂອງມື້. ໃນປັດຈຸບັນພວກເຮົາສາມາດສືບຕໍ່ເດີນຫນ້າແລະສ້າງຄໍາແນະນໍາສໍາລັບຜູ້ໃຊ້ແຕ່ລະຄົນ. ສໍາລັບນີ້ພວກເຮົາຊອກຫາ 20 ອັນດັບທໍາອິດຕໍ່ຜູ້ໃຊ້ from pyspark.sql.window import Window from pyspark.sql.functions import col, split, row_number, collect_list from pyspark.sql.functions import col, collect_list, expr, lit, min, row_number, desc windowSpec = Window.partitionBy("user_id").orderBy(col("prediction").desc()) df_top_20_items = df_scores.withColumn("rank", row_number().over(windowSpec)).filter(col("rank") <= 20) df_user_recs = df_top_20_items.groupBy("user_id") \ .agg(collect_list(col("item_id").cast("double")).alias("top_item_ids")) ນີ້ແມ່ນວິທີທີ່ຜົນໄດ້ຮັບເບິ່ງຄືວ່າ ໃນທີ່ສຸດພວກເຮົາສາມາດເກັບຮັກສາການຄາດຄະເນເປັນປ້າຍຊື່ delta ໃນ UC ຂອງພວກເຮົາຫຼືເຜີຍແຜ່ພວກມັນໃຫ້ກັບລະບົບລຸ່ມນ້ໍາ Mongo DB ຫຼື Azure Cosmos DB. ພວກເຮົາໄປກັບທາງເລືອກ firs df_user_recs.write.mode("overwrite").saveAsTable(f"{catalog_name}.{output_schema}.top20_item_recommendations") ສະຕຣີມ/ອິນເຕີແນັດ ຕອນນີ້ຈິນຕະນາການກໍລະນີທີ່ພວກເຮົາຕ້ອງການປັບປຸງຄໍາແນະນໍາຂອງພວກເຮົາໂດຍອີງໃສ່ການໂຕ້ຕອບຂອງຜູ້ໃຊ້ໃນເວລາຈິງ. ສໍາລັບກໍລະນີນີ້ພວກເຮົາສາມາດນໍາໃຊ້ການຮັບໃຊ້ແບບຈໍາລອງ. ເມື່ອຜູ້ໃດຜູ້ຫນຶ່ງຕ້ອງການໃຊ້ຕົວແບບຂອງເຈົ້າ, ພວກເຂົາສາມາດສົ່ງຂໍ້ມູນໄປຫາເຄື່ອງແມ່ຂ່າຍ. ຫຼັງຈາກນັ້ນ, ເຊີບເວີຈະປ້ອນຂໍ້ມູນນັ້ນໄປຫາຕົວແບບທີ່ນຳໃຊ້ຂອງທ່ານ, ເຊິ່ງເຂົ້າສູ່ການປະຕິບັດ, ວິເຄາະຂໍ້ມູນ ແລະສ້າງການຄາດຄະເນ. ພວກເຂົາສາມາດຖືກນໍາໃຊ້ໃນຄໍາຮ້ອງສະຫມັກເວັບ, ແອັບຯມືຖື, ຫຼືແມ້ກະທັ້ງລະບົບຝັງຕົວ. ຫນຶ່ງໃນຄໍາຮ້ອງສະຫມັກຂອງວິທີການນີ້ແມ່ນເພື່ອເຮັດໃຫ້ເສັ້ນທາງການຈະລາຈອນສໍາລັບການທົດສອບ A / B. ALS algorithm ບໍ່ສາມາດຖືກນໍາໃຊ້ໂດຍກົງສໍາລັບການ inference ອອນໄລນ໌ເນື່ອງຈາກວ່າມັນຮຽກຮ້ອງໃຫ້ມີການຝຶກອົບຮົມ retraining ຮູບແບບການນໍາໃຊ້ຂໍ້ມູນທັງຫມົດ (ເກົ່າ + ໃຫມ່) ເພື່ອປັບປຸງຄໍາແນະນໍາ. Gradient Descent algorithms ແມ່ນຕົວຢ່າງຂອງຕົວແບບທີ່ສາມາດໃຊ້ສໍາລັບການອັບເດດອອນໄລນ໌. ພວກເຮົາອາດຈະເບິ່ງບາງ algorithms ເຫຼົ່ານີ້ໃນການຕອບໃນອະນາຄົດ. ແນວໃດກໍ່ຕາມ, ພຽງແຕ່ເພື່ອສະແດງໃຫ້ເຫັນວ່າຮູບແບບດັ່ງກ່າວຈະເຮັດວຽກແນວໃດ, ພວກເຮົາກຳລັງສ້າງແບບຈໍາລອງ (ບໍ່ມີປະໂຫຍດ) ໃຫ້ບໍລິການຈຸດສິ້ນສຸດທີ່ຄາດຄະເນການຈັດອັນດັບຮູບເງົາໂດຍອີງໃສ່ທຸກຄັ້ງທີ່ຜູ້ໃຊ້ໃຫ້ອັດຕາຮູບເງົາ! import requests model_path = f"{catalog_name}.{model_schema}.{model_name}" champion_version = client.get_model_version_by_alias(model_path, "champion") # Set the name of the MLflow endpoint endpoint_name = config["model_serving_endpoint_name"] # Name of the registered MLflow model model_name = model_path # Specify the type of compute (CPU, GPU_SMALL, GPU_MEDIUM, etc.) workload_type = "CPU" # Specify the scale-out size of compute (Small, Medium, Large, etc.) workload_size = "Small" # Get the latest version of the MLflow model model_version = int(champion_version.version) # Specify Scale to Zero(only supported for CPU endpoints) scale_to_zero = False # Get the API endpoint and token for the current notebook context API_ROOT = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get() API_TOKEN = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get() data = { "name": endpoint_name, "config": { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ] }, } headers = {"Context-Type": "text/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.post( url=f"{API_ROOT}/api/2.0/serving-endpoints", json=data, headers=headers ) ນີ້ຈະສ້າງແລະອາຫານທ່ຽງແບບຈໍາລອງໃຫ້ບໍລິການສໍາລັບພວກເຮົາດັ່ງນັ້ນມັນໃຊ້ເວລາບາງເວລາ. ຕອນນີ້ຖ້າທ່ານເປີດປ່ອງຢ້ຽມ , ທ່ານຄວນເຫັນຈຸດສິ້ນສຸດຂອງທ່ານ. Serving ພວກເຮົາສາມາດນໍາໃຊ້ endpoint ຫນຶ່ງເພື່ອຮັບໃຊ້ຫຼາຍຮູບແບບ. ຫຼັງຈາກນັ້ນ, ພວກເຮົາສາມາດນໍາໃຊ້ເສັ້ນທາງການຈະລາຈອນສໍາລັບສະຖານະການເຊັ່ນ: ການທົດສອບ A / B ຫຼືປຽບທຽບການປະຕິບັດຂອງຕົວແບບທີ່ແຕກຕ່າງກັນໃນການຜະລິດ. ຕາຕະລາງ Inference ຕາຕະລາງ Inference ໃນ Databricks Model Serving ເຮັດຫນ້າທີ່ເປັນບັນທຶກອັດຕະໂນມັດສໍາລັບແບບຈໍາລອງຂອງພວກເຮົາ. ເມື່ອເປີດໃຊ້, ພວກມັນບັນທຶກຄໍາຮ້ອງຂໍທີ່ເຂົ້າມາ (ຂໍ້ມູນທີ່ສົ່ງສໍາລັບການຄາດຄະເນ), ຜົນໄດ້ຮັບຂອງຕົວແບບທີ່ສອດຄ້ອງກັນ (ການຄາດເດົາ), ແລະບາງ metadata ອື່ນໆເປັນຕາຕະລາງ Delta ພາຍໃນ Unity Catalog. ພວກເຮົາສາມາດນໍາໃຊ້ຕາຕະລາງ inference ສໍາລັບ , , ແລະຂັ້ນຕອນການເກັບກໍາຂໍ້ມູນສໍາລັບ ຫຼື ແບບຂອງພວກເຮົາ. ການຕິດຕາມແລະ debugging ການຕິດຕາມເຊື້ອສາຍ ການ retraining ປັບປັບຕົວ ພວກເຮົາສາມາດເປີດໃຊ້ ໃນຈຸດສິ້ນສຸດການຮັບໃຊ້ຂອງພວກເຮົາເພື່ອຕິດຕາມຕົວແບບ. ພວກເຮົາສາມາດເຮັດໄດ້ໂດຍການລະບຸຄຸນສົມບັດ ໃນ payload ເມື່ອພວກເຮົາສ້າງຈຸດສິ້ນສຸດຄັ້ງທໍາອິດ. ຫຼືພວກເຮົາປັບປຸງຈຸດສິ້ນສຸດຂອງພວກເຮົາຫຼັງຈາກນັ້ນໂດຍໃຊ້ຄໍາສັ່ງ ແລະ endpoint URL ດັ່ງຕໍ່ໄປນີ້ (ເພີ່ມເຕີມ inference table auto_capture_config put config ) ທີ່ນີ້ data = { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ], "auto_capture_config":{ "catalog_name": catalog_name, "schema_name": model_schema, "table_name_prefix": payload_table, } } headers = {"Context-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.put(url=f"{API_ROOT}/api/2.0/serving-endpoints/{endpoint_name}/config", json=data, headers=headers) print(json.dumps(response.json(), indent=4)) ຕອນນີ້ໃຫ້ອາຫານຈຸດສິ້ນສຸດດ້ວຍຂໍ້ມູນການໂຕ້ຕອບຂອງຜູ້ໃຊ້ dummy ບາງອັນ import random import time all_items = df_full_data.select(col("item_id")).distinct() for user_id in range(50,54): items_not_rated_by_user = df_full_data.where(col("user_id")==user_id).select(col("item_id")).distinct()#collect()[0][0] no_rated_items = [item.item_id for item in all_items.subtract(items_not_rated_by_user).limit(4).collect()] data = { "dataframe_records": [ {"user_id":user_id, "item_id":no_rated_items[0], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[1], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, ] } response = requests.post( url=f"{API_ROOT}/serving-endpoints/{endpoint_name}/invocations", json=data, headers=headers ) # generate the data within the timespan of 1 to 8 minutes time.sleep(random.randint(60*1, 60*8)) ພວກເຮົາສາມາດກວດເບິ່ງບັນທຶກຈຸດສິ້ນສຸດໃນຕາຕະລາງ . ມັນໃຊ້ເວລາປະມານ 10 ນາທີຈົນກ່ວາທ່ານສາມາດເບິ່ງຂໍ້ມູນໃນຕາຕະລາງ. <catalog>.<schema>.<payload_table> table_path = f"{catalog_name}.{model_schema}.{payload_table}" # Read data from the inference table df_inf_table = spark.read.table(table_path) display(df_inf_table ) ທ່ານຄວນເຫັນບາງສິ່ງບາງຢ່າງເຊັ່ນນີ້ຕາຕະລາງ payload ຂອງທ່ານ ເພື່ອເຂົ້າໃຈ schema ຂອງຕາຕະລາງ inference ນີ້, ກວດເບິ່ງ "Unity catalog inference table schema ==" .== ທີ່ນີ້ ການຕິດຕາມແບບຈໍາລອງ ແບບຈໍາລອງແລະຂໍ້ມູນການຕິດຕາມຫົວຂໍ້ທີ່ສັບສົນທີ່ຕ້ອງໃຊ້ເວລາຫຼາຍເພື່ອແມ່ບົດ. Databricks Lakehouse Monitoring (DLM) ຫຼຸດຜ່ອນສ່ວນເກີນຂອງການສ້າງລະບົບການຕິດຕາມທີ່ເຫມາະສົມໂດຍການໃຫ້ແມ່ແບບມາດຕະຖານແລະສາມາດປັບແຕ່ງໄດ້ສໍາລັບກໍລະນີທີ່ໃຊ້ທົ່ວໄປ. ແນວໃດກໍ່ຕາມ, ການສ້າງຕົ້ນສະບັບ DLM ແລະການຕິດຕາມແບບຈໍາລອງໂດຍທົ່ວໄປຮຽກຮ້ອງໃຫ້ມີການທົດລອງຫຼາຍ. ຂ້າພະເຈົ້າບໍ່ຕ້ອງການໃຫ້ທ່ານເຫັນພາບລວມຢ່າງກວ້າງຂວາງຂອງການຕິດຕາມຕົວແບບຢູ່ທີ່ນີ້, ແຕ່ແທນທີ່ຈະໃຫ້ທ່ານເປັນຈຸດເລີ່ມຕົ້ນ. ຂ້ອຍອາດຈະອຸທິດ blog ໃຫ້ກັບຫົວຂໍ້ນີ້ໃນອະນາຄົດ. ສະຫຼຸບຫຍໍ້ຂອງການເຮັດວຽກ ແລະຄຸນສົມບັດຂອງ DLM ໃນປັດຈຸບັນທີ່ພວກເຮົາມີຕົວແບບຂອງພວກເຮົາແລະດໍາເນີນການ, ພວກເຮົາສາມາດນໍາໃຊ້ຕາຕະລາງ inference ທີ່ສ້າງຂຶ້ນໂດຍຈຸດສິ້ນສຸດການຮັບໃຊ້ຂອງພວກເຮົາເພື່ອຕິດຕາມຕົວຊີ້ບອກທີ່ສໍາຄັນເຊັ່ນການປະຕິບັດຕົວແບບແລະການລອຍຕົວເພື່ອກວດພົບຄວາມແຕກແຍກຫຼືຄວາມຜິດປົກກະຕິໃນຂໍ້ມູນຫຼືຕົວແບບຂອງພວກເຮົາໃນໄລຍະເວລາ. ວິທີການທີ່ຫ້າວຫັນນີ້ຊ່ວຍໃຫ້ພວກເຮົາດໍາເນີນການແກ້ໄຂໄດ້ທັນເວລາ, ເຊັ່ນ: ການຝຶກອົບຮົມແບບຈໍາລອງຫຼືການປັບປຸງລັກສະນະຂອງຕົນ, ເພື່ອຮັກສາປະສິດທິພາບທີ່ດີທີ່ສຸດແລະສອດຄ່ອງກັບຈຸດປະສົງທາງທຸລະກິດ. DLM ສະຫນອງສາມປະເພດຂອງການວິເຄາະຫຼື : , ແລະ . ເນື່ອງຈາກພວກເຮົາມີຄວາມສົນໃຈໃນການວິເຄາະຕາຕະລາງ inference ຂອງພວກເຮົາ, ພວກເຮົາສຸມໃສ່ອັນສຸດທ້າຍ. ການນໍາໃຊ້ຕາຕະລາງສໍາລັບການຕິດຕາມກວດກາ - " " ຂອງພວກເຮົາ, ພວກເຮົາຄວນໃຫ້ແນ່ໃຈວ່າຕາຕະລາງມີໂຄງສ້າງທີ່ຖືກຕ້ອງ. ສໍາລັບ ແຕ່ລະແຖວຄວນສອດຄ່ອງກັບຄໍາຮ້ອງຂໍທີ່ມີຖັນຕໍ່ໄປນີ້: profile type ຊຸດເວລາ Snapshot Inference ຕາຕະລາງຕົ້ນຕໍ , ຕາຕະລາງ inference ລັກສະນະແບບຈໍາລອງ ການຄາດຄະເນຕົວແບບ ID ຕົວແບບ : timestamp ຂອງຄໍາຮ້ອງຂໍ inference timestamp (ທາງເລືອກ) ຄວາມຈິງພື້ນຖານ ແມ່ນສໍາຄັນສໍາລັບກໍລະນີທີ່ພວກເຮົາໃຫ້ບໍລິການຫຼາຍແບບແລະພວກເຮົາຕ້ອງການຕິດຕາມການປະຕິບັດຂອງແຕ່ລະແບບໃນ dashboard ຕິດຕາມກວດກາ. ຖ້າມີຫຼາຍກວ່າຫນຶ່ງ id ຕົວແບບທີ່ມີຢູ່, DLM ໃຊ້ມັນເພື່ອຕັດຂໍ້ມູນ ແລະຄໍານວນ metrics ແລະ statics ສໍາລັບແຕ່ລະ slice ແຍກຕ່າງຫາກ. id ຕົວແບບ DLM ຄິດໄລ່ແຕ່ລະສະຖິຕິ ແລະ metrics ສໍາລັບຊ່ວງເວລາທີ່ກໍານົດໄວ້. ສໍາລັບການວິເຄາະ inference, ມັນໄດ້ນໍາໃຊ້ຖັນ , ບວກກັບຂະຫນາດປ່ອງຢ້ຽມທີ່ຜູ້ໃຊ້ກໍານົດເພື່ອກໍານົດເວລາປ່ອງຢ້ຽມ. ເພີ່ມເຕີມຂ້າງລຸ່ມນີ້. ສະແຕມເວລາ DLM ສະຫນັບສະຫນູນສອງ ສໍາລັບຕາຕະລາງ inference: " " ຫຼື " ". ມັນຄິດໄລ່ບາງຕົວຊີ້ບອກ ແລະສະຖິຕິທີ່ກ່ຽວຂ້ອງໂດຍອີງໃສ່ຂໍ້ມູນສະເພາະນີ້. problem type ການຈັດປະເພດ regression ເພື່ອໃຊ້ DLM, ພວກເຮົາຄວນສ້າງຈໍພາບແລະຕິດມັນກັບຕາຕະລາງ. ເມື່ອພວກເຮົາເຮັດ DLM ນີ້ສ້າງ ສອງອັນ: metric tables : ຕາຕະລາງນີ້ມີສະຖິຕິສະຫຼຸບເຊັ່ນ: min, max, ເປີເຊັນຂອງ null ແລະ zeros. ມັນຍັງປະກອບດ້ວຍການວັດແທກເພີ່ມເຕີມໂດຍອີງໃສ່ປະເພດບັນຫາທີ່ກໍານົດໂດຍຜູ້ໃຊ້. ຕົວຢ່າງ , ແລະ ສໍາລັບຮູບແບບການຈັດປະເພດ, ແລະ ແລະ ສໍາລັບແບບຈໍາລອງການຖົດຖອຍ. ຕາຕະລາງການວັດແທກໂປຣໄຟລ໌ ຄວາມແມ່ນຍໍາ recall f1_score mean_squared_error mean_average_error : ມັນປະກອບດ້ວຍສະຖິຕິທີ່ວັດແທກວິທີການແຈກຢາຍຂໍ້ມູນມີການປ່ຽນແປງ ຫຼືທຽບກັບ . ມັນຄິດໄລ່ມາດຕະການເຊັ່ນການທົດສອບ Chi-square, ການທົດສອບ KS. ຕາຕະລາງ drift metric ໃນໄລຍະເວລາ ຄ່າພື້ນຖານ (ຖ້າສະຫນອງໃຫ້) ເພື່ອເບິ່ງລາຍຊື່ຕົວວັດແທກທີ່ສົມບູນສໍາລັບແຕ່ລະຕາຕະລາງ, ໃຫ້ກວດເບິ່ງຫນ້າເອກະສານ . ມັນເປັນໄປໄດ້ທີ່ຈະສ້າງ ຕາຕະລາງ metric . ຕົວວັດແທກທີ່ກໍາຫນົດເອງ ລັກສະນະທີ່ສໍາຄັນຂອງການສ້າງລະບົບການຕິດຕາມແມ່ນເພື່ອໃຫ້ແນ່ໃຈວ່າ dashboard ຕິດຕາມກວດກາຂອງພວກເຮົາມີການເຂົ້າເຖິງຂໍ້ມູນ inference ຫລ້າສຸດເມື່ອພວກເຂົາມາຮອດ. ສໍາລັບວ່າພວກເຮົາສາມາດນໍາໃຊ້ ຕິດຕາມແຖວທີ່ຖືກປຸງແຕ່ງໃນຕາຕະລາງ inference. ພວກເຮົານໍາໃຊ້ຕາຕະລາງ inference ຂອງຕົວແບບການບໍລິການເປັນຕາຕະລາງແຫຼ່ງຂອງພວກເຮົາ ( ), ແລະຕາຕະລາງການຕິດຕາມກວດກາເປັນຕາຕະລາງ sink ( ). ພວກເຮົາຍັງໃຫ້ແນ່ໃຈວ່າ (CDC) ຖືກເປີດໃຊ້ຢູ່ໃນຕາຕະລາງທັງສອງ (ມັນຖືກເປີດໃຊ້ໂດຍຄ່າເລີ່ມຕົ້ນໃນຕາຕະລາງ Inference). ວິທີນີ້ພວກເຮົາປະມວນຜົນພຽງແຕ່ການປ່ຽນແປງ - ໃສ່ / ປັບປຸງ / ລົບ - ໃນຕາຕະລາງແຫຼ່ງແທນທີ່ຈະກ່ວາການປຸງແຕ່ງຕາຕະລາງທັງຫມົດຄືນໃຫມ່ທຸກໆຄັ້ງ. Delta table streaming readStream writeStream ການຈັບຂໍ້ມູນການປ່ຽນແປງ ມື ເພື່ອເຮັດໃຫ້ການຕິດຕາມກວດກາຕາຕະລາງ inference ຂອງພວກເຮົາ, ພວກເຮົາດໍາເນີນຂັ້ນຕອນດັ່ງຕໍ່ໄປນີ້: ອ່ານຕາຕະລາງ inference ເປັນຕາຕະລາງການຖ່າຍທອດ ສ້າງຕາຕະລາງ delta ໃໝ່ ທີ່ມີ schema ທີ່ຖືກຕ້ອງໂດຍ unpacking ຕາຕະລາງ inference ທີ່ສ້າງຂຶ້ນໂດຍຕົວແບບຂອງພວກເຮົາໃຫ້ບໍລິການ endpoint. ກະກຽມຕາຕະລາງພື້ນຖານ (ຖ້າມີ) ສ້າງຈໍພາບເທິງຕາຕະລາງຜົນໄດ້ຮັບແລະໂຫຼດຫນ້າຈໍຄືນ metric ຈັດຕາຕະລາງຂັ້ນຕອນການເຮັດວຽກເພື່ອແຍກຕາຕະລາງການອ້າງອີງໃສ່ໂຄງສ້າງທີ່ຖືກຕ້ອງແລະປັບປຸງການວັດແທກໃຫມ່ ທໍາອິດພວກເຮົາຈໍາເປັນຕ້ອງໄດ້ຕິດຕັ້ງ Lakehouse Monitoring API. ມັນຄວນຈະຖືກຕິດຕັ້ງແລ້ວຖ້າທ່ານໃຊ້ Databricks rum time 15.3 LTS ແລະຂ້າງເທິງ: %pip install "https://ml-team-public-read.s3.amazonaws.com/wheels/data-monitoring/a4050ef7-b183-47a1-a145-e614628e3146/databricks_lakehouse_monitoring-0.4.14-py3-none-any.whl" dbutils.library.restartPython() ໃຫ້ອ່ານຕາຕະລາງ inference ເປັນຕາຕະລາງ streaming requests_raw = spark.readStream\ .format("delta")\ .table(inference_table_name) requests_raw.isStreaming #-> True ຕໍ່ໄປພວກເຮົາຕ້ອງວາງຕາຕະລາງໃນຮູບແບບທີ່ຖືກຕ້ອງຕາມທີ່ໄດ້ອະທິບາຍຂ້າງເທິງ. ຕາຕະລາງນີ້ຄວນຈະມີແຖວຫນຶ່ງສໍາລັບແຕ່ລະການຄາດຄະເນທີ່ມີຄຸນສົມບັດທີ່ກ່ຽວຂ້ອງແລະມູນຄ່າການຄາດຄະເນ. ຕາຕະລາງ inference ທີ່ພວກເຮົາໄດ້ຮັບຈາກຮູບແບບການໃຫ້ບໍລິການ endpoint, ເກັບຮັກສາຄໍາຮ້ອງຂໍ endpoint ແລະການຕອບສະຫນອງເປັນຮູບແບບ JSON ຊ້ອນ. ນີ້ແມ່ນຕົວຢ່າງຂອງ JSON payload ສໍາລັບຄໍລໍາການຮ້ອງຂໍແລະການຕອບສະຫນອງ. #requests {"dataframe_records": [ {"user_id": 1, "item_id": 346, "rating": 5}, {"user_id": 1, "item_id": 377, "rating": 2}, {"user_id": 1, "item_id": 302, "rating": 4} ] } #reponse {"predictions": [4.248899936676025, 1.1172138452529907, 4.279165744781494] } # --> what we need | user_id | item_id | rating | prediction | |---------|---------|--------|------------| | 1 | 346 | 5 | 4.248900 | | 1 | 377 | 2 | 1.117214 | | 1 | 302 | 4 | 4.279166 | ເພື່ອ unpack ຕາຕະລາງນີ້ໄປຫາ schema ທີ່ຖືກຕ້ອງ, ພວກເຮົາສາມາດນໍາໃຊ້ລະຫັດຕໍ່ໄປນີ້ທີ່ດັດແປງມາຈາກເອກະສານ Databricks ( ). ຕາຕະລາງການອ້າງອີງ Lakehouse Monitoring starter notebook # define the schema of the request and reponse fields in the inference tabel REQUEST_FIELDS = [StructField('user_id', IntegerType(), False),\ StructField('item_id', IntegerType(), False),\ StructField('rating', IntegerType(), False)\ ] RESPONSE_FIELD = [T.StructField("predictions", FloatType())] def process_col_requests(json_str:str) -> str: """ to proccess the JSON payload of request column in inference table """ request = json.loads(json_str) dataframe_records = request.get("dataframe_records", []) return dataframe_records def procces_col_response(json_str: str) -> str: """ to proccess the JSON payload of reponse column in inference table """ reponse = json.loads(json_str) output = [{prediction_col: round(prediction,4)} for prediction in reponse["predictions"]] return output def get_model_id(endpoint_name: str) -> str: """ create the model id by concatinating the model name and the model version. note: the assumption is the endpoint serves only one model """ served_models = get_served_models(endpoint_name) model_id = f"{served_models[0]['model_name']}_{served_models[0]['model_version']}" return model_id def process_requests(requests_raw: DataFrame, request_fields: List[T.StructField], response_field: T.StructField, endpoint_name: str) -> DataFrame: """ Takes a stream of raw requests and processes them by: - Unpacking JSON payloads for requests and responses - Exploding batched requests into individual rows - Converting Unix epoch millisecond timestamps to be Spark TimestampType :param requests_raw: DataFrame containing raw requests. Assumed to contain the following columns: - `request` - `response` - `timestamp_ms` :param request_fields: List of StructFields representing the request schema :param response_field: A StructField representing the response schema :return: A DataFrame containing processed requests """ # Convert the timestamp milliseconds to TimestampType for downstream processing. requests_timestamped = requests_raw \ .withColumn(timestamp_col, (F.col("timestamp_ms") / 1000).cast(T.TimestampType())) \ .drop("timestamp_ms") # create the model identifier column model_id = get_model_id(endpoint_name) # Convert the model name and version columns into a model identifier column. requests_identified = requests_timestamped \ .withColumn(model_id_col, F.lit(model_id)) # Rename the date column to avoid collisions with features. requests_dated = requests_identified.withColumnRenamed("date", date_col) # Consolidate and unpack JSON. request_schema = T.ArrayType(T.StructType(request_fields)) response_schema = T.ArrayType(T.StructType(response_field)) # w udf_request = F.udf(process_col_requests, request_schema) udf_reponse = F.udf(procces_col_response, response_schema) requests_unpacked = requests_dated.withColumn("request", udf_request("request")).\ withColumn("response", udf_reponse("response")) # Explode batched requests into individual rows. DB_PREFIX = "__db" requests_exploded = requests_unpacked \ .withColumn(f"{DB_PREFIX}_request_response", F.arrays_zip(F.col("request"), F.col("response"))) \ .withColumn(f"{DB_PREFIX}_request_response", F.explode(F.col(f"{DB_PREFIX}_request_response"))) \ .select(F.col("*"), F.col(f"{DB_PREFIX}_request_response.request.*"), F.col(f"{DB_PREFIX}_request_response.response.*")) \ .drop(f"{DB_PREFIX}_request_response", "request", "response") requests_cleaned = requests_exploded.drop("status_code", "sampling_fraction", "client_request_id", "databricks_request_id", "request_metadata") return requests_cleaned ຕາຕະລາງຜົນໄດ້ຮັບຈະມີລັກສະນະນີ້: ຕໍ່ໄປ, ພວກເຮົາຄວນຈະເລີ່ມຕົ້ນຕາຕະລາງການຫລົ້ມຈົມຂອງພວກເຮົາ dt_builder = DeltaTable.createIfNotExists(spark) \ .tableName(unpacked_requests_table_name) \ .addColumns(schema) \ .partitionedBy(requests_cleaned.schema) \ .property("delta.enableChangeDataFeed", "true") \ dt_builder.execute() ແລະຂຽນຜົນໄດ້ຮັບ checkpoint_path = f"dbfs:/payload-logging/{endpoint_name}/checkpoint" requests_stream = requests_cleaned.writeStream \ .trigger(once=True) \ .format("delta") \ .partitionBy(date_col) \ .outputMode("append") \ .option("checkpointLocation", checkpoint_path) \ .toTable(unpacked_requests_table_name) \ ສຸດທ້າຍ, ພວກເຮົາສ້າງຕາຕະລາງພື້ນຖານຂອງພວກເຮົາ. DLM ໃຊ້ຕາຕະລາງນີ້ເພື່ອຄິດໄລ່ drifts ໂດຍການປຽບທຽບການແຈກຢາຍຂອງຖັນທີ່ຄ້າຍຄືກັນຂອງພື້ນຖານແລະຕົວແບບຕົ້ນຕໍ. ຕາຕະລາງພື້ນຖານຄວນມີຖັນລັກສະນະດຽວກັນກັບຖັນຕົ້ນຕໍເຊັ່ນດຽວກັນກັບຖັນການກໍານົດຕົວແບບດຽວກັນ. ສໍາລັບຕາຕະລາງພື້ນຖານພວກເຮົາໃຊ້ຕາຕະລາງການຄາດຄະເນຂອງ ຂອງພວກເຮົາທີ່ພວກເຮົາເກັບຮັກສາໄວ້ກ່ອນຫນ້ານີ້ຫຼັງຈາກທີ່ພວກເຮົາຝຶກອົບຮົມຕົວແບບຂອງພວກເຮົາໂດຍໃຊ້ hyperparameter ທີ່ດີທີ່ສຸດ. ເພື່ອຄິດໄລ່ການວັດແທກ drift, Databricks ຄິດໄລ່ຕົວຊີ້ບອກ profile ສໍາລັບທັງຕາຕະລາງຕົ້ນຕໍແລະພື້ນຖານ. ທີ່ນີ້ທ່ານສາມາດອ່ານກ່ຽວກັບ ຊຸດຂໍ້ມູນການກວດສອບ . ຕາຕະລາງປະຖົມແລະຕາຕະລາງພື້ນຖານ #read the prediction table df_base_table = spark.table(f"{catalog_name}.{model_schema}.predictions") # create the model id and add it to the table model_id = get_model_id(endpoint_name) df_base_table = df_base_table.withColumn(model_id_col, F.lit(model_id)) #write the new table and enable the CDC on it output_base_table_name = f"{catalog_name}.{model_schema}.{base_table_prefix}_{model_name}" df_base_table.write.format("delta").mode("overwrite").saveAsTable(output_base_table_name) spark.sql(f"ALTER TABLE {output_base_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)") ໃນປັດຈຸບັນພວກເຮົາກໍາລັງອ່ານເພື່ອສ້າງ dashboard ຕິດຕາມກວດກາຂອງພວກເຮົາ. ພວກເຮົາສາມາດເຮັດໄດ້ບໍ່ວ່າຈະໂດຍໃຊ້ ຫຼື Lakehouse Monitoring API. ນີ້ພວກເຮົາໃຊ້ທາງເລືອກທີສອງ: UI # This is where we store the metric tables. output_schema_name = f"{catalog_name}.{model_schema}" try: info = lm.create_monitor( table_name=unpacked_requests_table_name, profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities,#the aggregation window model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, # We will refresh the metrics on-demand in this notebook baseline_table_name=output_base_table_name, ) print(info) except Exception as e: # Ensure the exception was expected assert "RESOURCE_ALREADY_EXISTS" in str(e), f"Unexpected error: {e}" # Update the monitor if any parameters of this notebook have changed. lm.update_monitor( table_name=unpacked_requests_table_name, updated_params=dict( profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities, model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, baseline_table_name=output_base_table_name, ) ) # Refresh metrics calculated on the requests table. refresh_info = lm.run_refresh(table_name=unpacked_requests_table_name) print(refresh_info) ຫຼັງຈາກທີ່ພວກເຮົາດໍາເນີນການລະຫັດມັນໃຊ້ເວລາບາງເວລາຈົນກ່ວາ Databricks ຄິດໄລ່ metric ທັງຫມົດ. ເພື່ອເບິ່ງ dashboard ໄປທີ່ແຖບ ຂອງຕາຕະລາງ sink ຂອງທ່ານ (ເຊັ່ນ: ). ທ່ານຄວນເບິ່ງຫນ້າດັ່ງຕໍ່ໄປນີ້. Quality unpacked_requests_table_name ຖ້າທ່ານຄລິກໃສ່ການເບິ່ງ ທ່ານຈະເຫັນການແລ່ນ, ລໍຖ້າ ແລະການໂຫຼດຫນ້າຈໍຄືນທີ່ຜ່ານມາ. ໃຫ້ຄລິກໃສ່ ເພື່ອເປີດ dashboard ຂອງທ່ານ. refresh history View Dashboard ດັ່ງນັ້ນພວກເຮົາເລີ່ມຕົ້ນດ້ວຍຕາຕະລາງ inference ( ), ປຸງແຕ່ງມັນແລະບັນທຶກຜົນໄດ້ຮັບໃສ່ ແລະສົ່ງຕາຕະລາງນີ້ພ້ອມກັບຕາຕະລາງພື້ນຖານຂອງພວກເຮົາ ( ) ໄປຫາ API ຂອງພວກເຮົາ. DLM ຄິດໄລ່ການວັດແທກໂປຣໄຟລ໌ສຳລັບແຕ່ລະຕາຕະລາງ ( ) ແລະໃຊ້ພວກມັນເພື່ອຄິດໄລ່ຕົວວັດແທກການລອຍຕົວ ( ) my_endpoint_payload my_endpoint_payload_unpacked base_table_als my_endpoint_payload_unpacked_profile_metric my_endpoint_payload_unpacked_drift_metrics ເຈົ້າໄປເລີຍ! ເຈົ້າມີທຸກຢ່າງທີ່ເຈົ້າຕ້ອງການເພື່ອຮັບໃຊ້ ແລະຕິດຕາມເຈົ້າແບບ! ໃນສ່ວນຕໍ່ໄປຂ້ອຍຈະສະແດງວິທີການອັດຕະໂນມັດຂະບວນການນີ້ໂດຍໃຊ້ ແລະ ! Databricks Assets Bundle Gitlab