paint-brush
តោះសាងសង់បំពង់បង្ហូរប្រេង MLOps ជាមួយ Databricks and Spark - ផ្នែកទី 2ដោយ@neshom
ប្រវត្តិសាស្ត្រថ្មី។

តោះសាងសង់បំពង់បង្ហូរប្រេង MLOps ជាមួយ Databricks and Spark - ផ្នែកទី 2

ដោយ Mohsen Jadidi42m2024/12/29
Read on Terminal Reader

យូរ​ពេក; អាន

នៅក្នុងផ្នែកទីពីរនៃប្លក់នេះ យើងឃើញពីរបៀបដែល Databricks អនុញ្ញាតឱ្យពួកយើងសម្រាប់ការដាក់ពង្រាយជាបាច់ និងការបម្រើតាមអ៊ីនធឺណិត។ យើងចំណាយពេលខ្លះលើរបៀបរៀបចំទិន្នន័យ និងផ្ទាំងគ្រប់គ្រងគំរូ។
featured image - តោះសាងសង់បំពង់បង្ហូរប្រេង MLOps ជាមួយ Databricks and Spark - ផ្នែកទី 2
Mohsen Jadidi HackerNoon profile picture
0-item
1-item
2-item

នៅក្នុង ផ្នែកដំបូងនៃស៊េរីមេរៀននេះ យើងបានអនុវត្តជំហានដំបូងសម្រាប់ការកសាងបំពង់បង្ហូរប្រេង MLOps ពីចុងដល់ចុង ដោយប្រើ Databricks និង Spark ដែលដឹកនាំដោយស្ថាបត្យកម្មយោងរបស់ Databricks ។ នេះ​ជា​ការសង្ខេប​អំពី​ជំហាន​សំខាន់ៗ​ដែល​យើង​បាន​គ្របដណ្តប់៖


  • ការរៀបចំកាតាឡុក Unity សម្រាប់ស្ថាបត្យកម្ម Medallion ៖ យើងបានរៀបចំទិន្នន័យរបស់យើងទៅជាស្រទាប់សំរិទ្ធ ប្រាក់ និងមាសនៅក្នុង Unity Catalog ដោយបង្កើតប្រព័ន្ធគ្រប់គ្រងទិន្នន័យដែលមានរចនាសម្ព័ន្ធ និងមានប្រសិទ្ធភាព។

  • ការបញ្ចូលទិន្នន័យទៅក្នុងកាតាឡុក Unity ៖ យើងបានបង្ហាញពីរបៀបនាំចូលទិន្នន័យឆៅទៅក្នុងប្រព័ន្ធ ដោយធានាបាននូវភាពស៊ីសង្វាក់គ្នា និងគុណភាពសម្រាប់ដំណាក់កាលដំណើរការជាបន្តបន្ទាប់។

  • ការបណ្ដុះបណ្ដាលគំរូ ៖ ការប្រើប្រាស់ Databricks យើងបានបណ្ដុះបណ្ដាលគំរូរៀនម៉ាស៊ីនដែលស្របតាមសំណុំទិន្នន័យរបស់យើង ដោយធ្វើតាមការអនុវត្តល្អបំផុតសម្រាប់ការអភិវឌ្ឍន៍គំរូដែលអាចធ្វើមាត្រដ្ឋានបាន និងមានប្រសិទ្ធភាព។

  • ការលៃតម្រូវ Hyperparameter ជាមួយ HyperOpt ៖ ដើម្បីបង្កើនការអនុវត្តគំរូ យើងបានប្រើប្រាស់ HyperOpt ដើម្បីធ្វើស្វ័យប្រវត្តិកម្មការស្វែងរកសម្រាប់ hyperparameters ដ៏ល្អប្រសើរ ធ្វើអោយប្រសើរឡើងនូវភាពត្រឹមត្រូវ និងប្រសិទ្ធភាព។

  • ការតាមដានការសាកល្បងជាមួយ Databricks MLflow ៖ យើងបានប្រើប្រាស់ MLflow ដើម្បីកត់ត្រា និងត្រួតពិនិត្យការពិសោធន៍របស់យើង ដោយរក្សាបាននូវកំណត់ត្រាដ៏ទូលំទូលាយនៃកំណែគំរូ រង្វាស់ និងប៉ារ៉ាម៉ែត្រសម្រាប់ងាយស្រួលប្រៀបធៀប និងផលិតឡើងវិញ។


ជាមួយនឹងជំហានជាមូលដ្ឋានទាំងនេះត្រូវបានបញ្ចប់ គំរូរបស់អ្នកឥឡូវនេះត្រូវបានចាប់ផ្តើមសម្រាប់ការដាក់ឱ្យប្រើប្រាស់។ នៅក្នុងផ្នែកទីពីរនេះ យើងនឹងផ្តោតលើការរួមបញ្ចូលសមាសធាតុសំខាន់ពីរទៅក្នុងប្រព័ន្ធរបស់យើង៖


  1. ការសន្និដ្ឋានជាបាច់ ៖ ការអនុវត្តដំណើរការបណ្តុំដើម្បីបង្កើតការព្យាករណ៍លើសំណុំទិន្នន័យធំ ដែលសមរម្យសម្រាប់កម្មវិធីដូចជា ការដាក់ពិន្ទុច្រើន និងការរាយការណ៍តាមកាលកំណត់។
  2. ការសន្និដ្ឋានតាមអ៊ីនធឺណិត (ការបម្រើគំរូ) ៖ ការបង្កើតការបម្រើគំរូតាមពេលវេលាជាក់ស្តែង ដើម្បីផ្តល់នូវការព្យាករណ៍ភ្លាមៗ ដែលមានសារៈសំខាន់សម្រាប់កម្មវិធី និងសេវាកម្មអន្តរកម្ម។
  3. ការត្រួតពិនិត្យគំរូ៖ ដើម្បីធានាថាម៉ូដែលដែលបានដាក់ពង្រាយរបស់អ្នករក្សាបាននូវដំណើរការល្អបំផុត និងភាពជឿជាក់តាមពេលវេលា។


តោះចូលទៅក្នុងវា!

ការដាក់ពង្រាយគំរូ

ចំណុចចេញដំណើរនៃប្លុកចុងក្រោយគឺការវាយតម្លៃគំរូ។ ឥឡូវ​ស្រមៃ​ថា យើង​បាន​ធ្វើការ​ប្រៀបធៀប ហើយ​បាន​រក​ឃើញ​ថា​ម៉ូដែល​របស់​យើង​បង្ហាញ​ពី​ដំណើរការ​ខ្ពស់​ជាង​ធៀប​នឹង​ម៉ូដែល​ផលិតកម្ម​នេះ។ ដូចដែលយើងចង់ (សន្មត់) ដើម្បីប្រើគំរូនៅក្នុងផលិតកម្ម យើងចង់ទាញយកអត្ថប្រយោជន៍ពីទិន្នន័យទាំងអស់ដែលយើងមាន។ ជំហានបន្ទាប់គឺការបណ្តុះបណ្តាល និងសាកល្បងគំរូដោយប្រើសំណុំទិន្នន័យពេញលេញ។ បន្ទាប់​មក​បន្ត​ម៉ូដែល​របស់​យើង​សម្រាប់​ប្រើ​ពេល​ក្រោយ​ដោយ​ដាក់​ពង្រាយ​វា​ជា​គំរូ​ជើងឯក​របស់​យើង។ ដោយសារនេះគឺជាគំរូចុងក្រោយដែលយើងចង់ប្រើសម្រាប់ការសន្និដ្ឋាន យើងប្រើម៉ាស៊ីនភ្ញៀវវិស្វកម្មលក្ខណៈពិសេសដើម្បីបណ្តុះបណ្តាលគំរូ។ វិធីនេះយើងមិនត្រឹមតែតាមដានខ្សែសង្វាក់គំរូកាន់តែងាយស្រួលប៉ុណ្ណោះទេ ប៉ុន្តែថែមទាំងបិទដំណើរការសុពលភាពគ្រោងការណ៍ និងការបំប្លែងមុខងារ (ប្រសិនបើមាន) ទៅកាន់អតិថិជនផងដែរ។


 with mlflow.start_run(run_name="ALS_best_model") as run: als = ALS() # Now we set the parameters for the method als.setMaxIter(MAX_ITER)\ .setSeed(SEED)\ .setRegParam(best_params["REG_PARAM"])\ .setUserCol(COL_USER)\ .setItemCol(COL_ITEM)\ .setRatingCol(COL_LABEL)\ .setRank(best_params["RANK"]) mlflow.log_param("MAX_ITER", MAX_ITER) mlflow.log_param("RANK", best_params["RANK"]) mlflow.log_param("REG_PARAM", best_params["REG_PARAM"]) # Create the model with these parameters. model = als.fit(df_full_data) #drop predictions where users and products from the test test and didn't make it into the training set. in this case, the prediction is NaN model.setColdStartStrategy('drop') predictions = model.transform(df_full_data) signature = infer_signature(model_input = df_full_data, model_output = predictions.select(COL_LABEL)) #log the model mlflow.spark.log_model(model, model_name, sample_input=df_full_data.limit(3), signature = signature, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name=f"{catalog_name}.{model_schema}.{model_name}") evaluator = RegressionEvaluator(predictionCol=COL_PRED, labelCol=COL_LABEL) rmse = evaluator.setMetricName("rmse").evaluate(predictions) mlflow.log_metric('rmse', rmse)


យើងក៏អាចប្រើ Feature Store ឬ Feature Engineering APIs ដើម្បីបណ្តុះបណ្តាល និងកត់ត្រាម៉ូដែល

 model_info = fe.log_model(model=model, artifact_path = model_name, flavor=mlflow.spark, training_set=fe_full_data, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name= f"{catalog_name}.{model_schema}.{model_name}" )


នៅពេលយើងប្រើ API វិស្វកម្មលក្ខណៈពិសេស យើងអាចមើលត្រកូលរបស់ម៉ូដែលនៅក្នុង Catalog Explorer

ជួរទិន្នន័យនៅក្នុង Dataticks Unity Catalog


ឥឡូវនេះអនុញ្ញាតឱ្យអាប់ដេតការពណ៌នាគំរូ និងកំណត់ស្លាកជើងឯកទៅវា។

 import time from mlflow.tracking.client import MlflowClient from mlflow.entities.model_registry.model_version_status import ModelVersionStatus client = MlflowClient() #find the latest model version model_name_path = f"{catalog_name}.{model_schema}.{model_name}" model_version_infos = client.search_model_versions(f"name ='{model_name_path}'") new_model_version = max([int(model_version_info.version) for model_version_info in model_version_infos]) #add the model and model version descirption client.update_registered_model( name=model_name_path, description="collaborative filtering using Spark mllib ALS. This model use rating table" ) client.update_model_version( name=model_name_path, version=new_model_version, description="this model is optimized Rank and REG_PARAM with Hyperopt and rmse as a loss function. trained on the full dataset" ) # assign alias client.set_registered_model_alias(model_name_path, "Champion", new_model_version)


ឥឡូវ​នេះ​ទៅ​មុខ ហើយ​ពិនិត្យ​មើល​គ្រោងការណ៍​ដែល​អ្នក​បាន​ចុះ​ឈ្មោះ​ម៉ូដែល។ អ្នកគួរតែឃើញការអាប់ដេតរបស់អ្នកទាំងអស់ដូចខាងក្រោម

ការចុះបញ្ជីគំរូនៅក្នុងកាតាឡុក Databricks Unity

ដំណាក់កាលគំរូ ៖ ប្រសិនបើអ្នកប្រើកន្លែងធ្វើការសម្រាប់ការចុះបញ្ជីគំរូ អ្នកគួរដំណាក់កាលដើម្បីគ្រប់គ្រងគំរូរបស់អ្នក។ ការប្រើឈ្មោះក្លែងក្លាយនឹងមិនដំណើរការទេ។ សូមពិនិត្យមើល នៅទីនេះ ដើម្បីមើលពីរបៀបដែលវាដំណើរការ

