paint-brush
대규모 언어 모델을 사용하여 dbt 프로젝트를 향상시키는 방법~에 의해@klimmy
505 판독값
505 판독값

대규모 언어 모델을 사용하여 dbt 프로젝트를 향상시키는 방법

~에 의해 Kliment Merzlyakov15m2024/06/02
Read on Terminal Reader

너무 오래; 읽다

LLM을 사용하여 텍스트 데이터에 대한 일반적인 자연어 처리 작업(분류, 감정 분석 등)을 dbt 환경에 유지하면서 100만 행당 10달러(작업 및 모델에 따라 다름)의 저렴한 비용으로 자동으로 해결할 수 있습니다. 지침, 세부정보 및 코드는 다음과 같습니다.
featured image - 대규모 언어 모델을 사용하여 dbt 프로젝트를 향상시키는 방법
Kliment Merzlyakov HackerNoon profile picture
0-item
1-item



TL;DR

LLM을 사용하여 텍스트 데이터에 대한 일반적인 자연어 처리 작업(분류, 감정 분석 등)을 dbt 환경에 유지하면서 100만 행당 10달러(작업 및 모델에 따라 다름)의 저렴한 비용으로 자동으로 해결할 수 있습니다. 지침, 세부정보 및 코드는 다음과 같습니다.


변환 레이어로 dbt를 사용하는 경우 구조화되지 않은 텍스트 데이터에서 의미 있는 정보를 추출하려는 상황이 발생할 수 있습니다. 이러한 데이터에는 고객 리뷰, 제목, 설명, Google Analytics 소스/매체 등이 포함될 수 있습니다. 이를 그룹으로 분류하거나 감정과 분위기를 가져올 수 있습니다.


잠재적인 해결책은 다음과 같습니다.

  • dbt 흐름 외부에서 기계 학습 모델 적용(또는 LLM 호출)
  • CASE WHEN 문을 사용하여 dbt 모델 내에서 간단한 분류 정의
  • 카테고리를 미리 정의하고 이를 원시 데이터베이스 레이어에 업로드하거나 dbt 시드 기능을 활용하세요.


Python dbt 모델이 발전함에 따라 해결책이 하나 더 있습니다. dbt 모델 중 하나로 dbt 환경 내에서 이러한 자연어 처리 작업을 유지할 수 있습니다.


이것이 도움이 될 수 있다면 dbt 프로젝트에서 OpenAI API를 사용하는 방법에 대한 아래의 단계별 가이드를 참조하세요. GitHub 리포지토리의 코드와 데이터 샘플을 사용하여 이 가이드의 모든 내용을 사용자 환경에서 재현할 수 있습니다(마지막 링크 참조).

환경 설정

이미 dbt 프로젝트와 데이터가 있거나 결과를 재현하고 싶지 않은 경우 (4)로 이동하거나 이 섹션을 완전히 건너뛰세요. 그렇지 않으면 다음이 필요합니다.


  1. dbt 프로젝트를 설정합니다 . 공식 문서

    1. 이 가이드를 위해 제가 준비한 것을 GitHub 에서 간단히 복제할 수 있습니다.

    2. Profiles.yml 파일을 생성/업데이트하는 것을 잊지 마세요.


  2. 데이터베이스를 설정합니다 . 저는 눈송이를 사용했습니다. 안타깝게도 무료 버전은 없지만 30일 무료 평가판을 제공합니다.

    1. 현재 dbt Python 모델은 Snowflake, Databricks 및 BigQuery(PostgreSQL 없음)에서만 작동합니다. 따라서 일부 세부 사항은 다를 수 있지만 이 튜토리얼은 모든 항목에 적용됩니다.


  3. 소스 데이터 준비

    1. 데이터 세트로는 TidyTuesday 저장소에 게시된 R 패키지 메타데이터를 사용했습니다.

      1. 여기 에서 다운로드할 수 있습니다. 데이터 세트에 대한 자세한 내용은여기에 있습니다.
      2. 또는 여기 내 저장소의 경량 버전을 사용할 수 있습니다.
    2. 데이터베이스에 업로드하세요.

    3. 데이터베이스 및 스키마 이름과 일치하도록 dbt 프로젝트의 source.yml 파일을 업데이트합니다.


  4. OpenAI API 키 가져오기

    1. 공식 문서 의 빠른 시작 지침을 따르세요.

    2. 아니요: 무료는 아니지만 종량제입니다. 따라서 테스트 10행 데이터 세트를 사용하면 실험 중에 1달러 이상 비용이 청구되지 않습니다.

    3. 특히 주의하려면 지출 한도를 설정하세요.


  5. Snowflake에서 외부 액세스 통합 설정

    1. 이는 Snowflake를 사용하는 경우에만 적용됩니다.
    2. 그렇지 않으면 dbt Python 모델은 인터넷의 모든 API(OpenAI API 포함)에 액세스할 수 없습니다.
    3. 공식 지침을 따르십시오.
    4. 이 통합에 OpenAI API 키를 저장합니다.

