paint-brush
如何使用大型语言模型增强你的 dbt 项目经过@klimmy
502 讀數
502 讀數

如何使用大型语言模型增强你的 dbt 项目

经过 Kliment Merzlyakov15m2024/06/02
Read on Terminal Reader

太長; 讀書

您可以使用 LLM 自动解决文本数据的典型自然语言处理任务(分类、情绪分析等),每 100 万行只需 10 美元(具体取决于任务和模型),并且只需在您的 dbt 环境中即可。说明、详细信息和代码如下
featured image - 如何使用大型语言模型增强你的 dbt 项目
Kliment Merzlyakov HackerNoon profile picture
0-item
1-item



总结

您可以使用 LLM 自动解决文本数据的典型自然语言处理任务(分类、情绪分析等),每 100 万行只需 10 美元(具体取决于任务和模型),并且只需在您的 dbt 环境中即可。说明、详细信息和代码如下


如果您使用 dbt 作为转换层,您可能会遇到需要从非结构化文本数据中提取有意义的信息的情况。此类数据可能包括客户评论、标题、描述、Google Analytics 来源/媒介等。您可能希望将它们分类或获取情绪和语气。


潜在的解决方案包括

  • 在 dbt 流程之外应用机器学习模型(或调用 LLM)
  • 使用 CASE WHEN 语句在 dbt 模型中定义简单分类
  • 预先定义类别,然后将其上传到原始数据库层或利用 dbt 种子功能


随着 Python dbt 模型的发展,还有一个解决方案:您可以将这些自然语言处理任务作为 dbt 模型之一保存在您的 dbt 环境中。


如果这对您有帮助,请参阅下面有关如何在 dbt 项目中使用 OpenAI API 的分步指南。您可以在您的环境中重现本指南中的所有内容,并使用来自 GitHub 存储库的代码和数据示例(请参阅末尾的链接)。

设置环境

如果您已经有 dbt 项目和数据,或者不想重现结果,请跳至 (4) 或完全跳过此部分。否则,您将需要以下内容:


  1. 设置 dbt 项目官方文档

    1. 您可以简单地从GitHub克隆我为本指南准备的那个。

    2. 不要忘记创建/更新您的profiles.yml文件。


  2. 设置数据库。我使用了 Snowflake。不幸的是,没有免费版本,但他们提供了30 天的免费试用版

    1. 目前,dbt Python 模型仅适用于 Snowflake、Databricks 和 BigQuery(不支持 PostgreSQL)。因此,本教程应该适用于其中任何一种,尽管某些细节可能会有所不同


  3. 准备源数据

    1. 作为数据集,我使用了在 TidyTuesday 存储库中发布的 R 包元数据。

      1. 你可以从这里下载。数据集的详细信息在这里
      2. 或者,你可以使用我这里存储库中的轻量级版本
    2. 将其上传到您的数据库。

    3. 更新 dbt 项目中的source.yml文件以匹配您的数据库和模式名称。


  4. 获取 OpenAI API 密钥

    1. 按照官方文档中的快速入门说明进行操作。

    2. 注意:它不是免费的,但它是按需付费的。因此,使用 10 行测试数据集,您在实验期间不会支付超过 1 美元的费用。

    3. 为了格外小心,请设定消费限额。


  5. 在 Snowflake 中设置外部访问集成

    1. 这仅适用于您使用 Snowflake 的情况。
    2. 如果不这样做,dbt Python 模型就无法访问互联网上的任何 API(包括 OpenAI API)。
    3. 按照官方说明进行操作。
    4. 将 OpenAI API 密钥存储在此集成中。

列出类别列表

首先,如果你正在解决分类任务,你需要在 LLM 提示中使用类别(又称类)。基本上,你会说:“我有这些类别的列表,你能定义一下这篇文章属于哪一个类别吗?”


