テーブルに格納されたデータの埋め込みは、セマンティック検索や推奨システムから、生成 AI アプリケーションや検索拡張生成に至るまで、さまざまなアプリケーションで間違いなく役立ちます。ただし、PostgreSQL テーブル内のデータのエンベディングの作成と管理は、テーブルの更新と削除でエンベディングを最新の状態に保つこと、障害に対する回復力の確保、依存関係にある既存のシステムへの影響など、多くの考慮事項や特殊なケースを考慮する必要があるため、難しい場合があります。テーブル。 PostgreSQL PostgreSQL に存在するデータの場合は、 – PostgreSQL に存在するデータ用のシンプルで復元力の高い埋め込み作成システム。 PostgreSQL データベースにデータが保存されているブログ アプリケーションを例として、Python、LangChain、Timescale Vector を使用してベクトル埋め込みを作成し、最新の状態に保つ方法を説明しました。 前回のブログ投稿では、埋め込みの作成と管理のプロセスに関するステップバイステップのガイドを詳しく説明しました。 PgVectorizer このブログ投稿では、 を構築する際に、シンプルさ、復元力、および高いパフォーマンスを確保するために行った技術的な設計上の決定とトレードオフについて説明します。独自のデザインを作成したい場合の代替デザインについても説明します。 PgVectorizer さあ、飛び込みましょう。 PostgreSQL データ用の高性能ベクタライザー (PgVectorizer) の設計 まず、構築しているシステムがどのように機能するかを説明します。すでに読んでいる場合は、このセクションをスキップしてください。 。 PgVectorizer の投稿 システム概要 わかりやすい例として、次のように定義されたテーブルを使用して PostgreSQL にデータを保存する単純なブログ アプリケーションを使用します。 CREATE TABLE blog ( id SERIAL PRIMARY KEY NOT NULL, title TEXT NOT NULL, author TEXT NOT NULL, contents TEXT NOT NULL, category TEXT NOT NULL, published_time TIMESTAMPTZ NULL --NULL if not yet published ); ブログ投稿のコンテンツに埋め込みを作成して、後でセマンティック検索とパワー取得拡張生成に使用できるようにしたいと考えています。埋め込みは、公開されたブログ ( 場合) にのみ存在し、検索可能である必要があります。 published_time NOT NULL このエンベディング システムを構築する際、エンベディングを作成する簡単で復元力のあるシステムが持つべき、次のような多くの目標を特定することができました。 これにより、このテーブルをすでに使用しているシステムとアプリケーションは、埋め込みシステムへの変更による影響を受けなくなります。これはレガシー システムにとって特に重要です。 元のテーブルへの変更はありません。 テーブルを変更するコードを変更する必要は、従来のシステムでは不可能な場合があります。また、エンベディングを使用しないシステムとエンベディングを生成するコードを結合するため、ソフトウェア設計としても不適切です。 テーブルと対話するアプリケーションには変更はありません。 ソーステーブル (この場合はブログテーブル) の行が変更されたときに、 。メンテナンスの負担を軽減し、安心なソフトウェアの実現に貢献します。同時に、この更新は瞬時に行われる必要も、同じコミット内で行われる必要もありません。ほとんどのシステムでは、「結果整合性」で問題ありません。 埋め込みを自動的に更新します ほとんどのシステムは、OpenAI API などの外部システムへの呼び出しを介して埋め込みを生成します。外部システムがダウンしたり、ネットワーク障害が発生したりするシナリオでは、データベース システムの残りの部分が動作し続けることが不可欠です。 ネットワークやサービスの障害に対する回復力を確保する: これらのガイドラインは、私たちが使用して実装した堅牢なアーキテクチャの基礎でした。 、PostgreSQL を使用してベクター データを操作するためのライブラリ。ジョブを正常に完了するために、このライブラリに新しい機能を追加しました。 -PostgreSQL データの埋め込みをできるだけ簡単にするため。 タイムスケール ベクトル Python ライブラリ PgVectorizer 私たちが決定したアーキテクチャは次のとおりです。 この設計では、まず変更を監視するトリガーをブログ テーブルに追加し、変更が検出されると、ブログ テーブル内の行がその埋め込みとともに古いことを示すジョブを blog_work_queue テーブルに挿入します。 固定スケジュールで、embeddings Creator ジョブは blog_work_queue テーブルをポーリングし、実行すべき作業が見つかった場合はループで次の処理を実行します。 blog_work_queue テーブルの行を読み取り、ロックする ブログテーブル内の対応する行を読み取ります。 ブログ行のデータの埋め込みを作成する 埋め込みを blog_embedding テーブルに書き込みます blog_work_queue テーブル内のロックされた行を削除します。 このシステムの動作を確認するには、使用例を参照してください。 。 このブログ投稿では、OpenAI、LangChain、Timescale Vector を使用して PostgreSQL テーブルの埋め込みを作成および維持します ブログ アプリケーション テーブルの例に戻ると、大まかに言うと、 次の 2 つのことを実行する必要があります。 PgVectorizer は ブログ行への変更を追跡して、どの行が変更されたかを確認します。 変更を処理して埋め込みを作成するメソッドを提供します。 これらは両方とも高度に同時実行され、パフォーマンスが高い必要があります。どのように機能するかを見てみましょう。 blog_work_queue テーブルを使用してブログ テーブルへの変更を追跡する 以下を使用して、単純なワークキューテーブルを作成できます。 CREATE TABLE blog_embedding_work_queue ( id INT ); CREATE INDEX ON blog_embedding_work_queue(id); これは非常に単純なテーブルですが、注意すべき点が 1 つあります。このテーブルには一意のキーがありません。これはキューの処理時にロックの問題を回避するために行われましたが、重複が発生する可能性があることを意味します。トレードオフについては、以下の代替案 1 で後述します。 次に、 に加えられた変更を追跡するトリガーを作成します。 blog CREATE OR REPLACE FUNCTION blog_wq_for_embedding() RETURNS TRIGGER LANGUAGE PLPGSQL AS $$ BEGIN IF (TG_OP = 'DELETE') THEN INSERT INTO blog_embedding_work_queue VALUES (OLD.id); ELSE INSERT INTO blog_embedding_work_queue VALUES (NEW.id); END IF; RETURN NULL; END; $$; CREATE TRIGGER track_changes_for_embedding AFTER INSERT OR UPDATE OR DELETE ON blog FOR EACH ROW EXECUTE PROCEDURE blog_wq_for_embedding(); INSERT INTO blog_embedding_work_queue SELECT id FROM blog WHERE published_time is NOT NULL; トリガーは、変更されたブログの ID を blog_work_queue に挿入します。トリガーをインストールしてから、既存のブログを work_queue に挿入します。この順序は、ID が欠落しないようにするために重要です。 ここで、いくつかの代替デザインと、それらを拒否した理由について説明しましょう。 代替案 1: blog_work_queue テーブルに主キーまたは一意キーを実装します。 このキーを導入すると、重複エントリの問題が解消されます。ただし、課題がないわけではありません。特に、このようなキーを使用すると、新しい ID をテーブルに挿入するために 句を使用する必要があり、その句が B ツリー内の ID をロックするからです。 INSERT…ON CONFLICT DO NOTHING ここにジレンマがあります。処理フェーズでは、同時処理を防ぐために作業中の行を削除する必要があります。ただし、この削除のコミットは、対応する埋め込みが blog_embeddings に配置された後にのみ実行できます。これにより、途中で中断があった場合、たとえば、削除後、埋め込みが書き込まれる前に埋め込みの作成がクラッシュした場合でも、ID が失われることはありません。 ここで、一意キーまたは主キーを作成すると、削除を監視するトランザクションは開いたままになります。したがって、これはこれらの特定の ID に対するロックとして機能し、埋め込み作成ジョブの全期間中、blog_work_queue にそれらの ID が再度挿入されるのを防ぎます。埋め込みの作成には通常のデータベース トランザクションよりも時間がかかることを考えると、これは問題を引き起こします。ロックによりメインの「ブログ」テーブルのトリガーが停止し、プライマリ アプリケーションのパフォーマンスの低下につながります。さらに悪いことに、複数の行をバッチで処理する場合、デッドロックも潜在的な問題になります。 ただし、後で説明するように、時折重複するエントリによって生じる潜在的な問題は、処理段階で管理できます。散発的な重複は、埋め込みジョブが実行する作業量をわずかに増加させるだけなので、問題はありません。これは、前述のロックの課題に取り組むよりも確かに快適です。 代替案 2: ブログ テーブルに列を追加して、最新の埋め込みが存在するかどうかを追跡することで、実行する必要がある作業を追跡します。 たとえば、変更時に false に設定され、埋め込みの作成時に true に反転される ブール列を追加できます。この設計を拒否する理由は 3 つあります。 embedded すでに上で述べた理由により、 テーブルを変更したくありません。 blog 非埋め込みブログのリストを効率的に取得するには、ブログ テーブルに追加のインデックス (または部分インデックス) が必要になります。これにより、他の操作が遅くなります。 PostgreSQL の MVCC の性質により、すべての変更が 2 回 (embedding=false で 1 回、embedding=true で 1 回) 書き込まれるため、これによりテーブルのチャーンが増加します。 別個の work_queue_table はこれらの問題を解決します。 代替案 3: トリガー内に埋め込みを直接作成します。 このアプローチにはいくつかの問題があります。 埋め込みサービスがダウンしている場合は、トリガーが失敗する (トランザクションを中止する) か、キューに埋め込めなかった ID を保存するバックアップ コード パスを作成する必要があります。後者の解決策では、提案した設計に戻りますが、その上にさらに複雑さが追加されます。 このトリガーは、外部サービスとの接続に必要な待ち時間のため、おそらく残りのデータベース操作よりもはるかに遅くなります。これにより、テーブルに対する残りのデータベース操作が遅くなります。 ユーザーは、作成埋め込みコードをデータベースに直接記述する必要があります。 AI の共通言語は Python であり、埋め込みの作成には他の多くのライブラリが必要になることが多いことを考えると、これは必ずしも簡単ではなく、可能であるとは限りません (特に、ホストされた PostgreSQL クラウド環境内で実行している場合)。データベースの内部または外部に埋め込みを作成することを選択できる設計を採用する方がはるかに優れています。 これで、埋め込む必要があるブログのリストができました。リストを処理してみましょう。 埋め込みを作成する 埋め込みを作成するにはさまざまな方法があります。外部の Python スクリプトを使用することをお勧めします。このスクリプトは、ワーク キューと関連するブログ投稿をスキャンし、外部サービスを呼び出して埋め込みを作成し、これらの埋め込みをデータベースに格納し直します。この戦略の理由は次のとおりです。 : をお勧めします。Python は、強力な LLM 開発と そしてラマインデックス。 Python の選択 Python ラングチェーン : ユーザーがデータの埋め込み方法を制御できるようにしたいと考えました。しかし同時に、多くの Postgres クラウド プロバイダーは、セキュリティ上の懸念から、データベース内での任意の Python コードの実行を許可していません。そのため、ユーザーが埋め込みスクリプトとデータベースをホストする場所の両方を柔軟に行えるようにするために、外部 Python スクリプトを使用する設計を採用しました。 PL/Python の代わりに外部スクリプトを選択する ジョブはパフォーマンスが高く、同時実行性も安全である必要があります。同時実行性により、ジョブの実行が遅れて開始された場合でも、スケジューラーがさらに多くのジョブを開始して、システムが追いつき、負荷を処理できることが保証されます。 これらの各メソッドの設定方法については後で説明しますが、最初に、Python スクリプトがどのようになるかを見てみましょう。基本的に、スクリプトは 3 つの部分で構成されます。 ワークキューとブログ投稿を読む ブログ投稿の埋め込みを作成する 埋め込みを blog_embedding テーブルに書き込みます ステップ 2 と 3 は 。それでは、ワークキューをどのように処理するかをさらに詳しく見てみましょう。 embed_and_write PgVectorizer のブログ投稿 ワークキューを処理する まずコードを示し、次に重要な要素を取り上げます。 def process_queue(embed_and_write_cb, batch_size:int=10): with psycopg2.connect(TIMESCALE_SERVICE_URL) as conn: with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor: cursor.execute(f""" SELECT to_regclass('blog_embedding_work_queue')::oid; """) table_oid = cursor.fetchone()[0] cursor.execute(f""" WITH selected_rows AS ( SELECT id FROM blog_embedding_work_queue LIMIT {int(batch_size)} FOR UPDATE SKIP LOCKED ), locked_items AS ( SELECT id, pg_try_advisory_xact_lock( {int(table_oid)}, id) AS locked FROM ( SELECT DISTINCT id FROM selected_rows ORDER BY id ) as ids ), deleted_rows AS ( DELETE FROM blog_embedding_work_queue WHERE id IN ( SELECT id FROM locked_items WHERE locked = true ORDER BY id ) ) SELECT locked_items.id as locked_id, {self.table_name}.* FROM locked_items LEFT JOIN blog ON blog.id = locked_items.id WHERE locked = true ORDER BY locked_items.id """) res = cursor.fetchall() if len(res) > 0: embed_and_write_cb(res) return len(res) process_queue(embed_and_write) 上記のスニペットの SQL コードは、パフォーマンスと同時実行性の両方を考慮して設計されているため、わかりにくいため、詳しく見てみましょう。 : 最初に、システムは、バッチ キュー サイズ パラメータによって決定される、指定された数のエントリをワーク キューから取得します。 FOR UPDATE ロックは、同時に実行されるスクリプトが同じキュー項目を処理しようとしないようにするために取得されます。 SKIP LOCKED ディレクティブを使用すると、エントリが現在別のスクリプトによって処理されている場合、システムは待機せずにそのエントリをスキップし、不必要な遅延を回避します。 ワーク キューからアイテムを取得する : ワークキュー テーブル内で同じ blog_id のエントリが重複する可能性があるため、単にテーブルをロックするだけでは不十分です。異なるジョブによる同じ ID の同時処理は悪影響を及ぼします。次の潜在的な競合状態を考慮してください。 ブログ ID のロック ジョブ 1 はブログを開始してアクセスし、バージョン 1 を取得します。 ブログに対する外部更新が発生します。 続いてジョブ 2 が開始され、バージョン 2 が取得されます。 どちらのジョブも埋め込み生成プロセスを開始します。 ジョブ 2 が終了し、ブログ バージョン 2 に対応する埋め込みが保存されます。 ジョブ 1 は、終了時に、バージョン 2 の埋め込みを古いバージョン 1 で誤って上書きします。 明示的なバージョン追跡を導入することでこの問題に対抗することもできますが、パフォーマンス上の利点はなく、かなりの複雑さが生じます。私たちが選択した戦略は、この問題を軽減するだけでなく、スクリプトの同時実行による冗長な操作や無駄な作業を防止します。 他のロックとの潜在的な重複を避けるために、テーブル識別子がプレフィックスとして付けられた Postgres アドバイザリー ロックが使用されます。 バリアントは、以前の SKIP LOCKED のアプリケーションに似ており、システムがロックで待機することを確実に回避します。 ORDER BY blog_id 句を含めると、潜在的なデッドロックを防ぐことができます。以下にいくつかの代替案を説明します。 try : 次に、スクリプトは、ロックに成功したブログのすべてのワーク キュー アイテムを削除します。これらのキュー アイテムがマルチバージョン同時実行制御 (MVCC) 経由で表示される場合、それらの更新は取得されたブログ行に明示されます。行の選択時に読み取られたアイテムだけでなく、指定されたブログ ID を持つすべてのアイテムが削除されることに注意してください。これにより、同じブログ ID の重複エントリが効果的に処理されます。この削除は、embed_and_write() 関数の呼び出しとその後の更新された埋め込みの保存後にのみコミットされることに注意することが重要です。このシーケンスにより、埋め込み生成フェーズ中にスクリプトが失敗した場合でも、更新が失われないことが保証されます。 ワーク キューのクリーンアップ 最後のステップでは、処理するブログを取得します。左結合の使用に注意してください。これにより、ブログ行を持たない削除済みアイテムのブログ ID を取得できます。埋め込みを削除するには、それらのアイテムを追跡する必要があります。 コールバックでは、削除されるブログ (または非公開。この場合は埋め込みも削除します) のセンチネルとして、NULL であるpublished_time を使用します。 処理するブログの取得: embed_and_write 代替案 4: 別のテーブルを使用して、勧告ロックの使用を回避します。 システムがすでに勧告ロックを使用していて、衝突が心配な場合は、ブログ ID を持つテーブルを主キーとして使用し、行をロックすることができます。実際、これらのロックが他のシステムの速度を低下させないと確信できる場合は、これをブログ テーブル自体にすることもできます (これらのロックは埋め込みプロセス全体を通じて保持する必要があり、時間がかかる場合があることに注意してください)。 あるいは、この目的のためだけに blog_embedding_locks テーブルを作成することもできます。このテーブルの作成は、スペースの点で非常に無駄になる可能性があるため、推奨しませんでした。また、勧告ロックを使用すると、このオーバーヘッドが回避されます。 結論と次のステップ 自動的に最新の状態に保ちます。このアーキテクチャにより、埋め込みは絶えず進化するデータとの同期を維持し、挿入、変更、削除にシームレスに対応します。 PgVectorizer を導入し、PostgreSQL に保存されたデータからベクトル埋め込みを生成するのに適したシステムの概要を説明しました。 このブログ投稿では、埋め込み生成サービスの潜在的なダウンタイムに効果的に対処し、回復力を誇るシステムをどのように作成したかを舞台裏で紹介しました。その設計は、高速なデータ変更の管理に優れており、同時の埋め込み生成プロセスをシームレスに使用して負荷の増大に対応できます。 さらに、データを PostgreSQL にコミットし、データベースを使用してバックグラウンドで埋め込みの生成を管理するというパラダイムは、データ変更中の埋め込みの維持を監視する簡単なメカニズムとして浮上しています。 AI 分野の無数のデモやチュートリアルは、ドキュメントからのデータの初期作成のみに焦点を当てており、進化するデータ同期の維持に関連する複雑なニュアンスを見逃しています。 ただし、実際の運用環境ではデータは常に変化するため、これらの変化の追跡と同期の複雑さに取り組むのは簡単な作業ではありません。しかし、データベースはそのように設計されています。なぜそれを使わないのでしょうか? : 学習の旅を続けるためのリソースをいくつか紹介します : PostgreSQL データから生成されたエンベディングを、PostgreSQL 上に構築された高速でスケーラブルなベクター データベースに保存します。もっと詳しく知る そして 。 Timescale Vector を 90 日間無料でお試しください タイムスケールベクトルについて どのように機能するか : PgVectorizer とその使用方法について詳しくは、 。 ドキュメントを読む timescale_vector Python ライブラリ : このブログ投稿で説明されている方法を使用して、PostgreSQL テーブルからブログ投稿データを作成、埋め込み、保存する方法については、このステップバイステップのチュートリアルに従ってください。 チュートリアル – PostgreSQL へのブログ データの埋め込み 上記の例では、ドキュメントの解析と埋め込みの作成を説明するために LangChain を使用しました。 Timescale Vector を使用してベクトル ストレージ、類似性検索、ハイブリッド検索を行う方法について詳しく説明します。 LangChain とタイムスケール ベクトル : マトヴェイ・アリエ著。 こちらでも公開しております。