이 기사에서는 Swetrix라는 웹 분석 플랫폼용 AI 서비스 개발 경험을 공유하고 싶습니다.
내 목표는 다음 스크린샷에 표시된 데이터를 기반으로 향후 웹사이트 트래픽을 예측하는 기계 학습 모델을 개발하는 것이었습니다.
최종 목표는 미래에 웹사이트에 어떤 트래픽이 나타날지에 대한 명확한 비전을 고객에게 제공함으로써 고객이 더 나은 통찰력을 얻고 전반적인 비즈니스 계획을 향상시킬 수 있도록 하는 것입니다.
기획 과정에서 AI와 API 서비스 간의 통신을 위해 RabbitMQ 메시지 브로커를 사용하여 마이크로서비스 아키텍처를 진행하기로 결정했습니다.
우선, 시간별 크론 작업으로 데이터를 별도의 데이터베이스에 모아야 합니다. 우리는 Swetrix 웹사이트의 원본 데이터가 ClickHouse에 저장되어 있기 때문에 ClickHouse를 선택하기로 결정했습니다. 형식에 대한 자세한 내용은 다음 섹션에서 다루겠습니다.
RabbitMQ는 단순성 때문에 메시지 브로커로 선택되었으며 AI와 API 서비스 간의 통신을 구축해야 합니다. 다 분해해서 주요 논리를 확인해 봅시다
Swetrix-AI 서비스는 백엔드 측에는 NestJs 프레임워크를 사용하고 데이터 전처리 및 모델 예측에는 Python 스크립트를 사용합니다.
우리는 프로젝트에 대한 다음 데이터를 analytics
테이블에 수집합니다. 기사의 첫 번째 섹션에서 이 데이터의 렌더링된 버전을 이미 확인했습니다.
다음 쿼리를 사용하여 이 (거의 허용되는) 결과를 얻을 수 있었습니다.
@Cron(CronExpression.EVERY_HOUR) async insertHourlyProjectData(): Promise<void> { const gatherProjectsData = ` INSERT INTO analytics.hourly_projects_data (UniqueID, projectID, statisticsGathered, br_keys, br_vals, os_keys, os_vals, lc_keys, lc_vals, ref_keys, ref_vals, so_keys, so_vals, me_keys, me_vals, ca_keys, ca_vals, cc_keys, cc_vals, dv_keys, dv_vals, rg_keys, rg_vals, ct_keys, ct_vals) SELECT generateUUIDv4() as UniqueID, pid as projectID, toStartOfHour(now()) as statisticsGathered, groupArray(br) as br_keys, groupArray(br_count) as br_vals, groupArray(os) as os_keys, groupArray(os_count) as os_vals, ... groupArray(ct) as ct_keys, groupArray(ct_count) as ct_vals FROM ( SELECT pid, br, count(*) as br_count, os, count(*) as os_count, ... ct, count(*) as ct_count FROM analytics.analytics GROUP BY pid, br, os, lc, ref, so, me, ca, cc, dv, rg, ct ) GROUP BY pid; ` try { await clickhouse.query(gatherProjectsData).toPromise() } catch (e) { console.error( `[CRON WORKER] Error whilst gathering hourly data for all projects: ${e}`, )
이 함수는 Cron 작업을 사용하여 매시간 실행되도록 예약되어 있습니다. 분석 데이터를 수집하여 클릭하우스 analytics.hourly_projects_data
에 삽입합니다.
산출
ClickHouse 제한으로 인해 원하는 데이터 형식을 얻을 수 없었습니다. 따라서 모델 학습에 필요한 전처리를 완료하기 위해 pandas
사용하기로 결정했습니다.
특히 저는 Python을 사용하여 다음을 수행했습니다.
하나의 카테고리와 관련된 키와 값을 하나의 JSON 필드로 결합합니다. 예를 들어 장치의 키와 값을 하나의 객체로 결합합니다.
os_keys = {“Windows”, ”MacOS”, ”MacOS”, ”MacOS”, ”Linux”} os_values = {1, 2, 2, 1, 5}
안으로:
os = {“Windows”: 1, “MacOS”: 5, “Linux”: 5}
코드 및 출력 첨부:
def format_data(keys_list, vals_list, threshold): """ Format data by converting string representations of lists to actual lists, then sums up the counts for each key. Keys with counts below a specified threshold are aggregated into 'Other'. """ counts = defaultdict(int) for keys_str, vals_str in zip(keys_list, vals_list): keys = ast.literal_eval(keys_str) vals = ast.literal_eval(vals_str) for key, val in zip(keys, vals): counts[key] += val final_data = defaultdict(int) for value, count in counts.items(): final_data[value] = count return dict(final_data) def process_group(group): """ Combine specific groups by a group clause, and make a """ result = {} for col in group.columns: if col.endswith('_keys'): prefix = col.split('_')[0] # Extract prefix to identify the category (eg, 'br' for browsers) threshold = other_thresholds.get(prefix, 1) # Get the threshold for this category, default to 1 vals_col = col.replace('_keys', '_vals') keys_list = group[col].tolist() vals_list = group[vals_col].tolist() result[col.replace('_keys', '')] = format_data(keys_list, vals_list, threshold) return pd.Series(result)
이 데이터 형식은 예측 자체에는 사용되지 않습니다. 데이터베이스에 저장하고 디버깅 목적으로 누락된 값이 없는지 확인하고 더 나아가 모델이 정확한 결과를 생성하는지 다시 확인하는 데 더 적합하다고 말하고 싶습니다. 결과.
적절한 모델을 훈련하기 위해 다양한 범주에 대해 다른 그룹을 정의하기로 결정했습니다. 즉, 특정 범주에 있는 그룹의 인스턴스 수가 전 세계적으로 특정 백분율(%) 미만인 경우 다른 범주의 일부로 추가됩니다.
예를 들어, os
카테고리에는 다음이 있습니다:
{“MacOS”: 300, “Windows”: 400, “Linux”: 23 and “TempleOS”: 10}
이 경우 Linux와 TempleOS는 모두 극히 드물기 때문에 다른 그룹 으로 결합되므로 최종 결과는 다음과 같습니다.
{“MacOS”: 300, “Windows”: 400, “other”: 33}.
그리고 "희소성"은 카테고리에 따라 그리고 이 카테고리 임계값에 지정된 기준에 따라 다르게 결정됩니다.
고객의 선호도와 원하는 데이터에 따라 구성 가능
other_thresholds = { 'br': 0.06, 'os': 0.04, 'cc': 0.02, 'lc': 0.02, 'ref': 0.02, 'so': 0.03, 'me': 0.03, 'ca': 0.03, 'cc': 0.02, 'dv': 0.02, 'rg': 0.01, 'ct': 0.01 }
이를 달성하기 위해 2가지 기능이 구현되었습니다.
def get_groups_by_treshholds(df,column_name): """Calculate total values for all columns""" if column_name in EXCLUDED_COLUMNS: return counter = count_dict_values(df[column_name]) total = sum(counter.values()) list1 = [] for key, value in counter.items(): if not (value / total) < other_thresholds[column_name]: list1.append(key) return list1 def create_group_columns(df): column_values = [] for key in other_thresholds.keys(): groups = get_groups_by_treshholds(df, key) if not groups: continue for group in groups: column_values.append(f"{key}_{group}") column_values.append(f"{key}_other") return column_values column_values = create_group_columns(df) column_values
산출
['br_Chrome', 'br_Firefox', 'os_Mac OS', 'os_other', 'cc_UA', 'cc_GB', 'cc_other', 'dv_mobile', 'dv_desktop', 'dv_other']
기계 학습 모델로 작업할 때 입력 데이터가 모델이 이해할 수 있는 형식이어야 합니다. 기계 학습 모델에는 일반적으로 JSON과 같은 복잡한 데이터 구조가 아닌 숫자 값(정수, 부동 소수점)이 필요합니다.
따라서 이 요구 사항에 맞게 데이터를 조금 더 전처리하는 것이 좋습니다.
각 기능이 별도의 열로 표시되고 행에 해당 숫자 값이 포함되는 create_exploded_df
함수를 만들었습니다. (아직 이상적이지는 않지만 이것이 제가 생산할 수 있는 최고의 솔루션이었습니다)
def create_exploded_df(df): """ Function which creates a new data set, iterates through the old one and fill in values according to their belongings (br_other, etc..) """ new_df = df[['projectID', 'statisticsGathered']] for group in column_values: new_df[group] = 0 new_df_cols = new_df.columns df_cols = df.columns for column in df_cols: if column in ['projectID', 'statisticsGathered']: continue for index, row in enumerate(df[column]): if column in EXCLUDED_COLUMNS: continue for key, value in row.items(): total = 0 if (a:=f"{column}_{key}") in new_df_cols: new_df[a][index] = value else: total += value new_df[f"{column}_other"][index] = total return new_df new_df = create_exploded_df(df) new_df.to_csv("2-weeks-exploded.csv") new_df
3.3 시간 입력
우리가 가지고 있던 데이터 형식의 또 다른 문제점은 빈 행을 생성하는 대신 특정 시간에 프로젝트에 대한 트래픽이 없으면 행이 전혀 생성되지 않는다는 것인데, 이는 모델이 다음과 같이 설계되었다는 점을 고려하면 불편합니다. 다가오는 시간대(예: 다음 시간)에 대한 데이터를 예측합니다. 그러나 초기 기간에 사용할 수 있는 데이터가 없으면 예측을 위해 모델을 훈련하는 것이 불가능합니다.
따라서 누락된 시간을 찾아 한 시간을 건너뛸 때 빈 행을 삽입하는 스크립트를 작성했습니다.
모델 훈련과 관련하여 기본 접근 방식은 이전 시간의 데이터를 모델의 대상으로 사용하는 것이었습니다. 이를 통해 모델은 현재 데이터를 기반으로 향후 트래픽을 예측할 수 있습니다.
def sort_df_and_assign_targets(df): df = df.copy() df = df.sort_values(by=['projectID', 'statisticsGathered']) for column_name in df.columns: if not column_name.endswith('target'): continue df[column_name] = df.groupby('projectID')[column_name].shift(-1) return df new_df = sort_df_and_assign_targets(new_df)
산출
statisticsGathered
별도의 열로 수집 이러한 접근 방식의 주된 이유는 statisticsGathered
datetime
객체이기 때문에 내가 사용하려고 했던 모델(다음 섹션 확인)에서는 이를 처리하고 올바른 패턴을 식별할 수 없었기 때문입니다.
그 결과 끔찍한 MSE/MRSE
측정항목이 탄생했습니다. 따라서 개발 중에 day
, month
, hour
에 대한 기능을 분리하기로 결정하여 결과가 크게 향상되었습니다.
def split_statistic_gathered(df): df['Month'] = df['statisticsGathered'].dt.month.astype(int) # as int df['Day'] = df['statisticsGathered'].dt.day.astype(int) # as int df['Hour'] = df['statisticsGathered'].dt.hour df = df.drop('statisticsGathered', axis = 1) return df new_df = split_statistic_gathered(new_df) new_df
그리고 그게 다야! 훈련 자체로 넘어 갑시다! 🎉🎉🎉
글쎄, 내 생각엔 이 애플리케이션을 구축하는 동안 실제 예측이 가장 어려운 부분이었던 것 같습니다.
가장 먼저 시도해 보고 싶었던 것은 LinearRegression
모델을 사용하는 것입니다.
다음 기능을 구현했습니다.
def create_model_for_target(train_df, target_series): X_train, x_test, Y_train, y_test = train_test_split(train_df, target_series, test_size=0.3, shuffle=False) reg = LinearRegression() reg.fit(X_train, Y_train) y_pred = reg.predict(x_test) return {"y_test": y_test, "y_pred": y_pred} def create_models_for_targets(df): models_data = dict() df = df.dropna() train_df = clear_df(df) for target_name in df[[column_name for column_name in df.columns if column_name.endswith("target")]]: models_data[target_name] = create_model_for_target(train_df, df[target_name]) return models_data
각 목표 열에 대해 데이터를 훈련 세트와 테스트 세트로 분할합니다. 그런 다음 훈련 데이터에 대해 LinearRegression
모델을 훈련하고 테스트 데이터에 대해 예측합니다.
결과가 올바른지 평가하기 위해 필요한 측정항목을 수집하고 출력을 생성하는 기능을 추가했습니다.
def evaluate_models(data): evaluation = [] for target, results in data.items(): y_test, y_pred = results['y_test'], results['y_pred'] mse = mean_squared_error(y_test, y_pred) rmse = mean_squared_error(y_test, y_pred) ** 0.5 mae = mean_absolute_error(y_test, y_pred) mean_y = y_test.mean() median_y = y_test.median() evaluation.append({'target': target, 'mse': mse, 'rmse': rmse, 'mae': mae, 'mean_y': mean_y, 'median_y': median_y}) return pd.DataFrame(evaluation)
출력을 생성하고 이를 mse
, rmse
, mae
및 mean_y
값을 계산하여 Excel 파일에 저장하는 스크립트를 작성했습니다.
보시다시피 측정항목이 만족스럽지 않으며 예상 교통 데이터는 정확하지 않으며 교통 예측 목표에 적합하지 않습니다.
그래서 시간당 총 방문자 수를 예측하기로 결정하여 다음과 같은 함수를 만들었습니다.
def add_target_column(df, by): totals_series = df.apply(lambda x: sum(x[[column for column in df.columns if column.startswith(by)]]), axis=1) df['total'] = totals_series df[f'total_{by}_target'] = totals_series return df def shift_target_column(df, by): df = df.sort_values(by=['projectID', 'statisticsGathered'], ignore_index=True) df['total_target'] = df.groupby('projectID')[f'total_{by}_target'].shift(-1) return df new_df = add_target_column(new_df, 'br') new_df = shift_target_column(new_df, 'br') new_df[['total_br_target']]
이 기능은 특정 카테고리를 선택하고 이를 기반으로 총 방문자 수를 계산합니다. 이는 장치 값의 총 개수가 OS 값의 총 개수와 동일하기 때문에 작동합니다.
이러한 접근 방식을 통해 모델은 이전보다 10배 더 나은 결과를 보여주었습니다 .
이 경우에 대해 이야기하고 있다면 거의 수용 가능하며 기능을 사용할 준비가 되었습니다. 이제 고객은 이러한 예측 결과에 따라 예산 할당 및 서버 확장을 계획할 수 있습니다.
예측은 약 2.45명의 방문자만큼 실제 값과 다릅니다(RMSE = √MSE 이므로 ) . 마케팅 요구에 부정적인 영향을 미칠 수 없습니다.
이 기사는 상당히 광범위해졌고 앱은 아직 개발 중이므로 여기서 잠시 멈추겠습니다. 우리는 앞으로 이 접근 방식을 계속해서 개선할 것이며 계속해서 업데이트해 드리겠습니다!
읽어주시고 관심을 가져주셔서 감사합니다! 댓글 섹션에서 여러분의 피드백과 생각을 듣기를 기대합니다. 이 정보가 귀하의 목표에 도움이 되기를 바랍니다!
그리고 행운을 빌어!