paint-brush
大規模言語モデルを使用して dbt プロジェクトを強化する方法@klimmy
563 測定値
563 測定値

大規模言語モデルを使用して dbt プロジェクトを強化する方法

Kliment Merzlyakov15m2024/06/02
Read on Terminal Reader

長すぎる; 読むには

LLM を使用すると、テキスト データの一般的な自然言語処理タスク (分類、感情分析など) を、dbt 環境にとどまりながら、100 万行あたり 10 ドルという低価格で自動的に解決できます (タスクとモデルによって異なります)。手順、詳細、およびコードを以下に示します。
featured image - 大規模言語モデルを使用して dbt プロジェクトを強化する方法
Kliment Merzlyakov HackerNoon profile picture
0-item
1-item



要約

LLM を使用すると、テキスト データの一般的な自然言語処理タスク (分類、感情分析など) を、dbt 環境にとどまりながら、100 万行あたり 10 ドルという低価格で自動的に解決できます (タスクとモデルによって異なります)。手順、詳細、およびコードを以下に示します。


dbt を変換レイヤーとして使用している場合、非構造化テキスト データから意味のある情報を抽出したい場合があります。このようなデータには、顧客レビュー、タイトル、説明、Google アナリティクスのソース/メディアなどが含まれます。それらをグループに分類したり、感情やトーンを取得したりする必要があるかもしれません。


潜在的な解決策としては

  • dbtフローの外で機械学習モデルを適用する(またはLLMを呼び出す)
  • CASE WHEN ステートメントを使用して dbt モデル内で単純な分類を定義する
  • 事前にカテゴリを定義済みで、それを生のデータベース レイヤーにアップロードするか、dbt シード機能を活用する


Python dbt モデルが進化するにつれて、もう 1 つの解決策があります。これらの自然言語処理タスクを dbt モデルの 1 つとして dbt 環境内に保持することができます。


それが役に立つと思われる場合は、dbt プロジェクトで OpenAI API を使用する方法についてのステップバイステップ ガイドを以下で参照してください。GitHub リポジトリのコードとデータ サンプルを使用して、このガイドのすべてを自分の環境で再現できます (最後のリンクを参照)。

環境の設定

すでに dbt プロジェクトとデータがある場合、または結果を再現したくない場合は、(4) に進むか、このセクションを完全にスキップしてください。それ以外の場合は、次のものが必要になります。


  1. dbt プロジェクトをセットアップします公式ドキュメント

    1. このガイド用に用意したものをGitHubからクローンするだけです。

    2. profiles.yml ファイルを作成/更新することを忘れないでください。


  2. データベースをセットアップします。私はSnowflakeを使いました。残念ながら無料版はありませんが、 30日間の無料トライアルは提供されています。

    1. 現在、dbt Python モデルは Snowflake、Databricks、BigQuery でのみ動作します (PostgreSQL は動作しません)。したがって、このチュートリアルはいずれのモデルでも動作するはずですが、詳細は異なる場合があります。


  3. ソースデータを準備する

    1. データセットとして、TidyTuesday リポジトリで公開されている R パッケージ メタデータを使用しました。

      1. ここからダウンロードできます。データセットの詳細はこちら
      2. あるいは、私のリポジトリから軽量版を使用することもできます
    2. データベースにアップロードします。

    3. データベース名とスキーマ名と一致するように、dbt プロジェクトのsource.ymlファイルを更新します。


  4. OpenAI APIキーを取得する

    1. 公式ドキュメントのクイックスタート手順に従ってください。

    2. 注意: 無料ではありませんが、従量課金制です。したがって、テスト用の 10 行のデータセットでは、実験中に 1 ドルを超える料金は請求されません。

    3. より注意するために、支出限度額を設定してください。


  5. Snowflakeで外部アクセス統合を設定する

    1. これは、Snowflake を使用する場合にのみ適用されます。
    2. これを行わないと、dbt Python モデルはインターネット上のどの API (OpenAI API を含む) にもアクセスできません。
    3. 公式の指示に従ってください。
    4. この統合に OpenAI API キーを保存します。

カテゴリーのリストを作成する

まず、分類タスクを解く場合、LLM プロンプトで使用するカテゴリ (クラスとも呼ばれます) が必要です。基本的には、「これらのカテゴリのリストがありますが、このテキストがどのカテゴリに属するか定義できますか?」と言うことになります。


