In custom distributed-systems architectures, keeping relational database targets in sync with external HTTP APIs is a persistent engineering challenge. This is especially true when API responses lack native transactional semantics or expose deeply nested, unnormalized JSON payloads such as comment threads, org hierarchies, or multi-level BOM structures. CocoIndex's custom source abstraction provides a declarative framework for turning arbitrary HTTP/gRPC endpoints into stateful, incrementally synchronized data flows. It includes built-in checkpointing, content fingerprinting (SHA256-based deduplication), and propagation of only the content that changed.

This walkthrough covers:

• Stateless API → stateful stream: turning REST GET responses into keyed rows that CocoIndex tracks and updates incrementally
• Recursive tree traversal: flattening arbitrarily nested comment trees into a single list (the example uses plain recursion, so extremely deep threads are bounded by Python's recursion limit)
• Schema evolution support: Python dataclass-based typing with forward compatibility through structural subtyping
• Postgres target integration: exporting collected rows to a Postgres table through CocoIndex's built-in Postgres target
• Full-text search: English-dictionary stemming and ranking with `to_tsvector`, `plainto_tsquery`, and `ts_rank` (computed at query time in this example)

In this example, we build a custom connector for HackerNews. It fetches recent stories plus their nested comments, indexes them, and exposes a simple search interface powered by Postgres full-text search.

Why Use a Custom Source?

In many scenarios, pipelines don't just read from clean tables.
They depend on:

• Internal REST services
• Partner APIs
• Legacy systems
• Non-standard data models that don't fit traditional connectors

CocoIndex's Custom Source API makes these integrations declarative, incremental, and safe by default. Instead of writing ad-hoc scripts, you wrap your API as a "source component," and CocoIndex takes it from there.

Project Walkthrough: Building a HackerNews Index

Goals:

• Call the HackerNews Search API
• Fetch nested comments
• Update only modified threads
• Store content in Postgres
• Expose a text search interface

CocoIndex handles change detection, idempotency, lineage, and state sync automatically.

Overview

The pipeline consists of three major parts:

1. Define a custom source (`HackerNewsConnector`)
   • Calls the HackerNews API
   • Emits rows for changed/updated threads
   • Pulls the full thread + comment tree
2. Build an index with a CocoIndex flow
   • Collect thread content
   • Collect all comments recursively
   • Export to a Postgres table (`hn_messages`)
3. Add a lightweight query handler
   • Uses PostgreSQL full-text search
   • Returns ranked matches for a keyword query

Each `cocoindex update` only processes changed HN threads and keeps everything in sync.

The project is open source and available on GitHub.

Prerequisites

• Install Postgres if you don't have one.

Defining the Data Model

Every custom source defines two lightweight data types:

• Key type → uniquely identifies an item
• Value type → the full content for that item

In HackerNews, each story is a thread, and each thread can have multiple comments.
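To make the key/value split concrete, here is a minimal sketch with hypothetical toy types (`DocKey` and `Doc` are illustrative, not part of CocoIndex). It shows the properties that make a `NamedTuple` a good key: it is hashable, compares by value, and serializes as a plain JSON array. A default dataclass, by contrast, is deliberately unhashable, which is fine for values.

```python
import dataclasses
import json
from typing import NamedTuple


class DocKey(NamedTuple):
    """Hypothetical key type: identifies one item."""
    doc_id: str


@dataclasses.dataclass
class Doc:
    """Hypothetical value type: the full content of one item."""
    title: str
    comments: list[str]


# A source can be pictured as a mapping from keys to full values.
items: dict[DocKey, Doc] = {
    DocKey("42"): Doc(title="Show HN: example", comments=["Nice!", "How does it scale?"]),
}

# Keys are hashable and compare by value, so a key rebuilt from the same ID
# finds the same item.
assert items[DocKey("42")].title == "Show HN: example"

# NamedTuple keys serialize as plain JSON arrays.
print(json.dumps(DocKey("42")))  # ["42"]

# A dataclass with the default eq=True gets __hash__ = None, so it is unhashable.
try:
    hash(Doc(title="t", comments=[]))
except TypeError:
    print("Doc is unhashable")
```

A stable, value-compared key is what lets a later run recognize "the same item" and decide whether it changed.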
For HackerNews, let's define keys like this:

```python
from typing import NamedTuple


class _HackerNewsThreadKey(NamedTuple):
    """Row key type for HackerNews source."""
    thread_id: str
```

Keys must be:

• hashable
• serializable
• stable (they don't change over time)

Values hold the actual dataset:

```python
import dataclasses
from datetime import datetime


@dataclasses.dataclass
class _HackerNewsComment:
    id: str
    author: str | None
    text: str | None
    created_at: datetime | None


@dataclasses.dataclass
class _HackerNewsThread:
    """Value type for HackerNews source."""
    author: str | None
    text: str
    url: str | None
    created_at: datetime | None
    comments: list[_HackerNewsComment]
```

This tells CocoIndex exactly what every HackerNews "item" looks like when fully fetched. `_HackerNewsThread` holds a post and all its comments, while `_HackerNewsComment` represents an individual comment.

Building a Custom Source Connector

A custom source has two parts:

• `SourceSpec`: declarative configuration
• `SourceConnector`: operational logic for reading data

Writing the SourceSpec

A `SourceSpec` in CocoIndex is a declarative configuration that tells the system what data to fetch and how to connect to a source. It doesn't fetch data itself; that's handled by the source connector.

```python
from cocoindex.op import SourceSpec


class HackerNewsSource(SourceSpec):
    """Source spec for HackerNews API."""
    tag: str | None = None
    max_results: int = 100
```

Fields:

• `tag`: optional filter for the type of HackerNews content, e.g. `"story"`, `"job"`, `"poll"`. If `None`, it fetches all types.
• `max_results`: maximum number of threads to fetch from HackerNews at a time. Helps limit the size of the index for performance or testing.

Defining the connector

The connector sets up its configuration and an HTTP session so it can fetch HackerNews data efficiently:
```python
from collections.abc import AsyncIterator
from datetime import datetime
from typing import Any

import aiohttp

# source_connector, PartialSourceRow, PartialSourceRowData, NO_ORDINAL and
# NON_EXISTENCE come from the cocoindex package; check the project's source
# for the exact import path.


@source_connector(
    spec_cls=HackerNewsSource,
    key_type=_HackerNewsThreadKey,
    value_type=_HackerNewsThread,
)
class HackerNewsConnector:
    """Custom source connector for HackerNews API."""

    _spec: HackerNewsSource
    _session: aiohttp.ClientSession

    def __init__(self, spec: HackerNewsSource, session: aiohttp.ClientSession):
        self._spec = spec
        self._session = session

    @staticmethod
    async def create(spec: HackerNewsSource) -> "HackerNewsConnector":
        """Create a HackerNews connector from the spec."""
        return HackerNewsConnector(spec, aiohttp.ClientSession())
```

`source_connector` tells CocoIndex that this class is a custom source connector. It specifies:

• `spec_cls`: the configuration class (`HackerNewsSource`)
• `key_type`: how individual items are identified (`_HackerNewsThreadKey`)
• `value_type`: the structure of the data returned (`_HackerNewsThread`)

`create()` is called by CocoIndex to initialize the connector, and it sets up a fresh `aiohttp.ClientSession` for making HTTP requests.

Listing Available Threads

The `list()` method in `HackerNewsConnector` is responsible for discovering all available HackerNews threads that match the given criteria (tag, max results) and returning metadata about them. CocoIndex uses it to know which threads exist and which may have changed.

```python
async def list(
    self,
) -> AsyncIterator[PartialSourceRow[_HackerNewsThreadKey, _HackerNewsThread]]:
    """List HackerNews threads using the search API."""
    # Use HackerNews search API
    search_url = "https://hn.algolia.com/api/v1/search_by_date"
    params: dict[str, Any] = {"hitsPerPage": self._spec.max_results}

    if self._spec.tag:
        params["tags"] = self._spec.tag

    async with self._session.get(search_url, params=params) as response:
        response.raise_for_status()
        data = await response.json()
        for hit in data.get("hits", []):
            if thread_id := hit.get("objectID", None):
                utime = hit.get("updated_at")
                # fromisoformat() accepts a trailing "Z" only on Python 3.11+
                ordinal = (
                    int(datetime.fromisoformat(utime).timestamp())
                    if utime
                    else NO_ORDINAL
                )
                yield PartialSourceRow(
                    key=_HackerNewsThreadKey(thread_id=thread_id),
                    data=PartialSourceRowData(ordinal=ordinal),
                )
```

`list()` fetches metadata for all recent HackerNews threads. For each thread it generates a `PartialSourceRow` with:

• `key`: the thread ID
• `ordinal`: the last-updated timestamp

Purpose: this lets CocoIndex track which threads exist and which have changed without fetching the full thread content.

Fetching Full Thread Content

This async method fetches a single HackerNews thread (including its comments) from the API and wraps the result in a `PartialSourceRowData` object, the structure CocoIndex uses for row-level ingestion.

```python
async def get_value(
    self, key: _HackerNewsThreadKey
) -> PartialSourceRowData[_HackerNewsThread]:
    """Get a specific HackerNews thread by ID using the items API."""
    # Use HackerNews items API to get full thread with comments
    item_url = f"https://hn.algolia.com/api/v1/items/{key.thread_id}"

    async with self._session.get(item_url) as response:
        if response.status == 404:
            # Treat a missing item as "no longer exists" instead of an error
            data = None
        else:
            response.raise_for_status()
            data = await response.json()

        if not data:
            return PartialSourceRowData(
                value=NON_EXISTENCE,
                ordinal=NO_ORDINAL,
                content_version_fp=None,
            )
        return PartialSourceRowData(
            value=HackerNewsConnector._parse_hackernews_thread(data)
        )
```

`get_value()` fetches the full content of a specific thread, including comments. It parses the raw JSON into structured Python objects (`_HackerNewsThread` + `_HackerNewsComment`) and returns a `PartialSourceRowData` containing the full thread.

Ordinal Support

This tells CocoIndex that the source provides timestamps (ordinals):

```python
def provides_ordinal(self) -> bool:
    return True
```

CocoIndex uses ordinals to incrementally update only changed threads, improving efficiency.

Parsing JSON into Structured Data

This static method takes the raw JSON response from the API and turns it into a normalized `_HackerNewsThread` object containing:

• The post (title, text, metadata)
• All nested comments, flattened into a single list
• Proper Python `datetime` objects

It performs a recursive traversal of the comment tree.

```python
@staticmethod
def _parse_hackernews_thread(data: dict[str, Any]) -> _HackerNewsThread:
    comments: list[_HackerNewsComment] = []

    def _add_comments(parent: dict[str, Any]) -> None:
        children = parent.get("children", None)
        if not children:
            return
        for child in children:
            ctime = child.get("created_at")
            if comment_id := child.get("id", None):
                comments.append(
                    _HackerNewsComment(
                        id=str(comment_id),
                        author=child.get("author", ""),
                        text=child.get("text", ""),
                        created_at=datetime.fromisoformat(ctime) if ctime else None,
                    )
                )
            # Recurse into replies; depth is bounded by Python's recursion limit
            _add_comments(child)

    _add_comments(data)

    ctime = data.get("created_at")
    # Guard against an explicit null title
    text = data.get("title") or ""
    if more_text := data.get("text", None):
        text += "\n\n" + more_text
    return _HackerNewsThread(
        author=data.get("author"),
        text=text,
        url=data.get("url"),
        created_at=datetime.fromisoformat(ctime) if ctime else None,
        comments=comments,
    )
```

• Converts the raw HackerNews API response into `_HackerNewsThread` and `_HackerNewsComment`.
• `_add_comments()` recursively collects nested comments.
• Combines `title` + `text` into the main thread content.
• Produces a fully structured object ready for indexing.

Putting It All Together in a Flow

Your flow now reads like a React component: it declares what each row should produce, and CocoIndex works out how to keep the target up to date.
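Before wiring the flow, one caveat about `_add_comments()`: it recurses once per reply level, so a pathologically deep thread would hit Python's default recursion limit (1000 frames) and raise `RecursionError`. The sketch below is a hedged alternative, not the project's code: the same pre-order flattening done with an explicit stack. `flatten_comments` and the trimmed `Comment` dataclass are hypothetical stand-ins for `_add_comments()` and `_HackerNewsComment`.

```python
from __future__ import annotations

import dataclasses
from datetime import datetime
from typing import Any


@dataclasses.dataclass
class Comment:
    """Hypothetical stand-in for _HackerNewsComment."""
    id: str
    author: str | None
    text: str | None
    created_at: datetime | None


def flatten_comments(root: dict[str, Any]) -> list[Comment]:
    """Flatten root["children"] (arbitrarily nested) in pre-order, without recursion."""
    comments: list[Comment] = []
    # Reverse before pushing so that pop() visits siblings in their original order.
    stack: list[dict[str, Any]] = list(reversed(root.get("children") or []))
    while stack:
        node = stack.pop()
        if comment_id := node.get("id"):
            ctime = node.get("created_at")
            comments.append(
                Comment(
                    id=str(comment_id),
                    author=node.get("author", ""),
                    text=node.get("text", ""),
                    # A trailing "Z" is only accepted by fromisoformat() on Python 3.11+
                    created_at=datetime.fromisoformat(ctime) if ctime else None,
                )
            )
        # Push this node's replies so they are visited before its next sibling.
        stack.extend(reversed(node.get("children") or []))
    return comments
```

The output order matches the recursive version: each comment is followed by all of its replies before its next sibling, but memory grows on the heap instead of the call stack.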
Define the flow and connect the source:

```python
from datetime import timedelta

import cocoindex


@cocoindex.flow_def(name="HackerNewsIndex")
def hackernews_flow(
    flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope
) -> None:
    # Add the custom source to the flow
    data_scope["threads"] = flow_builder.add_source(
        HackerNewsSource(tag="story", max_results=500),
        refresh_interval=timedelta(minutes=1),
    )

    # Create collectors for different types of searchable content
    message_index = data_scope.add_collector()
```

Process each thread and collect structured information:

```python
    with data_scope["threads"].row() as thread:
        # Index the main thread content
        message_index.collect(
            id=thread["thread_id"],
            thread_id=thread["thread_id"],
            content_type="thread",
            author=thread["author"],
            text=thread["text"],
            url=thread["url"],
            created_at=thread["created_at"],
        )
```

Process each comment of a thread and collect structured information:

```python
        with thread["comments"].row() as comment:
            message_index.collect(
                id=comment["id"],
                thread_id=thread["thread_id"],
                content_type="comment",
                author=comment["author"],
                text=comment["text"],
                created_at=comment["created_at"],
            )
```

Export to database tables:

```python
    message_index.export(
        "hn_messages",
        cocoindex.targets.Postgres(),
        primary_key_fields=["id"],
    )
```

Threads and comments can share the `id` primary key because HackerNews item IDs are unique across stories and comments.

CocoIndex now:

• Pulls from the HackerNews API
• Tracks changes incrementally
• Flattens nested comments
• Exports to Postgres
• Supports live mode

Your app can now query it as a real-time search index.

Querying and Searching the HackerNews Index

At this point you are done with the index flow. As the next step, you could define query handlers so you can run queries in CocoInsight. You can use any library or framework of your choice to perform queries. You can read more in the documentation about query handlers.
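Whatever you use to query, a predicate of the form `to_tsvector('english', text) @@ plainto_tsquery('english', ...)` re-tokenizes every row on each search unless a matching index exists. One optional optimization, sketched here under stated assumptions, is a PostgreSQL expression GIN index whose expression is identical to the one in the query. The two-argument `to_tsvector(regconfig, text)` form is immutable, so it can be indexed. The helper `gin_index_ddl` and the index naming scheme are hypothetical; the real table name should come from CocoIndex at runtime (e.g. via `cocoindex.utils.get_target_default_name`), and because CocoIndex manages that table, a hand-made index may need to be recreated if the table is ever rebuilt.

```python
import re

# Plain, unquoted SQL identifiers only (letters, digits, underscores).
_IDENT = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def gin_index_ddl(table_name: str, column: str = "text", config: str = "english") -> str:
    """Return DDL for a GIN expression index on to_tsvector(config, column)."""
    # These names are interpolated into SQL, so reject anything unexpected.
    for name in (table_name, column, config):
        if not _IDENT.fullmatch(name):
            raise ValueError(f"unsafe identifier: {name!r}")
    index_name = f"{table_name}_{column}_fts_idx"
    # The indexed expression must match the query's expression exactly,
    # e.g. to_tsvector('english', text), for the planner to use the index.
    return (
        f"CREATE INDEX IF NOT EXISTS {index_name} "
        f"ON {table_name} USING GIN (to_tsvector('{config}', {column}))"
    )


print(gin_index_ddl("hn_messages"))
```

You would run the returned statement once against the target database, for example with `conn.execute(...)` on a psycopg connection.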
Query handler

```python
import functools
import os

from psycopg_pool import ConnectionPool


@functools.cache
def connection_pool() -> ConnectionPool:
    """Shared Postgres pool (assumed setup: reads COCOINDEX_DATABASE_URL)."""
    return ConnectionPool(os.environ["COCOINDEX_DATABASE_URL"])


@hackernews_flow.query_handler()
def search_text(query: str) -> cocoindex.QueryOutput:
    """Search HackerNews threads by title and content."""
    table_name = cocoindex.utils.get_target_default_name(hackernews_flow, "hn_messages")

    with connection_pool().connection() as conn:
        with conn.cursor() as cur:
            # Simple text search using PostgreSQL's text search capabilities
            cur.execute(
                f"""
                SELECT id, thread_id, author, content_type, text, created_at,
                       ts_rank(to_tsvector('english', text), plainto_tsquery('english', %s)) as rank
                FROM {table_name}
                WHERE to_tsvector('english', text) @@ plainto_tsquery('english', %s)
                ORDER BY rank DESC, created_at DESC
                """,
                (query, query),
            )

            results = []
            for row in cur.fetchall():
                results.append(
                    {
                        "id": row[0],
                        "thread_id": row[1],
                        "author": row[2],
                        "content_type": row[3],
                        "text": row[4],
                        # created_at may be NULL when the API omitted a timestamp
                        "created_at": row[5].isoformat() if row[5] else None,
                    }
                )

    return cocoindex.QueryOutput(results=results)
```

This code defines a query handler that searches the HackerNews threads and comments indexed by CocoIndex. It determines the database table that stores the messages, then uses PostgreSQL full-text search (`to_tsvector` and `plainto_tsquery`) to find rows matching the query. Results are ranked by relevance (`ts_rank`) and creation time, formatted into dictionaries, and returned as a structured `cocoindex.QueryOutput`. In short, it performs a full-text search over the indexed content and returns ranked, structured results. The `connection_pool()` helper is not defined in the original snippet; the version shown assumes `psycopg_pool` and a `COCOINDEX_DATABASE_URL` environment variable.

Running Your HackerNews Custom Source

Once your custom source and flow are ready, running it with CocoIndex is straightforward. You can either update the index on demand or keep it continuously in sync with HackerNews.

1. Install Dependencies

Make sure you have Python installed, then install your project in editable mode:

```sh
pip install -e .
```

This installs CocoIndex along with all required dependencies, letting you develop and update the connector without reinstalling.

2. Update the Target (On-Demand)

To populate your target (e.g., Postgres) with the latest HackerNews threads:

```sh
cocoindex update main
```

• Only threads that have changed will be re-processed.
• Your target remains in sync with the 500 most recent HackerNews threads.
• Efficient incremental updates save time and compute resources.

You can also run the update command in live mode, which keeps the target in sync with the source continuously:

```sh
cocoindex update -L main
```

• Runs the flow in live mode, polling HackerNews periodically (every `refresh_interval`, one minute in this flow).
• CocoIndex automatically handles incremental changes and keeps the target synchronized.
• Ideal for dashboards, search, or AI pipelines that require real-time data.

3. Troubleshoot & Inspect with CocoInsight

CocoInsight lets you visualize and debug your flow, see the lineage of your data, and understand what's happening under the hood.

Start the server:

```sh
cocoindex server -ci main
```

Then open the UI in your browser: https://cocoindex.io/cocoinsight

CocoInsight has zero pipeline data retention, so it's safe for debugging and inspecting your flows locally.

Note that running queries in CocoInsight requires the query handler defined in the previous step.

What You Can Build Next

This simple example opens the door to a lot more:

• Build a trend detector
• Run LLM summarization pipelines on top of indexed threads
• Add embeddings + vector search
• Mirror HN into your internal data warehouse
• Build a real-time HN dashboard
• Extend to other news sources (Reddit, Lobsters, etc.)

Because the whole pipeline is declarative and incremental, extending it is straightforward.
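The "incremental" part rests on the ordinals that `list()` emits. Below is a simplified, hypothetical model of that comparison in plain Python. CocoIndex performs this internally (and, per the intro, also uses content fingerprints), so `plan_refresh` is illustrative only. A listed key is re-fetched when it is new, lacks an ordinal, or its ordinal advanced. A stored key that no longer appears in the listing is removed, which matches the described behavior of keeping the target in sync with only the most recent 500 threads.

```python
from __future__ import annotations

from typing import Optional


def plan_refresh(
    stored: dict[str, Optional[int]],
    listed: dict[str, Optional[int]],
) -> tuple[list[str], list[str]]:
    """Hypothetical model: decide which keys to (re)fetch and which to delete.

    `stored` maps keys already in the target to their last-seen ordinal;
    `listed` maps keys from the latest list() call to their current ordinal.
    None plays the role of NO_ORDINAL (no timestamp available).
    """
    to_fetch: list[str] = []
    for key, ordinal in listed.items():
        old = stored.get(key)
        # New key, or a missing ordinal on either side: we cannot prove the row
        # is unchanged, so fetch it. Otherwise fetch only if the ordinal advanced.
        if key not in stored or ordinal is None or old is None or ordinal > old:
            to_fetch.append(key)
    # Keys that dropped out of the listing (e.g. past the latest 500) go away.
    to_delete = [key for key in stored if key not in listed]
    return sorted(to_fetch), sorted(to_delete)


stored = {"101": 1_700_000_000, "102": 1_700_000_500, "103": 1_700_001_000}
listed = {"101": 1_700_000_000, "102": 1_700_009_999, "104": 1_700_010_000}
print(plan_refresh(stored, listed))  # (['102', '104'], ['103'])
```

Only `get_value()` calls for the keys in the first list cost a full fetch; unchanged threads are skipped entirely.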
Since custom sources let you wrap any Python logic in an incremental data flow, the best use cases are usually "hard-to-reach" data: systems that lack standard database connectors, have complex nesting, or require heavy preprocessing.

The Knowledge Aggregator for LLM Context

Building a context engine for an AI bot often requires pulling from non-standard documentation sources.

The "Composite" Entity (Data Stitching)

Most companies have user data fragmented across multiple microservices. You can build a custom source that acts as a "virtual join" before the data ever reaches your index. For example, the source:

• Receives a user ID from an auth service (Okta/Auth0).
• Uses that ID to fetch billing status from the Stripe API.
• Uses that ID to fetch usage logs from an internal Redis instance.

Instead of managing complex ETL joins downstream, the custom source yields a single `User360` object. CocoIndex tracks the state of this composite object; if the user upgrades in Stripe or changes their email in Auth0, the index updates automatically.

The "Legacy Wrapper" (Modernization Layer)

Enterprises often have valuable data locked in systems that are painful to query (SOAP, XML, mainframes). You get a modern, queryable SQL interface (via the CocoIndex target) on top of a 20-year-old system without rewriting the legacy system itself.

Public Data Monitor (Competitive Intelligence)

Tracking changes on public websites or APIs that don't offer webhooks. The source:

• Competitor pricing: scraping e-commerce product pages.
• Regulatory feeds: polling a government RSS feed or an FDA drug approval database.
• Crypto/stocks: hitting a CoinGecko or Yahoo Finance API.

The CocoIndex value: using the diff, you can trigger alerts downstream only when a >5% price change or a new regulation is published, instead of spamming your database with identical polling results.
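As a sketch of that last idea, with hypothetical names throughout: given the previous and current price per product (however your pipeline surfaces the old and new values of a changed row), alert only on relative moves larger than 5%. Products without a prior price are skipped here; a real monitor might report them separately as "new listings".

```python
def significant_changes(
    old: dict[str, float],
    new: dict[str, float],
    threshold: float = 0.05,
) -> dict[str, tuple[float, float]]:
    """Return {sku: (old_price, new_price)} for relative moves above `threshold`."""
    alerts: dict[str, tuple[float, float]] = {}
    for sku, new_price in new.items():
        old_price = old.get(sku)
        # No baseline (new product) or a zero baseline: relative change is undefined.
        if not old_price:
            continue
        if abs(new_price - old_price) / old_price > threshold:
            alerts[sku] = (old_price, new_price)
    return alerts


before = {"widget": 100.0, "gadget": 50.0, "gizmo": 10.0}
after = {"widget": 104.0, "gadget": 56.0, "gizmo": 10.0, "doohickey": 5.0}
print(significant_changes(before, after))  # {'gadget': (50.0, 56.0)}
```

Keeping the threshold check downstream of the diff means identical polling results never reach this code at all; it only sees rows that actually changed.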
Why This Matters

Custom sources extend this model to any API: internal, external, legacy, or real-time. This unlocks a simple but powerful pattern:

If you can fetch it, CocoIndex can index it, diff it, and sync it.

Whether you're indexing HackerNews or orchestrating dozens of enterprise services, the framework gives you a stable backbone with:

• persistent state
• deterministic updates
• automatic lineage
• flexible target exports
• minimal infrastructure overhead

⭐ Try It, Fork It, Star It

If you found this useful, a star on GitHub means a lot: it helps others discover CocoIndex and supports further development.