इस ट्यूटोरियल श्रृंखला के पहले भाग में, हमने डेटाब्रिक्स और स्पार्क का उपयोग करके एंड-टू-एंड MLOps पाइपलाइन बनाने के लिए पहला कदम उठाया, जो डेटाब्रिक्स के संदर्भ आर्किटेक्चर द्वारा निर्देशित था। यहाँ उन प्रमुख चरणों का संक्षिप्त विवरण दिया गया है जिन्हें हमने कवर किया है:
मेडलियन आर्किटेक्चर के लिए यूनिटी कैटलॉग की स्थापना : हमने यूनिटी कैटलॉग के भीतर अपने डेटा को कांस्य, रजत और स्वर्ण परतों में व्यवस्थित किया, जिससे एक संरचित और कुशल डेटा प्रबंधन प्रणाली स्थापित हुई।
यूनिटी कैटलॉग में डेटा को सम्मिलित करना : हमने प्रदर्शित किया कि किस प्रकार कच्चे डेटा को सिस्टम में आयात किया जाए, ताकि आगामी प्रसंस्करण चरणों के लिए स्थिरता और गुणवत्ता सुनिश्चित की जा सके।
मॉडल का प्रशिक्षण : डेटाब्रिक्स का उपयोग करते हुए, हमने स्केलेबल और प्रभावी मॉडल विकास के लिए सर्वोत्तम प्रथाओं का पालन करते हुए, हमारे डेटासेट के अनुरूप एक मशीन लर्निंग मॉडल को प्रशिक्षित किया।
हाइपरऑप्ट के साथ हाइपरपैरामीटर ट्यूनिंग : मॉडल के प्रदर्शन को बढ़ाने के लिए, हमने इष्टतम हाइपरपैरामीटर की खोज को स्वचालित करने के लिए हाइपरऑप्ट का उपयोग किया, जिससे सटीकता और दक्षता में सुधार हुआ।
डेटाब्रिक्स एमएलफ्लो के साथ प्रयोग ट्रैकिंग : हमने अपने प्रयोगों को लॉग और मॉनिटर करने के लिए एमएलफ्लो का उपयोग किया, आसान तुलना और पुनरुत्पादन के लिए मॉडल संस्करणों, मैट्रिक्स और मापदंडों का व्यापक रिकॉर्ड बनाए रखा।
इन आधारभूत चरणों के पूरा होने के साथ, आपका मॉडल अब तैनाती के लिए तैयार है। इस दूसरे भाग में, हम अपने सिस्टम में दो महत्वपूर्ण घटकों को एकीकृत करने पर ध्यान केंद्रित करेंगे:
आइये इसमें शामिल हों!
पिछले ब्लॉग का प्रस्थान बिंदु मॉडल मूल्यांकन था। अब कल्पना करें कि हमने तुलना की और पाया कि हमारा मॉडल इस उत्पादन मॉडल की तुलना में बेहतर प्रदर्शन दिखाता है। चूंकि हम उत्पादन में मॉडल का उपयोग करना चाहते हैं (मान लें), इसलिए हम अपने पास मौजूद सभी डेटा का लाभ उठाना चाहते हैं। अगला चरण पूरे डेटासेट का उपयोग करके मॉडल को प्रशिक्षित और परीक्षण करना है। फिर इसे अपने चैंपियन मॉडल के रूप में तैनात करके बाद में उपयोग के लिए अपने मॉडल को बनाए रखें। चूंकि यह अंतिम मॉडल है जिसे हम अनुमान के लिए उपयोग करना चाहते हैं, इसलिए हम मॉडल को प्रशिक्षित करने के लिए फ़ीचर इंजीनियरिंग क्लाइंट का उपयोग करते हैं। इस तरह हम न केवल मॉडल वंशावली को आसानी से ट्रैक कर सकते हैं, बल्कि क्लाइंट को स्कीमा सत्यापन और फ़ीचर परिवर्तन (यदि कोई हो) भी सौंप सकते हैं।
with mlflow.start_run(run_name="ALS_best_model") as run: als = ALS() # Now we set the parameters for the method als.setMaxIter(MAX_ITER)\ .setSeed(SEED)\ .setRegParam(best_params["REG_PARAM"])\ .setUserCol(COL_USER)\ .setItemCol(COL_ITEM)\ .setRatingCol(COL_LABEL)\ .setRank(best_params["RANK"]) mlflow.log_param("MAX_ITER", MAX_ITER) mlflow.log_param("RANK", best_params["RANK"]) mlflow.log_param("REG_PARAM", best_params["REG_PARAM"]) # Create the model with these parameters. model = als.fit(df_full_data) #drop predictions where users and products from the test test and didn't make it into the training set. in this case, the prediction is NaN model.setColdStartStrategy('drop') predictions = model.transform(df_full_data) signature = infer_signature(model_input = df_full_data, model_output = predictions.select(COL_LABEL)) #log the model mlflow.spark.log_model(model, model_name, sample_input=df_full_data.limit(3), signature = signature, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name=f"{catalog_name}.{model_schema}.{model_name}") evaluator = RegressionEvaluator(predictionCol=COL_PRED, labelCol=COL_LABEL) rmse = evaluator.setMetricName("rmse").evaluate(predictions) mlflow.log_metric('rmse', rmse)
हम मॉडलों को प्रशिक्षित करने और लॉग करने के लिए फ़ीचर स्टोर या फ़ीचर इंजीनियरिंग API का भी उपयोग कर सकते हैं
model_info = fe.log_model(model=model, artifact_path = model_name, flavor=mlflow.spark, training_set=fe_full_data, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name= f"{catalog_name}.{model_schema}.{model_name}" )
जब हम फीचर इंजीनियरिंग एपीआई का उपयोग करते हैं तो हम कैटलॉग एक्सप्लोरर में मॉडल की वंशावली देख सकते हैं
अब मॉडल विवरण को अपडेट करें और इसे चैंपियन लेबल असाइन करें।
import time from mlflow.tracking.client import MlflowClient from mlflow.entities.model_registry.model_version_status import ModelVersionStatus client = MlflowClient() #find the latest model version model_name_path = f"{catalog_name}.{model_schema}.{model_name}" model_version_infos = client.search_model_versions(f"name ='{model_name_path}'") new_model_version = max([int(model_version_info.version) for model_version_info in model_version_infos]) #add the model and model version descirption client.update_registered_model( name=model_name_path, description="collaborative filtering using Spark mllib ALS. This model use rating table" ) client.update_model_version( name=model_name_path, version=new_model_version, description="this model is optimized Rank and REG_PARAM with Hyperopt and rmse as a loss function. trained on the full dataset" ) # assign alias client.set_registered_model_alias(model_name_path, "Champion", new_model_version)
अब आगे बढ़ें और उस स्कीमा की जांच करें जिस पर आपने मॉडल पंजीकृत किया है। आपको अपने सभी अपडेट इस प्रकार दिखाई देने चाहिए
मॉडल स्टेज : यदि आप मॉडल रजिस्ट्री के लिए वर्कस्पेस का उपयोग करते हैं तो आपको अपने मॉडल को प्रबंधित करने के लिए स्टेज का उपयोग करना चाहिए। उपनामों का उपयोग करना काम नहीं करेगा। यहाँ देखें यह देखने के लिए कि यह कैसे काम करता है
अब कल्पना करें कि हम अपने मॉडल का उपयोग उत्पादन में अनुमान के लिए करना चाहते हैं। इस चरण में हम चैंपियन मॉडल को लोड करते हैं और इसका उपयोग प्रत्येक उपयोगकर्ता के लिए 20 मूवी अनुशंसाएँ बनाने के लिए करते हैं।
from mlflow.spark import load_model as spark_load_model from mlflow.tracking.client import MlflowClient from create_training_set import split_data #-- set UC as model registray mlflow.set_registry_uri("databricks-uc") #-- initate mlflow client client = MlflowClient() # -- read the config file with open('config.json') as config_file: config = json.load(config_file) catalog_name = config["catalog_name"] gold_layer = config["gold_layer_name"] silver_layer = config["silver_layer_name"] user_item_table_name = config["user_item_table_name"] ft_user_item_name = config["ft_user_item_name"] model_name = config["model_name"] model_schema = config["model_schema"] #-- create the model uri model_path = f"{catalog_name}.{model_schema}.{model_name}" # --create the model_uri: there are two ways to do this # 1: using the alias (we use this*) model_version_uri = f"models:/{model_uri}@champion" # 2: using model version #champion_version = client.get_model_version_by_alias(model_uri, "champion") #model_version_uri = f"models:/{model_uri}/{champion_version.version}" # -- load the model pipline and exctract the model model_pipeline = spark_load_model(model_version_uri) model = model_pipeline.stages[0] # -- batch scoring using the the model fe_full_data, df_full_data, df_train, df_test = split_data() df_batch_input = df_full_data.drop("rating") df_scores = model.transform(df_batch_input) # --- in case you used Feature Engineering to train and register model #from databricks.feature_engineering import FeatureEngineeringClient #fe = FeatureEngineeringClient() # fe.score_batch(model_uri=f"{model_version_uri}",df = df_batch_input)
और आप देख सकते हैं कि हमने बैच स्कोरिंग के लिए समान प्रशिक्षण डेटा का उपयोग किया है। हालाँकि, अनुशंसा प्रणाली के मामले में यह समझ में आता है, अधिकांश एप्लिकेशन में हम कुछ अनदेखे डेटा को स्कोर करने के लिए मॉडल का उपयोग करना चाहते हैं। उदाहरण के लिए, कल्पना करें कि आप नेटफ्लिक्स हैं और दिन के अंत में उनकी नई देखी गई सूची के आधार पर उपयोगकर्ता की सिफारिशों को अपडेट करना चाहते हैं। हम दिन के अंत में विशिष्ट समय पर बैच स्कोरिंग चलाने वाले कार्य को शेड्यूल कर सकते हैं।
अब हम आगे बढ़ सकते हैं और प्रत्येक उपयोगकर्ता के लिए अनुशंसाएँ तैयार कर सकते हैं। इसके लिए हम प्रत्येक उपयोगकर्ता के लिए शीर्ष 20 आइटम ढूँढ़ते हैं
from pyspark.sql.window import Window from pyspark.sql.functions import col, split, row_number, collect_list from pyspark.sql.functions import col, collect_list, expr, lit, min, row_number, desc windowSpec = Window.partitionBy("user_id").orderBy(col("prediction").desc()) df_top_20_items = df_scores.withColumn("rank", row_number().over(windowSpec)).filter(col("rank") <= 20) df_user_recs = df_top_20_items.groupBy("user_id") \ .agg(collect_list(col("item_id").cast("double")).alias("top_item_ids"))
परिणाम कुछ इस प्रकार है
अंत में हम अपने UC पर डेल्टा लेबल के रूप में पूर्वानुमान को संग्रहीत कर सकते हैं या उन्हें डाउनस्ट्रीम सिस्टम मोंगो DB या Azure Cosmos DB पर प्रकाशित कर सकते हैं। हम पहले विकल्प के साथ चलते हैं
df_user_recs.write.mode("overwrite").saveAsTable(f"{catalog_name}.{output_schema}.top20_item_recommendations")
अब एक ऐसे मामले की कल्पना करें जिसमें हम वास्तविक समय के उपयोगकर्ता इंटरैक्शन के आधार पर अपनी सिफारिशों को अपडेट करना चाहते हैं। इस मामले के लिए हम मॉडल सर्विंग का उपयोग कर सकते हैं। जब कोई आपके मॉडल का उपयोग करना चाहता है, तो वे सर्वर को डेटा भेज सकते हैं। फिर सर्वर उस डेटा को आपके तैनात मॉडल को खिलाता है, जो कार्रवाई में जाता है, डेटा का विश्लेषण करता है, और एक भविष्यवाणी उत्पन्न करता है। उनका उपयोग वेब एप्लिकेशन, मोबाइल ऐप या यहां तक कि एम्बेडेड सिस्टम में भी किया जा सकता है। इस दृष्टिकोण का एक अनुप्रयोग A/B परीक्षण के लिए ट्रैफ़िक रूटिंग को सक्षम करना है।
ALS एल्गोरिदम का इस्तेमाल सीधे ऑनलाइन अनुमान के लिए नहीं किया जा सकता क्योंकि इसके लिए अनुशंसाओं को अपडेट करने के लिए पूरे डेटा (पुराने + नए) का उपयोग करके मॉडल को फिर से प्रशिक्षित करने की आवश्यकता होती है। ग्रेडिएंट डिसेंट लर्निंग एल्गोरिदम ऐसे मॉडल के उदाहरण हैं जिनका इस्तेमाल ऑनलाइन अपडेट के लिए किया जा सकता है। हम भविष्य की पोस्ट में इनमें से कुछ एल्गोरिदम पर नज़र डाल सकते हैं।
हालांकि, यह समझाने के लिए कि ऐसा मॉडल कैसे काम करेगा, हम एक (बेकार) मॉडल बना रहे हैं जो एंडपॉइंट की सेवा करता है जो उपयोगकर्ता द्वारा किसी मूवी को रेटिंग दिए जाने के आधार पर मूवी रेटिंग की भविष्यवाणी करता है!
import requests model_path = f"{catalog_name}.{model_schema}.{model_name}" champion_version = client.get_model_version_by_alias(model_path, "champion") # Set the name of the MLflow endpoint endpoint_name = config["model_serving_endpoint_name"] # Name of the registered MLflow model model_name = model_path # Specify the type of compute (CPU, GPU_SMALL, GPU_MEDIUM, etc.) workload_type = "CPU" # Specify the scale-out size of compute (Small, Medium, Large, etc.) workload_size = "Small" # Get the latest version of the MLflow model model_version = int(champion_version.version) # Specify Scale to Zero(only supported for CPU endpoints) scale_to_zero = False # Get the API endpoint and token for the current notebook context API_ROOT = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get() API_TOKEN = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get() data = { "name": endpoint_name, "config": { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ] }, } headers = {"Context-Type": "text/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.post( url=f"{API_ROOT}/api/2.0/serving-endpoints", json=data, headers=headers )
यह हमारे लिए मॉडल सर्विंग क्लस्टर बनाएगा और लॉन्च करेगा, इसलिए इसमें कुछ समय लगता है। अब अगर आप Serving
विंडो खोलते हैं तो आपको अपना एंडपॉइंट दिखना चाहिए।
हम एक से ज़्यादा मॉडल को सर्व करने के लिए एक एंडपॉइंट का इस्तेमाल कर सकते हैं। फिर हम A/B टेस्टिंग जैसे परिदृश्यों के लिए ट्रैफ़िक रूटिंग का इस्तेमाल कर सकते हैं या उत्पादन में अलग-अलग मॉडल के प्रदर्शन की तुलना कर सकते हैं।
डेटाब्रिक्स मॉडल सर्विंग में अनुमान तालिकाएँ हमारे तैनात मॉडलों के लिए एक स्वचालित लॉग के रूप में कार्य करती हैं। सक्षम होने पर, वे आने वाले अनुरोधों (पूर्वानुमान के लिए भेजा गया डेटा), संबंधित मॉडल आउटपुट (पूर्वानुमान) और यूनिटी कैटलॉग के भीतर डेल्टा टेबल के रूप में कुछ अन्य मेटाडेटा को कैप्चर करते हैं। हम निगरानी और डिबगिंग , वंशावली ट्रैकिंग और हमारे मॉडलों को फिर से प्रशिक्षित करने या ठीक करने के लिए डेटा संग्रह प्रक्रिया के लिए अनुमान तालिका का उपयोग कर सकते हैं।
हम मॉडल की निगरानी के लिए अपने सर्विंग एंडपॉइंट पर inference table
सक्षम कर सकते हैं। हम इसे पेलोड में auto_capture_config
प्रॉपर्टी को निर्दिष्ट करके कर सकते हैं जब हम पहली बार एंडपॉइंट बनाते हैं। या हम बाद में put
कमांड और config
एंडपॉइंट URL का उपयोग करके अपने एंडपॉइंट को इस प्रकार अपडेट करते हैं (अधिक जानकारी यहाँ )
data = { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ], "auto_capture_config":{ "catalog_name": catalog_name, "schema_name": model_schema, "table_name_prefix": payload_table, } } headers = {"Context-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.put(url=f"{API_ROOT}/api/2.0/serving-endpoints/{endpoint_name}/config", json=data, headers=headers) print(json.dumps(response.json(), indent=4))
अब आइए एंडपॉइंट को कुछ डमी उपयोगकर्ता इंटरैक्शन डेटा से फीड करें
import random import time all_items = df_full_data.select(col("item_id")).distinct() for user_id in range(50,54): items_not_rated_by_user = df_full_data.where(col("user_id")==user_id).select(col("item_id")).distinct()#collect()[0][0] no_rated_items = [item.item_id for item in all_items.subtract(items_not_rated_by_user).limit(4).collect()] data = { "dataframe_records": [ {"user_id":user_id, "item_id":no_rated_items[0], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[1], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, ] } response = requests.post( url=f"{API_ROOT}/serving-endpoints/{endpoint_name}/invocations", json=data, headers=headers ) # generate the data within the timespan of 1 to 8 minutes time.sleep(random.randint(60*1, 60*8))
हम <catalog>.<schema>.<payload_table>
टेबल में एंडपॉइंट लॉग की जांच कर सकते हैं। टेबल में डेटा देखने में लगभग 10 मिनट लगते हैं।
table_path = f"{catalog_name}.{model_schema}.{payload_table}" # Read data from the inference table df_inf_table = spark.read.table(table_path) display(df_inf_table )
आपको अपनी पेलोड तालिका कुछ इस तरह दिखनी चाहिए
इस अनुमान तालिका की स्कीमा को समझने के लिए, यहां “यूनिटी कैटलॉग अनुमान तालिका स्कीमा==” देखें।==
मॉडल और डेटा मॉनिटरिंग एक जटिल विषय है, जिसमें महारत हासिल करने के लिए बहुत समय की आवश्यकता होती है। डेटाब्रिक्स लेकहाउस मॉनिटरिंग (DLM) सामान्य उपयोग के मामलों के लिए मानक और अनुकूलन योग्य टेम्पलेट प्रदान करके एक उचित मॉनिटरिंग सिस्टम बनाने के ओवरहेड को कम करता है। हालाँकि, DLM और मॉडल मॉनिटरिंग में महारत हासिल करने के लिए आम तौर पर बहुत सारे प्रयोगों की आवश्यकता होती है। मैं आपको यहाँ मॉडल मॉनिटरिंग का विस्तृत अवलोकन नहीं देना चाहता, बल्कि आपको एक शुरुआती बिंदु देना चाहता हूँ। मैं भविष्य में इस विषय पर एक ब्लॉग समर्पित कर सकता हूँ।
डीएलएम कार्यक्षमताओं और सुविधाओं का संक्षिप्त सारांश
अब जबकि हमारा मॉडल तैयार है और चल रहा है, हम अपने सर्विंग एंडपॉइंट द्वारा जेनरेट की गई इनफ़रेंस टेबल का उपयोग करके मॉडल के प्रदर्शन और बहाव जैसे प्रमुख मीट्रिक की निगरानी कर सकते हैं ताकि समय के साथ हमारे डेटा या मॉडल में किसी भी विचलन या विसंगति का पता लगाया जा सके। यह सक्रिय दृष्टिकोण हमें समय पर सुधारात्मक कार्रवाई करने में मदद करता है, जैसे कि मॉडल को फिर से प्रशिक्षित करना या इसकी विशेषताओं को अपडेट करना, ताकि इष्टतम प्रदर्शन और व्यावसायिक उद्देश्यों के साथ संरेखण बनाए रखा जा सके।
डीएलएम तीन प्रकार के विश्लेषण या profile type
प्रदान करता है: टाइम सीरीज़ , स्नैपशॉट और इंफ़रेंस । चूँकि हम अपनी इंफ़रेंस टेबल का विश्लेषण करने में रुचि रखते हैं, इसलिए हम बाद वाले पर ध्यान केंद्रित करते हैं। मॉनिटरिंग के लिए एक टेबल का उपयोग करने के लिए - हमारी " प्राथमिक तालिका ", हमें यह सुनिश्चित करना चाहिए कि टेबल में सही संरचना हो। इंफ़रेंस टेबल के लिए, प्रत्येक पंक्ति को निम्नलिखित कॉलम वाले अनुरोधों के अनुरूप होना चाहिए:
मॉडल की विशेषताएं
मॉडल भविष्यवाणी
मॉडल आईडी
टाइमस्टैम्प : अनुमान अनुरोध का टाइमस्टैम्प
जमीनी सच्चाई (वैकल्पिक)
मॉडल आईडी उन मामलों के लिए महत्वपूर्ण है जब हम कई मॉडल पेश करते हैं और हम एक मॉनिटरिंग डैशबोर्ड में प्रत्येक मॉडल के प्रदर्शन को ट्रैक करना चाहते हैं। यदि एक से अधिक मॉडल आईडी उपलब्ध हैं, तो DLM इसका उपयोग डेटा को स्लाइस करने और प्रत्येक स्लाइस के लिए अलग-अलग मीट्रिक और स्टैटिक्स की गणना करने के लिए करता है।
डीएलएम एक निर्दिष्ट समय अंतराल के लिए प्रत्येक सांख्यिकी और मीट्रिक की गणना करता है। अनुमान विश्लेषण के लिए, इसने टाइमस्टैम्प कॉलम का उपयोग किया, साथ ही समय विंडो की पहचान करने के लिए उपयोगकर्ता द्वारा परिभाषित विंडो आकार का उपयोग किया। नीचे और अधिक जानकारी।
डीएलएम अनुमान तालिकाओं के लिए दो problem type
समर्थन करता है: " वर्गीकरण " या " प्रतिगमन "। यह इस विनिर्देश के आधार पर कुछ प्रासंगिक मीट्रिक और सांख्यिकी की गणना करता है।
DLM का उपयोग करने के लिए, हमें एक मॉनिटर बनाना चाहिए और उसे एक टेबल से जोड़ना चाहिए। जब हम ऐसा करते हैं तो DLM दो metric tables
बनाता है:
प्रोफ़ाइल मीट्रिक तालिका : इस तालिका में न्यूनतम, अधिकतम, शून्य और शून्य का प्रतिशत जैसे सारांश आँकड़े शामिल हैं। इसमें उपयोगकर्ता द्वारा परिभाषित समस्या प्रकार के आधार पर अतिरिक्त मीट्रिक भी शामिल हैं। उदाहरण के लिए वर्गीकरण मॉडल के लिए परिशुद्धता , रिकॉल और f1_score , और प्रतिगमन मॉडल के लिए mean_squared_error और mean_average_error ।
ड्रिफ्ट मीट्रिक टेबल : इसमें सांख्यिकी होती है जो मापती है कि समय के साथ या बेसलाइन मान (यदि प्रदान किया गया हो) के सापेक्ष डेटा का वितरण कैसे बदला है। यह ची-स्क्वायर टेस्ट, केएस टेस्ट जैसे उपायों की गणना करता है।
प्रत्येक तालिका के लिए संपूर्ण मीट्रिक की सूची देखने के लिए मॉनिटर मीट्रिक तालिका दस्तावेज़ पृष्ठ देखें। कस्टम मीट्रिक बनाना भी संभव है।
मॉनिटरिंग सिस्टम बनाने का एक महत्वपूर्ण पहलू यह सुनिश्चित करना है कि हमारे मॉनिटरिंग डैशबोर्ड को नवीनतम अनुमान डेटा तक पहुँच प्राप्त हो। इसके लिए हम अनुमान तालिका में संसाधित पंक्तियों का ट्रैक रखने के लिए डेल्टा टेबल स्ट्रीमिंग का उपयोग कर सकते हैं। हम मॉडल सर्विंग के अनुमान तालिका को अपने स्रोत तालिका ( readStream
) के रूप में और मॉनिटरिंग तालिका को सिंक तालिका ( writeStream
) के रूप में उपयोग करते हैं। हम यह भी सुनिश्चित करते हैं कि दोनों तालिकाओं पर परिवर्तन डेटा कैप्चर (CDC) सक्षम है (यह अनुमान तालिका पर डिफ़ॉल्ट रूप से सक्षम है)। इस तरह हम हर रिफ्रेश पर पूरी तालिका को फिर से प्रोसेस करने के बजाय केवल स्रोत तालिका में परिवर्तन - इन्सर्ट/अपडेट/डिलीट - को प्रोसेस करते हैं।
हमारी अनुमान तालिका पर निगरानी सक्षम करने के लिए हम निम्नलिखित कदम उठाते हैं:
सबसे पहले हमें Lakehouse Monitoring API इंस्टॉल करना होगा। यदि आप Databricks rum time 15.3 LTS और उससे ऊपर का उपयोग करते हैं तो यह पहले से इंस्टॉल होना चाहिए:
%pip install "https://ml-team-public-read.s3.amazonaws.com/wheels/data-monitoring/a4050ef7-b183-47a1-a145-e614628e3146/databricks_lakehouse_monitoring-0.4.14-py3-none-any.whl" dbutils.library.restartPython()
आइए अनुमान तालिका को स्ट्रीमिंग तालिका के रूप में पढ़ें
requests_raw = spark.readStream\ .format("delta")\ .table(inference_table_name) requests_raw.isStreaming #-> True
इसके बाद हमें टेबल को ऊपर बताए अनुसार सही प्रारूप में रखना होगा। इस टेबल में प्रत्येक पूर्वानुमान के लिए एक पंक्ति होनी चाहिए जिसमें संबंधित विशेषताएं और पूर्वानुमान मूल्य हों। मॉडल सर्विंग एंडपॉइंट से हमें जो अनुमान तालिका मिलती है, वह एंडपॉइंट अनुरोधों और प्रतिक्रियाओं को नेस्टेड JSON प्रारूप में संग्रहीत करती है। अनुरोध और प्रतिक्रिया कॉलम के लिए JSON पेलोड का एक उदाहरण यहां दिया गया है।
#requests {"dataframe_records": [ {"user_id": 1, "item_id": 346, "rating": 5}, {"user_id": 1, "item_id": 377, "rating": 2}, {"user_id": 1, "item_id": 302, "rating": 4} ] } #reponse {"predictions": [4.248899936676025, 1.1172138452529907, 4.279165744781494] } # --> what we need | user_id | item_id | rating | prediction | |---------|---------|--------|------------| | 1 | 346 | 5 | 4.248900 | | 1 | 377 | 2 | 1.117214 | | 1 | 302 | 4 | 4.279166 |
इस तालिका को सही स्कीमा में अनपैक करने के लिए हम निम्नलिखित कोड का उपयोग कर सकते हैं जिसे डेटाब्रिक्स डॉक्यूमेंटेशन ( इन्फ्रेंस टेबल लेकहाउस मॉनिटरिंग स्टार्टर नोटबुक ) से अनुकूलित किया गया है।
# define the schema of the request and reponse fields in the inference tabel REQUEST_FIELDS = [StructField('user_id', IntegerType(), False),\ StructField('item_id', IntegerType(), False),\ StructField('rating', IntegerType(), False)\ ] RESPONSE_FIELD = [T.StructField("predictions", FloatType())] def process_col_requests(json_str:str) -> str: """ to proccess the JSON payload of request column in inference table """ request = json.loads(json_str) dataframe_records = request.get("dataframe_records", []) return dataframe_records def procces_col_response(json_str: str) -> str: """ to proccess the JSON payload of reponse column in inference table """ reponse = json.loads(json_str) output = [{prediction_col: round(prediction,4)} for prediction in reponse["predictions"]] return output def get_model_id(endpoint_name: str) -> str: """ create the model id by concatinating the model name and the model version. note: the assumption is the endpoint serves only one model """ served_models = get_served_models(endpoint_name) model_id = f"{served_models[0]['model_name']}_{served_models[0]['model_version']}" return model_id def process_requests(requests_raw: DataFrame, request_fields: List[T.StructField], response_field: T.StructField, endpoint_name: str) -> DataFrame: """ Takes a stream of raw requests and processes them by: - Unpacking JSON payloads for requests and responses - Exploding batched requests into individual rows - Converting Unix epoch millisecond timestamps to be Spark TimestampType :param requests_raw: DataFrame containing raw requests. Assumed to contain the following columns: - `request` - `response` - `timestamp_ms` :param request_fields: List of StructFields representing the request schema :param response_field: A StructField representing the response schema :return: A DataFrame containing processed requests """ # Convert the timestamp milliseconds to TimestampType for downstream processing. requests_timestamped = requests_raw \ .withColumn(timestamp_col, (F.col("timestamp_ms") / 1000).cast(T.TimestampType())) \ .drop("timestamp_ms") # create the model identifier column model_id = get_model_id(endpoint_name) # Convert the model name and version columns into a model identifier column. requests_identified = requests_timestamped \ .withColumn(model_id_col, F.lit(model_id)) # Rename the date column to avoid collisions with features. requests_dated = requests_identified.withColumnRenamed("date", date_col) # Consolidate and unpack JSON. request_schema = T.ArrayType(T.StructType(request_fields)) response_schema = T.ArrayType(T.StructType(response_field)) # w udf_request = F.udf(process_col_requests, request_schema) udf_reponse = F.udf(procces_col_response, response_schema) requests_unpacked = requests_dated.withColumn("request", udf_request("request")).\ withColumn("response", udf_reponse("response")) # Explode batched requests into individual rows. DB_PREFIX = "__db" requests_exploded = requests_unpacked \ .withColumn(f"{DB_PREFIX}_request_response", F.arrays_zip(F.col("request"), F.col("response"))) \ .withColumn(f"{DB_PREFIX}_request_response", F.explode(F.col(f"{DB_PREFIX}_request_response"))) \ .select(F.col("*"), F.col(f"{DB_PREFIX}_request_response.request.*"), F.col(f"{DB_PREFIX}_request_response.response.*")) \ .drop(f"{DB_PREFIX}_request_response", "request", "response") requests_cleaned = requests_exploded.drop("status_code", "sampling_fraction", "client_request_id", "databricks_request_id", "request_metadata") return requests_cleaned
परिणामी तालिका इस प्रकार दिखाई देगी:
इसके बाद हमें अपनी सिंक तालिका को आरंभ करना चाहिए
dt_builder = DeltaTable.createIfNotExists(spark) \ .tableName(unpacked_requests_table_name) \ .addColumns(schema) \ .partitionedBy(requests_cleaned.schema) \ .property("delta.enableChangeDataFeed", "true") \ dt_builder.execute()
और परिणाम लिखें
checkpoint_path = f"dbfs:/payload-logging/{endpoint_name}/checkpoint" requests_stream = requests_cleaned.writeStream \ .trigger(once=True) \ .format("delta") \ .partitionBy(date_col) \ .outputMode("append") \ .option("checkpointLocation", checkpoint_path) \ .toTable(unpacked_requests_table_name) \
अंत में, हम अपनी बेसलाइन तालिका बनाते हैं। DLM इस तालिका का उपयोग बेसलाइन और प्राथमिक मॉडल के समान स्तंभों के वितरण की तुलना करके बहाव की गणना करने के लिए करता है। बेसलाइन तालिका में प्राथमिक स्तंभ के समान ही विशेषता स्तंभ और साथ ही समान मॉडल पहचान स्तंभ होना चाहिए। बेसलाइन तालिका के लिए हम अपने सत्यापन डेटासेट की भविष्यवाणी तालिका का उपयोग करते हैं जिसे हम अपने मॉडल को सर्वश्रेष्ठ हाइपरपैरामीटर का उपयोग करके प्रशिक्षित करने के बाद पहले संग्रहीत करते हैं। बहाव मीट्रिक की गणना करने के लिए, डेटाब्रिक्स प्राथमिक और बेसलाइन तालिका दोनों के लिए प्रोफ़ाइल मीट्रिक की गणना करता है। यहाँ आप प्राथमिक तालिका और बेसलाइन तालिका के बारे में पढ़ सकते हैं।
#read the prediction table df_base_table = spark.table(f"{catalog_name}.{model_schema}.predictions") # create the model id and add it to the table model_id = get_model_id(endpoint_name) df_base_table = df_base_table.withColumn(model_id_col, F.lit(model_id)) #write the new table and enable the CDC on it output_base_table_name = f"{catalog_name}.{model_schema}.{base_table_prefix}_{model_name}" df_base_table.write.format("delta").mode("overwrite").saveAsTable(output_base_table_name) spark.sql(f"ALTER TABLE {output_base_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")
अब हम अपना मॉनिटरिंग डैशबोर्ड बनाने के लिए तैयार हैं। हम इसे UI का उपयोग करके कर सकते हैं या लेकहाउस मॉनिटरिंग एपीआई। यहाँ हम दूसरे विकल्प का उपयोग करते हैं:
# This is where we store the metric tables. output_schema_name = f"{catalog_name}.{model_schema}" try: info = lm.create_monitor( table_name=unpacked_requests_table_name, profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities,#the aggregation window model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, # We will refresh the metrics on-demand in this notebook baseline_table_name=output_base_table_name, ) print(info) except Exception as e: # Ensure the exception was expected assert "RESOURCE_ALREADY_EXISTS" in str(e), f"Unexpected error: {e}" # Update the monitor if any parameters of this notebook have changed. lm.update_monitor( table_name=unpacked_requests_table_name, updated_params=dict( profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities, model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, baseline_table_name=output_base_table_name, ) ) # Refresh metrics calculated on the requests table. refresh_info = lm.run_refresh(table_name=unpacked_requests_table_name) print(refresh_info)
कोड चलाने के बाद डेटाब्रिक्स को सभी मीट्रिक की गणना करने में कुछ समय लगता है। डैशबोर्ड देखने के लिए अपने सिंक टेबल (यानी unpacked_requests_table_name
) के Quality
टैब पर जाएँ। आपको निम्न जैसा पेज दिखाई देगा।
यदि आप व्यू refresh history
पर क्लिक करते हैं तो आपको अपने चल रहे, लंबित और पिछले रिफ्रेश दिखाई देंगे। अपना डैशबोर्ड खोलने के लिए View Dashboard
पर क्लिक करें।
इसलिए हम अनुमान तालिका ( my_endpoint_payload
) से शुरू करते हैं, इसे प्रोसेस करते हैं और परिणाम को my_endpoint_payload_unpacked
में सेव करते हैं और इस तालिका को हमारी बेसलाइन तालिका ( base_table_als
) के साथ हमारे मॉनिटरिंग API में पास करते हैं। DLM प्रत्येक तालिका ( my_endpoint_payload_unpacked_profile_metric
) के लिए प्रोफ़ाइल मेट्रिक्स की गणना करता है और उनका उपयोग ड्रिफ्ट मेट्रिक्स ( my_endpoint_payload_unpacked_drift_metrics
) की गणना करने के लिए करता है।
बस, अब आपके पास अपने मॉडल की सेवा और निगरानी के लिए आवश्यक सभी चीजें मौजूद हैं!
अगले भाग में मैं आपको दिखाऊंगा कि डेटाब्रिक्स एसेट्स बंडल और गिटलैब का उपयोग करके इस प्रक्रिया को कैसे स्वचालित किया जाए!
इस ट्यूटोरियल श्रृंखला के पहले भाग में, हमने डेटाब्रिक्स और स्पार्क का उपयोग करके एंड-टू-एंड MLOps पाइपलाइन बनाने के लिए पहला कदम उठाया, जो डेटाब्रिक्स के संदर्भ आर्किटेक्चर द्वारा निर्देशित था। यहाँ उन प्रमुख चरणों का संक्षिप्त विवरण दिया गया है जिन्हें हमने कवर किया है:
मेडलियन आर्किटेक्चर के लिए यूनिटी कैटलॉग की स्थापना : हमने यूनिटी कैटलॉग के भीतर अपने डेटा को कांस्य, रजत और स्वर्ण परतों में व्यवस्थित किया, जिससे एक संरचित और कुशल डेटा प्रबंधन प्रणाली स्थापित हुई।
यूनिटी कैटलॉग में डेटा को सम्मिलित करना : हमने प्रदर्शित किया कि किस प्रकार कच्चे डेटा को सिस्टम में आयात किया जाए, ताकि आगामी प्रसंस्करण चरणों के लिए स्थिरता और गुणवत्ता सुनिश्चित की जा सके।
मॉडल का प्रशिक्षण : डेटाब्रिक्स का उपयोग करते हुए, हमने स्केलेबल और प्रभावी मॉडल विकास के लिए सर्वोत्तम प्रथाओं का पालन करते हुए, हमारे डेटासेट के अनुरूप एक मशीन लर्निंग मॉडल को प्रशिक्षित किया।
हाइपरऑप्ट के साथ हाइपरपैरामीटर ट्यूनिंग : मॉडल के प्रदर्शन को बढ़ाने के लिए, हमने इष्टतम हाइपरपैरामीटर की खोज को स्वचालित करने के लिए हाइपरऑप्ट का उपयोग किया, जिससे सटीकता और दक्षता में सुधार हुआ।
डेटाब्रिक्स एमएलफ्लो के साथ प्रयोग ट्रैकिंग : हमने अपने प्रयोगों को लॉग और मॉनिटर करने के लिए एमएलफ्लो का उपयोग किया, आसान तुलना और पुनरुत्पादन के लिए मॉडल संस्करणों, मैट्रिक्स और मापदंडों का व्यापक रिकॉर्ड बनाए रखा।
इन आधारभूत चरणों के पूरा होने के साथ, आपका मॉडल अब तैनाती के लिए तैयार है। इस दूसरे भाग में, हम अपने सिस्टम में दो महत्वपूर्ण घटकों को एकीकृत करने पर ध्यान केंद्रित करेंगे:
आइये इसमें शामिल हों!
पिछले ब्लॉग का प्रस्थान बिंदु मॉडल मूल्यांकन था। अब कल्पना करें कि हमने तुलना की और पाया कि हमारा मॉडल इस उत्पादन मॉडल की तुलना में बेहतर प्रदर्शन दिखाता है। चूंकि हम उत्पादन में मॉडल का उपयोग करना चाहते हैं (मान लें), इसलिए हम अपने पास मौजूद सभी डेटा का लाभ उठाना चाहते हैं। अगला चरण पूरे डेटासेट का उपयोग करके मॉडल को प्रशिक्षित और परीक्षण करना है। फिर इसे अपने चैंपियन मॉडल के रूप में तैनात करके बाद में उपयोग के लिए अपने मॉडल को बनाए रखें। चूंकि यह अंतिम मॉडल है जिसे हम अनुमान के लिए उपयोग करना चाहते हैं, इसलिए हम मॉडल को प्रशिक्षित करने के लिए फ़ीचर इंजीनियरिंग क्लाइंट का उपयोग करते हैं। इस तरह हम न केवल मॉडल वंशावली को आसानी से ट्रैक कर सकते हैं, बल्कि क्लाइंट को स्कीमा सत्यापन और फ़ीचर परिवर्तन (यदि कोई हो) भी सौंप सकते हैं।
with mlflow.start_run(run_name="ALS_best_model") as run: als = ALS() # Now we set the parameters for the method als.setMaxIter(MAX_ITER)\ .setSeed(SEED)\ .setRegParam(best_params["REG_PARAM"])\ .setUserCol(COL_USER)\ .setItemCol(COL_ITEM)\ .setRatingCol(COL_LABEL)\ .setRank(best_params["RANK"]) mlflow.log_param("MAX_ITER", MAX_ITER) mlflow.log_param("RANK", best_params["RANK"]) mlflow.log_param("REG_PARAM", best_params["REG_PARAM"]) # Create the model with these parameters. model = als.fit(df_full_data) #drop predictions where users and products from the test test and didn't make it into the training set. in this case, the prediction is NaN model.setColdStartStrategy('drop') predictions = model.transform(df_full_data) signature = infer_signature(model_input = df_full_data, model_output = predictions.select(COL_LABEL)) #log the model mlflow.spark.log_model(model, model_name, sample_input=df_full_data.limit(3), signature = signature, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name=f"{catalog_name}.{model_schema}.{model_name}") evaluator = RegressionEvaluator(predictionCol=COL_PRED, labelCol=COL_LABEL) rmse = evaluator.setMetricName("rmse").evaluate(predictions) mlflow.log_metric('rmse', rmse)
हम मॉडलों को प्रशिक्षित करने और लॉग करने के लिए फ़ीचर स्टोर या फ़ीचर इंजीनियरिंग API का भी उपयोग कर सकते हैं
model_info = fe.log_model(model=model, artifact_path = model_name, flavor=mlflow.spark, training_set=fe_full_data, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name= f"{catalog_name}.{model_schema}.{model_name}" )
जब हम फीचर इंजीनियरिंग एपीआई का उपयोग करते हैं तो हम कैटलॉग एक्सप्लोरर में मॉडल की वंशावली देख सकते हैं
अब मॉडल विवरण को अपडेट करें और इसे चैंपियन लेबल असाइन करें।
import time from mlflow.tracking.client import MlflowClient from mlflow.entities.model_registry.model_version_status import ModelVersionStatus client = MlflowClient() #find the latest model version model_name_path = f"{catalog_name}.{model_schema}.{model_name}" model_version_infos = client.search_model_versions(f"name ='{model_name_path}'") new_model_version = max([int(model_version_info.version) for model_version_info in model_version_infos]) #add the model and model version descirption client.update_registered_model( name=model_name_path, description="collaborative filtering using Spark mllib ALS. This model use rating table" ) client.update_model_version( name=model_name_path, version=new_model_version, description="this model is optimized Rank and REG_PARAM with Hyperopt and rmse as a loss function. trained on the full dataset" ) # assign alias client.set_registered_model_alias(model_name_path, "Champion", new_model_version)
अब आगे बढ़ें और उस स्कीमा की जांच करें जिस पर आपने मॉडल पंजीकृत किया है। आपको अपने सभी अपडेट इस प्रकार दिखाई देने चाहिए
मॉडल स्टेज : यदि आप मॉडल रजिस्ट्री के लिए वर्कस्पेस का उपयोग करते हैं तो आपको अपने मॉडल को प्रबंधित करने के लिए स्टेज का उपयोग करना चाहिए। उपनामों का उपयोग करना काम नहीं करेगा। यहाँ देखें यह देखने के लिए कि यह कैसे काम करता है
अब कल्पना करें कि हम अपने मॉडल का उपयोग उत्पादन में अनुमान के लिए करना चाहते हैं। इस चरण में हम चैंपियन मॉडल को लोड करते हैं और इसका उपयोग प्रत्येक उपयोगकर्ता के लिए 20 मूवी अनुशंसाएँ बनाने के लिए करते हैं।
from mlflow.spark import load_model as spark_load_model from mlflow.tracking.client import MlflowClient from create_training_set import split_data #-- set UC as model registray mlflow.set_registry_uri("databricks-uc") #-- initate mlflow client client = MlflowClient() # -- read the config file with open('config.json') as config_file: config = json.load(config_file) catalog_name = config["catalog_name"] gold_layer = config["gold_layer_name"] silver_layer = config["silver_layer_name"] user_item_table_name = config["user_item_table_name"] ft_user_item_name = config["ft_user_item_name"] model_name = config["model_name"] model_schema = config["model_schema"] #-- create the model uri model_path = f"{catalog_name}.{model_schema}.{model_name}" # --create the model_uri: there are two ways to do this # 1: using the alias (we use this*) model_version_uri = f"models:/{model_uri}@champion" # 2: using model version #champion_version = client.get_model_version_by_alias(model_uri, "champion") #model_version_uri = f"models:/{model_uri}/{champion_version.version}" # -- load the model pipline and exctract the model model_pipeline = spark_load_model(model_version_uri) model = model_pipeline.stages[0] # -- batch scoring using the the model fe_full_data, df_full_data, df_train, df_test = split_data() df_batch_input = df_full_data.drop("rating") df_scores = model.transform(df_batch_input) # --- in case you used Feature Engineering to train and register model #from databricks.feature_engineering import FeatureEngineeringClient #fe = FeatureEngineeringClient() # fe.score_batch(model_uri=f"{model_version_uri}",df = df_batch_input)
और आप देख सकते हैं कि हमने बैच स्कोरिंग के लिए समान प्रशिक्षण डेटा का उपयोग किया है। हालाँकि, अनुशंसा प्रणाली के मामले में यह समझ में आता है, अधिकांश एप्लिकेशन में हम कुछ अनदेखे डेटा को स्कोर करने के लिए मॉडल का उपयोग करना चाहते हैं। उदाहरण के लिए, कल्पना करें कि आप नेटफ्लिक्स हैं और दिन के अंत में उनकी नई देखी गई सूची के आधार पर उपयोगकर्ता की सिफारिशों को अपडेट करना चाहते हैं। हम दिन के अंत में विशिष्ट समय पर बैच स्कोरिंग चलाने वाले कार्य को शेड्यूल कर सकते हैं।
अब हम आगे बढ़ सकते हैं और प्रत्येक उपयोगकर्ता के लिए अनुशंसाएँ तैयार कर सकते हैं। इसके लिए हम प्रत्येक उपयोगकर्ता के लिए शीर्ष 20 आइटम ढूँढ़ते हैं
from pyspark.sql.window import Window from pyspark.sql.functions import col, split, row_number, collect_list from pyspark.sql.functions import col, collect_list, expr, lit, min, row_number, desc windowSpec = Window.partitionBy("user_id").orderBy(col("prediction").desc()) df_top_20_items = df_scores.withColumn("rank", row_number().over(windowSpec)).filter(col("rank") <= 20) df_user_recs = df_top_20_items.groupBy("user_id") \ .agg(collect_list(col("item_id").cast("double")).alias("top_item_ids"))
परिणाम कुछ इस प्रकार है
अंत में हम अपने UC पर डेल्टा लेबल के रूप में पूर्वानुमान को संग्रहीत कर सकते हैं या उन्हें डाउनस्ट्रीम सिस्टम मोंगो DB या Azure Cosmos DB पर प्रकाशित कर सकते हैं। हम पहले विकल्प के साथ चलते हैं
df_user_recs.write.mode("overwrite").saveAsTable(f"{catalog_name}.{output_schema}.top20_item_recommendations")
अब एक ऐसे मामले की कल्पना करें जिसमें हम वास्तविक समय के उपयोगकर्ता इंटरैक्शन के आधार पर अपनी सिफारिशों को अपडेट करना चाहते हैं। इस मामले के लिए हम मॉडल सर्विंग का उपयोग कर सकते हैं। जब कोई आपके मॉडल का उपयोग करना चाहता है, तो वे सर्वर को डेटा भेज सकते हैं। फिर सर्वर उस डेटा को आपके तैनात मॉडल को खिलाता है, जो कार्रवाई में जाता है, डेटा का विश्लेषण करता है, और एक भविष्यवाणी उत्पन्न करता है। उनका उपयोग वेब एप्लिकेशन, मोबाइल ऐप या यहां तक कि एम्बेडेड सिस्टम में भी किया जा सकता है। इस दृष्टिकोण का एक अनुप्रयोग A/B परीक्षण के लिए ट्रैफ़िक रूटिंग को सक्षम करना है।
ALS एल्गोरिदम का इस्तेमाल सीधे ऑनलाइन अनुमान के लिए नहीं किया जा सकता क्योंकि इसके लिए अनुशंसाओं को अपडेट करने के लिए पूरे डेटा (पुराने + नए) का उपयोग करके मॉडल को फिर से प्रशिक्षित करने की आवश्यकता होती है। ग्रेडिएंट डिसेंट लर्निंग एल्गोरिदम ऐसे मॉडल के उदाहरण हैं जिनका इस्तेमाल ऑनलाइन अपडेट के लिए किया जा सकता है। हम भविष्य की पोस्ट में इनमें से कुछ एल्गोरिदम पर नज़र डाल सकते हैं।
हालांकि, यह समझाने के लिए कि ऐसा मॉडल कैसे काम करेगा, हम एक (बेकार) मॉडल बना रहे हैं जो एंडपॉइंट की सेवा करता है जो उपयोगकर्ता द्वारा किसी मूवी को रेटिंग दिए जाने के आधार पर मूवी रेटिंग की भविष्यवाणी करता है!
import requests model_path = f"{catalog_name}.{model_schema}.{model_name}" champion_version = client.get_model_version_by_alias(model_path, "champion") # Set the name of the MLflow endpoint endpoint_name = config["model_serving_endpoint_name"] # Name of the registered MLflow model model_name = model_path # Specify the type of compute (CPU, GPU_SMALL, GPU_MEDIUM, etc.) workload_type = "CPU" # Specify the scale-out size of compute (Small, Medium, Large, etc.) workload_size = "Small" # Get the latest version of the MLflow model model_version = int(champion_version.version) # Specify Scale to Zero(only supported for CPU endpoints) scale_to_zero = False # Get the API endpoint and token for the current notebook context API_ROOT = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get() API_TOKEN = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get() data = { "name": endpoint_name, "config": { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ] }, } headers = {"Context-Type": "text/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.post( url=f"{API_ROOT}/api/2.0/serving-endpoints", json=data, headers=headers )
यह हमारे लिए मॉडल सर्विंग क्लस्टर बनाएगा और लॉन्च करेगा, इसलिए इसमें कुछ समय लगता है। अब अगर आप Serving
विंडो खोलते हैं तो आपको अपना एंडपॉइंट दिखना चाहिए।
हम एक से ज़्यादा मॉडल को सर्व करने के लिए एक एंडपॉइंट का इस्तेमाल कर सकते हैं। फिर हम A/B टेस्टिंग जैसे परिदृश्यों के लिए ट्रैफ़िक रूटिंग का इस्तेमाल कर सकते हैं या उत्पादन में अलग-अलग मॉडल के प्रदर्शन की तुलना कर सकते हैं।
डेटाब्रिक्स मॉडल सर्विंग में अनुमान तालिकाएँ हमारे तैनात मॉडलों के लिए एक स्वचालित लॉग के रूप में कार्य करती हैं। सक्षम होने पर, वे आने वाले अनुरोधों (पूर्वानुमान के लिए भेजा गया डेटा), संबंधित मॉडल आउटपुट (पूर्वानुमान) और यूनिटी कैटलॉग के भीतर डेल्टा टेबल के रूप में कुछ अन्य मेटाडेटा को कैप्चर करते हैं। हम निगरानी और डिबगिंग , वंशावली ट्रैकिंग और हमारे मॉडलों को फिर से प्रशिक्षित करने या ठीक करने के लिए डेटा संग्रह प्रक्रिया के लिए अनुमान तालिका का उपयोग कर सकते हैं।
हम मॉडल की निगरानी के लिए अपने सर्विंग एंडपॉइंट पर inference table
सक्षम कर सकते हैं। हम इसे पेलोड में auto_capture_config
प्रॉपर्टी को निर्दिष्ट करके कर सकते हैं जब हम पहली बार एंडपॉइंट बनाते हैं। या हम बाद में put
कमांड और config
एंडपॉइंट URL का उपयोग करके अपने एंडपॉइंट को इस प्रकार अपडेट करते हैं (अधिक जानकारी यहाँ )
data = { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ], "auto_capture_config":{ "catalog_name": catalog_name, "schema_name": model_schema, "table_name_prefix": payload_table, } } headers = {"Context-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.put(url=f"{API_ROOT}/api/2.0/serving-endpoints/{endpoint_name}/config", json=data, headers=headers) print(json.dumps(response.json(), indent=4))
अब आइए एंडपॉइंट को कुछ डमी उपयोगकर्ता इंटरैक्शन डेटा से फीड करें
import random import time all_items = df_full_data.select(col("item_id")).distinct() for user_id in range(50,54): items_not_rated_by_user = df_full_data.where(col("user_id")==user_id).select(col("item_id")).distinct()#collect()[0][0] no_rated_items = [item.item_id for item in all_items.subtract(items_not_rated_by_user).limit(4).collect()] data = { "dataframe_records": [ {"user_id":user_id, "item_id":no_rated_items[0], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[1], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, ] } response = requests.post( url=f"{API_ROOT}/serving-endpoints/{endpoint_name}/invocations", json=data, headers=headers ) # generate the data within the timespan of 1 to 8 minutes time.sleep(random.randint(60*1, 60*8))
हम <catalog>.<schema>.<payload_table>
टेबल में एंडपॉइंट लॉग की जांच कर सकते हैं। टेबल में डेटा देखने में लगभग 10 मिनट लगते हैं।
table_path = f"{catalog_name}.{model_schema}.{payload_table}" # Read data from the inference table df_inf_table = spark.read.table(table_path) display(df_inf_table )
आपको अपनी पेलोड तालिका कुछ इस तरह दिखनी चाहिए
इस अनुमान तालिका की स्कीमा को समझने के लिए, यहां “यूनिटी कैटलॉग अनुमान तालिका स्कीमा==” देखें।==
मॉडल और डेटा मॉनिटरिंग एक जटिल विषय है, जिसमें महारत हासिल करने के लिए बहुत समय की आवश्यकता होती है। डेटाब्रिक्स लेकहाउस मॉनिटरिंग (DLM) सामान्य उपयोग के मामलों के लिए मानक और अनुकूलन योग्य टेम्पलेट प्रदान करके एक उचित मॉनिटरिंग सिस्टम बनाने के ओवरहेड को कम करता है। हालाँकि, DLM और मॉडल मॉनिटरिंग में महारत हासिल करने के लिए आम तौर पर बहुत सारे प्रयोगों की आवश्यकता होती है। मैं आपको यहाँ मॉडल मॉनिटरिंग का विस्तृत अवलोकन नहीं देना चाहता, बल्कि आपको एक शुरुआती बिंदु देना चाहता हूँ। मैं भविष्य में इस विषय पर एक ब्लॉग समर्पित कर सकता हूँ।
डीएलएम कार्यक्षमताओं और सुविधाओं का संक्षिप्त सारांश
अब जबकि हमारा मॉडल तैयार है और चल रहा है, हम अपने सर्विंग एंडपॉइंट द्वारा जेनरेट की गई इनफ़रेंस टेबल का उपयोग करके मॉडल के प्रदर्शन और बहाव जैसे प्रमुख मीट्रिक की निगरानी कर सकते हैं ताकि समय के साथ हमारे डेटा या मॉडल में किसी भी विचलन या विसंगति का पता लगाया जा सके। यह सक्रिय दृष्टिकोण हमें समय पर सुधारात्मक कार्रवाई करने में मदद करता है, जैसे कि मॉडल को फिर से प्रशिक्षित करना या इसकी विशेषताओं को अपडेट करना, ताकि इष्टतम प्रदर्शन और व्यावसायिक उद्देश्यों के साथ संरेखण बनाए रखा जा सके।
डीएलएम तीन प्रकार के विश्लेषण या profile type
प्रदान करता है: टाइम सीरीज़ , स्नैपशॉट और इंफ़रेंस । चूँकि हम अपनी इंफ़रेंस टेबल का विश्लेषण करने में रुचि रखते हैं, इसलिए हम बाद वाले पर ध्यान केंद्रित करते हैं। मॉनिटरिंग के लिए एक टेबल का उपयोग करने के लिए - हमारी " प्राथमिक तालिका ", हमें यह सुनिश्चित करना चाहिए कि टेबल में सही संरचना हो। इंफ़रेंस टेबल के लिए, प्रत्येक पंक्ति को निम्नलिखित कॉलम वाले अनुरोधों के अनुरूप होना चाहिए:
मॉडल की विशेषताएं
मॉडल भविष्यवाणी
मॉडल आईडी
टाइमस्टैम्प : अनुमान अनुरोध का टाइमस्टैम्प
जमीनी सच्चाई (वैकल्पिक)
मॉडल आईडी उन मामलों के लिए महत्वपूर्ण है जब हम कई मॉडल पेश करते हैं और हम एक मॉनिटरिंग डैशबोर्ड में प्रत्येक मॉडल के प्रदर्शन को ट्रैक करना चाहते हैं। यदि एक से अधिक मॉडल आईडी उपलब्ध हैं, तो DLM इसका उपयोग डेटा को स्लाइस करने और प्रत्येक स्लाइस के लिए अलग-अलग मीट्रिक और स्टैटिक्स की गणना करने के लिए करता है।
डीएलएम एक निर्दिष्ट समय अंतराल के लिए प्रत्येक सांख्यिकी और मीट्रिक की गणना करता है। अनुमान विश्लेषण के लिए, इसने टाइमस्टैम्प कॉलम का उपयोग किया, साथ ही समय विंडो की पहचान करने के लिए उपयोगकर्ता द्वारा परिभाषित विंडो आकार का उपयोग किया। नीचे और अधिक जानकारी।
डीएलएम अनुमान तालिकाओं के लिए दो problem type
समर्थन करता है: " वर्गीकरण " या " प्रतिगमन "। यह इस विनिर्देश के आधार पर कुछ प्रासंगिक मीट्रिक और सांख्यिकी की गणना करता है।
DLM का उपयोग करने के लिए, हमें एक मॉनिटर बनाना चाहिए और उसे एक टेबल से जोड़ना चाहिए। जब हम ऐसा करते हैं तो DLM दो metric tables
बनाता है:
प्रोफ़ाइल मीट्रिक तालिका : इस तालिका में न्यूनतम, अधिकतम, शून्य और शून्य का प्रतिशत जैसे सारांश आँकड़े शामिल हैं। इसमें उपयोगकर्ता द्वारा परिभाषित समस्या प्रकार के आधार पर अतिरिक्त मीट्रिक भी शामिल हैं। उदाहरण के लिए वर्गीकरण मॉडल के लिए परिशुद्धता , रिकॉल और f1_score , और प्रतिगमन मॉडल के लिए mean_squared_error और mean_average_error ।
ड्रिफ्ट मीट्रिक टेबल : इसमें सांख्यिकी होती है जो मापती है कि समय के साथ या बेसलाइन मान (यदि प्रदान किया गया हो) के सापेक्ष डेटा का वितरण कैसे बदला है। यह ची-स्क्वायर टेस्ट, केएस टेस्ट जैसे उपायों की गणना करता है।
प्रत्येक तालिका के लिए संपूर्ण मीट्रिक की सूची देखने के लिए मॉनिटर मीट्रिक तालिका दस्तावेज़ पृष्ठ देखें। कस्टम मीट्रिक बनाना भी संभव है।
मॉनिटरिंग सिस्टम बनाने का एक महत्वपूर्ण पहलू यह सुनिश्चित करना है कि हमारे मॉनिटरिंग डैशबोर्ड को नवीनतम अनुमान डेटा तक पहुँच प्राप्त हो। इसके लिए हम अनुमान तालिका में संसाधित पंक्तियों का ट्रैक रखने के लिए डेल्टा टेबल स्ट्रीमिंग का उपयोग कर सकते हैं। हम मॉडल सर्विंग के अनुमान तालिका को अपने स्रोत तालिका ( readStream
) के रूप में और मॉनिटरिंग तालिका को सिंक तालिका ( writeStream
) के रूप में उपयोग करते हैं। हम यह भी सुनिश्चित करते हैं कि दोनों तालिकाओं पर परिवर्तन डेटा कैप्चर (CDC) सक्षम है (यह अनुमान तालिका पर डिफ़ॉल्ट रूप से सक्षम है)। इस तरह हम हर रिफ्रेश पर पूरी तालिका को फिर से प्रोसेस करने के बजाय केवल स्रोत तालिका में परिवर्तन - इन्सर्ट/अपडेट/डिलीट - को प्रोसेस करते हैं।
हमारी अनुमान तालिका पर निगरानी सक्षम करने के लिए हम निम्नलिखित कदम उठाते हैं:
सबसे पहले हमें Lakehouse Monitoring API इंस्टॉल करना होगा। यदि आप Databricks rum time 15.3 LTS और उससे ऊपर का उपयोग करते हैं तो यह पहले से इंस्टॉल होना चाहिए:
%pip install "https://ml-team-public-read.s3.amazonaws.com/wheels/data-monitoring/a4050ef7-b183-47a1-a145-e614628e3146/databricks_lakehouse_monitoring-0.4.14-py3-none-any.whl" dbutils.library.restartPython()
आइए अनुमान तालिका को स्ट्रीमिंग तालिका के रूप में पढ़ें
requests_raw = spark.readStream\ .format("delta")\ .table(inference_table_name) requests_raw.isStreaming #-> True
इसके बाद हमें टेबल को ऊपर बताए अनुसार सही प्रारूप में रखना होगा। इस टेबल में प्रत्येक पूर्वानुमान के लिए एक पंक्ति होनी चाहिए जिसमें संबंधित विशेषताएं और पूर्वानुमान मूल्य हों। मॉडल सर्विंग एंडपॉइंट से हमें जो अनुमान तालिका मिलती है, वह एंडपॉइंट अनुरोधों और प्रतिक्रियाओं को नेस्टेड JSON प्रारूप में संग्रहीत करती है। अनुरोध और प्रतिक्रिया कॉलम के लिए JSON पेलोड का एक उदाहरण यहां दिया गया है।
#requests {"dataframe_records": [ {"user_id": 1, "item_id": 346, "rating": 5}, {"user_id": 1, "item_id": 377, "rating": 2}, {"user_id": 1, "item_id": 302, "rating": 4} ] } #reponse {"predictions": [4.248899936676025, 1.1172138452529907, 4.279165744781494] } # --> what we need | user_id | item_id | rating | prediction | |---------|---------|--------|------------| | 1 | 346 | 5 | 4.248900 | | 1 | 377 | 2 | 1.117214 | | 1 | 302 | 4 | 4.279166 |
इस तालिका को सही स्कीमा में अनपैक करने के लिए हम निम्नलिखित कोड का उपयोग कर सकते हैं जिसे डेटाब्रिक्स डॉक्यूमेंटेशन ( इन्फ्रेंस टेबल लेकहाउस मॉनिटरिंग स्टार्टर नोटबुक ) से अनुकूलित किया गया है।
# define the schema of the request and reponse fields in the inference tabel REQUEST_FIELDS = [StructField('user_id', IntegerType(), False),\ StructField('item_id', IntegerType(), False),\ StructField('rating', IntegerType(), False)\ ] RESPONSE_FIELD = [T.StructField("predictions", FloatType())] def process_col_requests(json_str:str) -> str: """ to proccess the JSON payload of request column in inference table """ request = json.loads(json_str) dataframe_records = request.get("dataframe_records", []) return dataframe_records def procces_col_response(json_str: str) -> str: """ to proccess the JSON payload of reponse column in inference table """ reponse = json.loads(json_str) output = [{prediction_col: round(prediction,4)} for prediction in reponse["predictions"]] return output def get_model_id(endpoint_name: str) -> str: """ create the model id by concatinating the model name and the model version. note: the assumption is the endpoint serves only one model """ served_models = get_served_models(endpoint_name) model_id = f"{served_models[0]['model_name']}_{served_models[0]['model_version']}" return model_id def process_requests(requests_raw: DataFrame, request_fields: List[T.StructField], response_field: T.StructField, endpoint_name: str) -> DataFrame: """ Takes a stream of raw requests and processes them by: - Unpacking JSON payloads for requests and responses - Exploding batched requests into individual rows - Converting Unix epoch millisecond timestamps to be Spark TimestampType :param requests_raw: DataFrame containing raw requests. Assumed to contain the following columns: - `request` - `response` - `timestamp_ms` :param request_fields: List of StructFields representing the request schema :param response_field: A StructField representing the response schema :return: A DataFrame containing processed requests """ # Convert the timestamp milliseconds to TimestampType for downstream processing. requests_timestamped = requests_raw \ .withColumn(timestamp_col, (F.col("timestamp_ms") / 1000).cast(T.TimestampType())) \ .drop("timestamp_ms") # create the model identifier column model_id = get_model_id(endpoint_name) # Convert the model name and version columns into a model identifier column. requests_identified = requests_timestamped \ .withColumn(model_id_col, F.lit(model_id)) # Rename the date column to avoid collisions with features. requests_dated = requests_identified.withColumnRenamed("date", date_col) # Consolidate and unpack JSON. request_schema = T.ArrayType(T.StructType(request_fields)) response_schema = T.ArrayType(T.StructType(response_field)) # w udf_request = F.udf(process_col_requests, request_schema) udf_reponse = F.udf(procces_col_response, response_schema) requests_unpacked = requests_dated.withColumn("request", udf_request("request")).\ withColumn("response", udf_reponse("response")) # Explode batched requests into individual rows. DB_PREFIX = "__db" requests_exploded = requests_unpacked \ .withColumn(f"{DB_PREFIX}_request_response", F.arrays_zip(F.col("request"), F.col("response"))) \ .withColumn(f"{DB_PREFIX}_request_response", F.explode(F.col(f"{DB_PREFIX}_request_response"))) \ .select(F.col("*"), F.col(f"{DB_PREFIX}_request_response.request.*"), F.col(f"{DB_PREFIX}_request_response.response.*")) \ .drop(f"{DB_PREFIX}_request_response", "request", "response") requests_cleaned = requests_exploded.drop("status_code", "sampling_fraction", "client_request_id", "databricks_request_id", "request_metadata") return requests_cleaned
परिणामी तालिका इस प्रकार दिखाई देगी:
इसके बाद हमें अपनी सिंक तालिका को आरंभ करना चाहिए
dt_builder = DeltaTable.createIfNotExists(spark) \ .tableName(unpacked_requests_table_name) \ .addColumns(schema) \ .partitionedBy(requests_cleaned.schema) \ .property("delta.enableChangeDataFeed", "true") \ dt_builder.execute()
और परिणाम लिखें
checkpoint_path = f"dbfs:/payload-logging/{endpoint_name}/checkpoint" requests_stream = requests_cleaned.writeStream \ .trigger(once=True) \ .format("delta") \ .partitionBy(date_col) \ .outputMode("append") \ .option("checkpointLocation", checkpoint_path) \ .toTable(unpacked_requests_table_name) \
अंत में, हम अपनी बेसलाइन तालिका बनाते हैं। DLM इस तालिका का उपयोग बेसलाइन और प्राथमिक मॉडल के समान स्तंभों के वितरण की तुलना करके बहाव की गणना करने के लिए करता है। बेसलाइन तालिका में प्राथमिक स्तंभ के समान ही विशेषता स्तंभ और साथ ही समान मॉडल पहचान स्तंभ होना चाहिए। बेसलाइन तालिका के लिए हम अपने सत्यापन डेटासेट की भविष्यवाणी तालिका का उपयोग करते हैं जिसे हम अपने मॉडल को सर्वश्रेष्ठ हाइपरपैरामीटर का उपयोग करके प्रशिक्षित करने के बाद पहले संग्रहीत करते हैं। बहाव मीट्रिक की गणना करने के लिए, डेटाब्रिक्स प्राथमिक और बेसलाइन तालिका दोनों के लिए प्रोफ़ाइल मीट्रिक की गणना करता है। यहाँ आप प्राथमिक तालिका और बेसलाइन तालिका के बारे में पढ़ सकते हैं।
#read the prediction table df_base_table = spark.table(f"{catalog_name}.{model_schema}.predictions") # create the model id and add it to the table model_id = get_model_id(endpoint_name) df_base_table = df_base_table.withColumn(model_id_col, F.lit(model_id)) #write the new table and enable the CDC on it output_base_table_name = f"{catalog_name}.{model_schema}.{base_table_prefix}_{model_name}" df_base_table.write.format("delta").mode("overwrite").saveAsTable(output_base_table_name) spark.sql(f"ALTER TABLE {output_base_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")
अब हम अपना मॉनिटरिंग डैशबोर्ड बनाने के लिए तैयार हैं। हम इसे UI का उपयोग करके कर सकते हैं या लेकहाउस मॉनिटरिंग एपीआई। यहाँ हम दूसरे विकल्प का उपयोग करते हैं:
# This is where we store the metric tables. output_schema_name = f"{catalog_name}.{model_schema}" try: info = lm.create_monitor( table_name=unpacked_requests_table_name, profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities,#the aggregation window model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, # We will refresh the metrics on-demand in this notebook baseline_table_name=output_base_table_name, ) print(info) except Exception as e: # Ensure the exception was expected assert "RESOURCE_ALREADY_EXISTS" in str(e), f"Unexpected error: {e}" # Update the monitor if any parameters of this notebook have changed. lm.update_monitor( table_name=unpacked_requests_table_name, updated_params=dict( profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities, model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, baseline_table_name=output_base_table_name, ) ) # Refresh metrics calculated on the requests table. refresh_info = lm.run_refresh(table_name=unpacked_requests_table_name) print(refresh_info)
कोड चलाने के बाद डेटाब्रिक्स को सभी मीट्रिक की गणना करने में कुछ समय लगता है। डैशबोर्ड देखने के लिए अपने सिंक टेबल (यानी unpacked_requests_table_name
) के Quality
टैब पर जाएँ। आपको निम्न जैसा पेज दिखाई देगा।
यदि आप व्यू refresh history
पर क्लिक करते हैं तो आपको अपने चल रहे, लंबित और पिछले रिफ्रेश दिखाई देंगे। अपना डैशबोर्ड खोलने के लिए View Dashboard
पर क्लिक करें।
इसलिए हम अनुमान तालिका ( my_endpoint_payload
) से शुरू करते हैं, इसे प्रोसेस करते हैं और परिणाम को my_endpoint_payload_unpacked
में सेव करते हैं और इस तालिका को हमारी बेसलाइन तालिका ( base_table_als
) के साथ हमारे मॉनिटरिंग API में पास करते हैं। DLM प्रत्येक तालिका ( my_endpoint_payload_unpacked_profile_metric
) के लिए प्रोफ़ाइल मेट्रिक्स की गणना करता है और उनका उपयोग ड्रिफ्ट मेट्रिक्स ( my_endpoint_payload_unpacked_drift_metrics
) की गणना करने के लिए करता है।
बस, अब आपके पास अपने मॉडल की सेवा और निगरानी के लिए आवश्यक सब कुछ है!
अगले भाग में मैं आपको दिखाऊंगा कि डेटाब्रिक्स एसेट्स बंडल और गिटलैब का उपयोग करके इस प्रक्रिया को कैसे स्वचालित किया जाए!