いくつかのオプションは次のとおりです:

  1. 定義済みカテゴリのリストを手動で作成する

    1. 安定した予測可能なカテゴリが必要な場合に適しています。

    2. ここで「その他」を忘れずに追加してください。そうすれば、不明な場合に LLM がこれらのオプションを使用できるようになります。

    3. LLM が「その他」カテゴリを使用する場合は、プロンプトでカテゴリ名を提案するように依頼します。

    4. 定義済みのリストをデータベースの生のレイヤーにアップロードするか、または dbt プロジェクトに CSV としてアップロードします ( dbt seedを使用)。


  2. データのサンプルを LLM に入力し、N 個のカテゴリを作成するように依頼します。

    1. 前回と同じアプローチですが、リストの作成に協力してもらっています。

    2. GPT を使用する場合は、再現性のためにここでシードを使用することをお勧めします。


  3. 事前定義されたカテゴリを使用せずに、LLM に作業を任せましょう。

    1. これにより、予測しにくい結果が生じる可能性があります。

    2. 同時に、ランダム性の余地が許容できるのであれば、それで十分です。

    3. GPT の使用例では、再実行が必要な場合に異なる結果を回避するために、temperature = 0 を設定することをお勧めします。


このブログ投稿では、3 番目のオプションを選択します。

OpenAI API を呼び出すための dbt Python モデルを作成する

さて、この投稿の本題に入り、アップストリーム テーブルから新しいテキスト データを取得し、それを OpenAI API にフィードして、カテゴリをテーブルに保存する dbt モデルを作成しましょう。


前述のように、R パッケージ データセットを使用します。R は、データ分析で非常に人気のあるプログラミング言語です。このデータセットには、バージョン、ライセンス、作成者、タイトル、説明など、CRAN プロジェクトの R パッケージに関する情報が含まれています。タイトルに基づいて各パッケージのカテゴリを作成するため、 titleフィールドに注目します。


  1. モデルのベースを準備する

    • dbt 構成はdbt.config(...)メソッドを介して渡すことができます。


    • dbt.config には追加の引数があります。たとえば、 packagesパッケージ要件です。


    • dbt Python モデルは上流モデルdbt.ref('...')またはdbt.source('...')を参照できます。


    • DataFrame を返す必要があります。データベースはそれをテーブルとして保存します。


     import os import openai import pandas as pd COL_TO_CATEGORIZE = 'title' def model(dbt, session): import _snowflake dbt.config( packages=['pandas', 'openai'], ) df = dbt.ref('package').to_pandas() df.drop_duplicates(subset=[COL_TO_CATEGORIZE], inplace=True) return df
  2. OpenAI APIに接続する

    • secretsexternal_access_integrations dbt.config に渡す必要があります。これには、Snowflake 外部アクセス統合に保存されているシークレット参照が含まれます。


    • 注: この機能は数日前にリリースされたばかりで、ベータ版の dbt バージョン 1.8.0-b3 でのみ利用可能です。

     dbt.config( packages=['pandas', 'openai'], secrets={'openai_key': 'openai_key', 'openai_org': 'openai_org'}, external_access_integrations=['openai_external_access_integration'], ) client = openai.OpenAI( api_key=_snowflake.get_generic_secret_string('openai_key'), organization=_snowflake.get_generic_secret_string('openai_org'), )
  3. dbt モデルを増分にして、完全更新をオフにします。

    • この部分は、OpenAI API のコストを低く抑えるために不可欠です。
    • 同じテキストが複数回分類されることを防ぎます。
    • そうしないと、 dbt runを実行するたびに、完全なデータが OpenAI に送信されることになります。これは 1 日に数回実行されることもあります。
    • dbt.config に、 materialized='incremental'incremental_strategy='append'full_refresh = False追加します。
    • これで、フルスキャンは最初の dbt 実行に対してのみ実行され、それ以降の実行では (増分更新か完全更新かに関係なく) デルタのみが分類されます。
    • より注意したい場合は、データを少し前処理して一意のエントリの数を減らすことができますが、LLM は自然言語でより効果的に機能するため、前処理をしすぎないようにしてください。
     dbt.config( materialized='incremental', incremental_strategy='append', full_refresh = False, packages=['pandas', 'openai'], secrets={'openai_key': 'openai_key', 'openai_org': 'openai_org'}, external_access_integrations=['openai_external_access_integration'], ) if dbt.is_incremental: pass


  4. 増分ロジックを追加する

    • 増分実行(設定により、最初の実行を除くすべての実行を意味します)では、すでに分類されているタイトルをすべて削除する必要があります。
    • dbt.thisを使用するだけで実行できます。通常の増分モデルと同様です。
     if dbt.is_incremental: categorized_query = f''' SELECT DISTINCT "{ COL_TO_CATEGORIZE }" AS primary_key FROM { dbt.this } WHERE "category" IS NOT NULL ''' categorized = [row.PRIMARY_KEY for row in session.sql(categorized_query).collect()] df = df.loc[~df[COL_TO_CATEGORIZE].isin(categorized), :]
  5. OpenAI APIをバッチで呼び出す

    • コストを削減するには、データをバッチで OpenAI API に送信することをお勧めします。
    • システム プロンプトは、分類する必要があるテキストの 5 倍の大きさになることがあります。タイトルごとにシステム プロンプトを別々に送信すると、繰り返しの作業でトークンの使用量が大幅に増加します。
    • ただし、バッチは大きくすべきではありません。バッチが大きいと、GPT は安定性の低い結果を生成し始めます。私の実験では、バッチ サイズ = 5 で十分に機能します。
    • さらに、応答が関連するサイズを超えないようにするために、 max_tokens制約を追加しました。
     BATCH_SIZE = 5 n_rows = df.shape[0] categories = [None for idx in range(n_rows)] for idx in range(0, n_rows, BATCH_SIZE): df_sliced = df.iloc[idx:idx+BATCH_SIZE, :] user_prompt = f'```{ "|".join(df_sliced[COL_TO_CATEGORIZE].to_list()) }```' chat_completion = client.chat.completions.create( messages=[ {'role': 'system', 'content': SYSTEM_PROMPT}, {'role': 'user', 'content': user_prompt} ], model='gpt-3.5-turbo', temperature=0, max_tokens=10*BATCH_SIZE + 2*BATCH_SIZE, ) gpt_response = chat_completion.choices[0].message.content gpt_response = [category.strip() for category in gpt_response.split('|')] categories[idx:idx + len(gpt_response)] = gpt_response df['category'] = categories df.dropna(subset=['category'], inplace=True)


  6. LLM のプロンプトについて話す時間です。私が得たものは次のとおりです。

