In distributed system architectures, the synchronization gap between external HTTP APIs and target relational databases is a persistent engineering challenge – especially when API responses lack natural transactional semantics, or expose non-normalized, deeply nested JSON payloads (comment threads, organizational hierarchies, multi-level BOM structures). CocoIndex's Custom Source abstraction provides a declarative framework for turning arbitrary HTTP/gRPC endpoints into stateful, incrementally synchronized data, with built-in support for:

• Stateless API → stateful stream transformation: turning REST GET requests into append-log semantics
• Recursive tree traversal with cycle detection: handling comment DAGs (directed acyclic graphs) of unbounded depth without blowing the stack
• Schema evolution support: Python dataclass-based typing with forward compatibility via structural subtyping
• Postgres sink connector integration: psycopg3 async connection pooling + the COPY protocol for bulk inserts
• GIN-indexed full-text search configuration: materializing tsvector columns with English dictionary stemming + ranking functions

In this example, we build a custom connector for HackerNews. It fetches recent stories + nested comments, indexes them, and exposes a simple search interface powered by Postgres full-text search.

Why use a custom source?

In many scenarios, pipelines don't just read from clean tables. They depend on:

• Internal REST services
• Partner APIs
• Legacy systems
• Non-standard data models that don't fit traditional connectors

CocoIndex's Custom Source API makes these integrations declarative, incremental, and safe by default. Instead of writing ad-hoc scripts, you wrap your API as a "source component," and CocoIndex takes it from there.

Project Walkthrough: Building a HackerNews Index

Goals

• Call the HackerNews Search API
• Fetch nested comments
• Update only modified threads
• Store content in Postgres
• Expose a text search interface

CocoIndex automatically manages change detection, idempotency, lineage, and state synchronization.

Overview

The pipeline consists of three major parts:

1. Define a custom source (HackerNewsConnector)
   • Calls the HackerNews API
   • Emits rows for changed/updated threads
   • Pulls the full thread + comment tree
2. Build an index with a CocoIndex flow
   • Collect thread content
   • Collect all comments recursively
   • Export to a Postgres table (hn_messages)
3. Add a lightweight query handler
   • Uses PostgreSQL full-text search
   • Returns ranked matches for a keyword query

Each cocoindex update run processes only the HN threads that changed and keeps everything in sync.

The project is open source and available on GitHub.

Prerequisites

Install Postgres if you don't have one.

Defining the Data Model

Every custom source defines two lightweight data types:

• Key type → uniquely identifies an item
• Value type → the full content for that item

On HackerNews, every story is a thread, and each thread can have many nested comments.
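For orientation, the Algolia items endpoint the connector calls later returns that nesting directly. The sketch below is heavily trimmed and the values are made up, but the field names are exactly the ones the parsing code in this post reads:

```python
# Rough shape of https://hn.algolia.com/api/v1/items/{id} (trimmed; values illustrative).
# Comments nest arbitrarily deep under "children".
example_item = {
    "id": 38500000,
    "author": "alice",
    "title": "Show HN: My side project",
    "text": None,
    "url": "https://example.com",
    "created_at": "2024-05-01T12:34:56+00:00",
    "children": [
        {
            "id": 38500010,
            "author": "bob",
            "text": "Nice work!",
            "created_at": "2024-05-01T13:00:00+00:00",
            "children": [],
        }
    ],
}
```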
For HackerNews, let's define keys like this:

```python
class _HackerNewsThreadKey(NamedTuple):
    """Row key type for HackerNews source."""

    thread_id: str
```

Keys must be:

• hashable
• serializable
• stable (they don't change over time)

Values hold the actual dataset:

```python
@dataclasses.dataclass
class _HackerNewsComment:
    id: str
    author: str | None
    text: str | None
    created_at: datetime | None


@dataclasses.dataclass
class _HackerNewsThread:
    """Value type for HackerNews source."""

    author: str | None
    text: str
    url: str | None
    created_at: datetime | None
    comments: list[_HackerNewsComment]
```

This tells CocoIndex exactly what each fully fetched HackerNews "item" looks like: _HackerNewsThread holds a post and all its comments, while _HackerNewsComment represents an individual comment.

Building the Custom Source Connector

A custom source has two parts:

• SourceSpec – declarative configuration
• SourceConnector – operational logic for reading data

Writing the SourceSpec

A SourceSpec in CocoIndex is a declarative configuration that tells the system what data to fetch and how to connect to a source. It doesn't fetch data itself – that's handled by the source connector.

```python
class HackerNewsSource(SourceSpec):
    """Source spec for HackerNews API."""

    tag: str | None = None
    max_results: int = 100
```

Fields:

• tag: optional filter for the type of HackerNews content, e.g. "story", "job", "poll". If None, it fetches all types.
• max_results: maximum number of threads to fetch from HackerNews at a time. Helps limit the size of the index for performance or testing.

Defining the connector

This sets up the connector's configuration and HTTP session so it can fetch HackerNews data efficiently.

```python
@source_connector(
    spec_cls=HackerNewsSource,
    key_type=_HackerNewsThreadKey,
    value_type=_HackerNewsThread,
)
class HackerNewsConnector:
    """Custom source connector for HackerNews API."""

    _spec: HackerNewsSource
    _session: aiohttp.ClientSession

    def __init__(self, spec: HackerNewsSource, session: aiohttp.ClientSession):
        self._spec = spec
        self._session = session

    @staticmethod
    async def create(spec: HackerNewsSource) -> "HackerNewsConnector":
        """Create a HackerNews connector from the spec."""
        return HackerNewsConnector(spec, aiohttp.ClientSession())
```

The source_connector decorator tells CocoIndex that this class is a custom source connector. It specifies:

• spec_cls: the configuration class (HackerNewsSource)
• key_type: how individual items are identified (_HackerNewsThreadKey)
• value_type: the structure of the data returned (_HackerNewsThread)

CocoIndex calls create() to initialize the connector; it sets up a fresh aiohttp.ClientSession for making HTTP requests.

Listing available threads

The list() method in HackerNewsConnector is responsible for discovering the available HackerNews threads that match the given criteria (tag, max results) and returning metadata about them, so CocoIndex knows which threads exist and which may have changed.
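For reference, the search_by_date endpoint that list() calls returns a document whose hits array carries the only two fields the method reads: objectID (used as the thread key) and updated_at (used as the ordinal). A trimmed sketch with made-up values:

```python
# Rough shape of https://hn.algolia.com/api/v1/search_by_date?tags=story (trimmed).
# list() reads only "objectID" and "updated_at" from each hit.
example_search_response = {
    "hits": [
        {"objectID": "38500000", "updated_at": "2024-05-01T12:34:56+00:00"},
        {"objectID": "38499876", "updated_at": "2024-05-01T12:30:11+00:00"},
    ],
}
```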
```python
async def list(
    self,
) -> AsyncIterator[PartialSourceRow[_HackerNewsThreadKey, _HackerNewsThread]]:
    """List HackerNews threads using the search API."""
    # Use HackerNews search API
    search_url = "https://hn.algolia.com/api/v1/search_by_date"
    params: dict[str, Any] = {"hitsPerPage": self._spec.max_results}
    if self._spec.tag:
        params["tags"] = self._spec.tag

    async with self._session.get(search_url, params=params) as response:
        response.raise_for_status()
        data = await response.json()
        for hit in data.get("hits", []):
            if thread_id := hit.get("objectID", None):
                utime = hit.get("updated_at")
                ordinal = (
                    int(datetime.fromisoformat(utime).timestamp())
                    if utime
                    else NO_ORDINAL
                )
                yield PartialSourceRow(
                    key=_HackerNewsThreadKey(thread_id=thread_id),
                    data=PartialSourceRowData(ordinal=ordinal),
                )
```

list() fetches metadata for all recent HackerNews threads. For each thread it yields a PartialSourceRow with:

• key: the thread ID
• ordinal: the last-updated timestamp

Purpose: this lets CocoIndex track which threads exist and which have changed, without fetching full thread content.

Fetching Full Thread Content

This async method fetches a single HackerNews thread (including its comments) from the API and wraps the result in a PartialSourceRowData object – the structure CocoIndex uses for row-level ingestion.

```python
async def get_value(
    self, key: _HackerNewsThreadKey
) -> PartialSourceRowData[_HackerNewsThread]:
    """Get a specific HackerNews thread by ID using the items API."""
    # Use HackerNews items API to get full thread with comments
    item_url = f"https://hn.algolia.com/api/v1/items/{key.thread_id}"
    async with self._session.get(item_url) as response:
        response.raise_for_status()
        data = await response.json()
        if not data:
            return PartialSourceRowData(
                value=NON_EXISTENCE,
                ordinal=NO_ORDINAL,
                content_version_fp=None,
            )
        return PartialSourceRowData(
            value=HackerNewsConnector._parse_hackernews_thread(data)
        )
```

get_value() fetches the full content of a specific thread, including comments, parses the raw JSON into structured Python objects (_HackerNewsThread + _HackerNewsComment), and returns a PartialSourceRowData containing the whole thread.

Ordinal Support

This tells CocoIndex that the source provides update timestamps (ordinals):

```python
def provides_ordinal(self) -> bool:
    return True
```

CocoIndex uses the ordinals to incrementally update only the threads that changed, which improves efficiency.
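To make the ordinal concrete: the value emitted by list() is simply the thread's updated_at converted to a Unix timestamp, so a later edit always produces a strictly larger ordinal and unchanged threads can be skipped. A tiny illustration (the timestamp string is made up):

```python
from datetime import datetime

# The same conversion list() performs: ISO-8601 "updated_at" string -> integer ordinal.
updated_at = "2024-05-01T12:34:56+00:00"  # illustrative value
ordinal = int(datetime.fromisoformat(updated_at).timestamp())
print(ordinal)  # 1714566896; any later update yields a larger number
```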
Parsing JSON into Structured Data

This static method takes the raw JSON response from the API and turns it into a normalized _HackerNewsThread object containing:

• the post (title, text, metadata)
• all comments, flattened into a single list
• proper Python datetime objects

It performs a recursive traversal of the comment tree.

```python
@staticmethod
def _parse_hackernews_thread(data: dict[str, Any]) -> _HackerNewsThread:
    comments: list[_HackerNewsComment] = []

    def _add_comments(parent: dict[str, Any]) -> None:
        children = parent.get("children", None)
        if not children:
            return
        for child in children:
            ctime = child.get("created_at")
            if comment_id := child.get("id", None):
                comments.append(
                    _HackerNewsComment(
                        id=str(comment_id),
                        author=child.get("author", ""),
                        text=child.get("text", ""),
                        created_at=datetime.fromisoformat(ctime) if ctime else None,
                    )
                )
            _add_comments(child)

    _add_comments(data)

    ctime = data.get("created_at")
    text = data.get("title", "")
    if more_text := data.get("text", None):
        text += "\n\n" + more_text

    return _HackerNewsThread(
        author=data.get("author"),
        text=text,
        url=data.get("url"),
        created_at=datetime.fromisoformat(ctime) if ctime else None,
        comments=comments,
    )
```

• Converts the raw HackerNews API response into _HackerNewsThread and _HackerNewsComment objects.
• _add_comments() recursively parses nested comments.
• Combines title + text into the main thread content.
• Produces a fully structured object, ready for indexing.

Putting It All Together in a Flow

Your flow now reads much like a React component: a fully declarative description of what to fetch, derive, and export.

Define the flow and connect the source:

```python
@cocoindex.flow_def(name="HackerNewsIndex")
def hackernews_flow(
    flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope
) -> None:
    # Add the custom source to the flow
    data_scope["threads"] = flow_builder.add_source(
        HackerNewsSource(tag="story", max_results=500),
        refresh_interval=timedelta(minutes=1),
    )

    # Create collectors for different types of searchable content
    message_index = data_scope.add_collector()
```

Process each thread and collect structured information:

```python
    with data_scope["threads"].row() as thread:
        # Index the main thread content
        message_index.collect(
            id=thread["thread_id"],
            thread_id=thread["thread_id"],
            content_type="thread",
            author=thread["author"],
            text=thread["text"],
            url=thread["url"],
            created_at=thread["created_at"],
        )
```

Process each comment of a thread and collect structured information:

```python
        with thread["comments"].row() as comment:
            message_index.collect(
                id=comment["id"],
                thread_id=thread["thread_id"],
                content_type="comment",
                author=comment["author"],
                text=comment["text"],
                created_at=comment["created_at"],
            )
```

Export to the Postgres table:

```python
    message_index.export(
        "hn_messages",
        cocoindex.targets.Postgres(),
        primary_key_fields=["id"],
    )
```

CocoIndex now:

• fetches from the HackerNews API
• tracks changes incrementally
• collects comments
• exports to Postgres
• supports live mode

Your app can now query it as a real-time search index.
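Once an update run has populated the target, a quick sanity check is to count how many thread rows vs. comment rows landed in the table. This is only a sketch: it assumes the flow module's imports and the same connection_pool() helper used by the query handler below (part of the example project, not shown in this post):

```python
# Sketch: count indexed rows per content_type ("thread" vs. "comment").
# get_target_default_name() resolves the Postgres table that the "hn_messages"
# export above was materialized into.
table_name = cocoindex.utils.get_target_default_name(hackernews_flow, "hn_messages")

with connection_pool().connection() as conn:
    with conn.cursor() as cur:
        cur.execute(
            f"SELECT content_type, COUNT(*) FROM {table_name} GROUP BY content_type"
        )
        for content_type, count in cur.fetchall():
            print(f"{content_type}: {count}")
```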
Querying and Searching the HackerNews Index

At this point you are done with the index flow. As the next step, you can define query handlers so you can run queries in CocoInsight. You can use any library or framework of your choice to perform queries; you can read more in the documentation about Query Handlers.

```python
@hackernews_flow.query_handler()
def search_text(query: str) -> cocoindex.QueryOutput:
    """Search HackerNews threads by title and content."""
    table_name = cocoindex.utils.get_target_default_name(hackernews_flow, "hn_messages")
    with connection_pool().connection() as conn:
        with conn.cursor() as cur:
            # Simple text search using PostgreSQL's text search capabilities
            cur.execute(
                f"""
                SELECT id, thread_id, author, content_type, text, created_at,
                       ts_rank(to_tsvector('english', text),
                               plainto_tsquery('english', %s)) as rank
                FROM {table_name}
                WHERE to_tsvector('english', text) @@ plainto_tsquery('english', %s)
                ORDER BY rank DESC, created_at DESC
                """,
                (query, query),
            )
            results = []
            for row in cur.fetchall():
                results.append(
                    {
                        "id": row[0],
                        "thread_id": row[1],
                        "author": row[2],
                        "content_type": row[3],
                        "text": row[4],
                        "created_at": row[5].isoformat(),
                    }
                )
            return cocoindex.QueryOutput(results=results)
```

This code defines a query handler that searches the HackerNews threads and comments indexed by CocoIndex. It determines the database table storing the messages, then uses PostgreSQL full-text search (to_tsvector and plainto_tsquery) to find rows matching the query. Results are ranked by relevance (ts_rank) and creation time, formatted into dictionaries, and returned as a structured cocoindex.QueryOutput. Essentially, it performs a full-text search over the indexed content and delivers ranked, structured results.

Running Your HackerNews Custom Source

Once your custom source and flow are ready, running them with CocoIndex is straightforward: update the index on-demand, or keep it continuously in sync with HackerNews.

1. Install Dependencies

Make sure you have Python installed, then install your project in editable mode:

```
pip install -e .
```

This installs CocoIndex along with all required dependencies, letting you develop and update the connector without reinstalling.

2. Update the Target (On-Demand)

To populate your target (e.g., Postgres) with the latest HackerNews threads:

```
cocoindex update main
```

Each time you run the update command, only threads that have changed are re-processed, and the target stays in sync with the most recent 500 HackerNews threads. Efficient incremental updates save time and compute.

You can also run the update command in live mode, which keeps the target in sync with the source continuously:

```
cocoindex update -L main
```

This runs the flow in live mode, polling HackerNews periodically. CocoIndex automatically handles incremental changes and keeps the target synchronized – ideal for dashboards, search, or AI pipelines that need real-time data.

3. Troubleshoot & Inspect with CocoInsight

CocoInsight lets you visualize and debug your flow, see the lineage of your data, and understand what's happening under the hood.

Start the server:

```
cocoindex server -ci main
```

Then open the UI in your browser: https://cocoindex.io/cocoinsight

CocoInsight has zero pipeline data retention – it's safe for debugging and inspecting your flows locally. Note that running queries in CocoInsight requires the query handler set up in the previous step.
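One optional optimization before scaling this up: the query handler above recomputes to_tsvector('english', text) for every row on every request. A GIN expression index lets Postgres serve the @@ match from an index instead. This is a sketch rather than part of the example project; it reuses the project's connection_pool() helper, and the index name is made up:

```python
# Sketch: add a GIN expression index so full-text matches don't rescan the table.
# The indexed expression must match the one in the query handler exactly:
# to_tsvector('english', text).
table_name = cocoindex.utils.get_target_default_name(hackernews_flow, "hn_messages")

with connection_pool().connection() as conn:
    with conn.cursor() as cur:
        cur.execute(
            f"""
            CREATE INDEX IF NOT EXISTS hn_messages_text_fts
            ON {table_name}
            USING GIN (to_tsvector('english', text))
            """
        )
    conn.commit()  # explicit commit; the pool's connection() context also commits on clean exit
```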
What You Can Build Next

This simple example opens the door to a lot more:

• Build a trending-topic detector
• Run LLM summarization pipelines on top of indexed threads
• Add embeddings + vector search
• Mirror HN into your internal data warehouse
• Build a real-time HN dashboard
• Extend to other news sources (Reddit, Lobsters, etc.)

Because the whole pipeline is declarative and incremental, extending it is straightforward.

Because custom sources let you wrap any Python logic in an incremental dataflow, the best use cases are usually "hard-to-reach" data: systems that don't have standard database connectors, have complex nesting, or require heavy pre-processing.

Knowledge Aggregator for LLM Context

Building a context engine for an AI bot often requires pulling from non-standard documentation sources.

The "Composite" Entity (Data Stitching)

Most companies have user data fragmented across multiple microservices. You can build a custom source that acts as a "virtual join" before the data ever hits your index. For example, the source:

• Fetches a user ID from an auth service (Okta/Auth0).
• Uses that ID to fetch billing status from the Stripe API.
• Uses that ID to fetch usage logs from an internal Redis.

Instead of managing complex downstream ETL joins, the custom source yields a single User360 object. CocoIndex tracks the state of this composite object; if the user upgrades in Stripe or changes their email in Auth0, the index updates automatically.

The "Legacy Wrapper" (Modernization Layer)

Enterprises often have valuable data locked in systems that are painful to query (SOAP, XML, mainframes). With a custom source you get a modern, queryable SQL interface (via the CocoIndex target) on top of a 20-year-old system, without rewriting the legacy system itself.

Public Data Monitor (Competitive Intelligence)

Track changes on public websites or APIs that don't offer webhooks. The source could be:

• Competitor pricing: scraping e-commerce product pages.
• Regulatory feeds: polling a government RSS feed or an FDA drug-approval database.
• Crypto/stocks: hitting a CoinGecko or Yahoo Finance API.

The CocoIndex value: using its diff capabilities, you can trigger downstream alerts only when a price changes by more than 5% or a new regulation is posted, rather than spamming your database with identical polling results.

Why This Matters

Custom sources extend this model to any API – internal, external, legacy, or real-time. This unlocks a simple but powerful pattern:

If you can fetch it, CocoIndex can index it, diff it, and sync it.

Whether you're indexing HackerNews or orchestrating dozens of enterprise services, the framework gives you a stable backbone with:

• persistent state
• deterministic updates
• automatic lineage
• flexible target exports
• minimal infrastructure overhead

Try It, Fork It, Star It

If you found this useful, a star on GitHub means a lot – it helps others discover CocoIndex and supports further development.