இந்த டுடோரியல் தொடரின் முதல் பகுதியில் , டேட்டாபிரிக்ஸ் மற்றும் ஸ்பார்க்கைப் பயன்படுத்தி, டேட்டாபிரிக்ஸின் குறிப்புக் கட்டமைப்பால் வழிநடத்தப்படும் எம்எல்ஓப்ஸ் பைப்லைனை உருவாக்குவதற்கான முதல் படிகளை எடுத்தோம். நாங்கள் உள்ளடக்கிய முக்கிய படிகளின் மறுபரிசீலனை இங்கே:
மெடாலியன் கட்டிடக்கலைக்கான யூனிட்டி கேடலாக்கை அமைத்தல் : யூனிட்டி கேடலாக்கில் எங்கள் தரவை வெண்கலம், வெள்ளி மற்றும் தங்க அடுக்குகளாக ஒழுங்கமைத்து, கட்டமைக்கப்பட்ட மற்றும் திறமையான தரவு மேலாண்மை அமைப்பை உருவாக்குகிறோம்.
யூனிட்டி கேடலாக்கில் தரவை உள்வாங்குதல் : கணினியில் மூலத் தரவை எவ்வாறு இறக்குமதி செய்வது என்பதை நாங்கள் நிரூபித்தோம், அடுத்தடுத்த செயலாக்க நிலைகளுக்கு நிலைத்தன்மை மற்றும் தரத்தை உறுதி செய்தோம்.
மாதிரியைப் பயிற்றுவித்தல் : டேட்டாபிரிக்குகளைப் பயன்படுத்தி, அளவிடக்கூடிய மற்றும் பயனுள்ள மாதிரி மேம்பாட்டிற்கான சிறந்த நடைமுறைகளைப் பின்பற்றி, எங்கள் தரவுத்தொகுப்புக்கு ஏற்றவாறு இயந்திரக் கற்றல் மாதிரியைப் பயிற்றுவித்தோம்.
ஹைப்பர்ஆப்ட் மூலம் ஹைபர்பாராமீட்டர் ட்யூனிங் : மாதிரி செயல்திறனை மேம்படுத்த, துல்லியம் மற்றும் செயல்திறனை மேம்படுத்த, உகந்த ஹைப்பர் பாராமீட்டர்களுக்கான தேடலை தானியக்கமாக்க ஹைப்பர்ஆப்ட்டைப் பயன்படுத்தினோம்.
டேட்டாபிரிக்ஸ் எம்எல்ஃப்ளோவுடன் சோதனை கண்காணிப்பு : எம்எல்ஃப்ளோவைப் பதிவுசெய்து, எங்கள் சோதனைகளை கண்காணிக்க பயன்படுத்தினோம், மாடல் பதிப்புகள், அளவீடுகள் மற்றும் அளவுருக்கள் ஆகியவற்றை எளிதாக ஒப்பிட்டுப் பார்க்கவும், மீண்டும் உருவாக்கவும் முடியும்.
இந்த அடிப்படைப் படிகள் முடிவடைந்த நிலையில், உங்கள் மாதிரி இப்போது வரிசைப்படுத்துதலுக்கு முதன்மையானது. இந்த இரண்டாவது பகுதியில், எங்கள் கணினியில் இரண்டு முக்கியமான கூறுகளை ஒருங்கிணைப்பதில் கவனம் செலுத்துவோம்:
அதற்குள் நுழைவோம்!
கடைசி வலைப்பதிவின் புறப்பாடு மாதிரி மதிப்பீடு ஆகும். இப்போது நாம் ஒப்பீடு செய்தோம் என்று கற்பனை செய்து பாருங்கள், இந்த தயாரிப்பு மாதிரியுடன் ஒப்பிடும்போது எங்கள் மாடல் அதிக செயல்திறனைக் காட்டுகிறது. உற்பத்தியில் மாடலைப் பயன்படுத்த விரும்புவதால் (ஊகிக்க), எங்களிடம் உள்ள எல்லா தரவையும் பயன்படுத்திக் கொள்ள விரும்புகிறோம். அடுத்த படி முழு தரவுத்தொகுப்பைப் பயன்படுத்தி மாதிரியைப் பயிற்றுவித்து சோதிப்பதாகும். பின்னர் எங்கள் மாடலை எங்கள் சாம்பியன் மாடலாக பயன்படுத்துவதன் மூலம் பின்னர் பயன்படுத்தவும். அனுமானத்திற்காக நாங்கள் பயன்படுத்த விரும்பும் இறுதி மாதிரி இது என்பதால், மாதிரியைப் பயிற்றுவிக்க அம்ச பொறியியல் கிளையண்டைப் பயன்படுத்துகிறோம். இந்த வழியில் நாங்கள் மாதிரி வரிசையை எளிதாகக் கண்காணிப்பது மட்டுமல்லாமல், ஸ்கீமா சரிபார்ப்பு மற்றும் அம்ச மாற்றத்தை (ஏதேனும் இருந்தால்) கிளையண்டிற்கு ஆஃப்லோட் செய்கிறோம்.
with mlflow.start_run(run_name="ALS_best_model") as run: als = ALS() # Now we set the parameters for the method als.setMaxIter(MAX_ITER)\ .setSeed(SEED)\ .setRegParam(best_params["REG_PARAM"])\ .setUserCol(COL_USER)\ .setItemCol(COL_ITEM)\ .setRatingCol(COL_LABEL)\ .setRank(best_params["RANK"]) mlflow.log_param("MAX_ITER", MAX_ITER) mlflow.log_param("RANK", best_params["RANK"]) mlflow.log_param("REG_PARAM", best_params["REG_PARAM"]) # Create the model with these parameters. model = als.fit(df_full_data) #drop predictions where users and products from the test test and didn't make it into the training set. in this case, the prediction is NaN model.setColdStartStrategy('drop') predictions = model.transform(df_full_data) signature = infer_signature(model_input = df_full_data, model_output = predictions.select(COL_LABEL)) #log the model mlflow.spark.log_model(model, model_name, sample_input=df_full_data.limit(3), signature = signature, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name=f"{catalog_name}.{model_schema}.{model_name}") evaluator = RegressionEvaluator(predictionCol=COL_PRED, labelCol=COL_LABEL) rmse = evaluator.setMetricName("rmse").evaluate(predictions) mlflow.log_metric('rmse', rmse)
மாடல்களைப் பயிற்றுவிப்பதற்கும் பதிவு செய்வதற்கும் நாங்கள் ஃபீச்சர் ஸ்டோர் அல்லது ஃபீச்சர் இன்ஜினியரிங் ஏபிஐகளைப் பயன்படுத்தலாம்
model_info = fe.log_model(model=model, artifact_path = model_name, flavor=mlflow.spark, training_set=fe_full_data, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name= f"{catalog_name}.{model_schema}.{model_name}" )
நாம் அம்ச பொறியியல் API ஐப் பயன்படுத்தும் போது, மாடலின் வரிசையை கேடலாக் எக்ஸ்ப்ளோரரில் பார்க்கலாம்
இப்போது மாதிரி விளக்கத்தைப் புதுப்பித்து, அதற்கு சாம்பியன் லேபிளை ஒதுக்கலாம்.
import time from mlflow.tracking.client import MlflowClient from mlflow.entities.model_registry.model_version_status import ModelVersionStatus client = MlflowClient() #find the latest model version model_name_path = f"{catalog_name}.{model_schema}.{model_name}" model_version_infos = client.search_model_versions(f"name ='{model_name_path}'") new_model_version = max([int(model_version_info.version) for model_version_info in model_version_infos]) #add the model and model version descirption client.update_registered_model( name=model_name_path, description="collaborative filtering using Spark mllib ALS. This model use rating table" ) client.update_model_version( name=model_name_path, version=new_model_version, description="this model is optimized Rank and REG_PARAM with Hyperopt and rmse as a loss function. trained on the full dataset" ) # assign alias client.set_registered_model_alias(model_name_path, "Champion", new_model_version)
இப்போது மேலே சென்று, நீங்கள் மாதிரியைப் பதிவுசெய்த திட்டத்தைச் சரிபார்க்கவும். உங்கள் எல்லா புதுப்பிப்புகளையும் பின்வருமாறு பார்க்க வேண்டும்
மாதிரி நிலைகள் : மாதிரிப் பதிவேட்டில் பணியிடத்தைப் பயன்படுத்தினால், உங்கள் மாடல்களை நிர்வகிப்பதற்கான நிலைகளை நீங்கள் எடுக்க வேண்டும். மாற்றுப்பெயர்களைப் பயன்படுத்துவது வேலை செய்யாது. இங்கே பாருங்கள் அது எப்படி வேலை செய்கிறது என்பதைப் பார்க்க
இப்போது அனுமானத்திற்காக உற்பத்தியில் எங்கள் மாதிரியைப் பயன்படுத்த விரும்புகிறோம் என்று கற்பனை செய்து பாருங்கள். இந்தப் படிநிலையில் நாங்கள் சாம்பியன் மாடலை ஏற்றி, ஒவ்வொரு பயனருக்கும் 20 திரைப்படப் பரிந்துரைகளை உருவாக்க அதைப் பயன்படுத்துகிறோம்.
from mlflow.spark import load_model as spark_load_model from mlflow.tracking.client import MlflowClient from create_training_set import split_data #-- set UC as model registray mlflow.set_registry_uri("databricks-uc") #-- initate mlflow client client = MlflowClient() # -- read the config file with open('config.json') as config_file: config = json.load(config_file) catalog_name = config["catalog_name"] gold_layer = config["gold_layer_name"] silver_layer = config["silver_layer_name"] user_item_table_name = config["user_item_table_name"] ft_user_item_name = config["ft_user_item_name"] model_name = config["model_name"] model_schema = config["model_schema"] #-- create the model uri model_path = f"{catalog_name}.{model_schema}.{model_name}" # --create the model_uri: there are two ways to do this # 1: using the alias (we use this*) model_version_uri = f"models:/{model_uri}@champion" # 2: using model version #champion_version = client.get_model_version_by_alias(model_uri, "champion") #model_version_uri = f"models:/{model_uri}/{champion_version.version}" # -- load the model pipline and exctract the model model_pipeline = spark_load_model(model_version_uri) model = model_pipeline.stages[0] # -- batch scoring using the the model fe_full_data, df_full_data, df_train, df_test = split_data() df_batch_input = df_full_data.drop("rating") df_scores = model.transform(df_batch_input) # --- in case you used Feature Engineering to train and register model #from databricks.feature_engineering import FeatureEngineeringClient #fe = FeatureEngineeringClient() # fe.score_batch(model_uri=f"{model_version_uri}",df = df_batch_input)
அதே பயிற்சி தரவையே பேட்ச் ஸ்கோரிங்கிற்கு பயன்படுத்தியதை நீங்கள் பார்க்கலாம். சிபாரிசு செய்யும் அமைப்புகளின் விஷயத்தில் இது அர்த்தமுள்ளதாக இருந்தாலும், பெரும்பாலான பயன்பாட்டில் நாம் பார்க்காத சில தரவுகளைப் பெற மாதிரியைப் பயன்படுத்த விரும்புகிறோம். எடுத்துக்காட்டாக, இமேஜிங் உங்களுடையது Netflix மற்றும் அவர்களின் புதிய பார்வை பட்டியலின் அடிப்படையில் நாளின் முடிவில் பயனர் பரிந்துரைகளைப் புதுப்பிக்க வேண்டும். நாள் முடிவில் குறிப்பிட்ட நேரத்தில் பேட்ச் ஸ்கோரை இயக்கும் வேலையை நாங்கள் திட்டமிடலாம்.
இப்போது நாம் மேலே சென்று ஒவ்வொரு பயனருக்கும் பரிந்துரைகளை உருவாக்கலாம். இதற்காக ஒவ்வொரு பயனருக்கும் முதல் 20 உருப்படிகளைக் காண்கிறோம்
from pyspark.sql.window import Window from pyspark.sql.functions import col, split, row_number, collect_list from pyspark.sql.functions import col, collect_list, expr, lit, min, row_number, desc windowSpec = Window.partitionBy("user_id").orderBy(col("prediction").desc()) df_top_20_items = df_scores.withColumn("rank", row_number().over(windowSpec)).filter(col("rank") <= 20) df_user_recs = df_top_20_items.groupBy("user_id") \ .agg(collect_list(col("item_id").cast("double")).alias("top_item_ids"))
முடிவு எப்படி இருக்கும் என்பது இங்கே
இறுதியாக நாம் கணிப்புகளை டெல்டா லேபிளாக எங்கள் UC இல் சேமிக்கலாம் அல்லது கீழ்நிலை அமைப்புகளான Mongo DB அல்லது Azure Cosmos DB இல் வெளியிடலாம். நாங்கள் முதல் விருப்பத்துடன் செல்கிறோம்
df_user_recs.write.mode("overwrite").saveAsTable(f"{catalog_name}.{output_schema}.top20_item_recommendations")
நிகழ்நேர பயனர் தொடர்புகளின் அடிப்படையில் எங்கள் பரிந்துரைகளைப் புதுப்பிக்க விரும்புவதை இப்போது கற்பனை செய்து பாருங்கள். இந்த வழக்கில் நாம் மாதிரி சேவையைப் பயன்படுத்தலாம். யாராவது உங்கள் மாதிரியைப் பயன்படுத்த விரும்பினால், அவர்கள் தரவை சேவையகத்திற்கு அனுப்பலாம். சேவையகம் அந்தத் தரவை நீங்கள் பயன்படுத்திய மாதிரிக்கு ஊட்டுகிறது, அது செயல்பாட்டிற்குச் சென்று, தரவை பகுப்பாய்வு செய்து, கணிப்புகளை உருவாக்குகிறது. அவை இணைய பயன்பாடுகள், மொபைல் பயன்பாடுகள் அல்லது உட்பொதிக்கப்பட்ட அமைப்புகளில் கூட பயன்படுத்தப்படலாம். இந்த அணுகுமுறையின் பயன்பாட்டில் ஒன்று, A/B சோதனைக்கு ட்ராஃபிக் ரூட்டிங் செய்வதாகும்.
ALS அல்காரிதத்தை ஆன்லைன் அனுமானத்திற்கு நேரடியாகப் பயன்படுத்த முடியாது, ஏனெனில் பரிந்துரைகளைப் புதுப்பிக்க முழுத் தரவையும் (பழைய + புதியது) பயன்படுத்தி மாதிரியை மீண்டும் பயிற்சி செய்ய வேண்டும். கிரேடியன்ட் டிசென்ட் கற்றல் அல்காரிதம்கள் ஆன்லைன் புதுப்பிப்புகளுக்குப் பயன்படுத்தக்கூடிய மாதிரியின் எடுத்துக்காட்டுகள். இந்த அல்காரிதம்களில் சிலவற்றை எதிர்கால இடுகையில் பார்க்கலாம்.
எவ்வாறாயினும், அத்தகைய மாதிரி எவ்வாறு செயல்படும் என்பதை விளக்குவதற்கு, பயனர் ஒரு திரைப்படத்தை மதிப்பிடும் போதெல்லாம் மூவி மதிப்பீட்டைக் கணிக்கும் (பயனற்ற) சேவை இறுதிப் புள்ளியை நாங்கள் உருவாக்குகிறோம்!
import requests model_path = f"{catalog_name}.{model_schema}.{model_name}" champion_version = client.get_model_version_by_alias(model_path, "champion") # Set the name of the MLflow endpoint endpoint_name = config["model_serving_endpoint_name"] # Name of the registered MLflow model model_name = model_path # Specify the type of compute (CPU, GPU_SMALL, GPU_MEDIUM, etc.) workload_type = "CPU" # Specify the scale-out size of compute (Small, Medium, Large, etc.) workload_size = "Small" # Get the latest version of the MLflow model model_version = int(champion_version.version) # Specify Scale to Zero(only supported for CPU endpoints) scale_to_zero = False # Get the API endpoint and token for the current notebook context API_ROOT = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get() API_TOKEN = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get() data = { "name": endpoint_name, "config": { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ] }, } headers = {"Context-Type": "text/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.post( url=f"{API_ROOT}/api/2.0/serving-endpoints", json=data, headers=headers )
இது எங்களுக்காக லன்ச் மாடல் சர்விங் கிளஸ்டரை உருவாக்கி, சிறிது நேரம் எடுக்கும். இப்போது நீங்கள் Serving
சாளரத்தைத் திறந்தால், உங்கள் இறுதிப் புள்ளியைப் பார்க்க வேண்டும்.
பல மாதிரிகளை வழங்குவதற்கு ஒரு முனைப்புள்ளியை நாம் பயன்படுத்தலாம். A/B சோதனை போன்ற காட்சிகளுக்கு ட்ராஃபிக் ரூட்டிங்கைப் பயன்படுத்தலாம் அல்லது உற்பத்தியில் உள்ள வித்தியாச மாதிரிகளின் செயல்திறனை ஒப்பிடலாம்.
டேட்டாபிரிக்ஸ் மாடல் சர்விங்கில் உள்ள அனுமான அட்டவணைகள், எங்களின் வரிசைப்படுத்தப்பட்ட மாடல்களுக்கான தானியங்கி பதிவாக செயல்படும். இயக்கப்படும் போது, அவை உள்வரும் கோரிக்கைகள் (கணிப்பிற்காக அனுப்பப்பட்ட தரவு), தொடர்புடைய மாதிரி வெளியீடுகள் (கணிப்புகள்) மற்றும் வேறு சில மெட்டாடேட்டாவை யூனிட்டி கேடலாக்கில் டெல்டா அட்டவணையாகப் பிடிக்கும். கண்காணிப்பு மற்றும் பிழைத்திருத்தம் , பரம்பரை கண்காணிப்பு மற்றும் எங்கள் மாதிரிகளை மீண்டும் பயிற்சி செய்வதற்கு அல்லது நன்றாக மாற்றுவதற்கான தரவு சேகரிப்பு செயல்முறைக்கு நாம் அனுமான அட்டவணையைப் பயன்படுத்தலாம்.
மாதிரியைக் கண்காணிக்க, எங்கள் சேவை முனையில் inference table
இயக்கலாம். நாம் முதலில் இறுதிப்புள்ளியை உருவாக்கும் போது பேலோடில் உள்ள auto_capture_config
பண்புகளைக் குறிப்பிடுவதன் மூலம் அதைச் செய்யலாம். அல்லது put
கட்டளை மற்றும் config
எண்ட்பாயிண்ட் URL ஐப் பயன்படுத்தி பின்வருவனவற்றைப் பயன்படுத்தி எங்கள் இறுதிப்புள்ளியைப் புதுப்பிக்கிறோம் (மேலும் இங்கே )
data = { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ], "auto_capture_config":{ "catalog_name": catalog_name, "schema_name": model_schema, "table_name_prefix": payload_table, } } headers = {"Context-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.put(url=f"{API_ROOT}/api/2.0/serving-endpoints/{endpoint_name}/config", json=data, headers=headers) print(json.dumps(response.json(), indent=4))
இப்போது சில போலி பயனர் தொடர்புத் தரவுகளுடன் இறுதிப் புள்ளியை ஊட்டுவோம்
import random import time all_items = df_full_data.select(col("item_id")).distinct() for user_id in range(50,54): items_not_rated_by_user = df_full_data.where(col("user_id")==user_id).select(col("item_id")).distinct()#collect()[0][0] no_rated_items = [item.item_id for item in all_items.subtract(items_not_rated_by_user).limit(4).collect()] data = { "dataframe_records": [ {"user_id":user_id, "item_id":no_rated_items[0], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[1], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, ] } response = requests.post( url=f"{API_ROOT}/serving-endpoints/{endpoint_name}/invocations", json=data, headers=headers ) # generate the data within the timespan of 1 to 8 minutes time.sleep(random.randint(60*1, 60*8))
இறுதிப்புள்ளி பதிவுகளை <catalog>.<schema>.<payload_table>
அட்டவணையில் பார்க்கலாம். டேபிளில் உள்ள தரவைக் காணும் வரை சுமார் 10 நிமிடங்கள் ஆகும்.
table_path = f"{catalog_name}.{model_schema}.{payload_table}" # Read data from the inference table df_inf_table = spark.read.table(table_path) display(df_inf_table )
உங்கள் பேலோட் டேபிளை இது போன்ற ஒன்றை நீங்கள் பார்க்க வேண்டும்
இந்த அனுமான அட்டவணையின் திட்டத்தைப் புரிந்து கொள்ள, “ஒற்றுமை பட்டியல் அனுமான அட்டவணை ஸ்கீமா==” ஐ இங்கே பார்க்கவும்.==
மாடலும் டேட்டாவும் மாஸ்டர் செய்ய நிறைய நேரம் தேவைப்படும் சிக்கலான தலைப்பைக் கண்காணிக்கும். டேட்டாபிரிக்ஸ் லேக்ஹவுஸ் கண்காணிப்பு (DLM) பொதுவான பயன்பாட்டு நிகழ்வுகளுக்கு நிலையான மற்றும் தனிப்பயனாக்கக்கூடிய வார்ப்புருக்களை வழங்குவதன் மூலம் முறையான கண்காணிப்பு அமைப்பை உருவாக்குவதற்கான மேல்நிலையைக் குறைக்கிறது. இருப்பினும், பொதுவாக DLM மற்றும் மாடல் கண்காணிப்பில் தேர்ச்சி பெறுவதற்கு நிறைய பரிசோதனைகள் தேவை. மாதிரி கண்காணிப்பு பற்றிய விரிவான கண்ணோட்டத்தை இங்கே கொடுக்க விரும்பவில்லை, மாறாக ஒரு தொடக்கப் புள்ளியைத் தருகிறேன். எதிர்காலத்தில் இந்த தலைப்புக்கு நான் ஒரு வலைப்பதிவை அர்ப்பணிப்பேன்.
டிஎல்எம் செயல்பாடுகள் மற்றும் அம்சங்களின் சுருக்கமான சுருக்கம்
இப்போது எங்களுடைய மாதிரியை மேம்படுத்தி இயங்கிக்கொண்டிருக்கிறோம், முக்கிய அளவீடுகளைக் கண்காணிக்க எங்கள் சேவை முடிவுப் புள்ளியால் உருவாக்கப்பட்ட அனுமான அட்டவணையைப் பயன்படுத்தலாம். இந்த முன்முயற்சி அணுகுமுறையானது, சிறந்த செயல்திறன் மற்றும் வணிக நோக்கங்களுடன் சீரமைக்க, மாதிரியை மீண்டும் பயிற்சி செய்தல் அல்லது அதன் அம்சங்களை புதுப்பித்தல் போன்ற சரியான நேரத்தில் சரிசெய்தல் நடவடிக்கைகளை எடுக்க எங்களுக்கு உதவுகிறது.
டிஎல்எம் மூன்று வகையான பகுப்பாய்வு அல்லது profile type
வழங்குகிறது: நேரத் தொடர் , ஸ்னாப்ஷாட் மற்றும் அனுமானம் . எங்கள் அனுமான அட்டவணையை பகுப்பாய்வு செய்வதில் நாங்கள் ஆர்வமாக இருப்பதால், பிந்தையவற்றில் கவனம் செலுத்துகிறோம். கண்காணிப்புக்கு அட்டவணையைப் பயன்படுத்த - எங்கள் “ முதன்மை அட்டவணை ”, அட்டவணை சரியான அமைப்பைக் கொண்டிருப்பதை உறுதி செய்ய வேண்டும். அனுமான அட்டவணைக்கு , ஒவ்வொரு வரிசையும் பின்வரும் நெடுவரிசைகளைக் கொண்ட கோரிக்கைகளுடன் ஒத்திருக்க வேண்டும்:
மாதிரி அம்சங்கள்
மாதிரி கணிப்பு
மாதிரி ஐடி
நேரமுத்திரை : அனுமானக் கோரிக்கையின் நேரமுத்திரை
அடிப்படை உண்மை (விரும்பினால்)
நாங்கள் பல மாடல்களை வழங்கும்போது மாடல் ஐடி முக்கியமானது, மேலும் ஒவ்வொரு மாடலின் செயல்திறனையும் ஒரே கண்காணிப்பு டாஷ்போர்டில் கண்காணிக்க விரும்புகிறோம். ஒன்றுக்கு மேற்பட்ட மாடல் ஐடிகள் இருந்தால், ஒவ்வொரு ஸ்லைஸுக்கும் தனித்தனியாக டேட்டாவை ஸ்லைஸ் செய்யவும், அளவீடுகள் மற்றும் ஸ்டாட்டிக்ஸ் ஆகியவற்றைக் கணக்கிடவும் DLM அதைப் பயன்படுத்துகிறது.
DLM ஒவ்வொரு புள்ளியியல் மற்றும் அளவீடுகளையும் ஒரு குறிப்பிட்ட நேர இடைவெளியில் கணக்கிடுகிறது. அனுமான பகுப்பாய்விற்கு, இது நேர முத்திரை நெடுவரிசையைப் பயன்படுத்தியது, மேலும் நேர சாளரங்களை அடையாளம் காண பயனர் வரையறுக்கப்பட்ட சாளர அளவையும் பயன்படுத்தினார். மேலும் கீழே.
அனுமான அட்டவணைகளுக்கு DLM இரண்டு problem type
ஆதரிக்கிறது: " வகைப்பாடு " அல்லது " பின்னடைவு ". இந்த விவரக்குறிப்பின் அடிப்படையில் தொடர்புடைய சில அளவீடுகள் மற்றும் புள்ளிவிவரங்களை இது கணக்கிடுகிறது.
DLM ஐப் பயன்படுத்த, நாம் ஒரு மானிட்டரை உருவாக்கி அதை ஒரு அட்டவணையில் இணைக்க வேண்டும். நாம் இதைச் செய்யும்போது DLM இரண்டு metric tables
உருவாக்குகிறது:
சுயவிவர அளவீட்டு அட்டவணை : இந்த அட்டவணையில் குறைந்தபட்சம், அதிகபட்சம், பூஜ்யத்தின் சதவீதம் மற்றும் பூஜ்ஜியங்கள் போன்ற சுருக்கமான புள்ளிவிவரங்கள் உள்ளன. பயனரால் வரையறுக்கப்பட்ட சிக்கல் வகையின் அடிப்படையிலான கூடுதல் அளவீடுகளும் இதில் உள்ளன. எடுத்துக்காட்டாக, வகைப்படுத்தல் மாதிரிகளுக்கான துல்லியம் , நினைவுபடுத்துதல் மற்றும் f1_ ஸ்கோர் மற்றும் பின்னடைவு மாதிரிகளுக்கான சராசரி_சதுர_எரர் மற்றும் சராசரி_சராசரி_எரர் .
டிரிஃப்ட் மெட்ரிக் அட்டவணை : தரவுகளின் விநியோகம் காலப்போக்கில் எவ்வாறு மாறியது அல்லது அடிப்படை மதிப்புடன் தொடர்புடையது (வழங்கப்பட்டால்) என்பதை அளவிடும் புள்ளிவிவரத்தைக் கொண்டுள்ளது. இது சி-சதுர சோதனை, KS சோதனை போன்ற நடவடிக்கைகளை கணக்கிடுகிறது.
ஒவ்வொரு அட்டவணைக்கும் முழுமையான அளவீடுகளின் பட்டியலைப் பார்க்க, மெட்ரிக் அட்டவணை ஆவணப் பக்கத்தை கண்காணிக்கவும் . தனிப்பயன் அளவீடுகளை உருவாக்குவதும் சாத்தியமாகும்.
கண்காணிப்பு அமைப்பை உருவாக்குவதற்கான ஒரு முக்கிய அம்சம், சமீபத்திய அனுமானத் தரவை வரும்போது எங்கள் கண்காணிப்பு டாஷ்போர்டு அணுகுவதை உறுதிசெய்வதாகும். அதற்கு நாம் டெல்டா டேபிள் ஸ்ட்ரீமிங்கைப் பயன்படுத்தி அனுமான அட்டவணையில் செயலாக்கப்பட்ட வரிசைகளைக் கண்காணிக்கலாம். மாதிரி சேவையின் அனுமான அட்டவணையை எங்கள் மூல அட்டவணையாகவும் ( readStream
), கண்காணிப்பு அட்டவணையை சிங்க் டேபிளாகவும் ( writeStream
) பயன்படுத்துகிறோம். டேட்டா கேப்சர் மாற்றுதல் (சிடிசி) இரண்டு அட்டவணைகளிலும் இயக்கப்பட்டிருப்பதையும் உறுதிசெய்கிறோம் (அது அனுமான அட்டவணையில் இயல்பாகவே இயக்கப்படும்). இந்த வழியில், ஒவ்வொரு புதுப்பித்தலின் போதும் முழு அட்டவணையையும் மீண்டும் செயலாக்குவதற்குப் பதிலாக மூல அட்டவணையில் மாற்றங்களை மட்டுமே செயல்படுத்துகிறோம் - செருகவும்/புதுப்பிக்கவும்/நீக்கவும்.
எங்கள் அனுமான அட்டவணையில் கண்காணிப்பை இயக்க, நாங்கள் பின்வரும் படிகளைச் செய்கிறோம்:
முதலில் நாம் Lakehouse Monitoring API ஐ நிறுவ வேண்டும். Databricks ரம் நேரம் 15.3 LTS மற்றும் அதற்கு மேல் பயன்படுத்தினால், இது ஏற்கனவே நிறுவப்பட்டிருக்க வேண்டும்:
%pip install "https://ml-team-public-read.s3.amazonaws.com/wheels/data-monitoring/a4050ef7-b183-47a1-a145-e614628e3146/databricks_lakehouse_monitoring-0.4.14-py3-none-any.whl" dbutils.library.restartPython()
அனுமான அட்டவணையை ஸ்ட்ரீமிங் அட்டவணையாகப் படிப்போம்
requests_raw = spark.readStream\ .format("delta")\ .table(inference_table_name) requests_raw.isStreaming #-> True
அடுத்து மேலே விவரிக்கப்பட்டுள்ளபடி அட்டவணையை சரியான வடிவத்தில் வைக்க வேண்டும். இந்த அட்டவணையில் ஒவ்வொரு கணிப்புக்கும் தொடர்புடைய அம்சங்கள் மற்றும் கணிப்பு மதிப்புடன் ஒரு வரிசை இருக்க வேண்டும். மாதிரி சேவை எண்ட்பாயிண்டிலிருந்து நாம் பெறும் அனுமான அட்டவணை, இறுதிப்புள்ளி கோரிக்கைகள் மற்றும் பதில்களை உள்ளமைக்கப்பட்ட JSON வடிவமாக சேமிக்கிறது. கோரிக்கை மற்றும் மறுமொழி நெடுவரிசைக்கான JSON பேலோடின் உதாரணம் இங்கே.
#requests {"dataframe_records": [ {"user_id": 1, "item_id": 346, "rating": 5}, {"user_id": 1, "item_id": 377, "rating": 2}, {"user_id": 1, "item_id": 302, "rating": 4} ] } #reponse {"predictions": [4.248899936676025, 1.1172138452529907, 4.279165744781494] } # --> what we need | user_id | item_id | rating | prediction | |---------|---------|--------|------------| | 1 | 346 | 5 | 4.248900 | | 1 | 377 | 2 | 1.117214 | | 1 | 302 | 4 | 4.279166 |
இந்த அட்டவணையை சரியான ஸ்கீமாவில் திறக்க, டேட்டாபிரிக்ஸ் ஆவணத்திலிருந்து ( அனுமான அட்டவணை லேக்ஹவுஸ் மானிட்டரிங் ஸ்டார்டர் நோட்புக் ) தழுவிய பின்வரும் குறியீட்டைப் பயன்படுத்தலாம்.
# define the schema of the request and reponse fields in the inference tabel REQUEST_FIELDS = [StructField('user_id', IntegerType(), False),\ StructField('item_id', IntegerType(), False),\ StructField('rating', IntegerType(), False)\ ] RESPONSE_FIELD = [T.StructField("predictions", FloatType())] def process_col_requests(json_str:str) -> str: """ to proccess the JSON payload of request column in inference table """ request = json.loads(json_str) dataframe_records = request.get("dataframe_records", []) return dataframe_records def procces_col_response(json_str: str) -> str: """ to proccess the JSON payload of reponse column in inference table """ reponse = json.loads(json_str) output = [{prediction_col: round(prediction,4)} for prediction in reponse["predictions"]] return output def get_model_id(endpoint_name: str) -> str: """ create the model id by concatinating the model name and the model version. note: the assumption is the endpoint serves only one model """ served_models = get_served_models(endpoint_name) model_id = f"{served_models[0]['model_name']}_{served_models[0]['model_version']}" return model_id def process_requests(requests_raw: DataFrame, request_fields: List[T.StructField], response_field: T.StructField, endpoint_name: str) -> DataFrame: """ Takes a stream of raw requests and processes them by: - Unpacking JSON payloads for requests and responses - Exploding batched requests into individual rows - Converting Unix epoch millisecond timestamps to be Spark TimestampType :param requests_raw: DataFrame containing raw requests. Assumed to contain the following columns: - `request` - `response` - `timestamp_ms` :param request_fields: List of StructFields representing the request schema :param response_field: A StructField representing the response schema :return: A DataFrame containing processed requests """ # Convert the timestamp milliseconds to TimestampType for downstream processing. requests_timestamped = requests_raw \ .withColumn(timestamp_col, (F.col("timestamp_ms") / 1000).cast(T.TimestampType())) \ .drop("timestamp_ms") # create the model identifier column model_id = get_model_id(endpoint_name) # Convert the model name and version columns into a model identifier column. requests_identified = requests_timestamped \ .withColumn(model_id_col, F.lit(model_id)) # Rename the date column to avoid collisions with features. requests_dated = requests_identified.withColumnRenamed("date", date_col) # Consolidate and unpack JSON. request_schema = T.ArrayType(T.StructType(request_fields)) response_schema = T.ArrayType(T.StructType(response_field)) # w udf_request = F.udf(process_col_requests, request_schema) udf_reponse = F.udf(procces_col_response, response_schema) requests_unpacked = requests_dated.withColumn("request", udf_request("request")).\ withColumn("response", udf_reponse("response")) # Explode batched requests into individual rows. DB_PREFIX = "__db" requests_exploded = requests_unpacked \ .withColumn(f"{DB_PREFIX}_request_response", F.arrays_zip(F.col("request"), F.col("response"))) \ .withColumn(f"{DB_PREFIX}_request_response", F.explode(F.col(f"{DB_PREFIX}_request_response"))) \ .select(F.col("*"), F.col(f"{DB_PREFIX}_request_response.request.*"), F.col(f"{DB_PREFIX}_request_response.response.*")) \ .drop(f"{DB_PREFIX}_request_response", "request", "response") requests_cleaned = requests_exploded.drop("status_code", "sampling_fraction", "client_request_id", "databricks_request_id", "request_metadata") return requests_cleaned
இதன் விளைவாக வரும் அட்டவணை இப்படி இருக்கும்:
அடுத்து நாம் நமது சிங்க் டேபிளை துவக்க வேண்டும்
dt_builder = DeltaTable.createIfNotExists(spark) \ .tableName(unpacked_requests_table_name) \ .addColumns(schema) \ .partitionedBy(requests_cleaned.schema) \ .property("delta.enableChangeDataFeed", "true") \ dt_builder.execute()
மற்றும் முடிவுகளை எழுதவும்
checkpoint_path = f"dbfs:/payload-logging/{endpoint_name}/checkpoint" requests_stream = requests_cleaned.writeStream \ .trigger(once=True) \ .format("delta") \ .partitionBy(date_col) \ .outputMode("append") \ .option("checkpointLocation", checkpoint_path) \ .toTable(unpacked_requests_table_name) \
இறுதியாக, நாங்கள் எங்கள் அடிப்படை அட்டவணையை உருவாக்குகிறோம். அடிப்படை மற்றும் முதன்மை மாதிரிகளின் ஒரே மாதிரியான நெடுவரிசைகளின் விநியோகத்தை ஒப்பிடுவதன் மூலம் சறுக்கல்களைக் கணக்கிட DLM இந்த அட்டவணையைப் பயன்படுத்துகிறது. அடிப்படை அட்டவணையில் முதன்மை நெடுவரிசையின் அதே அம்ச நெடுவரிசையும் அதே மாதிரி அடையாள நெடுவரிசையும் இருக்க வேண்டும். பேஸ்லைன் டேபிளுக்கு, சிறந்த ஹைப்பர் பாராமீட்டரைப் பயன்படுத்தி எங்கள் மாதிரியைப் பயிற்றுவித்த பிறகு, நாங்கள் முன்பே சேமித்து வைத்திருக்கும் எங்கள் சரிபார்ப்பு தரவுத்தொகுப்பின் முன்கணிப்பு அட்டவணையைப் பயன்படுத்துகிறோம். டிரிஃப்ட் மெட்ரிக்கைக் கணக்கிட, டேட்டாபிரிக்ஸ் முதன்மை மற்றும் அடிப்படை அட்டவணை இரண்டிற்கும் சுயவிவர அளவீடுகளைக் கணக்கிடுகிறது. இங்கே நீங்கள் முதன்மை அட்டவணை மற்றும் அடிப்படை அட்டவணை பற்றி படிக்கலாம்.
#read the prediction table df_base_table = spark.table(f"{catalog_name}.{model_schema}.predictions") # create the model id and add it to the table model_id = get_model_id(endpoint_name) df_base_table = df_base_table.withColumn(model_id_col, F.lit(model_id)) #write the new table and enable the CDC on it output_base_table_name = f"{catalog_name}.{model_schema}.{base_table_prefix}_{model_name}" df_base_table.write.format("delta").mode("overwrite").saveAsTable(output_base_table_name) spark.sql(f"ALTER TABLE {output_base_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")
இப்போது எங்கள் கண்காணிப்பு டாஷ்போர்டை உருவாக்க படிக்கிறோம். UI ஐப் பயன்படுத்தி நாம் அதைச் செய்யலாம் அல்லது லேக்ஹவுஸ் கண்காணிப்பு API. இங்கே நாம் இரண்டாவது விருப்பத்தைப் பயன்படுத்துகிறோம்:
# This is where we store the metric tables. output_schema_name = f"{catalog_name}.{model_schema}" try: info = lm.create_monitor( table_name=unpacked_requests_table_name, profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities,#the aggregation window model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, # We will refresh the metrics on-demand in this notebook baseline_table_name=output_base_table_name, ) print(info) except Exception as e: # Ensure the exception was expected assert "RESOURCE_ALREADY_EXISTS" in str(e), f"Unexpected error: {e}" # Update the monitor if any parameters of this notebook have changed. lm.update_monitor( table_name=unpacked_requests_table_name, updated_params=dict( profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities, model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, baseline_table_name=output_base_table_name, ) ) # Refresh metrics calculated on the requests table. refresh_info = lm.run_refresh(table_name=unpacked_requests_table_name) print(refresh_info)
நாம் குறியீட்டை இயக்கிய பிறகு டேட்டாபிரிக்ஸ் அனைத்து மெட்ரிக் கணக்கிடும் வரை சிறிது நேரம் எடுக்கும். டாஷ்போர்டைப் பார்க்க, உங்கள் சிங்க் டேபிளின் Quality
தாவலுக்குச் செல்லவும் (அதாவது unpacked_requests_table_name
). நீங்கள் ஒரு பக்கத்தை பின்வருமாறு பார்க்க வேண்டும்.
பார்வை refresh history
கிளிக் செய்தால், உங்கள் இயங்கும், நிலுவையில் உள்ள மற்றும் கடந்த புதுப்பிப்புகளைக் காணலாம். உங்கள் டாஷ்போர்டைத் திறக்க View Dashboard
கிளிக் செய்யவும்.
எனவே நாம் அனுமான அட்டவணையில் ( my_endpoint_payload
) தொடங்கி, அதைச் செயலாக்கி, முடிவை my_endpoint_payload_unpacked
இல் சேமித்து, இந்த அட்டவணையை எங்கள் அடிப்படை அட்டவணையுடன் ( base_table_als
) எங்கள் கண்காணிப்பு API க்கு அனுப்புவோம். DLM ஆனது ஒவ்வொரு அட்டவணைக்கும் சுயவிவர அளவீடுகளைக் கணக்கிடுகிறது ( my_endpoint_payload_unpacked_profile_metric
) மற்றும் டிரிஃப்ட் அளவீடுகளைக் கணக்கிட அவற்றைப் பயன்படுத்துகிறது ( my_endpoint_payload_unpacked_drift_metrics
)
இதோ! நீங்கள் சேவை செய்ய மற்றும் உங்கள் மாதிரியை கண்காணிக்க வேண்டிய அனைத்தும் உங்களிடம் உள்ளன!
Databricks Assets Bundle மற்றும் Gitlab ஐப் பயன்படுத்தி இந்த செயல்முறையை எவ்வாறு தானியக்கமாக்குவது என்பதை அடுத்த பகுதியில் நான் உங்களுக்குக் காண்பிப்பேன்!
இந்த டுடோரியல் தொடரின் முதல் பகுதியில் , டேட்டாபிரிக்ஸ் மற்றும் ஸ்பார்க்கைப் பயன்படுத்தி, டேட்டாபிரிக்ஸின் குறிப்புக் கட்டமைப்பால் வழிநடத்தப்படும் எம்எல்ஓப்ஸ் பைப்லைனை உருவாக்குவதற்கான முதல் படிகளை எடுத்தோம். நாங்கள் உள்ளடக்கிய முக்கிய படிகளின் மறுபரிசீலனை இங்கே:
மெடாலியன் கட்டிடக்கலைக்கான யூனிட்டி கேடலாக்கை அமைத்தல் : யூனிட்டி கேடலாக்கில் எங்கள் தரவை வெண்கலம், வெள்ளி மற்றும் தங்க அடுக்குகளாக ஒழுங்கமைத்து, கட்டமைக்கப்பட்ட மற்றும் திறமையான தரவு மேலாண்மை அமைப்பை உருவாக்குகிறோம்.
யூனிட்டி கேடலாக்கில் தரவை உள்வாங்குதல் : கணினியில் மூலத் தரவை எவ்வாறு இறக்குமதி செய்வது என்பதை நாங்கள் நிரூபித்தோம், அடுத்தடுத்த செயலாக்க நிலைகளுக்கு நிலைத்தன்மை மற்றும் தரத்தை உறுதி செய்தோம்.
மாதிரியைப் பயிற்றுவித்தல் : டேட்டாபிரிக்குகளைப் பயன்படுத்தி, அளவிடக்கூடிய மற்றும் பயனுள்ள மாதிரி மேம்பாட்டிற்கான சிறந்த நடைமுறைகளைப் பின்பற்றி, எங்கள் தரவுத்தொகுப்புக்கு ஏற்றவாறு இயந்திரக் கற்றல் மாதிரியைப் பயிற்றுவித்தோம்.
ஹைப்பர்ஆப்ட் மூலம் ஹைபர்பாராமீட்டர் ட்யூனிங் : மாதிரி செயல்திறனை மேம்படுத்த, துல்லியம் மற்றும் செயல்திறனை மேம்படுத்த, உகந்த ஹைப்பர் பாராமீட்டர்களுக்கான தேடலை தானியக்கமாக்க ஹைப்பர்ஆப்ட்டைப் பயன்படுத்தினோம்.
டேட்டாபிரிக்ஸ் எம்எல்ஃப்ளோவுடன் சோதனை கண்காணிப்பு : எம்எல்ஃப்ளோவைப் பதிவுசெய்து, எங்கள் சோதனைகளை கண்காணிக்க பயன்படுத்தினோம், மாடல் பதிப்புகள், அளவீடுகள் மற்றும் அளவுருக்கள் ஆகியவற்றை எளிதாக ஒப்பிட்டுப் பார்க்கவும், மீண்டும் உருவாக்கவும் முடியும்.
இந்த அடிப்படைப் படிகள் முடிவடைந்த நிலையில், உங்கள் மாதிரி இப்போது வரிசைப்படுத்துதலுக்கு முதன்மையானது. இந்த இரண்டாவது பகுதியில், எங்கள் கணினியில் இரண்டு முக்கியமான கூறுகளை ஒருங்கிணைப்பதில் கவனம் செலுத்துவோம்:
அதற்குள் நுழைவோம்!
கடைசி வலைப்பதிவின் புறப்பாடு மாதிரி மதிப்பீடு ஆகும். இப்போது நாம் ஒப்பீடு செய்தோம் என்று கற்பனை செய்து பாருங்கள், இந்த தயாரிப்பு மாதிரியுடன் ஒப்பிடும்போது எங்கள் மாடல் அதிக செயல்திறனைக் காட்டுகிறது. உற்பத்தியில் மாடலைப் பயன்படுத்த விரும்புவதால் (ஊகிக்க), எங்களிடம் உள்ள எல்லா தரவையும் பயன்படுத்திக் கொள்ள விரும்புகிறோம். அடுத்த படி முழு தரவுத்தொகுப்பைப் பயன்படுத்தி மாதிரியைப் பயிற்றுவித்து சோதிப்பதாகும். பின்னர் எங்கள் மாடலை எங்கள் சாம்பியன் மாடலாக பயன்படுத்துவதன் மூலம் பின்னர் பயன்படுத்தவும். அனுமானத்திற்காக நாங்கள் பயன்படுத்த விரும்பும் இறுதி மாதிரி இதுவாக இருப்பதால், மாதிரியைப் பயிற்றுவிக்க அம்ச பொறியியல் கிளையண்டைப் பயன்படுத்துகிறோம். இந்த வழியில் நாங்கள் மாதிரி வரிசையை எளிதாகக் கண்காணிப்பது மட்டுமல்லாமல், ஸ்கீமா சரிபார்ப்பு மற்றும் அம்ச மாற்றத்தை (ஏதேனும் இருந்தால்) கிளையண்டிற்கு ஆஃப்லோட் செய்கிறோம்.
with mlflow.start_run(run_name="ALS_best_model") as run: als = ALS() # Now we set the parameters for the method als.setMaxIter(MAX_ITER)\ .setSeed(SEED)\ .setRegParam(best_params["REG_PARAM"])\ .setUserCol(COL_USER)\ .setItemCol(COL_ITEM)\ .setRatingCol(COL_LABEL)\ .setRank(best_params["RANK"]) mlflow.log_param("MAX_ITER", MAX_ITER) mlflow.log_param("RANK", best_params["RANK"]) mlflow.log_param("REG_PARAM", best_params["REG_PARAM"]) # Create the model with these parameters. model = als.fit(df_full_data) #drop predictions where users and products from the test test and didn't make it into the training set. in this case, the prediction is NaN model.setColdStartStrategy('drop') predictions = model.transform(df_full_data) signature = infer_signature(model_input = df_full_data, model_output = predictions.select(COL_LABEL)) #log the model mlflow.spark.log_model(model, model_name, sample_input=df_full_data.limit(3), signature = signature, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name=f"{catalog_name}.{model_schema}.{model_name}") evaluator = RegressionEvaluator(predictionCol=COL_PRED, labelCol=COL_LABEL) rmse = evaluator.setMetricName("rmse").evaluate(predictions) mlflow.log_metric('rmse', rmse)
மாடல்களைப் பயிற்றுவிப்பதற்கும் பதிவு செய்வதற்கும் நாங்கள் ஃபீச்சர் ஸ்டோர் அல்லது ஃபீச்சர் இன்ஜினியரிங் ஏபிஐகளைப் பயன்படுத்தலாம்
model_info = fe.log_model(model=model, artifact_path = model_name, flavor=mlflow.spark, training_set=fe_full_data, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name= f"{catalog_name}.{model_schema}.{model_name}" )
நாம் அம்ச பொறியியல் API ஐப் பயன்படுத்தும் போது, மாடலின் வரிசையை கேடலாக் எக்ஸ்ப்ளோரரில் பார்க்கலாம்
இப்போது மாதிரி விளக்கத்தைப் புதுப்பித்து, அதற்கு சாம்பியன் லேபிளை ஒதுக்கலாம்.
import time from mlflow.tracking.client import MlflowClient from mlflow.entities.model_registry.model_version_status import ModelVersionStatus client = MlflowClient() #find the latest model version model_name_path = f"{catalog_name}.{model_schema}.{model_name}" model_version_infos = client.search_model_versions(f"name ='{model_name_path}'") new_model_version = max([int(model_version_info.version) for model_version_info in model_version_infos]) #add the model and model version descirption client.update_registered_model( name=model_name_path, description="collaborative filtering using Spark mllib ALS. This model use rating table" ) client.update_model_version( name=model_name_path, version=new_model_version, description="this model is optimized Rank and REG_PARAM with Hyperopt and rmse as a loss function. trained on the full dataset" ) # assign alias client.set_registered_model_alias(model_name_path, "Champion", new_model_version)
இப்போது மேலே சென்று, நீங்கள் மாதிரியைப் பதிவுசெய்த திட்டத்தைச் சரிபார்க்கவும். உங்கள் எல்லா புதுப்பிப்புகளையும் பின்வருமாறு பார்க்க வேண்டும்
மாதிரி நிலைகள் : மாதிரிப் பதிவேட்டில் பணியிடத்தைப் பயன்படுத்தினால், உங்கள் மாடல்களை நிர்வகிப்பதற்கான நிலைகளை நீங்கள் எடுக்க வேண்டும். மாற்றுப்பெயர்களைப் பயன்படுத்துவது வேலை செய்யாது. இங்கே பாருங்கள் அது எப்படி வேலை செய்கிறது என்பதைப் பார்க்க
இப்போது அனுமானத்திற்காக உற்பத்தியில் எங்கள் மாதிரியைப் பயன்படுத்த விரும்புகிறோம் என்று கற்பனை செய்து பாருங்கள். இந்தப் படிநிலையில் நாங்கள் சாம்பியன் மாடலை ஏற்றி, ஒவ்வொரு பயனருக்கும் 20 திரைப்படப் பரிந்துரைகளை உருவாக்க அதைப் பயன்படுத்துகிறோம்.
from mlflow.spark import load_model as spark_load_model from mlflow.tracking.client import MlflowClient from create_training_set import split_data #-- set UC as model registray mlflow.set_registry_uri("databricks-uc") #-- initate mlflow client client = MlflowClient() # -- read the config file with open('config.json') as config_file: config = json.load(config_file) catalog_name = config["catalog_name"] gold_layer = config["gold_layer_name"] silver_layer = config["silver_layer_name"] user_item_table_name = config["user_item_table_name"] ft_user_item_name = config["ft_user_item_name"] model_name = config["model_name"] model_schema = config["model_schema"] #-- create the model uri model_path = f"{catalog_name}.{model_schema}.{model_name}" # --create the model_uri: there are two ways to do this # 1: using the alias (we use this*) model_version_uri = f"models:/{model_uri}@champion" # 2: using model version #champion_version = client.get_model_version_by_alias(model_uri, "champion") #model_version_uri = f"models:/{model_uri}/{champion_version.version}" # -- load the model pipline and exctract the model model_pipeline = spark_load_model(model_version_uri) model = model_pipeline.stages[0] # -- batch scoring using the the model fe_full_data, df_full_data, df_train, df_test = split_data() df_batch_input = df_full_data.drop("rating") df_scores = model.transform(df_batch_input) # --- in case you used Feature Engineering to train and register model #from databricks.feature_engineering import FeatureEngineeringClient #fe = FeatureEngineeringClient() # fe.score_batch(model_uri=f"{model_version_uri}",df = df_batch_input)
அதே பயிற்சி தரவையே பேட்ச் ஸ்கோரிங்கிற்கு பயன்படுத்தியதை நீங்கள் பார்க்கலாம். சிபாரிசு செய்யும் அமைப்புகளின் விஷயத்தில் இது அர்த்தமுள்ளதாக இருந்தாலும், பெரும்பாலான பயன்பாட்டில் நாம் பார்க்காத சில தரவுகளைப் பெற மாதிரியைப் பயன்படுத்த விரும்புகிறோம். எடுத்துக்காட்டாக, இமேஜிங் உங்களுடையது Netflix மற்றும் அவர்களின் புதிய பார்வை பட்டியலின் அடிப்படையில் நாளின் முடிவில் பயனர் பரிந்துரைகளைப் புதுப்பிக்க வேண்டும். நாள் முடிவில் குறிப்பிட்ட நேரத்தில் பேட்ச் ஸ்கோரை இயக்கும் வேலையை நாம் திட்டமிடலாம்.
இப்போது நாம் மேலே சென்று ஒவ்வொரு பயனருக்கும் பரிந்துரைகளை உருவாக்கலாம். இதற்காக ஒவ்வொரு பயனருக்கும் முதல் 20 உருப்படிகளைக் காண்கிறோம்
from pyspark.sql.window import Window from pyspark.sql.functions import col, split, row_number, collect_list from pyspark.sql.functions import col, collect_list, expr, lit, min, row_number, desc windowSpec = Window.partitionBy("user_id").orderBy(col("prediction").desc()) df_top_20_items = df_scores.withColumn("rank", row_number().over(windowSpec)).filter(col("rank") <= 20) df_user_recs = df_top_20_items.groupBy("user_id") \ .agg(collect_list(col("item_id").cast("double")).alias("top_item_ids"))
முடிவு எப்படி இருக்கும் என்பது இங்கே
இறுதியாக, கணிப்புகளை டெல்டா லேபிளாக எங்கள் UC இல் சேமிக்கலாம் அல்லது கீழ்நிலை அமைப்புகளான Mongo DB அல்லது Azure Cosmos DB இல் வெளியிடலாம். நாங்கள் முதல் விருப்பத்துடன் செல்கிறோம்
df_user_recs.write.mode("overwrite").saveAsTable(f"{catalog_name}.{output_schema}.top20_item_recommendations")
நிகழ்நேர பயனர் தொடர்புகளின் அடிப்படையில் எங்கள் பரிந்துரைகளைப் புதுப்பிக்க விரும்புவதை இப்போது கற்பனை செய்து பாருங்கள். இந்த வழக்கில் நாம் மாதிரி சேவையைப் பயன்படுத்தலாம். யாராவது உங்கள் மாதிரியைப் பயன்படுத்த விரும்பினால், அவர்கள் தரவை சேவையகத்திற்கு அனுப்பலாம். சேவையகம் அந்தத் தரவை உங்கள் வரிசைப்படுத்தப்பட்ட மாதிரிக்கு ஊட்டுகிறது, அது செயல்பாட்டிற்குச் சென்று, தரவை பகுப்பாய்வு செய்து, கணிப்புகளை உருவாக்குகிறது. அவை இணைய பயன்பாடுகள், மொபைல் பயன்பாடுகள் அல்லது உட்பொதிக்கப்பட்ட அமைப்புகளில் கூட பயன்படுத்தப்படலாம். இந்த அணுகுமுறையின் பயன்பாட்டில் ஒன்று, A/B சோதனைக்கு ட்ராஃபிக் ரூட்டிங் செய்வதாகும்.
ALS அல்காரிதத்தை ஆன்லைன் அனுமானத்திற்கு நேரடியாகப் பயன்படுத்த முடியாது, ஏனெனில் பரிந்துரைகளைப் புதுப்பிக்க முழுத் தரவையும் (பழைய + புதியது) பயன்படுத்தி மாதிரியை மீண்டும் பயிற்சி செய்ய வேண்டும். கிரேடியன்ட் டிசென்ட் கற்றல் அல்காரிதம்கள் ஆன்லைன் புதுப்பிப்புகளுக்குப் பயன்படுத்தக்கூடிய மாதிரியின் எடுத்துக்காட்டுகள். இந்த அல்காரிதம்களில் சிலவற்றை எதிர்கால இடுகையில் பார்க்கலாம்.
எவ்வாறாயினும், அத்தகைய மாதிரி எவ்வாறு செயல்படும் என்பதை விளக்குவதற்கு, பயனர் ஒரு திரைப்படத்தை மதிப்பிடும் போதெல்லாம் மூவி மதிப்பீட்டைக் கணிக்கும் (பயனற்ற) சேவை இறுதிப் புள்ளியை நாங்கள் உருவாக்குகிறோம்!
import requests model_path = f"{catalog_name}.{model_schema}.{model_name}" champion_version = client.get_model_version_by_alias(model_path, "champion") # Set the name of the MLflow endpoint endpoint_name = config["model_serving_endpoint_name"] # Name of the registered MLflow model model_name = model_path # Specify the type of compute (CPU, GPU_SMALL, GPU_MEDIUM, etc.) workload_type = "CPU" # Specify the scale-out size of compute (Small, Medium, Large, etc.) workload_size = "Small" # Get the latest version of the MLflow model model_version = int(champion_version.version) # Specify Scale to Zero(only supported for CPU endpoints) scale_to_zero = False # Get the API endpoint and token for the current notebook context API_ROOT = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get() API_TOKEN = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get() data = { "name": endpoint_name, "config": { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ] }, } headers = {"Context-Type": "text/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.post( url=f"{API_ROOT}/api/2.0/serving-endpoints", json=data, headers=headers )
இது எங்களுக்காக லன்ச் மாடல் சர்விங் கிளஸ்டரை உருவாக்கி, சிறிது நேரம் எடுக்கும். இப்போது நீங்கள் Serving
சாளரத்தைத் திறந்தால், உங்கள் இறுதிப் புள்ளியைப் பார்க்க வேண்டும்.
பல மாதிரிகளை வழங்குவதற்கு ஒரு முனைப்புள்ளியை நாம் பயன்படுத்தலாம். A/B சோதனை போன்ற காட்சிகளுக்கு ட்ராஃபிக் ரூட்டிங் பயன்படுத்தலாம் அல்லது உற்பத்தியில் உள்ள வித்தியாச மாதிரிகளின் செயல்திறனை ஒப்பிடலாம்.
டேட்டாபிரிக்ஸ் மாடல் சர்விங்கில் உள்ள அனுமான அட்டவணைகள், எங்களின் பயன்படுத்தப்பட்ட மாடல்களுக்கான தானியங்கு பதிவாகச் செயல்படும். இயக்கப்படும் போது, அவை உள்வரும் கோரிக்கைகள் (கணிப்பிற்காக அனுப்பப்பட்ட தரவு), தொடர்புடைய மாதிரி வெளியீடுகள் (கணிப்புகள்) மற்றும் வேறு சில மெட்டாடேட்டாவை யூனிட்டி கேடலாக்கில் டெல்டா அட்டவணையாகப் பிடிக்கும். கண்காணிப்பு மற்றும் பிழைத்திருத்தம் , பரம்பரை கண்காணிப்பு மற்றும் எங்கள் மாதிரிகளை மீண்டும் பயிற்சி செய்வதற்கு அல்லது நன்றாக மாற்றுவதற்கான தரவு சேகரிப்பு செயல்முறைக்கு நாம் அனுமான அட்டவணையைப் பயன்படுத்தலாம்.
மாதிரியைக் கண்காணிக்க, எங்கள் சேவை முனையில் inference table
இயக்கலாம். நாம் முதலில் இறுதிப்புள்ளியை உருவாக்கும் போது, பேலோடில் உள்ள auto_capture_config
பண்புகளைக் குறிப்பிடுவதன் மூலம் இதைச் செய்யலாம். அல்லது put
கட்டளை மற்றும் config
endpoint URL ஐப் பயன்படுத்தி பின்வருவனவற்றைப் பயன்படுத்தி எங்கள் இறுதிப்புள்ளியைப் புதுப்பிக்கிறோம் (மேலும் இங்கே )
data = { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ], "auto_capture_config":{ "catalog_name": catalog_name, "schema_name": model_schema, "table_name_prefix": payload_table, } } headers = {"Context-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.put(url=f"{API_ROOT}/api/2.0/serving-endpoints/{endpoint_name}/config", json=data, headers=headers) print(json.dumps(response.json(), indent=4))
இப்போது சில போலி பயனர் தொடர்பு தரவு மூலம் இறுதிப்புள்ளியை ஊட்டுவோம்
import random import time all_items = df_full_data.select(col("item_id")).distinct() for user_id in range(50,54): items_not_rated_by_user = df_full_data.where(col("user_id")==user_id).select(col("item_id")).distinct()#collect()[0][0] no_rated_items = [item.item_id for item in all_items.subtract(items_not_rated_by_user).limit(4).collect()] data = { "dataframe_records": [ {"user_id":user_id, "item_id":no_rated_items[0], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[1], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, ] } response = requests.post( url=f"{API_ROOT}/serving-endpoints/{endpoint_name}/invocations", json=data, headers=headers ) # generate the data within the timespan of 1 to 8 minutes time.sleep(random.randint(60*1, 60*8))
இறுதிப்புள்ளி பதிவுகளை <catalog>.<schema>.<payload_table>
அட்டவணையில் பார்க்கலாம். டேபிளில் உள்ள தரவைக் காணும் வரை சுமார் 10 நிமிடங்கள் ஆகும்.
table_path = f"{catalog_name}.{model_schema}.{payload_table}" # Read data from the inference table df_inf_table = spark.read.table(table_path) display(df_inf_table )
உங்கள் பேலோட் டேபிளை இது போன்ற ஒன்றை நீங்கள் பார்க்க வேண்டும்
இந்த அனுமான அட்டவணையின் திட்டத்தைப் புரிந்து கொள்ள, “ஒற்றுமை பட்டியல் அனுமான அட்டவணை ஸ்கீமா==” ஐ இங்கே பார்க்கவும்.==
மாடலும் டேட்டாவும் மாஸ்டர் செய்ய நிறைய நேரம் தேவைப்படும் சிக்கலான தலைப்பைக் கண்காணிக்கும். டேட்டாபிரிக்ஸ் லேக்ஹவுஸ் கண்காணிப்பு (DLM) பொதுவான பயன்பாட்டு நிகழ்வுகளுக்கு நிலையான மற்றும் தனிப்பயனாக்கக்கூடிய வார்ப்புருக்களை வழங்குவதன் மூலம் முறையான கண்காணிப்பு அமைப்பை உருவாக்குவதற்கான மேல்நிலையைக் குறைக்கிறது. இருப்பினும், பொதுவாக DLM மற்றும் மாடல் கண்காணிப்பில் தேர்ச்சி பெறுவதற்கு நிறைய பரிசோதனைகள் தேவை. மாதிரி கண்காணிப்பு பற்றிய விரிவான கண்ணோட்டத்தை இங்கே கொடுக்க விரும்பவில்லை, மாறாக ஒரு தொடக்கப் புள்ளியைத் தருகிறேன். எதிர்காலத்தில் இந்த தலைப்புக்கு நான் ஒரு வலைப்பதிவை அர்ப்பணிப்பேன்.
டிஎல்எம் செயல்பாடுகள் மற்றும் அம்சங்களின் சுருக்கமான சுருக்கம்
இப்போது எங்களுடைய மாதிரியை உருவாக்கி, இயங்கிக் கொண்டிருக்கிறோம், எங்கள் தரவு அல்லது மாதிரியில் ஏதேனும் விலகல்கள் அல்லது முரண்பாடுகளைக் கண்டறிய, மாதிரி செயல்திறன் மற்றும் டிரிஃப்ட் போன்ற முக்கிய அளவீடுகளைக் கண்காணிக்க, எங்கள் சேவை முடிவுப் புள்ளியால் உருவாக்கப்பட்ட அனுமான அட்டவணையைப் பயன்படுத்தலாம். இந்த முன்முயற்சி அணுகுமுறையானது, சிறந்த செயல்திறன் மற்றும் வணிக நோக்கங்களுடன் சீரமைக்க, மாதிரியை மீண்டும் பயிற்சி செய்தல் அல்லது அதன் அம்சங்களை புதுப்பித்தல் போன்ற சரியான நேரத்தில் சரிசெய்தல் நடவடிக்கைகளை எடுக்க எங்களுக்கு உதவுகிறது.
டிஎல்எம் மூன்று வகையான பகுப்பாய்வு அல்லது profile type
வழங்குகிறது: நேரத் தொடர் , ஸ்னாப்ஷாட் மற்றும் அனுமானம் . எங்கள் அனுமான அட்டவணையை பகுப்பாய்வு செய்வதில் நாங்கள் ஆர்வமாக இருப்பதால், பிந்தையவற்றில் கவனம் செலுத்துகிறோம். கண்காணிப்புக்கு அட்டவணையைப் பயன்படுத்த - எங்கள் " முதன்மை அட்டவணை ", அட்டவணை சரியான அமைப்பைக் கொண்டிருப்பதை உறுதி செய்ய வேண்டும். அனுமான அட்டவணைக்கு , ஒவ்வொரு வரிசையும் பின்வரும் நெடுவரிசைகளுடன் கோரிக்கைகளுடன் ஒத்திருக்க வேண்டும்:
மாதிரி அம்சங்கள்
மாதிரி கணிப்பு
மாதிரி ஐடி
நேர முத்திரை : அனுமானக் கோரிக்கையின் நேரமுத்திரை
அடிப்படை உண்மை (விரும்பினால்)
நாங்கள் பல மாடல்களை வழங்கும்போது மாடல் ஐடி முக்கியமானது, மேலும் ஒவ்வொரு மாடலின் செயல்திறனையும் ஒரே கண்காணிப்பு டாஷ்போர்டில் கண்காணிக்க விரும்புகிறோம். ஒன்றுக்கு மேற்பட்ட மாடல் ஐடிகள் இருந்தால், ஒவ்வொரு ஸ்லைஸுக்கும் தனித்தனியாக டேட்டாவை ஸ்லைஸ் செய்யவும் அளவீடுகள் மற்றும் ஸ்டாட்டிக்ஸ் கணக்கிடவும் DLM அதைப் பயன்படுத்துகிறது.
DLM ஒவ்வொரு புள்ளியியல் மற்றும் அளவீடுகளையும் ஒரு குறிப்பிட்ட நேர இடைவெளியில் கணக்கிடுகிறது. அனுமான பகுப்பாய்விற்கு, இது நேர முத்திரை நெடுவரிசையைப் பயன்படுத்தியது, மேலும் நேர சாளரங்களை அடையாளம் காண பயனர் வரையறுக்கப்பட்ட சாளர அளவையும் பயன்படுத்தினார். மேலும் கீழே.
அனுமான அட்டவணைகளுக்கு DLM இரண்டு problem type
ஆதரிக்கிறது: " வகைப்பாடு " அல்லது " பின்னடைவு ". இந்த விவரக்குறிப்பின் அடிப்படையில் தொடர்புடைய சில அளவீடுகள் மற்றும் புள்ளிவிவரங்களை இது கணக்கிடுகிறது.
DLM ஐப் பயன்படுத்த, நாம் ஒரு மானிட்டரை உருவாக்கி அதை ஒரு அட்டவணையில் இணைக்க வேண்டும். நாம் இதைச் செய்யும்போது DLM இரண்டு metric tables
உருவாக்குகிறது:
சுயவிவர அளவீட்டு அட்டவணை : இந்த அட்டவணையில் குறைந்தபட்சம், அதிகபட்சம், பூஜ்யத்தின் சதவீதம் மற்றும் பூஜ்ஜியங்கள் போன்ற சுருக்கமான புள்ளிவிவரங்கள் உள்ளன. பயனரால் வரையறுக்கப்பட்ட சிக்கல் வகையின் அடிப்படையிலான கூடுதல் அளவீடுகளும் இதில் உள்ளன. எடுத்துக்காட்டாக, வகைப்படுத்தல் மாதிரிகளுக்கான துல்லியம் , நினைவுபடுத்துதல் மற்றும் f1_ ஸ்கோர் மற்றும் பின்னடைவு மாதிரிகளுக்கான சராசரி_சதுர_எரர் மற்றும் சராசரி_சராசரி_எரர் .
டிரிஃப்ட் மெட்ரிக் அட்டவணை : தரவுகளின் விநியோகம் காலப்போக்கில் எவ்வாறு மாறியது அல்லது அடிப்படை மதிப்புடன் தொடர்புடையது (வழங்கப்பட்டால்) என்பதை அளவிடும் புள்ளிவிவரத்தைக் கொண்டுள்ளது. இது சி-சதுர சோதனை, KS சோதனை போன்ற நடவடிக்கைகளை கணக்கிடுகிறது.
ஒவ்வொரு அட்டவணைக்கும் முழுமையான அளவீடுகளின் பட்டியலைக் காண , மெட்ரிக் அட்டவணை ஆவணப் பக்கத்தை கண்காணிக்கவும் . தனிப்பயன் அளவீடுகளை உருவாக்குவதும் சாத்தியமாகும்.
கண்காணிப்பு அமைப்பை உருவாக்குவதற்கான ஒரு முக்கிய அம்சம், சமீபத்திய அனுமானத் தரவை வரும்போது எங்கள் கண்காணிப்பு டாஷ்போர்டு அணுகுவதை உறுதிசெய்வதாகும். அதற்கு நாம் டெல்டா டேபிள் ஸ்ட்ரீமிங்கைப் பயன்படுத்தி அனுமான அட்டவணையில் செயலாக்கப்பட்ட வரிசைகளைக் கண்காணிக்கலாம். மாதிரி சேவையின் அனுமான அட்டவணையை எங்கள் மூல அட்டவணையாகவும் ( readStream
), கண்காணிப்பு அட்டவணையை சிங்க் டேபிளாகவும் ( writeStream
) பயன்படுத்துகிறோம். டேட்டா கேப்சர் மாற்றுதல் (சிடிசி) இரண்டு அட்டவணைகளிலும் இயக்கப்பட்டிருப்பதையும் உறுதிசெய்கிறோம் (அது அனுமான அட்டவணையில் இயல்பாகவே இயக்கப்படும்). இந்த வழியில், ஒவ்வொரு புதுப்பித்தலின் போதும் முழு அட்டவணையையும் மீண்டும் செயலாக்குவதற்குப் பதிலாக மூல அட்டவணையில் மாற்றங்களை மட்டுமே செயல்படுத்துகிறோம் - செருகவும்/புதுப்பிக்கவும்/நீக்கவும்.
எங்கள் அனுமான அட்டவணையில் கண்காணிப்பை இயக்க, நாங்கள் பின்வரும் படிகளைச் செய்கிறோம்:
முதலில் நாம் Lakehouse Monitoring API ஐ நிறுவ வேண்டும். Databricks ரம் நேரம் 15.3 LTS மற்றும் அதற்கு மேல் பயன்படுத்தினால், இது ஏற்கனவே நிறுவப்பட்டிருக்க வேண்டும்:
%pip install "https://ml-team-public-read.s3.amazonaws.com/wheels/data-monitoring/a4050ef7-b183-47a1-a145-e614628e3146/databricks_lakehouse_monitoring-0.4.14-py3-none-any.whl" dbutils.library.restartPython()
அனுமான அட்டவணையை ஸ்ட்ரீமிங் அட்டவணையாகப் படிப்போம்
requests_raw = spark.readStream\ .format("delta")\ .table(inference_table_name) requests_raw.isStreaming #-> True
அடுத்து மேலே விவரிக்கப்பட்டுள்ளபடி அட்டவணையை சரியான வடிவத்தில் வைக்க வேண்டும். இந்த அட்டவணையில் ஒவ்வொரு கணிப்புக்கும் தொடர்புடைய அம்சங்கள் மற்றும் கணிப்பு மதிப்புடன் ஒரு வரிசை இருக்க வேண்டும். மாதிரி சேவை எண்ட்பாயிண்டிலிருந்து நாம் பெறும் அனுமான அட்டவணை, இறுதிப்புள்ளி கோரிக்கைகள் மற்றும் பதில்களை உள்ளமைக்கப்பட்ட JSON வடிவமாக சேமிக்கிறது. கோரிக்கை மற்றும் மறுமொழி நெடுவரிசைக்கான JSON பேலோடின் உதாரணம் இங்கே.
#requests {"dataframe_records": [ {"user_id": 1, "item_id": 346, "rating": 5}, {"user_id": 1, "item_id": 377, "rating": 2}, {"user_id": 1, "item_id": 302, "rating": 4} ] } #reponse {"predictions": [4.248899936676025, 1.1172138452529907, 4.279165744781494] } # --> what we need | user_id | item_id | rating | prediction | |---------|---------|--------|------------| | 1 | 346 | 5 | 4.248900 | | 1 | 377 | 2 | 1.117214 | | 1 | 302 | 4 | 4.279166 |
இந்த அட்டவணையை சரியான ஸ்கீமாவில் திறக்க, டேட்டாபிரிக்ஸ் ஆவணத்திலிருந்து ( அனுமான அட்டவணை லேக்ஹவுஸ் மானிட்டரிங் ஸ்டார்டர் நோட்புக் ) தழுவிய பின்வரும் குறியீட்டைப் பயன்படுத்தலாம்.
# define the schema of the request and reponse fields in the inference tabel REQUEST_FIELDS = [StructField('user_id', IntegerType(), False),\ StructField('item_id', IntegerType(), False),\ StructField('rating', IntegerType(), False)\ ] RESPONSE_FIELD = [T.StructField("predictions", FloatType())] def process_col_requests(json_str:str) -> str: """ to proccess the JSON payload of request column in inference table """ request = json.loads(json_str) dataframe_records = request.get("dataframe_records", []) return dataframe_records def procces_col_response(json_str: str) -> str: """ to proccess the JSON payload of reponse column in inference table """ reponse = json.loads(json_str) output = [{prediction_col: round(prediction,4)} for prediction in reponse["predictions"]] return output def get_model_id(endpoint_name: str) -> str: """ create the model id by concatinating the model name and the model version. note: the assumption is the endpoint serves only one model """ served_models = get_served_models(endpoint_name) model_id = f"{served_models[0]['model_name']}_{served_models[0]['model_version']}" return model_id def process_requests(requests_raw: DataFrame, request_fields: List[T.StructField], response_field: T.StructField, endpoint_name: str) -> DataFrame: """ Takes a stream of raw requests and processes them by: - Unpacking JSON payloads for requests and responses - Exploding batched requests into individual rows - Converting Unix epoch millisecond timestamps to be Spark TimestampType :param requests_raw: DataFrame containing raw requests. Assumed to contain the following columns: - `request` - `response` - `timestamp_ms` :param request_fields: List of StructFields representing the request schema :param response_field: A StructField representing the response schema :return: A DataFrame containing processed requests """ # Convert the timestamp milliseconds to TimestampType for downstream processing. requests_timestamped = requests_raw \ .withColumn(timestamp_col, (F.col("timestamp_ms") / 1000).cast(T.TimestampType())) \ .drop("timestamp_ms") # create the model identifier column model_id = get_model_id(endpoint_name) # Convert the model name and version columns into a model identifier column. requests_identified = requests_timestamped \ .withColumn(model_id_col, F.lit(model_id)) # Rename the date column to avoid collisions with features. requests_dated = requests_identified.withColumnRenamed("date", date_col) # Consolidate and unpack JSON. request_schema = T.ArrayType(T.StructType(request_fields)) response_schema = T.ArrayType(T.StructType(response_field)) # w udf_request = F.udf(process_col_requests, request_schema) udf_reponse = F.udf(procces_col_response, response_schema) requests_unpacked = requests_dated.withColumn("request", udf_request("request")).\ withColumn("response", udf_reponse("response")) # Explode batched requests into individual rows. DB_PREFIX = "__db" requests_exploded = requests_unpacked \ .withColumn(f"{DB_PREFIX}_request_response", F.arrays_zip(F.col("request"), F.col("response"))) \ .withColumn(f"{DB_PREFIX}_request_response", F.explode(F.col(f"{DB_PREFIX}_request_response"))) \ .select(F.col("*"), F.col(f"{DB_PREFIX}_request_response.request.*"), F.col(f"{DB_PREFIX}_request_response.response.*")) \ .drop(f"{DB_PREFIX}_request_response", "request", "response") requests_cleaned = requests_exploded.drop("status_code", "sampling_fraction", "client_request_id", "databricks_request_id", "request_metadata") return requests_cleaned
இதன் விளைவாக வரும் அட்டவணை இப்படி இருக்கும்:
அடுத்து நாம் நமது சிங்க் டேபிளை துவக்க வேண்டும்
dt_builder = DeltaTable.createIfNotExists(spark) \ .tableName(unpacked_requests_table_name) \ .addColumns(schema) \ .partitionedBy(requests_cleaned.schema) \ .property("delta.enableChangeDataFeed", "true") \ dt_builder.execute()
மற்றும் முடிவுகளை எழுதவும்
checkpoint_path = f"dbfs:/payload-logging/{endpoint_name}/checkpoint" requests_stream = requests_cleaned.writeStream \ .trigger(once=True) \ .format("delta") \ .partitionBy(date_col) \ .outputMode("append") \ .option("checkpointLocation", checkpoint_path) \ .toTable(unpacked_requests_table_name) \
இறுதியாக, நாங்கள் எங்கள் அடிப்படை அட்டவணையை உருவாக்குகிறோம். அடிப்படை மற்றும் முதன்மை மாதிரிகளின் ஒரே மாதிரியான நெடுவரிசைகளின் விநியோகத்தை ஒப்பிடுவதன் மூலம் சறுக்கல்களைக் கணக்கிட DLM இந்த அட்டவணையைப் பயன்படுத்துகிறது. அடிப்படை அட்டவணையில் முதன்மை நெடுவரிசையின் அதே அம்ச நெடுவரிசையும் அதே மாதிரி அடையாள நெடுவரிசையும் இருக்க வேண்டும். பேஸ்லைன் டேபிளுக்கு, சிறந்த ஹைப்பர் பாராமீட்டரைப் பயன்படுத்தி எங்கள் மாதிரியைப் பயிற்றுவித்த பிறகு, நாங்கள் முன்பே சேமித்து வைத்திருக்கும் எங்கள் சரிபார்ப்பு தரவுத்தொகுப்பின் முன்கணிப்பு அட்டவணையைப் பயன்படுத்துகிறோம். டிரிஃப்ட் மெட்ரிக்கைக் கணக்கிட, டேட்டாபிரிக்ஸ் முதன்மை மற்றும் அடிப்படை அட்டவணை இரண்டிற்கும் சுயவிவர அளவீடுகளைக் கணக்கிடுகிறது. இங்கே நீங்கள் முதன்மை அட்டவணை மற்றும் அடிப்படை அட்டவணை பற்றி படிக்கலாம்.
#read the prediction table df_base_table = spark.table(f"{catalog_name}.{model_schema}.predictions") # create the model id and add it to the table model_id = get_model_id(endpoint_name) df_base_table = df_base_table.withColumn(model_id_col, F.lit(model_id)) #write the new table and enable the CDC on it output_base_table_name = f"{catalog_name}.{model_schema}.{base_table_prefix}_{model_name}" df_base_table.write.format("delta").mode("overwrite").saveAsTable(output_base_table_name) spark.sql(f"ALTER TABLE {output_base_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")
இப்போது எங்கள் கண்காணிப்பு டாஷ்போர்டை உருவாக்க படிக்கிறோம். UI ஐப் பயன்படுத்தி நாம் அதைச் செய்யலாம் அல்லது லேக்ஹவுஸ் கண்காணிப்பு API. இங்கே நாம் இரண்டாவது விருப்பத்தைப் பயன்படுத்துகிறோம்:
# This is where we store the metric tables. output_schema_name = f"{catalog_name}.{model_schema}" try: info = lm.create_monitor( table_name=unpacked_requests_table_name, profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities,#the aggregation window model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, # We will refresh the metrics on-demand in this notebook baseline_table_name=output_base_table_name, ) print(info) except Exception as e: # Ensure the exception was expected assert "RESOURCE_ALREADY_EXISTS" in str(e), f"Unexpected error: {e}" # Update the monitor if any parameters of this notebook have changed. lm.update_monitor( table_name=unpacked_requests_table_name, updated_params=dict( profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities, model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, baseline_table_name=output_base_table_name, ) ) # Refresh metrics calculated on the requests table. refresh_info = lm.run_refresh(table_name=unpacked_requests_table_name) print(refresh_info)
குறியீட்டை இயக்கிய பிறகு, டேட்டாபிரிக்ஸ் அனைத்து அளவீடுகளையும் கணக்கிடும் வரை சிறிது நேரம் எடுக்கும். டாஷ்போர்டைப் பார்க்க, உங்கள் சிங்க் டேபிளின் Quality
தாவலுக்குச் செல்லவும் (அதாவது unpacked_requests_table_name
). நீங்கள் ஒரு பக்கத்தை பின்வருமாறு பார்க்க வேண்டும்.
பார்வை refresh history
கிளிக் செய்தால், உங்கள் இயங்கும், நிலுவையில் உள்ள மற்றும் கடந்த புதுப்பிப்புகளைக் காணலாம். உங்கள் டாஷ்போர்டைத் திறக்க View Dashboard
கிளிக் செய்யவும்.
எனவே நாம் அனுமான அட்டவணையில் ( my_endpoint_payload
) தொடங்கி, அதைச் செயலாக்கி, முடிவை my_endpoint_payload_unpacked
இல் சேமித்து, இந்த அட்டவணையை எங்கள் அடிப்படை அட்டவணையுடன் ( base_table_als
) எங்கள் கண்காணிப்பு API க்கு அனுப்புவோம். DLM ஆனது ஒவ்வொரு அட்டவணைக்கும் சுயவிவர அளவீடுகளைக் கணக்கிடுகிறது ( my_endpoint_payload_unpacked_profile_metric
) மற்றும் டிரிஃப்ட் அளவீடுகளைக் கணக்கிட அவற்றைப் பயன்படுத்துகிறது ( my_endpoint_payload_unpacked_drift_metrics
)
இதோ! நீங்கள் சேவை செய்ய மற்றும் உங்கள் மாதிரியை கண்காணிக்க வேண்டிய அனைத்தும் உங்களிடம் உள்ளன!
Databricks Assets Bundle மற்றும் Gitlab ஐப் பயன்படுத்தி இந்த செயல்முறையை எவ்வாறு தானியக்கமாக்குவது என்பதை அடுத்த பகுதியில் நான் உங்களுக்குக் காண்பிப்பேன்!