这里有一些选项:

  1. 手动创建预定义类别列表

    1. 如果您需要稳定且可预测的类别,它就很适合。

    2. 不要忘记在这里添加“其他”,这样 LLM 在不确定时就会有这些选项。

    3. 在提示中要求 LLM 在使用“其他”类别时建议一个类别名称。

    4. 将预定义列表上传到数据库的原始层或作为 dbt 项目中的 CSV 上传(利用dbt seed )。


  2. 将您的数据样本输入到 LLM,并要求其提出 N 个类别。

    1. 与前一种方法相同,但是我们正在获得列表方面的帮助。

    2. 如果您使用 GPT,最好在这里使用种子来实现可重复性。


  3. 不再需要预先定义的类别,让 LLM 随时完成工作。

    1. 这可能会导致不太可预测的结果。

    2. 同时,如果您对随机性有一定程度的接受,那么这就足够了。

    3. 在 GPT 用例中,最好将温度设置为 0,以避免在需要重新运行时出现不同的结果。


在这篇博文中,我将选择第三种选择。

创建 dbt Python 模型来调用 OpenAI API

现在,让我们进入这篇文章的核心部分,创建一个 dbt 模型,它将从上游表中获取新的文本数据,将其提供给 OpenAI API,并将类别保存到表中。


如上所述,我将使用 R 包数据集。R 是一种在数据分析中非常流行的编程语言。此数据集包含来自 CRAN 项目的 R 包的信息,例如版本、许可证、作者、标题、描述等。我们对title字段感兴趣,因为我们将根据每个包的标题为其创建一个类别。


  1. 准备模型的基础

    • dbt 配置可以通过dbt.config(...)方法传递。


    • dbt.config 中还有其他参数,例如, packages是一个包要求。


    • dbt Python 模型可以引用上游模型dbt.ref('...')dbt.source('...')


    • 它必须返回一个 DataFrame。您的数据库将把它保存为一个表。


     import os import openai import pandas as pd COL_TO_CATEGORIZE = 'title' def model(dbt, session): import _snowflake dbt.config( packages=['pandas', 'openai'], ) df = dbt.ref('package').to_pandas() df.drop_duplicates(subset=[COL_TO_CATEGORIZE], inplace=True) return df
  2. 连接到 OpenAI API

    • 我们需要将secretsexternal_access_integrations传递给 dbt.config。它将包含存储在 Snowflake External Access Integration 中的 secret 引用。


    • 注意:此功能几天前才发布,并且仅在 beta dbt 版本 1.8.0-b3 中可用

    dbt.config( packages=['pandas', 'openai'], secrets={'openai_key': 'openai_key', 'openai_org': 'openai_org'}, external_access_integrations=['openai_external_access_integration'], ) client = openai.OpenAI( api_key=_snowflake.get_generic_secret_string('openai_key'), organization=_snowflake.get_generic_secret_string('openai_org'), )
  3. 使 dbt 模型增量,并关闭完全刷新。

    • 这部分对于保持 OpenAI API 成本较低至关重要。
    • 这将防止对相同的文本进行多次分类。
    • 否则,每次执行dbt run时,您都会将完整数据发送给 OpenAI,一天可能会发送几次。
    • 我们正在向 dbt.config 添加materialized='incremental' , incremental_strategy='append' , full_refresh = False
    • 现在,完整扫描将仅针对第一次 dbt 运行,而对于后续运行(无论是增量还是完全刷新),它将仅对增量进行分类。
    • 如果您愿意格外小心,您可以对数据进行一些预处理以减少唯一条目的数量,但要避免过度预处理,因为 LLM 更适合处理自然语言。
     dbt.config( materialized='incremental', incremental_strategy='append', full_refresh = False, packages=['pandas', 'openai'], secrets={'openai_key': 'openai_key', 'openai_org': 'openai_org'}, external_access_integrations=['openai_external_access_integration'], ) if dbt.is_incremental: pass


  4. 添加增量逻辑

    • 在增量运行中(由于我们的设置,这意味着除了第一次运行之外的任何运行),我们需要删除所有已经分类的标题。
    • 我们只需使用dbt.this即可做到这一点。类似于普通的增量模型。
     if dbt.is_incremental: categorized_query = f''' SELECT DISTINCT "{ COL_TO_CATEGORIZE }" AS primary_key FROM { dbt.this } WHERE "category" IS NOT NULL ''' categorized = [row.PRIMARY_KEY for row in session.sql(categorized_query).collect()] df = df.loc[~df[COL_TO_CATEGORIZE].isin(categorized), :]
  5. 批量调用OpenAI API

    • 为了降低成本,最好分批将数据发送到 OpenAI API。
    • 系统提示可能比我们需要分类的文本大 5 倍。如果我们为每个标题单独发送系统提示,那么重复的事情将导致更高的 token 使用率。
    • 但是,批次不应该太大。如果批次太大,GPT 的结果就会变得不稳定。根据我的实验,批次大小 = 5 就足够了。
    • 此外,为了确保响应不超过相关大小,我添加了max_tokens约束。
     BATCH_SIZE = 5 n_rows = df.shape[0] categories = [None for idx in range(n_rows)] for idx in range(0, n_rows, BATCH_SIZE): df_sliced = df.iloc[idx:idx+BATCH_SIZE, :] user_prompt = f'```{ "|".join(df_sliced[COL_TO_CATEGORIZE].to_list()) }```' chat_completion = client.chat.completions.create( messages=[ {'role': 'system', 'content': SYSTEM_PROMPT}, {'role': 'user', 'content': user_prompt} ], model='gpt-3.5-turbo', temperature=0, max_tokens=10*BATCH_SIZE + 2*BATCH_SIZE, ) gpt_response = chat_completion.choices[0].message.content gpt_response = [category.strip() for category in gpt_response.split('|')] categories[idx:idx + len(gpt_response)] = gpt_response df['category'] = categories df.dropna(subset=['category'], inplace=True)


  6. 是时候讨论一下法学硕士的提示了。这就是我得到的:

