في تصميمات النظام المستقل، فإن الفجوة في التاسع من نوعها بين HTTP APIs الخارجية والهدفات البيانات العلاقة تشكل تحديًا تجاريًا مستمرًا، خصوصًا عندما تتوفر استجابة API على الترتيبات التداولية الأصلية أو تظهر أدوات البيانات غير المستقرة، التي تمزقها بشكل عميق (الخطوط البيانية، التعديلات التنظيمية، وتكوينات BOM متعددة المستويات). بدلاً من سجلات التحقق المطلوبة (العملات المخصصة التي تدعو إلى curl + jq + psql INSERT) تتوفر إطارًا إعلاميًا لتغيير نقاط نهاية HTTP / gRPC ذاتيًا إلى أدوات البيانات المستقرة، التي تمزق بشكل مت • تحويل API→Stateful Stream: تحويل طلبات REST GET إلى التسمية المطبوعة • تحويل الأشجار المرتفعة مع اكتشاف دورة: التعامل مع DAGs غير المحددة من التعليقات على عمق (المنتجعات المرتفعة المرتفعة) دون تكلفة المرتفعات • الدعم لتطور النماذج: كتابة أساسية على طبقات البيانات في Python مع التوافق المستمر من خلال التعديلات الدقيقة الهيكلية • تدمير متصفح Postgres Sink: الاستفادة من جمعية الاتصال async psycopg3 + بروتوكول COPY لمتصفحات المجموعات • تصميم البحث باللغة الكاملة مع إشارة GIN: تكوين قوائم tsvector باستخدام وظائف اللغة الإنجليزية + وظائف الترتيب في هذه الحالة، نحن نخلق اتصال خصيصا لـ HackerNews.هذا يكتشف القصص الحديثة + التعليقات المرتبطة، وتصنيفها، وتظهر رابط البحث البسيط التي تستخدم البحث الكامل Postgres. لماذا استخدام مصدر مخصص؟ في العديد من السيناريوهات ، لا يقرر الطائرات فقط من الأقراص النظيفة. خدمات REST الداخلية شركاء APIs أنظمة التراث أنماط البيانات غير القياسية التي لا تتوافق مع المكونات التقليدية CocoIndex’s Custom Source API makes these integrations بدلاً من كتابة الملفات المتعلقة بالطباعة، يمكنك تغطية API الخاص بك ك "جزء مصدر"، و CocoIndex يأخذها من هناك. الإعلانات Project Walkthrough - بناء مؤشر HackerNews الأهداف Call HackerNews Search API ردود الفعل على الردود تحديث القنوات المتعددة فقط محتوى كبير في Postgres إزالة رابط البحث عن النص يعمل CocoIndex على تشخيص التغيير ، و idempotency ، و lineage ، و state sync تلقائيا. نظرة عامة يتكون الطائرات من ثلاث أجزاء رئيسية: Define a custom source ( ) HackerNewsConnector Calls HackerNews API Emits rows for changed/updated threads Pulls full thread + comment tree Build an index with CocoIndex Flow Collect thread content Collect all comments recursively Export to a Postgres table ( ) hn_messages Add a lightweight query handler Uses PostgreSQL full-text search Returns ranked matches for a keyword query كل واحد فقط العمليات تغيرت أساليب HN وحافظت على كل شيء في المساواة. cocoindex update المشروع هو مصدر مفتوح ومتاحة على . Github متطلبات if you don't have one. Install Postgres تحديد نموذج البيانات يحدد كل مصدر مخصص اثنين من أنواع البيانات الصغيرة: نوع المفاتيح يحدد الفرد نوع القيمة → المحتوى الكامل لهذا البند في Hacker News ، كل أخبار هو موضوع ، ويمكن أن يكون لكل موضوع العديد من التعليقات. بالنسبة إلى HackerNews ، دعونا نحدد المفتاحات مثل هذا: class _HackerNewsThreadKey(NamedTuple): """Row key type for HackerNews source.""" thread_id: str يجب أن تكون مفتاحية: هشام Serializable ثابت (لا يتغير مع مرور الوقت) القيمة تحتوي على مجموعة البيانات الحقيقية: @dataclasses.dataclass class _HackerNewsComment: id: str author: str | None text: str | None created_at: datetime | None @dataclasses.dataclass class _HackerNewsThread: """Value type for HackerNews source.""" author: str | None text: str url: str | None created_at: datetime | None comments: list[_HackerNewsComment] هذا يخبر CocoIndex بالضبط كيف تبدو كل "المنتج" من HackerNews عندما يتم تسجيلها بالكامل. إرسال رسالة و كل تعليقاتها ، في حين أن وتشير إلى تعليقات فردية. _HackerNewsThread _HackerNewsComment بناء اتصال مصدر مخصص يحتوي مصدر مخصص على جزءين: SourceSpec - تحديد الإعدادات SourceConnector – منطق التشغيل للقراءة البيانات كتابة SourceSpec A في CocoIndex هي تصنيف مكتوب يعطي النظام و لا تحصل على البيانات نفسها - وهذا يتم معالجتها من قبل متصفح المصدر. SourceSpec what data to fetch how to connect class HackerNewsSource(SourceSpec): """Source spec for HackerNews API.""" tag: str | None = None max_results: int = 100 الأماكن : tag Optional filter for the type of HackerNews content. Example: , , . "story" "job" "poll" If , it fetches all types. None max_results Maximum number of threads to fetch from HackerNews at a time. Helps limit the size of the index for performance or testing. تحديد الاتصال قم بتحديد تكوين الـ Connector ومرحلة HTTP حتى تتمكن من الحصول على بيانات HackerNews بشكل فعال. @source_connector( spec_cls=HackerNewsSource, key_type=_HackerNewsThreadKey, value_type=_HackerNewsThread, ) class HackerNewsConnector: """Custom source connector for HackerNews API.""" _spec: HackerNewsSource _session: aiohttp.ClientSession def __init__(self, spec: HackerNewsSource, session: aiohttp.ClientSession): self._spec = spec self._session = session @staticmethod async def create(spec: HackerNewsSource) -> "HackerNewsConnector": """Create a HackerNews connector from the spec.""" return HackerNewsConnector(spec, aiohttp.ClientSession()) tells CocoIndex that this class is a . It specifies: source_connector custom source connector : the configuration class ( ) spec_cls HackerNewsSource : how individual items are identified ( ) key_type _HackerNewsThreadKey : the structure of the data returned ( ) value_type _HackerNewsThread يتم إرسال create() من قبل CocoIndex لتأسيس الاتصال ، ويقوم بتحديد aiohttp.ClientSession الجديد لتقديم طلبات HTTP. قائمة الملفات المتاحة ذاك الطريقة في هو المسؤول عن التي تتوافق مع المعايير المحددة (التعليقات، النتائج الكبيرة) وتعود البيانات المتعددة حولها. ما الذي قد يتغير. list() HackerNewsConnector discovering all available HackerNews threads know which threads exist async def list( self, ) -> AsyncIterator[PartialSourceRow[_HackerNewsThreadKey, _HackerNewsThread]]: """List HackerNews threads using the search API.""" # Use HackerNews search API search_url = "https://hn.algolia.com/api/v1/search_by_date" params: dict[str, Any] = {"hitsPerPage": self._spec.max_results} if self._spec.tag: params["tags"] = self._spec.tag async with self._session.get(search_url, params=params) as response: response.raise_for_status() data = await response.json() for hit in data.get("hits", []): if thread_id := hit.get("objectID", None): utime = hit.get("updated_at") ordinal = ( int(datetime.fromisoformat(utime).timestamp()) if utime else NO_ORDINAL ) yield PartialSourceRow( key=_HackerNewsThreadKey(thread_id=thread_id), data=PartialSourceRowData(ordinal=ordinal), ) فتاة . list() metadata for all recent HackerNews threads For each thread: It generates a with: PartialSourceRow : the thread ID key : the last updated timestamp ordinal الهدف: يتيح لـ CocoIndex مراقبة ما هي الرموز موجودة والتي تغيرت دون الحصول على محتوى الرموز الكامل. تحميل محتوى الشريط الكامل هذا النموذج async يصل إلى (بما في ذلك تعليقاته) من ويدخل النتيجة في A موضوع - هي الهيكل الذي يستخدمه CocoIndex لتسخين مستوى الجانب. single HackerNews thread API PartialSourceRowData async def get_value( self, key: _HackerNewsThreadKey ) -> PartialSourceRowData[_HackerNewsThread]: """Get a specific HackerNews thread by ID using the items API.""" # Use HackerNews items API to get full thread with comments item_url = f"https://hn.algolia.com/api/v1/items/{key.thread_id}" async with self._session.get(item_url) as response: response.raise_for_status() data = await response.json() if not data: return PartialSourceRowData( value=NON_EXISTENCE, ordinal=NO_ORDINAL, content_version_fp=None, ) return PartialSourceRowData( value=HackerNewsConnector._parse_hackernews_thread(data) ) get_value() يحصل على المحتوى الكامل لبرنامج معين، بما في ذلك التعليقات. تحويل JSON خام إلى أجهزت Python المكونة (_HackerNewsThread + _HackerNewsComment). يعود ملف PartialSourceRowData الذي يحتوي على العناوين الكاملة. الدعم العادي يقول CocoIndex أن هذا المصدر يوفر العلامات الزمنية (العلامات العادية). def provides_ordinal(self) -> bool: return True يستخدم CocoIndex المفاهيم العادية لتحديث المفاهيم التي تغيرت فقط بشكل متزايد ، مما يزيد من الكفاءة. تحويل JSON إلى البيانات الهيكلية هذه الطريقة الإستراتيجية تتخذ إجابة JSON خام من ثم تحوّل إلى طبيعي الموضوع الذي يحتوي على: API _HackerNewsThread المقالة (الوصف، النص، الملفات) جميع التعليقات المرتبطة ، مزيج إلى قائمة واحدة أداة Python Datetime ويعمل على من شجرة التعليق recursive traversal @staticmethod def _parse_hackernews_thread(data: dict[str, Any]) -> _HackerNewsThread: comments: list[_HackerNewsComment] = [] def _add_comments(parent: dict[str, Any]) -> None: children = parent.get("children", None) if not children: return for child in children: ctime = child.get("created_at") if comment_id := child.get("id", None): comments.append( _HackerNewsComment( id=str(comment_id), author=child.get("author", ""), text=child.get("text", ""), created_at=datetime.fromisoformat(ctime) if ctime else None, ) ) _add_comments(child) _add_comments(data) ctime = data.get("created_at") text = data.get("title", "") if more_text := data.get("text", None): text += "\n\n" + more_text return _HackerNewsThread( author=data.get("author"), text=text, url=data.get("url"), created_at=datetime.fromisoformat(ctime) if ctime else None, comments=comments, ) Converts raw HackerNews API response into and . _HackerNewsThread _HackerNewsComment recursively parses nested comments. _add_comments() يجمع عنوان + النص إلى المحتوى الرئيسي. إنتاج أداة مركبة بالكامل مستعدة لتسجيل. وضع كل شيء معا في نهر واحد يقرر تدفقك الآن بالضبط مثل عنصر React. تحديد حركة المرور والاتصال بالمصدر @cocoindex.flow_def(name="HackerNewsIndex") def hackernews_flow( flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope ) -> None: # Add the custom source to the flow data_scope["threads"] = flow_builder.add_source( HackerNewsSource(tag="story", max_results=500), refresh_interval=timedelta(minutes=1), ) # Create collectors for different types of searchable content message_index = data_scope.add_collector() معالجة كل حلقات وجمع المعلومات الهيكلية with data_scope["threads"].row() as thread: # Index the main thread content message_index.collect( id=thread["thread_id"], thread_id=thread["thread_id"], content_type="thread", author=thread["author"], text=thread["text"], url=thread["url"], created_at=thread["created_at"], ) معالجة كل تعليق من نطاق وجمع المعلومات الهيكلية with thread["comments"].row() as comment: message_index.collect( id=comment["id"], thread_id=thread["thread_id"], content_type="comment", author=comment["author"], text=comment["text"], created_at=comment["created_at"], ) تصدير إلى جدولات قاعدة البيانات message_index.export( "hn_messages", cocoindex.targets.Postgres(), primary_key_fields=["id"], ) CocoIndex الآن: تحميل برنامج HackerNews API التغيرات المتسارعة في الطرق الوسائط المتعددة التعليقات exports to Postgres دعم Live Mode يمكنك الآن استفسار التطبيق كمعيار البحث في الوقت الحقيقي. البحث والتحقق من HackerNews Index في هذه اللحظة تم الانتهاء من تدفق الأندرويد. كخطوة التالية، يمكنك تحديد ممارسات الأسئلة - لذلك يمكنك تشغيل الأسئلة في CocoInsight. يمكنك استخدام أي مكتبة أو إطار من اختيارك لإجراء الأسئلة. يمكنك قراءة المزيد في الوثائق حول . تريد التعامل مع @hackernews_flow.query_handler() def search_text(query: str) -> cocoindex.QueryOutput: """Search HackerNews threads by title and content.""" table_name = cocoindex.utils.get_target_default_name(hackernews_flow, "hn_messages") with connection_pool().connection() as conn: with conn.cursor() as cur: # Simple text search using PostgreSQL's text search capabilities cur.execute( f""" SELECT id, thread_id, author, content_type, text, created_at, ts_rank(to_tsvector('english', text), plainto_tsquery('english', %s)) as rank FROM {table_name} WHERE to_tsvector('english', text) @@ plainto_tsquery('english', %s) ORDER BY rank DESC, created_at DESC """, (query, query), ) results = [] for row in cur.fetchall(): results.append( { "id": row[0], "thread_id": row[1], "author": row[2], "content_type": row[3], "text": row[4], "created_at": row[5].isoformat(), } ) return cocoindex.QueryOutput(results=results) في هذا المقال سنقوم بتحديد ما إذا كان من الممكن إرسال رسائل البريد الإلكتروني أو رسائل البريد الإلكتروني أو رسائل البريد الإلكتروني أو رسائل البريد الإلكتروني أو رسائل البريد الإلكتروني أو رسائل البريد الإلكتروني. و البحث عن الصفحات التي تتوافق مع الطلب. to_tsvector plainto_tsquery يتم تقييم النتائج حسب الخصائص ( ) و وقت إنشاء ، والتي تم تشكيلها في الكتب ، وتعود كجزء من الهيكلية . Essentially, it performs a full-text search over the indexed content and delivers ranked, structured results. ts_rank cocoindex.QueryOutput Running Your HackerNews Custom Source بمجرد أن تكون مصدرك المختص وموجاتك جاهزة، فإن تشغيلها مع CocoIndex بسيط. or with HackerNews. update the index on-demand keep it continuously in sync 1 - تثبيت القيم تأكد من تثبيت Python ثم تثبيت مشروعك في وضع قابل للتعديل: pip install -e . هذا يتم تثبيت CocoIndex مع جميع التوازنات المطلوبة ، مما يسمح لك بتطوير وتحديث الاتصال دون إعادة تثبيتها. تحديث الهدف (على الطلب) لتشمل الهدف الخاص بك (على سبيل المثال، Postgres) مع أحدث العناوين HackerNews: cocoindex update main Only threads that will be re-processed. have changed Your target remains in sync with the . most recent 500 HackerNews threads Efficient incremental updates save time and compute resources. يرجى ملاحظة أن كل مرة تقوم بتشغيل إجراءات التحديث، سيقوم CocoIndex فقط بتنفيذ إجراءات التحديث التي تغيرت، ويحافظ على الهدف مع 500 الإجراءات الأخيرة من HackerNews. يمكنك أيضًا إجراء إجراءات التحديث في الوضع الحي، والذي سيحافظ على الهدف مع المصدر باستمرار: cocoindex update -L main يقوم بتشغيل الدورة في الوضع الحي ، مع استطلاع HackerNews في وقت متأخر. CocoIndex automatically handles incremental changes and keeps the target synchronized. مثالي للطائرات، والبحث، أو طائرات AI التي تتطلب بيانات في الوقت الحقيقي. 3. Troubleshoot & Inspect with CocoInsight CocoInsight يتيح لك ، انظر لغة البيانات الخاصة بك ، وتفهم ما يحدث تحت القفص. visualize and debug your flow Start the server: cocoindex server -ci main ثم ابدأ في إرسالها إلى المستخدم: https://cocoindex.io/cocoinsight CocoInsight لديه الحد الأدنى من الاحتفاظ بالبيانات من الطائرات - إنه آمن للتحقق من وتقييم تدفقاتك المحلية. CocoInsight has zero pipeline data retention — it’s safe for debugging and inspecting your flows locally. ملاحظة أن هذا يتطلب إعداد QueryHandler في الخطوة السابقة. ما يمكنك بناء في النهاية هذا مثال بسيط يفتح الباب لأكثر من ذلك بكثير: Build a trending-topic detector تشغيل خطوط إجمالية LLM فوق العناوين المستندة إضافات + البحث السكاني Mirror HN في مخزون البيانات الداخلية الخاص بك بناء لوحة HN في الوقت الحقيقي تنتشر إلى مصادر أخرى من الأخبار (Reddit، Lobsters، وما إلى ذلك) نظرًا لأن جميع أنحاء الطائرات هي إعلانية ومستمرة ، فإن توسيعها هو بسيط. Since Custom Sources allow you to wrap Python logic into an incremental data stream, the best use cases are usually data—systems that don't have standard database connectors, have complex nesting, or require heavy pre-processing. أي "Hard-to-Reach" المشترك من المعرفة لفاعلية LLM Building a context engine for an AI bot often requires pulling from non-standard documentation sources. The "Composite" Entity (Data Stitching) معظم الشركات لديها البيانات المستخدمة مفصلة على عدة ميكروسوفتات، ويمكنك إنشاء مصدر مخصص يعمل ك "الانضمام الفعلي" قبل أن تصل البيانات إلى مؤشرك. For example the Source: يحصل على ID المستخدم من خدمة Auth (Okta/Auth0). يستخدم هذا ID للحصول على حالة الحساب من API Stripe. Uses that ID to fetch usage logs from an . Internal Redis بدلاً من إدارة ETL المعقدة الانضمام إلى النطاق التالي ، فإن المصدر المخصّص يعود إلى واحد. CocoIndex يتبع حالة هذا الكائن المكون؛ إذا تم تحديث المستخدم في Stripe تغيير بريدك الإلكتروني في Auth0, تحديث المؤشر تلقائيًا. User360 أو The "Legacy Wrapper" (صندوق التحديث) غالبا ما يكون لدى الشركات بيانات قيمة محاطة في الأنظمة التي من الصعب استفسارها (SOAP، XML، Mainframes). يمكنك الحصول على رابط SQL الحديث، الذي يمكن الاستفسار عنه (من خلال هدف CocoIndex) فوق نظام عمره 20 عامًا دون إعادة كتابة النظام القديم نفسه. Public Data Monitor (Competitive Intelligence) Tracking changes on public websites or APIs that don't offer webhooks. The Source: Scraping e-commerce product pages. Competitor Pricing: Polling a government RSS feed or FDA drug approval database. Regulatory Feeds: Hitting a CoinGecko or Yahoo Finance API. Crypto/Stocks: باستخدام يمكنك تفعيل الإشعارات السريعة فقط عند تغير أسعار > 5٪ أو نشر قوانين جديدة ، بدلاً من إرسال بيانات البريد الإلكتروني الخاصة بك مع نتائج استطلاعات مشابهة. The CocoIndex Value: diff Why This Matters الموارد المخصصة تطوير هذا النموذج إلى API - داخلي أو خارجي أو ذكي أو في الوقت الحقيقي. أي This unlocks a simple but powerful pattern: If you can fetch it, CocoIndex can index it, diff it, and sync it. إذا تمكنت من الحصول على ذلك، يمكن لـ CocoIndex أن تصفحها، وتقسيمها، وتسجيلها. Whether you’re indexing HackerNews or orchestrating dozens of enterprise services, the framework gives you a stable backbone with: persistent state التحديثات الدقيقة الخطوط التلقائية تحديد الأهداف المرجوة للتصدير الحد الأدنى من البنية التحتية ⭐ Try It, Fork It, Star It If you found this useful, a means a lot — it helps others discover CocoIndex and supports further development. star on Github Github