I den målrettede backend-arkitektur repræsenterer synkroniseringsgapet mellem eksterne HTTP-API'er og relationsbaserede databasemål en vedvarende ingeniørudfordring – især når API-responser mangler indfødte transaktionelle semantik eller udsætter ikke-normaliserede, dybt forankrede JSON-payloads (kommentartråd, organisatoriske hierarkier, multi-level BOM-strukturer). CocoIndex's Custom Source abstraktion giver en deklarativ ramme for at konvertere vilkårlige HTTP/gRPC-endpoint til statlige, incrementelt synkroniserede datastrømme med indbygget checkpointing, indholdsfingerprinting (SHA256-baseret deduplication) og differentialopdatering. I modsætning til • Stateløs API→Stateful Stream-transformation: Konvertering af REST GET-forespørgsler til append-log-semantik • Recursiv træoverskridelse med cyklusdetektion: Håndtering af ubegrænset dybdekommentarer (Directed Acyclic Graphs) uden stack overflow • Skema evolution support: Python dataclass-baseret typing med forward-kompatibilitet via strukturel subtyping • Postgres sink connector integration: Udnyttelse af psycopg3 async forbindelse pooling + COPY protokol for bulk indsætninger • GIN-indekseret fuldtekstsøgningskonfiguration: Materialisering af tsvektorkolonner med engelsk ordbogstemming + placeringsfunktioner I dette eksempel bygger vi en brugerdefineret konnektor til HackerNews. Den henter seneste historier + indlejrede kommentarer, indekserer dem og udsætter en simpel søgeflade, der drives af Postgres fuldtekstsøgning. Hvorfor bruge en Custom Source? I mange scenarier læser rørledninger ikke kun fra rene tabeller. Interne REST tjenester Partner af APIs Legacy systemer Ikke-standardiserede datamodeller, der ikke passer til traditionelle forbindelser CocoIndex Custom Source API gør disse integrationer I stedet for at skrive ad-hoc scripts, pakker du din API som en "kildekomponent", og CocoIndex tager det derfra. erklærede Projekt Walkthrough - Opbygning af et HackerNews Index Målsætninger Læs mere om HackerNews Search API Fetch nested comments Opdater kun ændrede tråde Store indhold i Postgres Eksponer et tekstsøgningsgrænseflade CocoIndex håndterer automatisk ændringsdetektion, idempotens, lineage og statssynkronisering. Overblik Rørledningen består af tre hoveddele: Define a custom source ( ) HackerNewsConnector Calls HackerNews API Emits rows for changed/updated threads Pulls full thread + comment tree Build an index with CocoIndex Flow Collect thread content Collect all comments recursively Export to a Postgres table ( ) hn_messages Add a lightweight query handler Uses PostgreSQL full-text search Returns ranked matches for a keyword query Each Kun processer ændrer HN-tråd og holder alt i synkronisering. cocoindex update Projektet er open source og tilgængeligt på . af Github Forudsætninger if you don't have one. Install Postgres Defining the Data Model Hver brugerdefineret kilde definerer to lette datatyper: Nøgletype → identificerer unikt et element Værdi Type → det fulde indhold for dette element I Hacker News er hver nyhed en tråd, og hver tråd kan have flere kommentarer. For HackerNews, lad os definere nøgler som dette: class _HackerNewsThreadKey(NamedTuple): """Row key type for HackerNews source.""" thread_id: str Nøgler skal være: Hashable Serialiseret Stabil (forandrer sig ikke over tid) Værdier holder det faktiske datasæt: @dataclasses.dataclass class _HackerNewsComment: id: str author: str | None text: str | None created_at: datetime | None @dataclasses.dataclass class _HackerNewsThread: """Value type for HackerNews source.""" author: str | None text: str url: str | None created_at: datetime | None comments: list[_HackerNewsComment] Dette fortæller CocoIndex nøjagtigt, hvordan hver HackerNews "item" ser ud, når det er fuldt ud hentet. Jeg har skrevet et indlæg og alle mine kommentarer, mens jeg Det er individuelle bemærkninger. _HackerNewsThread _HackerNewsComment Brug af en Custom Source Connector En Custom Source har to dele: SourceSpec – deklarativ konfiguration SourceConnector – operationel logik til læsning af data Skrivning af SourceSpec A er i CocoIndex er en deklarativ konfiguration, der fortæller systemet og Det henter ikke data selv - det håndteres af kildeforbindelsen. SourceSpec what data to fetch how to connect class HackerNewsSource(SourceSpec): """Source spec for HackerNews API.""" tag: str | None = None max_results: int = 100 Fælles felter: tag Optional filter for the type of HackerNews content. Example: , , . "story" "job" "poll" If , it fetches all types. None max_results Maximum number of threads to fetch from HackerNews at a time. Helps limit the size of the index for performance or testing. Definere forbindelsen Opsæt konnektorens konfiguration og HTTP-session, så den kan hente HackerNews-data effektivt. @source_connector( spec_cls=HackerNewsSource, key_type=_HackerNewsThreadKey, value_type=_HackerNewsThread, ) class HackerNewsConnector: """Custom source connector for HackerNews API.""" _spec: HackerNewsSource _session: aiohttp.ClientSession def __init__(self, spec: HackerNewsSource, session: aiohttp.ClientSession): self._spec = spec self._session = session @staticmethod async def create(spec: HackerNewsSource) -> "HackerNewsConnector": """Create a HackerNews connector from the spec.""" return HackerNewsConnector(spec, aiohttp.ClientSession()) tells CocoIndex that this class is a . It specifies: source_connector custom source connector : the configuration class ( ) spec_cls HackerNewsSource : how individual items are identified ( ) key_type _HackerNewsThreadKey : the structure of the data returned ( ) value_type _HackerNewsThread create() kaldes af CocoIndex for at initialisere forbindelsen, og den opretter en frisk aiohttp.ClientSession til at lave HTTP-forespørgsler. Liste over tilgængelige tråde Den Metoden i er ansvarlig for that match the given criteria (tag, max results) and returning metadata about them. CocoIndex uses this to Og som måske har ændret sig. list() HackerNewsConnector discovering all available HackerNews threads know which threads exist async def list( self, ) -> AsyncIterator[PartialSourceRow[_HackerNewsThreadKey, _HackerNewsThread]]: """List HackerNews threads using the search API.""" # Use HackerNews search API search_url = "https://hn.algolia.com/api/v1/search_by_date" params: dict[str, Any] = {"hitsPerPage": self._spec.max_results} if self._spec.tag: params["tags"] = self._spec.tag async with self._session.get(search_url, params=params) as response: response.raise_for_status() data = await response.json() for hit in data.get("hits", []): if thread_id := hit.get("objectID", None): utime = hit.get("updated_at") ordinal = ( int(datetime.fromisoformat(utime).timestamp()) if utime else NO_ORDINAL ) yield PartialSourceRow( key=_HackerNewsThreadKey(thread_id=thread_id), data=PartialSourceRowData(ordinal=ordinal), ) Fælles . list() metadata for all recent HackerNews threads For each thread: It generates a with: PartialSourceRow : the thread ID key : the last updated timestamp ordinal Formål: gør det muligt for CocoIndex at spore, hvilke tråde der eksisterer, og hvilke tråde der er ændret, uden at hente fuldt trådeindhold. Forklaring af fuldt indhold Denne async-metode opnår en (herunder kommentarer fra , og indsætter resultatet i en objekt – strukturen CocoIndex bruger til række-niveau indtagelse. single HackerNews thread API PartialSourceRowData async def get_value( self, key: _HackerNewsThreadKey ) -> PartialSourceRowData[_HackerNewsThread]: """Get a specific HackerNews thread by ID using the items API.""" # Use HackerNews items API to get full thread with comments item_url = f"https://hn.algolia.com/api/v1/items/{key.thread_id}" async with self._session.get(item_url) as response: response.raise_for_status() data = await response.json() if not data: return PartialSourceRowData( value=NON_EXISTENCE, ordinal=NO_ORDINAL, content_version_fp=None, ) return PartialSourceRowData( value=HackerNewsConnector._parse_hackernews_thread(data) ) get_value() indhenter det fulde indhold af en bestemt tråd, herunder kommentarer. Parser den rå JSON til strukturerede Python-objekter (_HackerNewsThread + _HackerNewsComment). Returns a containing the full thread. PartialSourceRowData Almindelig støtte Fortæller CocoIndex, at denne kilde giver timestamps (ordinarer). def provides_ordinal(self) -> bool: return True CocoIndex uses ordinals to incrementally update only changed threads, improving efficiency. Partering af JSON til strukturerede data Denne statiske metode tager den rå JSON-respons fra Gør det til en normaliseret Objekter der indeholder: API _HackerNewsThread Oplysningerne (titel, tekst og metadata) Alle kommentarer, flettet til en enkelt liste Korrekt Python datetime objekter Det udfører en af kommentartræet. recursive traversal @staticmethod def _parse_hackernews_thread(data: dict[str, Any]) -> _HackerNewsThread: comments: list[_HackerNewsComment] = [] def _add_comments(parent: dict[str, Any]) -> None: children = parent.get("children", None) if not children: return for child in children: ctime = child.get("created_at") if comment_id := child.get("id", None): comments.append( _HackerNewsComment( id=str(comment_id), author=child.get("author", ""), text=child.get("text", ""), created_at=datetime.fromisoformat(ctime) if ctime else None, ) ) _add_comments(child) _add_comments(data) ctime = data.get("created_at") text = data.get("title", "") if more_text := data.get("text", None): text += "\n\n" + more_text return _HackerNewsThread( author=data.get("author"), text=text, url=data.get("url"), created_at=datetime.fromisoformat(ctime) if ctime else None, comments=comments, ) Konverterer det rå HackerNews API-svar til _HackerNewsThread og _HackerNewsComment. recursively parses nested comments. _add_comments() Kombiner titel + tekst i hovedtrådsindholdet. Producerer et fuldt struktureret objekt klar til indeksering. Putting It All Together in a Flow Din flow læser nu præcis som en React-komponent. Define the flow and connect source @cocoindex.flow_def(name="HackerNewsIndex") def hackernews_flow( flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope ) -> None: # Add the custom source to the flow data_scope["threads"] = flow_builder.add_source( HackerNewsSource(tag="story", max_results=500), refresh_interval=timedelta(minutes=1), ) # Create collectors for different types of searchable content message_index = data_scope.add_collector() Behandle hver tråd og indsamle strukturerede oplysninger with data_scope["threads"].row() as thread: # Index the main thread content message_index.collect( id=thread["thread_id"], thread_id=thread["thread_id"], content_type="thread", author=thread["author"], text=thread["text"], url=thread["url"], created_at=thread["created_at"], ) Process each comment of a thread and collect structured information with thread["comments"].row() as comment: message_index.collect( id=comment["id"], thread_id=thread["thread_id"], content_type="comment", author=comment["author"], text=comment["text"], created_at=comment["created_at"], ) Eksport til databasetabeller message_index.export( "hn_messages", cocoindex.targets.Postgres(), primary_key_fields=["id"], ) CocoIndex lige nu: Læs mere om HackerNews API Trail ændrer sig gradvist Nørre Nørre Kommentarer Eksport til Postgres Understøtter live mode Din app kan nu forespørge det som et realtidssøgningsindeks. Søgning og søgning i HackerNews Index På dette tidspunkt er du færdig med indeksflowet. Som næste trin kan du definere forespørgselsbehandlere – så du kan køre forespørgsler i CocoInsight. Du kan bruge et hvilket som helst bibliotek eller ramme af dit valg til at udføre forespørgsler. . Ønsker håndtering @hackernews_flow.query_handler() def search_text(query: str) -> cocoindex.QueryOutput: """Search HackerNews threads by title and content.""" table_name = cocoindex.utils.get_target_default_name(hackernews_flow, "hn_messages") with connection_pool().connection() as conn: with conn.cursor() as cur: # Simple text search using PostgreSQL's text search capabilities cur.execute( f""" SELECT id, thread_id, author, content_type, text, created_at, ts_rank(to_tsvector('english', text), plainto_tsquery('english', %s)) as rank FROM {table_name} WHERE to_tsvector('english', text) @@ plainto_tsquery('english', %s) ORDER BY rank DESC, created_at DESC """, (query, query), ) results = [] for row in cur.fetchall(): results.append( { "id": row[0], "thread_id": row[1], "author": row[2], "content_type": row[3], "text": row[4], "created_at": row[5].isoformat(), } ) return cocoindex.QueryOutput(results=results) Denne kode definerer en forespørgselshåndtering, der søger HackerNews threads og kommentarer indekseret i CocoIndex. Det bestemmer databasetabellen, der gemmer meddelelserne, derefter bruger PostgreSQL fuldtekstsøgning ( og for at finde rækker, der matcher forespørgslen. to_tsvector plainto_tsquery Resultaterne er sorteret efter relevans ( ) and creation time, formatted into dictionaries, and returned as a structured . Essentially, it performs a full-text search over the indexed content and delivers ranked, structured results. ts_rank cocoindex.QueryOutput Running Your HackerNews Custom Source Once your custom source and flow are ready, running it with CocoIndex is straightforward. You can either eller af HackerNews. update the index on-demand keep it continuously in sync 1. opbygge afhængigheder Sørg for, at du har Python installeret og derefter installere dit projekt i redigerbar tilstand: pip install -e . Dette installerer CocoIndex sammen med alle nødvendige afhængigheder, så du kan udvikle og opdatere kontakten uden at geninstallere. Opdatering af målet (on demand) To populate your target (e.g., Postgres) with the latest HackerNews threads: cocoindex update main Kun tråde, der er ændret, vil blive behandlet igen. Dit mål forbliver synkroniseret med de seneste 500 HackerNews-tråde. Effektive incrementelle opdateringer sparer tid og beregningsressourcer. Bemærk, at hver gang du kører opdateringskommandoen, vil CocoIndex kun genprocessere tråde, der er ændret, og holde målet i synkronisering med de seneste 500 tråde fra HackerNews. cocoindex update -L main Kører strømmen i live-tilstand, afstemning HackerNews regelmæssigt. CocoIndex håndterer automatisk incrementelle ændringer og holder målet synkroniseret. Ideel til dashboards, søgninger eller AI-rørledninger, der kræver realtidsdata. 3. Troubleshoot & Inspect with CocoInsight CocoInsight lets you , see the lineage of your data, and understand what’s happening under the hood. visualize and debug your flow Start the server: cocoindex server -ci main Så åbner du UI i din browser: https://cocoindex.io/cocoinsight CocoInsight har nul rørledningsdataopbevaring - det er sikkert til debugging og inspektion af dine strømninger lokalt. CocoInsight har nul rørledningsdataopbevaring - det er sikkert til debugging og inspektion af dine strømninger lokalt. Bemærk, at dette kræver QueryHandler-installation i det foregående trin. Hvad du kan bygge næste This simple example opens the door to a lot more: Opbyg en trending-topic detektor Kør LLM summarization pipelines på toppen af indekserede tråde Tilføj embeddings + vektor søgning Spejl HN ind i dit interne data lager Build a real-time HN dashboard Udvid til andre nyhedskilder (Reddit, Lobsters osv.) Because the whole pipeline is declarative and incremental, extending it is straightforward. Da Custom Sources giver dig mulighed for at pakke Python logik i en incrementel datastrøm, de bedste brugssager er normalt data—systems that don't have standard database connectors, have complex nesting, or require heavy pre-processing. Enhver "Hard-to-Reach" Den viden aggregator for LLM kontekst Opbygning af en kontekstmotor til en AI-bot kræver ofte at trække fra ikke-standardiserede dokumentationskilder. Den sammensatte enhed (Data Stitching) Most companies have user data fragmented across multiple microservices. You can build a Custom Source that acts as a "virtual join" before the data ever hits your index. For example the Source: Fetches a User ID from an (Okta/Auth0). Auth Service Bruger dette ID til at hente faktureringsstatus fra Stripe API. Bruger det ID til at hente brugslogger fra en Internal Redis. Instead of managing complex ETL joins downstream, the Custom Source yields a single CocoIndex sporer tilstanden af dette sammensatte objekt; hvis brugeren opgraderer i Stripe ændrer deres e-mail i Auth0, opdaterer indekset automatisk. User360 eller Den "Legacy Wrapper" (Modernization Layer) Virksomheder har ofte værdifulde data låst i systemer, der er smertefulde at forespørge (SOAP, XML, Mainframes). Du får en moderne, forespørgselsbar SQL-grænseflade (via CocoIndex-målet) på toppen af et 20 år gammelt system uden at omskrive det gamle system selv. Public Data Monitor (Competitive Intelligence) Sporing af ændringer på offentlige websteder eller API'er, der ikke tilbyder webhooks. The Source: Scraping e-commerce product pages. Competitor Pricing: Polling a government RSS feed or FDA drug approval database. Regulatory Feeds: Hitting a CoinGecko or Yahoo Finance API. Crypto/Stocks: Brug af den capabilities, you can trigger downstream alerts only when a price changes by >5% or a new regulation is posted, rather than spamming your database with identical polling results. The CocoIndex Value: diff Hvorfor det betyder noget Custom Sources udvider denne model til API — internal, external, legacy, or real-time. Enhver Dette åbner et simpelt, men kraftfuldt mønster: Hvis du kan hente det, kan CocoIndex indeksere det, differe det og synkronisere det. Hvis du kan hente det, kan CocoIndex indeksere det, differe det og synkronisere det. Uanset om du indekserer HackerNews eller orkestrerer snesevis af virksomhedstjenester, giver rammen dig en stabil rygsøjle med: Den vedvarende stat Deterministiske opdateringer Automatisk linje Fleksible eksportmål Minimal overhead infrastruktur ⭐ Try It, Fork It, Star It Hvis du finder dette nyttigt, a betyder meget – det hjælper andre med at opdage CocoIndex og understøtter yderligere udvikling. star on GitHub GitHub