Dans l'architecture des systèmes ciblés, l'écart de synchronisation entre les API HTTP externes et les cibles de bases de données relationnelles représente un défi d'ingénierie persistant, en particulier lorsque les réponses API manquent de sémantique transactionnelle native ou exposent des flux de données non normalisés et profondément enracinés avec un point de contrôle intégré, des filaments de commentaire, des hiérarchies organisationnelles, des structures de BOM multi-niveaux. L'abstraction de source personnalisée de CocoIndex fournit un cadre déclaratif pour convertir les endpoints HTTP/gRPC arbitraires en flux de données synchronisés à l'état avec un modèle de CDC (Change Data Capture) basé sur • Stateless API→Stateful Stream transformation: Converting REST GET requests into append-log semantics • Recursive tree traversal with cycle detection: Handling unbounded-depth comment DAGs (Directed Acyclic Graphs) without stack overflow • Schema evolution support: Python dataclass-based typing with forward compatibility via structural subtyping • Intégration des connecteurs postgres : exploitation du poolage de connexion async de psycopg3 + du protocole COPY pour les inserts en vrac • Configuration de recherche de texte complet indexée par GIN: matérialisation des colonnes tsvector avec des fonctions d'origine du dictionnaire anglais + de classement Dans cet exemple, nous construisons un connecteur personnalisé pour HackerNews. Il récupère les histoires récentes + les commentaires nichés, les indexe et expose une interface de recherche simple alimentée par la recherche de texte complet de Postgres. Pourquoi utiliser une source Custom ? In many scenarios, pipelines don't just read from clean tables. They depend on: Services internes REST Partenaires APIs Systèmes héréditaires Modèles de données non standard qui ne correspondent pas aux connecteurs traditionnels CocoIndex’s Custom Source API makes these integrations , incremental, and safe by default. Instead of writing ad-hoc scripts, you wrap your API as a “source component,” and CocoIndex takes it from there. Déclaration Project Walkthrough — Building a HackerNews Index Goals Télécharger HackerNews Search API Feché commentaires Update only modified threads Store content in Postgres Exposer une interface de recherche de texte CocoIndex gère automatiquement la détection des changements, l'idempotence, la ligne et la synchronisation de l'état. Vue d’ensemble The pipeline consists of three major parts: Define a custom source ( ) HackerNewsConnector Calls HackerNews API Emits rows for changed/updated threads Pulls full thread + comment tree Build an index with CocoIndex Flow Collect thread content Collect all comments recursively Export to a Postgres table ( ) hn_messages Add a lightweight query handler Uses PostgreSQL full-text search Returns ranked matches for a keyword query Each Seuls les processus ont changé les fils HN et tout est synchronisé. cocoindex update Le projet est open source et disponible sur . GitHub Prerequisites if you don't have one. Install Postgres Définir le modèle de données Every custom source defines two lightweight data types: Type de clé → identifie uniquement un élément Value Type → the full content for that item Dans Hacker News, chaque nouvelle est un thread, et chaque thread peut avoir plusieurs commentaires. Pour HackerNews, définissons les clés comme cela: class _HackerNewsThreadKey(NamedTuple): """Row key type for HackerNews source.""" thread_id: str Les clés doivent être : hashable serializable stable (ne change pas au fil du temps) Les valeurs détiennent le dataset réel : @dataclasses.dataclass class _HackerNewsComment: id: str author: str | None text: str | None created_at: datetime | None @dataclasses.dataclass class _HackerNewsThread: """Value type for HackerNews source.""" author: str | None text: str url: str | None created_at: datetime | None comments: list[_HackerNewsComment] This tells CocoIndex exactly what every HackerNews “item” looks like when fully fetched. Un article et tous ses commentaires... Les commentaires individuels. _HackerNewsThread _HackerNewsComment Créer un connecteur source A Custom Source has two parts: — declarative configuration SourceSpec SourceConnector – logique opérationnelle pour la lecture des données Écrire le SourceSpec A in CocoIndex is a declarative configuration that tells the system et to a source. It doesn’t fetch data itself — that’s handled by the source connector. SourceSpec what data to fetch how to connect class HackerNewsSource(SourceSpec): """Source spec for HackerNews API.""" tag: str | None = None max_results: int = 100 Fields: tag Optional filter for the type of HackerNews content. Example: , , . "story" "job" "poll" If , it fetches all types. None max_results Maximum number of threads to fetch from HackerNews at a time. Helps limit the size of the index for performance or testing. Définir le connecteur Sets up the connector's configuration and HTTP session so it can fetch HackerNews data efficiently. @source_connector( spec_cls=HackerNewsSource, key_type=_HackerNewsThreadKey, value_type=_HackerNewsThread, ) class HackerNewsConnector: """Custom source connector for HackerNews API.""" _spec: HackerNewsSource _session: aiohttp.ClientSession def __init__(self, spec: HackerNewsSource, session: aiohttp.ClientSession): self._spec = spec self._session = session @staticmethod async def create(spec: HackerNewsSource) -> "HackerNewsConnector": """Create a HackerNews connector from the spec.""" return HackerNewsConnector(spec, aiohttp.ClientSession()) tells CocoIndex that this class is a . It specifies: source_connector custom source connector : the configuration class ( ) spec_cls HackerNewsSource : how individual items are identified ( ) key_type _HackerNewsThreadKey : the structure of the data returned ( ) value_type _HackerNewsThread is called by CocoIndex to initialize the connector, and it sets up a fresh for making HTTP requests. create() aiohttp.ClientSession Télécharger les thèmes disponibles Le Méthode en is responsible for qui correspondent aux critères donnés (tag, résultats max) et renvoient des métadonnées à leur sujet. qui pourraient avoir changé. list() HackerNewsConnector discovering all available HackerNews threads know which threads exist async def list( self, ) -> AsyncIterator[PartialSourceRow[_HackerNewsThreadKey, _HackerNewsThread]]: """List HackerNews threads using the search API.""" # Use HackerNews search API search_url = "https://hn.algolia.com/api/v1/search_by_date" params: dict[str, Any] = {"hitsPerPage": self._spec.max_results} if self._spec.tag: params["tags"] = self._spec.tag async with self._session.get(search_url, params=params) as response: response.raise_for_status() data = await response.json() for hit in data.get("hits", []): if thread_id := hit.get("objectID", None): utime = hit.get("updated_at") ordinal = ( int(datetime.fromisoformat(utime).timestamp()) if utime else NO_ORDINAL ) yield PartialSourceRow( key=_HackerNewsThreadKey(thread_id=thread_id), data=PartialSourceRowData(ordinal=ordinal), ) fetches . list() metadata for all recent HackerNews threads For each thread: It generates a with: PartialSourceRow : the thread ID key : the last updated timestamp ordinal allows CocoIndex to track what threads exist and which have changed without fetching full thread content. Purpose: Fetching Full Thread Content Cette méthode asynchrone obtient une (y compris ses commentaires) de la , et enveloppe le résultat dans un object — the structure CocoIndex uses for row-level ingestion. single HackerNews thread API PartialSourceRowData async def get_value( self, key: _HackerNewsThreadKey ) -> PartialSourceRowData[_HackerNewsThread]: """Get a specific HackerNews thread by ID using the items API.""" # Use HackerNews items API to get full thread with comments item_url = f"https://hn.algolia.com/api/v1/items/{key.thread_id}" async with self._session.get(item_url) as response: response.raise_for_status() data = await response.json() if not data: return PartialSourceRowData( value=NON_EXISTENCE, ordinal=NO_ORDINAL, content_version_fp=None, ) return PartialSourceRowData( value=HackerNewsConnector._parse_hackernews_thread(data) ) fetches the , including comments. get_value() full content of a specific thread Parsètre le JSON brut en objets Python structurés (_HackerNewsThread + _HackerNewsComment). Retourne un fichier PartialSourceRowData contenant le fil complet. Ordinal Support Tells CocoIndex that this source provides timestamps (ordinals). def provides_ordinal(self) -> bool: return True CocoIndex utilise les ordinaires pour mettre à jour progressivement uniquement les fils modifiés, améliorant ainsi l'efficacité. Parsing JSON into Structured Data This static method takes the raw JSON response from the and turns it into a normalized Objet contenant : API _HackerNewsThread Le message (titre, texte, métadonnées) Tous les commentaires nichés, flattés dans une seule liste Objets Python à temps daté Il exécute une of the comment tree. recursive traversal @staticmethod def _parse_hackernews_thread(data: dict[str, Any]) -> _HackerNewsThread: comments: list[_HackerNewsComment] = [] def _add_comments(parent: dict[str, Any]) -> None: children = parent.get("children", None) if not children: return for child in children: ctime = child.get("created_at") if comment_id := child.get("id", None): comments.append( _HackerNewsComment( id=str(comment_id), author=child.get("author", ""), text=child.get("text", ""), created_at=datetime.fromisoformat(ctime) if ctime else None, ) ) _add_comments(child) _add_comments(data) ctime = data.get("created_at") text = data.get("title", "") if more_text := data.get("text", None): text += "\n\n" + more_text return _HackerNewsThread( author=data.get("author"), text=text, url=data.get("url"), created_at=datetime.fromisoformat(ctime) if ctime else None, comments=comments, ) Converts raw HackerNews API response into and . _HackerNewsThread _HackerNewsComment recursively parses nested comments. _add_comments() Combines + into the main thread content. title text Produces a fully structured object ready for indexing. Tout mettre ensemble dans un flux Your flow now reads exactly like a React component. Définir le flux et connecter la source @cocoindex.flow_def(name="HackerNewsIndex") def hackernews_flow( flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope ) -> None: # Add the custom source to the flow data_scope["threads"] = flow_builder.add_source( HackerNewsSource(tag="story", max_results=500), refresh_interval=timedelta(minutes=1), ) # Create collectors for different types of searchable content message_index = data_scope.add_collector() Process each thread and collect structured information with data_scope["threads"].row() as thread: # Index the main thread content message_index.collect( id=thread["thread_id"], thread_id=thread["thread_id"], content_type="thread", author=thread["author"], text=thread["text"], url=thread["url"], created_at=thread["created_at"], ) Traiter chaque commentaire d'un thread et collecter des informations structurées with thread["comments"].row() as comment: message_index.collect( id=comment["id"], thread_id=thread["thread_id"], content_type="comment", author=comment["author"], text=comment["text"], created_at=comment["created_at"], ) Export to database tables message_index.export( "hn_messages", cocoindex.targets.Postgres(), primary_key_fields=["id"], ) CocoIndex now: Lire la suite de HackerNews API tracks changes incrementally Les commentaires de Foucault Exportations à Postgres supports live mode Votre application peut désormais la consulter en tant qu’index de recherche en temps réel. Rechercher et rechercher l'index HackerNews At this point you are done with the index flow. As the next step, you could define query handlers — so you can run queries in CocoInsight. You can use any library or framework of your choice to perform queries. You can read more in the documentation about . Query Handler @hackernews_flow.query_handler() def search_text(query: str) -> cocoindex.QueryOutput: """Search HackerNews threads by title and content.""" table_name = cocoindex.utils.get_target_default_name(hackernews_flow, "hn_messages") with connection_pool().connection() as conn: with conn.cursor() as cur: # Simple text search using PostgreSQL's text search capabilities cur.execute( f""" SELECT id, thread_id, author, content_type, text, created_at, ts_rank(to_tsvector('english', text), plainto_tsquery('english', %s)) as rank FROM {table_name} WHERE to_tsvector('english', text) @@ plainto_tsquery('english', %s) ORDER BY rank DESC, created_at DESC """, (query, query), ) results = [] for row in cur.fetchall(): results.append( { "id": row[0], "thread_id": row[1], "author": row[2], "content_type": row[3], "text": row[4], "created_at": row[5].isoformat(), } ) return cocoindex.QueryOutput(results=results) This code defines a query handler that searches HackerNews threads and comments indexed in CocoIndex. It determines the database table storing the messages, then uses PostgreSQL full-text search ( and Pour trouver les lignes correspondant à la requête. to_tsvector plainto_tsquery Results are ranked by relevance ( ) and creation time, formatted into dictionaries, and returned as a structured Essentiellement, il effectue une recherche de texte complet sur le contenu indexé et fournit des résultats classés et structurés. ts_rank cocoindex.QueryOutput Exécuter votre HackerNews Source personnalisée Once your custom source and flow are ready, running it with CocoIndex is straightforward. You can either ou par HackerNews. update the index on-demand keep it continuously in sync 1 – Créer des dépendances Make sure you have Python installed and then install your project in editable mode: pip install -e . This installs CocoIndex along with all required dependencies, letting you develop and update the connector without reinstalling. Mise à jour de la cible (sur demande) To populate your target (e.g., Postgres) with the latest HackerNews threads: cocoindex update main Only threads that will be re-processed. have changed Your target remains in sync with the . most recent 500 HackerNews threads Des mises à jour incrementelles efficaces permettent d’économiser du temps et des ressources informatiques. Notez que chaque fois que vous exécutez la commande de mise à jour, CocoIndex ne traitera que les fils qui ont changé et gardera la cible en synchronisation avec les 500 fils récents de HackerNews. cocoindex update -L main Runs the flow in , polling HackerNews periodically. live mode CocoIndex automatically handles incremental changes and keeps the target synchronized. Idéal pour les tableaux de bord, la recherche ou les pipelines d’IA qui nécessitent des données en temps réel. 3. Troubleshoot & Inspect with CocoInsight CocoInsight lets you , see the lineage of your data, and understand what’s happening under the hood. visualize and debug your flow Démarrer le serveur : cocoindex server -ci main Ouvrez l’UI dans votre navigateur : https://cocoindex.io/cocoinsight CocoInsight a zéro rétention de données de pipeline - il est sûr pour le débogage et l'inspection de vos flux localement. CocoInsight has zero pipeline data retention — it’s safe for debugging and inspecting your flows locally. Notez que cela nécessite la configuration de QueryHandler dans l'étape précédente. What You Can Build Next This simple example opens the door to a lot more: Créer un détecteur de sujets tendance Run LLM summarization pipelines on top of indexed threads Add embeddings + vector search Mirror HN into your internal data warehouse Créer un tableau de bord HN en temps réel Extend to other news sources (Reddit, Lobsters, etc.) Because the whole pipeline is declarative and incremental, extending it is straightforward. Parce que les sources personnalisées vous permettent d'emballer Python logic into an incremental data stream, the best use cases are usually données - systèmes qui n'ont pas de connecteurs de base de données standard, ont un nidage complexe ou nécessitent un pré-traitement lourd. any "Hard-to-Reach" The Knowledge Aggregator for LLM Context Building a context engine for an AI bot often requires pulling from non-standard documentation sources. The "Composite" Entity (Data Stitching) La plupart des entreprises ont des données utilisateur fragmentées sur plusieurs microservices. Vous pouvez créer une source personnalisée qui agit comme un «join virtuel» avant que les données ne touchent jamais votre index. For example the Source: Récupère un identifiant d'utilisateur d'un service Auth (Okta/Auth0). Uses that ID to fetch billing status from . Stripe API Utiliser cet ID pour récupérer les journaux d’utilisation d’un Redis interne. Plutôt que de gérer les ETL complexes qui se joignent en aval, la Source personnalisée produit une CocoIndex suit l'état de cet objet composite ; si l'utilisateur effectue une mise à niveau dans Stripe modifier leur e-mail dans Auth0, l'index est mis à jour automatiquement. User360 or Le « Legacy Wrapper » (la couche de modernisation) Enterprises often have valuable data locked in systems that are painful to query (SOAP, XML, Mainframes). You get a modern, queryable SQL interface (via the CocoIndex target) on top of a 20-year-old system without rewriting the legacy system itself. Public Data Monitor (Competitive Intelligence) Tracking changes on public websites or APIs that don't offer webhooks. The Source: Scraping e-commerce product pages. Competitor Pricing: Polling a government RSS feed or FDA drug approval database. Regulatory Feeds: Hitting a CoinGecko or Yahoo Finance API. Crypto/Stocks: Using the Vous pouvez déclencher des alertes en aval uniquement lorsqu'un changement de prix de > 5% ou une nouvelle réglementation est publiée, plutôt que de spammer votre base de données avec des résultats d'enquête identiques. The CocoIndex Value: diff Why This Matters Les sources personnalisées étendent ce modèle à API — internal, external, legacy, or real-time. any Cela déclenche un modèle simple mais puissant: Si vous pouvez le récupérer, CocoIndex peut l'indexer, le diffuser et le synchroniser. Si vous pouvez le récupérer, CocoIndex peut l'indexer, le diffuser et le synchroniser. Que vous indexiez HackerNews ou orchestriez des dizaines de services d'entreprise, le cadre vous donne une colonne vertébrale stable avec: État persistant deterministic updates Ligne automatique Exportations ciblées flexibles minimal infrastructure overhead ⭐ Try It, Fork It, Star It If you found this useful, a signifie beaucoup – il aide les autres à découvrir CocoIndex et soutient le développement ultérieur. star on GitHub GitHub