您将获得一个 CRAN R 软件包标题列表,这些标题用“|”符号分隔。每个标题都归类。仅返回用“|”符号分隔的类别名称。


  • 保持指令直入主题。
  • 使用“”技术来避免SQL注入。
  • 明确结果格式。在我的例子中,我要求使用“|”作为输入和输出的分隔符


  1. 最终的 dbt 模型代码

    import os import openai import pandas as pd SYSTEM_PROMPT = '''You will be provided a list of CRAN R package titles in ``` brackets. Titles will be separated by "|" sign. Come up with a category for each title. Return only category names separated by "|" sign. ''' COL_TO_CATEGORIZE = 'title' BATCH_SIZE = 5 def model(dbt, session): import _snowflake dbt.config( materialized='incremental', incremental_strategy='append', full_refresh = False, packages=['pandas', 'openai'], secrets={'openai_key': 'openai_key', 'openai_org': 'openai_org'}, external_access_integrations=['openai_external_access_integration'], ) client = openai.OpenAI( api_key=_snowflake.get_generic_secret_string('openai_key'), organization=_snowflake.get_generic_secret_string('openai_org'), ) df = dbt.ref('package').to_pandas() df.drop_duplicates(subset=[COL_TO_CATEGORIZE], inplace=True) if dbt.is_incremental: categorized_query = f''' SELECT DISTINCT "{ COL_TO_CATEGORIZE }" AS primary_key FROM { dbt.this } WHERE "category" IS NOT NULL ''' categorized = [row.PRIMARY_KEY for row in session.sql(categorized_query).collect()] df = df.loc[~df[COL_TO_CATEGORIZE].isin(categorized), :] n_rows = df.shape[0] categories = [None for idx in range(n_rows)] for idx in range(0, n_rows, BATCH_SIZE): df_sliced = df.iloc[idx:idx+BATCH_SIZE, :] user_prompt = f'```{ "|".join(df_sliced[COL_TO_CATEGORIZE].to_list()) }```' chat_completion = client.chat.completions.create( messages=[ {'role': 'system', 'content': SYSTEM_PROMPT}, {'role': 'user', 'content': user_prompt} ], model='gpt-3.5-turbo', temperature=0, max_tokens=10*BATCH_SIZE + 2*BATCH_SIZE, ) gpt_response = chat_completion.choices[0].message.content gpt_response = [category.strip() for category in gpt_response.split('|')] categories[idx:idx + len(gpt_response)] = gpt_response df['category'] = categories df.dropna(subset=['category'], inplace=True) return df

