Bring your own destination. Export anywhere.

We’re excited to announce that CocoIndex now officially supports custom targets, giving you the power to export data to any destination, whether it's a local file, cloud storage, a REST API, or your own bespoke system.

This new capability unlocks a whole new level of flexibility for integrating CocoIndex into your pipelines and allows you to bring your own “LEGO bricks” into our flow model.

Huge thanks to the community for the suggestions and early feedback. Your ideas shaped this!

Pipelines as LEGO: Plug, Play, and Compose

CocoIndex was built with a clear vision: data pipelines should be like LEGO (composable, interchangeable, and fun to build).

That’s why CocoIndex comes with native built-ins for common sources, targets, and transformations. Whether you're pulling from a local directory, S3, or a SQL table, or exporting to cloud storage or a vector DB, the interface stays consistent and declarative. In most cases, it’s a one-line code switch to swap between components.
For example:

Source:

    flow_builder.add_source(cocoindex.sources.S3(...))

    # 1-line switch to LocalFile
    flow_builder.add_source(cocoindex.sources.LocalFile(...))

Targets:

    doc_embeddings.export(
        "doc_embeddings",
        cocoindex.targets.Postgres(),
        primary_key_fields=["id"],
    )

    # 1-line switch to Qdrant
    doc_embeddings.export(
        "doc_embeddings",
        cocoindex.targets.Qdrant(collection_name=QDRANT_COLLECTION),
        primary_key_fields=["id"],
    )

This standardized interface not only speeds up iteration, but also encourages clean, modular, and reusable flow definitions.

But we also know that not every system is the same. That’s why CocoIndex supports “bring your own LEGO”: you can add your own custom source, target, or transformation and have it integrate seamlessly into the ecosystem.

With custom targets, you can now write your own export logic and still benefit from CocoIndex’s flow control, tracking, and incremental updates.

🚀 What’s a Custom Target?

A custom target allows CocoIndex flows to export data wherever you want, beyond built-in connectors like S3 or databases. You define two things:

- Target Spec: how to configure the target (such as setting a file path or API key).
- Target Connector: how to write data to that target (the logic).

You can think of this as plugging in your own target with a few lines of Python.

1. Target Spec

This defines the configuration needed to use your custom target:

    class CustomTarget(cocoindex.op.TargetSpec):
        param1: str
        param2: int | None = None

It's similar to a dataclass. Simply declare the parameters you need, and CocoIndex takes care of the rest.

2. Target Connector

This is where you implement the logic for exporting data. A target connector manages the actual data export operations for your custom target and defines how data is written to it.

Target connectors implement two categories of methods:

- Setup methods for managing target infrastructure (similar to DDL operations in databases), and
- Data methods for handling specific data operations (similar to DML operations).

The skeleton looks like this (PersistentKey, PreparedCustomTarget, DataKeyType, and DataValueType stand in for the types you choose in your own implementation):

    @cocoindex.op.target_connector(spec_cls=CustomTarget)
    class CustomTargetConnector:
        # Setup methods
        @staticmethod
        def get_persistent_key(spec: CustomTarget, target_name: str) -> PersistentKey:
            """Required. Return a persistent key that uniquely identifies this target instance."""
            ...

        @staticmethod
        def apply_setup_change(
            key: PersistentKey, previous: CustomTarget | None, current: CustomTarget | None
        ) -> None:
            """Required. Apply setup changes to the target."""
            ...

        # Data methods
        @staticmethod
        def mutate(
            *all_mutations: tuple[PreparedCustomTarget, dict[DataKeyType, DataValueType | None]],
        ) -> None:
            """Required. Apply data mutations to the target."""
            ...

You can also implement optional methods like prepare() and describe() to handle environment preparation, validation, or logging.

For more detailed documentation and best practices, see https://cocoindex.io/docs/custom_ops/custom_targets

✨ Example: Export Markdown Files to Local HTML

Let’s walk through a simple example: exporting .md files as .html using a custom file-based target. This project monitors folder changes and incrementally converts Markdown to HTML as files change. Check out the full source code.

The overall flow is simple: ingest the Markdown files, convert each one to HTML, and export the results through the custom target.

This example focuses on:

- how to configure your custom target
- how the flow effortlessly picks up changes in the source, recomputes only what has changed, and exports to the target

Step 1: Ingest files

Ingest a list of Markdown files:

    from datetime import timedelta

    import cocoindex


    @cocoindex.flow_def(name="CustomOutputFiles")
    def custom_output_files(
        flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope
    ) -> None:
        """
        Define an example flow that exports markdown files to HTML files.
        """
        data_scope["documents"] = flow_builder.add_source(
            cocoindex.sources.LocalFile(path="data", included_patterns=["*.md"]),
            refresh_interval=timedelta(seconds=5),
        )

This ingestion creates a table with filename and content fields.

Step 2: Process each file and collect

Define a custom function that converts Markdown to HTML:

    # Assumes a module-level markdown-it-py renderer, e.g.
    # _markdown_it = markdown_it.MarkdownIt()
    @cocoindex.op.function()
    def markdown_to_html(text: str) -> str:
        return _markdown_it.render(text)

Define a data collector and transform each document to HTML:

    # Continues inside custom_output_files()
    output_html = data_scope.add_collector()
    with data_scope["documents"].row() as doc:
        doc["html"] = doc["content"].transform(markdown_to_html)
        output_html.collect(filename=doc["filename"], html=doc["html"])

Step 3: Define the custom target

3.1: Define the target spec

The target spec contains a directory for output files:

    class LocalFileTarget(cocoindex.op.TargetSpec):
        directory: str

3.2: Implement the connector

get_persistent_key() defines the persistent key, which uniquely identifies the target for change tracking and incremental updates. Here, we simply use the target directory as the key (e.g., ./data/output).

    import dataclasses
    import os


    @cocoindex.op.target_connector(spec_cls=LocalFileTarget)
    class LocalFileTargetConnector:
        @staticmethod
        def get_persistent_key(spec: LocalFileTarget, target_name: str) -> str:
            """Use the directory path as the persistent key for this target."""
            return spec.directory

The describe() method returns a human-readable string that describes the target, which is displayed in the CLI logs. For example, it prints:

    Target: Local directory ./data/output

The implementation:

        @staticmethod
        def describe(key: str) -> str:
            """(Optional) Return a human-readable description of the target."""
            return f"Local directory {key}"

apply_setup_change() applies setup changes to the backend. The previous and current specs are passed as arguments, and the method is expected to update the backend setup to match the current state.

A None spec indicates non-existence, so when previous is None, we need to create the target, and when current is None, we need to delete it.

        @staticmethod
        def apply_setup_change(
            key: str, previous: LocalFileTarget | None, current: LocalFileTarget | None
        ) -> None:
            """
            Apply setup changes to the target.

            Best practice: keep all actions idempotent.
            """

            # Create the directory if the target didn't exist before.
            if previous is None and current is not None:
                os.makedirs(current.directory, exist_ok=True)

            # Delete the exported .html files and the directory when the target is removed.
            if previous is not None and current is None:
                if os.path.isdir(previous.directory):
                    for filename in os.listdir(previous.directory):
                        if filename.endswith(".html"):
                            os.remove(os.path.join(previous.directory, filename))
                    # os.rmdir only succeeds if the directory is now empty.
                    os.rmdir(previous.directory)

The mutate() method is called by CocoIndex to apply data changes to the target, batching mutations to potentially multiple targets of the same type. This gives the target connector flexibility in implementation (e.g., atomic commits, or processing items with dependencies in a specific order).

Each element in the batch corresponds to a specific target and is represented by a tuple containing:

- the target specification
- all mutations for the target, represented by a dict mapping primary keys to value fields

Value fields can be represented by a dataclass, LocalFileTargetValues in this case:

    @dataclasses.dataclass
    class LocalFileTargetValues:
        """Represents value fields of exported data. Used in `mutate` method below."""

        html: str

The value type of the dict is LocalFileTargetValues | None, where a non-None value means an upsert and a None value means a delete. Similar to apply_setup_change(), idempotency is expected here.

        @staticmethod
        def mutate(
            *all_mutations: tuple[LocalFileTarget, dict[str, LocalFileTargetValues | None]],
        ) -> None:
            """
            Mutate the target.
            """
            for spec, mutations in all_mutations:
                for filename, mutation in mutations.items():
                    full_path = os.path.join(spec.directory, filename) + ".html"
                    if mutation is None:
                        # Delete the file; a missing file is fine (idempotent).
                        try:
                            os.remove(full_path)
                        except FileNotFoundError:
                            pass
                    else:
                        # Create/update the file
                        with open(full_path, "w") as f:
                            f.write(mutation.html)

3.3: Use it in the Flow

    output_html.export(
        "OutputHtml",
        LocalFileTarget(directory="output_html"),
        primary_key_fields=["filename"],
    )

Run the example

Once your pipeline is set up, keeping the HTML output up to date is simple:

    pip install -e .
    cocoindex update --setup main.py

You can add, modify, or remove files in the data/ directory. CocoIndex will only reprocess the changed files and update the target accordingly.
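
To make "update the target accordingly" concrete, here is a minimal standalone sketch of the upsert/delete semantics that mutate() follows, run against a temporary directory without CocoIndex. The names HtmlValue, apply_mutations, and demo are hypothetical and exist only for this illustration; the logic mirrors the body of the connector's mutate() above, and each batch is replayed to show that repeating it is harmless.

```python
import os
import tempfile
from dataclasses import dataclass
from typing import Optional


@dataclass
class HtmlValue:
    """Hypothetical stand-in for the value fields of one exported row."""

    html: str


def apply_mutations(directory: str, mutations: dict[str, Optional[HtmlValue]]) -> None:
    """Write a file for each non-None value; delete the file for each None."""
    for filename, value in mutations.items():
        full_path = os.path.join(directory, filename) + ".html"
        if value is None:
            try:
                os.remove(full_path)
            except FileNotFoundError:
                pass  # already gone, so deleting twice is harmless
        else:
            with open(full_path, "w", encoding="utf-8") as f:
                f.write(value.html)


def demo() -> list[str]:
    """Apply two batches, each twice, and return the files that remain."""
    with tempfile.TemporaryDirectory() as out_dir:
        # First run: two source files were added.
        batch_1 = {"a.md": HtmlValue("<p>v1</p>"), "b.md": HtmlValue("<p>b</p>")}
        apply_mutations(out_dir, batch_1)
        apply_mutations(out_dir, batch_1)  # replaying the batch changes nothing

        # Second run: a.md was edited and b.md was removed from the source.
        batch_2 = {"a.md": HtmlValue("<p>v2</p>"), "b.md": None}
        apply_mutations(out_dir, batch_2)
        apply_mutations(out_dir, batch_2)  # replay hits the FileNotFoundError path

        return sorted(os.listdir(out_dir))
```

Calling demo() returns the files left after both batches have been applied twice: only a.md.html remains, because b.md was mapped to None in the second batch.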

For real-time updates, run in live mode:

    cocoindex update --setup -L main.py

This keeps the HTML output continuously synchronized with your document source, which is perfect for fast-changing environments like internal wikis or technical documentation.

Best Practices

- Idempotency matters: apply_setup_change() and mutate() should be safe to run multiple times without unintended effects.
- Prepare once, mutate many: If you need setup (such as establishing a connection), use prepare() to avoid repeating work.
- Use structured types: For primary keys or values, CocoIndex supports simple types as well as dataclasses and NamedTuples.

Support us: We’re constantly adding more examples and improving our runtime. If you found this helpful, please ⭐ star CocoIndex on GitHub and share it with others.

Suggestions for more native ‘LEGO’ pieces? Just let us know! We are moving full speed ahead to support you!