CRAN R パッケージ タイトルのリストが ``` 括弧で囲まれて提供されます。タイトルは "|" 記号で区切られます。タイトルごとにカテゴリを作成します。"|" 記号で区切られたカテゴリ名のみを返します。


  • 指示は要点を簡潔にしてください。
  • SQL インジェクションを回避するには、 ``` テクニックを使用します。
  • 結果のフォーマットを明確にしてください。私の場合、入力と出力の両方の区切りとして「|」を要求しました。


  1. 最終的な dbt モデル コード

    import os import openai import pandas as pd SYSTEM_PROMPT = '''You will be provided a list of CRAN R package titles in ``` brackets. Titles will be separated by "|" sign. Come up with a category for each title. Return only category names separated by "|" sign. ''' COL_TO_CATEGORIZE = 'title' BATCH_SIZE = 5 def model(dbt, session): import _snowflake dbt.config( materialized='incremental', incremental_strategy='append', full_refresh = False, packages=['pandas', 'openai'], secrets={'openai_key': 'openai_key', 'openai_org': 'openai_org'}, external_access_integrations=['openai_external_access_integration'], ) client = openai.OpenAI( api_key=_snowflake.get_generic_secret_string('openai_key'), organization=_snowflake.get_generic_secret_string('openai_org'), ) df = dbt.ref('package').to_pandas() df.drop_duplicates(subset=[COL_TO_CATEGORIZE], inplace=True) if dbt.is_incremental: categorized_query = f''' SELECT DISTINCT "{ COL_TO_CATEGORIZE }" AS primary_key FROM { dbt.this } WHERE "category" IS NOT NULL ''' categorized = [row.PRIMARY_KEY for row in session.sql(categorized_query).collect()] df = df.loc[~df[COL_TO_CATEGORIZE].isin(categorized), :] n_rows = df.shape[0] categories = [None for idx in range(n_rows)] for idx in range(0, n_rows, BATCH_SIZE): df_sliced = df.iloc[idx:idx+BATCH_SIZE, :] user_prompt = f'```{ "|".join(df_sliced[COL_TO_CATEGORIZE].to_list()) }```' chat_completion = client.chat.completions.create( messages=[ {'role': 'system', 'content': SYSTEM_PROMPT}, {'role': 'user', 'content': user_prompt} ], model='gpt-3.5-turbo', temperature=0, max_tokens=10*BATCH_SIZE + 2*BATCH_SIZE, ) gpt_response = chat_completion.choices[0].message.content gpt_response = [category.strip() for category in gpt_response.split('|')] categories[idx:idx + len(gpt_response)] = gpt_response df['category'] = categories df.dropna(subset=['category'], inplace=True) return df

