בחלק הראשון של סדרת הדרכה זו , נקטנו את הצעדים הראשונים לבניית צינור MLOps מקצה לקצה באמצעות Databricks ו-Spark, בהנחיית ארכיטקטורת ההתייחסות של Databricks. להלן תקציר של השלבים העיקריים שסיקרנו:
הגדרת קטלוג האחדות לארכיטקטורת מדליונים : ארגנו את הנתונים שלנו לשכבות ברונזה, כסף וזהב בתוך קטלוג האחדות, ויצרנו מערכת מובנית ויעילה לניהול נתונים.
הכנסת נתונים לקטלוג Unity : הדגמנו כיצד לייבא נתונים גולמיים למערכת, תוך הבטחת עקביות ואיכות לשלבי העיבוד הבאים.
הכשרת המודל : באמצעות Databricks, הכשרנו מודל למידת מכונה המותאם למערך הנתונים שלנו, בעקבות שיטות עבודה מומלצות לפיתוח מודל מדרגי ואפקטיבי.
כוונון היפרפרמטרים עם HyperOpt : כדי לשפר את ביצועי המודל, השתמשנו ב-HyperOpt כדי להפוך את החיפוש אחר פרמטרים אופטימליים לאוטומטיים, תוך שיפור הדיוק והיעילות.
מעקב אחר ניסוי עם Databricks MLflow : השתמשנו ב-MLflow לרישום ולנטר את הניסויים שלנו, תוך שמירה על תיעוד מקיף של גרסאות מודל, מדדים ופרמטרים להשוואה ושחזור קלות.
עם השלמת השלבים הבסיסיים האלה, המודל שלך מוכן כעת לפריסה. בחלק השני הזה, נתמקד בשילוב שני רכיבים קריטיים במערכת שלנו:
בואו ניכנס לזה!
נקודת המוצא של הבלוג האחרון הייתה הערכת מודלים. כעת דמיינו שעשינו את ההשוואה וגילינו שהדגם שלנו מציג ביצועים גבוהים יותר בהשוואה לדגם הייצור הזה. מכיוון שאנו רוצים (מניחים) להשתמש במודל בייצור, אנו רוצים לנצל את כל הנתונים שיש לנו. השלב הבא הוא לאמן ולבדוק את המודל באמצעות מערך הנתונים המלא. לאחר מכן המשיך את המודל שלנו לשימוש מאוחר יותר על ידי פריסתו כדגם האלוף שלנו. מכיוון שזהו המודל הסופי שאנו רוצים להשתמש בו להסקת מסקנות, אנו משתמשים בלקוח Feature Engineering כדי לאמן את המודל. בדרך זו אנו לא רק עוקבים בקלות אחר שושלת המודל, אלא גם מורידים ללקוח את אימות הסכימה ושינוי התכונות (אם יש).
with mlflow.start_run(run_name="ALS_best_model") as run: als = ALS() # Now we set the parameters for the method als.setMaxIter(MAX_ITER)\ .setSeed(SEED)\ .setRegParam(best_params["REG_PARAM"])\ .setUserCol(COL_USER)\ .setItemCol(COL_ITEM)\ .setRatingCol(COL_LABEL)\ .setRank(best_params["RANK"]) mlflow.log_param("MAX_ITER", MAX_ITER) mlflow.log_param("RANK", best_params["RANK"]) mlflow.log_param("REG_PARAM", best_params["REG_PARAM"]) # Create the model with these parameters. model = als.fit(df_full_data) #drop predictions where users and products from the test test and didn't make it into the training set. in this case, the prediction is NaN model.setColdStartStrategy('drop') predictions = model.transform(df_full_data) signature = infer_signature(model_input = df_full_data, model_output = predictions.select(COL_LABEL)) #log the model mlflow.spark.log_model(model, model_name, sample_input=df_full_data.limit(3), signature = signature, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name=f"{catalog_name}.{model_schema}.{model_name}") evaluator = RegressionEvaluator(predictionCol=COL_PRED, labelCol=COL_LABEL) rmse = evaluator.setMetricName("rmse").evaluate(predictions) mlflow.log_metric('rmse', rmse)
אנו יכולים גם להשתמש ב- Feature Store או ב-API של Feature Engineering כדי להכשיר ולרשום את המודלים
model_info = fe.log_model(model=model, artifact_path = model_name, flavor=mlflow.spark, training_set=fe_full_data, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name= f"{catalog_name}.{model_schema}.{model_name}" )
כאשר אנו משתמשים ב-API להנדסת תכונות אנו יכולים לראות את השושלת של המודל ב-Catalog Explorer
כעת אפשר לעדכן את תיאור הדגם ולהקצות לו תווית Champion.
import time from mlflow.tracking.client import MlflowClient from mlflow.entities.model_registry.model_version_status import ModelVersionStatus client = MlflowClient() #find the latest model version model_name_path = f"{catalog_name}.{model_schema}.{model_name}" model_version_infos = client.search_model_versions(f"name ='{model_name_path}'") new_model_version = max([int(model_version_info.version) for model_version_info in model_version_infos]) #add the model and model version descirption client.update_registered_model( name=model_name_path, description="collaborative filtering using Spark mllib ALS. This model use rating table" ) client.update_model_version( name=model_name_path, version=new_model_version, description="this model is optimized Rank and REG_PARAM with Hyperopt and rmse as a loss function. trained on the full dataset" ) # assign alias client.set_registered_model_alias(model_name_path, "Champion", new_model_version)
עכשיו קדימה ובדוק את הסכימה שרשמת את המודל. אתה אמור לראות את כל העדכונים שלך כדלקמן
שלבי מודל : אם אתה משתמש בסביבת עבודה עבור רישום מודלים, עליך לבצע שלבים לניהול המודלים שלך. שימוש בכינויים לא יעבוד. בדוק כאן כדי לראות איך זה עובד
עכשיו תארו לעצמכם שאנחנו רוצים להשתמש במודל שלנו בייצור כדי להסיק. בשלב זה אנו טוענים את מודל האלוף ומשתמשים בו כדי להפיק 20 המלצות לסרטים עבור כל משתמש.
from mlflow.spark import load_model as spark_load_model from mlflow.tracking.client import MlflowClient from create_training_set import split_data #-- set UC as model registray mlflow.set_registry_uri("databricks-uc") #-- initate mlflow client client = MlflowClient() # -- read the config file with open('config.json') as config_file: config = json.load(config_file) catalog_name = config["catalog_name"] gold_layer = config["gold_layer_name"] silver_layer = config["silver_layer_name"] user_item_table_name = config["user_item_table_name"] ft_user_item_name = config["ft_user_item_name"] model_name = config["model_name"] model_schema = config["model_schema"] #-- create the model uri model_path = f"{catalog_name}.{model_schema}.{model_name}" # --create the model_uri: there are two ways to do this # 1: using the alias (we use this*) model_version_uri = f"models:/{model_uri}@champion" # 2: using model version #champion_version = client.get_model_version_by_alias(model_uri, "champion") #model_version_uri = f"models:/{model_uri}/{champion_version.version}" # -- load the model pipline and exctract the model model_pipeline = spark_load_model(model_version_uri) model = model_pipeline.stages[0] # -- batch scoring using the the model fe_full_data, df_full_data, df_train, df_test = split_data() df_batch_input = df_full_data.drop("rating") df_scores = model.transform(df_batch_input) # --- in case you used Feature Engineering to train and register model #from databricks.feature_engineering import FeatureEngineeringClient #fe = FeatureEngineeringClient() # fe.score_batch(model_uri=f"{model_version_uri}",df = df_batch_input)
ואתה יכול לראות שהשתמשנו באותם נתוני אימון לניקוד אצווה. למרות שבמקרה של מערכות ממליצים זה הגיוני, ברוב האפליקציות אנחנו רוצים להשתמש במודל כדי לצבור כמה נתונים בלתי נראים. לדוגמה, הדמיה שלך היא נטפליקס ורוצה לעדכן את המלצות המשתמש בסופו של יום בהתבסס על רשימת הנצפים החדשה שלהם. אנחנו יכולים לתזמן עבודה שמפעילה את ניקוד האצווה בזמן ספציפי בסוף היום.
כעת נוכל להמשיך ולהפיק את ההמלצות עבור כל משתמש. לשם כך אנו מוצאים את 20 הפריטים המובילים לכל משתמש
from pyspark.sql.window import Window from pyspark.sql.functions import col, split, row_number, collect_list from pyspark.sql.functions import col, collect_list, expr, lit, min, row_number, desc windowSpec = Window.partitionBy("user_id").orderBy(col("prediction").desc()) df_top_20_items = df_scores.withColumn("rank", row_number().over(windowSpec)).filter(col("rank") <= 20) df_user_recs = df_top_20_items.groupBy("user_id") \ .agg(collect_list(col("item_id").cast("double")).alias("top_item_ids"))
כך נראית התוצאה
לבסוף נוכל לאחסן את החיזוי כתווית דלתא ב-UC שלנו או לפרסם אותם ב-Mongo DB או Azure Cosmos DB במערכות מטה. אנחנו הולכים עם האופציה הראשונה
df_user_recs.write.mode("overwrite").saveAsTable(f"{catalog_name}.{output_schema}.top20_item_recommendations")
כעת דמיינו מקרה שבו אנו רוצים לעדכן את ההמלצות שלנו על סמך אינטראקציות משתמש בזמן אמת. למקרה זה נוכל להשתמש בהגשה מודל. כאשר מישהו רוצה להשתמש במודל שלך, הוא יכול לשלוח נתונים לשרת. לאחר מכן השרת מזין את הנתונים הללו למודל הפרוס שלך, אשר נכנס לפעולה, מנתח את הנתונים ויוצר חיזוי. ניתן להשתמש בהם ביישומי אינטרנט, באפליקציות לנייד, או אפילו במערכות משובצות. אחד היישומים של גישה זו הוא לאפשר ניתוב תעבורה לבדיקת A/B.
לא ניתן להשתמש באלגוריתם ALS ישירות להסקת מסקנות מקוונות מכיוון שהוא מצריך אימון מחדש של המודל תוך שימוש בכל הנתונים (ישן + חדש) כדי לעדכן את ההמלצות. אלגוריתמי למידה של ירידה בדרגה הם דוגמאות למודל שניתן להשתמש בהם עבור עדכונים מקוונים. אנו עשויים להסתכל על כמה מאלגוריתמים אלה בפוסט הבא.
עם זאת, רק כדי להמחיש כיצד מודל כזה יעבוד, אנו יוצרים מודל (חסר תועלת) המשרת נקודת קצה אשר מנבא דירוג סרטים על סמך כל פעם שמשתמש מדרג סרט!
import requests model_path = f"{catalog_name}.{model_schema}.{model_name}" champion_version = client.get_model_version_by_alias(model_path, "champion") # Set the name of the MLflow endpoint endpoint_name = config["model_serving_endpoint_name"] # Name of the registered MLflow model model_name = model_path # Specify the type of compute (CPU, GPU_SMALL, GPU_MEDIUM, etc.) workload_type = "CPU" # Specify the scale-out size of compute (Small, Medium, Large, etc.) workload_size = "Small" # Get the latest version of the MLflow model model_version = int(champion_version.version) # Specify Scale to Zero(only supported for CPU endpoints) scale_to_zero = False # Get the API endpoint and token for the current notebook context API_ROOT = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get() API_TOKEN = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get() data = { "name": endpoint_name, "config": { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ] }, } headers = {"Context-Type": "text/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.post( url=f"{API_ROOT}/api/2.0/serving-endpoints", json=data, headers=headers )
זה ייצור ואשכול דגמי הגשה לארוחת צהריים עבורנו כך שזה לוקח קצת זמן. כעת, אם אתה פותח את חלון Serving
, אתה אמור לראות את נקודת הקצה שלך.
אנחנו יכולים להשתמש בנקודת קצה אחת כדי לשרת מודלים מרובים. לאחר מכן נוכל להשתמש בניתוב תעבורה עבור תרחישים כמו בדיקות A/B או להשוות את הביצועים של מודלים שונים בייצור.
טבלאות מסקנות ב-Databricks Model Serving פועלות כיומן אוטומטי עבור המודלים הפרוסים שלנו. כשהם מופעלים, הם לוכדים בקשות נכנסות (נתונים שנשלחו לחיזוי), את פלטי המודל התואמים (חיזויים), וכמה מטא נתונים אחרים כטבלת Delta בתוך Unity Catalog. אנו יכולים להשתמש בטבלת מסקנות לניטור וניפוי באגים , מעקב שושלת ונוהל איסוף נתונים להכשרה מחדש או כוונון עדין של המודלים שלנו.
אנו יכולים לאפשר את inference table
בנקודת הקצה המשרתת שלנו כדי לנטר את המודל. אנו יכולים לעשות זאת על ידי ציון המאפיינים auto_capture_config
במטען כאשר אנו יוצרים לראשונה את נקודת הקצה. לחלופין, אנו מעדכנים את נקודת הקצה שלנו לאחר מכן באמצעות הפקודה put
וכתובת ה-URL של נקודת הקצה config
כדלקמן (עוד כאן )
data = { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ], "auto_capture_config":{ "catalog_name": catalog_name, "schema_name": model_schema, "table_name_prefix": payload_table, } } headers = {"Context-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.put(url=f"{API_ROOT}/api/2.0/serving-endpoints/{endpoint_name}/config", json=data, headers=headers) print(json.dumps(response.json(), indent=4))
עכשיו בואו נזין את נקודת הקצה עם כמה נתוני אינטראקציה של משתמש דמה
import random import time all_items = df_full_data.select(col("item_id")).distinct() for user_id in range(50,54): items_not_rated_by_user = df_full_data.where(col("user_id")==user_id).select(col("item_id")).distinct()#collect()[0][0] no_rated_items = [item.item_id for item in all_items.subtract(items_not_rated_by_user).limit(4).collect()] data = { "dataframe_records": [ {"user_id":user_id, "item_id":no_rated_items[0], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[1], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, ] } response = requests.post( url=f"{API_ROOT}/serving-endpoints/{endpoint_name}/invocations", json=data, headers=headers ) # generate the data within the timespan of 1 to 8 minutes time.sleep(random.randint(60*1, 60*8))
אנו יכולים לבדוק את יומני נקודות הקצה בטבלה <catalog>.<schema>.<payload_table>
. לוקח בערך 10 דקות עד שתוכל לראות את הנתונים בטבלה.
table_path = f"{catalog_name}.{model_schema}.{payload_table}" # Read data from the inference table df_inf_table = spark.read.table(table_path) display(df_inf_table )
אתה אמור לראות משהו כזה בטבלת המטען שלך
כדי להבין את הסכימה של טבלת ההסקות הזו, בדוק את "טבלת ההסקות של קטלוג אחדות ==" כאן .==
ניטור מודלים ונתונים של נושא מורכב שדורש הרבה זמן לשלוט בו. Databricks Lakehouse Monitoring (DLM) מפחית את התקורה של בניית מערכת ניטור נכונה על ידי אספקת תבניות סטנדרטיות וניתנות להתאמה אישית למקרי שימוש נפוצים. עם זאת, שליטה ב-DLM ובניטור מודלים באופן כללי דורשת הרבה ניסויים. אני לא רוצה לתת לך סקירה נרחבת של ניטור מודלים כאן אלא לתת לך נקודת התחלה. אני עשוי להקדיש בלוג לנושא זה בעתיד.
סיכום קצר של פונקציונליות ותכונות DLM
כעת, לאחר שהמודל שלנו פועל, אנו יכולים להשתמש בטבלת ההסקות שנוצרה על ידי נקודת הקצה המשרתת שלנו כדי לנטר מדדי מפתח כגון ביצועי מודל וסחיפה כדי לזהות סטיות או חריגות בנתונים או במודל שלנו לאורך זמן. גישה פרואקטיבית זו עוזרת לנו לנקוט בפעולות מתקנות בזמן, כגון הדרכה מחדש של המודל או עדכון תכונותיו, כדי לשמור על ביצועים מיטביים והתאמה ליעדים העסקיים.
DLM מספק שלושה סוגים של ניתוח או profile type
: סדרת זמן , תמונת מצב והסקת מסקנות . מכיוון שאנו מעוניינים לנתח את טבלת ההסקות שלנו, אנו מתמקדים באחרון. כדי להשתמש בטבלה לניטור - " הטבלה הראשית " שלנו, עלינו לוודא שלטבלה יש את המבנה הנכון. עבור טבלת ההסקות , כל שורה צריכה להתאים לבקשות עם העמודות הבאות:
תכונות הדגם
חיזוי מודל
מזהה דגם
חותמת זמן : חותמת זמן של בקשת ההסקה
אמת יסוד (אופציונלי)
מזהה הדגם חשוב למקרים בהם אנו משרתים מספר דגמים ואנו רוצים לעקוב אחר הביצועים של כל דגם בלוח מחוונים ניטור אחד. אם יש יותר ממזהה מודל אחד זמין, DLM משתמש בו כדי לחתוך את הנתונים ולחשב מדדים וסטטים עבור כל פרוסה בנפרד.
DLM מחשב כל סטטיסטיקה ומדדים עבור מרווח זמן מוגדר. לניתוח מסקנות, הוא השתמש בעמודת חותמת הזמן , בתוספת גודל חלון שהוגדר על ידי המשתמש כדי לזהות את חלונות הזמן. עוד למטה.
DLM תומך בשני problem type
עבור טבלאות היסק: " סיווג " או " רגרסיה ". הוא מחשב חלק מהמדדים והסטטיסטיקה הרלוונטיים בהתבסס על מפרט זה.
כדי להשתמש ב-DLM, עלינו ליצור צג ולצרף אותו לטבלה. כאשר אנו עושים זאת DLM יוצרים שתי metric tables
:
טבלת מדדי פרופיל : טבלה זו מכילה סטטיסטיקות סיכום כגון מינימום, מקסימום, אחוז של null ואפסים. הוא מכיל גם מדדים נוספים המבוססים על סוג הבעיה שהוגדר על ידי המשתמש. לדוגמה precision , recall ו- f1_score עבור מודלים הסיווג, ו- mean_squared_error ו- mean_average_error עבור מודלים של רגרסיה.
טבלת מדדי סחיפה : היא מכילה נתונים סטטיסטיים המודדים כיצד השתנתה התפלגות הנתונים לאורך זמן או ביחס לערך בסיס (אם מסופק) . זה מחשב מדדים כמו מבחן צ'י ריבוע, מבחן KS.
כדי לראות את רשימת המדדים המלאים עבור כל טבלה, עיין בדף התיעוד של טבלת המדדים . אפשר גם ליצור מדדים מותאמים אישית .
היבט חשוב בבניית מערכת ניטור הוא לוודא שללוח המחוונים הניטור שלנו יש גישה לנתוני ההסקנות העדכניים ביותר כשהם מגיעים. לשם כך נוכל להשתמש בהזרמת טבלת Delta כדי לעקוב אחר שורות מעובדות בטבלת ההסקות. אנו משתמשים בטבלת ההסקות של המודל כטבלת המקור שלנו ( readStream
), ובטבלת הניטור כטבלת השקיעה ( writeStream
). אנו גם מוודאים ש- Change Data Capture (CDC) מופעל בשתי הטבלאות (הוא מופעל כברירת מחדל בטבלת ההסקות). כך אנו מעבדים רק שינויים - הוספה/עדכון/מחיקה - בטבלת המקור במקום לעבד מחדש את כל הטבלה בכל רענון.
כדי לאפשר את הניטור על טבלת ההסקות שלנו, אנו נוקטים את הצעדים הבאים:
ראשית עלינו להתקין את ה-API לניטור של Lakehouse. זה אמור להיות מותקן כבר אם אתה משתמש ב-Databricks rum time 15.3 LTS ומעלה:
%pip install "https://ml-team-public-read.s3.amazonaws.com/wheels/data-monitoring/a4050ef7-b183-47a1-a145-e614628e3146/databricks_lakehouse_monitoring-0.4.14-py3-none-any.whl" dbutils.library.restartPython()
בואו נקרא את טבלת ההסקות כטבלת סטרימינג
requests_raw = spark.readStream\ .format("delta")\ .table(inference_table_name) requests_raw.isStreaming #-> True
לאחר מכן עלינו לשים את הטבלה בפורמט הנכון כמתואר לעיל. טבלה זו צריכה לכלול שורה אחת עבור כל חיזוי עם התכונות וערך החיזוי הרלוונטיים. טבלת ההסקות שאנו מקבלים מהמודל המשרת את נקודת הקצה, מאחסנת את הבקשות והתגובות של נקודות הקצה כפורמט JSON מקונן. הנה דוגמה למטען ה-JSON עבור עמודת הבקשה והתגובה.
#requests {"dataframe_records": [ {"user_id": 1, "item_id": 346, "rating": 5}, {"user_id": 1, "item_id": 377, "rating": 2}, {"user_id": 1, "item_id": 302, "rating": 4} ] } #reponse {"predictions": [4.248899936676025, 1.1172138452529907, 4.279165744781494] } # --> what we need | user_id | item_id | rating | prediction | |---------|---------|--------|------------| | 1 | 346 | 5 | 4.248900 | | 1 | 377 | 2 | 1.117214 | | 1 | 302 | 4 | 4.279166 |
כדי לפרוק את הטבלה הזו לסכימה הנכונה נוכל להשתמש בקוד הבא המותאם מתיעוד של Databricks ( מחברת ה-Inference table Lakehouse Monitoring starter ).
# define the schema of the request and reponse fields in the inference tabel REQUEST_FIELDS = [StructField('user_id', IntegerType(), False),\ StructField('item_id', IntegerType(), False),\ StructField('rating', IntegerType(), False)\ ] RESPONSE_FIELD = [T.StructField("predictions", FloatType())] def process_col_requests(json_str:str) -> str: """ to proccess the JSON payload of request column in inference table """ request = json.loads(json_str) dataframe_records = request.get("dataframe_records", []) return dataframe_records def procces_col_response(json_str: str) -> str: """ to proccess the JSON payload of reponse column in inference table """ reponse = json.loads(json_str) output = [{prediction_col: round(prediction,4)} for prediction in reponse["predictions"]] return output def get_model_id(endpoint_name: str) -> str: """ create the model id by concatinating the model name and the model version. note: the assumption is the endpoint serves only one model """ served_models = get_served_models(endpoint_name) model_id = f"{served_models[0]['model_name']}_{served_models[0]['model_version']}" return model_id def process_requests(requests_raw: DataFrame, request_fields: List[T.StructField], response_field: T.StructField, endpoint_name: str) -> DataFrame: """ Takes a stream of raw requests and processes them by: - Unpacking JSON payloads for requests and responses - Exploding batched requests into individual rows - Converting Unix epoch millisecond timestamps to be Spark TimestampType :param requests_raw: DataFrame containing raw requests. Assumed to contain the following columns: - `request` - `response` - `timestamp_ms` :param request_fields: List of StructFields representing the request schema :param response_field: A StructField representing the response schema :return: A DataFrame containing processed requests """ # Convert the timestamp milliseconds to TimestampType for downstream processing. requests_timestamped = requests_raw \ .withColumn(timestamp_col, (F.col("timestamp_ms") / 1000).cast(T.TimestampType())) \ .drop("timestamp_ms") # create the model identifier column model_id = get_model_id(endpoint_name) # Convert the model name and version columns into a model identifier column. requests_identified = requests_timestamped \ .withColumn(model_id_col, F.lit(model_id)) # Rename the date column to avoid collisions with features. requests_dated = requests_identified.withColumnRenamed("date", date_col) # Consolidate and unpack JSON. request_schema = T.ArrayType(T.StructType(request_fields)) response_schema = T.ArrayType(T.StructType(response_field)) # w udf_request = F.udf(process_col_requests, request_schema) udf_reponse = F.udf(procces_col_response, response_schema) requests_unpacked = requests_dated.withColumn("request", udf_request("request")).\ withColumn("response", udf_reponse("response")) # Explode batched requests into individual rows. DB_PREFIX = "__db" requests_exploded = requests_unpacked \ .withColumn(f"{DB_PREFIX}_request_response", F.arrays_zip(F.col("request"), F.col("response"))) \ .withColumn(f"{DB_PREFIX}_request_response", F.explode(F.col(f"{DB_PREFIX}_request_response"))) \ .select(F.col("*"), F.col(f"{DB_PREFIX}_request_response.request.*"), F.col(f"{DB_PREFIX}_request_response.response.*")) \ .drop(f"{DB_PREFIX}_request_response", "request", "response") requests_cleaned = requests_exploded.drop("status_code", "sampling_fraction", "client_request_id", "databricks_request_id", "request_metadata") return requests_cleaned
הטבלה שהתקבלה תיראה כך:
בשלב הבא עלינו לאתחל את שולחן הכיור שלנו
dt_builder = DeltaTable.createIfNotExists(spark) \ .tableName(unpacked_requests_table_name) \ .addColumns(schema) \ .partitionedBy(requests_cleaned.schema) \ .property("delta.enableChangeDataFeed", "true") \ dt_builder.execute()
ולכתוב את התוצאות
checkpoint_path = f"dbfs:/payload-logging/{endpoint_name}/checkpoint" requests_stream = requests_cleaned.writeStream \ .trigger(once=True) \ .format("delta") \ .partitionBy(date_col) \ .outputMode("append") \ .option("checkpointLocation", checkpoint_path) \ .toTable(unpacked_requests_table_name) \
לבסוף, אנו יוצרים את טבלת הבסיס שלנו. DLM משתמש בטבלה זו כדי לחשב את הסחפים על ידי השוואת התפלגות של עמודות דומות של מודלים בסיסיים ועיקריים. לטבלת הבסיס צריכה להיות אותה עמודת תכונה כמו העמודה הראשית, כמו גם אותה עמודת זיהוי דגם. לטבלת הבסיס אנו משתמשים בטבלת החיזוי של מערך האימות שלנו שאנו מאחסנים מוקדם יותר לאחר שאימנו את המודל שלנו באמצעות ההיפרפרמטר הטוב ביותר. כדי לחשב את מדד הסחיפה, Databricks מחשב את מדדי הפרופיל הן עבור טבלת ראשי והן עבור טבלת הבסיס. כאן תוכל לקרוא על הטבלה הראשית וטבלת קו הבסיס .
#read the prediction table df_base_table = spark.table(f"{catalog_name}.{model_schema}.predictions") # create the model id and add it to the table model_id = get_model_id(endpoint_name) df_base_table = df_base_table.withColumn(model_id_col, F.lit(model_id)) #write the new table and enable the CDC on it output_base_table_name = f"{catalog_name}.{model_schema}.{base_table_prefix}_{model_name}" df_base_table.write.format("delta").mode("overwrite").saveAsTable(output_base_table_name) spark.sql(f"ALTER TABLE {output_base_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")
כעת אנו קוראים כדי ליצור את לוח המחוונים לניטור שלנו. אנחנו יכולים לעשות זאת באמצעות ממשק המשתמש או ה-API לניטור של Lakehouse. כאן אנו משתמשים באפשרות השנייה:
# This is where we store the metric tables. output_schema_name = f"{catalog_name}.{model_schema}" try: info = lm.create_monitor( table_name=unpacked_requests_table_name, profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities,#the aggregation window model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, # We will refresh the metrics on-demand in this notebook baseline_table_name=output_base_table_name, ) print(info) except Exception as e: # Ensure the exception was expected assert "RESOURCE_ALREADY_EXISTS" in str(e), f"Unexpected error: {e}" # Update the monitor if any parameters of this notebook have changed. lm.update_monitor( table_name=unpacked_requests_table_name, updated_params=dict( profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities, model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, baseline_table_name=output_base_table_name, ) ) # Refresh metrics calculated on the requests table. refresh_info = lm.run_refresh(table_name=unpacked_requests_table_name) print(refresh_info)
לאחר שנפעיל את הקוד, לוקח זמן עד ש-Databricks מחשב את כל המדד. כדי לראות את לוח המחוונים עבור ללשונית Quality
של טבלת הכיורים שלך (כלומר unpacked_requests_table_name
). אתה אמור לראות דף כדלקמן.
אם תלחץ על refresh history
של הצג, תראה את הריענונים הרצים, הממתינים והעבר שלך. לחץ על View Dashboard
כדי לפתוח את לוח המחוונים שלך.
אז אנחנו מתחילים עם טבלת ההסקות ( my_endpoint_payload
), מעבדים אותה ושומרים את התוצאה ב- my_endpoint_payload_unpacked
ומעבירים את הטבלה הזו יחד עם טבלת הבסיס שלנו ( base_table_als
) ל-API לניטור שלנו. ה-DLM מחשב את מדדי הפרופיל עבור כל טבלה ( my_endpoint_payload_unpacked_profile_metric
) והשתמש בהם כדי לחשב את מדדי הסחיפה ( my_endpoint_payload_unpacked_drift_metrics
)
הנה לך! יש לך את כל מה שאתה צריך כדי לשרת ולפקח על הדגם שלך!
בחלק הבא אני אראה לך כיצד להפוך תהליך זה לאוטומטי באמצעות Databrick Assets Bundle ו- Gitlab !
בחלק הראשון של סדרת הדרכה זו , נקטנו את הצעדים הראשונים לבניית צינור MLOps מקצה לקצה באמצעות Databricks ו-Spark, בהנחיית ארכיטקטורת ההתייחסות של Databricks. להלן תקציר של השלבים העיקריים שסקרנו:
הגדרת קטלוג האחדות לארכיטקטורת מדליונים : ארגנו את הנתונים שלנו לשכבות ברונזה, כסף וזהב בתוך קטלוג האחדות, ויצרנו מערכת מובנית ויעילה לניהול נתונים.
הכנסת נתונים לקטלוג Unity : הדגמנו כיצד לייבא נתונים גולמיים למערכת, תוך הבטחת עקביות ואיכות לשלבי העיבוד הבאים.
הכשרת המודל : באמצעות Databricks, הכשרנו מודל למידת מכונה המותאם למערך הנתונים שלנו, בעקבות שיטות עבודה מומלצות לפיתוח מודל מדרגי ואפקטיבי.
כוונון היפרפרמטרים עם HyperOpt : כדי לשפר את ביצועי המודל, השתמשנו ב-HyperOpt כדי להפוך את החיפוש אחר פרמטרים אופטימליים לאוטומטיים, תוך שיפור הדיוק והיעילות.
מעקב אחר ניסוי עם Databricks MLflow : השתמשנו ב-MLflow לרישום ולנטר את הניסויים שלנו, תוך שמירה על תיעוד מקיף של גרסאות מודל, מדדים ופרמטרים להשוואה ושחזור קלות.
עם השלמת השלבים הבסיסיים האלה, המודל שלך מוכן כעת לפריסה. בחלק השני הזה, נתמקד בשילוב שני רכיבים קריטיים במערכת שלנו:
בואו ניכנס לזה!
נקודת המוצא של הבלוג האחרון הייתה הערכת מודלים. כעת דמיינו שעשינו את ההשוואה וגילינו שהדגם שלנו מציג ביצועים גבוהים יותר בהשוואה לדגם הייצור הזה. מכיוון שאנו רוצים (מניחים) להשתמש במודל בייצור, אנו רוצים לנצל את כל הנתונים שיש לנו. השלב הבא הוא לאמן ולבדוק את המודל באמצעות מערך הנתונים המלא. לאחר מכן המשיך את המודל שלנו לשימוש מאוחר יותר על ידי פריסתו כדגם האלוף שלנו. מכיוון שזהו המודל הסופי שאנו רוצים להשתמש בו להסקת מסקנות, אנו משתמשים בלקוח Feature Engineering כדי לאמן את המודל. בדרך זו אנו לא רק עוקבים בקלות אחר שושלת המודל, אלא גם מורידים ללקוח את אימות הסכימה ושינוי התכונות (אם יש).
with mlflow.start_run(run_name="ALS_best_model") as run: als = ALS() # Now we set the parameters for the method als.setMaxIter(MAX_ITER)\ .setSeed(SEED)\ .setRegParam(best_params["REG_PARAM"])\ .setUserCol(COL_USER)\ .setItemCol(COL_ITEM)\ .setRatingCol(COL_LABEL)\ .setRank(best_params["RANK"]) mlflow.log_param("MAX_ITER", MAX_ITER) mlflow.log_param("RANK", best_params["RANK"]) mlflow.log_param("REG_PARAM", best_params["REG_PARAM"]) # Create the model with these parameters. model = als.fit(df_full_data) #drop predictions where users and products from the test test and didn't make it into the training set. in this case, the prediction is NaN model.setColdStartStrategy('drop') predictions = model.transform(df_full_data) signature = infer_signature(model_input = df_full_data, model_output = predictions.select(COL_LABEL)) #log the model mlflow.spark.log_model(model, model_name, sample_input=df_full_data.limit(3), signature = signature, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name=f"{catalog_name}.{model_schema}.{model_name}") evaluator = RegressionEvaluator(predictionCol=COL_PRED, labelCol=COL_LABEL) rmse = evaluator.setMetricName("rmse").evaluate(predictions) mlflow.log_metric('rmse', rmse)
אנו יכולים גם להשתמש ב- Feature Store או ב-API של Feature Engineering כדי להכשיר ולרשום את המודלים
model_info = fe.log_model(model=model, artifact_path = model_name, flavor=mlflow.spark, training_set=fe_full_data, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name= f"{catalog_name}.{model_schema}.{model_name}" )
כאשר אנו משתמשים ב-API להנדסת תכונות אנו יכולים לראות את השושלת של המודל ב-Catalog Explorer
כעת אפשר לעדכן את תיאור הדגם ולהקצות לו תווית Champion.
import time from mlflow.tracking.client import MlflowClient from mlflow.entities.model_registry.model_version_status import ModelVersionStatus client = MlflowClient() #find the latest model version model_name_path = f"{catalog_name}.{model_schema}.{model_name}" model_version_infos = client.search_model_versions(f"name ='{model_name_path}'") new_model_version = max([int(model_version_info.version) for model_version_info in model_version_infos]) #add the model and model version descirption client.update_registered_model( name=model_name_path, description="collaborative filtering using Spark mllib ALS. This model use rating table" ) client.update_model_version( name=model_name_path, version=new_model_version, description="this model is optimized Rank and REG_PARAM with Hyperopt and rmse as a loss function. trained on the full dataset" ) # assign alias client.set_registered_model_alias(model_name_path, "Champion", new_model_version)
עכשיו קדימה ובדוק את הסכימה שרשמת את המודל. אתה אמור לראות את כל העדכונים שלך כדלקמן
שלבי מודל : אם אתה משתמש בסביבת עבודה עבור רישום מודלים, עליך לבצע שלבים לניהול המודלים שלך. שימוש בכינויים לא יעבוד. בדוק כאן כדי לראות איך זה עובד
עכשיו תארו לעצמכם שאנחנו רוצים להשתמש במודל שלנו בייצור כדי להסיק. בשלב זה אנו טוענים את מודל האלוף ומשתמשים בו כדי להפיק 20 המלצות לסרטים עבור כל משתמש.
from mlflow.spark import load_model as spark_load_model from mlflow.tracking.client import MlflowClient from create_training_set import split_data #-- set UC as model registray mlflow.set_registry_uri("databricks-uc") #-- initate mlflow client client = MlflowClient() # -- read the config file with open('config.json') as config_file: config = json.load(config_file) catalog_name = config["catalog_name"] gold_layer = config["gold_layer_name"] silver_layer = config["silver_layer_name"] user_item_table_name = config["user_item_table_name"] ft_user_item_name = config["ft_user_item_name"] model_name = config["model_name"] model_schema = config["model_schema"] #-- create the model uri model_path = f"{catalog_name}.{model_schema}.{model_name}" # --create the model_uri: there are two ways to do this # 1: using the alias (we use this*) model_version_uri = f"models:/{model_uri}@champion" # 2: using model version #champion_version = client.get_model_version_by_alias(model_uri, "champion") #model_version_uri = f"models:/{model_uri}/{champion_version.version}" # -- load the model pipline and exctract the model model_pipeline = spark_load_model(model_version_uri) model = model_pipeline.stages[0] # -- batch scoring using the the model fe_full_data, df_full_data, df_train, df_test = split_data() df_batch_input = df_full_data.drop("rating") df_scores = model.transform(df_batch_input) # --- in case you used Feature Engineering to train and register model #from databricks.feature_engineering import FeatureEngineeringClient #fe = FeatureEngineeringClient() # fe.score_batch(model_uri=f"{model_version_uri}",df = df_batch_input)
ואתה יכול לראות שהשתמשנו באותם נתוני אימון לניקוד אצווה. למרות שבמקרה של מערכות ממליצים זה הגיוני, ברוב האפליקציות אנחנו רוצים להשתמש במודל כדי לצבור כמה נתונים בלתי נראים. לדוגמה, הדמיה שלך היא נטפליקס ורוצה לעדכן את המלצות המשתמש בסופו של יום בהתבסס על רשימת הנצפים החדשה שלהם. אנחנו יכולים לתזמן עבודה שמפעילה את ניקוד האצווה בזמן ספציפי בסוף היום.
כעת נוכל להמשיך ולהפיק את ההמלצות עבור כל משתמש. לשם כך אנו מוצאים את 20 הפריטים המובילים לכל משתמש
from pyspark.sql.window import Window from pyspark.sql.functions import col, split, row_number, collect_list from pyspark.sql.functions import col, collect_list, expr, lit, min, row_number, desc windowSpec = Window.partitionBy("user_id").orderBy(col("prediction").desc()) df_top_20_items = df_scores.withColumn("rank", row_number().over(windowSpec)).filter(col("rank") <= 20) df_user_recs = df_top_20_items.groupBy("user_id") \ .agg(collect_list(col("item_id").cast("double")).alias("top_item_ids"))
כך נראית התוצאה
לבסוף נוכל לאחסן את החיזוי כתווית דלתא ב-UC שלנו או לפרסם אותם ב-Mongo DB או Azure Cosmos DB במערכת במורד הזרם. אנחנו הולכים עם האופציה הראשונה
df_user_recs.write.mode("overwrite").saveAsTable(f"{catalog_name}.{output_schema}.top20_item_recommendations")
כעת דמיינו מקרה שבו אנו רוצים לעדכן את ההמלצות שלנו על סמך אינטראקציות משתמש בזמן אמת. למקרה זה נוכל להשתמש בהגשה מודל. כאשר מישהו רוצה להשתמש במודל שלך, הוא יכול לשלוח נתונים לשרת. לאחר מכן השרת מזין את הנתונים הללו למודל הפרוס שלך, אשר נכנס לפעולה, מנתח את הנתונים ויוצר חיזוי. הם יכולים לשמש ביישומי אינטרנט, אפליקציות לנייד, או אפילו מערכות משובצות. אחד היישומים של גישה זו הוא לאפשר ניתוב תעבורה לבדיקת A/B.
לא ניתן להשתמש באלגוריתם ALS ישירות להסקת מסקנות מקוונות מכיוון שהוא מצריך אימון מחדש של המודל תוך שימוש בכל הנתונים (ישן + חדש) כדי לעדכן את ההמלצות. אלגוריתמי למידה של Gradient Descent הם דוגמאות למודל שניתן להשתמש בהם עבור עדכונים מקוונים. אנו עשויים להסתכל על כמה מאלגוריתמים אלה בפוסט הבא.
עם זאת, רק כדי להמחיש כיצד מודל כזה יעבוד, אנו יוצרים מודל (חסר תועלת) המשרת נקודת קצה אשר מנבא דירוג סרטים על סמך כל פעם שמשתמש מדרג סרט!
import requests model_path = f"{catalog_name}.{model_schema}.{model_name}" champion_version = client.get_model_version_by_alias(model_path, "champion") # Set the name of the MLflow endpoint endpoint_name = config["model_serving_endpoint_name"] # Name of the registered MLflow model model_name = model_path # Specify the type of compute (CPU, GPU_SMALL, GPU_MEDIUM, etc.) workload_type = "CPU" # Specify the scale-out size of compute (Small, Medium, Large, etc.) workload_size = "Small" # Get the latest version of the MLflow model model_version = int(champion_version.version) # Specify Scale to Zero(only supported for CPU endpoints) scale_to_zero = False # Get the API endpoint and token for the current notebook context API_ROOT = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get() API_TOKEN = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get() data = { "name": endpoint_name, "config": { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ] }, } headers = {"Context-Type": "text/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.post( url=f"{API_ROOT}/api/2.0/serving-endpoints", json=data, headers=headers )
זה ייצור עבורנו אשכול הגשה של דגמי ארוחת צהריים כך שזה לוקח קצת זמן. כעת, אם אתה פותח את חלון Serving
, אתה אמור לראות את נקודת הקצה שלך.
אנחנו יכולים להשתמש בנקודת קצה אחת כדי לשרת מודלים מרובים. לאחר מכן נוכל להשתמש בניתוב תעבורה עבור תרחישים כמו בדיקות A/B או להשוות את הביצועים של מודלים שונים בייצור.
טבלאות מסקנות ב-Databricks Model Serving פועלות כיומן אוטומטי עבור המודלים הפרוסים שלנו. כשהם מופעלים, הם לוכדים בקשות נכנסות (נתונים שנשלחו לחיזוי), את פלטי המודל התואמים (חיזויים), וכמה מטא נתונים אחרים כטבלת Delta בתוך Unity Catalog. אנו יכולים להשתמש בטבלת מסקנות לניטור וניפוי באגים , מעקב שושלת ונוהל איסוף נתונים להכשרה מחדש או כוונון עדין של המודלים שלנו.
אנו יכולים לאפשר את inference table
בנקודת הקצה המשרתת שלנו כדי לנטר את המודל. אנו יכולים לעשות זאת על ידי ציון המאפיינים auto_capture_config
במטען כאשר אנו יוצרים לראשונה את נקודת הקצה. לחלופין, אנו מעדכנים את נקודת הקצה שלנו לאחר מכן באמצעות הפקודה put
וכתובת ה-URL של נקודת הקצה config
כדלקמן (עוד כאן )
data = { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ], "auto_capture_config":{ "catalog_name": catalog_name, "schema_name": model_schema, "table_name_prefix": payload_table, } } headers = {"Context-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.put(url=f"{API_ROOT}/api/2.0/serving-endpoints/{endpoint_name}/config", json=data, headers=headers) print(json.dumps(response.json(), indent=4))
עכשיו בואו נזין את נקודת הקצה עם כמה נתוני אינטראקציה של משתמש דמה
import random import time all_items = df_full_data.select(col("item_id")).distinct() for user_id in range(50,54): items_not_rated_by_user = df_full_data.where(col("user_id")==user_id).select(col("item_id")).distinct()#collect()[0][0] no_rated_items = [item.item_id for item in all_items.subtract(items_not_rated_by_user).limit(4).collect()] data = { "dataframe_records": [ {"user_id":user_id, "item_id":no_rated_items[0], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[1], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, ] } response = requests.post( url=f"{API_ROOT}/serving-endpoints/{endpoint_name}/invocations", json=data, headers=headers ) # generate the data within the timespan of 1 to 8 minutes time.sleep(random.randint(60*1, 60*8))
אנו יכולים לבדוק את יומני נקודות הקצה בטבלה <catalog>.<schema>.<payload_table>
. לוקח בערך 10 דקות עד שתוכל לראות את הנתונים בטבלה.
table_path = f"{catalog_name}.{model_schema}.{payload_table}" # Read data from the inference table df_inf_table = spark.read.table(table_path) display(df_inf_table )
אתה אמור לראות משהו כזה בטבלת המטען שלך
כדי להבין את הסכימה של טבלת ההסקות הזו, בדוק את "טבלת ההסקות של קטלוג אחדות ==" כאן .==
ניטור מודלים ונתונים של נושא מורכב שדורש הרבה זמן לשלוט בו. Databricks Lakehouse Monitoring (DLM) מפחית את התקורה של בניית מערכת ניטור נכונה על ידי אספקת תבניות סטנדרטיות וניתנות להתאמה אישית למקרי שימוש נפוצים. עם זאת, שליטה ב-DLM ובניטור מודלים באופן כללי דורשת הרבה ניסויים. אני לא רוצה לתת לך סקירה נרחבת של ניטור מודלים כאן אלא לתת לך נקודת התחלה. אני עשוי להקדיש בלוג לנושא זה בעתיד.
סיכום קצר של פונקציונליות ותכונות DLM
כעת, לאחר שהמודל שלנו פועל, אנו יכולים להשתמש בטבלת ההסקות שנוצרה על ידי נקודת הקצה המשרתת שלנו כדי לנטר מדדי מפתח כגון ביצועי מודל וסחיפה כדי לזהות סטיות או חריגות בנתונים או במודל שלנו לאורך זמן. גישה פרואקטיבית זו עוזרת לנו לנקוט בפעולות מתקנות בזמן, כגון הדרכה מחדש של המודל או עדכון תכונותיו, כדי לשמור על ביצועים מיטביים והתאמה ליעדים העסקיים.
DLM מספק שלושה סוגים של ניתוח או profile type
: סדרת זמן , תמונת מצב והסקת מסקנות . מכיוון שאנו מעוניינים לנתח את טבלת ההסקות שלנו, אנו מתמקדים באחרון. כדי להשתמש בטבלה לניטור - " הטבלה הראשית " שלנו, עלינו לוודא שלטבלה יש את המבנה הנכון. עבור טבלת ההסקות , כל שורה צריכה להתאים לבקשות עם העמודות הבאות:
תכונות הדגם
חיזוי מודל
מזהה דגם
חותמת זמן : חותמת זמן של בקשת ההסקה
אמת יסוד (אופציונלי)
מזהה הדגם חשוב למקרים בהם אנו משרתים מספר דגמים ואנו רוצים לעקוב אחר הביצועים של כל דגם בלוח מחוונים ניטור אחד. אם יש יותר ממזהה מודל אחד זמין, DLM משתמש בו כדי לחתוך את הנתונים ולחשב מדדים וסטטים עבור כל פרוסה בנפרד.
DLM מחשב כל סטטיסטיקה ומדדים עבור מרווח זמן מוגדר. לניתוח מסקנות, הוא השתמש בעמודת חותמת הזמן , בתוספת גודל חלון שהוגדר על ידי המשתמש כדי לזהות את חלונות הזמן. עוד למטה.
DLM תומך בשני problem type
עבור טבלאות היסק: " סיווג " או " רגרסיה ". הוא מחשב חלק מהמדדים והסטטיסטיקה הרלוונטיים בהתבסס על מפרט זה.
כדי להשתמש ב-DLM, עלינו ליצור צג ולצרף אותו לטבלה. כאשר אנו עושים זאת DLM יוצרים שתי metric tables
:
טבלת מדדי פרופיל : טבלה זו מכילה סטטיסטיקות סיכום כגון מינימום, מקסימום, אחוז של null ואפסים. הוא מכיל גם מדדים נוספים המבוססים על סוג הבעיה שהוגדר על ידי המשתמש. לדוגמה precision , recall ו- f1_score עבור מודלים הסיווג, ו- mean_squared_error ו- mean_average_error עבור מודלים של רגרסיה.
טבלת מדדי סחיפה : היא מכילה נתונים סטטיסטיים המודדים כיצד השתנתה התפלגות הנתונים לאורך זמן או ביחס לערך בסיס (אם מסופק) . זה מחשב מדדים כמו מבחן צ'י ריבוע, מבחן KS.
כדי לראות את רשימת המדדים המלאים עבור כל טבלה, עיין בדף התיעוד של טבלת המדדים . אפשר גם ליצור מדדים מותאמים אישית .
היבט חשוב בבניית מערכת ניטור הוא לוודא שללוח המחוונים הניטור שלנו יש גישה לנתוני ההסקנות העדכניים ביותר כשהם מגיעים. לשם כך נוכל להשתמש בהזרמת טבלת Delta כדי לעקוב אחר שורות מעובדות בטבלת ההסקות. אנו משתמשים בטבלת ההסקות של המודל כטבלת המקור שלנו ( readStream
), ובטבלת הניטור כטבלת השקיעה ( writeStream
). אנו גם מוודאים ש- Change Data Capture (CDC) מופעל בשתי הטבלאות (הוא מופעל כברירת מחדל בטבלת ההסקות). כך אנו מעבדים רק שינויים - הוספה/עדכון/מחיקה - בטבלת המקור במקום לעבד מחדש את כל הטבלה בכל רענון.
כדי לאפשר את הניטור מעל טבלת ההסקות שלנו, אנו נוקטים את הצעדים הבאים:
ראשית עלינו להתקין את ה-API לניטור של Lakehouse. זה אמור להיות מותקן כבר אם אתה משתמש ב-Databricks rum time 15.3 LTS ומעלה:
%pip install "https://ml-team-public-read.s3.amazonaws.com/wheels/data-monitoring/a4050ef7-b183-47a1-a145-e614628e3146/databricks_lakehouse_monitoring-0.4.14-py3-none-any.whl" dbutils.library.restartPython()
בואו נקרא את טבלת ההסקות כטבלת סטרימינג
requests_raw = spark.readStream\ .format("delta")\ .table(inference_table_name) requests_raw.isStreaming #-> True
לאחר מכן עלינו לשים את הטבלה בפורמט הנכון כפי שתואר לעיל. טבלה זו צריכה לכלול שורה אחת עבור כל חיזוי עם התכונות וערך החיזוי הרלוונטיים. טבלת ההסקות שאנו מקבלים מהמודל המשרת את נקודת הקצה, מאחסנת את הבקשות והתגובות של נקודות הקצה כפורמט JSON מקונן. הנה דוגמה למטען ה-JSON עבור עמודת הבקשה והתגובה.
#requests {"dataframe_records": [ {"user_id": 1, "item_id": 346, "rating": 5}, {"user_id": 1, "item_id": 377, "rating": 2}, {"user_id": 1, "item_id": 302, "rating": 4} ] } #reponse {"predictions": [4.248899936676025, 1.1172138452529907, 4.279165744781494] } # --> what we need | user_id | item_id | rating | prediction | |---------|---------|--------|------------| | 1 | 346 | 5 | 4.248900 | | 1 | 377 | 2 | 1.117214 | | 1 | 302 | 4 | 4.279166 |
כדי לפרוק את הטבלה הזו לסכימה הנכונה נוכל להשתמש בקוד הבא המותאם מתיעוד של Databricks ( מחברת ה-Inference table Lakehouse Monitoring starter ).
# define the schema of the request and reponse fields in the inference tabel REQUEST_FIELDS = [StructField('user_id', IntegerType(), False),\ StructField('item_id', IntegerType(), False),\ StructField('rating', IntegerType(), False)\ ] RESPONSE_FIELD = [T.StructField("predictions", FloatType())] def process_col_requests(json_str:str) -> str: """ to proccess the JSON payload of request column in inference table """ request = json.loads(json_str) dataframe_records = request.get("dataframe_records", []) return dataframe_records def procces_col_response(json_str: str) -> str: """ to proccess the JSON payload of reponse column in inference table """ reponse = json.loads(json_str) output = [{prediction_col: round(prediction,4)} for prediction in reponse["predictions"]] return output def get_model_id(endpoint_name: str) -> str: """ create the model id by concatinating the model name and the model version. note: the assumption is the endpoint serves only one model """ served_models = get_served_models(endpoint_name) model_id = f"{served_models[0]['model_name']}_{served_models[0]['model_version']}" return model_id def process_requests(requests_raw: DataFrame, request_fields: List[T.StructField], response_field: T.StructField, endpoint_name: str) -> DataFrame: """ Takes a stream of raw requests and processes them by: - Unpacking JSON payloads for requests and responses - Exploding batched requests into individual rows - Converting Unix epoch millisecond timestamps to be Spark TimestampType :param requests_raw: DataFrame containing raw requests. Assumed to contain the following columns: - `request` - `response` - `timestamp_ms` :param request_fields: List of StructFields representing the request schema :param response_field: A StructField representing the response schema :return: A DataFrame containing processed requests """ # Convert the timestamp milliseconds to TimestampType for downstream processing. requests_timestamped = requests_raw \ .withColumn(timestamp_col, (F.col("timestamp_ms") / 1000).cast(T.TimestampType())) \ .drop("timestamp_ms") # create the model identifier column model_id = get_model_id(endpoint_name) # Convert the model name and version columns into a model identifier column. requests_identified = requests_timestamped \ .withColumn(model_id_col, F.lit(model_id)) # Rename the date column to avoid collisions with features. requests_dated = requests_identified.withColumnRenamed("date", date_col) # Consolidate and unpack JSON. request_schema = T.ArrayType(T.StructType(request_fields)) response_schema = T.ArrayType(T.StructType(response_field)) # w udf_request = F.udf(process_col_requests, request_schema) udf_reponse = F.udf(procces_col_response, response_schema) requests_unpacked = requests_dated.withColumn("request", udf_request("request")).\ withColumn("response", udf_reponse("response")) # Explode batched requests into individual rows. DB_PREFIX = "__db" requests_exploded = requests_unpacked \ .withColumn(f"{DB_PREFIX}_request_response", F.arrays_zip(F.col("request"), F.col("response"))) \ .withColumn(f"{DB_PREFIX}_request_response", F.explode(F.col(f"{DB_PREFIX}_request_response"))) \ .select(F.col("*"), F.col(f"{DB_PREFIX}_request_response.request.*"), F.col(f"{DB_PREFIX}_request_response.response.*")) \ .drop(f"{DB_PREFIX}_request_response", "request", "response") requests_cleaned = requests_exploded.drop("status_code", "sampling_fraction", "client_request_id", "databricks_request_id", "request_metadata") return requests_cleaned
הטבלה שהתקבלה תיראה כך:
בשלב הבא עלינו לאתחל את שולחן הכיור שלנו
dt_builder = DeltaTable.createIfNotExists(spark) \ .tableName(unpacked_requests_table_name) \ .addColumns(schema) \ .partitionedBy(requests_cleaned.schema) \ .property("delta.enableChangeDataFeed", "true") \ dt_builder.execute()
ולכתוב את התוצאות
checkpoint_path = f"dbfs:/payload-logging/{endpoint_name}/checkpoint" requests_stream = requests_cleaned.writeStream \ .trigger(once=True) \ .format("delta") \ .partitionBy(date_col) \ .outputMode("append") \ .option("checkpointLocation", checkpoint_path) \ .toTable(unpacked_requests_table_name) \
לבסוף, אנו יוצרים את טבלת הבסיס שלנו. DLM משתמש בטבלה זו כדי לחשב את הסחפים על ידי השוואת התפלגות של עמודות דומות של מודלים בסיסיים ועיקריים. לטבלת הבסיס צריכה להיות אותה עמודת תכונה כמו העמודה הראשית, כמו גם אותה עמודת זיהוי דגם. לטבלת הבסיס אנו משתמשים בטבלת החיזוי של מערך האימות שלנו שאנו מאחסנים מוקדם יותר לאחר שאימנו את המודל שלנו באמצעות ההיפרפרמטר הטוב ביותר. כדי לחשב את מדד הסחיפה, Databricks מחשב את מדדי הפרופיל הן עבור טבלת היסוד והן עבור טבלת היסוד. כאן תוכל לקרוא על הטבלה הראשית וטבלת קו הבסיס .
#read the prediction table df_base_table = spark.table(f"{catalog_name}.{model_schema}.predictions") # create the model id and add it to the table model_id = get_model_id(endpoint_name) df_base_table = df_base_table.withColumn(model_id_col, F.lit(model_id)) #write the new table and enable the CDC on it output_base_table_name = f"{catalog_name}.{model_schema}.{base_table_prefix}_{model_name}" df_base_table.write.format("delta").mode("overwrite").saveAsTable(output_base_table_name) spark.sql(f"ALTER TABLE {output_base_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")
כעת אנו קוראים כדי ליצור את לוח המחוונים לניטור שלנו. אנחנו יכולים לעשות זאת באמצעות ממשק המשתמש או ה-API לניטור של Lakehouse. כאן אנו משתמשים באפשרות השנייה:
# This is where we store the metric tables. output_schema_name = f"{catalog_name}.{model_schema}" try: info = lm.create_monitor( table_name=unpacked_requests_table_name, profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities,#the aggregation window model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, # We will refresh the metrics on-demand in this notebook baseline_table_name=output_base_table_name, ) print(info) except Exception as e: # Ensure the exception was expected assert "RESOURCE_ALREADY_EXISTS" in str(e), f"Unexpected error: {e}" # Update the monitor if any parameters of this notebook have changed. lm.update_monitor( table_name=unpacked_requests_table_name, updated_params=dict( profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities, model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, baseline_table_name=output_base_table_name, ) ) # Refresh metrics calculated on the requests table. refresh_info = lm.run_refresh(table_name=unpacked_requests_table_name) print(refresh_info)
לאחר שנפעיל את הקוד, לוקח קצת זמן עד ש-Databricks מחשב את כל המדד. כדי לראות את לוח המחוונים עבור ללשונית Quality
של טבלת הכיורים שלך (כלומר unpacked_requests_table_name
). אתה אמור לראות דף כדלקמן.
אם תלחץ על refresh history
של הצג, תראה את הריענונים הרצים, הממתינים והעבר שלך. לחץ על View Dashboard
כדי לפתוח את לוח המחוונים שלך.
אז אנחנו מתחילים עם טבלת ההסקות ( my_endpoint_payload
), מעבדים אותה ושומרים את התוצאה ב- my_endpoint_payload_unpacked
ומעבירים את הטבלה הזו יחד עם טבלת הבסיס שלנו ( base_table_als
) ל-API לניטור שלנו. ה-DLM מחשב את מדדי הפרופיל עבור כל טבלה ( my_endpoint_payload_unpacked_profile_metric
) והשתמש בהם כדי לחשב את מדדי הסחיפה ( my_endpoint_payload_unpacked_drift_metrics
)
הנה לך! יש לך את כל מה שאתה צריך כדי לשרת ולפקח על הדגם שלך!
בחלק הבא אני אראה לך כיצד להפוך תהליך זה לאוטומטי באמצעות Databrick Assets Bundle ו- Gitlab !