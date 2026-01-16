Complex intake forms contain rich structured data, but that structure is implicit rather than explicit: nested sections, conditional fields, repeated entities, and layout-dependent semantics. Conventional OCR pipelines flatten these documents into plain text, after which regexes or heuristic parsers attempt to reconstruct structure that was already lost. This approach is inherently brittle and fails under schema variation, layout drift, or partial updates.

In this article, we present an end-to-end pipeline for direct structured extraction from PDFs that emits typed, Pydantic-validated models, without intermediate text normalization, markdown conversion, or layout-specific rules.

The system combines:

- DSPy for multimodal, schema-constrained extraction using Gemini 2.5 Flash (vision)
- CocoIndex for incremental execution, content-addressed caching, and persistence into databases

Rather than treating PDFs as unstructured text, we treat them as structured visual inputs and extract data directly into application-level schemas. This enables deterministic validation, incremental reprocessing, schema evolution, and production-grade reliability across heterogeneous document formats.

The code is open source under the Apache 2.0 license. To see more examples built with CocoIndex, refer to the examples page. ⭐ Star the project if you find it helpful!

Why DSPy + CocoIndex?

This pipeline combines two complementary abstractions: schema-constrained LLM computation and incremental dataflow execution.
Each addresses a different failure mode in production document extraction systems.

DSPy: A programming model for LLM pipelines

🔗 https://github.com/stanfordnlp/dspy

Most LLM-based extraction systems are built around prompt engineering: hand-written prompts with instructions, few-shot examples, and ad-hoc output parsing. This approach does not scale operationally:

- Output correctness is sensitive to prompt wording, model upgrades, and input distribution shifts
- Business logic is embedded in strings, making it difficult to test, refactor, or version
- Validation and retries are bolted on after the fact

DSPy replaces prompts-as-strings with a declarative programming model. Instead of specifying how to prompt the model, you specify what each step must produce: typed inputs, structured outputs, constraints, and validation rules. DSPy then generates the underlying prompts from that specification and can optimize them to satisfy it.

In practice, this enables:

- Schema-driven extraction with explicit contracts
- Deterministic validation (e.g. via Pydantic) at every LLM boundary
- Composable, testable LLM modules that behave like regular functions

This makes multimodal extraction pipelines substantially more robust to change.
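To make "validation at every LLM boundary" concrete, here is a minimal standard-library sketch of the validate-and-retry pattern. It is not DSPy's API: `ContactRecord`, `parse_contact`, `call_with_validation`, and `fake_model` are hypothetical names, and the stub stands in for a real model call. It returns a malformed reply first so that the retry path is exercised.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class ContactRecord:
    """Hypothetical typed output contract for one extraction step."""
    name: str
    phone: str


def parse_contact(raw: str) -> ContactRecord:
    """Validate a raw model reply against the contract; raise ValueError if it does not fit."""
    data = json.loads(raw)  # json.JSONDecodeError is a subclass of ValueError
    if not isinstance(data, dict):
        raise ValueError("expected a JSON object")
    for key in ("name", "phone"):
        if not isinstance(data.get(key), str):
            raise ValueError(f"field {key!r} must be a string")
    return ContactRecord(name=data["name"], phone=data["phone"])


def call_with_validation(model_call, max_attempts: int = 3) -> ContactRecord:
    """Call the model until its reply validates, or give up after max_attempts."""
    last_error = None
    for _ in range(max_attempts):
        try:
            return parse_contact(model_call())
        except ValueError as exc:
            last_error = exc  # a real system might feed this error back to the model
    raise RuntimeError(f"no valid reply after {max_attempts} attempts") from last_error


# Stub standing in for an LLM: the first reply is malformed, the second is valid.
_replies = iter(['{"name": "Jane Doe"}', '{"name": "Jane Doe", "phone": "555-0100"}'])


def fake_model() -> str:
    return next(_replies)


contact = call_with_validation(fake_model)
print(contact)
```

The point of the sketch is only the shape of the boundary: callers receive either a typed, validated object or an explicit failure, never a loosely parsed string.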
CocoIndex: Incremental execution engine for AI data pipelines

🔗 https://github.com/cocoindex-ai/cocoindex

CocoIndex is a high-performance data processing engine designed for AI workloads with incremental recomputation as a first-class primitive. Developers write straightforward Python transformations over in-memory objects; CocoIndex executes them as a resilient, scalable pipeline backed by a Rust engine.

Key properties:

- Content-aware incremental recompute: only affected downstream nodes are re-executed when sources or logic change
- Low-latency backfills: cold-start recomputation drops from hours to seconds
- Cost efficiency: minimizes redundant LLM and GPU calls
- Production parity: the same flow defined in a notebook can be promoted directly to live execution

In production, CocoIndex runs in "live" mode using polling or change data capture. Derived targets (databases, vector stores, or APIs) are continuously kept in sync with evolving unstructured sources such as PDFs, code repositories, and multi-hop API graphs.

Because every transformation is tracked with lineage and observability, the system provides built-in auditability and explainability. This is critical for regulated or compliance-sensitive workflows, and equally valuable for debugging and long-term maintenance.

Why the combination works

DSPy guarantees correctness at the model boundary. CocoIndex guarantees correctness over time.
Together, they form a pipeline where structured extraction is:

- Typed and validated
- Incrementally maintained
- Observable and reproducible
- Safe to operate in production under change

Flow Overview

Prerequisites

Before getting started, make sure you have the following set up:

- Install Postgres if you don't already have it, and make sure you can connect to it from your development environment.
- Install the Python dependencies:

    pip install -U cocoindex dspy-ai pydantic pymupdf

- Create a .env file:

    # Postgres database address for cocoindex
    COCOINDEX_DATABASE_URL=postgres://cocoindex:cocoindex@localhost/cocoindex

    # Gemini API key
    GEMINI_API_KEY=YOUR_GEMINI_API_KEY

Pydantic Models: Define the structured schema

We define Pydantic classes (Contact, Address, Insurance, etc.) that match a FHIR-inspired patient schema, giving a structured and validated representation of patient data. Each model corresponds to a key aspect of a patient's record, and together they provide type safety and capture the nested relationships between records.

1. Contact Model

    # Imports shared by all of the models in this section
    import datetime

    from pydantic import BaseModel, Field


    class Contact(BaseModel):
        name: str
        phone: str
        relationship: str

Represents an emergency or personal contact for the patient.

Fields:

- name: Contact's full name.
- phone: Contact phone number.
- relationship: Relation to the patient (e.g., parent, spouse, friend).
2. Address Model

    class Address(BaseModel):
        street: str
        city: str
        state: str
        zip_code: str

Represents a postal address.

Fields:

- street, city, state, zip_code: Standard address fields.

3. Pharmacy Model

    class Pharmacy(BaseModel):
        name: str
        phone: str
        address: Address

Represents the patient's preferred pharmacy.

Fields:

- name: Pharmacy name.
- phone: Pharmacy contact number.
- address: Uses the Address model for structured address information.

4. Insurance Model

    class Insurance(BaseModel):
        provider: str
        policy_number: str
        group_number: str | None = None
        policyholder_name: str
        relationship_to_patient: str

Represents the patient's insurance information.

Fields:

- provider: Insurance company name.
- policy_number: Unique policy number.
- group_number: Optional group number.
- policyholder_name: Name of the person who holds the policy.
- relationship_to_patient: The policyholder's relationship to the patient (e.g., self, parent).

5. Condition Model

    class Condition(BaseModel):
        name: str
        diagnosed: bool

Represents a medical condition.

Fields:

- name: Condition name (e.g., Diabetes).
- diagnosed: Boolean indicating whether it has been officially diagnosed.
6. Medication Model

    class Medication(BaseModel):
        name: str
        dosage: str

Represents a current medication the patient is taking.

Fields:

- name: Medication name.
- dosage: Dosage information (e.g., "10mg daily").

7. Allergy Model

    class Allergy(BaseModel):
        name: str

Represents a known allergy.

Fields:

- name: Name of the allergen (e.g., peanuts, penicillin).

8. Surgery Model

    class Surgery(BaseModel):
        name: str
        date: str

Represents a surgery or procedure the patient has undergone.

Fields:

- name: Surgery name (e.g., Appendectomy).
- date: Surgery date (as a string, ideally ISO format).
9. Patient Model

    class Patient(BaseModel):
        name: str
        dob: datetime.date
        gender: str
        address: Address
        phone: str
        email: str
        preferred_contact_method: str
        emergency_contact: Contact
        insurance: Insurance | None = None
        reason_for_visit: str
        symptoms_duration: str
        past_conditions: list[Condition] = Field(default_factory=list)
        current_medications: list[Medication] = Field(default_factory=list)
        allergies: list[Allergy] = Field(default_factory=list)
        surgeries: list[Surgery] = Field(default_factory=list)
        occupation: str | None = None
        pharmacy: Pharmacy | None = None
        consent_given: bool
        consent_date: str | None = None

Represents a complete patient record with personal, medical, and administrative information.

Key fields:

- name, dob, gender: Basic personal info.
- address, phone, email: Contact info.
- preferred_contact_method: How the patient prefers to be reached.
- emergency_contact: Nested Contact model.
- insurance: Optional nested Insurance model.
- reason_for_visit, symptoms_duration: Visit details.
- past_conditions, current_medications, allergies, surgeries: Lists of nested models for comprehensive medical history.
- occupation: Optional job info.
- pharmacy: Optional nested Pharmacy model.
- consent_given, consent_date: Legal/administrative consent info.
Why Use Pydantic Here?

- Validation: Ensures all fields are the correct type (e.g., dob is a date).
- Structured Nested Models: Patient has nested objects like Address, Contact, and Insurance.
- Default Values & Optional Fields: Handles optional fields and defaults (Field(default_factory=list) ensures empty lists if no data).
- Serialization: Easily convert models to JSON for APIs or databases.
- Error Checking: Raises a ValidationError automatically if invalid data is provided.

DSPy Vision Extractor

DSPy Signature

Let's define PatientExtractionSignature. A Signature describes what data your module expects and what it will produce. Think of it as a schema for an AI task.

PatientExtractionSignature is a dspy.Signature, which is DSPy's way of declaring what the model should do, not how it does it.
    import dspy


    # DSPy Signature for patient information extraction from images
    class PatientExtractionSignature(dspy.Signature):
        """Extract structured patient information from a medical intake form image."""

        form_images: list[dspy.Image] = dspy.InputField(
            desc="Images of the patient intake form pages"
        )
        patient: Patient = dspy.OutputField(
            desc="Extracted patient information with all available fields filled"
        )

This signature defines the task contract for patient information extraction.

- Inputs: form_images, a list of images of the intake form.
- Outputs: patient, a structured Patient object.

From DSPy's point of view, this Signature is a "spec": a mapping from an image-based context to a structured, Pydantic-backed semantic object that can later be optimized, trained, and composed with other modules.

PatientExtractor Module

PatientExtractor is a dspy.Module, which in DSPy is a composable, potentially trainable building block that implements the Signature.
    class PatientExtractor(dspy.Module):
        """DSPy module for extracting patient information from intake form images."""

        def __init__(self) -> None:
            super().__init__()
            self.extract = dspy.ChainOfThought(PatientExtractionSignature)

        def forward(self, form_images: list[dspy.Image]) -> Patient:
            """Extract patient information from form images and return as a Pydantic model."""
            result = self.extract(form_images=form_images)
            return result.patient  # type: ignore

- In __init__, ChainOfThought is a DSPy primitive module that knows how to call an LLM with reasoning-style prompting to satisfy the given Signature. In other words, it is a default "strategy" for solving the "extract patient from images" task.
- The forward method is DSPy's standard interface for executing a module. You pass form_images into self.extract(). DSPy then handles converting this call into an LLM interaction (or a trained program) that produces a patient field as declared in the Signature.
Conceptually, PatientExtractor is an ETL operator: the Signature describes the input/output types, and the internal ChainOfThought module is the function that fulfills that contract.

Single-Step Extraction

Now let's wire the DSPy Module to extract from a single PDF. At a high level, the extractor:

- Receives PDF bytes directly
- Internally converts PDF pages to DSPy Image objects using PyMuPDF
- Processes the images with a vision model
- Returns a Pydantic model directly

    import cocoindex
    import pymupdf


    @cocoindex.op.function(cache=True, behavior_version=1)
    def extract_patient(pdf_content: bytes) -> Patient:
        """Extract patient information from PDF content."""

        # Convert PDF pages to DSPy Image objects
        pdf_doc = pymupdf.open(stream=pdf_content, filetype="pdf")

        form_images = []
        for page in pdf_doc:
            # Render page to pixmap (image) at 2x resolution for better quality
            pix = page.get_pixmap(matrix=pymupdf.Matrix(2, 2))
            # Convert to PNG bytes
            img_bytes = pix.tobytes("png")
            # Create DSPy Image from bytes
            form_images.append(dspy.Image(img_bytes))

        pdf_doc.close()

        # Extract patient information using DSPy with vision
        extractor = PatientExtractor()
        patient = extractor(form_images=form_images)

        return patient  # type: ignore

This function is a CocoIndex function (decorated with @cocoindex.op.function) that takes PDF bytes as input and returns a fully structured Patient Pydantic object.

- cache=True allows repeated calls with the same PDF to reuse results.
- behavior_version=1 versions the function's logic. Bump it when the extraction logic changes so that cached results are invalidated and recomputed.

Create DSPy Image objects

We open the PDF from bytes using PyMuPDF (pymupdf), then iterate over each page.

- Useful trick: Render the page as a high-resolution image (2x) for better OCR/vision performance.
- Convert the rendered page to PNG bytes.
- Wrap the PNG bytes in a DSPy Image object.

DSPy Extraction

The list of form_images is passed to the DSPy module:

- ChainOfThought prompts the vision model to reason over the page images.
- The model reads the relevant fields from the rendered pages.
- The output is parsed into a Pydantic Patient object with structured patient info.
CocoIndex Flow

- Loads PDFs from a local directory as binary
- For each document, applies a single transform: PDF bytes → Patient data
- Exports the results to a PostgreSQL table

Declare Flow

Declare a CocoIndex flow, connect to the source, and add a data collector to collect processed data.

    @cocoindex.flow_def(name="PatientIntakeExtractionDSPy")
    def patient_intake_extraction_dspy_flow(
        flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope
    ) -> None:
        data_scope["documents"] = flow_builder.add_source(
            cocoindex.sources.LocalFile(path="data/patient_forms", binary=True)
        )

        patients_index = data_scope.add_collector()

- @cocoindex.flow_def tells CocoIndex that this function is a flow definition, not regular runtime code.
- add_source() registers a LocalFile source that traverses the data/patient_forms directory and creates a logical table named documents.

CocoIndex can connect to various sources, and you can write a custom source if no native connector is available.

CocoIndex is designed to keep your indexes synchronized with your data sources.
This is achieved through a feature called live updates, which automatically detects changes in your sources and updates your indexes accordingly. This ensures that your search results and data analysis are always based on the most current information. You can read more here: https://cocoindex.io/docs/tutorials/live_updates

Process documents

        # (continues inside patient_intake_extraction_dspy_flow)
        with data_scope["documents"].row() as doc:
            # Extract patient information directly from PDF using DSPy with vision
            # (PDF->Image conversion happens inside the extractor)
            doc["patient_info"] = doc["content"].transform(extract_patient)

            # Collect the extracted patient information
            patients_index.collect(
                filename=doc["filename"],
                patient_info=doc["patient_info"],
            )

This iterates over each document. We transform doc["content"] (the bytes) with our extract_patient function. The result is stored in a new field, patient_info.

Then we collect a row with the filename and the extracted patient_info.

Export to Postgres

        patients_index.export(
            "patients",
            cocoindex.storages.Postgres(table_name="patients_info_dspy"),
            primary_key_fields=["filename"],
        )

We export the collected index to Postgres under the export name patients. This creates and maintains a table named patients_info_dspy keyed by filename, automatically deleting or updating rows if inputs change.
Because CocoIndex tracks data lineage, it will handle updates/deletions of source files incrementally.

Configure CocoIndex settings

Define a CocoIndex settings function that configures the AI model for DSPy:

    @cocoindex.settings
    def cocoindex_settings() -> cocoindex.Settings:
        # Configure the model used in DSPy
        lm = dspy.LM("gemini/gemini-2.5-flash")
        dspy.configure(lm=lm)

        return cocoindex.Settings.from_env()

The function first configures Gemini 2.5 Flash as the default model for all DSPy operations, then returns a cocoindex.Settings object initialized from environment variables (such as COCOINDEX_DATABASE_URL from the .env file).

Running the Pipeline

Update the index:

    cocoindex update main

CocoInsight

I used CocoInsight (currently in free beta) to troubleshoot the index generation and understand the data lineage of the pipeline. It connects to your local CocoIndex server, with zero pipeline data retention.

    cocoindex server -ci main

Scalable open ecosystem (not a closed stack)

CocoIndex is designed to be composable by default. It provides a high-performance, incremental execution engine and a minimal flow abstraction, without coupling users to a specific model provider, vector database, transformation framework, or orchestration system.

All core concepts (sources, operators, and storage targets) are expressed as pluggable interfaces rather than proprietary primitives. A flow can ingest data from local files, object storage (e.g. S3), APIs, or custom sources; apply arbitrary transformation logic (SQL, DSPy modules, generated parsers, or general Python); and materialize results into relational databases, vector stores, search indexes, or custom sinks via the storage layer.
This design keeps CocoIndex opinionated about execution semantics (incrementality, caching, lineage), but intentionally unopinionated about modeling choices.

Why DSPy + CocoIndex aligns with this model

DSPy is itself a compositional programming framework for LLMs. Developers define typed Signatures and reusable Modules, and DSPy learns how to implement those contracts using the underlying model. This makes the LLM layer explicit, testable, and optimizable, rather than embedded in opaque prompt strings.

CocoIndex treats DSPy modules as first-class operators in a dataflow graph. This creates a clean separation of responsibilities:

- DSPy defines what an LLM computation must produce and how it can be optimized
- CocoIndex defines when that computation runs, how results are cached, how failures are retried, and how outputs propagate to downstream targets as inputs change

Neither system attempts to own the full stack. CocoIndex does not prescribe a prompt or LLM framework, and DSPy does not provide a pipeline engine. Instead, they interlock: DSPy modules become composable, typed transformation nodes inside CocoIndex flows, while CocoIndex supplies production guarantees such as batching, retries, incremental recomputation, and live updates over evolving PDFs, codebases, or APIs.

Support CocoIndex ❤️

If this example was helpful, the easiest way to support CocoIndex is to give the project a ⭐ on GitHub.
Your stars help us grow the community, stay motivated, and keep shipping better tools for real-time data ingestion and transformation.