카테고리 목록을 생각해 보세요

첫째, 분류 작업을 해결하는 경우 LLM 프롬프트에서 사용할 카테고리(클래스라고도 함)가 필요합니다. 기본적으로 다음과 같이 말할 것입니다. "이러한 범주 목록이 있습니다. 이 텍스트가 어느 범주에 속하는지 정의할 수 있습니까?"


여기에는 몇 가지 옵션이 있습니다:

  1. 사전 정의된 카테고리 목록을 수동으로 생성

    1. 안정적이고 예측 가능한 카테고리가 필요한 경우 적합합니다.

    2. 여기에 "기타"를 추가하는 것을 잊지 마세요. 그러면 LLM은 불확실할 때 이러한 옵션을 갖게 됩니다.

    3. 프롬프트에서 LLM에게 "기타" 카테고리를 사용할 때마다 카테고리 이름을 제안하도록 요청하세요.

    4. 사전 정의된 목록을 데이터베이스의 원시 레이어에 업로드하거나 dbt 프로젝트의 CSV로 업로드합니다( dbt seed 활용).


  2. 데이터 샘플을 LLM에 제공하고 N개의 범주를 생성하도록 요청합니다.

    1. 이전 접근 방식과 동일하지만 목록에 대한 도움을 받고 있습니다.

    2. GPT를 사용하는 경우 재현성을 위해 여기에서 시드를 사용하는 것이 좋습니다.


  3. 사전 정의된 카테고리 없이 LLM이 이동 중에도 작업을 수행하도록 하세요.

    1. 이로 인해 예측하기 어려운 결과가 발생할 수 있습니다.

    2. 동시에, 임의성의 여유가 있으면 괜찮다면 충분합니다.

    3. GPT 사용 사례에서는 다시 실행해야 할 경우 다른 결과가 발생하지 않도록 온도 = 0을 설정하는 것이 좋습니다.


이번 블로그 게시물에서는 세 번째 옵션을 선택하겠습니다.

OpenAI API를 호출하는 dbt Python 모델 생성

이제 이 게시물의 핵심으로 가서 업스트림 테이블에서 새 텍스트 데이터를 가져와 OpenAI API에 공급하고 카테고리를 테이블에 저장하는 dbt 모델을 만들어 보겠습니다.