ការសន្និដ្ឋានគំរូ

ការដាក់ពិន្ទុជាបាច់

ឥឡូវនេះស្រមៃថាយើងចង់ប្រើគំរូរបស់យើងនៅក្នុងផលិតកម្មសម្រាប់ការសន្និដ្ឋាន។ នៅក្នុងជំហាននេះ យើងផ្ទុកគំរូជើងឯក ហើយប្រើវាដើម្បីបង្កើតការណែនាំភាពយន្តចំនួន 20 សម្រាប់អ្នកប្រើប្រាស់ម្នាក់ៗ។


 from mlflow.spark import load_model as spark_load_model from mlflow.tracking.client import MlflowClient from create_training_set import split_data #-- set UC as model registray mlflow.set_registry_uri("databricks-uc") #-- initate mlflow client client = MlflowClient() # -- read the config file with open('config.json') as config_file: config = json.load(config_file) catalog_name = config["catalog_name"] gold_layer = config["gold_layer_name"] silver_layer = config["silver_layer_name"] user_item_table_name = config["user_item_table_name"] ft_user_item_name = config["ft_user_item_name"] model_name = config["model_name"] model_schema = config["model_schema"] #-- create the model uri model_path = f"{catalog_name}.{model_schema}.{model_name}" # --create the model_uri: there are two ways to do this # 1: using the alias (we use this*) model_version_uri = f"models:/{model_uri}@champion" # 2: using model version #champion_version = client.get_model_version_by_alias(model_uri, "champion") #model_version_uri = f"models:/{model_uri}/{champion_version.version}" # -- load the model pipline and exctract the model model_pipeline = spark_load_model(model_version_uri) model = model_pipeline.stages[0] # -- batch scoring using the the model fe_full_data, df_full_data, df_train, df_test = split_data() df_batch_input = df_full_data.drop("rating") df_scores = model.transform(df_batch_input) # --- in case you used Feature Engineering to train and register model #from databricks.feature_engineering import FeatureEngineeringClient #fe = FeatureEngineeringClient() # fe.score_batch(model_uri=f"{model_version_uri}",df = df_batch_input)


ហើយអ្នកអាចមើលឃើញថាយើងបានប្រើទិន្នន័យបណ្តុះបណ្តាលដូចគ្នាសម្រាប់ការដាក់ពិន្ទុជាក្រុម។ ទោះបីជាក្នុងករណីប្រព័ន្ធណែនាំវាសមហេតុផលក៏ដោយ នៅក្នុងកម្មវិធីភាគច្រើន យើងចង់ប្រើគំរូដើម្បីដាក់ពិន្ទុទិន្នន័យដែលមើលមិនឃើញមួយចំនួន។ ឧទាហរណ៍ រូបភាពរបស់អ្នកគឺជាក្រុមហ៊ុន Netflix ហើយចង់ធ្វើបច្ចុប្បន្នភាពការណែនាំរបស់អ្នកប្រើប្រាស់នៅចុងថ្ងៃដោយផ្អែកលើបញ្ជីដែលបានមើលថ្មីរបស់ពួកគេ។ យើង​អាច​កំណត់​ពេល​ការងារ​ដែល​ដំណើរការ​ការ​ដាក់​ពិន្ទុ​តាម​ពេល​វេលា​ជាក់លាក់​នៅ​ចុង​ថ្ងៃ។


ឥឡូវនេះយើងអាចបន្តទៅមុខ ហើយបង្កើតការណែនាំសម្រាប់អ្នកប្រើប្រាស់ម្នាក់ៗ។ ចំពោះបញ្ហានេះយើងរកឃើញធាតុកំពូលទាំង 20 ក្នុងមួយអ្នកប្រើប្រាស់

 from pyspark.sql.window import Window from pyspark.sql.functions import col, split, row_number, collect_list from pyspark.sql.functions import col, collect_list, expr, lit, min, row_number, desc windowSpec = Window.partitionBy("user_id").orderBy(col("prediction").desc()) df_top_20_items = df_scores.withColumn("rank", row_number().over(windowSpec)).filter(col("rank") <= 20) df_user_recs = df_top_20_items.groupBy("user_id") \ .agg(collect_list(col("item_id").cast("double")).alias("top_item_ids"))


នេះជារបៀបដែលលទ្ធផលមើលទៅ

ទីបំផុតយើងអាចរក្សាទុកការព្យាករណ៍ជាស្លាកដីសណ្តនៅលើ UC របស់យើង ឬបោះផ្សាយវាទៅប្រព័ន្ធខាងក្រោម Mongo DB ឬ Azure Cosmos DB។ យើងទៅជាមួយជម្រើស firs


 df_user_recs.write.mode("overwrite").saveAsTable(f"{catalog_name}.{output_schema}.top20_item_recommendations")


ស្ទ្រីម/ការសន្និដ្ឋានតាមអ៊ីនធឺណិត

ឥឡូវស្រមៃមើលករណីដែលយើងចង់ធ្វើបច្ចុប្បន្នភាពការណែនាំរបស់យើងដោយផ្អែកលើអន្តរកម្មរបស់អ្នកប្រើប្រាស់ក្នុងពេលជាក់ស្តែង។ សម្រាប់ករណីនេះ យើងអាចប្រើការបម្រើគំរូ។ នៅពេលដែលនរណាម្នាក់ចង់ប្រើគំរូរបស់អ្នក ពួកគេអាចផ្ញើទិន្នន័យទៅកាន់ម៉ាស៊ីនមេ។ បន្ទាប់មកម៉ាស៊ីនមេបញ្ជូនទិន្នន័យនោះទៅកាន់គំរូដែលបានដាក់ឱ្យប្រើប្រាស់របស់អ្នក ដែលដំណើរការ វិភាគទិន្នន័យ និងបង្កើតការព្យាករណ៍។ ពួកវាអាចត្រូវបានប្រើនៅក្នុងកម្មវិធីគេហទំព័រ កម្មវិធីទូរស័ព្ទ ឬសូម្បីតែប្រព័ន្ធបង្កប់។ ការអនុវត្តមួយនៃវិធីសាស្រ្តនេះគឺដើម្បីបើកផ្លូវចរាចរណ៍សម្រាប់ការធ្វើតេស្ត A/B ។


ក្បួនដោះស្រាយ ALS មិនអាចប្រើដោយផ្ទាល់សម្រាប់ការសន្និដ្ឋានតាមអ៊ីនធឺណិតបានទេ ដោយសារវាទាមទារឱ្យមានការបណ្តុះបណ្តាលគំរូឡើងវិញដោយប្រើទិន្នន័យទាំងមូល (ចាស់ + ថ្មី) ដើម្បីធ្វើបច្ចុប្បន្នភាពការណែនាំ។ ក្បួនដោះស្រាយការរៀនតាមជម្រាលជម្រាល គឺជាឧទាហរណ៍នៃគំរូដែលអាចប្រើបានសម្រាប់ការធ្វើបច្ចុប្បន្នភាពតាមអ៊ីនធឺណិត។ យើង​អាច​នឹង​មើល​ទៅ​លើ​ក្បួន​ដោះស្រាយ​មួយ​ចំនួន​ក្នុង​ការ​ប្រកាស​នា​ពេល​អនាគត។


ទោះជាយ៉ាងណាក៏ដោយ ដើម្បីបង្ហាញពីរបៀបដែលគំរូបែបនេះនឹងដំណើរការ យើងកំពុងបង្កើតគំរូ (គ្មានប្រយោជន៍) បម្រើចំណុចបញ្ចប់ដែលព្យាករណ៍ការវាយតម្លៃភាពយន្តដោយផ្អែកលើនៅពេលណាដែលអ្នកប្រើប្រាស់វាយតម្លៃភាពយន្ត!


 import requests model_path = f"{catalog_name}.{model_schema}.{model_name}" champion_version = client.get_model_version_by_alias(model_path, "champion") # Set the name of the MLflow endpoint endpoint_name = config["model_serving_endpoint_name"] # Name of the registered MLflow model model_name = model_path # Specify the type of compute (CPU, GPU_SMALL, GPU_MEDIUM, etc.) workload_type = "CPU" # Specify the scale-out size of compute (Small, Medium, Large, etc.) workload_size = "Small" # Get the latest version of the MLflow model model_version = int(champion_version.version) # Specify Scale to Zero(only supported for CPU endpoints) scale_to_zero = False # Get the API endpoint and token for the current notebook context API_ROOT = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get() API_TOKEN = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get() data = { "name": endpoint_name, "config": { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ] }, } headers = {"Context-Type": "text/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.post( url=f"{API_ROOT}/api/2.0/serving-endpoints", json=data, headers=headers )


វានឹងបង្កើត និងបង្កើតគំរូអាហារថ្ងៃត្រង់ដែលបម្រើចង្កោមសម្រាប់យើង ដូច្នេះវាត្រូវការពេលវេលាខ្លះ។ ឥឡូវនេះ ប្រសិនបើអ្នកបើកបង្អួច Serving អ្នកគួរតែឃើញចំណុចបញ្ចប់របស់អ្នក។


យើង​អាច​ប្រើ​ចំណុច​បញ្ចប់​មួយ​ដើម្បី​បម្រើ​គំរូ​ច្រើន។ បន្ទាប់មក យើង​អាច​ប្រើ​ការ​កំណត់​ផ្លូវ​ចរាចរណ៍​សម្រាប់​សេណារីយ៉ូ​ដូចជា​ការ​ធ្វើ​តេស្ត​ A/B ឬ​ប្រៀបធៀប​ការ​អនុវត្ត​នៃ​ម៉ូដែល​ខុស​គ្នា​ក្នុង​ការ​ផលិត។

តារាងសន្និដ្ឋាន

តារាងសន្និដ្ឋាននៅក្នុងគំរូ Databricks បម្រើដើរតួជាកំណត់ហេតុស្វ័យប្រវត្តិសម្រាប់ម៉ូដែលដែលបានដាក់ពង្រាយរបស់យើង។ នៅពេលបើកដំណើរការ ពួកគេចាប់យកសំណើចូល (ទិន្នន័យដែលបានផ្ញើសម្រាប់ការទស្សន៍ទាយ) លទ្ធផលគំរូដែលត្រូវគ្នា (ការព្យាករណ៍) និងទិន្នន័យមេតាមួយចំនួនផ្សេងទៀតជាតារាង Delta នៅក្នុង Unity Catalog។ យើង​អាច​ប្រើ​តារាង​សន្និដ្ឋាន​សម្រាប់ ​ការ​ត្រួត​ពិនិត្យ និង​បំបាត់​កំហុស ការ​តាម​ដាន​ពូជពង្ស និង​នីតិវិធី​ប្រមូល​ទិន្នន័យ​សម្រាប់ ​ការ​បង្ហាត់​ឡើង​វិញ​កែ​សម្រួល ​គំរូ​របស់​យើង។