成本估算

OpenAI API 定价在此处列出。他们根据请求和返回的 token 数量收费。token 是与请求中的字符数相关的实例。有一些开源软件包可以评估给定文本的 token 数量。例如Tiktoken 。如果您想手动评估它,可以去这里的官方 OpenAI tokenizer。


在我们的数据集中,有约 18K 个标题。大致相当于 320K 个输入标记(如果我们使用批处理大小 = 5,则为 180K 个标题和 140K 个系统提示)和 50K 个输出标记。根据模型,完整扫描的成本将是:


  1. GPT-4 Turbo4.7 美元。定价:输入:10 美元/100 万个代币;输出:30 美元/100 万个代币。
  2. GPT-412.6 美元。定价:输入:30 美元/100 万个代币;输出:60 美元/100 万个代币。
  3. GPT-3.5 Turbo0.2 美元。定价:输入:0.5 美元/100 万个代币;输出:1.5 美元/100 万个代币。

结果

dbt 模型非常有效。我成功地将所有 18K 封装进行了无间隙分类。事实证明,该模型具有成本效益,并且能够抵御多次 dbt 运行。


在这里将结果仪表板发布到了 Tableau Public。您可以随意使用它、下载数据并在其上创建任何您想要的内容。

我发现了一些有趣的细节:


  • 排名第一的类别是Data Visualization (1,190 个包,占 6%)。我想这证明了 R 作为可视化工具的受欢迎程度,尤其是 Shiny、Plotly 等包。


  • 2023 年增长最快的两个类别是Data ImportData Processing 。听起来 R 开始更多地用作数据处理工具。


  • 前 30 个类别中同比增长最大的是 2019 年的Natural Language Processing 。这是著名论文《注意力就是你所需要的一切》发表两年后,GPT-1 发布半年后 :)

进一步的想法

  1. 我们可以使用一种替代方法——GPT嵌入

    • 它便宜多了。

    • 但是工程量更大,因为您应该自己完成分类部分(请继续关注,因为我将在下一篇文章中探讨这个选项)。


  2. 当然,从 dbt 中删除此部分并将其推送到云函数或您使用的任何基础设施是有意义的。同时,如果您想将其保留在 dbt 下 — 这篇文章可以满足您的要求。


  3. 避免在模型中添加任何逻辑。它应该只做一件事——调用 LLM 并保存结果。这将帮助你避免重新运行它。


  4. 您很可能在 dbt 项目中使用了许多环境。您需要注意并避免在每个 Pull 请求的每个开发人员环境中反复运行此模型。

    • 为此,你可以将逻辑与if dbt.config.get("target_name") == 'dev'结合起来


  5. 带有分隔符的响应可能不稳定。

    • 例如,GPT 返回的元素可能比您预期的要少,并且很难将初始标题映射到类别列表。

    • 为了解决这个问题,请在请求中添加response_format={ "type": "json_object" }以要求 JSON 输出。请参阅官方文档

    • 使用 JSON 输出,您可以要求提示以 {"title": "category"} 格式提供答案,然后将其映射到您的初始值。

    • 请注意,它会更加昂贵,因为它会增加响应大小。

    • 奇怪的是,当我将 GPT 3.5 Turbo 切换到 JSON 时,分类质量急剧下降。


  6. Snowflake 中还有另一种方法——使用cortex.complete()函数。请查看 Joel Labes 在 dbt 博客上发表的一篇精彩文章


就是这样!请告诉我你的想法。

链接

GitHub 上的完整代码:链接

Tableau Public 仪表板: 链接

TidyTuesday R 数据集:链接