In distributed systems architecture, the synchronization gap between external HTTP APIs and relational database targets is a persistent engineering challenge, particularly when API responses lack native transactional semantics or expose non-normalized, deeply nested JSON payloads (comment threads, organizational hierarchies, multi-level BOM structures). CocoIndex's Custom Source abstraction provides a declarative framework for turning arbitrary HTTP or gRPC endpoints into stateful, incrementally synced data streams with built-in checkpointing, content fingerprinting, and differential update propagation. Unlike imperative polling scripts (cron jobs chaining curl, jq, and psql INSERT), Custom Sources implement a **pull-based CDC (Change Data Capture) model** via ordinal watermarking. This is analogous to Kafka Connect source connectors, but aimed at HTTP APIs that have no native offset management. The framework persists its tracking state (last-seen ordinals per key, content fingerprints) in CocoIndex's own Postgres-backed metadata store, so each run reprocesses only the rows that actually changed.
This tutorial demonstrates building a production-ready Custom Source targeting the HackerNews Algolia Search API (hn.algolia.com/api/v1), covering:

• Stateless API → stateful stream transformation: converting REST GET requests into append-log semantics
• Recursive tree traversal: flattening arbitrarily nested comment trees into a flat list of rows
• Schema evolution support: forward compatibility through structural subtyping with Python dataclass-based typing
• Postgres integration: exporting to a Postgres target, plus a psycopg connection pool on the query side
• Full-text search: PostgreSQL tsvector/tsquery matching with English dictionary stemming and ts_rank ranking

In this example, we build a custom connector for HackerNews. It fetches recent stories plus nested comments, indexes them, and exposes a simple search interface powered by Postgres full-text search.

Why Use a Custom Source?

In many scenarios, pipelines don't just read from clean tables. They depend on:

• Internal REST services
• Partner APIs
• Legacy systems
• Non-standard data models that don't fit traditional connectors

CocoIndex's Custom Source API makes these integrations declarative, incremental, and safe by default. Instead of writing ad-hoc scripts, you wrap your API as a "source component," and CocoIndex takes it from there.

Project Walkthrough: Building a HackerNews Index

Goals:

• Call the HackerNews Search API
• Fetch nested comments
• Update only modified threads
• Store content in Postgres
• Expose a text search interface

CocoIndex handles change detection, idempotency, lineage, and state sync automatically.
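Ordinals say when a row changed; a content fingerprint says whether its content actually did. The sketch below illustrates the content-fingerprinting idea from the introduction using SHA-256 over a canonical JSON encoding, so that key order in the payload does not affect the hash. It is an illustration under stated assumptions, not CocoIndex's actual fingerprint format, and `content_fingerprint` and `changed_keys` are hypothetical helpers.

```python
import hashlib
import json
from typing import Any


def content_fingerprint(value: Any) -> str:
    """SHA-256 hex digest of a canonical JSON encoding of `value`."""
    canonical = json.dumps(
        value,
        sort_keys=True,          # dict key order must not change the hash
        separators=(",", ":"),   # no incidental whitespace
        default=str,             # non-JSON types such as datetime use str()
        ensure_ascii=False,
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def changed_keys(rows: dict[str, Any], previous: dict[str, str]) -> list[str]:
    """Keys whose fingerprint differs from the one stored on the last run."""
    return [key for key, value in rows.items()
            if previous.get(key) != content_fingerprint(value)]


thread = {"title": "Show HN", "comments": [{"id": "1", "text": "nice"}]}
stored = {"42": content_fingerprint(thread)}
# Same content with keys in a different order: not a change.
reordered = {"comments": [{"text": "nice", "id": "1"}], "title": "Show HN"}
edited = {"title": "Show HN", "comments": [{"id": "1", "text": "nice!"}]}
print(changed_keys({"42": reordered}, stored))  # []
print(changed_keys({"42": edited}, stored))     # ['42']
```

Lists keep their order in the encoding, so reordering comments counts as a change, which is usually what you want for threaded content.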
Overview

The pipeline consists of three major parts:

1. Define a custom source (HackerNewsConnector)
   • Calls the HackerNews API
   • Emits rows for changed/updated threads
   • Pulls the full thread + comment tree
2. Build an index with a CocoIndex Flow
   • Collects thread content
   • Collects all comments recursively
   • Exports to a Postgres table (hn_messages)
3. Add a lightweight query handler
   • Uses PostgreSQL full-text search
   • Returns ranked matches for a keyword query

Each `cocoindex update` run processes only the HN threads that changed and keeps everything in sync.

The project is open source and available on GitHub.

Prerequisites

Install Postgres if you don't have one.

Defining the Data Model

Every custom source defines two lightweight data types:

• Key Type → uniquely identifies an item
• Value Type → the full content of that item

On HackerNews, each story is a thread, and each thread can have multiple comments. For HackerNews, define the key like this:

```python
from typing import NamedTuple


class _HackerNewsThreadKey(NamedTuple):
    """Row key type for HackerNews source."""
    thread_id: str
```

Keys must be:

• hashable
• serializable
• stable (doesn't change over time)

Values hold the actual dataset:

```python
import dataclasses
from datetime import datetime


@dataclasses.dataclass
class _HackerNewsComment:
    id: str
    author: str | None
    text: str | None
    created_at: datetime | None


@dataclasses.dataclass
class _HackerNewsThread:
    """Value type for HackerNews source."""
    author: str | None
    text: str
    url: str | None
    created_at: datetime | None
    comments: list[_HackerNewsComment]
```

This tells CocoIndex exactly what each HackerNews "item" looks like when fully fetched. _HackerNewsThread holds a post and all its comments, while _HackerNewsComment represents an individual comment.

Building a Custom Source Connector

A Custom Source has two parts:

• SourceSpec: declarative configuration
• SourceConnector: operational logic for reading data

Writing the SourceSpec

A SourceSpec in CocoIndex is a declarative configuration that tells the system what data to fetch and how to connect to a source. It doesn't fetch data itself; that's handled by the source connector.
```python
class HackerNewsSource(SourceSpec):
    """Source spec for HackerNews API."""
    tag: str | None = None
    max_results: int = 100
```

Fields:

• tag: optional filter for the type of HackerNews content, for example "story", "job", or "poll". If None, it fetches all types.
• max_results: maximum number of threads to fetch from HackerNews at a time. Helps limit the size of the index for performance or testing.

Defining the connector

The connector holds its configuration and an HTTP session so it can fetch HackerNews data efficiently.

```python
import aiohttp


@source_connector(
    spec_cls=HackerNewsSource,
    key_type=_HackerNewsThreadKey,
    value_type=_HackerNewsThread,
)
class HackerNewsConnector:
    """Custom source connector for HackerNews API."""

    _spec: HackerNewsSource
    _session: aiohttp.ClientSession

    def __init__(self, spec: HackerNewsSource, session: aiohttp.ClientSession):
        self._spec = spec
        self._session = session

    @staticmethod
    async def create(spec: HackerNewsSource) -> "HackerNewsConnector":
        """Create a HackerNews connector from the spec."""
        return HackerNewsConnector(spec, aiohttp.ClientSession())
```

@source_connector tells CocoIndex that this class is a custom source connector. It specifies:

• spec_cls: the configuration class (HackerNewsSource)
• key_type: how individual items are identified (_HackerNewsThreadKey)
• value_type: the structure of the data returned (_HackerNewsThread)

create() is called by CocoIndex to initialize the connector, and it sets up a fresh aiohttp.ClientSession for making HTTP requests.

Listing Available Threads

The list() method in HackerNewsConnector is responsible for discovering all available HackerNews threads that match the given criteria (tag, max results) and returning metadata about them. CocoIndex uses this to know which threads exist and which may have changed.
```python
async def list(
    self,
) -> AsyncIterator[PartialSourceRow[_HackerNewsThreadKey, _HackerNewsThread]]:
    """List HackerNews threads using the search API."""
    # Use HackerNews search API
    search_url = "https://hn.algolia.com/api/v1/search_by_date"
    params: dict[str, Any] = {"hitsPerPage": self._spec.max_results}

    if self._spec.tag:
        params["tags"] = self._spec.tag

    async with self._session.get(search_url, params=params) as response:
        response.raise_for_status()
        data = await response.json()
        for hit in data.get("hits", []):
            if thread_id := hit.get("objectID", None):
                # The last-updated time becomes the ordinal (watermark)
                utime = hit.get("updated_at")
                ordinal = (
                    int(datetime.fromisoformat(utime).timestamp())
                    if utime
                    else NO_ORDINAL
                )
                yield PartialSourceRow(
                    key=_HackerNewsThreadKey(thread_id=thread_id),
                    data=PartialSourceRowData(ordinal=ordinal),
                )
```

list() fetches metadata for all recent HackerNews threads. For each thread, it generates a PartialSourceRow with:

• key: the thread ID
• ordinal: the last-updated timestamp

Purpose: this allows CocoIndex to track which threads exist and which have changed without fetching full thread content.

Fetching Full Thread Content

This async method fetches a single HackerNews thread (including its comments) from the API and wraps the result in a PartialSourceRowData object, the structure CocoIndex uses for row-level ingestion.
```python
async def get_value(
    self, key: _HackerNewsThreadKey
) -> PartialSourceRowData[_HackerNewsThread]:
    """Get a specific HackerNews thread by ID using the items API."""
    # Use HackerNews items API to get full thread with comments
    item_url = f"https://hn.algolia.com/api/v1/items/{key.thread_id}"

    async with self._session.get(item_url) as response:
        if response.status == 404:
            data = None  # the item no longer exists (or never did)
        else:
            response.raise_for_status()
            data = await response.json()

    if not data:
        # Report the row as non-existent instead of failing the update
        return PartialSourceRowData(
            value=NON_EXISTENCE,
            ordinal=NO_ORDINAL,
            content_version_fp=None,
        )
    return PartialSourceRowData(
        value=HackerNewsConnector._parse_hackernews_thread(data)
    )
```

get_value() fetches the full content of a specific thread, including comments.

• Parses the raw JSON into structured Python objects (_HackerNewsThread + _HackerNewsComment).
• Returns a PartialSourceRowData containing the full thread.

Ordinal Support

This tells CocoIndex that the source provides timestamps (ordinals).

```python
def provides_ordinal(self) -> bool:
    return True
```

CocoIndex uses ordinals to update only the threads that changed on each refresh, which improves efficiency.

Parsing JSON into Structured Data

This static method takes the raw JSON response from the API and turns it into a normalized _HackerNewsThread object containing:

• The post (title, text, metadata)
• All nested comments, flattened into a single list
• Proper Python datetime objects

It performs a recursive traversal of the comment tree.
```python
@staticmethod
def _parse_hackernews_thread(data: dict[str, Any]) -> _HackerNewsThread:
    comments: list[_HackerNewsComment] = []

    def _add_comments(parent: dict[str, Any]) -> None:
        children = parent.get("children", None)
        if not children:
            return
        for child in children:
            ctime = child.get("created_at")
            if comment_id := child.get("id", None):
                comments.append(
                    _HackerNewsComment(
                        id=str(comment_id),
                        author=child.get("author", ""),
                        text=child.get("text", ""),
                        created_at=datetime.fromisoformat(ctime) if ctime else None,
                    )
                )
            # Descend into replies (pre-order: parent before its children)
            _add_comments(child)

    _add_comments(data)

    ctime = data.get("created_at")
    # "title" can be JSON null, so fall back to "" before concatenating
    text = data.get("title") or ""
    if more_text := data.get("text", None):
        text += "\n\n" + more_text
    return _HackerNewsThread(
        author=data.get("author"),
        text=text,
        url=data.get("url"),
        created_at=datetime.fromisoformat(ctime) if ctime else None,
        comments=comments,
    )
```

• Converts the raw HackerNews API response into _HackerNewsThread and _HackerNewsComment.
• _add_comments() recursively parses nested comments.
• Combines title + text into the main thread content.
• Produces a fully structured object ready for indexing.

Putting It All Together in a Flow

Your flow now reads like a React component.
Defining the Flow and Connecting the Source

```python
from datetime import timedelta

import cocoindex


@cocoindex.flow_def(name="HackerNewsIndex")
def hackernews_flow(
    flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope
) -> None:
    # Add the custom source to the flow
    data_scope["threads"] = flow_builder.add_source(
        HackerNewsSource(tag="story", max_results=500),
        refresh_interval=timedelta(minutes=1),
    )

    # Create collectors for different types of searchable content
    message_index = data_scope.add_collector()
```

Process each thread and collect structured information:

```python
    with data_scope["threads"].row() as thread:
        # Index the main thread content
        message_index.collect(
            id=thread["thread_id"],
            thread_id=thread["thread_id"],
            content_type="thread",
            author=thread["author"],
            text=thread["text"],
            url=thread["url"],
            created_at=thread["created_at"],
        )
```

Process each comment of a thread (still inside the thread loop) and collect structured information:

```python
        with thread["comments"].row() as comment:
            message_index.collect(
                id=comment["id"],
                thread_id=thread["thread_id"],
                content_type="comment",
                author=comment["author"],
                text=comment["text"],
                created_at=comment["created_at"],
            )
```

Export to database tables:

```python
    message_index.export(
        "hn_messages",
        cocoindex.targets.Postgres(),
        primary_key_fields=["id"],
    )
```

Using id alone as the primary key works because HackerNews stories and comments share a single item-ID space.

CocoIndex now:

• polls the HackerNews API for changes
• flattens nested comments
• exports to Postgres
• supports live mode

Your app can now query it as a real-time search index.

Querying & Searching the HackerNews Index

At this point you are done with the index flow. As the next step, you could define query handlers so you can run queries in CocoInsight. You can use any library or framework of your choice to perform queries. You can read more about query handlers in the documentation.
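Whatever client you use, the search in the handler below evaluates `to_tsvector('english', text)` for every row at query time, which amounts to a full scan once the table grows. PostgreSQL can answer the `@@` match from a GIN expression index instead, provided the indexed expression is the same two-argument `to_tsvector('english', text)` used in the query (PostgreSQL only accepts text search functions that name a configuration explicitly in index expressions). A minimal sketch that builds that DDL follows. `gin_index_ddl` and the example table name are assumptions (take the real name from `cocoindex.utils.get_target_default_name`), and whether a hand-created index survives CocoIndex re-creating its target table is something to verify for your setup.

```python
import re

# Only plain, unquoted SQL names pass, so they are safe to interpolate.
_IDENT = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def gin_index_ddl(table: str, column: str = "text", config: str = "english") -> str:
    """Return CREATE INDEX DDL for a full-text GIN expression index.

    The expression must match the query's WHERE clause exactly, e.g.
    to_tsvector('english', text) @@ plainto_tsquery('english', %s).
    """
    for name in (table, column, config):
        if not _IDENT.fullmatch(name):
            raise ValueError(f"unsafe SQL identifier: {name!r}")
    index_name = f"{table}_{column}_fts_idx"
    return (
        f"CREATE INDEX IF NOT EXISTS {index_name} "
        f"ON {table} USING GIN (to_tsvector('{config}', {column}))"
    )


# Hypothetical table name; use get_target_default_name(...) in practice.
print(gin_index_ddl("hackernewsindex__hn_messages"))
```

You would run the returned statement once, for example with `cur.execute(...)` on the same psycopg connection the query handler uses. Changing the text search configuration in the query without changing the index would silently bring back the full scan, which is why both take the same `config` value here.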
Query Handler

```python
@hackernews_flow.query_handler()
def search_text(query: str) -> cocoindex.QueryOutput:
    """Search HackerNews threads by title and content."""
    table_name = cocoindex.utils.get_target_default_name(hackernews_flow, "hn_messages")

    # connection_pool() is a helper defined elsewhere in the project; it is
    # assumed to return a psycopg_pool.ConnectionPool for the target database.
    with connection_pool().connection() as conn:
        with conn.cursor() as cur:
            # Simple text search using PostgreSQL's text search capabilities
            cur.execute(
                f"""
                SELECT id, thread_id, author, content_type, text, created_at,
                       ts_rank(to_tsvector('english', text), plainto_tsquery('english', %s)) as rank
                FROM {table_name}
                WHERE to_tsvector('english', text) @@ plainto_tsquery('english', %s)
                ORDER BY rank DESC, created_at DESC
                """,
                (query, query),
            )

            results = []
            for row in cur.fetchall():
                results.append(
                    {
                        "id": row[0],
                        "thread_id": row[1],
                        "author": row[2],
                        "content_type": row[3],
                        "text": row[4],
                        # created_at may be NULL for some items
                        "created_at": row[5].isoformat() if row[5] else None,
                    }
                )

    return cocoindex.QueryOutput(results=results)
```

This code defines a query handler that searches HackerNews threads and comments indexed in CocoIndex. It determines the database table storing the messages, then uses PostgreSQL full-text search (to_tsvector and plainto_tsquery) to find rows matching the query. Results are sorted by relevance (ts_rank) and creation time, formatted into dictionaries, and returned as a structured cocoindex.QueryOutput. In short, it performs a full-text search over the indexed content and delivers ranked, structured results.

Running Your HackerNews Custom Source

Once your custom source and flow are ready, running it with CocoIndex is straightforward. You can either update the index on demand or keep it continuously in sync with HackerNews.

1. Install Dependencies

Make sure you have Python installed, then install your project in editable mode:

```sh
pip install -e .
```

This installs CocoIndex along with all required dependencies, letting you develop and update the connector without reinstalling.

2. Update the Target (On-Demand)

To populate your target (e.g., Postgres) with the latest HackerNews threads:

```sh
cocoindex update main
```

• Only threads that have changed will be reprocessed.
• Your target remains in sync with the most recent 500 HackerNews threads.
• Efficient incremental updates save time and compute resources.

You can also run the update in live mode, which keeps the target continuously in sync with the source:

```sh
cocoindex update -L main
```

• Runs the flow in live mode, polling HackerNews periodically (every minute, per the refresh_interval set in the flow).
• CocoIndex automatically applies incremental changes and keeps the target in sync.
• Ideal for dashboards, research, or AI pipelines that need real-time data.

3. Troubleshoot & Inspect with CocoInsight

CocoInsight lets you visualize and debug your flow, see the lineage of your data, and understand what's happening under the hood.

Start the server:

```sh
cocoindex server -ci main
```

Then open the UI in your browser: https://cocoindex.io/cocoinsight

CocoInsight has zero pipeline data retention, so it's safe for debugging and inspecting your flows locally.

Note that running queries from CocoInsight requires the query handler set up in the previous step.

What You Can Build Next

This simple example opens the door to a lot more:

• Build a trending-topic detector
• Run LLM summarization pipelines on top of indexed threads
• Add embeddings + vector search
• Mirror HN into your internal data warehouse
• Build a real-time HN dashboard
• Extend to other news sources (Reddit, Lobsters, etc.)

Because the whole pipeline is declarative and incremental, extending it is straightforward.
Since Custom Sources allow you to wrap any Python logic into an incremental data stream, the best use cases are usually "hard-to-reach" data: systems that don't have standard database connectors, have complex nesting, or require heavy pre-processing.

The Knowledge Aggregator for LLM Context

Building a context engine for an AI bot often requires pulling from non-standard documentation sources.

The "Composite" Entity (Data Stitching)

Many companies have user data fragmented across multiple microservices. You can build a Custom Source that acts as a "virtual join" before the data ever reaches your index. For example, the Source:

1. Fetches a user ID from an auth service (Okta / Auth0).
2. Uses that ID to fetch billing status from the Stripe API.
3. Uses that ID to fetch usage logs from an internal Redis.

Instead of managing complex ETL joins downstream, the Custom Source yields a single User360 object. CocoIndex tracks the state of this composite object; if the user upgrades in Stripe or changes their email in Auth0, the index updates automatically.

The "Legacy Wrapper" (Modernization Layer)

Companies often have valuable data locked in systems that are painful to query (SOAP, XML, mainframes). A Custom Source gives you a modern, queryable SQL interface (in the CocoIndex target) on top of a 20-year-old system.

Competitive Intelligence

Tracking changes on public websites or APIs that don't offer webhooks. The Source:

• Competitor pricing: scraping e-commerce product pages.
• Regulatory feeds: polling a government RSS feed or FDA drug approval database.
• Crypto/stocks: hitting a CoinGecko or Yahoo Finance API.

The CocoIndex value: using its diff capabilities, you can trigger downstream alerts only when a price changes by more than 5% or a new regulation is posted, rather than spamming your database with identical polling results.

Why This Matters

Custom Sources extend this model to any API: internal, external, legacy, or real-time.
This unlocks a simple but powerful pattern:

If you can fetch it, CocoIndex can index it, diff it, and sync it.

Whether you're indexing HackerNews or orchestrating dozens of enterprise services, the framework gives you a stable backbone with:

• persistent state
• deterministic updates
• automatic lineage
• flexible target exports
• minimal infrastructure overhead

⭐ Try It, Fork It, Star It

If you find this useful, a star on GitHub means a lot: it helps others discover CocoIndex and supports further development.