L'intégration de données stockées dans une table PostgreSQL est sans aucun doute utile – avec des applications allant des systèmes de recherche sémantique et de recommandation aux applications d'IA générative et à la génération augmentée de récupération. Mais créer et gérer des intégrations pour les données dans les tables PostgreSQL peut être délicat, avec de nombreuses considérations et cas extrêmes à prendre en compte, tels que la mise à jour des intégrations avec les mises à jour et les suppressions de tables, la garantie de la résilience contre les pannes et l'impact sur les systèmes existants en fonction de la table.
Dans cet article de blog, nous discuterons des décisions de conception technique et des compromis que nous avons faits lors de la création de PgVectorizer pour garantir la simplicité, la résilience et les hautes performances. Nous discuterons également de modèles alternatifs si vous souhaitez créer le vôtre.
Allons-y.
Tout d’abord, décrivons comment fonctionnera le système que nous construisons. N'hésitez pas à sauter cette section si vous avez déjà lu le
À titre d'exemple illustratif, nous utiliserons une simple application de blog stockant des données dans PostgreSQL à l'aide d'une table définie comme suit :
CREATE TABLE blog ( id SERIAL PRIMARY KEY NOT NULL, title TEXT NOT NULL, author TEXT NOT NULL, contents TEXT NOT NULL, category TEXT NOT NULL, published_time TIMESTAMPTZ NULL --NULL if not yet published );
Nous souhaitons créer des intégrations sur le contenu de l'article de blog afin de pouvoir l'utiliser ultérieurement pour la recherche sémantique et la génération augmentée de récupération de puissance. Les intégrations ne doivent exister et pouvoir être recherchées que pour les blogs qui ont été publiés (où l' published_time
est NOT NULL
).
Lors de la création de ce système d'intégrations, nous avons pu identifier un certain nombre d'objectifs que tout système simple et résilient créant des intégrations devrait avoir :
Aucune modification au tableau d'origine. Cela permet aux systèmes et applications qui utilisent déjà cette table de ne pas être impactés par les modifications apportées au système d'intégration. Ceci est particulièrement important pour les systèmes existants.
Aucune modification des applications qui interagissent avec la table. Devoir modifier le code qui modifie la table peut ne pas être possible pour les systèmes existants. Il s'agit également d'une mauvaise conception logicielle car elle associe des systèmes qui n'utilisent pas d'intégration avec le code qui génère l'intégration.
Mettez automatiquement à jour les intégrations lorsque les lignes de la table source changent (dans ce cas, la table blog). Cela réduit la charge de maintenance et contribue à un logiciel sans souci. Dans le même temps, cette mise à jour n’a pas besoin d’être instantanée ou au sein du même commit. Pour la plupart des systèmes, la « cohérence éventuelle » est très bien.
Garantissez la résilience contre les pannes de réseau et de service : la plupart des systèmes génèrent des intégrations via un appel à un système externe, tel que l'API OpenAI. Dans les scénarios où le système externe est en panne ou où un dysfonctionnement du réseau se produit, il est impératif que le reste de votre système de base de données continue de fonctionner.
Ces lignes directrices constituent la base d'une architecture robuste que nous avons mise en œuvre à l'aide du
Voici l'architecture sur laquelle nous avons opté :
Dans cette conception, nous ajoutons d'abord un déclencheur à la table blog qui surveille les modifications et, dès que nous constatons une modification, insérons une tâche dans la table blog_work_queue qui indique qu'une ligne de la table blog est obsolète avec son intégration.
Selon un planning fixe, une tâche de création d'intégrations interrogera la table blog_work_queue et, si elle trouve du travail à faire, effectuera les opérations suivantes en boucle :
Pour voir ce système en action, consultez un exemple d'utilisation pour
Pour revenir à l'exemple de notre table d'application de blog, à un niveau élevé, PgVectorizer doit faire deux choses :
Suivez les modifications apportées aux lignes du blog pour savoir quelles lignes ont changé.
Fournissez une méthode pour traiter les modifications afin de créer des intégrations.
Les deux doivent être hautement simultanés et performants. Voyons voir comment ça fonctionne.
Vous pouvez créer une table de file d'attente de travail simple avec les éléments suivants :
CREATE TABLE blog_embedding_work_queue ( id INT ); CREATE INDEX ON blog_embedding_work_queue(id);
Il s'agit d'un tableau très simple, mais il y a un élément à noter : ce tableau n'a pas de clé unique. Cela a été fait pour éviter des problèmes de verrouillage lors du traitement de la file d'attente, mais cela signifie que nous pouvons avoir des doublons. Nous discutons du compromis plus loin dans l’alternative 1 ci-dessous.
Ensuite, vous créez un déclencheur pour suivre les modifications apportées au blog
:
CREATE OR REPLACE FUNCTION blog_wq_for_embedding() RETURNS TRIGGER LANGUAGE PLPGSQL AS $$ BEGIN IF (TG_OP = 'DELETE') THEN INSERT INTO blog_embedding_work_queue VALUES (OLD.id); ELSE INSERT INTO blog_embedding_work_queue VALUES (NEW.id); END IF; RETURN NULL; END; $$; CREATE TRIGGER track_changes_for_embedding AFTER INSERT OR UPDATE OR DELETE ON blog FOR EACH ROW EXECUTE PROCEDURE blog_wq_for_embedding(); INSERT INTO blog_embedding_work_queue SELECT id FROM blog WHERE published_time is NOT NULL;
Le déclencheur insère l'ID du blog devenu blog_work_queue. Nous installons le déclencheur, puis insérons tous les blogs existants dans la work_queue. Cet ordre est important pour garantir qu’aucun identifiant ne soit supprimé.
Décrivons maintenant quelques conceptions alternatives et pourquoi nous les avons rejetées.
L'introduction de cette clé éliminerait le problème des entrées en double. Cependant, cela n'est pas sans défis, notamment parce qu'une telle clé nous obligerait à utiliser la clause INSERT…ON CONFLICT DO NOTHING
pour insérer de nouveaux ID dans la table, et cette clause verrouille l'ID dans l'arbre B.
Voici le dilemme : lors de la phase de traitement, il est nécessaire de supprimer les lignes en cours de traitement pour éviter un traitement simultané. Cependant, cette suppression ne peut être effectuée qu'après que l'intégration correspondante a été placée dans blog_embeddings. Cela garantit qu'aucun identifiant n'est perdu en cas de perturbation à mi-chemin, par exemple si la création de l'intégration plante après la suppression mais avant l'écriture de l'intégration.
Désormais, si nous créons une clé unique ou primaire, la transaction supervisant la suppression reste ouverte. Par conséquent, cela agit comme un verrou sur ces identifiants spécifiques, empêchant leur réinsertion dans blog_work_queue pendant toute la durée du travail de création d'intégration. Étant donné que la création d’intégrations prend plus de temps qu’une transaction de base de données classique, cela pose problème. Le verrou bloquerait le déclencheur de la table principale « blog », entraînant une baisse des performances de l'application principale. Pire encore, si vous traitez plusieurs lignes dans un lot, les blocages deviennent également un problème potentiel.
Cependant, les problèmes potentiels résultant d'entrées en double occasionnelles peuvent être gérés pendant la phase de traitement, comme illustré plus loin. Une duplication sporadique ici et là ne pose pas de problème car elle n'augmente que marginalement la quantité de travail effectuée par la tâche d'intégration. C’est certainement plus acceptable que de s’attaquer aux problèmes de verrouillage mentionnés ci-dessus.
Par exemple, nous pourrions ajouter une colonne booléenne embedded
définie sur false lors de la modification et transformée en true lors de la création de l'intégration. Il y a trois raisons de rejeter cette conception :
Nous ne souhaitons pas modifier la table blog
pour les raisons déjà évoquées ci-dessus.
Obtenir efficacement une liste de blogs non intégrés nécessiterait un index supplémentaire (ou un index partiel) sur la table du blog. Cela ralentirait d'autres opérations.
Cela augmente le taux de désabonnement sur la table car chaque modification serait désormais écrite deux fois (une fois avec embedding=false et une fois avec embedding=true) en raison de la nature MVCC de PostgreSQL.
Un work_queue_table distinct résout ces problèmes.
Cette approche présente plusieurs problèmes :
Si le service d'intégration est en panne, soit le déclencheur doit échouer (interrompant votre transaction), soit vous devez créer un chemin de code de sauvegarde qui… stocke les identifiants qui n'ont pas pu être intégrés dans une file d'attente. Cette dernière solution nous ramène à notre conception proposée mais avec plus de complexité en plus.
Ce déclencheur sera probablement beaucoup plus lent que le reste des opérations de la base de données en raison de la latence nécessaire pour contacter un service externe. Cela ralentira le reste de vos opérations de base de données sur la table.
Il oblige l'utilisateur à écrire le code d'intégration de création directement dans la base de données. Étant donné que la lingua franca de l'IA est Python et que la création d'intégration nécessite souvent de nombreuses autres bibliothèques, cela n'est pas toujours facile ni même possible (surtout si l'on s'exécute dans un environnement cloud PostgreSQL hébergé). Il est bien préférable d'avoir une conception dans laquelle vous avez le choix de créer des intégrations à l'intérieur ou à l'extérieur de la base de données.
Nous avons maintenant une liste de blogs qui doivent être intégrés, traitons la liste !
Il existe de nombreuses façons de créer des intégrations. Nous vous recommandons d'utiliser un script Python externe. Ce script analysera la file d'attente de travail et les articles de blog associés, invoquera un service externe pour créer les intégrations, puis stockera ces intégrations dans la base de données. Notre raisonnement pour cette stratégie est le suivant :
Choix de Python : Nous recommandons Python car il offre un écosystème riche et inégalé pour les tâches de données d'IA, mis en valeur par un développement LLM puissant et des bibliothèques de données comme
Opter pour un script externe au lieu de PL/Python : Nous voulions que les utilisateurs aient le contrôle sur la manière dont ils embarquent leurs données. Pourtant, dans le même temps, de nombreux fournisseurs de cloud Postgres n'autorisent pas l'exécution de code Python arbitraire dans la base de données pour des raisons de sécurité. Ainsi, pour permettre aux utilisateurs d'avoir de la flexibilité dans leurs scripts d'intégration ainsi que dans l'endroit où ils hébergent leur base de données, nous avons opté pour une conception utilisant des scripts Python externes.
Les tâches doivent être à la fois performantes et sécurisées en matière de concurrence. La concurrence garantit que si les tâches commencent à prendre du retard, les planificateurs peuvent démarrer davantage de tâches pour aider le système à rattraper son retard et à gérer la charge.
Nous verrons plus tard comment configurer chacune de ces méthodes, mais voyons d'abord à quoi ressemblerait le script Python. Fondamentalement, le script comporte trois parties :
Lire la file d'attente des travaux et l'article du blog
Créer une intégration pour l'article de blog
Écrivez l'intégration dans la table blog_embedding
Les étapes 2 et 3 sont effectuées par un rappel embed_and_write
que nous définissons dans le
Nous allons d'abord vous montrer le code, puis mettre en évidence les éléments clés en jeu :
def process_queue(embed_and_write_cb, batch_size:int=10): with psycopg2.connect(TIMESCALE_SERVICE_URL) as conn: with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor: cursor.execute(f""" SELECT to_regclass('blog_embedding_work_queue')::oid; """) table_oid = cursor.fetchone()[0] cursor.execute(f""" WITH selected_rows AS ( SELECT id FROM blog_embedding_work_queue LIMIT {int(batch_size)} FOR UPDATE SKIP LOCKED ), locked_items AS ( SELECT id, pg_try_advisory_xact_lock( {int(table_oid)}, id) AS locked FROM ( SELECT DISTINCT id FROM selected_rows ORDER BY id ) as ids ), deleted_rows AS ( DELETE FROM blog_embedding_work_queue WHERE id IN ( SELECT id FROM locked_items WHERE locked = true ORDER BY id ) ) SELECT locked_items.id as locked_id, {self.table_name}.* FROM locked_items LEFT JOIN blog ON blog.id = locked_items.id WHERE locked = true ORDER BY locked_items.id """) res = cursor.fetchall() if len(res) > 0: embed_and_write_cb(res) return len(res) process_queue(embed_and_write)
Le code SQL dans l'extrait ci-dessus est subtil car il est conçu pour être à la fois performant et sécurisé en matière de concurrence, alors passons en revue :
Retirer les éléments de la file d'attente de travail : initialement, le système récupère un nombre spécifié d'entrées de la file d'attente de travail, déterminé par le paramètre de taille de file d'attente par lots. Un verrou FOR UPDATE est utilisé pour garantir que les scripts exécutés simultanément n'essaient pas de traiter les mêmes éléments de file d'attente. La directive SKIP LOCKED garantit que si une entrée est actuellement gérée par un autre script, le système l'ignorera au lieu d'attendre, évitant ainsi des retards inutiles.
Verrouillage des identifiants de blog : en raison de la possibilité d'entrées en double pour le même blog_id dans la table de file d'attente de travail, le simple verrouillage de ladite table est insuffisant. Le traitement simultané du même identifiant par différentes tâches serait préjudiciable. Considérez la condition de concurrence potentielle suivante :
La tâche 1 lance et accède à un blog, récupérant la version 1.
Une mise à jour externe du blog a lieu.
Par la suite, le Job 2 commence, obtenant la version 2.
Les deux tâches démarrent le processus de génération d’intégration.
Le travail 2 se termine en stockant l'intégration correspondant à la version 2 du blog.
Une fois terminé, le travail 1 écrase par erreur la version 2 intégrée par la version 1 obsolète.
Même si l’on pourrait résoudre ce problème en introduisant un suivi explicite des versions, cela introduit une complexité considérable sans aucun avantage en termes de performances. La stratégie pour laquelle nous avons opté atténue non seulement ce problème, mais évite également les opérations redondantes et le gaspillage de travail en exécutant simultanément des scripts.
Un verrou consultatif Postgres, préfixé par l'identifiant de la table pour éviter les chevauchements potentiels avec d'autres verrous de ce type, est utilisé. La variante try
, analogue à l'application précédente de SKIP LOCKED, garantit que le système évite d'attendre les verrous. L'inclusion de la clause ORDER BY blog_id permet d'éviter d'éventuels blocages. Nous aborderons quelques alternatives ci-dessous.
Nettoyage de la file d'attente de travail : Le script supprime ensuite tous les éléments de la file d'attente de travail des blogs que nous avons verrouillés avec succès. Si ces éléments de file d'attente sont visibles via MVCC (Multi-Version Concurrency Control), leurs mises à jour sont manifestées dans la ligne de blog récupérée. Notez que nous supprimons tous les éléments avec l'ID de blog donné, pas seulement les éléments lus lors de la sélection des lignes : cela gère efficacement les entrées en double pour le même ID de blog. Il est crucial de noter que cette suppression n'est validée qu'après l'appel de la fonction embed_and_write() et le stockage ultérieur de l'intégration mise à jour. Cette séquence garantit que nous ne perdrons aucune mise à jour même si le script échoue pendant la phase de génération d'intégration.
Faire traiter les blogs : dans la dernière étape, nous récupérons les blogs à traiter. Notez l'utilisation de la jointure gauche : qui nous permet de récupérer les identifiants de blog pour les éléments supprimés qui n'auront pas de ligne de blog. Nous devons suivre ces éléments pour supprimer leurs intégrations. Dans le rappel embed_and_write
, nous utilisons shared_time étant NULL comme sentinelle pour le blog en cours de suppression (ou non publié, auquel cas nous souhaitons également supprimer l'intégration).
Si le système utilise déjà des verrous consultatifs et que vous craignez des collisions, il est possible d'utiliser une table avec un ID de blog comme clé primaire et de verrouiller les lignes. En fait, il peut s'agir de la table du blog elle-même si vous êtes sûr que ces verrous ne ralentiront aucun autre système (rappelez-vous que ces verrous doivent être maintenus tout au long du processus d'intégration, ce qui peut prendre un certain temps).
Alternativement, vous pouvez avoir une table blog_embedding_locks uniquement à cet effet. Nous n'avons pas suggéré de créer cette table car nous pensons que cela peut être très coûteux en termes d'espace, et l'utilisation de verrous consultatifs évite cette surcharge.
Dans cet article de blog, nous vous avons donné un aperçu des coulisses de la façon dont nous avons créé un système résistant, gérant efficacement les temps d'arrêt potentiels du service de génération d'intégration. Sa conception est apte à gérer un taux élevé de modifications de données et peut utiliser de manière transparente des processus de génération d'intégration simultanés pour s'adapter à des charges accrues.
De plus, le paradigme de la validation des données dans PostgreSQL et de l'utilisation de la base de données pour gérer la génération de l'intégration en arrière-plan apparaît comme un mécanisme simple pour superviser la maintenance de l'intégration au milieu des modifications de données. Une myriade de démos et de didacticiels dans le domaine de l'IA se concentrent uniquement sur la création initiale de données à partir de documents, ignorant les nuances complexes associées à la préservation de la synchronisation des données au fur et à mesure de leur évolution.
Cependant, dans les environnements de production réels, les données changent invariablement, et la complexité du suivi et de la synchronisation de ces changements n’est pas une tâche anodine. Mais c’est pour cela qu’une base de données est conçue ! Pourquoi ne pas simplement l'utiliser ?
Écrit par Matvey Arye.
Également publié ici.