Sa targeted backend system architecture, ang synchronization gap sa pagitan ng external HTTP APIs at relational database targets ay isang pangkalahatang engineering challenge—particularly kapag ang mga responses ng API ay walang native transaction semantics o i-expose non-normalized, deeply nested JSON payloads (comment threads, organizational hierarchies, multi-level BOM structures). Ang CocoIndex's Custom Source abstraction ay nagbibigay ng isang declarative framework para sa pag-convert arbitrary HTTP/gRPC endpoints sa stateful, incrementally-synchronized data streams na may built-in checkpointing, content fingerprinting (SHA256-based deduplication), at differential update propagation. Hindi tulad ng imperative polling scrip • Stateless API→Stateful Stream transformation: I-convert REST GET requests sa append-log semantics • Recursive tree traversal na may cycle detection: Paggamit ng unlimited-depth comment DAGs (Directed Acyclic Graphs) nang walang stack overflow • Schema evolution support: Python dataclass-based typing na may forward compatibility sa pamamagitan ng structural subtyping • Postgres sink connector integration: Paggamit ng psycopg3 async connection pooling + COPY protocol para sa bulk inserts • GIN-indexed full-text search configuration: Materializing tsvector columns sa pamamagitan ng English dictionary stemming + ranking function Sa halimbawa na ito, kami ay bumuo ng isang custom connector para sa HackerNews. Itakda ito ng mga pinakabagong mga kasaysayan + nests comments, index ang mga ito, at i-expose ang isang simpleng paghahanap interface na pinamamahala ng Postgres full-text search. Bakit gamitin ang Custom Source? Sa maraming mga scenario, ang pipelines ay hindi lamang makikita mula sa clean tables. Internal REST mga serbisyo Mga Partner ng APIs Mga sistema ng legacy Non-standard na mga modelo ng data na hindi matatagpuan sa mga tradisyonal na connectors Ang Custom Source API ng CocoIndex ay gumagawa ng mga integrasyon na ito Sa halip ng pagsasalin ng ad-hoc scripts, i-wrap ang iyong API bilang isang "source component," at ang CocoIndex ay umuwi ito mula dito. Deklarasyon Proyekto Walkthrough — Pagbuo ng isang HackerNews Index ang target Tungkol sa HackerNews Search API Ang mga komento ni Fetch Update lamang ang mga modified threads Store content in Postgres I-expose ang text search interface CocoIndex handles change detection, idempotency, lineage, and state sync automatically. ang overview Ang pipeline ay binubuo ng tatlong pangunahing bahagi: Define a custom source ( ) HackerNewsConnector Calls HackerNews API Emits rows for changed/updated threads Pulls full thread + comment tree Build an index with CocoIndex Flow Collect thread content Collect all comments recursively Export to a Postgres table ( ) hn_messages Add a lightweight query handler Uses PostgreSQL full-text search Returns ranked matches for a keyword query Lahat ng Only processes changed HN threads and keeps everything in sync. cocoindex update Ang proyekto ay open source at magagamit sa . GitHub Ang mga prerequisite if you don't have one. Install Postgres Paglalarawan ng Data Model Ang bawat custom source ay naglalaman ng dalawang mga uri ng data na madali: Key Type → Unicely identifices isang item Value Type → the full content for that item Sa Hacker News, ang bawat news ay isang thread, at ang bawat thread ay maaaring magkaroon ng ilang mga komento. Para sa HackerNews, tingnan natin ang mga keys tulad ng ito: class _HackerNewsThreadKey(NamedTuple): """Row key type for HackerNews source.""" thread_id: str Keys must be: Mga hashtag Seryoso Stable (hindi mababago sa panahon) Ang mga halaga ay naglalaman ng actual dataset: @dataclasses.dataclass class _HackerNewsComment: id: str author: str | None text: str | None created_at: datetime | None @dataclasses.dataclass class _HackerNewsThread: """Value type for HackerNews source.""" author: str | None text: str url: str | None created_at: datetime | None comments: list[_HackerNewsComment] Ito ay nagpapakita ng CocoIndex kung ano ang lahat ng HackerNews "item" ay nakikita kapag pinagsimula na. isang post at lahat ng kanyang mga komento, habang Ang mga komento ay individual. _HackerNewsThread _HackerNewsComment Paggawa ng isang Custom Source Connector Ang isang Custom Source ay may dalawang bahagi: — declarative configuration SourceSpec SourceConnector — operational logic para sa pagbabasa ng data Tungkol sa SourceSpec A in CocoIndex is a declarative configuration that tells the system at ang Hindi ito makakuha ng data sa sarili — ito ay ginagamit sa pamamagitan ng source connector. SourceSpec what data to fetch how to connect class HackerNewsSource(SourceSpec): """Source spec for HackerNews API.""" tag: str | None = None max_results: int = 100 ang mga field: tag Optional filter for the type of HackerNews content. Example: , , . "story" "job" "poll" If , it fetches all types. None max_results Maximum number of threads to fetch from HackerNews at a time. Helps limit the size of the index for performance or testing. Ipasok ang connector I-set up ang configuration ng connector at HTTP session upang ito ay maaaring makakuha ng HackerNews data efficiently. @source_connector( spec_cls=HackerNewsSource, key_type=_HackerNewsThreadKey, value_type=_HackerNewsThread, ) class HackerNewsConnector: """Custom source connector for HackerNews API.""" _spec: HackerNewsSource _session: aiohttp.ClientSession def __init__(self, spec: HackerNewsSource, session: aiohttp.ClientSession): self._spec = spec self._session = session @staticmethod async def create(spec: HackerNewsSource) -> "HackerNewsConnector": """Create a HackerNews connector from the spec.""" return HackerNewsConnector(spec, aiohttp.ClientSession()) tells CocoIndex that this class is a . It specifies: source_connector custom source connector : the configuration class ( ) spec_cls HackerNewsSource : how individual items are identified ( ) key_type _HackerNewsThreadKey : the structure of the data returned ( ) value_type _HackerNewsThread Create() ay tinutukoy sa pamamagitan ng CocoIndex upang i-initialize ang connector, at ito ay lumikha ng isang bagong aiohttp.ClientSession para sa paggawa ng HTTP requests. Paglalarawan ng Available Threads ang ang metriko in ang responsibilidad para sa ang mga criteria (tag, max results) at ibalik ang metadata tungkol sa mga ito. CocoIndex gumagamit ito upang Sa tingin mo ay may mga changes. list() HackerNewsConnector discovering all available HackerNews threads know which threads exist async def list( self, ) -> AsyncIterator[PartialSourceRow[_HackerNewsThreadKey, _HackerNewsThread]]: """List HackerNews threads using the search API.""" # Use HackerNews search API search_url = "https://hn.algolia.com/api/v1/search_by_date" params: dict[str, Any] = {"hitsPerPage": self._spec.max_results} if self._spec.tag: params["tags"] = self._spec.tag async with self._session.get(search_url, params=params) as response: response.raise_for_status() data = await response.json() for hit in data.get("hits", []): if thread_id := hit.get("objectID", None): utime = hit.get("updated_at") ordinal = ( int(datetime.fromisoformat(utime).timestamp()) if utime else NO_ORDINAL ) yield PartialSourceRow( key=_HackerNewsThreadKey(thread_id=thread_id), data=PartialSourceRowData(ordinal=ordinal), ) mga fetish . list() metadata for all recent HackerNews threads For each thread: It generates a with: PartialSourceRow : the thread ID key : the last updated timestamp ordinal Purpose: Nagbibigay ng CocoIndex upang i-track kung ano ang mga thread na mayroon at kung ano ang nagbabago nang walang pagkuha ng buong content ng thread. Paglalarawan ng Full Thread Content Ang metriko para sa espasyo-panahong Schwarzschild na may sistemang koordinatong ( (Tulad ng mga komento) sa pamamagitan ng , at i-wrap ang resulta sa a Object — ang strukturong CocoIndex ay gumagamit para sa line-level ingestion. single HackerNews thread API PartialSourceRowData async def get_value( self, key: _HackerNewsThreadKey ) -> PartialSourceRowData[_HackerNewsThread]: """Get a specific HackerNews thread by ID using the items API.""" # Use HackerNews items API to get full thread with comments item_url = f"https://hn.algolia.com/api/v1/items/{key.thread_id}" async with self._session.get(item_url) as response: response.raise_for_status() data = await response.json() if not data: return PartialSourceRowData( value=NON_EXISTENCE, ordinal=NO_ORDINAL, content_version_fp=None, ) return PartialSourceRowData( value=HackerNewsConnector._parse_hackernews_thread(data) ) get_value() makuha ang buong nilalaman ng isang anumang thread, kabilang ang mga komento. Parses ang raw JSON sa mga strukturadong Python objects (_HackerNewsThread + _HackerNewsComment). Ibalik ang isang PartialSourceRowData na naglalaman ng buong thread. Suporta sa regular Sinasabi ng CocoIndex na ang source na ito ay nagbibigay ng timestamps (ordinals). def provides_ordinal(self) -> bool: return True Ang CocoIndex ay gumagamit ng ordinals upang i-update lamang ang mga tinutukoy na thread, pagbutihin ang efficiency. Parsing JSON into Structured Data Ang metriko para sa espasyo-panahong Schwarzschild na may sistemang koordinatong ( Paggawa ng isang normalized Mga Object na naglalaman: API _HackerNewsThread Ang post (titulo, teksto, metadata) Lahat ng mga komento, flattened sa isang single list Paglalarawan ng Python Datetime Objects Siya ay nagtatrabaho a sa pamamagitan ng comment tree. recursive traversal @staticmethod def _parse_hackernews_thread(data: dict[str, Any]) -> _HackerNewsThread: comments: list[_HackerNewsComment] = [] def _add_comments(parent: dict[str, Any]) -> None: children = parent.get("children", None) if not children: return for child in children: ctime = child.get("created_at") if comment_id := child.get("id", None): comments.append( _HackerNewsComment( id=str(comment_id), author=child.get("author", ""), text=child.get("text", ""), created_at=datetime.fromisoformat(ctime) if ctime else None, ) ) _add_comments(child) _add_comments(data) ctime = data.get("created_at") text = data.get("title", "") if more_text := data.get("text", None): text += "\n\n" + more_text return _HackerNewsThread( author=data.get("author"), text=text, url=data.get("url"), created_at=datetime.fromisoformat(ctime) if ctime else None, comments=comments, ) Converts raw HackerNews API response into and . _HackerNewsThread _HackerNewsComment _add_comments() recursively parses ang mga sumusunod na mga komento. I-combine ang title + text sa main thread content. I-produce ang isang ganap na-structured na object na ready para sa indexing. Pumunta ang lahat sa isang flow Ngayon ang iyong flow ay mag-read na tulad ng isang React component. I-definite ang flow at i-connect ang source @cocoindex.flow_def(name="HackerNewsIndex") def hackernews_flow( flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope ) -> None: # Add the custom source to the flow data_scope["threads"] = flow_builder.add_source( HackerNewsSource(tag="story", max_results=500), refresh_interval=timedelta(minutes=1), ) # Create collectors for different types of searchable content message_index = data_scope.add_collector() Process each thread and collect structured information with data_scope["threads"].row() as thread: # Index the main thread content message_index.collect( id=thread["thread_id"], thread_id=thread["thread_id"], content_type="thread", author=thread["author"], text=thread["text"], url=thread["url"], created_at=thread["created_at"], ) Process each comment of a thread and collect structured information with thread["comments"].row() as comment: message_index.collect( id=comment["id"], thread_id=thread["thread_id"], content_type="comment", author=comment["author"], text=comment["text"], created_at=comment["created_at"], ) Export sa mga database tables message_index.export( "hn_messages", cocoindex.targets.Postgres(), primary_key_fields=["id"], ) Ang CocoIndex ngayon ay: polls the HackerNews API tracks changes incrementally ang napili ng mga taga-hanga: exports to Postgres I-support ang live mode Ang iyong app ngayon ay maaaring i-query ito bilang isang real-time search index. Tungkol sa HackerNews Index At this point you are done with the index flow. As the next step, you could define query handlers — so you can run queries in CocoInsight. You can use any library or framework of your choice to perform queries. You can read more in the documentation about . Tungkol sa Handler @hackernews_flow.query_handler() def search_text(query: str) -> cocoindex.QueryOutput: """Search HackerNews threads by title and content.""" table_name = cocoindex.utils.get_target_default_name(hackernews_flow, "hn_messages") with connection_pool().connection() as conn: with conn.cursor() as cur: # Simple text search using PostgreSQL's text search capabilities cur.execute( f""" SELECT id, thread_id, author, content_type, text, created_at, ts_rank(to_tsvector('english', text), plainto_tsquery('english', %s)) as rank FROM {table_name} WHERE to_tsvector('english', text) @@ plainto_tsquery('english', %s) ORDER BY rank DESC, created_at DESC """, (query, query), ) results = [] for row in cur.fetchall(): results.append( { "id": row[0], "thread_id": row[1], "author": row[2], "content_type": row[3], "text": row[4], "created_at": row[5].isoformat(), } ) return cocoindex.QueryOutput(results=results) This code defines a query handler that searches HackerNews threads and comments indexed in CocoIndex. It determines the database table storing the messages, then uses PostgreSQL full-text search ( at ang (Idinirekta ang mga sumusunod na mga question. to_tsvector plainto_tsquery Mga pahinang tumuturo sa pag-aaral ( ) and creation time, formatted into dictionaries, and returned as a structured Sa pangkalahatan, ito ay gumagawa ng isang full-text na paghahanap sa indexed na nilalaman at nagbibigay ng ranking, mga resulta. ts_rank cocoindex.QueryOutput Running Your HackerNews Custom Source Kapag ang iyong custom na source at flow ay nagsisimula, mag-execute ito sa CocoIndex ay simpleng. o sa sa pamamagitan ng HackerNews. update the index on-demand keep it continuously in sync I-install ang mga dependencies Siguraduhin na ikaw ay may Python na naka-install at pagkatapos ay i-install ang iyong proyekto sa editable mode: pip install -e . Ito ay i-install CocoIndex kasama ang lahat ng mga kinakailangan dependencies, na nagbibigay-daan sa iyo upang bumuo at i-update ang connector nang walang reinstall. 2. Update the Target (On-Demand) Upang i-populate ang iyong target (e.g., Postgres) sa mga pinakabagong HackerNews threads: cocoindex update main Ang iba't ibang mga thread na may mga pagbabago ay magproseso. Ang iyong target ay matatagpuan sa mga pinakabagong 500 HackerNews thread. Efficient incremental updates i-save ang oras at mga resource ng computing. Tingnan na sa bawat pagkakataon na i-execute ang update command, CocoIndex lamang ang re-process threads na may pagbabago, at patuloy ang target sa sync sa mga nakaraang 500 threads mula sa HackerNews. Maaari mo rin i-execute ang update command sa live mode, na kung saan ay patuloy ang target sa sync sa source: cocoindex update -L main Runs the flow in , polling HackerNews periodically. live mode Ang CocoIndex ay automatically nagtatrabaho sa mga incremental na pagbabago at nagtatrabaho ang target synchronized. Ideal para sa dashboards, search, o AI pipelines na kailangan ng data real-time. Pagbabago ng Problema at Inspect sa CocoInsight CocoInsight ay nagbibigay sa iyo , see the lineage of your data, and understand what’s happening under the hood. visualize and debug your flow Start the server: cocoindex server -ci main Pagkatapos, i-open ang UI sa iyong browser: https://cocoindex.io/cocoinsight Ang CocoInsight ay may zero pipeline data retention - ito ay safe para sa pag-debug at pag-inspect ng iyong mga stream sa lokal. Ang CocoInsight ay may zero pipeline data retention - ito ay safe para sa pag-debug at pag-inspect ng iyong mga stream sa lokal. Note that this requires QueryHandler setup in previous step. Ano ang maaari mong bumuo next Ang simpleng halimbawa na ito ay nagpapahiwatig ng mga door to a lot more: Paggawa ng isang trending-topic detector Run LLM summarization pipelines sa itaas ng indexed threads Magbigay ng mga embeddings + vector search Mirror HN sa iyong internal data warehouse Paggawa ng isang real-time HN dashboard Extend sa iba pang mga news source (Reddit, Lobsters, at iba pa) Dahil ang buong pipeline ay declarative at incremental, pag-extend ito ay simpleng. Dahil ang Custom Sources ay nagbibigay-daan sa iyo upang wrap Python logic sa isang incremental data stream, ang pinakamahusay na mga kaso ng paggamit ay karaniwang data—sistema na walang mga standard database connectors, may karaniwang pag-nisting, o nangangailangan ng mababang pre-processing. ang lahat "Hard-to-Reach" The Knowledge Aggregator for LLM Context Pagbuo ng isang kontekstong engine para sa isang AI bot ay karaniwang nangangailangan ng pagdiriwang mula sa mga non-standard na mga source ng dokumento. Ang “Composite” Entity (Data Stitching) Ang karamihan ng mga kumpanya ay may mga data ng mga gumagamit na napag-fragmented sa iba't-ibang microservices. Maaari mong bumuo ng isang Custom Source na gumagana bilang isang "virtual join" bago ang data nangyari sa iyong index. For example the Source: Pagkuha ng isang User ID mula sa isang Auth Service (Okta/Auth0). Ginagamit ang ID na ito upang makuha ang status ng billing mula sa Stripe API. Ginagamit ang ID na ito upang makakuha ng mga log ng paggamit mula sa isang Internal Redis. Sa halip ng pag-manage ng mga kompleksong ETL na sumali sa downstream, ang Custom Source ay nagbibigay ng isang single CocoIndex tracks ang estado ng komposit na ito; kung ang user ay pag-upgrade sa Stripe i-change ang kanilang email sa Auth0, ang index update automatically. User360 o sa Ang “Legacy Wrapper” (Modernization Layer) Ang mga negosyo ay karaniwang may mga mahalagang data na naka-locked sa mga sistema na napaka-query (SOAP, XML, Mainframes). Maaari mong makakuha ng isang modernong, queryable SQL interface (sa pamamagitan ng CocoIndex target) sa itaas ng isang sistema na 20 taon na ang nakaraan nang walang rewrite ang legacy system mismo. Public Data Monitor (Competitive Intelligence) I-track ang mga pagbabago sa mga publikong website o APIs na hindi nag-aalok ng webhooks. The Source: Scraping e-commerce product pages. Competitor Pricing: Polling a government RSS feed or FDA drug approval database. Regulatory Feeds: Hitting a CoinGecko or Yahoo Finance API. Crypto/Stocks: Paggamit ng Sa paglipas ng mga kapangyarihan, maaari mong i-activate ang mga alerts downstream lamang kapag nag-post ng isang pagbabago ng presyo sa >5% o isang bagong regulasyon, hindi na spamming ang iyong database na may parehong mga resulta ng polling. The CocoIndex Value: diff Bakit importante ang Custom Sources ipinakita ang modelo na ito sa API — internal, external, legacy, o real-time. ang lahat Ito ay nagpapakita ng isang simpleng ngunit malakas na pattern: Kung maaari mong makuha ito, CocoIndex ay maaaring index ito, diff ito, at i-sync ito. Kung maaari mong makuha ito, CocoIndex ay maaaring index ito, diff ito, at i-sync ito. Kung ikaw ay indexing sa HackerNews o orchestrating mga duta ng mga serbisyo ng enterprise, ang framework ay nagbibigay sa iyo ng isang stable backbone na may: Persistent ang estado Mga Deterministic Update Automatic ang lineage flexible target exports minimal infrastructure overhead I-Try It, Fork It, Star It If you found this useful, a Ito ay nag-aalok ng maraming bagay - ito ay tumutulong sa iba't ibang mga tao na makikita ang CocoIndex at sumusuportahan ang karagdagang pag-unlad. star on ang github ang github