ТЛ;ДР Вы можете автоматически решать типичные задачи обработки естественного языка (классификация, анализ тональности и т. д.) для ваших текстовых данных с помощью LLM всего за 10 долларов за 1 миллион строк (это зависит от задачи и модели), оставаясь в своей среде dbt. Инструкции, подробности и код приведены ниже. Если вы используете dbt в качестве слоя преобразования, у вас может возникнуть ситуация, когда вы хотите извлечь значимую информацию из неструктурированных текстовых данных. Такие данные могут включать отзывы клиентов, заголовки, описания, источники/каналы Google Analytics и т. д. Возможно, вы захотите разделить их на группы или получить настроения и тональности. Потенциальные решения могут быть Применяйте модели машинного обучения (или вызывайте LLM) вне потока dbt. Определите простые категоризации внутри моделей dbt, используя операторы CASE WHEN. Предварительно определите категории и либо загрузите их в необработанный уровень базы данных, либо используйте функцию начального значения dbt. По мере развития моделей dbt Python есть еще одно решение: вы можете сохранить эти задачи обработки естественного языка внутри своей среды dbt в качестве одной из моделей dbt. Если это может быть вам полезно, см. ниже пошаговое руководство по использованию OpenAI API в вашем проекте dbt. Вы можете воспроизвести все из этого руководства в своей среде, имея пример кода и данных из репозитория GitHub (см. ссылки в конце). Настройка среды Если у вас уже есть проект и данные dbt или вы не хотите воспроизводить результаты, перейдите к (4) или полностью пропустите этот раздел. В противном случае вам понадобится следующее: . Настройте проект dbt Официальные документы Вы можете просто клонировать тот, который я подготовил для этого руководства, с . GitHub Не забудьте создать/обновить файл Profiles.yml. . Я использовал Снежинку. К сожалению, бесплатной версии нет, но они предоставляют . Настройте базу данных 30-дневную бесплатную пробную версию В настоящее время модели Python dbt работают только со Snowflake, Databricks и BigQuery (без PostgreSQL). Итак, это руководство подойдет для любого из них, хотя некоторые детали могут отличаться. Подготовьте исходные данные В качестве набора данных я использовал метаданные пакета R, опубликованные в репозитории TidyTuesday. Вы можете скачать его . Подробности о наборе данных отсюда здесь. Альтернативно, вы можете использовать облегченную версию из моего репозитория здесь. Загрузите его в свою базу данных. Обновите файл в проекте dbt, чтобы он соответствовал именам вашей базы данных и схемы. source.yml Получите ключ API OpenAI Следуйте инструкциям по быстрому запуску из . официальной документации Нет: это не бесплатно, но с оплатой по мере использования. Таким образом, при использовании тестового набора данных из 10 строк с вас не будет взиматься плата более 1 доллара США во время экспериментов. Чтобы быть особенно осторожным, установите лимит расходов. Настройка интеграции внешнего доступа в Snowflake Это применимо только в том случае, если вы используете Snowflake. Если этого не сделать, модели dbt Python не смогут получить доступ ни к одному API в Интернете (включая API OpenAI). Следуйте . официальным инструкциям Сохраните ключ API OpenAI в этой интеграции. Придумайте список категорий Во-первых, если вы решаете задачу классификации, вам нужны категории (так называемые классы), которые будут использоваться в подсказке LLM. По сути, вы скажете: «У меня есть список этих категорий, не могли бы вы определить, к какой из них принадлежит этот текст?» Некоторые варианты здесь: Создайте список предопределенных категорий вручную Подходит, если вам нужны стабильные и предсказуемые категории. Не забудьте добавить сюда «Другие», чтобы у LLM были эти варианты, когда есть сомнения. Попросите LLM в своем приглашении предлагать название категории всякий раз, когда он использует категорию «Другие». Загрузите предопределенный список на необработанный уровень базы данных или в формате CSV в свой проект dbt (с использованием ). dbt seed Передайте образец ваших данных в LLM и попросите его придумать N категорий. Тот же подход, что и предыдущий, но мы получаем помощь со списком. Если вы используете GPT, для воспроизводимости лучше использовать начальное значение. Откажитесь от предопределенных категорий и позвольте LLM выполнять работу на ходу. Это может привести к менее предсказуемым результатам. В то же время достаточно хорошо, если у вас все в порядке с долей случайности. В случае использования GPT лучше указать температуру = 0, чтобы избежать разных результатов в случае необходимости повторного запуска. В этом сообщении блога я выберу третий вариант. Создайте модель Python dbt для вызова API OpenAI Теперь давайте перейдем к сути этого поста и создадим модель dbt, которая будет брать новые текстовые данные из восходящей таблицы, передавать их в API OpenAI и сохранять категорию в таблице. Как упоминалось выше, я собираюсь использовать набор данных пакетов R. R — очень популярный язык программирования для анализа данных. Этот набор данных содержит информацию о пакетах R из проекта CRAN, такую как версия, лицензия, автор, название, описание и т. д. Нас интересует поле , поскольку мы собираемся создать категорию для каждого пакета на основе его названия. title Подготовьте основу для модели. Конфигурацию dbt можно передать с помощью метода . dbt.config(...) В dbt.config есть дополнительные аргументы, например, — это требование пакета. packages Модель dbt Python может ссылаться на вышестоящие модели или dbt.ref('...') dbt.source('...') Он должен вернуть DataFrame. Ваша база данных сохранит его в виде таблицы. import os import openai import pandas as pd COL_TO_CATEGORIZE = 'title' def model(dbt, session): import _snowflake dbt.config( packages=['pandas', 'openai'], ) df = dbt.ref('package').to_pandas() df.drop_duplicates(subset=[COL_TO_CATEGORIZE], inplace=True) return df Подключитесь к API OpenAI Нам нужно передать и в dbt.config. Он будет содержать секретную ссылку, которая хранится в вашей интеграции внешнего доступа Snowflake. secrets external_access_integrations Примечание: эта функция была выпущена всего несколько дней назад и доступна только в бета-версии dbt 1.8.0-b3. dbt.config( packages=['pandas', 'openai'], secrets={'openai_key': 'openai_key', 'openai_org': 'openai_org'}, external_access_integrations=['openai_external_access_integration'], ) client = openai.OpenAI( api_key=_snowflake.get_generic_secret_string('openai_key'), organization=_snowflake.get_generic_secret_string('openai_org'), ) Сделайте модель dbt инкрементной и отключите полное обновление. Эта часть необходима для поддержания низких затрат на OpenAI API. Это предотвратит категоризацию одного и того же текста несколько раз. В противном случае вы будете отправлять полные данные в OpenAI каждый раз, когда выполняете , что может происходить несколько раз в день. dbt run Мы добавляем , , в dbt.config materialized='incremental' incremental_strategy='append' full_refresh = False Теперь полное сканирование будет выполняться только для первого запуска dbt, а для последующих запусков (независимо от инкрементного или полного обновления) оно будет классифицировать только дельту. Если вы хотите быть более внимательными, вы можете немного предварительно обработать свои данные, чтобы уменьшить количество уникальных записей, но избегайте чрезмерной предварительной обработки, поскольку LLM лучше работают с естественным языком. dbt.config( materialized='incremental', incremental_strategy='append', full_refresh = False, packages=['pandas', 'openai'], secrets={'openai_key': 'openai_key', 'openai_org': 'openai_org'}, external_access_integrations=['openai_external_access_integration'], ) if dbt.is_incremental: pass Добавить логику приращения При добавочном прогоне (в соответствии с нашей настройкой это означает любой прогон, кроме первого) нам необходимо удалить все уже классифицированные заголовки. Мы можем сделать это, просто используя . Аналогично обычным инкрементальным моделям. dbt.this if dbt.is_incremental: categorized_query = f''' SELECT DISTINCT "{ COL_TO_CATEGORIZE }" AS primary_key FROM { dbt.this } WHERE "category" IS NOT NULL ''' categorized = [row.PRIMARY_KEY for row in session.sql(categorized_query).collect()] df = df.loc[~df[COL_TO_CATEGORIZE].isin(categorized), :] Пакетный вызов OpenAI API Чтобы сократить расходы, лучше отправлять данные в OpenAI API пакетами. Системное приглашение может быть в 5 раз больше, чем текст, который нам нужно классифицировать. Если мы отправим системное приглашение отдельно для каждого заголовка, это приведет к гораздо большему использованию токенов для повторяющихся действий. Однако партия не должна быть большой. При больших партиях GPT начинает давать менее стабильные результаты. Судя по моим экспериментам, размер партии = 5 работает достаточно хорошо. Кроме того, чтобы гарантировать, что ответ не превышает соответствующий размер, я добавил ограничение . max_tokens BATCH_SIZE = 5 n_rows = df.shape[0] categories = [None for idx in range(n_rows)] for idx in range(0, n_rows, BATCH_SIZE): df_sliced = df.iloc[idx:idx+BATCH_SIZE, :] user_prompt = f'```{ "|".join(df_sliced[COL_TO_CATEGORIZE].to_list()) }```' chat_completion = client.chat.completions.create( messages=[ {'role': 'system', 'content': SYSTEM_PROMPT}, {'role': 'user', 'content': user_prompt} ], model='gpt-3.5-turbo', temperature=0, max_tokens=10*BATCH_SIZE + 2*BATCH_SIZE, ) gpt_response = chat_completion.choices[0].message.content gpt_response = [category.strip() for category in gpt_response.split('|')] categories[idx:idx + len(gpt_response)] = gpt_response df['category'] = categories df.dropna(subset=['category'], inplace=True) Пришло время поговорить о приглашении на LLM. Вот что я получил: Вам будет предоставлен список названий пакетов CRAN R в скобках. Заголовки будут разделены знаком "|". знак. Придумайте категорию для каждого названия. Возвращать только имена категорий, разделенные знаком «|». знак. Следуйте инструкциям прямо по делу. Используйте технику ```, чтобы избежать SQL-инъекций. Четко определите формат результата. В моем случае я попросил "|" в качестве разделителя для входов и выходов Окончательный код модели dbt import os import openai import pandas as pd SYSTEM_PROMPT = '''You will be provided a list of CRAN R package titles in ``` brackets. Titles will be separated by "|" sign. Come up with a category for each title. Return only category names separated by "|" sign. ''' COL_TO_CATEGORIZE = 'title' BATCH_SIZE = 5 def model(dbt, session): import _snowflake dbt.config( materialized='incremental', incremental_strategy='append', full_refresh = False, packages=['pandas', 'openai'], secrets={'openai_key': 'openai_key', 'openai_org': 'openai_org'}, external_access_integrations=['openai_external_access_integration'], ) client = openai.OpenAI( api_key=_snowflake.get_generic_secret_string('openai_key'), organization=_snowflake.get_generic_secret_string('openai_org'), ) df = dbt.ref('package').to_pandas() df.drop_duplicates(subset=[COL_TO_CATEGORIZE], inplace=True) if dbt.is_incremental: categorized_query = f''' SELECT DISTINCT "{ COL_TO_CATEGORIZE }" AS primary_key FROM { dbt.this } WHERE "category" IS NOT NULL ''' categorized = [row.PRIMARY_KEY for row in session.sql(categorized_query).collect()] df = df.loc[~df[COL_TO_CATEGORIZE].isin(categorized), :] n_rows = df.shape[0] categories = [None for idx in range(n_rows)] for idx in range(0, n_rows, BATCH_SIZE): df_sliced = df.iloc[idx:idx+BATCH_SIZE, :] user_prompt = f'```{ "|".join(df_sliced[COL_TO_CATEGORIZE].to_list()) }```' chat_completion = client.chat.completions.create( messages=[ {'role': 'system', 'content': SYSTEM_PROMPT}, {'role': 'user', 'content': user_prompt} ], model='gpt-3.5-turbo', temperature=0, max_tokens=10*BATCH_SIZE + 2*BATCH_SIZE, ) gpt_response = chat_completion.choices[0].message.content gpt_response = [category.strip() for category in gpt_response.split('|')] categories[idx:idx + len(gpt_response)] = gpt_response df['category'] = categories df.dropna(subset=['category'], inplace=True) return df Оценка стоимости Цены на OpenAI API указаны . Они взимают плату за количество запрошенных и возвращенных токенов. Токен — это экземпляр, связанный с количеством символов в вашем запросе. Существуют пакеты с открытым исходным кодом для оценки количества токенов для данного текста. Например, . Если вы хотите оценить его вручную, вам подойдет официальный токенизатор OpenAI . здесь Тиктокен здесь В нашем наборе данных около 18 тысяч наименований. Грубо говоря, это равно 320 000 входных токенов (180 000 заголовков и 140 000 системных приглашений, если мы используем размер пакета = 5) и 50 000 выходных токенов. В зависимости от модели стоимость полного сканирования составит: : . Цены: вход: токены 10 долларов США / 1 миллион долларов США; выход: 30 долларов США / 1 миллион токенов. GPT-4 Turbo 4,7 доллара США : Цена: вход: 30 долларов США / 1 миллион токенов; выход: 60 долларов США / 1 миллион токенов. GPT-4 12,6 долларов США. : Цена: вход: 0,5 доллара США за 1 миллион токенов; выход: 1,5 доллара США / 1 миллион токенов. GPT-3.5 Turbo 0,2 доллара США. Полученные результаты Модель dbt работала как шарм. Я успешно классифицировал все пакеты 18K без каких-либо пробелов. Модель оказалась экономически эффективной и защищенной от многократного запуска DBT. Я опубликовал панель результатов в Tableau Public . Не стесняйтесь играть с ним, загружать данные и создавать на их основе все, что пожелаете. здесь Несколько интересных деталей, которые я нашел: Категория топ-1 — (1190 пакетов, или 6%). Я думаю, это доказывает популярность R как инструмента визуализации, особенно с такими пакетами, как Shiny, Plotly и другими. Data Visualization Двумя наиболее растущими категориями в 2023 году стали и . Похоже, R стал больше использоваться как инструмент обработки данных. Data Import Data Processing Самый большой годовой прирост среди топ-30 категорий зафиксирован в категории в 2019 году. Через два года после знаменитой статьи «Внимание — это все, что вам нужно» и через полгода после выпуска GPT-1 :) Natural Language Processing Дальнейшие идеи Мы можем использовать альтернативный подход — . встраивания GPT Это намного дешевле. Но это более сложная инженерная задача, поскольку классификацию придется выполнять самостоятельно (следите за обновлениями, я собираюсь изучить этот вариант в одном из следующих постов). Конечно, имеет смысл удалить эту часть из dbt и перенести ее в облачные функции или любую другую инфру, которую вы используете. В то же время, если вы хотите оставить это в тайне — этот пост поможет вам. Избегайте добавления какой-либо логики в модель. Он должен выполнить одну работу — вызвать LLM и сохранить результат. Это поможет вам избежать повторного запуска. Велика вероятность, что вы используете множество сред в своем проекте dbt. Вам необходимо быть внимательными и избегать повторного запуска этой модели в каждой среде разработки при каждом запросе на включение. Для этого вы можете включить логику с помощью if dbt.config.get("target_name") == 'dev' Ответ с разделителем может быть нестабильным. Например, GPT может возвращать меньше элементов, чем вы ожидали, и будет сложно сопоставить начальные заголовки со списком категорий. Чтобы обойти эту проблему, добавьте в свой запрос чтобы требовать вывод JSON. См. . response_format={ "type": "json_object" } официальную документацию Используя вывод JSON, вы можете запросить в командной строке ответ в формате {"title": "category"}, а затем сопоставить его с исходными значениями. Обратите внимание, что это будет дороже, так как увеличит размер ответа. Как ни странно, качество классификации резко упало, когда я перешёл на JSON для GPT 3.5 Turbo. В Snowflake есть альтернатива — использование функции . Прочтите Джоэла Лабеса в блоге dbt. cortex.complete() отличный пост Вот и все! Дайте мне знать, что вы думаете. Ссылки Полный код на GitHub: ссылка Публичная панель Tableau: ссылка Набор данных TidyTuesday R: ссылка