总结
您可以使用 LLM 自动解决文本数据的典型自然语言处理任务(分类、情绪分析等),每 100 万行只需 10 美元(具体取决于任务和模型),并且只需在您的 dbt 环境中即可。说明、详细信息和代码如下
如果您使用 dbt 作为转换层,您可能会遇到需要从非结构化文本数据中提取有意义的信息的情况。此类数据可能包括客户评论、标题、描述、Google Analytics 来源/媒介等。您可能希望将它们分类或获取情绪和语气。
潜在的解决方案包括
随着 Python dbt 模型的发展,还有一个解决方案:您可以将这些自然语言处理任务作为 dbt 模型之一保存在您的 dbt 环境中。
如果这对您有帮助,请参阅下面有关如何在 dbt 项目中使用 OpenAI API 的分步指南。您可以在您的环境中重现本指南中的所有内容,并使用来自 GitHub 存储库的代码和数据示例(请参阅末尾的链接)。
如果您已经有 dbt 项目和数据,或者不想重现结果,请跳至 (4) 或完全跳过此部分。否则,您将需要以下内容:
设置 dbt 项目。官方文档
您可以简单地从GitHub克隆我为本指南准备的那个。
不要忘记创建/更新您的profiles.yml文件。
设置数据库。我使用了 Snowflake。不幸的是,没有免费版本,但他们提供了30 天的免费试用版
目前,dbt Python 模型仅适用于 Snowflake、Databricks 和 BigQuery(不支持 PostgreSQL)。因此,本教程应该适用于其中任何一种,尽管某些细节可能会有所不同
准备源数据
获取 OpenAI API 密钥
按照官方文档中的快速入门说明进行操作。
注意:它不是免费的,但它是按需付费的。因此,使用 10 行测试数据集,您在实验期间不会支付超过 1 美元的费用。
为了格外小心,请设定消费限额。
在 Snowflake 中设置外部访问集成
首先,如果你正在解决分类任务,你需要在 LLM 提示中使用类别(又称类)。基本上,你会说:“我有这些类别的列表,你能定义一下这篇文章属于哪一个类别吗?”
这里有一些选项:
手动创建预定义类别列表
如果您需要稳定且可预测的类别,它就很适合。
不要忘记在这里添加“其他”,这样 LLM 在不确定时就会有这些选项。
在提示中要求 LLM 在使用“其他”类别时建议一个类别名称。
将预定义列表上传到数据库的原始层或作为 dbt 项目中的 CSV 上传(利用dbt seed
)。
将您的数据样本输入到 LLM,并要求其提出 N 个类别。
与前一种方法相同,但是我们正在获得列表方面的帮助。
如果您使用 GPT,最好在这里使用种子来实现可重复性。
不再需要预先定义的类别,让 LLM 随时完成工作。
这可能会导致不太可预测的结果。
同时,如果您对随机性有一定程度的接受,那么这就足够了。
在 GPT 用例中,最好将温度设置为 0,以避免在需要重新运行时出现不同的结果。
在这篇博文中,我将选择第三种选择。
现在,让我们进入这篇文章的核心部分,创建一个 dbt 模型,它将从上游表中获取新的文本数据,将其提供给 OpenAI API,并将类别保存到表中。
如上所述,我将使用 R 包数据集。R 是一种在数据分析中非常流行的编程语言。此数据集包含来自 CRAN 项目的 R 包的信息,例如版本、许可证、作者、标题、描述等。我们对title
字段感兴趣,因为我们将根据每个包的标题为其创建一个类别。
准备模型的基础
dbt 配置可以通过dbt.config(...)
方法传递。
dbt.config 中还有其他参数,例如, packages
是一个包要求。
dbt Python 模型可以引用上游模型dbt.ref('...')
或dbt.source('...')
它必须返回一个 DataFrame。您的数据库将把它保存为一个表。
import os import openai import pandas as pd COL_TO_CATEGORIZE = 'title' def model(dbt, session): import _snowflake dbt.config( packages=['pandas', 'openai'], ) df = dbt.ref('package').to_pandas() df.drop_duplicates(subset=[COL_TO_CATEGORIZE], inplace=True) return df
连接到 OpenAI API
我们需要将secrets
和external_access_integrations
传递给 dbt.config。它将包含存储在 Snowflake External Access Integration 中的 secret 引用。
注意:此功能几天前才发布,并且仅在 beta dbt 版本 1.8.0-b3 中可用
dbt.config( packages=['pandas', 'openai'], secrets={'openai_key': 'openai_key', 'openai_org': 'openai_org'}, external_access_integrations=['openai_external_access_integration'], ) client = openai.OpenAI( api_key=_snowflake.get_generic_secret_string('openai_key'), organization=_snowflake.get_generic_secret_string('openai_org'), )
使 dbt 模型增量,并关闭完全刷新。
dbt run
时,您都会将完整数据发送给 OpenAI,一天可能会发送几次。materialized='incremental'
, incremental_strategy='append'
, full_refresh = False
dbt.config( materialized='incremental', incremental_strategy='append', full_refresh = False, packages=['pandas', 'openai'], secrets={'openai_key': 'openai_key', 'openai_org': 'openai_org'}, external_access_integrations=['openai_external_access_integration'], ) if dbt.is_incremental: pass
添加增量逻辑
dbt.this
即可做到这一点。类似于普通的增量模型。 if dbt.is_incremental: categorized_query = f''' SELECT DISTINCT "{ COL_TO_CATEGORIZE }" AS primary_key FROM { dbt.this } WHERE "category" IS NOT NULL ''' categorized = [row.PRIMARY_KEY for row in session.sql(categorized_query).collect()] df = df.loc[~df[COL_TO_CATEGORIZE].isin(categorized), :]
批量调用OpenAI API
max_tokens
约束。 BATCH_SIZE = 5 n_rows = df.shape[0] categories = [None for idx in range(n_rows)] for idx in range(0, n_rows, BATCH_SIZE): df_sliced = df.iloc[idx:idx+BATCH_SIZE, :] user_prompt = f'```{ "|".join(df_sliced[COL_TO_CATEGORIZE].to_list()) }```' chat_completion = client.chat.completions.create( messages=[ {'role': 'system', 'content': SYSTEM_PROMPT}, {'role': 'user', 'content': user_prompt} ], model='gpt-3.5-turbo', temperature=0, max_tokens=10*BATCH_SIZE + 2*BATCH_SIZE, ) gpt_response = chat_completion.choices[0].message.content gpt_response = [category.strip() for category in gpt_response.split('|')] categories[idx:idx + len(gpt_response)] = gpt_response df['category'] = categories df.dropna(subset=['category'], inplace=True)
是时候讨论一下法学硕士的提示了。这就是我得到的:
您将获得一个 CRAN R 软件包标题列表,这些标题用“|”符号分隔。每个标题都归类。仅返回用“|”符号分隔的类别名称。
最终的 dbt 模型代码
import os import openai import pandas as pd SYSTEM_PROMPT = '''You will be provided a list of CRAN R package titles in ``` brackets. Titles will be separated by "|" sign. Come up with a category for each title. Return only category names separated by "|" sign. ''' COL_TO_CATEGORIZE = 'title' BATCH_SIZE = 5 def model(dbt, session): import _snowflake dbt.config( materialized='incremental', incremental_strategy='append', full_refresh = False, packages=['pandas', 'openai'], secrets={'openai_key': 'openai_key', 'openai_org': 'openai_org'}, external_access_integrations=['openai_external_access_integration'], ) client = openai.OpenAI( api_key=_snowflake.get_generic_secret_string('openai_key'), organization=_snowflake.get_generic_secret_string('openai_org'), ) df = dbt.ref('package').to_pandas() df.drop_duplicates(subset=[COL_TO_CATEGORIZE], inplace=True) if dbt.is_incremental: categorized_query = f''' SELECT DISTINCT "{ COL_TO_CATEGORIZE }" AS primary_key FROM { dbt.this } WHERE "category" IS NOT NULL ''' categorized = [row.PRIMARY_KEY for row in session.sql(categorized_query).collect()] df = df.loc[~df[COL_TO_CATEGORIZE].isin(categorized), :] n_rows = df.shape[0] categories = [None for idx in range(n_rows)] for idx in range(0, n_rows, BATCH_SIZE): df_sliced = df.iloc[idx:idx+BATCH_SIZE, :] user_prompt = f'```{ "|".join(df_sliced[COL_TO_CATEGORIZE].to_list()) }```' chat_completion = client.chat.completions.create( messages=[ {'role': 'system', 'content': SYSTEM_PROMPT}, {'role': 'user', 'content': user_prompt} ], model='gpt-3.5-turbo', temperature=0, max_tokens=10*BATCH_SIZE + 2*BATCH_SIZE, ) gpt_response = chat_completion.choices[0].message.content gpt_response = [category.strip() for category in gpt_response.split('|')] categories[idx:idx + len(gpt_response)] = gpt_response df['category'] = categories df.dropna(subset=['category'], inplace=True) return df
OpenAI API 定价在此处列出。他们根据请求和返回的 token 数量收费。token 是与请求中的字符数相关的实例。有一些开源软件包可以评估给定文本的 token 数量。例如Tiktoken 。如果您想手动评估它,可以去这里的官方 OpenAI tokenizer。
在我们的数据集中,有约 18K 个标题。大致相当于 320K 个输入标记(如果我们使用批处理大小 = 5,则为 180K 个标题和 140K 个系统提示)和 50K 个输出标记。根据模型,完整扫描的成本将是:
GPT-4 Turbo
: 4.7 美元。定价:输入:10 美元/100 万个代币;输出:30 美元/100 万个代币。GPT-4
: 12.6 美元。定价:输入:30 美元/100 万个代币;输出:60 美元/100 万个代币。GPT-3.5 Turbo
: 0.2 美元。定价:输入:0.5 美元/100 万个代币;输出:1.5 美元/100 万个代币。dbt 模型非常有效。我成功地将所有 18K 封装进行了无间隙分类。事实证明,该模型具有成本效益,并且能够抵御多次 dbt 运行。
我在这里将结果仪表板发布到了 Tableau Public。您可以随意使用它、下载数据并在其上创建任何您想要的内容。
我发现了一些有趣的细节:
Data Visualization
(1,190 个包,占 6%)。我想这证明了 R 作为可视化工具的受欢迎程度,尤其是 Shiny、Plotly 等包。
Data Import
和Data Processing
。听起来 R 开始更多地用作数据处理工具。
Natural Language Processing
。这是著名论文《注意力就是你所需要的一切》发表两年后,GPT-1 发布半年后 :)我们可以使用一种替代方法——GPT嵌入。
它便宜多了。
但是工程量更大,因为您应该自己完成分类部分(请继续关注,因为我将在下一篇文章中探讨这个选项)。
当然,从 dbt 中删除此部分并将其推送到云函数或您使用的任何基础设施是有意义的。同时,如果您想将其保留在 dbt 下 — 这篇文章可以满足您的要求。
避免在模型中添加任何逻辑。它应该只做一件事——调用 LLM 并保存结果。这将帮助你避免重新运行它。
您很可能在 dbt 项目中使用了许多环境。您需要注意并避免在每个 Pull 请求的每个开发人员环境中反复运行此模型。
为此,你可以将逻辑与if dbt.config.get("target_name") == 'dev'
结合起来
带有分隔符的响应可能不稳定。
例如,GPT 返回的元素可能比您预期的要少,并且很难将初始标题映射到类别列表。
为了解决这个问题,请在请求中添加response_format={ "type": "json_object" }
以要求 JSON 输出。请参阅官方文档。
使用 JSON 输出,您可以要求提示以 {"title": "category"} 格式提供答案,然后将其映射到您的初始值。
请注意,它会更加昂贵,因为它会增加响应大小。
奇怪的是,当我将 GPT 3.5 Turbo 切换到 JSON 时,分类质量急剧下降。
Snowflake 中还有另一种方法——使用cortex.complete()函数。请查看 Joel Labes 在 dbt 博客上发表的一篇精彩文章。
就是这样!请告诉我你的想法。
GitHub 上的完整代码:链接
Tableau Public 仪表板: 链接
TidyTuesday R 数据集:链接