위에서 언급한 대로 R 패키지 데이터세트를 사용하겠습니다. R은 데이터 분석 분야에서 널리 사용되는 프로그래밍 언어입니다. 이 데이터 세트에는 버전, 라이센스, 작성자, 제목, 설명 등과 같은 CRAN 프로젝트의 R 패키지에 대한 정보가 포함되어 있습니다. 제목을 기반으로 각 패키지에 대한 카테고리를 생성할 것이므로 title 필드에 관심이 있습니다.


  1. 모델의 베이스 준비

    • dbt 구성은 dbt.config(...) 메소드를 통해 전달될 수 있습니다.


    • dbt.config에는 추가 인수가 있습니다. 예를 들어 packages 는 패키지 요구 사항입니다.


    • dbt Python 모델은 업스트림 모델 dbt.ref('...') 또는 dbt.source('...') 참조할 수 있습니다.


    • DataFrame을 반환해야 합니다. 데이터베이스는 이를 테이블로 저장합니다.


     import os import openai import pandas as pd COL_TO_CATEGORIZE = 'title' def model(dbt, session): import _snowflake dbt.config( packages=['pandas', 'openai'], ) df = dbt.ref('package').to_pandas() df.drop_duplicates(subset=[COL_TO_CATEGORIZE], inplace=True) return df
  2. OpenAI API에 연결

    • secretsexternal_access_integrations dbt.config에 전달해야 합니다. 여기에는 Snowflake 외부 액세스 통합에 저장된 비밀 참조가 포함됩니다.


    • 참고: 이 기능은 불과 며칠 전에 출시되었으며 베타 dbt 버전 1.8.0-b3에서만 사용할 수 있습니다.

     dbt.config( packages=['pandas', 'openai'], secrets={'openai_key': 'openai_key', 'openai_org': 'openai_org'}, external_access_integrations=['openai_external_access_integration'], ) client = openai.OpenAI( api_key=_snowflake.get_generic_secret_string('openai_key'), organization=_snowflake.get_generic_secret_string('openai_org'), )
  3. dbt 모델을 증분적으로 만들고 전체 새로 고침을 끄십시오.

    • 이 부분은 OpenAI API 비용을 낮게 유지하는 데 필수적입니다.
    • 동일한 텍스트를 여러 번 분류하는 것을 방지합니다.
    • 그렇지 않으면 하루에 여러 번 dbt run 실행할 때마다 전체 데이터를 OpenAI로 전송하게 됩니다.
    • dbt.config에 materialized='incremental' , incremental_strategy='append' , full_refresh = False 추가하고 있습니다.
    • 이제 전체 검색은 첫 번째 DBT 실행에 대해서만 수행되고 이후 실행(증분 또는 전체 새로 고침에 관계없이)에서는 델타만 분류됩니다.
    • 더욱 주의를 기울이고 싶다면 데이터를 약간 전처리하여 고유한 항목 수를 줄일 수 있지만 LLM이 자연어에서 더 잘 작동하므로 너무 많은 전처리를 피하는 것이 좋습니다.
     dbt.config( materialized='incremental', incremental_strategy='append', full_refresh = False, packages=['pandas', 'openai'], secrets={'openai_key': 'openai_key', 'openai_org': 'openai_org'}, external_access_integrations=['openai_external_access_integration'], ) if dbt.is_incremental: pass


  4. 증분 논리 추가

    • 증분 실행에서는(설정으로 인해 첫 번째 실행을 제외한 모든 실행을 의미함) 이미 분류된 타이틀을 모두 제거해야 합니다.
    • 간단히 dbt.this 사용하여 이를 수행할 수 있습니다. 일반 증분 모델과 유사합니다.
     if dbt.is_incremental: categorized_query = f''' SELECT DISTINCT "{ COL_TO_CATEGORIZE }" AS primary_key FROM { dbt.this } WHERE "category" IS NOT NULL ''' categorized = [row.PRIMARY_KEY for row in session.sql(categorized_query).collect()] df = df.loc[~df[COL_TO_CATEGORIZE].isin(categorized), :]
  5. 일괄적으로 OpenAI API 호출

    • 비용을 줄이려면 일괄적으로 OpenAI API에 데이터를 보내는 것이 좋습니다.
    • 시스템 프롬프트는 분류해야 하는 텍스트보다 5배 더 클 수 있습니다. 각 타이틀에 대해 별도로 시스템 프롬프트를 보내면 반복적인 항목에 대한 토큰 사용량이 훨씬 높아집니다.
    • 그러나 배치는 크지 않아야 합니다. 대규모 배치의 경우 GPT는 덜 안정적인 결과를 생성하기 시작합니다. 내 실험에 따르면 배치 크기 = 5가 충분히 잘 작동합니다.
    • 또한 응답이 관련 크기를 초과하지 않도록 max_tokens 제약 조건을 추가했습니다.
     BATCH_SIZE = 5 n_rows = df.shape[0] categories = [None for idx in range(n_rows)] for idx in range(0, n_rows, BATCH_SIZE): df_sliced = df.iloc[idx:idx+BATCH_SIZE, :] user_prompt = f'```{ "|".join(df_sliced[COL_TO_CATEGORIZE].to_list()) }```' chat_completion = client.chat.completions.create( messages=[ {'role': 'system', 'content': SYSTEM_PROMPT}, {'role': 'user', 'content': user_prompt} ], model='gpt-3.5-turbo', temperature=0, max_tokens=10*BATCH_SIZE + 2*BATCH_SIZE, ) gpt_response = chat_completion.choices[0].message.content gpt_response = [category.strip() for category in gpt_response.split('|')] categories[idx:idx + len(gpt_response)] = gpt_response df['category'] = categories df.dropna(subset=['category'], inplace=True)


  6. LLM에 대한 프롬프트에 대해 이야기할 시간입니다. 그것이 내가 얻은 것입니다:

``괄호 안에 CRAN R 패키지 제목 목록이 제공됩니다. 제목은 "|"로 구분됩니다. 징후. 각 제목에 대한 카테고리를 생각해 보세요. "|"로 구분된 카테고리 이름만 반환합니다. 징후.


  • 요점을 바로 지시하십시오.
  • SQL 삽입을 방지하려면 ``` 기술을 사용하세요.
  • 결과 형식을 명확히 하세요. 내 경우에는 "|"를 요청했습니다. 입력과 출력 모두에 대한 구분 기호로 사용


  1. 최종 DBT 모델 코드

     import os import openai import pandas as pd SYSTEM_PROMPT = '''You will be provided a list of CRAN R package titles in ``` brackets. Titles will be separated by "|" sign. Come up with a category for each title. Return only category names separated by "|" sign. ''' COL_TO_CATEGORIZE = 'title' BATCH_SIZE = 5 def model(dbt, session): import _snowflake dbt.config( materialized='incremental', incremental_strategy='append', full_refresh = False, packages=['pandas', 'openai'], secrets={'openai_key': 'openai_key', 'openai_org': 'openai_org'}, external_access_integrations=['openai_external_access_integration'], ) client = openai.OpenAI( api_key=_snowflake.get_generic_secret_string('openai_key'), organization=_snowflake.get_generic_secret_string('openai_org'), ) df = dbt.ref('package').to_pandas() df.drop_duplicates(subset=[COL_TO_CATEGORIZE], inplace=True) if dbt.is_incremental: categorized_query = f''' SELECT DISTINCT "{ COL_TO_CATEGORIZE }" AS primary_key FROM { dbt.this } WHERE "category" IS NOT NULL ''' categorized = [row.PRIMARY_KEY for row in session.sql(categorized_query).collect()] df = df.loc[~df[COL_TO_CATEGORIZE].isin(categorized), :] n_rows = df.shape[0] categories = [None for idx in range(n_rows)] for idx in range(0, n_rows, BATCH_SIZE): df_sliced = df.iloc[idx:idx+BATCH_SIZE, :] user_prompt = f'```{ "|".join(df_sliced[COL_TO_CATEGORIZE].to_list()) }```' chat_completion = client.chat.completions.create( messages=[ {'role': 'system', 'content': SYSTEM_PROMPT}, {'role': 'user', 'content': user_prompt} ], model='gpt-3.5-turbo', temperature=0, max_tokens=10*BATCH_SIZE + 2*BATCH_SIZE, ) gpt_response = chat_completion.choices[0].message.content gpt_response = [category.strip() for category in gpt_response.split('|')] categories[idx:idx + len(gpt_response)] = gpt_response df['category'] = categories df.dropna(subset=['category'], inplace=True) return df

비용 추정

OpenAI API 가격은 여기에 나와 있습니다. 요청하고 반환한 토큰 수에 대해 요금을 청구합니다. 토큰은 요청의 문자 수와 연관된 인스턴스입니다. 특정 텍스트에 대한 여러 토큰을 평가하는 오픈 소스 패키지가 있습니다. 예를 들어 틱토큰 . 수동으로 평가하고 싶다면 공식 OpenAI 토크나이저( 여기) 를 이용하세요.


우리 데이터 세트에는 ~18,000개의 타이틀이 있습니다. 대략 320K 입력 토큰(배치 크기 = 5를 사용하는 경우 180K 제목 및 140K 시스템 프롬프트) 및 50K 출력 토큰과 같습니다. 모델에 따라 전체 스캔 비용은 다음과 같습니다.


  1. GPT-4 Turbo : $4.7 . 가격: 입력: $10 / 1M 토큰; 출력: $30 / 1M 토큰.
  2. GPT-4 : $12.6. 가격: 입력: $30 / 1M 토큰; 출력: $60 / 1M 토큰.
  3. GPT-3.5 Turbo : $0.2. 가격: 입력: $0.5 / 1M 토큰; 출력: $1.5 / 1M 토큰.

결과

dbt 모델은 매력적으로 작동했습니다. 18K 패키지를 빈틈없이 모두 분류하는데 성공했습니다. 이 모델은 비용 효율적이며 다중 DBT 실행으로부터 보호되는 것으로 입증되었습니다.


여기 Tableau Public에 결과 대시보드를 게시했습니다. 자유롭게 가지고 놀고, 데이터를 다운로드하고, 그 위에 원하는 것을 만들어 보세요.

내가 찾은 몇 가지 흥미로운 세부정보는 다음과 같습니다.


  • 상위 1위 카테고리는 Data Visualization (1,190개 패키지 또는 6%)입니다. 나는 이것이 특히 Shiny, Plotly 등과 같은 패키지에서 시각화 도구로서 R의 인기를 증명한다고 생각합니다.


  • 2023년에 가장 많이 성장한 두 가지 카테고리는 Data ImportData Processing 였습니다. R이 데이터 처리 도구로 더 많이 사용되기 시작한 것 같습니다.


  • 상위 30개 카테고리 중 가장 큰 연간 성장은 2019년 Natural Language Processing 였습니다. 유명한 논문 "Attention Is All You Need" 이후 2년, GPT-1 출시 후 반년 :)

추가 아이디어

  1. GPT 임베딩 이라는 대체 접근 방식을 사용할 수 있습니다.

    • 훨씬 저렴합니다.

    • 그러나 분류 부분은 스스로 수행해야 하기 때문에 엔지니어링이 더 많이 필요합니다(다음 게시물 중 하나에서 이 옵션을 살펴볼 것이므로 계속 지켜봐 주시기 바랍니다).


  2. 물론 이 부분을 dbt에서 제거하고 클라우드 기능이나 사용하는 인프라에 푸시하는 것이 합리적입니다. 동시에 해당 정보를 dbt에 보관하고 싶다면 이 게시물을 참조하세요.


  3. 모델에 논리를 추가하지 마십시오. 한 가지 작업만 수행해야 합니다. LLM을 호출하고 결과를 저장합니다. 이렇게 하면 재실행을 방지하는 데 도움이 됩니다.


  4. dbt 프로젝트에서 많은 환경을 사용하고 있을 가능성이 높습니다. 각 풀 요청에 대해 각 개발자 환경에서 이 모델을 반복해서 실행하지 않도록 주의해야 합니다.

    • 이를 수행하려면 if dbt.config.get("target_name") == 'dev' 와 논리를 통합할 수 있습니다.


  5. 구분 기호를 사용한 응답은 불안정할 수 있습니다.

    • 예를 들어 GPT는 예상보다 적은 수의 요소를 반환할 수 있으며 초기 제목을 카테고리 목록에 매핑하기가 어렵습니다.

    • 이 문제를 해결하려면 요청에 response_format={ "type": "json_object" } 추가하여 JSON 출력을 요구하세요. 공식 문서를 참조하세요.

    • JSON 출력을 사용하면 프롬프트에서 {"title": "category"} 형식으로 답변을 제공하도록 요청한 다음 이를 초기 값에 매핑할 수 있습니다.

    • 응답 크기가 커지므로 비용이 더 많이 듭니다.

    • 이상하게도 GPT 3.5 Turbo를 JSON으로 전환했을 때 분류 품질이 급격히 떨어졌습니다.


  6. Snowflake에는 cortex.complete() 함수를 사용하는 대안이 있습니다. dbt 블로그에서 Joel Labes의 훌륭한 게시물을 확인하세요.


그게 다야! 당신이 무슨 생각을하는지 제게 알려주세요.

연결

GitHub의 전체 코드: 링크

Tableau Public 대시보드: 링크

TidyTuesday R 데이터세트:링크