コスト見積

OpenAI API の料金については、こちら をご覧ください。リクエストされたトークンの数と返されたトークンの数に応じて料金が発生します。トークンは、リクエスト内の文字数に関連付けられたインスタンスです。特定のテキストのトークンの数を評価するオープンソース パッケージがあります。たとえば、 Tiktoken などです。手動で評価したい場合は、こちら の公式 OpenAI トークナイザーにアクセスしてください。


私たちのデータセットには、約 18,000 のタイトルがあります。大まかに言うと、320,000 の入力トークン (バッチ サイズ = 5 を使用する場合、180,000 のタイトルと 140,000 のシステム プロンプト) と 50,000 の出力トークンに相当します。モデルに応じて、フルスキャンのコストは次のようになります。


  1. GPT-4 Turbo : 4.7 ドル。価格: 入力: 100 万トークンあたり 10 ドル、出力: 30 万トークンあたり。
  2. GPT-4 : 12.6 ドル。価格: 入力: 30 ドル / 100 万トークン、出力: 60 ドル / 100 万トークン。
  3. GPT-3.5 Turbo : $0.2。価格: 入力: $0.5 / 100万トークン、出力: $1.5 / 100万トークン。

結果

dbt モデルは見事に機能しました。18K のパッケージすべてを隙間なく分類できました。このモデルはコスト効率が良く、複数の dbt 実行から保護されていることが証明されました。


結果ダッシュボードを Tableau Public にここで公開しました。自由に操作し、データをダウンロードし、その上に好きなものを作成してください。

私が見つけた興味深い詳細は次のとおりです:


  • トップ 1 のカテゴリはData Visualization (1,190 パッケージ、6%) です。これは、特に Shiny、Plotly などのパッケージとともに、視覚化ツールとしての R の人気を証明していると思います。


  • 2023 年に最も成長したカテゴリは、 Data ImportData Processingでした。R はデータ処理ツールとしてより多く使用され始めたようです。


  • 2019年のトップ30カテゴリーの中で、前年比で最も成長が大きかったのはNatural Language Processingた。有名な論文「Attention Is All You Need」の2年後、GPT-1のリリースから半年後のことでした:)

その他のアイデア

  1. 別のアプローチとして、 GPT 埋め込みを使用することもできます。

    • ずっと安いですよ。

    • ただし、分類部分は自分で行う必要があるため、エンジニアリングの要素が強くなります (このオプションについては、次の投稿で取り上げる予定ですので、お楽しみに)。


  2. 確かに、この部分を dbt から削除し、クラウド機能または使用するインフラにプッシュすることは理にかなっています。同時に、これを dbt の下に保持したい場合は、この投稿が役立ちます。


  3. モデルにロジックを追加しないでください。モデルは LLM を呼び出して結果を保存するという 1 つの作業だけを行う必要があります。これにより、再実行を避けることができます。


  4. dbt プロジェクトでは多くの環境を使用している可能性が高いです。各プル リクエストで各開発環境でこのモデルを何度も実行しないように注意する必要があります。

    • これを行うには、 if dbt.config.get("target_name") == 'dev'というロジックを組み込むことができます。


  5. 区切り文字を使用した応答は不安定になる可能性があります。

    • たとえば、GPT は予想よりも少ない要素を返す可能性があり、最初のタイトルをカテゴリのリストにマッピングすることが難しくなります。

    • これを解決するには、リクエストにresponse_format={ "type": "json_object" }を追加して、JSON 出力を要求します。公式ドキュメントを参照してください。

    • JSON 出力を使用すると、プロンプトで {"title": "category"} の形式で回答を入力するように要求し、それを初期値にマッピングできます。

    • 応答サイズが大きくなるため、コストが高くなることに注意してください。

    • 不思議なことに、GPT 3.5 Turbo を JSON に切り替えると、分類の品質が劇的に低下しました。


  6. Snowflake にはcortex.complete()関数を使用する代替手段があります。dbt ブログの Joel Labes による素晴らしい投稿をご覧ください。


以上です!ご意見をお聞かせください。

リンク

GitHub の完全なコード:リンク

Tableau Public ダッシュボード: リンク

TidyTuesday Rデータセット:リンク