In gedistribueerde bestuursarchitektuur verteenwoordig die gesynchronisasie gaping tussen eksterne HTTP-API's en relatiewe database-doelwitte 'n aanhoudende ingenieurswese uitdaging - veral wanneer API-reaksies native transaksie semantiek ontbreek of onthul nie-normale, diep geïmplaseerde JSON-gehelde (kommentaar thread, organisasiële hiërarchies, multi-vlakke BOM strukture). CocoIndex se Custom Source abstraction bied 'n deklaratiewe raamwerk vir die omskakel van willekeurige HTTP / gRPC eindpunte in staatlike, incrementele-synchroniseerde data-strome met ingebouwde checkpointing, inhoud fingerprinting (SHA256-gebaseerde dedup • Statelose API→Stateful Stream transformasie: REST GET versoekings omskep in append-log semantiek • Recursiewe boomkruis met siklusdeteksie: Handhawing van onbeperkte-diepte kommentaar DAGs (Directed Acyclic Graphs) sonder stapel oorstroom • Skema evolusie ondersteuning: Python dataclass-gebaseerde typing met vooruit verenigbaarheid deur middel van strukturele subtyping • Postgres sink connector integrasie: gebruik van psycopg3 async verbinding pooling + COPY protokolle vir bulk insette • GIN-geïndexeerde volledige tekssoekkonfigurasie: materialisering van tsvector-kolomme met Engelse woordeboekstemming + ranking-funksie In hierdie voorbeeld bou ons 'n aangepaste connector vir HackerNews. Dit vang onlangse stories + ingebedde kommentaar, indexeer hulle en blootstel 'n eenvoudige soektog wat deur Postgres se volle tekssoek aangedryf word. Hoekom gebruik jy 'n Custom Source? In baie scenario's lees pipelines nie net van skoon tafels nie. Interne REST dienste Partner van die APIs Legacy stelsels Nie-standaard data modelle wat nie ooreenstem met tradisionele verbindings CocoIndex se Custom Source API maak hierdie integrasies In plaas van ad-hoc-skripte te skryf, wrap jy jou API as 'n "bronkomponent", en CocoIndex neem dit vandaan. Deklarasie Project Walkthrough - Bou 'n HackerNews Index Doelwitte Gebruik die HackerNews Search API Fetch neergeskryf kommentaar Update slegs gemodifiseerde thread Groot inhoud in Postgres Verwys na 'n tekssoek-interface CocoIndex hanteer veranderingsdeteksie, idempotency, lineage en state-sync outomaties. Oorsig Die pijpleiding bestaan uit drie hoofdele: Define a custom source ( ) HackerNewsConnector Calls HackerNews API Emits rows for changed/updated threads Pulls full thread + comment tree Build an index with CocoIndex Flow Collect thread content Collect all comments recursively Export to a Postgres table ( ) hn_messages Add a lightweight query handler Uses PostgreSQL full-text search Returns ranked matches for a keyword query Each slegs prosesse verander HN drade en hou alles in sinchronisasie. cocoindex update Die projek is open source en beskikbaar op . Die github Voorwaardes if you don't have one. Install Postgres Definieer die data model Elke aangepaste bron definieer twee ligte datatype: Sleutel tipe identifiseer 'n item uniek Value Type → die volledige inhoud vir die item In Hacker News is elke nuus 'n thread, en elke thread kan verskeie kommentaar hê. For HackerNews, let’s define keys like this: class _HackerNewsThreadKey(NamedTuple): """Row key type for HackerNews source.""" thread_id: str Die sleutels moet wees: hashable Serialeer Stabiel (verandert nie oor tyd nie) Waarde hou die werklike dataset: @dataclasses.dataclass class _HackerNewsComment: id: str author: str | None text: str | None created_at: datetime | None @dataclasses.dataclass class _HackerNewsThread: """Value type for HackerNews source.""" author: str | None text: str url: str | None created_at: datetime | None comments: list[_HackerNewsComment] Dit vertel CocoIndex presies wat elke HackerNews "item" lyk as dit ten volle opgeneem word. hou 'n post en al sy kommentaar, terwyl represents individual comments. _HackerNewsThread _HackerNewsComment Gebruik 'n Custom Source Connector 'N Custom Source het twee dele: SourceSpec – Deklaratiewe Konfigurasie SourceConnector – bedryfslogika vir die lees van data Skryf die SourceSpec a in CocoIndex is 'n deklaratiewe konfigurasie wat die stelsel vertel en Dit kry nie data self nie - dit word deur die bronverbinding hanteer. SourceSpec what data to fetch how to connect class HackerNewsSource(SourceSpec): """Source spec for HackerNews API.""" tag: str | None = None max_results: int = 100 Die veld: tag Optional filter for the type of HackerNews content. Example: , , . "story" "job" "poll" If , it fetches all types. None max_results Maximum number of threads to fetch from HackerNews at a time. Helps limit the size of the index for performance or testing. Definieer die connector Stel die konnektor se konfigurasie en HTTP-sessie in sodat dit HackerNews data doeltreffend kan opneem. @source_connector( spec_cls=HackerNewsSource, key_type=_HackerNewsThreadKey, value_type=_HackerNewsThread, ) class HackerNewsConnector: """Custom source connector for HackerNews API.""" _spec: HackerNewsSource _session: aiohttp.ClientSession def __init__(self, spec: HackerNewsSource, session: aiohttp.ClientSession): self._spec = spec self._session = session @staticmethod async def create(spec: HackerNewsSource) -> "HackerNewsConnector": """Create a HackerNews connector from the spec.""" return HackerNewsConnector(spec, aiohttp.ClientSession()) tells CocoIndex that this class is a . It specifies: source_connector custom source connector : the configuration class ( ) spec_cls HackerNewsSource : how individual items are identified ( ) key_type _HackerNewsThreadKey : the structure of the data returned ( ) value_type _HackerNewsThread create() word geroep deur CocoIndex om die verbinding te initieel, en dit stel 'n nuwe aiohttp.ClientSession op vir die maak van HTTP versoekings. Verwys na die beskikbare thread die Die metode in is responsible for wat ooreenstem met die gegewe kriteria (tag, max resultate) en metadata oor hulle retourneer. En wat dalk verander het. list() HackerNewsConnector discovering all available HackerNews threads know which threads exist async def list( self, ) -> AsyncIterator[PartialSourceRow[_HackerNewsThreadKey, _HackerNewsThread]]: """List HackerNews threads using the search API.""" # Use HackerNews search API search_url = "https://hn.algolia.com/api/v1/search_by_date" params: dict[str, Any] = {"hitsPerPage": self._spec.max_results} if self._spec.tag: params["tags"] = self._spec.tag async with self._session.get(search_url, params=params) as response: response.raise_for_status() data = await response.json() for hit in data.get("hits", []): if thread_id := hit.get("objectID", None): utime = hit.get("updated_at") ordinal = ( int(datetime.fromisoformat(utime).timestamp()) if utime else NO_ORDINAL ) yield PartialSourceRow( key=_HackerNewsThreadKey(thread_id=thread_id), data=PartialSourceRowData(ordinal=ordinal), ) Fetiese . list() metadata for all recent HackerNews threads For each thread: It generates a with: PartialSourceRow : the thread ID key : the last updated timestamp ordinal Doel: laat CocoIndex toelaat om te volg watter draad bestaan en wat verander het sonder om volle draadinhoud te kry. Verwyder die volledige inhoud Hierdie asynchroniese metode het 'n (Inclusief sy kommentaar) van die , en wraak die resultaat in 'n Object - die struktuur wat CocoIndex gebruik vir ry-vlak ingeslag. single HackerNews thread API PartialSourceRowData async def get_value( self, key: _HackerNewsThreadKey ) -> PartialSourceRowData[_HackerNewsThread]: """Get a specific HackerNews thread by ID using the items API.""" # Use HackerNews items API to get full thread with comments item_url = f"https://hn.algolia.com/api/v1/items/{key.thread_id}" async with self._session.get(item_url) as response: response.raise_for_status() data = await response.json() if not data: return PartialSourceRowData( value=NON_EXISTENCE, ordinal=NO_ORDINAL, content_version_fp=None, ) return PartialSourceRowData( value=HackerNewsConnector._parse_hackernews_thread(data) ) get_value() vang die volledige inhoud van 'n spesifieke thread, insluitend kommentaar. Parseer die ruwe JSON in gestruktureerde Python-objekte (_HackerNewsThread + _HackerNewsComment). Gee 'n PartialSourceRowData wat die volledige thread bevat. Gewone ondersteuning Vertel CocoIndex dat hierdie bron timestamps (ordinaal) bied. def provides_ordinal(self) -> bool: return True CocoIndex uses ordinals to incrementally update only changed threads, improving efficiency. Parseer JSON in gestruktureerde data Hierdie statiese metode neem die ruwe JSON reaksie van die en verander dit in 'n normaliserende Die voorwerp bevat: API _HackerNewsThread Die pos (titel, teks, metadata) Alle ingesluit kommentaar, gevloei in 'n enkele lys Python datetime voorwerpe Hy verrig a van die kommentaarboom. recursive traversal @staticmethod def _parse_hackernews_thread(data: dict[str, Any]) -> _HackerNewsThread: comments: list[_HackerNewsComment] = [] def _add_comments(parent: dict[str, Any]) -> None: children = parent.get("children", None) if not children: return for child in children: ctime = child.get("created_at") if comment_id := child.get("id", None): comments.append( _HackerNewsComment( id=str(comment_id), author=child.get("author", ""), text=child.get("text", ""), created_at=datetime.fromisoformat(ctime) if ctime else None, ) ) _add_comments(child) _add_comments(data) ctime = data.get("created_at") text = data.get("title", "") if more_text := data.get("text", None): text += "\n\n" + more_text return _HackerNewsThread( author=data.get("author"), text=text, url=data.get("url"), created_at=datetime.fromisoformat(ctime) if ctime else None, comments=comments, ) Converts raw HackerNews API response into and . _HackerNewsThread _HackerNewsComment _add_comments() herhalend parseer ingebedde kommentaar. Combines + into the main thread content. title text Produser 'n volledig gestruktureerde voorwerp gereed vir indeksering. Putting It All Together in a Flow Jou vloei lees nou presies soos 'n React-komponent. Define the flow and connect source @cocoindex.flow_def(name="HackerNewsIndex") def hackernews_flow( flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope ) -> None: # Add the custom source to the flow data_scope["threads"] = flow_builder.add_source( HackerNewsSource(tag="story", max_results=500), refresh_interval=timedelta(minutes=1), ) # Create collectors for different types of searchable content message_index = data_scope.add_collector() Process each thread and collect structured information with data_scope["threads"].row() as thread: # Index the main thread content message_index.collect( id=thread["thread_id"], thread_id=thread["thread_id"], content_type="thread", author=thread["author"], text=thread["text"], url=thread["url"], created_at=thread["created_at"], ) Verwerk elke kommentaar van 'n thread en versamel gestruktureerde inligting with thread["comments"].row() as comment: message_index.collect( id=comment["id"], thread_id=thread["thread_id"], content_type="comment", author=comment["author"], text=comment["text"], created_at=comment["created_at"], ) Exporteer na databasetabelle message_index.export( "hn_messages", cocoindex.targets.Postgres(), primary_key_fields=["id"], ) CocoIndex now: Kyk na die HackerNews API Tracks verander geleidelik flattens nested comments Uitvoer na Postgres Ondersteun live mode Jou app kan dit nou as 'n real-time soektog vra. Vra en soek die HackerNews-indeks Op hierdie punt is jy voltooi met die indeks vloei. As die volgende stap, kan jy query beheerders te definieer — sodat jy kan vrae uit te voer in CocoInsight. Jy kan enige biblioteek of raamwerk van jou keuse gebruik om vrae uit te voer. . Vra die handelaar @hackernews_flow.query_handler() def search_text(query: str) -> cocoindex.QueryOutput: """Search HackerNews threads by title and content.""" table_name = cocoindex.utils.get_target_default_name(hackernews_flow, "hn_messages") with connection_pool().connection() as conn: with conn.cursor() as cur: # Simple text search using PostgreSQL's text search capabilities cur.execute( f""" SELECT id, thread_id, author, content_type, text, created_at, ts_rank(to_tsvector('english', text), plainto_tsquery('english', %s)) as rank FROM {table_name} WHERE to_tsvector('english', text) @@ plainto_tsquery('english', %s) ORDER BY rank DESC, created_at DESC """, (query, query), ) results = [] for row in cur.fetchall(): results.append( { "id": row[0], "thread_id": row[1], "author": row[2], "content_type": row[3], "text": row[4], "created_at": row[5].isoformat(), } ) return cocoindex.QueryOutput(results=results) Hierdie kode definieer 'n query handleider wat HackerNews-drome en kommentaar wat in CocoIndex geïndexeer word, soek. and om die rekens te vind wat ooreenstem met die query. to_tsvector plainto_tsquery Die resultate word gerangskik deur relevansie ( ) en skepping tyd, geformateer in woordeboeke, en teruggekeer as 'n gestruktureerde In wese voer dit 'n volledige tekssoek oor die geïndexeerde inhoud uit en lewer gerangschikte, gestruktureerde resultate. ts_rank cocoindex.QueryOutput Running Your HackerNews Custom Source Sodra u aangepaste bron en vloei gereed is, is dit maklik om dit met CocoIndex uit te voer. of with HackerNews. update the index on-demand keep it continuously in sync 1 Bepaal afhanklikheid Maak seker jy het Python geïnstalleer en installeer dan jou projek in bewerkbare modus: pip install -e . This installs CocoIndex along with all required dependencies, letting you develop and update the connector without reinstalling. 2. Update the Target (On-Demand) Om jou doelwit (bv. Postgres) te vul met die nuutste HackerNews-drome: cocoindex update main Slegs draad wat verander het, sal opnieuw verwerk word. Jou doelwit bly gesynchroniseer met die nuutste 500 HackerNews-drome. Effektiewe toenemende opdaterings bespaar tyd en rekenaarbronne. Let daarop dat elke keer as jy die update-bevel uitvoer, sal CocoIndex slegs draad wat verander het, herverwerk en die doelwit in sinkroon hou met die onlangse 500 draad van HackerNews. cocoindex update -L main Runs the flow in , polling HackerNews periodically. live mode CocoIndex hanteer outomaties toenemende veranderinge en hou die teiken gesynchroniseer. Ideaal vir dashboards, soektog of AI-pijpleine wat real-time data benodig. Troubleshoot & Inspect met CocoInsight CocoInsight laat jou , sien die lineage van jou data, en verstaan wat onder die hoed gebeur. visualize and debug your flow Start the server: cocoindex server -ci main Open dan die UI in jou leser: https://cocoindex.io/cocoinsight CocoInsight has zero pipeline data retention — it’s safe for debugging and inspecting your flows locally. CocoInsight has zero pipeline data retention — it’s safe for debugging and inspecting your flows locally. Let daarop dat dit die QueryHandler-opstelling in die vorige stap vereis. Wat jy volgende kan bou Hierdie eenvoudige voorbeeld maak die deur oop vir baie meer: Maak 'n trending-topic detector Run LLM summarization pipelines on top of indexed threads Add embeddings + vector search Mirror HN in jou interne data-opslag Maak 'n real-time HN dashboard Extend to other news sources (Reddit, Lobsters, etc.) Because the whole pipeline is declarative and incremental, extending it is straightforward. Omdat Custom Sources jou toelaat om te wrap Python logic into an incremental data stream, the best use cases are usually data-stelsels wat nie standaard databasiskoppelings het nie, komplekse nesting het of swaar voorverwerking vereis. any "Hard-to-Reach" Die Kennis Aggregator vir LLM konteks Die bou van 'n konteksmotor vir 'n AI-bot vereis dikwels om van nie-standaarddokumentasie bronne te trek. Die “Composite” Entiteit (Data Stitching) Die meeste maatskappye het gebruikersdata gefragmenteer oor verskeie microservices.Jy kan 'n Aangepaste bron bou wat optree as 'n "virtuele aansluiting" voordat die data ooit jou indeks tref. For example the Source: Herhaal 'n gebruiker-ID van 'n Auth-diens (Okta/Auth0). Gebruik die ID om die rekeningstatus van die Stripe API te kry. Gebruik daardie ID om gebruik logboeke van 'n Internal Redis te kry. Instead of managing complex ETL joins downstream, the Custom Source yields a single CocoIndex volg die toestand van hierdie samestelde voorwerp; as die gebruiker opgrader in Stripe verander hul e-pos in Auth0, die indeks word outomaties opgedateer. User360 of The "Legacy Wrapper" (Modernization Layer) Ondernemings het dikwels waardevolle data gesluit in stelsels wat pynlik is om te queryer (SOAP, XML, Mainframes). Jy kry 'n moderne, queryable SQL-interface (via die CocoIndex-doel) op 'n 20-jarige stelsel sonder om die oorlewende stelsel self te herschryf. Openbare Data Monitor (Competitive Intelligence) Tracking changes on public websites or APIs that don't offer webhooks. The Source: Scraping e-commerce product pages. Competitor Pricing: Polling a government RSS feed or FDA drug approval database. Regulatory Feeds: Hitting a CoinGecko or Yahoo Finance API. Crypto/Stocks: Using the U kan downstream waarschuwings slegs veroorsaak wanneer 'n prysverandering met > 5% of 'n nuwe regulasie geplaas word, eerder as om u databasis met identiese pollresultate te spamming. The CocoIndex Value: diff Hoekom dit saak maak Custom Sources brei hierdie model uit na API - interne, eksterne, oorlewende of real-time. enigiets Dit ontbloot 'n eenvoudige maar kragtige patroon: As jy dit kan hanteer, kan CocoIndex dit indekseer, diffuseer en sinchroniseer. If you can fetch it, CocoIndex can index it, diff it, and sync it. Whether you’re indexing HackerNews or orchestrating dozens of enterprise services, the framework gives you a stable backbone with: Permanente staat deterministic updates Automatiese ligging Fleksibele doelwitte eksport Minimale infrastruktuur oorhead Probeer dit, Fork dit, Star dit As jy dit nuttig vind, a Dit beteken baie – dit help ander om CocoIndex te ontdek en ondersteun verdere ontwikkeling. star on GitHub GitHub