Встраивание данных, хранящихся в таблице PostgreSQL , несомненно, полезно – от систем семантического поиска и рекомендаций до генеративных приложений искусственного интеллекта и генерации с расширенным поиском. Но создание и управление встраиваниями данных в таблицах PostgreSQL может оказаться сложной задачей, при этом необходимо учитывать множество факторов и крайних случаев, например, поддержание актуальности встраивания при обновлении и удалении таблиц, обеспечение устойчивости к сбоям и влияние на существующие системы, зависящие от Таблица.
В этом сообщении блога мы обсудим технические проектные решения и компромиссы, на которые мы пошли при создании PgVectorizer , чтобы обеспечить простоту, отказоустойчивость и высокую производительность. Мы также обсудим альтернативные конструкции, если вы захотите создать свою собственную.
Давайте перейдем к этому.
Для начала давайте опишем, как будет работать система, которую мы строим. Не стесняйтесь пропустить этот раздел, если вы уже прочитали
В качестве наглядного примера мы будем использовать простое приложение блога, хранящее данные в PostgreSQL, используя таблицу, определенную следующим образом:
CREATE TABLE blog ( id SERIAL PRIMARY KEY NOT NULL, title TEXT NOT NULL, author TEXT NOT NULL, contents TEXT NOT NULL, category TEXT NOT NULL, published_time TIMESTAMPTZ NULL --NULL if not yet published );
Мы хотим создать встраивания в содержимое сообщения блога, чтобы мы могли позже использовать его для семантического поиска и расширенной генерации извлечения мощности. Вложения должны существовать и быть доступными для поиска только для опубликованных блогов (где published_time
NOT NULL
).
При создании этой системы внедрения мы смогли определить ряд целей, которые должна преследовать любая простая и отказоустойчивая система, создающая внедрения:
Никаких изменений в исходной таблице. Это позволяет не влиять на системы и приложения, которые уже используют эту таблицу, изменения во встроенной системе. Это особенно важно для устаревших систем.
Никаких изменений в приложениях, взаимодействующих с таблицей. Изменение кода, изменяющего таблицу, может оказаться невозможным для устаревших систем. Это также плохой дизайн программного обеспечения, поскольку он объединяет системы, не использующие встраивания, с кодом, генерирующим встраивание.
Автоматически обновлять внедрения при изменении строк в исходной таблице (в данном случае в таблице блога). Это уменьшает нагрузку на обслуживание и способствует бесперебойной работе программного обеспечения. В то же время это обновление не обязательно должно быть мгновенным или в рамках одного и того же коммита. Для большинства систем «конечная согласованность» вполне подходит.
Обеспечьте устойчивость к сбоям сети и служб. Большинство систем генерируют внедрения посредством вызова внешней системы, такой как API OpenAI. В сценариях, когда внешняя система вышла из строя или произошел сбой в сети, крайне важно, чтобы остальная часть вашей системы базы данных продолжала работать.
Эти рекомендации легли в основу надежной архитектуры, которую мы реализовали с помощью
Вот архитектура, на которой мы остановились:
В этом проекте мы сначала добавляем в таблицу блога триггер, который отслеживает изменения, и, увидев изменение, вставляет задание в таблицу blog_work_queue, которое указывает, что строка в таблице блога устарела из-за ее внедрения.
По фиксированному расписанию задание создателя внедрений опрашивает таблицу blog_work_queue и, если находит работу, выполняет в цикле следующее:
Чтобы увидеть эту систему в действии, посмотрите пример использования
Возвращаясь к примеру с таблицей нашего приложения для блога, на высоком уровне PgVectorizer должен делать две вещи:
Отслеживайте изменения в строках блога, чтобы знать, какие строки изменились.
Предоставьте метод для обработки изменений для создания внедрений.
Оба из них должны быть высококонкурентными и производительными. Давайте посмотрим, как это работает.
Вы можете создать простую таблицу очереди работ, используя следующее:
CREATE TABLE blog_embedding_work_queue ( id INT ); CREATE INDEX ON blog_embedding_work_queue(id);
Это очень простая таблица, но есть одно замечание: у этой таблицы нет уникального ключа. Это было сделано для того, чтобы избежать проблем с блокировкой при обработке очереди, но это означает, что у нас могут быть дубликаты. Мы обсудим этот компромисс позже в Альтернативе 1 ниже.
Затем вы создаете триггер для отслеживания любых изменений, внесенных в blog
:
CREATE OR REPLACE FUNCTION blog_wq_for_embedding() RETURNS TRIGGER LANGUAGE PLPGSQL AS $$ BEGIN IF (TG_OP = 'DELETE') THEN INSERT INTO blog_embedding_work_queue VALUES (OLD.id); ELSE INSERT INTO blog_embedding_work_queue VALUES (NEW.id); END IF; RETURN NULL; END; $$; CREATE TRIGGER track_changes_for_embedding AFTER INSERT OR UPDATE OR DELETE ON blog FOR EACH ROW EXECUTE PROCEDURE blog_wq_for_embedding(); INSERT INTO blog_embedding_work_queue SELECT id FROM blog WHERE published_time is NOT NULL;
Триггер вставляет идентификатор блога, который изменился, в blog_work_queue. Мы устанавливаем триггер, а затем вставляем все существующие блоги в work_queue. Этот порядок важен, чтобы гарантировать, что никакие идентификаторы не будут удалены.
Теперь давайте опишем некоторые альтернативные конструкции и почему мы от них отказались.
Введение этого ключа устранит проблему дублирования записей. Однако здесь не обошлось без проблем, особенно потому, что такой ключ заставил бы нас использовать предложение INSERT…ON CONFLICT DO NOTHING
для вставки новых идентификаторов в таблицу, а это предложение блокирует идентификатор в B-дереве.
Вот дилемма: на этапе обработки необходимо удалить обрабатываемые строки, чтобы предотвратить одновременную обработку. Тем не менее, зафиксировать это удаление можно только после того, как соответствующее встраивание было помещено в blog_embeddings. Это гарантирует, что никакие идентификаторы не будут потеряны, если произойдет сбой на полпути — скажем, если создание внедрения произойдет сбой после удаления, но до того, как внедрение будет записано.
Теперь, если мы создадим уникальный или первичный ключ, транзакция, контролирующая удаление, останется открытой. Следовательно, это действует как блокировка этих конкретных идентификаторов, предотвращая их вставку обратно в blog_work_queue на протяжении всего задания создания внедрения. Учитывая, что создание вложений занимает больше времени, чем обычная транзакция базы данных, это создает проблемы. Блокировка приведет к остановке триггера для основной таблицы «блога», что приведет к падению производительности основного приложения. Что еще хуже, при пакетной обработке нескольких строк потенциальной проблемой также становятся взаимоблокировки.
Однако потенциальные проблемы, возникающие из-за случайных повторяющихся записей, можно устранить на этапе обработки, как показано ниже. Спорадические дубликаты здесь и там не являются проблемой, поскольку они лишь незначительно увеличивают объем работы, выполняемой при внедрении. Это, безусловно, более приемлемо, чем бороться с вышеупомянутыми проблемами блокировки.
Например, мы могли бы добавить embedded
логический столбец, для которого при изменении установлено значение false, а при создании внедрения оно будет изменено на true. Есть три причины отказаться от этой конструкции:
Мы не хотим изменять таблицу blog
по причинам, которые мы уже упомянули выше.
Для эффективного получения списка невстроенных блогов потребуется дополнительный индекс (или частичный индекс) в таблице блогов. Это замедлит выполнение других операций.
Это увеличивает текучесть таблицы, поскольку каждое изменение теперь будет записываться дважды (один раз с embedding=false и один раз с embedding=true) из-за MVCC-природы PostgreSQL.
Эти проблемы решает отдельная таблица work_queue_table.
У этого подхода есть несколько проблем:
Если служба внедрения не работает, либо триггер должен дать сбой (прервать транзакцию), либо вам необходимо создать резервный путь кода, который… хранит идентификаторы, которые не удалось внедрить в очередь. Последнее решение возвращает нас к предложенной нами конструкции, но с большей сложностью сверху.
Этот триггер, вероятно, будет выполняться намного медленнее, чем остальные операции с базой данных, из-за задержки, необходимой для связи с внешней службой. Это замедлит остальные операции вашей базы данных с таблицей.
Это заставляет пользователя писать код внедрения создания непосредственно в базе данных. Учитывая, что языком искусственного интеллекта является Python и что для создания встраивания часто требуется множество других библиотек, это не всегда легко или даже возможно (особенно при работе в размещенной облачной среде PostgreSQL). Гораздо лучше иметь проект, в котором у вас есть возможность создавать встраивания внутри или вне базы данных.
Теперь у нас есть список блогов, которые необходимо встроить, давайте его обработаем!
Существует множество способов создания вложений. Мы рекомендуем использовать внешний скрипт Python. Этот сценарий просканирует рабочую очередь и связанные сообщения в блоге, вызовет внешнюю службу для создания внедрений, а затем сохранит эти внедрения обратно в базу данных. Наши аргументы в пользу этой стратегии заключаются в следующем:
Выбор Python : мы рекомендуем Python , поскольку он предлагает богатую, не имеющую себе равных экосистему для задач ИИ по работе с данными, подчеркнутую мощными разработками LLM и библиотеками данных, такими как
Выбор внешнего сценария вместо PL/Python . Мы хотели, чтобы пользователи могли контролировать способ внедрения своих данных. Однако в то же время многие поставщики облачных услуг Postgres не разрешают выполнение произвольного кода Python внутри базы данных из соображений безопасности. Итак, чтобы предоставить пользователям гибкость как в сценариях внедрения, так и в выборе места размещения базы данных, мы выбрали дизайн, в котором использовались внешние сценарии Python.
Задания должны быть одновременно производительными и безопасными для параллелизма. Параллелизм гарантирует, что если задания начнут отставать, планировщики смогут запустить больше заданий, чтобы помочь системе наверстать упущенное и справиться с нагрузкой.
Позже мы рассмотрим, как настроить каждый из этих методов, но сначала давайте посмотрим, как будет выглядеть сценарий Python. По сути, сценарий состоит из трех частей:
Прочитайте очередь работ и сообщение в блоге
Создайте вложение для публикации в блоге
Запишите вложение в таблицу blog_embedding.
Шаги 2 и 3 выполняются обратным вызовом embed_and_write
, который мы определяем в
Сначала мы покажем вам код, а затем выделим ключевые элементы:
def process_queue(embed_and_write_cb, batch_size:int=10): with psycopg2.connect(TIMESCALE_SERVICE_URL) as conn: with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor: cursor.execute(f""" SELECT to_regclass('blog_embedding_work_queue')::oid; """) table_oid = cursor.fetchone()[0] cursor.execute(f""" WITH selected_rows AS ( SELECT id FROM blog_embedding_work_queue LIMIT {int(batch_size)} FOR UPDATE SKIP LOCKED ), locked_items AS ( SELECT id, pg_try_advisory_xact_lock( {int(table_oid)}, id) AS locked FROM ( SELECT DISTINCT id FROM selected_rows ORDER BY id ) as ids ), deleted_rows AS ( DELETE FROM blog_embedding_work_queue WHERE id IN ( SELECT id FROM locked_items WHERE locked = true ORDER BY id ) ) SELECT locked_items.id as locked_id, {self.table_name}.* FROM locked_items LEFT JOIN blog ON blog.id = locked_items.id WHERE locked = true ORDER BY locked_items.id """) res = cursor.fetchall() if len(res) > 0: embed_and_write_cb(res) return len(res) process_queue(embed_and_write)
Код SQL в приведенном выше фрагменте является тонким, поскольку он спроектирован так, чтобы быть одновременно производительным и безопасным для параллелизма, поэтому давайте рассмотрим его:
Получение элементов из рабочей очереди . Первоначально система извлекает из рабочей очереди определенное количество записей, определяемое параметром размера пакетной очереди. Блокировка FOR UPDATE используется для того, чтобы гарантировать, что одновременно выполняемые сценарии не пытаются обрабатывать одни и те же элементы очереди. Директива SKIP LOCKED гарантирует, что если какая-либо запись в данный момент обрабатывается другим скриптом, система пропустит ее вместо ожидания, избегая ненужных задержек.
Блокировка идентификаторов блогов : из-за возможности дублирования записей для одного и того же blog_id в таблице рабочей очереди простой блокировки указанной таблицы недостаточно. Одновременная обработка одного и того же идентификатора разными заданиями была бы вредной. Рассмотрим следующее потенциальное состояние гонки:
Задание 1 инициирует блог и обращается к нему, получая версию 1.
Происходит внешнее обновление блога.
Впоследствии начинается задание 2 с получением версии 2.
Оба задания начинают процесс генерации встраивания.
Задача 2 завершается сохранением встраивания, соответствующего версии блога 2.
По завершении задание 1 ошибочно перезаписывает встраивание версии 2 устаревшей версией 1.
Эту проблему можно решить, введя явное отслеживание версий, но это значительно усложняет работу без увеличения производительности. Стратегия, которую мы выбрали, не только смягчает эту проблему, но также предотвращает избыточные операции и напрасную работу за счет одновременного выполнения сценариев.
Используется консультативная блокировка Postgres с префиксом идентификатора таблицы, чтобы избежать потенциального дублирования с другими подобными блокировками. Вариант try
, аналогичный предыдущему применению SKIP LOCKED, гарантирует, что система избежит ожидания блокировки. Включение предложения ORDER BY blog_id помогает предотвратить возможные взаимоблокировки. Ниже мы рассмотрим некоторые альтернативы.
Очистка рабочей очереди . Затем сценарий удаляет все элементы рабочей очереди для блогов, которые мы успешно заблокировали. Если эти элементы очереди видны через Multi-Version Concurrency Control (MVCC), их обновления отображаются в полученной строке блога. Обратите внимание, что мы удаляем все элементы с данным идентификатором блога, а не только элементы, прочитанные при выборе строк: это эффективно обрабатывает повторяющиеся записи для одного и того же идентификатора блога. Очень важно отметить, что это удаление фиксируется только после вызова функции embed_and_write() и последующего сохранения обновленного внедрения. Эта последовательность гарантирует, что мы не потеряем никаких обновлений, даже если сценарий выйдет из строя на этапе генерации внедрения.
Обработка блогов. На последнем этапе мы получаем блоги для обработки. Обратите внимание на использование левого соединения: оно позволяет нам получать идентификаторы блогов для удаленных элементов, у которых не будет строки блога. Нам нужно отслеживать эти элементы, чтобы удалить их встраивания. В обратном вызове embed_and_write
мы используем значение public_time, равное NULL, в качестве индикатора удаляемого блога (или неопубликованного, и в этом случае мы также хотим удалить встраивание).
Если система уже использует консультативные блокировки и вы опасаетесь коллизий, можно использовать таблицу с идентификатором блога в качестве первичного ключа и заблокировать строки. Фактически, это может быть сама таблица блога, если вы уверены, что эти блокировки не замедлят работу какой-либо другой системы (помните, что эти блокировки необходимо удерживать на протяжении всего процесса внедрения, который может занять некоторое время).
В качестве альтернативы вы можете использовать таблицу blog_embedding_locks специально для этой цели. Мы не предлагали создавать эту таблицу, поскольку считаем, что она может оказаться весьма расточительной с точки зрения пространства, а использование консультативных блокировок позволяет избежать этих накладных расходов.
В этом сообщении блога мы заглянули за кулисы того, как мы создали систему, которая может похвастаться отказоустойчивостью и эффективно справляется с потенциальными простоями службы внедрения генерации. Его конструкция позволяет управлять высокой скоростью изменения данных и может беспрепятственно использовать параллельные процессы генерации внедрений для удовлетворения повышенных нагрузок.
Более того, парадигма передачи данных в PostgreSQL и использования базы данных для управления генерацией встраивания в фоновом режиме становится простым механизмом контроля над обслуживанием встраивания при изменении данных. Множество демонстраций и учебных пособий в области ИИ сосредоточены исключительно на первоначальном создании данных из документов, упуская из виду сложные нюансы, связанные с сохранением синхронизации данных по мере их развития.
Однако в реальных производственных средах данные неизменно изменяются, и преодоление сложностей отслеживания и синхронизации этих изменений — нетривиальная задача. Но именно для этого и предназначена база данных! Почему бы просто не использовать его?
Автор Матвей Арье.
Также опубликовано здесь.