យើង​អាច​បើក inference table ​នៅ​លើ​ចំណុច​បញ្ចប់​ការ​បម្រើ​របស់​យើង​ដើម្បី​ត្រួត​ពិនិត្យ​គំរូ។ យើងអាចធ្វើវាបានដោយបញ្ជាក់លក្ខណៈសម្បត្តិ auto_capture_config នៅក្នុង payload នៅពេលយើងបង្កើត endpoint ដំបូង។ ឬយើងធ្វើបច្ចុប្បន្នភាពចំណុចបញ្ចប់របស់យើងនៅពេលក្រោយដោយប្រើពាក្យបញ្ជា put និង URL ចំណុចបញ្ចប់ config ដូចខាងក្រោម (ច្រើនទៀត នៅទីនេះ )


 data = { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ], "auto_capture_config":{ "catalog_name": catalog_name, "schema_name": model_schema, "table_name_prefix": payload_table, } } headers = {"Context-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.put(url=f"{API_ROOT}/api/2.0/serving-endpoints/{endpoint_name}/config", json=data, headers=headers) print(json.dumps(response.json(), indent=4))


ឥឡូវនេះ ចូរយើងផ្តល់ព័ត៌មានដល់ចំណុចបញ្ចប់ជាមួយនឹងទិន្នន័យអន្តរកម្មរបស់អ្នកប្រើអត់ចេះសោះមួយចំនួន

 import random import time all_items = df_full_data.select(col("item_id")).distinct() for user_id in range(50,54): items_not_rated_by_user = df_full_data.where(col("user_id")==user_id).select(col("item_id")).distinct()#collect()[0][0] no_rated_items = [item.item_id for item in all_items.subtract(items_not_rated_by_user).limit(4).collect()] data = { "dataframe_records": [ {"user_id":user_id, "item_id":no_rated_items[0], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[1], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, ] } response = requests.post( url=f"{API_ROOT}/serving-endpoints/{endpoint_name}/invocations", json=data, headers=headers ) # generate the data within the timespan of 1 to 8 minutes time.sleep(random.randint(60*1, 60*8))


យើង​អាច​ពិនិត្យ​មើល​កំណត់​ហេតុ​ចុង​ក្នុង <catalog>.<schema>.<payload_table> table។ វាត្រូវចំណាយពេលប្រហែល 10 នាទីរហូតដល់អ្នកអាចឃើញទិន្នន័យនៅក្នុងតារាង។


 table_path = f"{catalog_name}.{model_schema}.{payload_table}" # Read data from the inference table df_inf_table = spark.read.table(table_path) display(df_inf_table )


អ្នកគួរតែឃើញអ្វីមួយដូចនេះ តារាងបន្ទុករបស់អ្នក។

គំរូ Databricks បម្រើតារាងបន្ទុក


ដើម្បីយល់ពីគ្រោងការណ៍នៃតារាងការសន្និដ្ឋាននេះ សូមពិនិត្យមើល "គ្រោងការណ៍តារាងការសន្និដ្ឋានកាតាឡុក Unity ==" នៅទីនេះ .==


ការត្រួតពិនិត្យគំរូ

គំរូ និងទិន្នន័យត្រួតពិនិត្យប្រធានបទស្មុគស្មាញដែលទាមទារពេលវេលាច្រើនដើម្បីធ្វើជាម្ចាស់។ Databricks Lakehouse Monitoring (DLM) កាត់បន្ថយការចំណាយលើការកសាងប្រព័ន្ធត្រួតពិនិត្យត្រឹមត្រូវដោយផ្តល់នូវគំរូស្តង់ដារ និងអាចប្ដូរតាមបំណងបានសម្រាប់ករណីប្រើប្រាស់ទូទៅ។ ទោះជាយ៉ាងណាក៏ដោយ ការធ្វើជាម្ចាស់ DLM និងការត្រួតពិនិត្យគំរូជាទូទៅតម្រូវឱ្យមានការពិសោធន៍ជាច្រើន។ ខ្ញុំមិនចង់ផ្តល់ឱ្យអ្នកនូវទិដ្ឋភាពទូទៅទូលំទូលាយនៃការត្រួតពិនិត្យគំរូនៅទីនេះទេ ប៉ុន្តែផ្តល់ឱ្យអ្នកនូវចំណុចចាប់ផ្តើមមួយ។ ខ្ញុំ​អាច​នឹង​លះបង់​ប្លុក​មួយ​ទៅ​ប្រធាន​បទ​នេះ​នៅ​ពេល​អនាគត។


សេចក្តីសង្ខេបខ្លីៗនៃមុខងារ និងមុខងារ DLM

ឥឡូវនេះ យើងមានគំរូ និងដំណើរការរបស់យើង យើងអាចប្រើតារាងសន្និដ្ឋានដែលបង្កើតដោយចំណុចបញ្ចប់នៃការបម្រើរបស់យើង ដើម្បីតាមដានរង្វាស់សំខាន់ៗដូចជាការអនុវត្តគំរូ និងការរសាត់ ដើម្បីរកមើលគម្លាត ឬភាពមិនប្រក្រតីណាមួយនៅក្នុងទិន្នន័យ ឬគំរូរបស់យើងតាមពេលវេលា។ វិធីសាស្រ្តសកម្មនេះជួយយើងឱ្យចាត់វិធានការកែតម្រូវទាន់ពេលវេលា ដូចជាការបង្ហាត់គំរូឡើងវិញ ឬការធ្វើបច្ចុប្បន្នភាពលក្ខណៈពិសេសរបស់វា ដើម្បីរក្សាបាននូវប្រតិបត្តិការដ៏ល្អប្រសើរ និងស្របតាមគោលបំណងអាជីវកម្ម។


Databricks Lakehouse Monitoring Data ប្រភពស្ថាបត្យកម្ម៖ Databricks


DLM ផ្តល់នូវការវិភាគបីប្រភេទ ឬ profile typeស៊េរីពេលវេលា រូបថត និង សេចក្តីសន្និដ្ឋាន ។ ដោយសារយើងចាប់អារម្មណ៍ក្នុងការវិភាគតារាងសន្និដ្ឋានរបស់យើង យើងផ្តោតលើតារាងចុងក្រោយ។ ដើម្បីប្រើតារាងសម្រាប់ការត្រួតពិនិត្យ - " តារាងបឋម " របស់យើង យើងគួរតែប្រាកដថាតារាងមានរចនាសម្ព័ន្ធត្រឹមត្រូវ។ សម្រាប់ តារាងសន្និដ្ឋាន ជួរនីមួយៗគួរតែឆ្លើយតបទៅនឹងសំណើដែលមានជួរខាងក្រោម៖

  • លក្ខណៈពិសេសគំរូ

  • ការព្យាករណ៍គំរូ

  • លេខ​សម្គាល់​ម៉ូដែល

  • timestamp : timestamp of the inference request

  • ការពិតមូលដ្ឋាន (ស្រេចចិត្ត)


លេខសម្គាល់ម៉ូដែល មានសារៈសំខាន់សម្រាប់ករណីនៅពេលដែលយើងបម្រើម៉ូដែលជាច្រើន ហើយយើងចង់តាមដានដំណើរការនៃម៉ូដែលនីមួយៗនៅក្នុងផ្ទាំងគ្រប់គ្រងមួយ។ ប្រសិនបើមានលេខសម្គាល់គំរូច្រើនជាងមួយ DLM ប្រើវាដើម្បីបំបែកទិន្នន័យ និងគណនាម៉ែត្រ និងឋិតិវន្តសម្រាប់ចំណិតនីមួយៗដោយឡែកពីគ្នា។


DLM គណនាស្ថិតិ និងម៉ែត្រនីមួយៗសម្រាប់ចន្លោះពេលជាក់លាក់មួយ។ សម្រាប់ការវិភាគការសន្និដ្ឋាន វាបានប្រើជួរឈរ ពេលវេលា បូកនឹងទំហំបង្អួចដែលកំណត់ដោយអ្នកប្រើប្រាស់ ដើម្បីកំណត់អត្តសញ្ញាណបង្អួចពេលវេលា។ ច្រើនទៀតខាងក្រោម។


DLM គាំទ្រ problem type ​ពីរ​សម្រាប់​តារាង​ការ​សន្និដ្ឋាន​: " ចំណាត់ថ្នាក់ " ឬ " តំរែតំរង់ " ។ វាគណនារង្វាស់ និងស្ថិតិពាក់ព័ន្ធមួយចំនួនដោយផ្អែកលើការបញ្ជាក់នេះ។


ដើម្បីប្រើ DLM យើងគួរតែបង្កើតម៉ូនីទ័រ ហើយភ្ជាប់វាទៅនឹងតារាង។ នៅពេលយើងធ្វើ DLM នេះ បង្កើត metric tables ពីរ៖

  • តារាងម៉ែត្រទម្រង់ ៖ តារាងនេះមានស្ថិតិសង្ខេបដូចជា អប្បបរមា អតិបរមា ភាគរយនៃមោឃៈ និងសូន្យ។ វាក៏មានម៉ែត្របន្ថែមផងដែរ ដោយផ្អែកលើប្រភេទបញ្ហាដែលកំណត់ដោយអ្នកប្រើប្រាស់។ ឧទាហរណ៍ ភាពជាក់លាក់ ការរំលឹកឡើងវិញ និង f1_score សម្រាប់ម៉ូដែលចំណាត់ថ្នាក់ និង កំហុស mean_squared_ និង mean_average_error សម្រាប់គំរូតំរែតំរង់។

  • តារាងរង្វាស់រង្វាស់ ៖ វាមានស្ថិតិដែលវាស់វែងពីរបៀបដែលការចែកចាយទិន្នន័យបានផ្លាស់ប្តូរ តាមពេលវេលា ឬទាក់ទងទៅនឹង តម្លៃមូលដ្ឋាន (ប្រសិនបើបានផ្តល់) ។ វាគណនារង្វាស់ដូចជាការធ្វើតេស្ត Chi-square, ការធ្វើតេស្ត KS ។


ដើម្បីមើលបញ្ជីរង្វាស់ពេញលេញសម្រាប់តារាងនីមួយៗ សូមពិនិត្យមើលទំព័រឯកសារ តារាងម៉ែត្រ ។ វាក៏អាចធ្វើទៅបានផងដែរដើម្បីបង្កើត ម៉ែត្រផ្ទាល់ខ្លួន


ទិដ្ឋភាពសំខាន់នៃការកសាងប្រព័ន្ធត្រួតពិនិត្យគឺត្រូវប្រាកដថាផ្ទាំងគ្រប់គ្រងត្រួតពិនិត្យរបស់យើងមានសិទ្ធិចូលប្រើទិន្នន័យចុងក្រោយបំផុតនៅពេលពួកគេមកដល់។ សម្រាប់នោះ យើងអាចប្រើ ការស្ទ្រីមតារាង Delta ដើម្បីតាមដានជួរដែលបានដំណើរការនៅក្នុងតារាងសន្និដ្ឋាន។ យើងប្រើតារាងការសន្និដ្ឋាននៃការបម្រើគំរូជាតារាងប្រភពរបស់យើង ( readStream ) និងតារាងត្រួតពិនិត្យជាតារាងលិច ( writeStream )។ យើងក៏ត្រូវប្រាកដថា ការផ្លាស់ប្តូរការចាប់យកទិន្នន័យ (CDC) ត្រូវបានបើកនៅលើតារាងទាំងពីរ (វាត្រូវបានបើកតាមលំនាំដើមនៅលើតារាង Inference)។ វិធីនេះ យើងដំណើរការតែការផ្លាស់ប្តូរ - បញ្ចូល/អាប់ដេត/លុប - នៅក្នុងតារាងប្រភព ជាជាងដំណើរការតារាងទាំងមូលឡើងវិញរាល់ការធ្វើឱ្យស្រស់។

ដៃ

ដើម្បី​បើក​ការ​ត្រួត​ពិនិត្យ​លើ​តារាង​សន្និដ្ឋាន​របស់​យើង យើង​ធ្វើ​ជំហាន​ដូច​ខាង​ក្រោម៖

  1. អានតារាងសន្និដ្ឋានជាតារាងស្ទ្រីម
  2. បង្កើតតារាង delta ថ្មីមួយជាមួយនឹងគ្រោងការណ៍ត្រឹមត្រូវដោយពន្លាតារាងការសន្និដ្ឋានដែលត្រូវបានបង្កើតដោយចំណុចបញ្ចប់នៃការបម្រើគំរូរបស់យើង។
  3. រៀបចំតារាងមូលដ្ឋាន (ប្រសិនបើមាន)
  4. បង្កើតម៉ូនីទ័រលើតារាងលទ្ធផល និងធ្វើឱ្យម៉ែត្រឡើងវិញ
  5. រៀបចំកាលវិភាគការងារ ដើម្បីស្រាយតារាងការសន្និដ្ឋានទៅរចនាសម្ព័ន្ធត្រឹមត្រូវ និងធ្វើឱ្យម៉ែត្រឡើងវិញ


ដំបូងយើងត្រូវដំឡើង Lakehouse Monitoring API។ វាគួរតែត្រូវបានដំឡើងរួចហើយ ប្រសិនបើអ្នកប្រើ Databricks rum time 15.3 LTS និងខ្ពស់ជាងនេះ៖


 %pip install "https://ml-team-public-read.s3.amazonaws.com/wheels/data-monitoring/a4050ef7-b183-47a1-a145-e614628e3146/databricks_lakehouse_monitoring-0.4.14-py3-none-any.whl" dbutils.library.restartPython()


ចូរយើងអានតារាងសន្និដ្ឋានជាតារាងផ្សាយ

 requests_raw = spark.readStream\ .format("delta")\ .table(inference_table_name) requests_raw.isStreaming #-> True


បន្ទាប់យើងត្រូវដាក់តារាងក្នុងទម្រង់ត្រឹមត្រូវដូចបានរៀបរាប់ខាងលើ។ តារាងនេះគួរតែមានជួរមួយសម្រាប់ការទស្សន៍ទាយនីមួយៗដែលពាក់ព័ន្ធជាមួយនឹងលក្ខណៈពិសេស និងតម្លៃនៃការទស្សន៍ទាយ។ តារាងការសន្និដ្ឋានដែលយើងទទួលបានពីម៉ូដែលបម្រើចំណុចបញ្ចប់ រក្សាទុកសំណើ និងការឆ្លើយតបនៃចំណុចបញ្ចប់ជាទម្រង់ JSON ដែលត្រូវបានភ្ជាប់។ នេះគឺជាឧទាហរណ៍នៃ JSON payload សម្រាប់ជួរឈរសំណើ និងការឆ្លើយតប។

 #requests {"dataframe_records": [ {"user_id": 1, "item_id": 346, "rating": 5}, {"user_id": 1, "item_id": 377, "rating": 2}, {"user_id": 1, "item_id": 302, "rating": 4} ] } #reponse {"predictions": [4.248899936676025, 1.1172138452529907, 4.279165744781494] } # --> what we need | user_id | item_id | rating | prediction | |---------|---------|--------|------------| | 1 | 346 | 5 | 4.248900 | | 1 | 377 | 2 | 1.117214 | | 1 | 302 | 4 | 4.279166 |


ដើម្បី​ស្រាយ​តារាង​នេះ​តាម​គ្រោងការណ៍​ដែល​ត្រឹមត្រូវ យើង​អាច​ប្រើ​កូដ​ខាងក្រោម​ដែល​ត្រូវ​បាន​កែសម្រួល​ពី​ឯកសារ Databricks ( តារាង​សេចក្តី​សន្និដ្ឋាន Lakehouse Monitoring starter notebook )។


 # define the schema of the request and reponse fields in the inference tabel REQUEST_FIELDS = [StructField('user_id', IntegerType(), False),\ StructField('item_id', IntegerType(), False),\ StructField('rating', IntegerType(), False)\ ] RESPONSE_FIELD = [T.StructField("predictions", FloatType())] def process_col_requests(json_str:str) -> str: """ to proccess the JSON payload of request column in inference table """ request = json.loads(json_str) dataframe_records = request.get("dataframe_records", []) return dataframe_records def procces_col_response(json_str: str) -> str: """ to proccess the JSON payload of reponse column in inference table """ reponse = json.loads(json_str) output = [{prediction_col: round(prediction,4)} for prediction in reponse["predictions"]] return output def get_model_id(endpoint_name: str) -> str: """ create the model id by concatinating the model name and the model version. note: the assumption is the endpoint serves only one model """ served_models = get_served_models(endpoint_name) model_id = f"{served_models[0]['model_name']}_{served_models[0]['model_version']}" return model_id def process_requests(requests_raw: DataFrame, request_fields: List[T.StructField], response_field: T.StructField, endpoint_name: str) -> DataFrame: """ Takes a stream of raw requests and processes them by: - Unpacking JSON payloads for requests and responses - Exploding batched requests into individual rows - Converting Unix epoch millisecond timestamps to be Spark TimestampType :param requests_raw: DataFrame containing raw requests. Assumed to contain the following columns: - `request` - `response` - `timestamp_ms` :param request_fields: List of StructFields representing the request schema :param response_field: A StructField representing the response schema :return: A DataFrame containing processed requests """ # Convert the timestamp milliseconds to TimestampType for downstream processing. requests_timestamped = requests_raw \ .withColumn(timestamp_col, (F.col("timestamp_ms") / 1000).cast(T.TimestampType())) \ .drop("timestamp_ms") # create the model identifier column model_id = get_model_id(endpoint_name) # Convert the model name and version columns into a model identifier column. requests_identified = requests_timestamped \ .withColumn(model_id_col, F.lit(model_id)) # Rename the date column to avoid collisions with features. requests_dated = requests_identified.withColumnRenamed("date", date_col) # Consolidate and unpack JSON. request_schema = T.ArrayType(T.StructType(request_fields)) response_schema = T.ArrayType(T.StructType(response_field)) # w udf_request = F.udf(process_col_requests, request_schema) udf_reponse = F.udf(procces_col_response, response_schema) requests_unpacked = requests_dated.withColumn("request", udf_request("request")).\ withColumn("response", udf_reponse("response")) # Explode batched requests into individual rows. DB_PREFIX = "__db" requests_exploded = requests_unpacked \ .withColumn(f"{DB_PREFIX}_request_response", F.arrays_zip(F.col("request"), F.col("response"))) \ .withColumn(f"{DB_PREFIX}_request_response", F.explode(F.col(f"{DB_PREFIX}_request_response"))) \ .select(F.col("*"), F.col(f"{DB_PREFIX}_request_response.request.*"), F.col(f"{DB_PREFIX}_request_response.response.*")) \ .drop(f"{DB_PREFIX}_request_response", "request", "response") requests_cleaned = requests_exploded.drop("status_code", "sampling_fraction", "client_request_id", "databricks_request_id", "request_metadata") return requests_cleaned


តារាងលទ្ធផលនឹងមើលទៅដូចនេះ៖

តារាង​បន្ទុក​ត្រូវ​បាន​ពន្លា

បន្ទាប់យើងគួរតែចាប់ផ្តើមតារាងលិចរបស់យើង។

 dt_builder = DeltaTable.createIfNotExists(spark) \ .tableName(unpacked_requests_table_name) \ .addColumns(schema) \ .partitionedBy(requests_cleaned.schema) \ .property("delta.enableChangeDataFeed", "true") \ dt_builder.execute()


ហើយសរសេរលទ្ធផល

 checkpoint_path = f"dbfs:/payload-logging/{endpoint_name}/checkpoint" requests_stream = requests_cleaned.writeStream \ .trigger(once=True) \ .format("delta") \ .partitionBy(date_col) \ .outputMode("append") \ .option("checkpointLocation", checkpoint_path) \ .toTable(unpacked_requests_table_name) \


ជាចុងក្រោយ យើងបង្កើតតារាងមូលដ្ឋានរបស់យើង។ DLM ប្រើតារាងនេះដើម្បីគណនាការរសាត់ដោយប្រៀបធៀបការចែកចាយជួរឈរស្រដៀងគ្នានៃបន្ទាត់គោល និងគំរូបឋម។ តារាងបន្ទាត់មូលដ្ឋានគួរតែមានជួរឈរលក្ខណៈពិសេសដូចគ្នានឹងជួរឈរចម្បងក៏ដូចជាជួរឈរកំណត់អត្តសញ្ញាណគំរូដូចគ្នា។ សម្រាប់តារាងមូលដ្ឋាន យើងប្រើតារាងទស្សន៍ទាយនៃ សំណុំទិន្នន័យដែលមានសុពលភាព របស់យើង ដែលយើងរក្សាទុកមុននេះ បន្ទាប់ពីយើងបានបណ្តុះបណ្តាលគំរូរបស់យើងដោយប្រើ hyperparameter ល្អបំផុតរបស់គាត់។ ដើម្បីគណនារង្វាស់រង្វាស់ Databricks គណនារង្វាស់ទម្រង់សម្រាប់ទាំងតារាងបឋម និងតារាងមូលដ្ឋាន។ នៅទីនេះអ្នកអាចអានអំពី តារាងបឋម និងតារាងមូលដ្ឋាន


 #read the prediction table df_base_table = spark.table(f"{catalog_name}.{model_schema}.predictions") # create the model id and add it to the table model_id = get_model_id(endpoint_name) df_base_table = df_base_table.withColumn(model_id_col, F.lit(model_id)) #write the new table and enable the CDC on it output_base_table_name = f"{catalog_name}.{model_schema}.{base_table_prefix}_{model_name}" df_base_table.write.format("delta").mode("overwrite").saveAsTable(output_base_table_name) spark.sql(f"ALTER TABLE {output_base_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")


ឥឡូវនេះយើងត្រូវបានអានដើម្បីបង្កើតផ្ទាំងគ្រប់គ្រងរបស់យើង។ យើងអាចធ្វើវាបានដោយប្រើ UI ឬ Lakehouse Monitoring API ។ នៅទីនេះយើងប្រើជម្រើសទីពីរ:

 # This is where we store the metric tables. output_schema_name = f"{catalog_name}.{model_schema}" try: info = lm.create_monitor( table_name=unpacked_requests_table_name, profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities,#the aggregation window model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, # We will refresh the metrics on-demand in this notebook baseline_table_name=output_base_table_name, ) print(info) except Exception as e: # Ensure the exception was expected assert "RESOURCE_ALREADY_EXISTS" in str(e), f"Unexpected error: {e}" # Update the monitor if any parameters of this notebook have changed. lm.update_monitor( table_name=unpacked_requests_table_name, updated_params=dict( profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities, model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, baseline_table_name=output_base_table_name, ) ) # Refresh metrics calculated on the requests table. refresh_info = lm.run_refresh(table_name=unpacked_requests_table_name) print(refresh_info)


បន្ទាប់ពីយើងដំណើរការកូដវាត្រូវចំណាយពេលខ្លះរហូតដល់ Databricks គណនាម៉ែត្រទាំងអស់។ ដើម្បីមើលផ្ទាំងគ្រប់គ្រង សូមចូលទៅកាន់ផ្ទាំង Quality នៃតារាងលិចរបស់អ្នក (ឧទាហរណ៍ unpacked_requests_table_name )។ អ្នកគួរតែឃើញទំព័រមួយដូចខាងក្រោម។

ទិដ្ឋភាព​ត្រួត​ពិនិត្យ​គំរូ Databricks


ប្រសិនបើអ្នកចុចលើមើល refresh history អ្នកឃើញការដំណើរការ ការរង់ចាំ និងការធ្វើឱ្យស្រស់ពីមុនរបស់អ្នក។ ចុចលើ View Dashboard ដើម្បីបើកផ្ទាំងគ្រប់គ្រងរបស់អ្នក។

បន្ទះត្រួតពិនិត្យគំរូ Databricks



ដូច្នេះយើងចាប់ផ្តើមជាមួយតារាងសន្និដ្ឋាន ( my_endpoint_payload ) ដំណើរការវា ហើយរក្សាទុកលទ្ធផលទៅ my_endpoint_payload_unpacked ហើយហុចតារាងនេះ រួមជាមួយនឹងតារាងមូលដ្ឋានរបស់យើង ( base_table_als ) ទៅកាន់ API ត្រួតពិនិត្យរបស់យើង។ DLM គណនារង្វាស់ទម្រង់សម្រាប់តារាងនីមួយៗ ( my_endpoint_payload_unpacked_profile_metric ) ហើយប្រើពួកវាដើម្បីគណនារង្វាស់រង្វាស់ ( my_endpoint_payload_unpacked_drift_metrics )


អញ្ចឹងទៅ! អ្នកមានអ្វីគ្រប់យ៉ាងដែលអ្នកត្រូវការដើម្បីបម្រើ និងតាមដានគំរូរបស់អ្នក!


នៅផ្នែកបន្ទាប់ ខ្ញុំនឹងបង្ហាញអ្នកពីរបៀបធ្វើស្វ័យប្រវត្តិកម្មដំណើរការនេះដោយប្រើ Databricks Assets Bundle និង Gitlab !

នៅក្នុង ផ្នែកដំបូងនៃស៊េរីមេរៀននេះ យើងបានអនុវត្តជំហានដំបូងសម្រាប់ការកសាងបំពង់បង្ហូរប្រេង MLOps ពីចុងដល់ចុង ដោយប្រើ Databricks និង Spark ដែលដឹកនាំដោយស្ថាបត្យកម្មយោងរបស់ Databricks ។ នេះ​ជា​ការសង្ខេប​អំពី​ជំហាន​សំខាន់ៗ​ដែល​យើង​បាន​គ្របដណ្តប់៖


  • ការរៀបចំកាតាឡុក Unity សម្រាប់ស្ថាបត្យកម្ម Medallion ៖ យើងបានរៀបចំទិន្នន័យរបស់យើងទៅជាស្រទាប់សំរិទ្ធ ប្រាក់ និងមាសនៅក្នុង Unity Catalog ដោយបង្កើតប្រព័ន្ធគ្រប់គ្រងទិន្នន័យដែលមានរចនាសម្ព័ន្ធ និងមានប្រសិទ្ធភាព។

  • ការបញ្ចូលទិន្នន័យទៅក្នុងកាតាឡុក Unity ៖ យើងបានបង្ហាញពីរបៀបនាំចូលទិន្នន័យឆៅទៅក្នុងប្រព័ន្ធ ដោយធានាបាននូវភាពស៊ីសង្វាក់គ្នា និងគុណភាពសម្រាប់ដំណាក់កាលដំណើរការជាបន្តបន្ទាប់។

  • ការបណ្ដុះបណ្ដាលគំរូ ៖ ការប្រើប្រាស់ Databricks យើងបានបណ្ដុះបណ្ដាលគំរូរៀនម៉ាស៊ីនដែលស្របតាមសំណុំទិន្នន័យរបស់យើង ដោយធ្វើតាមការអនុវត្តល្អបំផុតសម្រាប់ការអភិវឌ្ឍន៍គំរូដែលអាចធ្វើមាត្រដ្ឋានបាន និងមានប្រសិទ្ធភាព។

  • ការលៃតម្រូវ Hyperparameter ជាមួយ HyperOpt ៖ ដើម្បីបង្កើនការអនុវត្តគំរូ យើងបានប្រើប្រាស់ HyperOpt ដើម្បីធ្វើស្វ័យប្រវត្តិកម្មការស្វែងរកសម្រាប់ hyperparameters ដ៏ល្អប្រសើរ ធ្វើអោយប្រសើរឡើងនូវភាពត្រឹមត្រូវ និងប្រសិទ្ធភាព។

  • ការតាមដានការសាកល្បងជាមួយ Databricks MLflow ៖ យើងបានប្រើប្រាស់ MLflow ដើម្បីកត់ត្រា និងត្រួតពិនិត្យការពិសោធន៍របស់យើង ដោយរក្សាបាននូវកំណត់ត្រាដ៏ទូលំទូលាយនៃកំណែគំរូ រង្វាស់ និងប៉ារ៉ាម៉ែត្រសម្រាប់ងាយស្រួលប្រៀបធៀប និងផលិតឡើងវិញ។


ជាមួយនឹងជំហានជាមូលដ្ឋានទាំងនេះត្រូវបានបញ្ចប់ គំរូរបស់អ្នកឥឡូវនេះត្រូវបានចាប់ផ្តើមសម្រាប់ការដាក់ឱ្យប្រើប្រាស់។ នៅក្នុងផ្នែកទីពីរនេះ យើងនឹងផ្តោតលើការរួមបញ្ចូលសមាសធាតុសំខាន់ពីរទៅក្នុងប្រព័ន្ធរបស់យើង៖


  1. ការសន្និដ្ឋានជាបាច់ ៖ ការអនុវត្តដំណើរការបណ្តុំដើម្បីបង្កើតការព្យាករណ៍លើសំណុំទិន្នន័យធំ ដែលសមរម្យសម្រាប់កម្មវិធីដូចជា ការដាក់ពិន្ទុច្រើន និងការរាយការណ៍តាមកាលកំណត់។
  2. ការសន្និដ្ឋានតាមអ៊ីនធឺណិត (ការបម្រើគំរូ) ៖ ការបង្កើតការបម្រើគំរូតាមពេលវេលាជាក់ស្តែង ដើម្បីផ្តល់នូវការព្យាករណ៍ភ្លាមៗ ដែលមានសារៈសំខាន់សម្រាប់កម្មវិធី និងសេវាកម្មអន្តរកម្ម។
  3. ការត្រួតពិនិត្យគំរូ៖ ដើម្បីធានាថាម៉ូដែលដែលបានដាក់ពង្រាយរបស់អ្នករក្សាបាននូវដំណើរការល្អបំផុត និងភាពជឿជាក់តាមពេលវេលា។


តោះចូលទៅក្នុងវា!

ការដាក់ពង្រាយគំរូ

ចំណុចចេញដំណើរនៃប្លុកចុងក្រោយគឺការវាយតម្លៃគំរូ។ ឥឡូវ​ស្រមៃ​ថា យើង​បាន​ធ្វើការ​ប្រៀបធៀប ហើយ​បាន​រក​ឃើញ​ថា​ម៉ូដែល​របស់​យើង​បង្ហាញ​ពី​ដំណើរការ​ខ្ពស់​ជាង​ធៀប​នឹង​ម៉ូដែល​ផលិតកម្ម​នេះ។ ដូចដែលយើងចង់ (សន្មត់) ដើម្បីប្រើគំរូនៅក្នុងផលិតកម្ម យើងចង់ទាញយកអត្ថប្រយោជន៍ពីទិន្នន័យទាំងអស់ដែលយើងមាន។ ជំហានបន្ទាប់គឺការបណ្តុះបណ្តាល និងសាកល្បងគំរូដោយប្រើសំណុំទិន្នន័យពេញលេញ។ បន្ទាប់​មក​បន្ត​ម៉ូដែល​របស់​យើង​សម្រាប់​ការ​ប្រើ​ពេល​ក្រោយ​ដោយ​ប្រើ​វា​ជា​គំរូ​ជើងឯក​របស់​យើង។ ដោយសារនេះគឺជាគំរូចុងក្រោយដែលយើងចង់ប្រើសម្រាប់ការសន្និដ្ឋាន យើងប្រើម៉ាស៊ីនភ្ញៀវវិស្វកម្មលក្ខណៈពិសេសដើម្បីបណ្តុះបណ្តាលគំរូ។ វិធីនេះ​យើង​មិន​ត្រឹម​តែ​តាម​ដាន​ត្រកូល​គំរូ​ដែល​ងាយ​ស្រួល​ជាង​មុន​ប៉ុណ្ណោះ​ទេ ប៉ុន្តែ​ថែម​ទាំង​បិទ​ដំណើរការ​សុពលភាព​គ្រោងការណ៍ និង​ការ​បំប្លែង​លក្ខណៈ​ពិសេស (បើ​មាន) ទៅ​ឲ្យ​អតិថិជន។


 with mlflow.start_run(run_name="ALS_best_model") as run: als = ALS() # Now we set the parameters for the method als.setMaxIter(MAX_ITER)\ .setSeed(SEED)\ .setRegParam(best_params["REG_PARAM"])\ .setUserCol(COL_USER)\ .setItemCol(COL_ITEM)\ .setRatingCol(COL_LABEL)\ .setRank(best_params["RANK"]) mlflow.log_param("MAX_ITER", MAX_ITER) mlflow.log_param("RANK", best_params["RANK"]) mlflow.log_param("REG_PARAM", best_params["REG_PARAM"]) # Create the model with these parameters. model = als.fit(df_full_data) #drop predictions where users and products from the test test and didn't make it into the training set. in this case, the prediction is NaN model.setColdStartStrategy('drop') predictions = model.transform(df_full_data) signature = infer_signature(model_input = df_full_data, model_output = predictions.select(COL_LABEL)) #log the model mlflow.spark.log_model(model, model_name, sample_input=df_full_data.limit(3), signature = signature, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name=f"{catalog_name}.{model_schema}.{model_name}") evaluator = RegressionEvaluator(predictionCol=COL_PRED, labelCol=COL_LABEL) rmse = evaluator.setMetricName("rmse").evaluate(predictions) mlflow.log_metric('rmse', rmse)


យើងក៏អាចប្រើ Feature Store ឬ Feature Engineering APIs ដើម្បីបណ្តុះបណ្តាល និងកត់ត្រាម៉ូដែល

 model_info = fe.log_model(model=model, artifact_path = model_name, flavor=mlflow.spark, training_set=fe_full_data, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name= f"{catalog_name}.{model_schema}.{model_name}" )


នៅពេលយើងប្រើ API វិស្វកម្មលក្ខណៈពិសេស យើងអាចមើលត្រកូលរបស់ម៉ូដែលនៅក្នុង Catalog Explorer

ជួរទិន្នន័យនៅក្នុង Dataticks Unity Catalog


ឥឡូវនេះអនុញ្ញាតឱ្យអាប់ដេតការពណ៌នាគំរូ និងកំណត់ស្លាកជើងឯកទៅវា។

 import time from mlflow.tracking.client import MlflowClient from mlflow.entities.model_registry.model_version_status import ModelVersionStatus client = MlflowClient() #find the latest model version model_name_path = f"{catalog_name}.{model_schema}.{model_name}" model_version_infos = client.search_model_versions(f"name ='{model_name_path}'") new_model_version = max([int(model_version_info.version) for model_version_info in model_version_infos]) #add the model and model version descirption client.update_registered_model( name=model_name_path, description="collaborative filtering using Spark mllib ALS. This model use rating table" ) client.update_model_version( name=model_name_path, version=new_model_version, description="this model is optimized Rank and REG_PARAM with Hyperopt and rmse as a loss function. trained on the full dataset" ) # assign alias client.set_registered_model_alias(model_name_path, "Champion", new_model_version)


ឥឡូវ​នេះ​ទៅ​មុខ ហើយ​ពិនិត្យ​មើល​គ្រោងការណ៍​ដែល​អ្នក​បាន​ចុះ​ឈ្មោះ​ម៉ូដែល។ អ្នកគួរតែឃើញការអាប់ដេតរបស់អ្នកទាំងអស់ដូចខាងក្រោម

ការចុះបញ្ជីគំរូនៅក្នុងកាតាឡុក Databricks Unity

ដំណាក់កាលគំរូ ៖ ប្រសិនបើអ្នកប្រើកន្លែងធ្វើការសម្រាប់ការចុះបញ្ជីគំរូ អ្នកគួរដំណាក់កាលដើម្បីគ្រប់គ្រងគំរូរបស់អ្នក។ ការប្រើឈ្មោះក្លែងក្លាយនឹងមិនដំណើរការទេ។ សូមពិនិត្យមើល នៅទីនេះ ដើម្បីមើលពីរបៀបដែលវាដំណើរការ

ការសន្និដ្ឋានគំរូ

ការដាក់ពិន្ទុជាបាច់

ឥឡូវនេះស្រមៃថាយើងចង់ប្រើគំរូរបស់យើងនៅក្នុងផលិតកម្មសម្រាប់ការសន្និដ្ឋាន។ នៅក្នុងជំហាននេះ យើងផ្ទុកគំរូជើងឯក ហើយប្រើវាដើម្បីបង្កើតការណែនាំភាពយន្តចំនួន 20 សម្រាប់អ្នកប្រើប្រាស់ម្នាក់ៗ។


 from mlflow.spark import load_model as spark_load_model from mlflow.tracking.client import MlflowClient from create_training_set import split_data #-- set UC as model registray mlflow.set_registry_uri("databricks-uc") #-- initate mlflow client client = MlflowClient() # -- read the config file with open('config.json') as config_file: config = json.load(config_file) catalog_name = config["catalog_name"] gold_layer = config["gold_layer_name"] silver_layer = config["silver_layer_name"] user_item_table_name = config["user_item_table_name"] ft_user_item_name = config["ft_user_item_name"] model_name = config["model_name"] model_schema = config["model_schema"] #-- create the model uri model_path = f"{catalog_name}.{model_schema}.{model_name}" # --create the model_uri: there are two ways to do this # 1: using the alias (we use this*) model_version_uri = f"models:/{model_uri}@champion" # 2: using model version #champion_version = client.get_model_version_by_alias(model_uri, "champion") #model_version_uri = f"models:/{model_uri}/{champion_version.version}" # -- load the model pipline and exctract the model model_pipeline = spark_load_model(model_version_uri) model = model_pipeline.stages[0] # -- batch scoring using the the model fe_full_data, df_full_data, df_train, df_test = split_data() df_batch_input = df_full_data.drop("rating") df_scores = model.transform(df_batch_input) # --- in case you used Feature Engineering to train and register model #from databricks.feature_engineering import FeatureEngineeringClient #fe = FeatureEngineeringClient() # fe.score_batch(model_uri=f"{model_version_uri}",df = df_batch_input)


ហើយអ្នកអាចមើលឃើញថាយើងបានប្រើទិន្នន័យបណ្តុះបណ្តាលដូចគ្នាសម្រាប់ការដាក់ពិន្ទុជាក្រុម។ ទោះបីជាក្នុងករណីប្រព័ន្ធណែនាំវាសមហេតុផលក៏ដោយ នៅក្នុងកម្មវិធីភាគច្រើន យើងចង់ប្រើគំរូដើម្បីដាក់ពិន្ទុទិន្នន័យដែលមើលមិនឃើញមួយចំនួន។ ឧទាហរណ៍ រូបភាពរបស់អ្នកគឺជាក្រុមហ៊ុន Netflix ហើយចង់ធ្វើបច្ចុប្បន្នភាពការណែនាំរបស់អ្នកប្រើប្រាស់នៅចុងថ្ងៃដោយផ្អែកលើបញ្ជីដែលបានមើលថ្មីរបស់ពួកគេ។ យើង​អាច​កំណត់​ពេល​ការងារ​ដែល​ដំណើរការ​ការ​ដាក់​ពិន្ទុ​តាម​ពេល​វេលា​ជាក់លាក់​នៅ​ចុង​ថ្ងៃ។


ឥឡូវនេះយើងអាចបន្តទៅមុខ ហើយបង្កើតការណែនាំសម្រាប់អ្នកប្រើប្រាស់ម្នាក់ៗ។ ចំពោះបញ្ហានេះយើងរកឃើញធាតុកំពូលទាំង 20 ក្នុងមួយអ្នកប្រើប្រាស់

 from pyspark.sql.window import Window from pyspark.sql.functions import col, split, row_number, collect_list from pyspark.sql.functions import col, collect_list, expr, lit, min, row_number, desc windowSpec = Window.partitionBy("user_id").orderBy(col("prediction").desc()) df_top_20_items = df_scores.withColumn("rank", row_number().over(windowSpec)).filter(col("rank") <= 20) df_user_recs = df_top_20_items.groupBy("user_id") \ .agg(collect_list(col("item_id").cast("double")).alias("top_item_ids"))


នេះជារបៀបដែលលទ្ធផលមើលទៅ

ទីបំផុតយើងអាចរក្សាទុកការព្យាករណ៍ជាស្លាកដីសណ្តនៅលើ UC របស់យើង ឬបោះផ្សាយវាទៅប្រព័ន្ធខាងក្រោម Mongo DB ឬ Azure Cosmos DB។ យើងទៅជាមួយជម្រើស firs


 df_user_recs.write.mode("overwrite").saveAsTable(f"{catalog_name}.{output_schema}.top20_item_recommendations")


ស្ទ្រីម/ការសន្និដ្ឋានតាមអ៊ីនធឺណិត

ឥឡូវស្រមៃមើលករណីដែលយើងចង់ធ្វើបច្ចុប្បន្នភាពការណែនាំរបស់យើងដោយផ្អែកលើអន្តរកម្មរបស់អ្នកប្រើប្រាស់ក្នុងពេលជាក់ស្តែង។ សម្រាប់ករណីនេះ យើងអាចប្រើការបម្រើគំរូ។ នៅពេលដែលនរណាម្នាក់ចង់ប្រើគំរូរបស់អ្នក ពួកគេអាចផ្ញើទិន្នន័យទៅកាន់ម៉ាស៊ីនមេ។ បន្ទាប់មកម៉ាស៊ីនមេបញ្ជូនទិន្នន័យនោះទៅកាន់គំរូដែលបានដាក់ឱ្យប្រើប្រាស់របស់អ្នក ដែលដំណើរការ វិភាគទិន្នន័យ និងបង្កើតការព្យាករណ៍។ ពួកវាអាចត្រូវបានប្រើនៅក្នុងកម្មវិធីគេហទំព័រ កម្មវិធីទូរស័ព្ទ ឬសូម្បីតែប្រព័ន្ធបង្កប់។ ការអនុវត្តមួយនៃវិធីសាស្រ្តនេះគឺដើម្បីបើកផ្លូវចរាចរណ៍សម្រាប់ការធ្វើតេស្ត A/B ។


ក្បួនដោះស្រាយ ALS មិនអាចប្រើដោយផ្ទាល់សម្រាប់ការសន្និដ្ឋានតាមអ៊ីនធឺណិតបានទេ ដោយសារវាទាមទារឱ្យមានការបណ្តុះបណ្តាលគំរូឡើងវិញដោយប្រើទិន្នន័យទាំងមូល (ចាស់ + ថ្មី) ដើម្បីធ្វើបច្ចុប្បន្នភាពការណែនាំ។ ក្បួនដោះស្រាយការរៀនតាមជម្រាលជម្រាល គឺជាឧទាហរណ៍នៃគំរូដែលអាចប្រើបានសម្រាប់ការធ្វើបច្ចុប្បន្នភាពតាមអ៊ីនធឺណិត។ យើង​អាច​នឹង​មើល​ទៅ​លើ​ក្បួន​ដោះស្រាយ​មួយ​ចំនួន​ក្នុង​ការ​ប្រកាស​នា​ពេល​អនាគត។


ទោះជាយ៉ាងណាក៏ដោយ ដើម្បីបង្ហាញពីរបៀបដែលគំរូបែបនេះនឹងដំណើរការ យើងកំពុងបង្កើតគំរូ (គ្មានប្រយោជន៍) បម្រើចំណុចបញ្ចប់ដែលព្យាករណ៍ការវាយតម្លៃភាពយន្តដោយផ្អែកលើនៅពេលណាដែលអ្នកប្រើប្រាស់វាយតម្លៃភាពយន្ត!


 import requests model_path = f"{catalog_name}.{model_schema}.{model_name}" champion_version = client.get_model_version_by_alias(model_path, "champion") # Set the name of the MLflow endpoint endpoint_name = config["model_serving_endpoint_name"] # Name of the registered MLflow model model_name = model_path # Specify the type of compute (CPU, GPU_SMALL, GPU_MEDIUM, etc.) workload_type = "CPU" # Specify the scale-out size of compute (Small, Medium, Large, etc.) workload_size = "Small" # Get the latest version of the MLflow model model_version = int(champion_version.version) # Specify Scale to Zero(only supported for CPU endpoints) scale_to_zero = False # Get the API endpoint and token for the current notebook context API_ROOT = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get() API_TOKEN = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get() data = { "name": endpoint_name, "config": { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ] }, } headers = {"Context-Type": "text/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.post( url=f"{API_ROOT}/api/2.0/serving-endpoints", json=data, headers=headers )


វានឹងបង្កើត និងបង្កើតគំរូអាហារថ្ងៃត្រង់ដែលបម្រើចង្កោមសម្រាប់យើង ដូច្នេះវាត្រូវការពេលវេលាខ្លះ។ ឥឡូវនេះ ប្រសិនបើអ្នកបើកបង្អួច Serving អ្នកគួរតែឃើញចំណុចបញ្ចប់របស់អ្នក។


យើង​អាច​ប្រើ​ចំណុច​បញ្ចប់​មួយ​ដើម្បី​បម្រើ​គំរូ​ច្រើន។ បន្ទាប់មក យើង​អាច​ប្រើ​ការ​កំណត់​ផ្លូវ​ចរាចរណ៍​សម្រាប់​សេណារីយ៉ូ​ដូចជា​ការ​ធ្វើ​តេស្ត​ A/B ឬ​ប្រៀបធៀប​ការ​អនុវត្ត​នៃ​ម៉ូដែល​ខុស​គ្នា​ក្នុង​ការ​ផលិត។

តារាងសន្និដ្ឋាន

តារាងសន្និដ្ឋាននៅក្នុងគំរូ Databricks បម្រើដើរតួជាកំណត់ហេតុស្វ័យប្រវត្តិសម្រាប់ម៉ូដែលដែលបានដាក់ពង្រាយរបស់យើង។ នៅពេលបើកដំណើរការ ពួកគេចាប់យកសំណើចូល (ទិន្នន័យដែលបានផ្ញើសម្រាប់ការទស្សន៍ទាយ) លទ្ធផលគំរូដែលត្រូវគ្នា (ការព្យាករណ៍) និងទិន្នន័យមេតាមួយចំនួនផ្សេងទៀតជាតារាង Delta នៅក្នុង Unity Catalog។ យើង​អាច​ប្រើ​តារាង​សន្និដ្ឋាន​សម្រាប់ ​ការ​ត្រួត​ពិនិត្យ និង​បំបាត់​កំហុស ការ​តាម​ដាន​ពូជពង្ស និង​នីតិវិធី​ប្រមូល​ទិន្នន័យ​សម្រាប់ ​ការ​បង្ហាត់​ឡើង​វិញ​កែ​សម្រួល ​គំរូ​របស់​យើង។


យើង​អាច​បើក inference table ​នៅ​លើ​ចំណុច​បញ្ចប់​ការ​បម្រើ​របស់​យើង​ដើម្បី​ត្រួត​ពិនិត្យ​គំរូ។ យើងអាចធ្វើវាបានដោយបញ្ជាក់លក្ខណៈសម្បត្តិ auto_capture_config នៅក្នុង payload នៅពេលយើងបង្កើត endpoint ដំបូង។ ឬយើងធ្វើបច្ចុប្បន្នភាពចំណុចបញ្ចប់របស់យើងនៅពេលក្រោយដោយប្រើពាក្យបញ្ជា put និង URL ចំណុចបញ្ចប់ config ដូចខាងក្រោម (ច្រើនទៀត នៅទីនេះ )


 data = { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ], "auto_capture_config":{ "catalog_name": catalog_name, "schema_name": model_schema, "table_name_prefix": payload_table, } } headers = {"Context-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.put(url=f"{API_ROOT}/api/2.0/serving-endpoints/{endpoint_name}/config", json=data, headers=headers) print(json.dumps(response.json(), indent=4))


ឥឡូវនេះ ចូរយើងផ្តល់ព័ត៌មានដល់ចំណុចបញ្ចប់ជាមួយនឹងទិន្នន័យអន្តរកម្មរបស់អ្នកប្រើអត់ចេះសោះមួយចំនួន

 import random import time all_items = df_full_data.select(col("item_id")).distinct() for user_id in range(50,54): items_not_rated_by_user = df_full_data.where(col("user_id")==user_id).select(col("item_id")).distinct()#collect()[0][0] no_rated_items = [item.item_id for item in all_items.subtract(items_not_rated_by_user).limit(4).collect()] data = { "dataframe_records": [ {"user_id":user_id, "item_id":no_rated_items[0], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[1], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, ] } response = requests.post( url=f"{API_ROOT}/serving-endpoints/{endpoint_name}/invocations", json=data, headers=headers ) # generate the data within the timespan of 1 to 8 minutes time.sleep(random.randint(60*1, 60*8))


យើង​អាច​ពិនិត្យ​មើល​កំណត់​ហេតុ​ចុង​ក្នុង <catalog>.<schema>.<payload_table> table។ វាត្រូវចំណាយពេលប្រហែល 10 នាទីរហូតដល់អ្នកអាចឃើញទិន្នន័យនៅក្នុងតារាង។


 table_path = f"{catalog_name}.{model_schema}.{payload_table}" # Read data from the inference table df_inf_table = spark.read.table(table_path) display(df_inf_table )


អ្នកគួរតែឃើញអ្វីមួយដូចនេះ តារាងបន្ទុករបស់អ្នក។

គំរូ Databricks បម្រើតារាងបន្ទុក


ដើម្បីយល់ពីគ្រោងការណ៍នៃតារាងការសន្និដ្ឋាននេះ សូមពិនិត្យមើល "គ្រោងការណ៍តារាងការសន្និដ្ឋានកាតាឡុក Unity ==" នៅទីនេះ .==


ការត្រួតពិនិត្យគំរូ

គំរូ និងទិន្នន័យត្រួតពិនិត្យប្រធានបទស្មុគស្មាញដែលទាមទារពេលវេលាច្រើនដើម្បីធ្វើជាម្ចាស់។ Databricks Lakehouse Monitoring (DLM) កាត់បន្ថយការចំណាយលើការកសាងប្រព័ន្ធត្រួតពិនិត្យត្រឹមត្រូវដោយផ្តល់នូវគំរូស្តង់ដារ និងអាចប្ដូរតាមបំណងបានសម្រាប់ករណីប្រើប្រាស់ទូទៅ។ ទោះជាយ៉ាងណាក៏ដោយ ការធ្វើជាម្ចាស់ DLM និងការត្រួតពិនិត្យគំរូជាទូទៅតម្រូវឱ្យមានការពិសោធន៍ជាច្រើន។ ខ្ញុំមិនចង់ផ្តល់ឱ្យអ្នកនូវទិដ្ឋភាពទូទៅទូលំទូលាយនៃការត្រួតពិនិត្យគំរូនៅទីនេះទេ ប៉ុន្តែផ្តល់ឱ្យអ្នកនូវចំណុចចាប់ផ្តើមមួយ។ ខ្ញុំ​អាច​នឹង​លះបង់​ប្លុក​មួយ​ទៅ​ប្រធាន​បទ​នេះ​នៅ​ពេល​អនាគត។


សេចក្តីសង្ខេបខ្លីៗនៃមុខងារ និងមុខងារ DLM

ឥឡូវនេះ យើងមានគំរូ និងដំណើរការរបស់យើង យើងអាចប្រើតារាងសន្និដ្ឋានដែលបង្កើតដោយចំណុចបញ្ចប់នៃការបម្រើរបស់យើង ដើម្បីតាមដានរង្វាស់សំខាន់ៗដូចជាការអនុវត្តគំរូ និងការរសាត់ ដើម្បីរកមើលគម្លាត ឬភាពមិនប្រក្រតីណាមួយនៅក្នុងទិន្នន័យ ឬគំរូរបស់យើងតាមពេលវេលា។ វិធីសាស្រ្តសកម្មនេះជួយយើងឱ្យចាត់វិធានការកែតម្រូវទាន់ពេលវេលា ដូចជាការបង្ហាត់គំរូឡើងវិញ ឬការធ្វើបច្ចុប្បន្នភាពលក្ខណៈពិសេសរបស់វា ដើម្បីរក្សាបាននូវប្រតិបត្តិការដ៏ល្អប្រសើរ និងស្របតាមគោលបំណងអាជីវកម្ម។


Databricks Lakehouse Monitoring Data ប្រភពស្ថាបត្យកម្ម៖ Databricks


DLM ផ្តល់នូវការវិភាគបីប្រភេទ ឬ profile typeស៊េរីពេលវេលា រូបថត និង សេចក្តីសន្និដ្ឋាន ។ ដោយសារយើងចាប់អារម្មណ៍ក្នុងការវិភាគតារាងសន្និដ្ឋានរបស់យើង យើងផ្តោតលើតារាងចុងក្រោយ។ ដើម្បីប្រើតារាងសម្រាប់ការត្រួតពិនិត្យ - " តារាងបឋម " របស់យើង យើងគួរតែប្រាកដថាតារាងមានរចនាសម្ព័ន្ធត្រឹមត្រូវ។ សម្រាប់ តារាងសន្និដ្ឋាន ជួរនីមួយៗគួរតែឆ្លើយតបទៅនឹងសំណើដែលមានជួរខាងក្រោម៖

  • លក្ខណៈពិសេសគំរូ

  • ការព្យាករណ៍គំរូ

  • លេខ​សម្គាល់​ម៉ូដែល

  • timestamp : timestamp of the inference request

  • ការពិតមូលដ្ឋាន (ស្រេចចិត្ត)


លេខសម្គាល់ម៉ូដែល មានសារៈសំខាន់សម្រាប់ករណីនៅពេលដែលយើងបម្រើម៉ូដែលជាច្រើន ហើយយើងចង់តាមដានដំណើរការនៃម៉ូដែលនីមួយៗនៅក្នុងផ្ទាំងគ្រប់គ្រងមួយ។ ប្រសិនបើមានលេខសម្គាល់គំរូច្រើនជាងមួយ DLM ប្រើវាដើម្បីបំបែកទិន្នន័យ និងគណនាម៉ែត្រ និងឋិតិវន្តសម្រាប់ចំណិតនីមួយៗដាច់ដោយឡែក


DLM គណនាស្ថិតិ និងម៉ែត្រនីមួយៗសម្រាប់ចន្លោះពេលជាក់លាក់មួយ។ សម្រាប់ការវិភាគការសន្និដ្ឋាន វាបានប្រើជួរឈរ ពេលវេលា បូកនឹងទំហំបង្អួចដែលកំណត់ដោយអ្នកប្រើប្រាស់ដើម្បីកំណត់អត្តសញ្ញាណបង្អួចពេលវេលា។ ច្រើនទៀតខាងក្រោម។


DLM គាំទ្រ problem type ​ពីរ​សម្រាប់​តារាង​ការ​សន្និដ្ឋាន៖ " ការចាត់ថ្នាក់ " ឬ " តំរែតំរង់ "។ វាគណនារង្វាស់ និងស្ថិតិពាក់ព័ន្ធមួយចំនួនដោយផ្អែកលើការបញ្ជាក់នេះ។


ដើម្បីប្រើ DLM យើងគួរតែបង្កើតម៉ូនីទ័រ ហើយភ្ជាប់វាទៅនឹងតារាង។ នៅពេលយើងធ្វើ DLM នេះ បង្កើត metric tables ពីរ៖

  • តារាងម៉ែត្រទម្រង់ ៖ តារាងនេះមានស្ថិតិសង្ខេបដូចជា អប្បបរមា អតិបរមា ភាគរយនៃមោឃៈ និងសូន្យ។ វាក៏មានម៉ែត្របន្ថែមផងដែរ ដោយផ្អែកលើប្រភេទបញ្ហាដែលកំណត់ដោយអ្នកប្រើប្រាស់។ ឧទាហរណ៍ ភាពជាក់លាក់ ការរំលឹកឡើងវិញ និង f1_score សម្រាប់ម៉ូដែលចំណាត់ថ្នាក់ និង កំហុស mean_squared_ និង mean_average_error សម្រាប់គំរូតំរែតំរង់។

  • តារាងរង្វាស់រង្វាស់ ៖ វាមានស្ថិតិដែលវាស់វែងពីរបៀបដែលការចែកចាយទិន្នន័យបានផ្លាស់ប្តូរ តាមពេលវេលា ឬទាក់ទងទៅនឹង តម្លៃមូលដ្ឋាន (ប្រសិនបើបានផ្តល់) ។ វាគណនារង្វាស់ដូចជាការធ្វើតេស្ត Chi-square, ការធ្វើតេស្ត KS ។


ដើម្បីមើលបញ្ជីរង្វាស់ពេញលេញសម្រាប់តារាងនីមួយៗ សូមពិនិត្យមើលទំព័រឯកសារ តារាងម៉ែត្រ ។ វាក៏អាចធ្វើទៅបានផងដែរដើម្បីបង្កើត ម៉ែត្រផ្ទាល់ខ្លួន


ទិដ្ឋភាពសំខាន់នៃការកសាងប្រព័ន្ធត្រួតពិនិត្យគឺត្រូវប្រាកដថាផ្ទាំងគ្រប់គ្រងត្រួតពិនិត្យរបស់យើងមានសិទ្ធិចូលប្រើទិន្នន័យចុងក្រោយបំផុតនៅពេលពួកគេមកដល់។ សម្រាប់នោះ យើងអាចប្រើ ការស្ទ្រីមតារាង Delta ដើម្បីតាមដានជួរដែលបានដំណើរការនៅក្នុងតារាងសន្និដ្ឋាន។ យើងប្រើតារាងការសន្និដ្ឋាននៃការបម្រើគំរូជាតារាងប្រភពរបស់យើង ( readStream ) និងតារាងត្រួតពិនិត្យជាតារាងលិច ( writeStream )។ យើងក៏ត្រូវប្រាកដថា ការផ្លាស់ប្តូរការចាប់យកទិន្នន័យ (CDC) ត្រូវបានបើកនៅលើតារាងទាំងពីរ (វាត្រូវបានបើកតាមលំនាំដើមនៅលើតារាង Inference)។ វិធីនេះ យើងដំណើរការតែការផ្លាស់ប្តូរ - បញ្ចូល/អាប់ដេត/លុប - នៅក្នុងតារាងប្រភព ជាជាងដំណើរការតារាងទាំងមូលឡើងវិញរាល់ការធ្វើឱ្យស្រស់។

ដៃ

ដើម្បី​បើក​ការ​ត្រួត​ពិនិត្យ​លើ​តារាង​សន្និដ្ឋាន​របស់​យើង យើង​ធ្វើ​ជំហាន​ដូច​ខាង​ក្រោម៖

  1. អានតារាងសន្និដ្ឋានជាតារាងស្ទ្រីម
  2. បង្កើតតារាង delta ថ្មីមួយជាមួយនឹងគ្រោងការណ៍ត្រឹមត្រូវដោយពន្លាតារាងការសន្និដ្ឋានដែលត្រូវបានបង្កើតដោយចំណុចបញ្ចប់នៃការបម្រើគំរូរបស់យើង។
  3. រៀបចំតារាងមូលដ្ឋាន (ប្រសិនបើមាន)
  4. បង្កើតម៉ូនីទ័រលើតារាងលទ្ធផល និងធ្វើឱ្យម៉ែត្រឡើងវិញ
  5. រៀបចំកាលវិភាគការងារ ដើម្បីស្រាយតារាងការសន្និដ្ឋានទៅរចនាសម្ព័ន្ធត្រឹមត្រូវ និងធ្វើឱ្យម៉ែត្រឡើងវិញ


ដំបូងយើងត្រូវដំឡើង Lakehouse Monitoring API។ វាគួរតែត្រូវបានដំឡើងរួចហើយ ប្រសិនបើអ្នកប្រើ Databricks rum time 15.3 LTS និងខ្ពស់ជាងនេះ៖


 %pip install "https://ml-team-public-read.s3.amazonaws.com/wheels/data-monitoring/a4050ef7-b183-47a1-a145-e614628e3146/databricks_lakehouse_monitoring-0.4.14-py3-none-any.whl" dbutils.library.restartPython()


ចូរយើងអានតារាងសន្និដ្ឋានជាតារាងផ្សាយ

 requests_raw = spark.readStream\ .format("delta")\ .table(inference_table_name) requests_raw.isStreaming #-> True


បន្ទាប់យើងត្រូវដាក់តារាងក្នុងទម្រង់ត្រឹមត្រូវដូចបានរៀបរាប់ខាងលើ។ តារាងនេះគួរតែមានជួរមួយសម្រាប់ការទស្សន៍ទាយនីមួយៗដែលពាក់ព័ន្ធជាមួយនឹងលក្ខណៈពិសេស និងតម្លៃនៃការទស្សន៍ទាយ។ តារាងការសន្និដ្ឋានដែលយើងទទួលបានពីម៉ូដែលបម្រើចំណុចបញ្ចប់ រក្សាទុកសំណើ និងការឆ្លើយតបនៃចំណុចបញ្ចប់ជាទម្រង់ JSON ដែលត្រូវបានភ្ជាប់។ នេះគឺជាឧទាហរណ៍នៃ JSON payload សម្រាប់ជួរឈរសំណើ និងការឆ្លើយតប។

 #requests {"dataframe_records": [ {"user_id": 1, "item_id": 346, "rating": 5}, {"user_id": 1, "item_id": 377, "rating": 2}, {"user_id": 1, "item_id": 302, "rating": 4} ] } #reponse {"predictions": [4.248899936676025, 1.1172138452529907, 4.279165744781494] } # --> what we need | user_id | item_id | rating | prediction | |---------|---------|--------|------------| | 1 | 346 | 5 | 4.248900 | | 1 | 377 | 2 | 1.117214 | | 1 | 302 | 4 | 4.279166 |


ដើម្បី​ស្រាយ​តារាង​នេះ​តាម​គ្រោងការណ៍​ដែល​ត្រឹមត្រូវ យើង​អាច​ប្រើ​កូដ​ខាងក្រោម​ដែល​ត្រូវ​បាន​កែសម្រួល​ពី​ឯកសារ Databricks ( តារាង​សេចក្តី​សន្និដ្ឋាន Lakehouse Monitoring starter notebook )។


 # define the schema of the request and reponse fields in the inference tabel REQUEST_FIELDS = [StructField('user_id', IntegerType(), False),\ StructField('item_id', IntegerType(), False),\ StructField('rating', IntegerType(), False)\ ] RESPONSE_FIELD = [T.StructField("predictions", FloatType())] def process_col_requests(json_str:str) -> str: """ to proccess the JSON payload of request column in inference table """ request = json.loads(json_str) dataframe_records = request.get("dataframe_records", []) return dataframe_records def procces_col_response(json_str: str) -> str: """ to proccess the JSON payload of reponse column in inference table """ reponse = json.loads(json_str) output = [{prediction_col: round(prediction,4)} for prediction in reponse["predictions"]] return output def get_model_id(endpoint_name: str) -> str: """ create the model id by concatinating the model name and the model version. note: the assumption is the endpoint serves only one model """ served_models = get_served_models(endpoint_name) model_id = f"{served_models[0]['model_name']}_{served_models[0]['model_version']}" return model_id def process_requests(requests_raw: DataFrame, request_fields: List[T.StructField], response_field: T.StructField, endpoint_name: str) -> DataFrame: """ Takes a stream of raw requests and processes them by: - Unpacking JSON payloads for requests and responses - Exploding batched requests into individual rows - Converting Unix epoch millisecond timestamps to be Spark TimestampType :param requests_raw: DataFrame containing raw requests. Assumed to contain the following columns: - `request` - `response` - `timestamp_ms` :param request_fields: List of StructFields representing the request schema :param response_field: A StructField representing the response schema :return: A DataFrame containing processed requests """ # Convert the timestamp milliseconds to TimestampType for downstream processing. requests_timestamped = requests_raw \ .withColumn(timestamp_col, (F.col("timestamp_ms") / 1000).cast(T.TimestampType())) \ .drop("timestamp_ms") # create the model identifier column model_id = get_model_id(endpoint_name) # Convert the model name and version columns into a model identifier column. requests_identified = requests_timestamped \ .withColumn(model_id_col, F.lit(model_id)) # Rename the date column to avoid collisions with features. requests_dated = requests_identified.withColumnRenamed("date", date_col) # Consolidate and unpack JSON. request_schema = T.ArrayType(T.StructType(request_fields)) response_schema = T.ArrayType(T.StructType(response_field)) # w udf_request = F.udf(process_col_requests, request_schema) udf_reponse = F.udf(procces_col_response, response_schema) requests_unpacked = requests_dated.withColumn("request", udf_request("request")).\ withColumn("response", udf_reponse("response")) # Explode batched requests into individual rows. DB_PREFIX = "__db" requests_exploded = requests_unpacked \ .withColumn(f"{DB_PREFIX}_request_response", F.arrays_zip(F.col("request"), F.col("response"))) \ .withColumn(f"{DB_PREFIX}_request_response", F.explode(F.col(f"{DB_PREFIX}_request_response"))) \ .select(F.col("*"), F.col(f"{DB_PREFIX}_request_response.request.*"), F.col(f"{DB_PREFIX}_request_response.response.*")) \ .drop(f"{DB_PREFIX}_request_response", "request", "response") requests_cleaned = requests_exploded.drop("status_code", "sampling_fraction", "client_request_id", "databricks_request_id", "request_metadata") return requests_cleaned


តារាងលទ្ធផលនឹងមើលទៅដូចនេះ៖

តារាង​បន្ទុក​ត្រូវ​បាន​ពន្លា

បន្ទាប់យើងគួរតែចាប់ផ្តើមតារាងលិចរបស់យើង។

 dt_builder = DeltaTable.createIfNotExists(spark) \ .tableName(unpacked_requests_table_name) \ .addColumns(schema) \ .partitionedBy(requests_cleaned.schema) \ .property("delta.enableChangeDataFeed", "true") \ dt_builder.execute()


ហើយសរសេរលទ្ធផល

 checkpoint_path = f"dbfs:/payload-logging/{endpoint_name}/checkpoint" requests_stream = requests_cleaned.writeStream \ .trigger(once=True) \ .format("delta") \ .partitionBy(date_col) \ .outputMode("append") \ .option("checkpointLocation", checkpoint_path) \ .toTable(unpacked_requests_table_name) \


ជាចុងក្រោយ យើងបង្កើតតារាងមូលដ្ឋានរបស់យើង។ DLM ប្រើតារាងនេះដើម្បីគណនាការរសាត់ដោយប្រៀបធៀបការចែកចាយជួរឈរស្រដៀងគ្នានៃបន្ទាត់គោល និងគំរូបឋម។ តារាងបន្ទាត់មូលដ្ឋានគួរតែមានជួរឈរលក្ខណៈពិសេសដូចគ្នានឹងជួរឈរចម្បងក៏ដូចជាជួរឈរកំណត់អត្តសញ្ញាណគំរូដូចគ្នា។ សម្រាប់តារាងមូលដ្ឋាន យើងប្រើតារាងទស្សន៍ទាយនៃ សំណុំទិន្នន័យដែលមានសុពលភាព របស់យើង ដែលយើងរក្សាទុកមុននេះ បន្ទាប់ពីយើងបានបណ្តុះបណ្តាលគំរូរបស់យើងដោយប្រើ hyperparameter ល្អបំផុតរបស់គាត់។ ដើម្បីគណនារង្វាស់រង្វាស់ Databricks គណនារង្វាស់ទម្រង់សម្រាប់ទាំងតារាងបឋម និងតារាងមូលដ្ឋាន។ នៅទីនេះអ្នកអាចអានអំពី តារាងបឋម និងតារាងមូលដ្ឋាន


 #read the prediction table df_base_table = spark.table(f"{catalog_name}.{model_schema}.predictions") # create the model id and add it to the table model_id = get_model_id(endpoint_name) df_base_table = df_base_table.withColumn(model_id_col, F.lit(model_id)) #write the new table and enable the CDC on it output_base_table_name = f"{catalog_name}.{model_schema}.{base_table_prefix}_{model_name}" df_base_table.write.format("delta").mode("overwrite").saveAsTable(output_base_table_name) spark.sql(f"ALTER TABLE {output_base_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")


ឥឡូវនេះយើងត្រូវបានអានដើម្បីបង្កើតផ្ទាំងគ្រប់គ្រងរបស់យើង។ យើងអាចធ្វើវាបានដោយប្រើ UI ឬ Lakehouse Monitoring API ។ នៅទីនេះយើងប្រើជម្រើសទីពីរ:

 # This is where we store the metric tables. output_schema_name = f"{catalog_name}.{model_schema}" try: info = lm.create_monitor( table_name=unpacked_requests_table_name, profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities,#the aggregation window model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, # We will refresh the metrics on-demand in this notebook baseline_table_name=output_base_table_name, ) print(info) except Exception as e: # Ensure the exception was expected assert "RESOURCE_ALREADY_EXISTS" in str(e), f"Unexpected error: {e}" # Update the monitor if any parameters of this notebook have changed. lm.update_monitor( table_name=unpacked_requests_table_name, updated_params=dict( profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities, model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, baseline_table_name=output_base_table_name, ) ) # Refresh metrics calculated on the requests table. refresh_info = lm.run_refresh(table_name=unpacked_requests_table_name) print(refresh_info)


បន្ទាប់ពីយើងដំណើរការកូដវាត្រូវចំណាយពេលខ្លះរហូតដល់ Databricks គណនាម៉ែត្រទាំងអស់។ ដើម្បីមើលផ្ទាំងគ្រប់គ្រង សូមចូលទៅកាន់ផ្ទាំង Quality នៃតារាងលិចរបស់អ្នក (ឧទាហរណ៍ unpacked_requests_table_name )។ អ្នកគួរតែឃើញទំព័រមួយដូចខាងក្រោម។

ទិដ្ឋភាព​ត្រួត​ពិនិត្យ​គំរូ Databricks


ប្រសិនបើអ្នកចុចលើមើល refresh history អ្នកឃើញការដំណើរការ ការរង់ចាំ និងការធ្វើឱ្យស្រស់ពីមុនរបស់អ្នក។ ចុចលើ View Dashboard ដើម្បីបើកផ្ទាំងគ្រប់គ្រងរបស់អ្នក។

បន្ទះត្រួតពិនិត្យគំរូ Databricks



ដូច្នេះយើងចាប់ផ្តើមជាមួយតារាងសន្និដ្ឋាន ( my_endpoint_payload ) ដំណើរការវា ហើយរក្សាទុកលទ្ធផលទៅ my_endpoint_payload_unpacked ហើយហុចតារាងនេះ រួមជាមួយនឹងតារាងមូលដ្ឋានរបស់យើង ( base_table_als ) ទៅកាន់ API ត្រួតពិនិត្យរបស់យើង។ DLM គណនារង្វាស់ទម្រង់សម្រាប់តារាងនីមួយៗ ( my_endpoint_payload_unpacked_profile_metric ) ហើយប្រើពួកវាដើម្បីគណនារង្វាស់រង្វាស់ ( my_endpoint_payload_unpacked_drift_metrics )


អញ្ចឹងទៅ! អ្នកមានអ្វីគ្រប់យ៉ាងដែលអ្នកត្រូវការដើម្បីបម្រើ និងតាមដានគំរូរបស់អ្នក!


នៅផ្នែកបន្ទាប់ ខ្ញុំនឹងបង្ហាញអ្នកពីរបៀបធ្វើស្វ័យប្រវត្តិកម្មដំណើរការនេះដោយប្រើ Databricks Assets Bundle និង Gitlab !

L O A D I N G
. . . comments & more!

About Author

Mohsen Jadidi HackerNoon profile picture
Mohsen Jadidi@neshom
tech, culture, economy and collective work

ព្យួរស្លាក

អត្ថបទនេះត្រូវបានបង្ហាញនៅក្នុង...