Das Einbetten von in einer PostgreSQL- Tabelle gespeicherten Daten ist zweifellos nützlich – bei Anwendungen, die von semantischen Such- und Empfehlungssystemen bis hin zu generativen KI-Anwendungen und Retrieval Augmented Generation reichen. Das Erstellen und Verwalten von Einbettungen für Daten in PostgreSQL-Tabellen kann jedoch schwierig sein, da viele Überlegungen und Randfälle zu berücksichtigen sind, z. B. die Aktualisierung von Einbettungen durch Tabellenaktualisierungen und -löschungen auf dem neuesten Stand zu halten, die Widerstandsfähigkeit gegen Ausfälle sicherzustellen und die Auswirkungen auf bestehende Systeme zu berücksichtigen Der Tisch.
In diesem Blogbeitrag besprechen wir die technischen Designentscheidungen und die Kompromisse, die wir beim Aufbau von PgVectorizer eingegangen sind, um Einfachheit, Belastbarkeit und hohe Leistung sicherzustellen. Wir besprechen auch alternative Designs, wenn Sie Ihr eigenes rollen möchten.
Lasst uns hineinspringen.
Lassen Sie uns zunächst beschreiben, wie das System, das wir aufbauen, funktionieren wird. Überspringen Sie diesen Abschnitt gerne, wenn Sie ihn bereits gelesen haben
Als anschauliches Beispiel verwenden wir eine einfache Blog-Anwendung, die Daten in PostgreSQL unter Verwendung einer wie folgt definierten Tabelle speichert:
CREATE TABLE blog ( id SERIAL PRIMARY KEY NOT NULL, title TEXT NOT NULL, author TEXT NOT NULL, contents TEXT NOT NULL, category TEXT NOT NULL, published_time TIMESTAMPTZ NULL --NULL if not yet published );
Wir möchten Einbettungen in den Inhalt des Blog-Beitrags erstellen, damit wir ihn später für die semantische Suche und die erweiterte Generierung von Power Retrieval verwenden können. Einbettungen sollten nur für Blogs vorhanden und durchsuchbar sein, die veröffentlicht wurden (wobei „ published_time
NOT NULL
ist).
Beim Aufbau dieses Einbettungssystems konnten wir eine Reihe von Zielen identifizieren, die jedes unkomplizierte und belastbare System zum Erstellen von Einbettungen haben sollte:
Keine Änderungen an der Originaltabelle. Dadurch können Systeme und Anwendungen, die diese Tabelle bereits verwenden, nicht durch Änderungen am Einbettungssystem beeinträchtigt werden. Dies ist besonders wichtig für Legacy-Systeme.
Keine Änderung an den Anwendungen, die mit der Tabelle interagieren. Bei älteren Systemen ist es möglicherweise nicht möglich, den Code zu ändern, der die Tabelle ändert. Es handelt sich außerdem um ein schlechtes Softwaredesign, da es Systeme, die keine Einbettungen verwenden, mit Code koppelt, der die Einbettungen generiert.
Einbettungen automatisch aktualisieren, wenn sich Zeilen in der Quelltabelle ändern (in diesem Fall die Blog-Tabelle). Dies verringert den Wartungsaufwand und trägt zu einer sorgenfreien Software bei. Gleichzeitig muss dieses Update nicht sofort oder innerhalb desselben Commits erfolgen. Für die meisten Systeme ist „eventuelle Konsistenz“ völlig ausreichend.
Stellen Sie die Widerstandsfähigkeit gegenüber Netzwerk- und Dienstausfällen sicher: Die meisten Systeme generieren Einbettungen über einen Aufruf an ein externes System, beispielsweise die OpenAI-API. In Szenarien, in denen das externe System ausfällt oder eine Netzwerkstörung auftritt, ist es unbedingt erforderlich, dass der Rest Ihres Datenbanksystems weiterhin funktioniert.
Diese Richtlinien bildeten die Grundlage einer robusten Architektur, die wir mithilfe von implementiert haben
Hier ist die Architektur, für die wir uns entschieden haben:
In diesem Entwurf fügen wir zunächst einen Trigger zur Blog-Tabelle hinzu, der auf Änderungen überwacht und bei Erkennung einer Änderung einen Job in die blog_work_queue-Tabelle einfügt, der angibt, dass eine Zeile in der Blog-Tabelle aufgrund ihrer Einbettung veraltet ist.
Nach einem festen Zeitplan fragt ein Embeddings-Creator-Job die Tabelle blog_work_queue ab und führt Folgendes in einer Schleife aus, wenn er Arbeit findet:
Um dieses System in Aktion zu sehen, sehen Sie sich ein Beispiel für die Verwendung an
Zurück zum Beispiel unserer Blog-Anwendungstabelle: Auf hoher Ebene muss PgVectorizer zwei Dinge tun:
Verfolgen Sie Änderungen an den Blogzeilen, um zu wissen, welche Zeilen geändert wurden.
Stellen Sie eine Methode zur Verarbeitung der Änderungen bereit, um Einbettungen zu erstellen.
Beides muss hochgradig gleichzeitig und leistungsfähig sein. Mal sehen, wie es funktioniert.
Sie können eine einfache Arbeitswarteschlangentabelle mit Folgendem erstellen:
CREATE TABLE blog_embedding_work_queue ( id INT ); CREATE INDEX ON blog_embedding_work_queue(id);
Dies ist eine sehr einfache Tabelle, aber es gibt einen Hinweis: Diese Tabelle hat keinen eindeutigen Schlüssel. Dies wurde durchgeführt, um Sperrprobleme bei der Verarbeitung der Warteschlange zu vermeiden, es bedeutet jedoch, dass wir möglicherweise Duplikate haben. Wir diskutieren den Kompromiss später in Alternative 1 weiter unten.
Anschließend erstellen Sie einen Auslöser, um alle am blog
vorgenommenen Änderungen zu verfolgen:
CREATE OR REPLACE FUNCTION blog_wq_for_embedding() RETURNS TRIGGER LANGUAGE PLPGSQL AS $$ BEGIN IF (TG_OP = 'DELETE') THEN INSERT INTO blog_embedding_work_queue VALUES (OLD.id); ELSE INSERT INTO blog_embedding_work_queue VALUES (NEW.id); END IF; RETURN NULL; END; $$; CREATE TRIGGER track_changes_for_embedding AFTER INSERT OR UPDATE OR DELETE ON blog FOR EACH ROW EXECUTE PROCEDURE blog_wq_for_embedding(); INSERT INTO blog_embedding_work_queue SELECT id FROM blog WHERE published_time is NOT NULL;
Der Trigger fügt die ID des Blogs, das sich geändert hat, in blog_work_queue ein. Wir installieren den Trigger und fügen dann alle vorhandenen Blogs in die work_queue ein. Diese Reihenfolge ist wichtig, um sicherzustellen, dass keine Ausweise verloren gehen.
Beschreiben wir nun einige alternative Designs und warum wir sie abgelehnt haben.
Durch die Einführung dieses Schlüssels würde das Problem doppelter Einträge beseitigt. Dies ist jedoch nicht ohne Herausforderungen, insbesondere weil ein solcher Schlüssel uns dazu zwingen würde, die INSERT…ON CONFLICT DO NOTHING
Klausel zu verwenden, um neue IDs in die Tabelle einzufügen, und diese Klausel eine Sperre für die ID im B-Baum vornimmt.
Hier liegt das Dilemma: Während der Verarbeitungsphase müssen die bearbeiteten Zeilen gelöscht werden, um eine gleichzeitige Verarbeitung zu verhindern. Allerdings kann dieser Löschvorgang erst durchgeführt werden, nachdem die entsprechende Einbettung in blog_embeddings platziert wurde. Dadurch wird sichergestellt, dass keine IDs verloren gehen, wenn es auf halbem Weg zu einer Unterbrechung kommt, beispielsweise wenn die Erstellung der Einbettung nach dem Löschen, aber bevor die Einbettung geschrieben wird, abstürzt.
Wenn wir nun einen eindeutigen oder Primärschlüssel erstellen, bleibt die Transaktion, die die Löschung überwacht, offen. Folglich fungiert dies als Sperre für diese spezifischen IDs und verhindert, dass sie für die gesamte Dauer des Einbettungserstellungsauftrags wieder in die blog_work_queue eingefügt werden. Angesichts der Tatsache, dass das Erstellen von Einbettungen länger dauert als bei einer typischen Datenbanktransaktion, bedeutet dies Probleme. Die Sperre würde den Auslöser für die Haupttabelle „Blog“ blockieren, was zu einem Leistungsabfall der primären Anwendung führen würde. Erschwerend kommt hinzu, dass bei der Verarbeitung mehrerer Zeilen in einem Stapel auch Deadlocks zu einem potenziellen Problem werden können.
Allerdings können die potenziellen Probleme, die sich aus gelegentlichen doppelten Einträgen ergeben, während der Verarbeitungsphase bewältigt werden, wie später dargestellt wird. Ein sporadisches Duplikat hier und da ist kein Problem, da es den Arbeitsaufwand der Einbettungsaufgabe nur geringfügig erhöht. Das ist sicherlich angenehmer, als sich mit den oben genannten Verriegelungsherausforderungen herumzuschlagen.
Beispielsweise könnten wir eine embedded
boolesche Spalte hinzufügen, die bei der Änderung auf „false“ gesetzt und beim Erstellen der Einbettung auf „true“ umgedreht wird. Es gibt drei Gründe, diesen Entwurf abzulehnen:
Wir möchten die blog
Tabelle aus den oben bereits genannten Gründen nicht ändern.
Um effizient eine Liste nicht eingebetteter Blogs zu erhalten, wäre ein zusätzlicher Index (oder Teilindex) für die Blog-Tabelle erforderlich. Dies würde andere Vorgänge verlangsamen.
Dies erhöht die Fluktuation in der Tabelle, da jede Änderung aufgrund der MVCC-Natur von PostgreSQL nun zweimal geschrieben würde (einmal mit Embedding=false und einmal mit Embedding=true).
Eine separate work_queue_table löst diese Probleme.
Dieser Ansatz weist mehrere Probleme auf:
Wenn der Einbettungsdienst ausfällt, muss entweder der Trigger fehlschlagen (Ihre Transaktion wird abgebrochen) oder Sie müssen einen Backup-Codepfad erstellen, der … die IDs speichert, die nicht in eine Warteschlange eingebettet werden konnten. Die letztere Lösung bringt uns zurück zu unserem vorgeschlagenen Design, allerdings mit zusätzlicher Komplexität.
Dieser Auslöser wird aufgrund der für die Kontaktaufnahme mit einem externen Dienst erforderlichen Latenz wahrscheinlich viel langsamer sein als die übrigen Datenbankvorgänge. Dadurch werden die restlichen Datenbankvorgänge für die Tabelle verlangsamt.
Es zwingt den Benutzer, den Einbettungscode für die Erstellung direkt in die Datenbank zu schreiben. Angesichts der Tatsache, dass die Verkehrssprache der KI Python ist und für die Einbettungserstellung oft viele andere Bibliotheken erforderlich sind, ist dies nicht immer einfach oder überhaupt möglich (insbesondere, wenn es in einer gehosteten PostgreSQL-Cloud-Umgebung ausgeführt wird). Es ist viel besser, ein Design zu haben, bei dem Sie die Wahl haben, Einbettungen innerhalb oder außerhalb der Datenbank zu erstellen.
Jetzt haben wir eine Liste der Blogs, die eingebettet werden müssen. Lassen Sie uns die Liste verarbeiten!
Es gibt viele Möglichkeiten, Einbettungen zu erstellen. Wir empfehlen die Verwendung eines externen Python-Skripts. Dieses Skript durchsucht die Arbeitswarteschlange und die zugehörigen Blogbeiträge, ruft einen externen Dienst auf, um die Einbettungen zu erstellen, und speichert diese Einbettungen dann wieder in der Datenbank. Unsere Begründung für diese Strategie lautet wie folgt:
Wahl von Python : Wir empfehlen Python , weil es ein reichhaltiges, unübertroffenes Ökosystem für KI-Datenaufgaben bietet, hervorgehoben durch leistungsstarke LLM-Entwicklung und Datenbibliotheken wie
Entscheidung für ein externes Skript anstelle von PL/Python : Wir wollten, dass Benutzer die Kontrolle darüber haben, wie sie ihre Daten einbetten. Doch gleichzeitig erlauben viele Postgres-Cloud-Anbieter aus Sicherheitsgründen nicht die Ausführung von beliebigem Python-Code innerhalb der Datenbank. Um Benutzern Flexibilität sowohl bei der Einbettung von Skripten als auch beim Hosten ihrer Datenbank zu ermöglichen, haben wir uns für ein Design entschieden, das externe Python-Skripte verwendet.
Die Jobs müssen sowohl performant als auch nebenläufigkeitssicher sein. Parallelität garantiert, dass die Planer weitere Jobs starten können, wenn Jobs in Verzug geraten, um dem System zu helfen, aufzuholen und die Last zu bewältigen.
Wir werden uns später mit der Einrichtung jeder dieser Methoden befassen, aber zunächst wollen wir sehen, wie das Python-Skript aussehen würde. Im Wesentlichen besteht das Skript aus drei Teilen:
Lesen Sie die Arbeitswarteschlange und den Blogbeitrag
Erstellen Sie eine Einbettung für den Blogbeitrag
Schreiben Sie die Einbettung in die Tabelle blog_embedding
Die Schritte 2 und 3 werden durch einen embed_and_write
Callback ausgeführt, den wir im definieren
Wir zeigen Ihnen zunächst den Code und heben dann die wichtigsten Elemente hervor:
def process_queue(embed_and_write_cb, batch_size:int=10): with psycopg2.connect(TIMESCALE_SERVICE_URL) as conn: with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor: cursor.execute(f""" SELECT to_regclass('blog_embedding_work_queue')::oid; """) table_oid = cursor.fetchone()[0] cursor.execute(f""" WITH selected_rows AS ( SELECT id FROM blog_embedding_work_queue LIMIT {int(batch_size)} FOR UPDATE SKIP LOCKED ), locked_items AS ( SELECT id, pg_try_advisory_xact_lock( {int(table_oid)}, id) AS locked FROM ( SELECT DISTINCT id FROM selected_rows ORDER BY id ) as ids ), deleted_rows AS ( DELETE FROM blog_embedding_work_queue WHERE id IN ( SELECT id FROM locked_items WHERE locked = true ORDER BY id ) ) SELECT locked_items.id as locked_id, {self.table_name}.* FROM locked_items LEFT JOIN blog ON blog.id = locked_items.id WHERE locked = true ORDER BY locked_items.id """) res = cursor.fetchall() if len(res) > 0: embed_and_write_cb(res) return len(res) process_queue(embed_and_write)
Der SQL-Code im obigen Snippet ist subtil, da er sowohl leistungsfähig als auch parallelitätssicher ist. Gehen wir ihn also durch:
Elemente aus der Arbeitswarteschlange entfernen : Zunächst ruft das System eine bestimmte Anzahl von Einträgen aus der Arbeitswarteschlange ab, die durch den Parameter für die Stapelwarteschlangengröße bestimmt wird. Eine FOR UPDATE-Sperre wird verwendet, um sicherzustellen, dass gleichzeitig ausgeführte Skripts nicht versuchen, dieselben Warteschlangenelemente zu verarbeiten. Die SKIP LOCKED-Direktive stellt sicher, dass das System, wenn ein Eintrag gerade von einem anderen Skript verarbeitet wird, ihn überspringt, anstatt zu warten, wodurch unnötige Verzögerungen vermieden werden.
Sperren von Blog-IDs : Aufgrund der Möglichkeit doppelter Einträge für dieselbe blog_id in der Arbeitswarteschlangentabelle reicht es nicht aus, diese Tabelle einfach zu sperren. Die gleichzeitige Verarbeitung derselben ID durch verschiedene Jobs wäre schädlich. Berücksichtigen Sie die folgende mögliche Rennbedingung:
Job 1 initiiert einen Blog, greift darauf zu und ruft Version 1 ab.
Es findet eine externe Aktualisierung des Blogs statt.
Anschließend beginnt Job 2 und erhält Version 2.
Beide Jobs beginnen mit dem Einbettungsgenerierungsprozess.
Job 2 schließt ab und speichert die Einbettung, die der Blog-Version 2 entspricht.
Job 1 überschreibt nach Abschluss fälschlicherweise die Einbettung von Version 2 mit der veralteten Version 1.
Obwohl man diesem Problem durch die Einführung einer expliziten Versionsverfolgung entgegenwirken könnte, führt dies zu einer erheblichen Komplexität ohne Leistungsvorteile. Die von uns gewählte Strategie mildert nicht nur dieses Problem, sondern verhindert auch redundante Vorgänge und verschwendete Arbeit durch die gleichzeitige Ausführung von Skripten.
Es wird eine Postgres-Hinweissperre verwendet, der die Tabellenkennung vorangestellt ist, um mögliche Überschneidungen mit anderen derartigen Sperren zu vermeiden. Die try
Variante stellt analog zur früheren Anwendung von SKIP LOCKED sicher, dass das System das Warten auf Sperren vermeidet. Die Einbeziehung der ORDER BY blog_id-Klausel trägt dazu bei, potenzielle Deadlocks zu verhindern. Nachfolgend gehen wir auf einige Alternativen ein.
Bereinigen der Arbeitswarteschlange : Das Skript löscht dann alle Arbeitswarteschlangenelemente für Blogs, die wir erfolgreich gesperrt haben. Wenn diese Warteschlangenelemente über Multi-Version Concurrency Control (MVCC) sichtbar sind, werden ihre Aktualisierungen in der abgerufenen Blogzeile angezeigt. Beachten Sie, dass wir alle Elemente mit der angegebenen Blog-ID löschen, nicht nur die Elemente, die bei der Auswahl der Zeilen gelesen wurden: Dadurch werden doppelte Einträge für dieselbe Blog-ID effektiv behandelt. Es ist wichtig zu beachten, dass dieser Löschvorgang erst nach dem Aufruf der Funktion „embed_and_write()“ und der anschließenden Speicherung der aktualisierten Einbettung ausgeführt wird. Diese Reihenfolge stellt sicher, dass wir keine Updates verlieren, selbst wenn das Skript während der Einbettungsgenerierungsphase fehlschlägt.
Zu verarbeitende Blogs abrufen: Im letzten Schritt rufen wir die zu verarbeitenden Blogs ab. Beachten Sie die Verwendung des Links-Joins: Dadurch können wir die Blog-IDs für gelöschte Elemente abrufen, die keine Blog-Zeile haben. Wir müssen diese Elemente verfolgen, um ihre Einbettungen zu löschen. Im Rückruf embed_and_write
verwenden wir „publicated_time“ als NULL als Sentinel für den Blog, der gelöscht wird (oder unveröffentlicht ist; in diesem Fall möchten wir auch die Einbettung löschen).
Wenn das System bereits Empfehlungssperren verwendet und Sie Kollisionen befürchten, können Sie eine Tabelle mit einer Blog-ID als Primärschlüssel verwenden und die Zeilen sperren. Tatsächlich kann es sich hierbei um die Blog-Tabelle selbst handeln, wenn Sie sicher sind, dass diese Sperren kein anderes System verlangsamen (denken Sie daran, dass diese Sperren während des gesamten Einbettungsvorgangs aufrechterhalten werden müssen, was eine Weile dauern kann).
Alternativ können Sie nur für diesen Zweck eine blog_embedding_locks-Tabelle verwenden. Wir haben nicht empfohlen, diese Tabelle zu erstellen, da wir der Meinung sind, dass sie sehr platzraubend sein kann, und die Verwendung von Empfehlungssperren vermeidet diesen Mehraufwand.
In diesem Blogbeitrag haben wir Ihnen einen Blick hinter die Kulissen geworfen und Ihnen gezeigt, wie wir ein System geschaffen haben, das sich durch Stabilität auszeichnet und potenzielle Ausfallzeiten des Einbettungsgenerierungsdienstes effektiv bewältigt. Sein Design ist in der Lage, eine hohe Rate an Datenänderungen zu bewältigen und kann nahtlos gleichzeitige Einbettungs- und Generierungsprozesse nutzen, um erhöhte Lasten zu bewältigen.
Darüber hinaus erweist sich das Paradigma, Daten an PostgreSQL zu übergeben und die Datenbank zur Verwaltung der Einbettungsgenerierung im Hintergrund zu verwenden, als einfacher Mechanismus zur Überwachung der Einbettungspflege inmitten von Datenänderungen. Eine Vielzahl von Demos und Tutorials im KI-Bereich konzentrieren sich ausschließlich auf die anfängliche Erstellung von Daten aus Dokumenten und übersehen dabei die komplizierten Nuancen, die mit der Aufrechterhaltung der Datensynchronisierung während ihrer Entwicklung verbunden sind.
In realen Produktionsumgebungen ändern sich Daten jedoch unweigerlich, und die Komplexität der Verfolgung und Synchronisierung dieser Veränderungen ist kein triviales Unterfangen. Aber genau dafür ist eine Datenbank konzipiert! Warum nicht einfach nutzen?
Geschrieben von Matvey Arye.
Auch hier veröffentlicht.