W architekturze układów dystrybucyjnych, przepaść synchronizacji między zewnętrznymi API HTTP i relacyjnymi celami bazy danych stanowi trwałe wyzwanie inżynieryjne – zwłaszcza gdy odpowiedzi API nie posiadają native transaction semantics lub eksponują nienormalizowane, głęboko nieszczelone obciążenia JSON (drzwi komentarzy, hierarchie organizacyjne, struktury BOM na wielu poziomach). Abstrakcja zasobów niestandardowych CocoIndex zapewnia deklaratywne ramy konwersji arbitralnych punktów końcowych HTTP/gRPC w stanowe, stopniowo synchronizowane strumienie danych z wbudowanym czekiem, odciskiem palca treści (duplikacja oparta na SHA256) i rozpowszechnianiem • Stateless API→Stateful Stream Transformation: Konwertowanie żądań REST GET na semantykę logowania załączników • Przechodzenie przez drzewo z wykryciem cyklu: obsługa nieograniczonej głębokości komentarza DAG (Directed Acyclic Graphs) bez przepływu stacków • Wsparcie dla ewolucji schematu: typowanie oparte na klasie danych Python z kompatybilnością z przodu za pośrednictwem subtypowania strukturalnego • Integracja złącza poślizgowego Postgres: wykorzystanie łącza asynchronicznego psycopg3 + protokołu COPY do wkładek masowych GIN-indeksowana konfiguracja wyszukiwania pełnego tekstu: materializacja kolumn tsvector z angielskim słownictwem stemming + funkcje rankingowe W tym przykładzie zbudujemy niestandardowy złącze dla HackerNews. Pobiera najnowsze historie + niestandardowe komentarze, indeksuje je i ujawnia prosty interfejs wyszukiwania obsługiwany przez wyszukiwanie pełnoprawne Postgres. Dlaczego warto korzystać z niestandardowego źródła? W wielu scenariuszach rurociągi nie tylko czytają z czystych tabel. Usługi wewnętrzne REST Partnerzy APIs systemy dziedziczenia Niestandardowe modele danych, które nie pasują do tradycyjnych złączy CocoIndex Custom Source API umożliwia te integracje Zamiast pisać skrypty ad-hoc, pakujesz swój API jako „komponent źródłowy”, a CocoIndex bierze go stamtąd. Deklaracja Projekt Walkthrough – budowanie indeksu HackerNews celów Nazwa HackerNews Search API Fetch komentarze Aktualizacja tylko zmodyfikowanych treści Wielka zawartość w Postgres Wyświetlanie interfejsu wyszukiwania tekstu CocoIndex automatycznie obsługuje wykrywanie zmian, idempotencję, liniowanie i synchronizację stanu. Przegląd Rurociąg składa się z trzech głównych części: Define a custom source ( ) HackerNewsConnector Calls HackerNews API Emits rows for changed/updated threads Pulls full thread + comment tree Build an index with CocoIndex Flow Collect thread content Collect all comments recursively Export to a Postgres table ( ) hn_messages Add a lightweight query handler Uses PostgreSQL full-text search Returns ranked matches for a keyword query Każdy Tylko procesy zmieniły pasma HN i zachowują wszystko w synchronizacji. cocoindex update Projekt jest otwarty i dostępny na . GitHub Wymagania if you don't have one. Install Postgres Defining the Data Model Każde niestandardowe źródło danych definiuje dwa typy danych lekkich: Typ klucza → unikalnie identyfikuje element Typ wartości → pełna zawartość tego elementu In Hacker News, each news is a thread, and each thread can have multiple comments. For HackerNews, let’s define keys like this: class _HackerNewsThreadKey(NamedTuple): """Row key type for HackerNews source.""" thread_id: str Klucze muszą być: hashable serializacja stable (doesn’t change over time) Wartości zawierają rzeczywisty zestaw danych: @dataclasses.dataclass class _HackerNewsComment: id: str author: str | None text: str | None created_at: datetime | None @dataclasses.dataclass class _HackerNewsThread: """Value type for HackerNews source.""" author: str | None text: str url: str | None created_at: datetime | None comments: list[_HackerNewsComment] To mówi CocoIndex dokładnie, jak każdy HackerNews „element” wygląda, gdy jest w pełni pobrany. pisze list i wszystkie jego komentarze, podczas gdy Przedstawiamy indywidualne komentarze. _HackerNewsThread _HackerNewsComment Tworzenie niestandardowego połączenia źródłowego Custom Source ma dwie części: SourceSpec — konfiguracja deklaratywna SourceConnector – logika operacyjna do odczytu danych Tworzenie SourceSpec a w CocoIndex jest konfiguracją deklaratywną, która mówi systemowi i Nie pobiera danych samodzielnie – jest to obsługiwane przez złącze źródłowe. SourceSpec what data to fetch how to connect class HackerNewsSource(SourceSpec): """Source spec for HackerNews API.""" tag: str | None = None max_results: int = 100 Na polach : tag Optional filter for the type of HackerNews content. Example: , , . "story" "job" "poll" If , it fetches all types. None max_results Maximum number of threads to fetch from HackerNews at a time. Helps limit the size of the index for performance or testing. Definicja Connector Ustaw konfigurację złącza i sesję HTTP, aby można było efektywnie pobierać dane HackerNews. @source_connector( spec_cls=HackerNewsSource, key_type=_HackerNewsThreadKey, value_type=_HackerNewsThread, ) class HackerNewsConnector: """Custom source connector for HackerNews API.""" _spec: HackerNewsSource _session: aiohttp.ClientSession def __init__(self, spec: HackerNewsSource, session: aiohttp.ClientSession): self._spec = spec self._session = session @staticmethod async def create(spec: HackerNewsSource) -> "HackerNewsConnector": """Create a HackerNews connector from the spec.""" return HackerNewsConnector(spec, aiohttp.ClientSession()) tells CocoIndex that this class is a . It specifies: source_connector custom source connector : the configuration class ( ) spec_cls HackerNewsSource : how individual items are identified ( ) key_type _HackerNewsThreadKey : the structure of the data returned ( ) value_type _HackerNewsThread create() jest wywoływany przez CocoIndex, aby zainicjować złącze, i ustawia świeży aiohttp.ClientSession do tworzenia żądań HTTP. Wyświetlanie dostępnych threadów o Metoda w Jest odpowiedzialny za które odpowiadają danym kryteriom (tag, maksymalne wyniki) i zwracają metadane na ich temat. które mogły się zmienić. list() HackerNewsConnector discovering all available HackerNews threads know which threads exist async def list( self, ) -> AsyncIterator[PartialSourceRow[_HackerNewsThreadKey, _HackerNewsThread]]: """List HackerNews threads using the search API.""" # Use HackerNews search API search_url = "https://hn.algolia.com/api/v1/search_by_date" params: dict[str, Any] = {"hitsPerPage": self._spec.max_results} if self._spec.tag: params["tags"] = self._spec.tag async with self._session.get(search_url, params=params) as response: response.raise_for_status() data = await response.json() for hit in data.get("hits", []): if thread_id := hit.get("objectID", None): utime = hit.get("updated_at") ordinal = ( int(datetime.fromisoformat(utime).timestamp()) if utime else NO_ORDINAL ) yield PartialSourceRow( key=_HackerNewsThreadKey(thread_id=thread_id), data=PartialSourceRowData(ordinal=ordinal), ) fetyszy . list() metadata for all recent HackerNews threads For each thread: It generates a with: PartialSourceRow : the thread ID key : the last updated timestamp ordinal Cel: pozwala CocoIndex na śledzenie, jakie wątki istnieją i które się zmieniły bez pobierania pełnej zawartości wątku. Wyświetlanie pełnej treści Ta metoda asynchroniczna pobiera a (z uwzględnieniem jego komentarzy) z , and wraps the result in a object — the structure CocoIndex uses for row-level ingestion. single HackerNews thread API PartialSourceRowData async def get_value( self, key: _HackerNewsThreadKey ) -> PartialSourceRowData[_HackerNewsThread]: """Get a specific HackerNews thread by ID using the items API.""" # Use HackerNews items API to get full thread with comments item_url = f"https://hn.algolia.com/api/v1/items/{key.thread_id}" async with self._session.get(item_url) as response: response.raise_for_status() data = await response.json() if not data: return PartialSourceRowData( value=NON_EXISTENCE, ordinal=NO_ORDINAL, content_version_fp=None, ) return PartialSourceRowData( value=HackerNewsConnector._parse_hackernews_thread(data) ) get_value() pobiera pełną zawartość określonego thread, w tym komentarze. Parsuje surowy JSON w ustrukturyzowane obiekty Python (_HackerNewsThread + _HackerNewsComment). Zwraca element PartialSourceRowData zawierający pełny wątek. Zwykłe wsparcie CocoIndex informuje, że to źródło dostarcza znaczniki czasowe (zwykłe). def provides_ordinal(self) -> bool: return True CocoIndex używa zwykłych, aby stopniowo aktualizować tylko zmienione pasma, poprawiając wydajność. Przekształcanie JSON w dane strukturalne Ta metoda statyczna pobiera surową odpowiedź JSON z Przekształca się w normalizację Obiekt zawierający: API _HackerNewsThread Tytuł artykułu (tytuł, tekst i metadane) Wszystkie komentarze wbudowane, spłaszczone na jedną listę Proper Python datetime objects Wykonuje się a z drzewa komentarza. recursive traversal @staticmethod def _parse_hackernews_thread(data: dict[str, Any]) -> _HackerNewsThread: comments: list[_HackerNewsComment] = [] def _add_comments(parent: dict[str, Any]) -> None: children = parent.get("children", None) if not children: return for child in children: ctime = child.get("created_at") if comment_id := child.get("id", None): comments.append( _HackerNewsComment( id=str(comment_id), author=child.get("author", ""), text=child.get("text", ""), created_at=datetime.fromisoformat(ctime) if ctime else None, ) ) _add_comments(child) _add_comments(data) ctime = data.get("created_at") text = data.get("title", "") if more_text := data.get("text", None): text += "\n\n" + more_text return _HackerNewsThread( author=data.get("author"), text=text, url=data.get("url"), created_at=datetime.fromisoformat(ctime) if ctime else None, comments=comments, ) Converts raw HackerNews API response into and . _HackerNewsThread _HackerNewsComment recursively parses nested comments. _add_comments() Łączy tytuł + tekst w główną zawartość. Produkuje w pełni ustrukturyzowany obiekt gotowy do indeksowania. Putting It All Together in a Flow Twój przepływ odczytuje się teraz dokładnie jak komponent React. Definiuj przepływ i podłącz źródło @cocoindex.flow_def(name="HackerNewsIndex") def hackernews_flow( flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope ) -> None: # Add the custom source to the flow data_scope["threads"] = flow_builder.add_source( HackerNewsSource(tag="story", max_results=500), refresh_interval=timedelta(minutes=1), ) # Create collectors for different types of searchable content message_index = data_scope.add_collector() Przetwarzaj każdy thread i zbieraj zorganizowane informacje with data_scope["threads"].row() as thread: # Index the main thread content message_index.collect( id=thread["thread_id"], thread_id=thread["thread_id"], content_type="thread", author=thread["author"], text=thread["text"], url=thread["url"], created_at=thread["created_at"], ) Przetwarzanie każdego komentarza w wątku i zbieranie zorganizowanych informacji with thread["comments"].row() as comment: message_index.collect( id=comment["id"], thread_id=thread["thread_id"], content_type="comment", author=comment["author"], text=comment["text"], created_at=comment["created_at"], ) Eksport do tabel bazy danych message_index.export( "hn_messages", cocoindex.targets.Postgres(), primary_key_fields=["id"], ) CocoIndex już teraz: Informacje o HackerNews API Ścieżki zmieniają się stopniowo flattens nested comments Eksport do Postgres supports live mode Twoja aplikacja może teraz wyszukiwać go jako indeks wyszukiwania w czasie rzeczywistym. Wyszukiwanie i wyszukiwanie HackerNews Index At this point you are done with the index flow. As the next step, you could define query handlers — so you can run queries in CocoInsight. You can use any library or framework of your choice to perform queries. You can read more in the documentation about . Query Handler @hackernews_flow.query_handler() def search_text(query: str) -> cocoindex.QueryOutput: """Search HackerNews threads by title and content.""" table_name = cocoindex.utils.get_target_default_name(hackernews_flow, "hn_messages") with connection_pool().connection() as conn: with conn.cursor() as cur: # Simple text search using PostgreSQL's text search capabilities cur.execute( f""" SELECT id, thread_id, author, content_type, text, created_at, ts_rank(to_tsvector('english', text), plainto_tsquery('english', %s)) as rank FROM {table_name} WHERE to_tsvector('english', text) @@ plainto_tsquery('english', %s) ORDER BY rank DESC, created_at DESC """, (query, query), ) results = [] for row in cur.fetchall(): results.append( { "id": row[0], "thread_id": row[1], "author": row[2], "content_type": row[3], "text": row[4], "created_at": row[5].isoformat(), } ) return cocoindex.QueryOutput(results=results) Ten kod definiuje przetwarzacz zapytań, który wyszukuje tagi HackerNews i komentarze indeksowane w CocoIndex. Określa tabelę bazy danych przechowującą wiadomości, a następnie używa wyszukiwania PostgreSQL w pełnym tekście ( i ) to find rows matching the query. to_tsvector plainto_tsquery Results are ranked by relevance ( ) i czas tworzenia, sformatowane w słownikach i zwrócone jako ustrukturyzowane Zasadniczo wykonuje wyszukiwanie w pełnym tekście nad indeksowaną zawartością i dostarcza sklasyfikowanych, zorganizowanych wyników. ts_rank cocoindex.QueryOutput Uruchamianie Twojego HackerNews Custom Source Once your custom source and flow are ready, running it with CocoIndex is straightforward. You can either or Na podstawie HackerNews. update the index on-demand keep it continuously in sync 1 Instalacja uzależnień Upewnij się, że masz zainstalowany Python, a następnie zainstaluj swój projekt w trybie edycyjnym: pip install -e . This installs CocoIndex along with all required dependencies, letting you develop and update the connector without reinstalling. 2. Update the Target (On-Demand) To populate your target (e.g., Postgres) with the latest HackerNews threads: cocoindex update main Tylko te, które zostały zmienione, zostaną ponownie przetworzone. Your target remains in sync with the . most recent 500 HackerNews threads Efficient incremental updates save time and compute resources. Note that each time when you run the update command, CocoIndex will only re-process threads that have changed, and keep the target in sync with the recent 500 threads from HackerNews. You can also run update command in live mode, which will keep the target in sync with the source continuously: cocoindex update -L main Uruchamia strumień w trybie na żywo, okresowo ankietując HackerNews. CocoIndex automatycznie obsługuje stopniowe zmiany i synchronizuje cel. Idealny do paneli kontrolnych, wyszukiwania lub rurociągów AI, które wymagają danych w czasie rzeczywistym. 3. Troubleshoot & Inspect with CocoInsight CocoInsight pozwala , zobacz linię swoich danych i zrozum, co dzieje się pod kapturem. visualize and debug your flow Zacznij od serwera: cocoindex server -ci main Następnie otwórz UI w swojej przeglądarce: https://cocoindex.io/cocoinsight CocoInsight has zero pipeline data retention — it’s safe for debugging and inspecting your flows locally. CocoInsight ma zero przechowywania danych rurociągów - jest bezpieczny do debugowania i inspekcji przepływów lokalnie. Należy zauważyć, że wymaga to instalacji QueryHandler w poprzednim kroku. Co możesz zbudować dalej Ten prosty przykład otwiera drzwi do znacznie więcej: Build a trending-topic detector Run LLM summarization pipelines on top of indexed threads Dodaj wkłady + wyszukiwanie wektorowe Mirror HN do Twojego wewnętrznego magazynu danych Tworzenie pulpitu HN w czasie rzeczywistym Rozszerz się na inne źródła wiadomości (Reddit, Lobsters itp.) Ponieważ cały rurociąg jest deklaratywny i stopniowy, jego rozszerzenie jest proste. Since Custom Sources allow you to wrap Logika Pythona w stopniowym strumieniu danych, najlepsze przypadki użycia są zwykle dane — systemy, które nie mają standardowych złączy baz danych, mają złożone gniazdo lub wymagają ciężkiego przetwarzania. Wszyscy "Hard-to-Reach" The Knowledge Aggregator for LLM Context Budowa silnika kontekstowego dla botu AI często wymaga czerpania z niestandardowych źródeł dokumentacji. „Kompozytowy” podmiot (Data Stitching) Większość firm ma dane użytkowników rozdrobnione w wielu mikroserwisach. Możesz zbudować niestandardowe źródło, które działa jako „wirtualne połączenie”, zanim dane kiedykolwiek trafią do indeksu. For example the Source: Fetches a User ID from an (Okta/Auth0). Auth Service Używa tego identyfikatora do pobierania statusu rozliczeń z API Stripe. Używa tego identyfikatora do pobierania dzienników użytkowania z programu Internal Redis. Zamiast zarządzać złożonymi ETL-ami w dół, niestandardowe źródło generuje pojedynczy CocoIndex śledzi stan tego kompozytowego obiektu; jeśli użytkownik uaktualni w Stripe zmieniają swoją wiadomość e-mail w Auth0, indeks aktualizuje się automatycznie. User360 lub The "Legacy Wrapper" (Modernization Layer) Firmy często mają cenne dane zamknięte w systemach, które są bolesne do zapytania (SOAP, XML, Mainframes). Możesz uzyskać nowoczesny, interfejs SQL do zapytania (za pośrednictwem celu CocoIndex) na szczycie 20-letniego systemu bez ponownego napisania samego systemu. Monitor danych publicznych (Intelligence Competitive) Śledzenie zmian na publicznych stronach internetowych lub interfejsach API, które nie oferują webhooks. The Source: Scraping e-commerce product pages. Competitor Pricing: Polling a government RSS feed or FDA drug approval database. Regulatory Feeds: Hitting a CoinGecko or Yahoo Finance API. Crypto/Stocks: Korzystając z Możesz wywołać powiadomienia w dół tylko wtedy, gdy zmieni się cena o > 5% lub zostanie opublikowana nowa regulacja, zamiast wysyłać spamu do bazy danych z identycznymi wynikami ankiet. The CocoIndex Value: diff Dlaczego to ma znaczenie Custom Sources extend this model to API – wewnętrzne, zewnętrzne, dziedziczne lub w czasie rzeczywistym. Wszyscy To uwalnia prosty, ale potężny wzorzec: Jeśli możesz go odzyskać, CocoIndex może go indeksować, dyfować i synchronizować. Jeśli możesz go odzyskać, CocoIndex może go indeksować, dyfować i synchronizować. Niezależnie od tego, czy indeksujesz HackerNews, czy zorganizujesz dziesiątki usług dla przedsiębiorstw, ramy te dają ci stabilny rdzeń: persistent state Deterministyczne aktualizacje Linia automatyczna flexible target exports minimal infrastructure overhead Spróbuj, Fork It, Star It If you found this useful, a znaczy wiele – pomaga innym odkryć CocoIndex i wspiera dalszy rozwój. star on GitHub GitHub