ターゲットディスプレイシステムアーキテクチャでは、外部HTTP APIと関係データベースターゲットの間の同期差は、継続的なエンジニアリングの課題を表しています - 特にAPI応答がネイティブなトランザクションセマンティクスを欠いている場合、または組み込まれたチェックポイント、コンテンツの指紋印刷(SHA256ベースのデダプライクション)、複数のレベルのBOM構造の間の非正常化されたJSONパイロットを露出している場合。 CocoIndexのカスタムソース抽象は、任意のHTTP/gRPCエンドポイントをステータスに変換するための宣言フレームワークを提供します。CocoIndexのカスタムソース抽象スクリプト Stateless API→Stateful Stream Transformation: REST GET リクエストを append-log セマンティクスに変換する • サイクル検出を搭載した回転式ツリートラベル: Stack overflow なしで無制限深度コメント DAGs (Directed Acyclic Graphs) を処理 • Schema evolution support: Python dataclass-based typing with forward compatibility via structural subtyping (構造サブティーピング) Postgres sink connector integration: Leveraging psycopg3 async connection pooling + COPY protocol for bulk inserts (ポストグレース・サンク・コネクター・インテグレーション) • GIN-indexed full-text search configuration: Materializing tsvector columns with English dictionary stemming + rank function この例では、HackerNews 用のカスタムコネクタを構築し、最近のストーリー + ネストされたコメントを取得し、それらをインデックスし、Postgres のフルテキスト検索によって動作するシンプルな検索インターフェイスを露出します。 なぜ Custom Source を使うのか? 多くのシナリオでは、パイプラインはクリーンテーブルから読むだけではありません。 Internal REST サービス APIsパートナー 遺産システム 従来のコネクタに合わない非標準データモデル CocoIndexのカスタムソースAPIは、これらの統合を実現します。 アドホックスクリプトを書く代わりに、あなたはあなたのAPIを「ソースコンポーネント」として包装し、CocoIndexはそこからそれを取ります。 宣言 Project Walkthrough — Building a HackerNews Index (プロジェクトウォークトローグ) 目標 ハッカーニュース検索 API Fetch コメント Update only modified threads(修正されたトレードのみを更新) ポストグレスのコンテンツ テキスト検索インターフェイス CocoIndex は、変更検出、idempotency、lineage、および state sync を自動的に処理します。 概要 パイプラインは3つの主要な部分から構成されています: Define a custom source ( ) HackerNewsConnector Calls HackerNews API Emits rows for changed/updated threads Pulls full thread + comment tree Build an index with CocoIndex Flow Collect thread content Collect all comments recursively Export to a Postgres table ( ) hn_messages Add a lightweight query handler Uses PostgreSQL full-text search Returns ranked matches for a keyword query 各 プロセスだけが HN トレードを変更し、すべてを同期させます。 cocoindex update プロジェクトはオープンソースであり、 . GitHub 前提条件 if you don't have one. Install Postgres データモデルの定義 Every custom source defines two lightweight data types: Key Type → ユニークにアイテムを識別する Value Type → the full content for that item. この項目の完全なコンテンツ Hacker News では、各ニュースはトレードであり、各トレードには複数のコメントが含まれます。 ハッカーニュースでは、このようなキーを定義しましょう: class _HackerNewsThreadKey(NamedTuple): """Row key type for HackerNews source.""" thread_id: str キーは必須: ハッシュ シリアル 安定(時間の経過とともに変化しない) 値は実際のデータセットを保持します: @dataclasses.dataclass class _HackerNewsComment: id: str author: str | None text: str | None created_at: datetime | None @dataclasses.dataclass class _HackerNewsThread: """Value type for HackerNews source.""" author: str | None text: str url: str | None created_at: datetime | None comments: list[_HackerNewsComment] This tells CocoIndex exactly what every HackerNews “item” looks like when fully fetched. すべての記事とコメントをまとめながら、 個々のコメントを表示します。 _HackerNewsThread _HackerNewsComment Custom Source Connectorの構築 Custom Source には 2 つの部分があります。 SourceSpec — 宣言構成 SourceConnector — データを読み取るための操作論理 Writing the SourceSpec A in CocoIndex は、システムを示す宣言構成です。 そして ソースにアクセスします. It does not retrieve data itself — that is handled by the source connector. ソースコネクタによって処理されます。 SourceSpec what data to fetch how to connect class HackerNewsSource(SourceSpec): """Source spec for HackerNews API.""" tag: str | None = None max_results: int = 100 フィールド: tag Optional filter for the type of HackerNews content. Example: , , . "story" "job" "poll" If , it fetches all types. None max_results Maximum number of threads to fetch from HackerNews at a time. Helps limit the size of the index for performance or testing. コネクターの定義 コネクタの構成と HTTP セッションを設定して、HackerNews データを効率的に取得できます。 @source_connector( spec_cls=HackerNewsSource, key_type=_HackerNewsThreadKey, value_type=_HackerNewsThread, ) class HackerNewsConnector: """Custom source connector for HackerNews API.""" _spec: HackerNewsSource _session: aiohttp.ClientSession def __init__(self, spec: HackerNewsSource, session: aiohttp.ClientSession): self._spec = spec self._session = session @staticmethod async def create(spec: HackerNewsSource) -> "HackerNewsConnector": """Create a HackerNews connector from the spec.""" return HackerNewsConnector(spec, aiohttp.ClientSession()) tells CocoIndex that this class is a . It specifies: source_connector custom source connector : the configuration class ( ) spec_cls HackerNewsSource : how individual items are identified ( ) key_type _HackerNewsThreadKey : the structure of the data returned ( ) value_type _HackerNewsThread create() は CocoIndex によってコネクタを初期化するために呼び出され、新しい aiohttp.ClientSession を設定して HTTP リクエストを作成します。 Available Threads List THE 方法 in 責任あるのは 指定された基準(タグ、マックス結果)と一致し、それに関するメタデータを返します。 もしかしたら変わったのかもしれない。 list() HackerNewsConnector discovering all available HackerNews threads know which threads exist async def list( self, ) -> AsyncIterator[PartialSourceRow[_HackerNewsThreadKey, _HackerNewsThread]]: """List HackerNews threads using the search API.""" # Use HackerNews search API search_url = "https://hn.algolia.com/api/v1/search_by_date" params: dict[str, Any] = {"hitsPerPage": self._spec.max_results} if self._spec.tag: params["tags"] = self._spec.tag async with self._session.get(search_url, params=params) as response: response.raise_for_status() data = await response.json() for hit in data.get("hits", []): if thread_id := hit.get("objectID", None): utime = hit.get("updated_at") ordinal = ( int(datetime.fromisoformat(utime).timestamp()) if utime else NO_ORDINAL ) yield PartialSourceRow( key=_HackerNewsThreadKey(thread_id=thread_id), data=PartialSourceRowData(ordinal=ordinal), ) フェチ . list() metadata for all recent HackerNews threads For each thread: It generates a with: PartialSourceRow : the thread ID key : the last updated timestamp ordinal 目的: CocoIndex が完全なトレードコンテンツを取得することなく、どのようなトレードが存在し、どのようなトレードが変更されたかを追跡できます。 Fetching Full Thread コンテンツ この async method は a (コメントを含む) from the , and wraps the result in a object - CocoIndexが行レベルの摂取に使用する構造。 single HackerNews thread API PartialSourceRowData async def get_value( self, key: _HackerNewsThreadKey ) -> PartialSourceRowData[_HackerNewsThread]: """Get a specific HackerNews thread by ID using the items API.""" # Use HackerNews items API to get full thread with comments item_url = f"https://hn.algolia.com/api/v1/items/{key.thread_id}" async with self._session.get(item_url) as response: response.raise_for_status() data = await response.json() if not data: return PartialSourceRowData( value=NON_EXISTENCE, ordinal=NO_ORDINAL, content_version_fp=None, ) return PartialSourceRowData( value=HackerNewsConnector._parse_hackernews_thread(data) ) fetches the , including comments. get_value() full content of a specific thread 原始の JSON を構造化された Python オブジェクト(_HackerNewsThread + _HackerNewsComment)に分解します。 完全なトレードを含む PartialSourceRowData を返します。 通常サポート CocoIndex によると、このソースはタイムスタンプ (ordinals) を提供しています。 def provides_ordinal(self) -> bool: return True CocoIndex では、通常数を使用して、変更されたトレードのみを段階的に更新し、効率を向上させます。 JSON を構造化データに変換 この静的方法は、原始の JSON 応答を それを正常化するために、 Object 含むもの: API _HackerNewsThread タイトル(タイトル、テキスト、メタデータ) All nested comments, flattened into a single list Python Datetime オブジェクト 実施するA コメントの木です。 recursive traversal @staticmethod def _parse_hackernews_thread(data: dict[str, Any]) -> _HackerNewsThread: comments: list[_HackerNewsComment] = [] def _add_comments(parent: dict[str, Any]) -> None: children = parent.get("children", None) if not children: return for child in children: ctime = child.get("created_at") if comment_id := child.get("id", None): comments.append( _HackerNewsComment( id=str(comment_id), author=child.get("author", ""), text=child.get("text", ""), created_at=datetime.fromisoformat(ctime) if ctime else None, ) ) _add_comments(child) _add_comments(data) ctime = data.get("created_at") text = data.get("title", "") if more_text := data.get("text", None): text += "\n\n" + more_text return _HackerNewsThread( author=data.get("author"), text=text, url=data.get("url"), created_at=datetime.fromisoformat(ctime) if ctime else None, comments=comments, ) ハッカーニュースの API 応答を _HackerNewsThread と _HackerNewsComment に変換します。 _add_comments() recursively parses nested comments. _add_comments() recursively parses nested comments. _add_comments() recursively parses nested comments. タイトル+テキストを主なトレードコンテンツに組み合わせる。 インデックスするための完全に構造化されたオブジェクトを生成します。 「Putting It All Together in a Flow」 Your flow now reads exactly like a React component. Define the flow and connect source @cocoindex.flow_def(name="HackerNewsIndex") def hackernews_flow( flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope ) -> None: # Add the custom source to the flow data_scope["threads"] = flow_builder.add_source( HackerNewsSource(tag="story", max_results=500), refresh_interval=timedelta(minutes=1), ) # Create collectors for different types of searchable content message_index = data_scope.add_collector() 各トレードを処理し、構造化された情報を収集する with data_scope["threads"].row() as thread: # Index the main thread content message_index.collect( id=thread["thread_id"], thread_id=thread["thread_id"], content_type="thread", author=thread["author"], text=thread["text"], url=thread["url"], created_at=thread["created_at"], ) トレードの各コメントを処理し、構造化された情報を収集する with thread["comments"].row() as comment: message_index.collect( id=comment["id"], thread_id=thread["thread_id"], content_type="comment", author=comment["author"], text=comment["text"], created_at=comment["created_at"], ) データベーステーブルへのエクスポート message_index.export( "hn_messages", cocoindex.targets.Postgres(), primary_key_fields=["id"], ) ココインデックス現在: HackerNews APIについて トラックの変化 徐々に フランシスコ コメント exports to Postgres LIVE MODE サポート あなたのアプリは今、リアルタイムの検索インデックスとしてそれをクエリすることができます。 Querying & Searching the HackerNews Index この時点で、インデックス フローを完了しました。次のステップとして、クエリ マネージャーを定義できます - クエリを CocoInsight で実行できます。クエリを実行するには、選択したライブラリまたはフレームワークを使用できます。 . ハンドラー Query @hackernews_flow.query_handler() def search_text(query: str) -> cocoindex.QueryOutput: """Search HackerNews threads by title and content.""" table_name = cocoindex.utils.get_target_default_name(hackernews_flow, "hn_messages") with connection_pool().connection() as conn: with conn.cursor() as cur: # Simple text search using PostgreSQL's text search capabilities cur.execute( f""" SELECT id, thread_id, author, content_type, text, created_at, ts_rank(to_tsvector('english', text), plainto_tsquery('english', %s)) as rank FROM {table_name} WHERE to_tsvector('english', text) @@ plainto_tsquery('english', %s) ORDER BY rank DESC, created_at DESC """, (query, query), ) results = [] for row in cur.fetchall(): results.append( { "id": row[0], "thread_id": row[1], "author": row[2], "content_type": row[3], "text": row[4], "created_at": row[5].isoformat(), } ) return cocoindex.QueryOutput(results=results) このコードは、ハッカーニュースのトレードやコメントをインデックスしたCocoIndexで検索するクエリハーダーを定義します。 and ) to find rows matching the query. to_tsvector plainto_tsquery 関連性(関連性) )と作成時間、辞書にフォーマットされ、構造化されたものとして返します。 基本的に、インデックスされたコンテンツを完全なテキストで検索し、ランク付け、構造化された結果を提供します。 ts_rank cocoindex.QueryOutput あなたの HackerNews Custom Sourceを実行する あなたのカスタマイズされたソースとフローが準備できたら、CocoIndexで実行するのは簡単です。 or ハッカーニュース update the index on-demand keep it continuously in sync 1. Install Dependencies Python がインストールされていることを確認し、プロジェクトを編集モードでインストールしてください。 pip install -e . これにより、必要なすべての依存性とともに CocoIndex をインストールし、再インストールすることなくコネクタを開発し、更新できます。 2. Update the Target (On-Demand) あなたのターゲット(例えば、Postgres)を最新の HackerNews トレードで満たすには: cocoindex update main 変更されたトレードだけが再処理されます。 あなたのターゲットは、最新の 500 HackerNews トレードと同期しています。 効率的な増加アップデートにより、時間とコンピューティングリソースが節約されます。 更新コマンドを実行するたびに、CocoIndexは変更したトレードのみを再処理し、ターゲットをHackerNewsからの最近の500トレードと同期させます。 cocoindex update -L main フローをライブモードで実行し、HackerNewsを定期的に投票します。 CocoIndex automatically handles incremental changes and keeps the target synchronized. Ideal for dashboards, search, or AI pipelines that require real-time data. 3. CocoInsight でトラブルシューティング&インスピレーション CocoInsightはあなたを あなたのデータの流れを見て、キャップの下で何が起こっているかを理解してください。 visualize and debug your flow Start the server: cocoindex server -ci main その後、あなたのブラウザで UI を開きます: https://cocoindex.io/cocoinsight CocoInsight はパイプラインデータの保持ゼロを備えています - ローカルであなたのフローをデバッグし、検査するのに安全です。 CocoInsight has zero pipeline data retention — it’s safe for debugging and inspecting your flows locally. Note that this requires QueryHandler setup in previous step. 次に建てられるもの This simple example opens the door to a lot more: トレンドトピック探知器の構築 Run LLM summarization pipelines on top of indexed threads Add embeddings + vector search 内部データ倉庫に Mirror HN リアルタイムのHNダッシュボードを作成 他のニュースソース(Reddit、Lobstersなど)に拡張する パイプライン全体が宣言的で増加的であるため、それを拡張することは単純です。 Custom Sources により、Wrap Python 論理を増加型データストリームに変え、最良の用例は通常 データ - 標準データベースコネクタを持たない、複雑な巣立つ、または重いプレプロセスが必要なシステム。 any "Hard-to-Reach" LLMコンテキストのための知識の集計 AI ボットのためのコンテキスト エンジンを構築するには、しばしば非標準的な文書源を引く必要があります。 The “Composite” Entity (Data Stitching) ほとんどの企業は、複数のマイクロサービスでユーザーデータを断片化しています. You can build a Custom Source that acts as a "virtual join" before the data ever hits your index. ユーザーデータは、データがあなたのインデックスに到達する前に「仮想結合」として機能します。 For example the Source: Auth サービス (Okta/Auth0) からユーザ ID を取得します。 この ID を使用して Stripe API から請求状態を取得します。 この ID を使用して Internal Redis から使用記録を取得します。 複雑な ETL をダウンストリームに統合する代わりに、Custom Source は単一のコードを生成します。 CocoIndex は、この複合オブジェクトの状態を追跡します。 Auth0 でメールを変更し、インデックスが自動的に更新されます。 User360 または 「Legacy Wrapper」(近代化層) 企業はしばしば、クエリするのに苦しいシステム(SOAP、XML、メインフレーム)に貴重なデータを閉じ込められており、古いシステム自体を書き直すことなく、近代的でクエリ可能なSQLインターフェイス(CocoIndexターゲットを介して)を20年前に設定します。 Public Data Monitor(競争情報) Tracking changes on public websites or APIs that don't offer webhooks. The Source: Scraping e-commerce product pages. Competitor Pricing: Polling a government RSS feed or FDA drug approval database. Regulatory Feeds: Hitting a CoinGecko or Yahoo Finance API. Crypto/Stocks: 使用する The capabilities, you can trigger downstream alerts only when a price changes by >5% or a new regulation is posted, rather than spamming your database with identical polling results. The CocoIndex Value: diff なぜこれが重要なのか Custom Sources extend this model to API — internal, external, legacy, or real-time. どんな This unlocks a simple but powerful pattern: If you can fetch it, CocoIndex can index it, diff it, and sync it. あなたがそれを取得できるなら、CocoIndexはそれをインデックスし、それをディフにし、それを同期することができます。 HackerNews をインデックスするか、数十のエンタープライズ サービスをオーケストラするかに関わらず、このフレームワークは、以下のような安定した背骨を提供します。 持続国家 Deterministic 更新 自動ライン 柔軟な輸出目標 最低限のインフラストラクチャ 「Try It, Fork It, Star It」 If you found this useful, a means a lot — it helps others discover CocoIndex and supports further development. star on GitHub GitHub