בחלק , נקטנו את הצעדים הראשונים לבניית צינור MLOps מקצה לקצה באמצעות Databricks ו-Spark, בהנחיית ארכיטקטורת ההתייחסות של Databricks. להלן תקציר של השלבים העיקריים שסיקרנו: הראשון של סדרת הדרכה זו : ארגנו את הנתונים שלנו לשכבות ברונזה, כסף וזהב בתוך קטלוג האחדות, ויצרנו מערכת מובנית ויעילה לניהול נתונים. הגדרת קטלוג האחדות לארכיטקטורת מדליונים : הדגמנו כיצד לייבא נתונים גולמיים למערכת, תוך הבטחת עקביות ואיכות לשלבי העיבוד הבאים. הכנסת נתונים לקטלוג Unity : באמצעות Databricks, הכשרנו מודל למידת מכונה המותאם למערך הנתונים שלנו, בעקבות שיטות עבודה מומלצות לפיתוח מודל מדרגי ואפקטיבי. הכשרת המודל : כדי לשפר את ביצועי המודל, השתמשנו ב-HyperOpt כדי להפוך את החיפוש אחר פרמטרים אופטימליים לאוטומטיים, תוך שיפור הדיוק והיעילות. כוונון היפרפרמטרים עם HyperOpt : השתמשנו ב-MLflow לרישום ולנטר את הניסויים שלנו, תוך שמירה על תיעוד מקיף של גרסאות מודל, מדדים ופרמטרים להשוואה ושחזור קלות. מעקב אחר ניסוי עם Databricks MLflow עם השלמת השלבים הבסיסיים האלה, המודל שלך מוכן כעת לפריסה. בחלק השני הזה, נתמקד בשילוב שני רכיבים קריטיים במערכת שלנו: : הטמעת עיבוד אצווה ליצירת תחזיות על מערכי נתונים גדולים, המתאימים ליישומים כמו ניקוד בכמות גדולה ודיווח תקופתי. מסקנות אצווה : הגדרת שירות מודל בזמן אמת כדי לספק תחזיות מיידיות, חיוניות ליישומים ושירותים אינטראקטיביים. הסקה מקוונת (הגשה של מודל) כדי להבטיח שהמודלים הפרוסים שלך ישמרו על ביצועים ואמינות מיטביים לאורך זמן. ניטור מודלים: בואו ניכנס לזה! פריסת מודל נקודת המוצא של הבלוג האחרון הייתה הערכת מודלים. כעת דמיינו שעשינו את ההשוואה וגילינו שהדגם שלנו מציג ביצועים גבוהים יותר בהשוואה לדגם הייצור הזה. מכיוון שאנו רוצים (מניחים) להשתמש במודל בייצור, אנו רוצים לנצל את כל הנתונים שיש לנו. השלב הבא הוא לאמן ולבדוק את המודל באמצעות מערך הנתונים המלא. לאחר מכן המשיך את המודל שלנו לשימוש מאוחר יותר על ידי פריסתו כדגם האלוף שלנו. מכיוון שזהו המודל הסופי שאנו רוצים להשתמש בו להסקת מסקנות, אנו משתמשים בלקוח Feature Engineering כדי לאמן את המודל. בדרך זו אנו לא רק עוקבים בקלות אחר שושלת המודל, אלא גם מורידים ללקוח את אימות הסכימה ושינוי התכונות (אם יש). with mlflow.start_run(run_name="ALS_best_model") as run: als = ALS() # Now we set the parameters for the method als.setMaxIter(MAX_ITER)\ .setSeed(SEED)\ .setRegParam(best_params["REG_PARAM"])\ .setUserCol(COL_USER)\ .setItemCol(COL_ITEM)\ .setRatingCol(COL_LABEL)\ .setRank(best_params["RANK"]) mlflow.log_param("MAX_ITER", MAX_ITER) mlflow.log_param("RANK", best_params["RANK"]) mlflow.log_param("REG_PARAM", best_params["REG_PARAM"]) # Create the model with these parameters. model = als.fit(df_full_data) #drop predictions where users and products from the test test and didn't make it into the training set. in this case, the prediction is NaN model.setColdStartStrategy('drop') predictions = model.transform(df_full_data) signature = infer_signature(model_input = df_full_data, model_output = predictions.select(COL_LABEL)) #log the model mlflow.spark.log_model(model, model_name, sample_input=df_full_data.limit(3), signature = signature, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name=f"{catalog_name}.{model_schema}.{model_name}") evaluator = RegressionEvaluator(predictionCol=COL_PRED, labelCol=COL_LABEL) rmse = evaluator.setMetricName("rmse").evaluate(predictions) mlflow.log_metric('rmse', rmse) אנו יכולים גם להשתמש ב- כדי להכשיר ולרשום את המודלים Feature Store או ב-API של Feature Engineering model_info = fe.log_model(model=model, artifact_path = model_name, flavor=mlflow.spark, training_set=fe_full_data, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name= f"{catalog_name}.{model_schema}.{model_name}" ) כאשר אנו משתמשים ב-API להנדסת תכונות אנו יכולים לראות את השושלת של המודל ב-Catalog Explorer כעת אפשר לעדכן את תיאור הדגם ולהקצות לו תווית Champion. import time from mlflow.tracking.client import MlflowClient from mlflow.entities.model_registry.model_version_status import ModelVersionStatus client = MlflowClient() #find the latest model version model_name_path = f"{catalog_name}.{model_schema}.{model_name}" model_version_infos = client.search_model_versions(f"name ='{model_name_path}'") new_model_version = max([int(model_version_info.version) for model_version_info in model_version_infos]) #add the model and model version descirption client.update_registered_model( name=model_name_path, description="collaborative filtering using Spark mllib ALS. This model use rating table" ) client.update_model_version( name=model_name_path, version=new_model_version, description="this model is optimized Rank and REG_PARAM with Hyperopt and rmse as a loss function. trained on the full dataset" ) # assign alias client.set_registered_model_alias(model_name_path, "Champion", new_model_version) עכשיו קדימה ובדוק את הסכימה שרשמת את המודל. אתה אמור לראות את כל העדכונים שלך כדלקמן : אם אתה משתמש בסביבת עבודה עבור רישום מודלים, עליך לבצע שלבים לניהול המודלים שלך. שימוש בכינויים לא יעבוד. בדוק כדי לראות איך זה עובד שלבי מודל כאן מסקנות דגם ניקוד אצווה עכשיו תארו לעצמכם שאנחנו רוצים להשתמש במודל שלנו בייצור כדי להסיק. בשלב זה אנו טוענים את מודל האלוף ומשתמשים בו כדי להפיק 20 המלצות לסרטים עבור כל משתמש. from mlflow.spark import load_model as spark_load_model from mlflow.tracking.client import MlflowClient from create_training_set import split_data #-- set UC as model registray mlflow.set_registry_uri("databricks-uc") #-- initate mlflow client client = MlflowClient() # -- read the config file with open('config.json') as config_file: config = json.load(config_file) catalog_name = config["catalog_name"] gold_layer = config["gold_layer_name"] silver_layer = config["silver_layer_name"] user_item_table_name = config["user_item_table_name"] ft_user_item_name = config["ft_user_item_name"] model_name = config["model_name"] model_schema = config["model_schema"] #-- create the model uri model_path = f"{catalog_name}.{model_schema}.{model_name}" # --create the model_uri: there are two ways to do this # 1: using the alias (we use this*) model_version_uri = f"models:/{model_uri}@champion" # 2: using model version #champion_version = client.get_model_version_by_alias(model_uri, "champion") #model_version_uri = f"models:/{model_uri}/{champion_version.version}" # -- load the model pipline and exctract the model model_pipeline = spark_load_model(model_version_uri) model = model_pipeline.stages[0] # -- batch scoring using the the model fe_full_data, df_full_data, df_train, df_test = split_data() df_batch_input = df_full_data.drop("rating") df_scores = model.transform(df_batch_input) # --- in case you used Feature Engineering to train and register model #from databricks.feature_engineering import FeatureEngineeringClient #fe = FeatureEngineeringClient() # fe.score_batch(model_uri=f"{model_version_uri}",df = df_batch_input) ואתה יכול לראות שהשתמשנו באותם נתוני אימון לניקוד אצווה. למרות שבמקרה של מערכות ממליצים זה הגיוני, ברוב האפליקציות אנחנו רוצים להשתמש במודל כדי לצבור כמה נתונים בלתי נראים. לדוגמה, הדמיה שלך היא נטפליקס ורוצה לעדכן את המלצות המשתמש בסופו של יום בהתבסס על רשימת הנצפים החדשה שלהם. אנחנו יכולים לתזמן עבודה שמפעילה את ניקוד האצווה בזמן ספציפי בסוף היום. כעת נוכל להמשיך ולהפיק את ההמלצות עבור כל משתמש. לשם כך אנו מוצאים את 20 הפריטים המובילים לכל משתמש from pyspark.sql.window import Window from pyspark.sql.functions import col, split, row_number, collect_list from pyspark.sql.functions import col, collect_list, expr, lit, min, row_number, desc windowSpec = Window.partitionBy("user_id").orderBy(col("prediction").desc()) df_top_20_items = df_scores.withColumn("rank", row_number().over(windowSpec)).filter(col("rank") <= 20) df_user_recs = df_top_20_items.groupBy("user_id") \ .agg(collect_list(col("item_id").cast("double")).alias("top_item_ids")) כך נראית התוצאה לבסוף נוכל לאחסן את החיזוי כתווית דלתא ב-UC שלנו או לפרסם אותם ב-Mongo DB או Azure Cosmos DB במערכות מטה. אנחנו הולכים עם האופציה הראשונה df_user_recs.write.mode("overwrite").saveAsTable(f"{catalog_name}.{output_schema}.top20_item_recommendations") סטרימינג/הסקה מקוונת כעת דמיינו מקרה שבו אנו רוצים לעדכן את ההמלצות שלנו על סמך אינטראקציות משתמש בזמן אמת. למקרה זה נוכל להשתמש בהגשה מודל. כאשר מישהו רוצה להשתמש במודל שלך, הוא יכול לשלוח נתונים לשרת. לאחר מכן השרת מזין את הנתונים הללו למודל הפרוס שלך, אשר נכנס לפעולה, מנתח את הנתונים ויוצר חיזוי. ניתן להשתמש בהם ביישומי אינטרנט, באפליקציות לנייד, או אפילו במערכות משובצות. אחד היישומים של גישה זו הוא לאפשר ניתוב תעבורה לבדיקת A/B. לא ניתן להשתמש באלגוריתם ALS ישירות להסקת מסקנות מקוונות מכיוון שהוא מצריך אימון מחדש של המודל תוך שימוש בכל הנתונים (ישן + חדש) כדי לעדכן את ההמלצות. אלגוריתמי למידה של ירידה בדרגה הם דוגמאות למודל שניתן להשתמש בהם עבור עדכונים מקוונים. אנו עשויים להסתכל על כמה מאלגוריתמים אלה בפוסט הבא. עם זאת, רק כדי להמחיש כיצד מודל כזה יעבוד, אנו יוצרים מודל (חסר תועלת) המשרת נקודת קצה אשר מנבא דירוג סרטים על סמך כל פעם שמשתמש מדרג סרט! import requests model_path = f"{catalog_name}.{model_schema}.{model_name}" champion_version = client.get_model_version_by_alias(model_path, "champion") # Set the name of the MLflow endpoint endpoint_name = config["model_serving_endpoint_name"] # Name of the registered MLflow model model_name = model_path # Specify the type of compute (CPU, GPU_SMALL, GPU_MEDIUM, etc.) workload_type = "CPU" # Specify the scale-out size of compute (Small, Medium, Large, etc.) workload_size = "Small" # Get the latest version of the MLflow model model_version = int(champion_version.version) # Specify Scale to Zero(only supported for CPU endpoints) scale_to_zero = False # Get the API endpoint and token for the current notebook context API_ROOT = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get() API_TOKEN = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get() data = { "name": endpoint_name, "config": { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ] }, } headers = {"Context-Type": "text/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.post( url=f"{API_ROOT}/api/2.0/serving-endpoints", json=data, headers=headers ) זה ייצור ואשכול דגמי הגשה לארוחת צהריים עבורנו כך שזה לוקח קצת זמן. כעת, אם אתה פותח את חלון , אתה אמור לראות את נקודת הקצה שלך. Serving אנחנו יכולים להשתמש בנקודת קצה אחת כדי לשרת מודלים מרובים. לאחר מכן נוכל להשתמש בניתוב תעבורה עבור תרחישים כמו בדיקות A/B או להשוות את הביצועים של מודלים שונים בייצור. טבלת הסקות טבלאות מסקנות ב-Databricks Model Serving פועלות כיומן אוטומטי עבור המודלים הפרוסים שלנו. כשהם מופעלים, הם לוכדים בקשות נכנסות (נתונים שנשלחו לחיזוי), את פלטי המודל התואמים (חיזויים), וכמה מטא נתונים אחרים כטבלת Delta בתוך Unity Catalog. אנו יכולים להשתמש בטבלת מסקנות , ונוהל איסוף נתונים או המודלים שלנו. לניטור וניפוי באגים מעקב שושלת להכשרה מחדש כוונון עדין של אנו יכולים לאפשר את בנקודת הקצה המשרתת שלנו כדי לנטר את המודל. אנו יכולים לעשות זאת על ידי ציון המאפיינים במטען כאשר אנו יוצרים לראשונה את נקודת הקצה. לחלופין, אנו מעדכנים את נקודת הקצה שלנו לאחר מכן באמצעות הפקודה וכתובת ה-URL של נקודת הקצה כדלקמן (עוד inference table auto_capture_config put config ) כאן data = { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ], "auto_capture_config":{ "catalog_name": catalog_name, "schema_name": model_schema, "table_name_prefix": payload_table, } } headers = {"Context-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.put(url=f"{API_ROOT}/api/2.0/serving-endpoints/{endpoint_name}/config", json=data, headers=headers) print(json.dumps(response.json(), indent=4)) עכשיו בואו נזין את נקודת הקצה עם כמה נתוני אינטראקציה של משתמש דמה import random import time all_items = df_full_data.select(col("item_id")).distinct() for user_id in range(50,54): items_not_rated_by_user = df_full_data.where(col("user_id")==user_id).select(col("item_id")).distinct()#collect()[0][0] no_rated_items = [item.item_id for item in all_items.subtract(items_not_rated_by_user).limit(4).collect()] data = { "dataframe_records": [ {"user_id":user_id, "item_id":no_rated_items[0], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[1], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, ] } response = requests.post( url=f"{API_ROOT}/serving-endpoints/{endpoint_name}/invocations", json=data, headers=headers ) # generate the data within the timespan of 1 to 8 minutes time.sleep(random.randint(60*1, 60*8)) אנו יכולים לבדוק את יומני נקודות הקצה בטבלה . לוקח בערך 10 דקות עד שתוכל לראות את הנתונים בטבלה. <catalog>.<schema>.<payload_table> table_path = f"{catalog_name}.{model_schema}.{payload_table}" # Read data from the inference table df_inf_table = spark.read.table(table_path) display(df_inf_table ) אתה אמור לראות משהו כזה בטבלת המטען שלך כדי להבין את הסכימה של טבלת ההסקות הזו, בדוק את "טבלת ההסקות של קטלוג אחדות ==" .== כאן ניטור מודל ניטור מודלים ונתונים של נושא מורכב שדורש הרבה זמן לשלוט בו. Databricks Lakehouse Monitoring (DLM) מפחית את התקורה של בניית מערכת ניטור נכונה על ידי אספקת תבניות סטנדרטיות וניתנות להתאמה אישית למקרי שימוש נפוצים. עם זאת, שליטה ב-DLM ובניטור מודלים באופן כללי דורשת הרבה ניסויים. אני לא רוצה לתת לך סקירה נרחבת של ניטור מודלים כאן אלא לתת לך נקודת התחלה. אני עשוי להקדיש בלוג לנושא זה בעתיד. סיכום קצר של פונקציונליות ותכונות DLM כעת, לאחר שהמודל שלנו פועל, אנו יכולים להשתמש בטבלת ההסקות שנוצרה על ידי נקודת הקצה המשרתת שלנו כדי לנטר מדדי מפתח כגון ביצועי מודל וסחיפה כדי לזהות סטיות או חריגות בנתונים או במודל שלנו לאורך זמן. גישה פרואקטיבית זו עוזרת לנו לנקוט בפעולות מתקנות בזמן, כגון הדרכה מחדש של המודל או עדכון תכונותיו, כדי לשמור על ביצועים מיטביים והתאמה ליעדים העסקיים. DLM מספק שלושה סוגים של ניתוח או : , והסקת . מכיוון שאנו מעוניינים לנתח את טבלת ההסקות שלנו, אנו מתמקדים באחרון. כדי להשתמש בטבלה לניטור - " " שלנו, עלינו לוודא שלטבלה יש את המבנה הנכון. עבור כל שורה צריכה להתאים לבקשות עם העמודות הבאות: profile type סדרת זמן תמונת מצב מסקנות הטבלה הראשית , טבלת ההסקות תכונות הדגם חיזוי מודל מזהה דגם : חותמת זמן של בקשת ההסקה חותמת זמן (אופציונלי) אמת יסוד חשוב למקרים בהם אנו משרתים מספר דגמים ואנו רוצים לעקוב אחר הביצועים של כל דגם בלוח מחוונים ניטור אחד. אם יש יותר ממזהה מודל אחד זמין, DLM משתמש בו כדי לחתוך את הנתונים ולחשב מדדים וסטטים עבור כל פרוסה בנפרד. מזהה הדגם DLM מחשב כל סטטיסטיקה ומדדים עבור מרווח זמן מוגדר. לניתוח מסקנות, הוא השתמש בעמודת , בתוספת גודל חלון שהוגדר על ידי המשתמש כדי לזהות את חלונות הזמן. עוד למטה. חותמת הזמן DLM תומך בשני עבור טבלאות היסק: " " או " ". הוא מחשב חלק מהמדדים והסטטיסטיקה הרלוונטיים בהתבסס על מפרט זה. problem type סיווג רגרסיה כדי להשתמש ב-DLM, עלינו ליצור צג ולצרף אותו לטבלה. כאשר אנו עושים זאת DLM יוצרים שתי : metric tables : טבלה זו מכילה סטטיסטיקות סיכום כגון מינימום, מקסימום, אחוז של null ואפסים. הוא מכיל גם מדדים נוספים המבוססים על סוג הבעיה שהוגדר על ידי המשתמש. לדוגמה , ו- עבור מודלים הסיווג, ו- ו- עבור מודלים של רגרסיה. טבלת מדדי פרופיל precision recall f1_score mean_squared_error mean_average_error : היא מכילה נתונים סטטיסטיים המודדים כיצד השתנתה התפלגות הנתונים או ביחס . זה מחשב מדדים כמו מבחן צ'י ריבוע, מבחן KS. טבלת מדדי סחיפה לאורך זמן לערך בסיס (אם מסופק) כדי לראות את רשימת המדדים המלאים עבור כל טבלה, עיין בדף התיעוד . אפשר גם ליצור של טבלת המדדים . מדדים מותאמים אישית היבט חשוב בבניית מערכת ניטור הוא לוודא שללוח המחוונים הניטור שלנו יש גישה לנתוני ההסקנות העדכניים ביותר כשהם מגיעים. לשם כך נוכל להשתמש כדי לעקוב אחר שורות מעובדות בטבלת ההסקות. אנו משתמשים בטבלת ההסקות של המודל כטבלת המקור שלנו ( ), ובטבלת הניטור כטבלת השקיעה ( ). אנו גם מוודאים ש- (CDC) מופעל בשתי הטבלאות (הוא מופעל כברירת מחדל בטבלת ההסקות). כך אנו מעבדים רק שינויים - הוספה/עדכון/מחיקה - בטבלת המקור במקום לעבד מחדש את כל הטבלה בכל רענון. בהזרמת טבלת Delta readStream writeStream Change Data Capture ידיים כדי לאפשר את הניטור על טבלת ההסקות שלנו, אנו נוקטים את הצעדים הבאים: קרא את טבלת ההסקות כטבלת סטרימינג צור טבלת דלתא חדשה עם הסכימה הנכונה על ידי פירוק טבלת ההסקות שנוצרת על ידי נקודת הקצה לשרת המודל שלנו. הכן את טבלת הבסיס (אם יש) צור צג מעל הטבלה המתקבלת ורענן את המדד תזמן זרימת עבודה כדי לפרוק את טבלת ההסקות למבנה הנכון ולרענן את המדדים ראשית עלינו להתקין את ה-API לניטור של Lakehouse. זה אמור להיות מותקן כבר אם אתה משתמש ב-Databricks rum time 15.3 LTS ומעלה: %pip install "https://ml-team-public-read.s3.amazonaws.com/wheels/data-monitoring/a4050ef7-b183-47a1-a145-e614628e3146/databricks_lakehouse_monitoring-0.4.14-py3-none-any.whl" dbutils.library.restartPython() בואו נקרא את טבלת ההסקות כטבלת סטרימינג requests_raw = spark.readStream\ .format("delta")\ .table(inference_table_name) requests_raw.isStreaming #-> True לאחר מכן עלינו לשים את הטבלה בפורמט הנכון כמתואר לעיל. טבלה זו צריכה לכלול שורה אחת עבור כל חיזוי עם התכונות וערך החיזוי הרלוונטיים. טבלת ההסקות שאנו מקבלים מהמודל המשרת את נקודת הקצה, מאחסנת את הבקשות והתגובות של נקודות הקצה כפורמט JSON מקונן. הנה דוגמה למטען ה-JSON עבור עמודת הבקשה והתגובה. #requests {"dataframe_records": [ {"user_id": 1, "item_id": 346, "rating": 5}, {"user_id": 1, "item_id": 377, "rating": 2}, {"user_id": 1, "item_id": 302, "rating": 4} ] } #reponse {"predictions": [4.248899936676025, 1.1172138452529907, 4.279165744781494] } # --> what we need | user_id | item_id | rating | prediction | |---------|---------|--------|------------| | 1 | 346 | 5 | 4.248900 | | 1 | 377 | 2 | 1.117214 | | 1 | 302 | 4 | 4.279166 | כדי לפרוק את הטבלה הזו לסכימה הנכונה נוכל להשתמש בקוד הבא המותאם מתיעוד של Databricks ( ). מחברת ה-Inference table Lakehouse Monitoring starter # define the schema of the request and reponse fields in the inference tabel REQUEST_FIELDS = [StructField('user_id', IntegerType(), False),\ StructField('item_id', IntegerType(), False),\ StructField('rating', IntegerType(), False)\ ] RESPONSE_FIELD = [T.StructField("predictions", FloatType())] def process_col_requests(json_str:str) -> str: """ to proccess the JSON payload of request column in inference table """ request = json.loads(json_str) dataframe_records = request.get("dataframe_records", []) return dataframe_records def procces_col_response(json_str: str) -> str: """ to proccess the JSON payload of reponse column in inference table """ reponse = json.loads(json_str) output = [{prediction_col: round(prediction,4)} for prediction in reponse["predictions"]] return output def get_model_id(endpoint_name: str) -> str: """ create the model id by concatinating the model name and the model version. note: the assumption is the endpoint serves only one model """ served_models = get_served_models(endpoint_name) model_id = f"{served_models[0]['model_name']}_{served_models[0]['model_version']}" return model_id def process_requests(requests_raw: DataFrame, request_fields: List[T.StructField], response_field: T.StructField, endpoint_name: str) -> DataFrame: """ Takes a stream of raw requests and processes them by: - Unpacking JSON payloads for requests and responses - Exploding batched requests into individual rows - Converting Unix epoch millisecond timestamps to be Spark TimestampType :param requests_raw: DataFrame containing raw requests. Assumed to contain the following columns: - `request` - `response` - `timestamp_ms` :param request_fields: List of StructFields representing the request schema :param response_field: A StructField representing the response schema :return: A DataFrame containing processed requests """ # Convert the timestamp milliseconds to TimestampType for downstream processing. requests_timestamped = requests_raw \ .withColumn(timestamp_col, (F.col("timestamp_ms") / 1000).cast(T.TimestampType())) \ .drop("timestamp_ms") # create the model identifier column model_id = get_model_id(endpoint_name) # Convert the model name and version columns into a model identifier column. requests_identified = requests_timestamped \ .withColumn(model_id_col, F.lit(model_id)) # Rename the date column to avoid collisions with features. requests_dated = requests_identified.withColumnRenamed("date", date_col) # Consolidate and unpack JSON. request_schema = T.ArrayType(T.StructType(request_fields)) response_schema = T.ArrayType(T.StructType(response_field)) # w udf_request = F.udf(process_col_requests, request_schema) udf_reponse = F.udf(procces_col_response, response_schema) requests_unpacked = requests_dated.withColumn("request", udf_request("request")).\ withColumn("response", udf_reponse("response")) # Explode batched requests into individual rows. DB_PREFIX = "__db" requests_exploded = requests_unpacked \ .withColumn(f"{DB_PREFIX}_request_response", F.arrays_zip(F.col("request"), F.col("response"))) \ .withColumn(f"{DB_PREFIX}_request_response", F.explode(F.col(f"{DB_PREFIX}_request_response"))) \ .select(F.col("*"), F.col(f"{DB_PREFIX}_request_response.request.*"), F.col(f"{DB_PREFIX}_request_response.response.*")) \ .drop(f"{DB_PREFIX}_request_response", "request", "response") requests_cleaned = requests_exploded.drop("status_code", "sampling_fraction", "client_request_id", "databricks_request_id", "request_metadata") return requests_cleaned הטבלה שהתקבלה תיראה כך: בשלב הבא עלינו לאתחל את שולחן הכיור שלנו dt_builder = DeltaTable.createIfNotExists(spark) \ .tableName(unpacked_requests_table_name) \ .addColumns(schema) \ .partitionedBy(requests_cleaned.schema) \ .property("delta.enableChangeDataFeed", "true") \ dt_builder.execute() ולכתוב את התוצאות checkpoint_path = f"dbfs:/payload-logging/{endpoint_name}/checkpoint" requests_stream = requests_cleaned.writeStream \ .trigger(once=True) \ .format("delta") \ .partitionBy(date_col) \ .outputMode("append") \ .option("checkpointLocation", checkpoint_path) \ .toTable(unpacked_requests_table_name) \ לבסוף, אנו יוצרים את טבלת הבסיס שלנו. DLM משתמש בטבלה זו כדי לחשב את הסחפים על ידי השוואת התפלגות של עמודות דומות של מודלים בסיסיים ועיקריים. לטבלת הבסיס צריכה להיות אותה עמודת תכונה כמו העמודה הראשית, כמו גם אותה עמודת זיהוי דגם. לטבלת הבסיס אנו משתמשים בטבלת החיזוי של שלנו שאנו מאחסנים מוקדם יותר לאחר שאימנו את המודל שלנו באמצעות ההיפרפרמטר הטוב ביותר. כדי לחשב את מדד הסחיפה, Databricks מחשב את מדדי הפרופיל הן עבור טבלת ראשי והן עבור טבלת הבסיס. כאן תוכל לקרוא על מערך האימות . הטבלה הראשית וטבלת קו הבסיס #read the prediction table df_base_table = spark.table(f"{catalog_name}.{model_schema}.predictions") # create the model id and add it to the table model_id = get_model_id(endpoint_name) df_base_table = df_base_table.withColumn(model_id_col, F.lit(model_id)) #write the new table and enable the CDC on it output_base_table_name = f"{catalog_name}.{model_schema}.{base_table_prefix}_{model_name}" df_base_table.write.format("delta").mode("overwrite").saveAsTable(output_base_table_name) spark.sql(f"ALTER TABLE {output_base_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)") כעת אנו קוראים כדי ליצור את לוח המחוונים לניטור שלנו. אנחנו יכולים לעשות זאת באמצעות או ה-API לניטור של Lakehouse. כאן אנו משתמשים באפשרות השנייה: ממשק המשתמש # This is where we store the metric tables. output_schema_name = f"{catalog_name}.{model_schema}" try: info = lm.create_monitor( table_name=unpacked_requests_table_name, profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities,#the aggregation window model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, # We will refresh the metrics on-demand in this notebook baseline_table_name=output_base_table_name, ) print(info) except Exception as e: # Ensure the exception was expected assert "RESOURCE_ALREADY_EXISTS" in str(e), f"Unexpected error: {e}" # Update the monitor if any parameters of this notebook have changed. lm.update_monitor( table_name=unpacked_requests_table_name, updated_params=dict( profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities, model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, baseline_table_name=output_base_table_name, ) ) # Refresh metrics calculated on the requests table. refresh_info = lm.run_refresh(table_name=unpacked_requests_table_name) print(refresh_info) לאחר שנפעיל את הקוד, לוקח זמן עד ש-Databricks מחשב את כל המדד. כדי לראות את לוח המחוונים עבור ללשונית של טבלת הכיורים שלך (כלומר ). אתה אמור לראות דף כדלקמן. Quality unpacked_requests_table_name אם תלחץ על של הצג, תראה את הריענונים הרצים, הממתינים והעבר שלך. לחץ על כדי לפתוח את לוח המחוונים שלך. refresh history View Dashboard אז אנחנו מתחילים עם טבלת ההסקות ( ), מעבדים אותה ושומרים את התוצאה ב- ומעבירים את הטבלה הזו יחד עם טבלת הבסיס שלנו ( ) ל-API לניטור שלנו. ה-DLM מחשב את מדדי הפרופיל עבור כל טבלה ( ) והשתמש בהם כדי לחשב את מדדי הסחיפה ( ) my_endpoint_payload my_endpoint_payload_unpacked base_table_als my_endpoint_payload_unpacked_profile_metric my_endpoint_payload_unpacked_drift_metrics הנה לך! יש לך את כל מה שאתה צריך כדי לשרת ולפקח על הדגם שלך! בחלק הבא אני אראה לך כיצד להפוך תהליך זה לאוטומטי באמצעות ו- ! Databrick Assets Bundle Gitlab בחלק , נקטנו את הצעדים הראשונים לבניית צינור MLOps מקצה לקצה באמצעות Databricks ו-Spark, בהנחיית ארכיטקטורת ההתייחסות של Databricks. להלן תקציר של השלבים העיקריים שסקרנו: הראשון של סדרת הדרכה זו : ארגנו את הנתונים שלנו לשכבות ברונזה, כסף וזהב בתוך קטלוג האחדות, ויצרנו מערכת מובנית ויעילה לניהול נתונים. הגדרת קטלוג האחדות לארכיטקטורת מדליונים : הדגמנו כיצד לייבא נתונים גולמיים למערכת, תוך הבטחת עקביות ואיכות לשלבי העיבוד הבאים. הכנסת נתונים לקטלוג Unity : באמצעות Databricks, הכשרנו מודל למידת מכונה המותאם למערך הנתונים שלנו, בעקבות שיטות עבודה מומלצות לפיתוח מודל מדרגי ואפקטיבי. הכשרת המודל : כדי לשפר את ביצועי המודל, השתמשנו ב-HyperOpt כדי להפוך את החיפוש אחר פרמטרים אופטימליים לאוטומטיים, תוך שיפור הדיוק והיעילות. כוונון היפרפרמטרים עם HyperOpt : השתמשנו ב-MLflow לרישום ולנטר את הניסויים שלנו, תוך שמירה על תיעוד מקיף של גרסאות מודל, מדדים ופרמטרים להשוואה ושחזור קלות. מעקב אחר ניסוי עם Databricks MLflow עם השלמת השלבים הבסיסיים האלה, המודל שלך מוכן כעת לפריסה. בחלק השני הזה, נתמקד בשילוב שני רכיבים קריטיים במערכת שלנו: : הטמעת עיבוד אצווה ליצירת תחזיות על מערכי נתונים גדולים, המתאימים ליישומים כמו ניקוד בכמות גדולה ודיווח תקופתי. מסקנות אצווה : הגדרת שירות מודל בזמן אמת כדי לספק תחזיות מיידיות, חיוניות ליישומים ושירותים אינטראקטיביים. הסקה מקוונת (הגשה של מודל) כדי להבטיח שהמודלים הפרוסים שלך ישמרו על ביצועים ואמינות מיטביים לאורך זמן. ניטור מודלים: בואו ניכנס לזה! פריסת מודל נקודת המוצא של הבלוג האחרון הייתה הערכת מודלים. כעת דמיינו שעשינו את ההשוואה וגילינו שהדגם שלנו מציג ביצועים גבוהים יותר בהשוואה לדגם הייצור הזה. מכיוון שאנו רוצים (מניחים) להשתמש במודל בייצור, אנו רוצים לנצל את כל הנתונים שיש לנו. השלב הבא הוא לאמן ולבדוק את המודל באמצעות מערך הנתונים המלא. לאחר מכן המשיך את המודל שלנו לשימוש מאוחר יותר על ידי פריסתו כדגם האלוף שלנו. מכיוון שזהו המודל הסופי שאנו רוצים להשתמש בו להסקת מסקנות, אנו משתמשים בלקוח Feature Engineering כדי לאמן את המודל. בדרך זו אנו לא רק עוקבים בקלות אחר שושלת המודל, אלא גם מורידים ללקוח את אימות הסכימה ושינוי התכונות (אם יש). with mlflow.start_run(run_name="ALS_best_model") as run: als = ALS() # Now we set the parameters for the method als.setMaxIter(MAX_ITER)\ .setSeed(SEED)\ .setRegParam(best_params["REG_PARAM"])\ .setUserCol(COL_USER)\ .setItemCol(COL_ITEM)\ .setRatingCol(COL_LABEL)\ .setRank(best_params["RANK"]) mlflow.log_param("MAX_ITER", MAX_ITER) mlflow.log_param("RANK", best_params["RANK"]) mlflow.log_param("REG_PARAM", best_params["REG_PARAM"]) # Create the model with these parameters. model = als.fit(df_full_data) #drop predictions where users and products from the test test and didn't make it into the training set. in this case, the prediction is NaN model.setColdStartStrategy('drop') predictions = model.transform(df_full_data) signature = infer_signature(model_input = df_full_data, model_output = predictions.select(COL_LABEL)) #log the model mlflow.spark.log_model(model, model_name, sample_input=df_full_data.limit(3), signature = signature, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name=f"{catalog_name}.{model_schema}.{model_name}") evaluator = RegressionEvaluator(predictionCol=COL_PRED, labelCol=COL_LABEL) rmse = evaluator.setMetricName("rmse").evaluate(predictions) mlflow.log_metric('rmse', rmse) אנו יכולים גם להשתמש ב- כדי להכשיר ולרשום את המודלים Feature Store או ב-API של Feature Engineering model_info = fe.log_model(model=model, artifact_path = model_name, flavor=mlflow.spark, training_set=fe_full_data, conda_env=mlflow.spark.get_default_conda_env(), registered_model_name= f"{catalog_name}.{model_schema}.{model_name}" ) כאשר אנו משתמשים ב-API להנדסת תכונות אנו יכולים לראות את השושלת של המודל ב-Catalog Explorer כעת אפשר לעדכן את תיאור הדגם ולהקצות לו תווית Champion. import time from mlflow.tracking.client import MlflowClient from mlflow.entities.model_registry.model_version_status import ModelVersionStatus client = MlflowClient() #find the latest model version model_name_path = f"{catalog_name}.{model_schema}.{model_name}" model_version_infos = client.search_model_versions(f"name ='{model_name_path}'") new_model_version = max([int(model_version_info.version) for model_version_info in model_version_infos]) #add the model and model version descirption client.update_registered_model( name=model_name_path, description="collaborative filtering using Spark mllib ALS. This model use rating table" ) client.update_model_version( name=model_name_path, version=new_model_version, description="this model is optimized Rank and REG_PARAM with Hyperopt and rmse as a loss function. trained on the full dataset" ) # assign alias client.set_registered_model_alias(model_name_path, "Champion", new_model_version) עכשיו קדימה ובדוק את הסכימה שרשמת את המודל. אתה אמור לראות את כל העדכונים שלך כדלקמן : אם אתה משתמש בסביבת עבודה עבור רישום מודלים, עליך לבצע שלבים לניהול המודלים שלך. שימוש בכינויים לא יעבוד. בדוק כדי לראות איך זה עובד שלבי מודל כאן מסקנות דגם ניקוד אצווה עכשיו תארו לעצמכם שאנחנו רוצים להשתמש במודל שלנו בייצור כדי להסיק. בשלב זה אנו טוענים את מודל האלוף ומשתמשים בו כדי להפיק 20 המלצות לסרטים עבור כל משתמש. from mlflow.spark import load_model as spark_load_model from mlflow.tracking.client import MlflowClient from create_training_set import split_data #-- set UC as model registray mlflow.set_registry_uri("databricks-uc") #-- initate mlflow client client = MlflowClient() # -- read the config file with open('config.json') as config_file: config = json.load(config_file) catalog_name = config["catalog_name"] gold_layer = config["gold_layer_name"] silver_layer = config["silver_layer_name"] user_item_table_name = config["user_item_table_name"] ft_user_item_name = config["ft_user_item_name"] model_name = config["model_name"] model_schema = config["model_schema"] #-- create the model uri model_path = f"{catalog_name}.{model_schema}.{model_name}" # --create the model_uri: there are two ways to do this # 1: using the alias (we use this*) model_version_uri = f"models:/{model_uri}@champion" # 2: using model version #champion_version = client.get_model_version_by_alias(model_uri, "champion") #model_version_uri = f"models:/{model_uri}/{champion_version.version}" # -- load the model pipline and exctract the model model_pipeline = spark_load_model(model_version_uri) model = model_pipeline.stages[0] # -- batch scoring using the the model fe_full_data, df_full_data, df_train, df_test = split_data() df_batch_input = df_full_data.drop("rating") df_scores = model.transform(df_batch_input) # --- in case you used Feature Engineering to train and register model #from databricks.feature_engineering import FeatureEngineeringClient #fe = FeatureEngineeringClient() # fe.score_batch(model_uri=f"{model_version_uri}",df = df_batch_input) ואתה יכול לראות שהשתמשנו באותם נתוני אימון לניקוד אצווה. למרות שבמקרה של מערכות ממליצים זה הגיוני, ברוב האפליקציות אנחנו רוצים להשתמש במודל כדי לצבור כמה נתונים בלתי נראים. לדוגמה, הדמיה שלך היא נטפליקס ורוצה לעדכן את המלצות המשתמש בסופו של יום בהתבסס על רשימת הנצפים החדשה שלהם. אנחנו יכולים לתזמן עבודה שמפעילה את ניקוד האצווה בזמן ספציפי בסוף היום. כעת נוכל להמשיך ולהפיק את ההמלצות עבור כל משתמש. לשם כך אנו מוצאים את 20 הפריטים המובילים לכל משתמש from pyspark.sql.window import Window from pyspark.sql.functions import col, split, row_number, collect_list from pyspark.sql.functions import col, collect_list, expr, lit, min, row_number, desc windowSpec = Window.partitionBy("user_id").orderBy(col("prediction").desc()) df_top_20_items = df_scores.withColumn("rank", row_number().over(windowSpec)).filter(col("rank") <= 20) df_user_recs = df_top_20_items.groupBy("user_id") \ .agg(collect_list(col("item_id").cast("double")).alias("top_item_ids")) כך נראית התוצאה לבסוף נוכל לאחסן את החיזוי כתווית דלתא ב-UC שלנו או לפרסם אותם ב-Mongo DB או Azure Cosmos DB במערכת במורד הזרם. אנחנו הולכים עם האופציה הראשונה df_user_recs.write.mode("overwrite").saveAsTable(f"{catalog_name}.{output_schema}.top20_item_recommendations") סטרימינג/הסקה מקוונת כעת דמיינו מקרה שבו אנו רוצים לעדכן את ההמלצות שלנו על סמך אינטראקציות משתמש בזמן אמת. למקרה זה נוכל להשתמש בהגשה מודל. כאשר מישהו רוצה להשתמש במודל שלך, הוא יכול לשלוח נתונים לשרת. לאחר מכן השרת מזין את הנתונים הללו למודל הפרוס שלך, אשר נכנס לפעולה, מנתח את הנתונים ויוצר חיזוי. הם יכולים לשמש ביישומי אינטרנט, אפליקציות לנייד, או אפילו מערכות משובצות. אחד היישומים של גישה זו הוא לאפשר ניתוב תעבורה לבדיקת A/B. לא ניתן להשתמש באלגוריתם ALS ישירות להסקת מסקנות מקוונות מכיוון שהוא מצריך אימון מחדש של המודל תוך שימוש בכל הנתונים (ישן + חדש) כדי לעדכן את ההמלצות. אלגוריתמי למידה של Gradient Descent הם דוגמאות למודל שניתן להשתמש בהם עבור עדכונים מקוונים. אנו עשויים להסתכל על כמה מאלגוריתמים אלה בפוסט הבא. עם זאת, רק כדי להמחיש כיצד מודל כזה יעבוד, אנו יוצרים מודל (חסר תועלת) המשרת נקודת קצה אשר מנבא דירוג סרטים על סמך כל פעם שמשתמש מדרג סרט! import requests model_path = f"{catalog_name}.{model_schema}.{model_name}" champion_version = client.get_model_version_by_alias(model_path, "champion") # Set the name of the MLflow endpoint endpoint_name = config["model_serving_endpoint_name"] # Name of the registered MLflow model model_name = model_path # Specify the type of compute (CPU, GPU_SMALL, GPU_MEDIUM, etc.) workload_type = "CPU" # Specify the scale-out size of compute (Small, Medium, Large, etc.) workload_size = "Small" # Get the latest version of the MLflow model model_version = int(champion_version.version) # Specify Scale to Zero(only supported for CPU endpoints) scale_to_zero = False # Get the API endpoint and token for the current notebook context API_ROOT = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get() API_TOKEN = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get() data = { "name": endpoint_name, "config": { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ] }, } headers = {"Context-Type": "text/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.post( url=f"{API_ROOT}/api/2.0/serving-endpoints", json=data, headers=headers ) זה ייצור עבורנו אשכול הגשה של דגמי ארוחת צהריים כך שזה לוקח קצת זמן. כעת, אם אתה פותח את חלון , אתה אמור לראות את נקודת הקצה שלך. Serving אנחנו יכולים להשתמש בנקודת קצה אחת כדי לשרת מודלים מרובים. לאחר מכן נוכל להשתמש בניתוב תעבורה עבור תרחישים כמו בדיקות A/B או להשוות את הביצועים של מודלים שונים בייצור. טבלת הסקות טבלאות מסקנות ב-Databricks Model Serving פועלות כיומן אוטומטי עבור המודלים הפרוסים שלנו. כשהם מופעלים, הם לוכדים בקשות נכנסות (נתונים שנשלחו לחיזוי), את פלטי המודל התואמים (חיזויים), וכמה מטא נתונים אחרים כטבלת Delta בתוך Unity Catalog. אנו יכולים להשתמש בטבלת מסקנות , ונוהל איסוף נתונים או המודלים שלנו. לניטור וניפוי באגים מעקב שושלת להכשרה מחדש כוונון עדין של אנו יכולים לאפשר את בנקודת הקצה המשרתת שלנו כדי לנטר את המודל. אנו יכולים לעשות זאת על ידי ציון המאפיינים במטען כאשר אנו יוצרים לראשונה את נקודת הקצה. לחלופין, אנו מעדכנים את נקודת הקצה שלנו לאחר מכן באמצעות הפקודה וכתובת ה-URL של נקודת הקצה כדלקמן (עוד inference table auto_capture_config put config ) כאן data = { "served_models": [ { "model_name": model_name, "model_version": int(model_version), "workload_size": workload_size, "scale_to_zero_enabled": scale_to_zero, "workload_type": workload_type, } ], "auto_capture_config":{ "catalog_name": catalog_name, "schema_name": model_schema, "table_name_prefix": payload_table, } } headers = {"Context-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}"} response = requests.put(url=f"{API_ROOT}/api/2.0/serving-endpoints/{endpoint_name}/config", json=data, headers=headers) print(json.dumps(response.json(), indent=4)) עכשיו בואו נזין את נקודת הקצה עם כמה נתוני אינטראקציה של משתמש דמה import random import time all_items = df_full_data.select(col("item_id")).distinct() for user_id in range(50,54): items_not_rated_by_user = df_full_data.where(col("user_id")==user_id).select(col("item_id")).distinct()#collect()[0][0] no_rated_items = [item.item_id for item in all_items.subtract(items_not_rated_by_user).limit(4).collect()] data = { "dataframe_records": [ {"user_id":user_id, "item_id":no_rated_items[0], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[1], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, {"user_id":user_id, "item_id":no_rated_items[2], "rating": random.randint(1, 5)}, ] } response = requests.post( url=f"{API_ROOT}/serving-endpoints/{endpoint_name}/invocations", json=data, headers=headers ) # generate the data within the timespan of 1 to 8 minutes time.sleep(random.randint(60*1, 60*8)) אנו יכולים לבדוק את יומני נקודות הקצה בטבלה . לוקח בערך 10 דקות עד שתוכל לראות את הנתונים בטבלה. <catalog>.<schema>.<payload_table> table_path = f"{catalog_name}.{model_schema}.{payload_table}" # Read data from the inference table df_inf_table = spark.read.table(table_path) display(df_inf_table ) אתה אמור לראות משהו כזה בטבלת המטען שלך כדי להבין את הסכימה של טבלת ההסקות הזו, בדוק את "טבלת ההסקות של קטלוג אחדות ==" .== כאן ניטור מודל ניטור מודלים ונתונים של נושא מורכב שדורש הרבה זמן לשלוט בו. Databricks Lakehouse Monitoring (DLM) מפחית את התקורה של בניית מערכת ניטור נכונה על ידי אספקת תבניות סטנדרטיות וניתנות להתאמה אישית למקרי שימוש נפוצים. עם זאת, שליטה ב-DLM ובניטור מודלים באופן כללי דורשת הרבה ניסויים. אני לא רוצה לתת לך סקירה נרחבת של ניטור מודלים כאן אלא לתת לך נקודת התחלה. אני עשוי להקדיש בלוג לנושא זה בעתיד. סיכום קצר של פונקציונליות ותכונות DLM כעת, לאחר שהמודל שלנו פועל, אנו יכולים להשתמש בטבלת ההסקות שנוצרה על ידי נקודת הקצה המשרתת שלנו כדי לנטר מדדי מפתח כגון ביצועי מודל וסחיפה כדי לזהות סטיות או חריגות בנתונים או במודל שלנו לאורך זמן. גישה פרואקטיבית זו עוזרת לנו לנקוט בפעולות מתקנות בזמן, כגון הדרכה מחדש של המודל או עדכון תכונותיו, כדי לשמור על ביצועים מיטביים והתאמה ליעדים העסקיים. DLM מספק שלושה סוגים של ניתוח או : , והסקת . מכיוון שאנו מעוניינים לנתח את טבלת ההסקות שלנו, אנו מתמקדים באחרון. כדי להשתמש בטבלה לניטור - " " שלנו, עלינו לוודא שלטבלה יש את המבנה הנכון. עבור כל שורה צריכה להתאים לבקשות עם העמודות הבאות: profile type סדרת זמן תמונת מצב מסקנות הטבלה הראשית , טבלת ההסקות תכונות הדגם חיזוי מודל מזהה דגם : חותמת זמן של בקשת ההסקה חותמת זמן (אופציונלי) אמת יסוד חשוב למקרים בהם אנו משרתים מספר דגמים ואנו רוצים לעקוב אחר הביצועים של כל דגם בלוח מחוונים ניטור אחד. אם יש יותר ממזהה מודל אחד זמין, DLM משתמש בו כדי לחתוך את הנתונים ולחשב מדדים וסטטים עבור כל פרוסה בנפרד. מזהה הדגם DLM מחשב כל סטטיסטיקה ומדדים עבור מרווח זמן מוגדר. לניתוח מסקנות, הוא השתמש בעמודת , בתוספת גודל חלון שהוגדר על ידי המשתמש כדי לזהות את חלונות הזמן. עוד למטה. חותמת הזמן DLM תומך בשני עבור טבלאות היסק: " " או " ". הוא מחשב חלק מהמדדים והסטטיסטיקה הרלוונטיים בהתבסס על מפרט זה. problem type סיווג רגרסיה כדי להשתמש ב-DLM, עלינו ליצור צג ולצרף אותו לטבלה. כאשר אנו עושים זאת DLM יוצרים שתי : metric tables : טבלה זו מכילה סטטיסטיקות סיכום כגון מינימום, מקסימום, אחוז של null ואפסים. הוא מכיל גם מדדים נוספים המבוססים על סוג הבעיה שהוגדר על ידי המשתמש. לדוגמה , ו- עבור מודלים הסיווג, ו- ו- עבור מודלים של רגרסיה. טבלת מדדי פרופיל precision recall f1_score mean_squared_error mean_average_error : היא מכילה נתונים סטטיסטיים המודדים כיצד השתנתה התפלגות הנתונים או ביחס . זה מחשב מדדים כמו מבחן צ'י ריבוע, מבחן KS. טבלת מדדי סחיפה לאורך זמן לערך בסיס (אם מסופק) כדי לראות את רשימת המדדים המלאים עבור כל טבלה, עיין בדף התיעוד . אפשר גם ליצור של טבלת המדדים . מדדים מותאמים אישית היבט חשוב בבניית מערכת ניטור הוא לוודא שללוח המחוונים הניטור שלנו יש גישה לנתוני ההסקנות העדכניים ביותר כשהם מגיעים. לשם כך נוכל להשתמש כדי לעקוב אחר שורות מעובדות בטבלת ההסקות. אנו משתמשים בטבלת ההסקות של המודל כטבלת המקור שלנו ( ), ובטבלת הניטור כטבלת השקיעה ( ). אנו גם מוודאים ש- (CDC) מופעל בשתי הטבלאות (הוא מופעל כברירת מחדל בטבלת ההסקות). כך אנו מעבדים רק שינויים - הוספה/עדכון/מחיקה - בטבלת המקור במקום לעבד מחדש את כל הטבלה בכל רענון. בהזרמת טבלת Delta readStream writeStream Change Data Capture ידיים כדי לאפשר את הניטור מעל טבלת ההסקות שלנו, אנו נוקטים את הצעדים הבאים: קרא את טבלת ההסקות כטבלת סטרימינג צור טבלת דלתא חדשה עם הסכימה הנכונה על ידי פירוק טבלת ההסקות שנוצרת על ידי נקודת הקצה של המודל שלנו. הכן את טבלת הבסיס (אם יש) צור צג מעל הטבלה המתקבלת ורענן את המדד תזמן זרימת עבודה כדי לפרוק את טבלת ההסקות למבנה הנכון ולרענן את המדדים ראשית עלינו להתקין את ה-API לניטור של Lakehouse. זה אמור להיות מותקן כבר אם אתה משתמש ב-Databricks rum time 15.3 LTS ומעלה: %pip install "https://ml-team-public-read.s3.amazonaws.com/wheels/data-monitoring/a4050ef7-b183-47a1-a145-e614628e3146/databricks_lakehouse_monitoring-0.4.14-py3-none-any.whl" dbutils.library.restartPython() בואו נקרא את טבלת ההסקות כטבלת סטרימינג requests_raw = spark.readStream\ .format("delta")\ .table(inference_table_name) requests_raw.isStreaming #-> True לאחר מכן עלינו לשים את הטבלה בפורמט הנכון כפי שתואר לעיל. טבלה זו צריכה לכלול שורה אחת עבור כל חיזוי עם התכונות וערך החיזוי הרלוונטיים. טבלת ההסקות שאנו מקבלים מהמודל המשרת את נקודת הקצה, מאחסנת את הבקשות והתגובות של נקודות הקצה כפורמט JSON מקונן. הנה דוגמה למטען ה-JSON עבור עמודת הבקשה והתגובה. #requests {"dataframe_records": [ {"user_id": 1, "item_id": 346, "rating": 5}, {"user_id": 1, "item_id": 377, "rating": 2}, {"user_id": 1, "item_id": 302, "rating": 4} ] } #reponse {"predictions": [4.248899936676025, 1.1172138452529907, 4.279165744781494] } # --> what we need | user_id | item_id | rating | prediction | |---------|---------|--------|------------| | 1 | 346 | 5 | 4.248900 | | 1 | 377 | 2 | 1.117214 | | 1 | 302 | 4 | 4.279166 | כדי לפרוק את הטבלה הזו לסכימה הנכונה נוכל להשתמש בקוד הבא המותאם מתיעוד של Databricks ( ). מחברת ה-Inference table Lakehouse Monitoring starter # define the schema of the request and reponse fields in the inference tabel REQUEST_FIELDS = [StructField('user_id', IntegerType(), False),\ StructField('item_id', IntegerType(), False),\ StructField('rating', IntegerType(), False)\ ] RESPONSE_FIELD = [T.StructField("predictions", FloatType())] def process_col_requests(json_str:str) -> str: """ to proccess the JSON payload of request column in inference table """ request = json.loads(json_str) dataframe_records = request.get("dataframe_records", []) return dataframe_records def procces_col_response(json_str: str) -> str: """ to proccess the JSON payload of reponse column in inference table """ reponse = json.loads(json_str) output = [{prediction_col: round(prediction,4)} for prediction in reponse["predictions"]] return output def get_model_id(endpoint_name: str) -> str: """ create the model id by concatinating the model name and the model version. note: the assumption is the endpoint serves only one model """ served_models = get_served_models(endpoint_name) model_id = f"{served_models[0]['model_name']}_{served_models[0]['model_version']}" return model_id def process_requests(requests_raw: DataFrame, request_fields: List[T.StructField], response_field: T.StructField, endpoint_name: str) -> DataFrame: """ Takes a stream of raw requests and processes them by: - Unpacking JSON payloads for requests and responses - Exploding batched requests into individual rows - Converting Unix epoch millisecond timestamps to be Spark TimestampType :param requests_raw: DataFrame containing raw requests. Assumed to contain the following columns: - `request` - `response` - `timestamp_ms` :param request_fields: List of StructFields representing the request schema :param response_field: A StructField representing the response schema :return: A DataFrame containing processed requests """ # Convert the timestamp milliseconds to TimestampType for downstream processing. requests_timestamped = requests_raw \ .withColumn(timestamp_col, (F.col("timestamp_ms") / 1000).cast(T.TimestampType())) \ .drop("timestamp_ms") # create the model identifier column model_id = get_model_id(endpoint_name) # Convert the model name and version columns into a model identifier column. requests_identified = requests_timestamped \ .withColumn(model_id_col, F.lit(model_id)) # Rename the date column to avoid collisions with features. requests_dated = requests_identified.withColumnRenamed("date", date_col) # Consolidate and unpack JSON. request_schema = T.ArrayType(T.StructType(request_fields)) response_schema = T.ArrayType(T.StructType(response_field)) # w udf_request = F.udf(process_col_requests, request_schema) udf_reponse = F.udf(procces_col_response, response_schema) requests_unpacked = requests_dated.withColumn("request", udf_request("request")).\ withColumn("response", udf_reponse("response")) # Explode batched requests into individual rows. DB_PREFIX = "__db" requests_exploded = requests_unpacked \ .withColumn(f"{DB_PREFIX}_request_response", F.arrays_zip(F.col("request"), F.col("response"))) \ .withColumn(f"{DB_PREFIX}_request_response", F.explode(F.col(f"{DB_PREFIX}_request_response"))) \ .select(F.col("*"), F.col(f"{DB_PREFIX}_request_response.request.*"), F.col(f"{DB_PREFIX}_request_response.response.*")) \ .drop(f"{DB_PREFIX}_request_response", "request", "response") requests_cleaned = requests_exploded.drop("status_code", "sampling_fraction", "client_request_id", "databricks_request_id", "request_metadata") return requests_cleaned הטבלה שהתקבלה תיראה כך: בשלב הבא עלינו לאתחל את שולחן הכיור שלנו dt_builder = DeltaTable.createIfNotExists(spark) \ .tableName(unpacked_requests_table_name) \ .addColumns(schema) \ .partitionedBy(requests_cleaned.schema) \ .property("delta.enableChangeDataFeed", "true") \ dt_builder.execute() ולכתוב את התוצאות checkpoint_path = f"dbfs:/payload-logging/{endpoint_name}/checkpoint" requests_stream = requests_cleaned.writeStream \ .trigger(once=True) \ .format("delta") \ .partitionBy(date_col) \ .outputMode("append") \ .option("checkpointLocation", checkpoint_path) \ .toTable(unpacked_requests_table_name) \ לבסוף, אנו יוצרים את טבלת הבסיס שלנו. DLM משתמש בטבלה זו כדי לחשב את הסחפים על ידי השוואת התפלגות של עמודות דומות של מודלים בסיסיים ועיקריים. לטבלת הבסיס צריכה להיות אותה עמודת תכונה כמו העמודה הראשית, כמו גם אותה עמודת זיהוי דגם. לטבלת הבסיס אנו משתמשים בטבלת החיזוי של שלנו שאנו מאחסנים מוקדם יותר לאחר שאימנו את המודל שלנו באמצעות ההיפרפרמטר הטוב ביותר. כדי לחשב את מדד הסחיפה, Databricks מחשב את מדדי הפרופיל הן עבור טבלת היסוד והן עבור טבלת היסוד. כאן תוכל לקרוא על מערך האימות . הטבלה הראשית וטבלת קו הבסיס #read the prediction table df_base_table = spark.table(f"{catalog_name}.{model_schema}.predictions") # create the model id and add it to the table model_id = get_model_id(endpoint_name) df_base_table = df_base_table.withColumn(model_id_col, F.lit(model_id)) #write the new table and enable the CDC on it output_base_table_name = f"{catalog_name}.{model_schema}.{base_table_prefix}_{model_name}" df_base_table.write.format("delta").mode("overwrite").saveAsTable(output_base_table_name) spark.sql(f"ALTER TABLE {output_base_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)") כעת אנו קוראים כדי ליצור את לוח המחוונים לניטור שלנו. אנחנו יכולים לעשות זאת באמצעות או ה-API לניטור של Lakehouse. כאן אנו משתמשים באפשרות השנייה: ממשק המשתמש # This is where we store the metric tables. output_schema_name = f"{catalog_name}.{model_schema}" try: info = lm.create_monitor( table_name=unpacked_requests_table_name, profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities,#the aggregation window model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, # We will refresh the metrics on-demand in this notebook baseline_table_name=output_base_table_name, ) print(info) except Exception as e: # Ensure the exception was expected assert "RESOURCE_ALREADY_EXISTS" in str(e), f"Unexpected error: {e}" # Update the monitor if any parameters of this notebook have changed. lm.update_monitor( table_name=unpacked_requests_table_name, updated_params=dict( profile_type=lm.InferenceLog( timestamp_col=timestamp_col, granularities=granularities, model_id_col=model_id_col, prediction_col=prediction_col, label_col=label_col, problem_type=problem_type, ), output_schema_name=output_schema_name, schedule=None, baseline_table_name=output_base_table_name, ) ) # Refresh metrics calculated on the requests table. refresh_info = lm.run_refresh(table_name=unpacked_requests_table_name) print(refresh_info) לאחר שנפעיל את הקוד, לוקח קצת זמן עד ש-Databricks מחשב את כל המדד. כדי לראות את לוח המחוונים עבור ללשונית של טבלת הכיורים שלך (כלומר ). אתה אמור לראות דף כדלקמן. Quality unpacked_requests_table_name אם תלחץ על של הצג, תראה את הריענונים הרצים, הממתינים והעבר שלך. לחץ על כדי לפתוח את לוח המחוונים שלך. refresh history View Dashboard אז אנחנו מתחילים עם טבלת ההסקות ( ), מעבדים אותה ושומרים את התוצאה ב- ומעבירים את הטבלה הזו יחד עם טבלת הבסיס שלנו ( ) ל-API לניטור שלנו. ה-DLM מחשב את מדדי הפרופיל עבור כל טבלה ( ) והשתמש בהם כדי לחשב את מדדי הסחיפה ( ) my_endpoint_payload my_endpoint_payload_unpacked base_table_als my_endpoint_payload_unpacked_profile_metric my_endpoint_payload_unpacked_drift_metrics הנה לך! יש לך את כל מה שאתה צריך כדי לשרת ולפקח על הדגם שלך! בחלק הבא אני אראה לך כיצד להפוך תהליך זה לאוטומטי באמצעות ו- ! Databrick Assets Bundle Gitlab