Bu makalede Swetrix adlı bir web analitiği platformu için yapay zeka hizmeti geliştirme konusundaki deneyimimi paylaşmak istiyorum.
Amacım, aşağıdaki ekran görüntüsünde görüntülenen verilere dayanarak gelecekteki web sitesi trafiğini tahmin edecek bir makine öğrenimi modeli geliştirmekti.
Nihai hedef, gelecekte web sitelerinde hangi trafiğin görüneceğine dair müşteri için net bir vizyona sahip olmak, böylece daha iyi öngörüler elde etmelerine ve genel olarak iş planlamasını geliştirmelerine olanak sağlamaktır.
Planlama sırasında, AI ve API hizmetleri arasındaki iletişim için RabbitMQ mesaj komisyoncusu ile Mikro Hizmet Mimarisine geçilmesine karar verildi.
Öncelikle saatlik cron görevi ile verileri ayrı bir veritabanına toplamamız gerekiyor. Swetrix'teki web sitelerindeki orijinal veriler burada saklandığından ClickHouse'u seçmeye karar verdik. Formatla ilgili ayrıntılar sonraki bölümlerde ele alınacaktır.
Basitliği nedeniyle mesaj komisyoncusu olarak RabbitMQ seçildi ve AI ile API hizmetleri arasında iletişim kurmamız gerekiyor. Her şeyi parçalayalım ve ana mantığı kontrol edelim
Swetrix-AI hizmeti, arka uç tarafı için NestJ çerçevesini ve veri ön işleme ve model tahminleri için Python komut dosyalarını kullanacak.
Projelere ilişkin aşağıdaki verileri bir analytics
tablosunda topluyoruz. Bu verinin render edilmiş versiyonunu zaten yazının ilk bölümünde görmüştünüz.
Bu (neredeyse kabul edilebilir) sonuca aşağıdaki sorguyla ulaşmayı başardım:
@Cron(CronExpression.EVERY_HOUR) async insertHourlyProjectData(): Promise<void> { const gatherProjectsData = ` INSERT INTO analytics.hourly_projects_data (UniqueID, projectID, statisticsGathered, br_keys, br_vals, os_keys, os_vals, lc_keys, lc_vals, ref_keys, ref_vals, so_keys, so_vals, me_keys, me_vals, ca_keys, ca_vals, cc_keys, cc_vals, dv_keys, dv_vals, rg_keys, rg_vals, ct_keys, ct_vals) SELECT generateUUIDv4() as UniqueID, pid as projectID, toStartOfHour(now()) as statisticsGathered, groupArray(br) as br_keys, groupArray(br_count) as br_vals, groupArray(os) as os_keys, groupArray(os_count) as os_vals, ... groupArray(ct) as ct_keys, groupArray(ct_count) as ct_vals FROM ( SELECT pid, br, count(*) as br_count, os, count(*) as os_count, ... ct, count(*) as ct_count FROM analytics.analytics GROUP BY pid, br, os, lc, ref, so, me, ca, cc, dv, rg, ct ) GROUP BY pid; ` try { await clickhouse.query(gatherProjectsData).toPromise() } catch (e) { console.error( `[CRON WORKER] Error whilst gathering hourly data for all projects: ${e}`, )
İşlev, bir Cron İşi kullanılarak her saat başı çalışacak şekilde planlanmıştır. Analitik verilerini toplar ve bir clickhouse analytics.hourly_projects_data
ekler.
Çıktı
ClickHouse sınırlamaları nedeniyle istenilen veri formatına ulaşamadım. Bu nedenle modelin eğitimi için gerekli olan ön işlemeyi tamamlamak amacıyla pandas
kullanmaya karar verdim.
Özellikle aşağıdakileri yapmak için Python'u kullandım:
Bir kategoriyle ilgili anahtarları ve değerleri tek bir JSON alanında birleştirin; örneğin, cihazların anahtarlarını ve değerlerini tek bir nesnede birleştirin.
os_keys = {“Windows”, ”MacOS”, ”MacOS”, ”MacOS”, ”Linux”} os_values = {1, 2, 2, 1, 5}
İçine:
os = {“Windows”: 1, “MacOS”: 5, “Linux”: 5}
Kodu ve çıktıyı ekleme:
def format_data(keys_list, vals_list, threshold): """ Format data by converting string representations of lists to actual lists, then sums up the counts for each key. Keys with counts below a specified threshold are aggregated into 'Other'. """ counts = defaultdict(int) for keys_str, vals_str in zip(keys_list, vals_list): keys = ast.literal_eval(keys_str) vals = ast.literal_eval(vals_str) for key, val in zip(keys, vals): counts[key] += val final_data = defaultdict(int) for value, count in counts.items(): final_data[value] = count return dict(final_data) def process_group(group): """ Combine specific groups by a group clause, and make a """ result = {} for col in group.columns: if col.endswith('_keys'): prefix = col.split('_')[0] # Extract prefix to identify the category (eg, 'br' for browsers) threshold = other_thresholds.get(prefix, 1) # Get the threshold for this category, default to 1 vals_col = col.replace('_keys', '_vals') keys_list = group[col].tolist() vals_list = group[vals_col].tolist() result[col.replace('_keys', '')] = format_data(keys_list, vals_list, threshold) return pd.Series(result)
Bu veri formatı tahminin kendisi için kullanılmayacaktır; bunun daha çok veri tabanında saklanması ve eksik değer olmadığını doğrulamak ve ayrıca modelin doğru bir tahmin ürettiğini tekrar kontrol etmek amacıyla hata ayıklama amaçlı olduğunu söyleyebilirim. sonuç.
Yeterli bir model geliştirmek için çeşitli kategoriler için başka gruplar tanımlamaya karar verdim. Bu, belirli bir kategorideki bir grubun küresel olarak örnek sayısının belirli bir yüzdenin (%) altında olması durumunda, diğerinin bir parçası olarak ekleneceği anlamına gelir.
Örneğin, os
kategorisinde elimizde:
{“MacOS”: 300, “Windows”: 400, “Linux”: 23 and “TempleOS”: 10}
Bu durumda hem Linux hem de TempleOS son derece nadir olduğundan, başka bir grupta birleştirilecekler, dolayısıyla sonuç şöyle olacak:
{“MacOS”: 300, “Windows”: 400, “other”: 33}.
Ve “nadirlik”, kategoriye bağlı olarak ve bu kategori için belirlenen eşik değere göre farklı şekilde belirlenir.
Müşterinin tercihlerine ve istenilen verilere göre yapılandırılabilir
other_thresholds = { 'br': 0.06, 'os': 0.04, 'cc': 0.02, 'lc': 0.02, 'ref': 0.02, 'so': 0.03, 'me': 0.03, 'ca': 0.03, 'cc': 0.02, 'dv': 0.02, 'rg': 0.01, 'ct': 0.01 }
Bunu başarmak için uygulanan 2 fonksiyon vardı
def get_groups_by_treshholds(df,column_name): """Calculate total values for all columns""" if column_name in EXCLUDED_COLUMNS: return counter = count_dict_values(df[column_name]) total = sum(counter.values()) list1 = [] for key, value in counter.items(): if not (value / total) < other_thresholds[column_name]: list1.append(key) return list1 def create_group_columns(df): column_values = [] for key in other_thresholds.keys(): groups = get_groups_by_treshholds(df, key) if not groups: continue for group in groups: column_values.append(f"{key}_{group}") column_values.append(f"{key}_other") return column_values column_values = create_group_columns(df) column_values
Çıktı
['br_Chrome', 'br_Firefox', 'os_Mac OS', 'os_other', 'cc_UA', 'cc_GB', 'cc_other', 'dv_mobile', 'dv_desktop', 'dv_other']
Makine öğrenimi modelleriyle çalışırken giriş verilerinin modelin anlayabileceği bir formatta olması çok önemlidir. Makine öğrenimi modelleri genellikle JSON gibi karmaşık veri yapıları yerine sayısal değerler (tamsayılar, kayan değerler) gerektirir.
Bu nedenle yine bu gereksinime uyacak şekilde verilerimizin biraz daha ön işlenmesi tercih edilir.
Her özelliğin ayrı bir sütun olarak temsil edildiği ve satırların karşılık gelen sayısal değerleri içerdiği create_exploded_df
işlevini oluşturdum. (Henüz ideal değil ama üretebildiğim en iyi çözüm buydu)
def create_exploded_df(df): """ Function which creates a new data set, iterates through the old one and fill in values according to their belongings (br_other, etc..) """ new_df = df[['projectID', 'statisticsGathered']] for group in column_values: new_df[group] = 0 new_df_cols = new_df.columns df_cols = df.columns for column in df_cols: if column in ['projectID', 'statisticsGathered']: continue for index, row in enumerate(df[column]): if column in EXCLUDED_COLUMNS: continue for key, value in row.items(): total = 0 if (a:=f"{column}_{key}") in new_df_cols: new_df[a][index] = value else: total += value new_df[f"{column}_other"][index] = total return new_df new_df = create_exploded_df(df) new_df.to_csv("2-weeks-exploded.csv") new_df
3.3 Saatleri doldurun
Veri formatıyla ilgili bir diğer sorun da, boş bir satır oluşturmak yerine belirli bir saatte bir proje için trafik olmasaydı, hiç satır olmayacaktı ki bu, modelin şu şekilde tasarlandığını düşünürsek sakıncalıdır: yaklaşan zaman aralığına (örneğin bir sonraki saate) ilişkin verileri tahmin edin. Ancak, başlangıç zaman dilimi için mevcut veri yoksa modeli tahminlerde bulunacak şekilde eğitmek mümkün değildir.
Bu nedenle eksik saatleri bulan ve bir saat atlandığında boş satırlar ekleyen bir komut dosyası yazdım
Model eğitimine ilişkin birincil yaklaşım, modelin hedefi olarak önceki saate ait verileri kullanmaktı. Bu, modelin mevcut verilere dayanarak gelecekteki trafiği tahmin etmesine olanak tanır.
def sort_df_and_assign_targets(df): df = df.copy() df = df.sort_values(by=['projectID', 'statisticsGathered']) for column_name in df.columns: if not column_name.endswith('target'): continue df[column_name] = df.groupby('projectID')[column_name].shift(-1) return df new_df = sort_df_and_assign_targets(new_df)
Çıktı
statisticsGathered
bölmeAyrı sütunlarda toplandı Böyle bir yaklaşımın temel nedeni, statisticsGathered
bir datetime
nesnesi olması ve kullanmaya çalıştığım modellerin (sonraki bölümleri kontrol edin) onu işleyememesi ve doğru modeli tanımlayamamasıdır.
Bu, berbat MSE/MRSE
ölçümleriyle sonuçlandı. Bu nedenle geliştirme sırasında özelliklerin day
, month
ve hour
göre ayrılmasına karar verildi ve bu da sonuçları önemli ölçüde artırdı.
def split_statistic_gathered(df): df['Month'] = df['statisticsGathered'].dt.month.astype(int) # as int df['Day'] = df['statisticsGathered'].dt.day.astype(int) # as int df['Hour'] = df['statisticsGathered'].dt.hour df = df.drop('statisticsGathered', axis = 1) return df new_df = split_statistic_gathered(new_df) new_df
Ve bu kadar! Hadi eğitimin kendisine geçelim! 🎉🎉🎉
Sanırım bu uygulamayı oluştururken en zorlu kısım gerçek tahmindi.
Denemek istediğim ilk şey LinearRegression
modelini kullanmaktı:
Aşağıdaki işlevleri uyguladım:
def create_model_for_target(train_df, target_series): X_train, x_test, Y_train, y_test = train_test_split(train_df, target_series, test_size=0.3, shuffle=False) reg = LinearRegression() reg.fit(X_train, Y_train) y_pred = reg.predict(x_test) return {"y_test": y_test, "y_pred": y_pred} def create_models_for_targets(df): models_data = dict() df = df.dropna() train_df = clear_df(df) for target_name in df[[column_name for column_name in df.columns if column_name.endswith("target")]]: models_data[target_name] = create_model_for_target(train_df, df[target_name]) return models_data
Her hedef sütun için verileri eğitim ve test kümelerine ayırdık. Daha sonra eğitim verileri üzerinde bir LinearRegression
modeli eğitiyoruz ve test verileri üzerinde tahminler yapıyoruz.
Sonuçların doğru olduğunu değerlendirmek için gerekli metrikleri toplayan ve çıktıyı üreten fonksiyonu ekledim.
def evaluate_models(data): evaluation = [] for target, results in data.items(): y_test, y_pred = results['y_test'], results['y_pred'] mse = mean_squared_error(y_test, y_pred) rmse = mean_squared_error(y_test, y_pred) ** 0.5 mae = mean_absolute_error(y_test, y_pred) mean_y = y_test.mean() median_y = y_test.median() evaluation.append({'target': target, 'mse': mse, 'rmse': rmse, 'mae': mae, 'mean_y': mean_y, 'median_y': median_y}) return pd.DataFrame(evaluation)
Çıktıyı oluşturan ve onu excel dosyasına kaydeden, mse
, rmse
, mae
ve mean_y
değerlerini hesaba katan bir komut dosyası yazdım
Gördüğünüz gibi ölçümler tatmin edici değil ve tahmin edilen trafik verileri doğruluktan uzak olacak ve trafik tahmini hedeflerime uygun olmayacak.
Bu nedenle saat başına toplam ziyaretçi sayısını tahmin etmeye karar verdim, böylece aşağıdaki işlevler oluşturuldu
def add_target_column(df, by): totals_series = df.apply(lambda x: sum(x[[column for column in df.columns if column.startswith(by)]]), axis=1) df['total'] = totals_series df[f'total_{by}_target'] = totals_series return df def shift_target_column(df, by): df = df.sort_values(by=['projectID', 'statisticsGathered'], ignore_index=True) df['total_target'] = df.groupby('projectID')[f'total_{by}_target'].shift(-1) return df new_df = add_target_column(new_df, 'br') new_df = shift_target_column(new_df, 'br') new_df[['total_br_target']]
Bu işlev belirli bir kategoriyi alır ve buna göre toplam ziyaretçiyi hesaplar. Bu işe yarar çünkü Cihaz değerlerinin toplam sayısı, toplam İşletim Sistemi değerleri sayısıyla aynı olacaktır.
Böyle bir yaklaşımla model, eskisinden 10 kat daha iyi sonuçlar gösterdi .
Bu durumdan bahsediyorsak neredeyse kabul edilebilir ve kullanıma hazır bir özelliktir. Müşteriler artık bu tahminlerin sonucuna göre bütçe tahsisini ve sunucu ölçeklendirmesini planlayabiliyor
Tahminler gerçek değerlerden yaklaşık 2,45 ziyaretçi kadar sapmaktadır (RMSE = √MSE olduğundan ) . Pazarlama ihtiyaçları açısından olumsuz, önemli bir etkisi olamaz.
Bu makale oldukça kapsamlı olduğundan ve uygulama geliştirilme aşamasında olduğundan burada duracağız. İlerleyen süreçte bu yaklaşımı geliştirmeye devam edeceğiz ve sizi güncel tutacağım!
Okuduğunuz ve ilginiz için teşekkürler! Yorum bölümünde geri bildirimlerinizi ve düşüncelerinizi duymayı sabırsızlıkla bekliyorum. Umarım bu bilgiler hedefleriniz için faydalı olur!
Ve iyi şanslar!