CocoIndex is designed to be production-ready from day one—built to process data in parallel, maximizing throughput while keeping your systems safe. Today, we’ll look at how to optimize performance without overloading your environment. With CocoIndex, it’s just one configuration away. CocoIndex production-ready from day one optimize performance without overloading your environment one configuration away 🌟 Star CocoIndex if you like it - https://github.com/cocoindex-io/cocoindex https://github.com/cocoindex-io/cocoindex CocoIndex’s parallelism model boosts speed by processing multiple data items at once, but more parallelism isn’t always better. Left unconstrained, excessive concurrency can strain—or even destabilize—your systems. That’s why CocoIndex includes built-in concurrency control mechanisms that strike the right balance between raw performance and system stability, even at massive scale. more parallelism isn’t always better built-in concurrency control mechanisms raw performance system stability Processing too many items simultaneously can cause: Memory exhaustion – large datasets loaded at once consume massive amounts of RAM. Resource contention – CPU, disk I/O, and network bandwidth get overwhelmed by competing operations. System instability – timeouts, degraded performance, or outright crashes. Memory exhaustion – large datasets loaded at once consume massive amounts of RAM. Memory exhaustion Resource contention – CPU, disk I/O, and network bandwidth get overwhelmed by competing operations. Resource contention System instability – timeouts, degraded performance, or outright crashes. System instability Unlike generic concurrency features, CocoIndex lets you: Constrain both data volume (rows) and memory usage (bytes). Set limits at multiple layers: global, per source, and per-row iteration. Combine controls: all specified constraints must be satisfied before processing proceeds. Constrain both data volume (rows) and memory usage (bytes). Set limits at multiple layers: global, per source, and per-row iteration. Combine controls: all specified constraints must be satisfied before processing proceeds. all This layered approach ensures that resource-heavy sources don’t overwhelm the system, and nested tasks (such as splitting documents into chunks) remain predictable and safe. You can review the full documentation here. CocoIndex is powering users’ processes at the scale of millions in production. here Concurrency Options CocoIndex provides two primary settings: Option Purpose Unit max_inflight_rows Maximum number of rows processed concurrently. rows max_inflight_bytes Maximum memory footprint of concurrently processed data (before transformations). bytes Option Purpose Unit max_inflight_rows Maximum number of rows processed concurrently. rows max_inflight_bytes Maximum memory footprint of concurrently processed data (before transformations). bytes Option Purpose Unit Option Option Purpose Purpose Unit Unit max_inflight_rows Maximum number of rows processed concurrently. rows max_inflight_rows max_inflight_rows max_inflight_rows Maximum number of rows processed concurrently. Maximum number of rows processed concurrently. rows rows max_inflight_bytes Maximum memory footprint of concurrently processed data (before transformations). bytes max_inflight_bytes max_inflight_bytes max_inflight_bytes Maximum memory footprint of concurrently processed data (before transformations). Maximum memory footprint of concurrently processed data (before transformations). bytes bytes When a limit is reached, CocoIndex pauses new processing until some existing work completes. This keeps throughput high without pushing your system past its limits. pauses new processing For simplicity, max_inflight_bytes only measures the size of data already in memory before any transformations—it does not include temporary memory used during processing steps. For simplicity, max_inflight_bytes only measures the size of data already in memory before any transformations—it does not include temporary memory used during processing steps. max_inflight_bytes not Where to Apply Concurrency Controls 1. Source Level Controls how many rows from a data source are processed simultaneously. This prevents overwhelming your system when ingesting large datasets. Source level control happens at two different granularities Global, in which all sources across all indexing flows share the same budget. Per-source, in which each source has its own budget. Global, in which all sources across all indexing flows share the same budget. Per-source, in which each source has its own budget. Both global and per-source limits must pass before a new row is processed—providing two layers of safety. global per-source Global Concurrency: One Setting to Shield All Flows Global limits ensure your system never overshoots safe operating thresholds, even if individual flows attempt higher concurrency. Apply system-wide protections either via environment variables or programmatic control: The easiest way is to control it via environment variables: COCOINDEX_SOURCE_MAX_INFLIGHT_ROWS=256 COCOINDEX_SOURCE_MAX_INFLIGHT_BYTES=1048576 COCOINDEX_SOURCE_MAX_INFLIGHT_ROWS=256 COCOINDEX_SOURCE_MAX_INFLIGHT_BYTES=1048576 Programmatically, configure it when calling cocoindex.init(), which will take precedence over the environment variable: cocoindex.init() from cocoindex import GlobalExecutionOptions cocoindex.init( cocoindex.Settings( ..., global_execution_options = GlobalExecutionOptions( source_max_inflight_rows=256, source_max_inflight_bytes=1_048_576 ) ) ) from cocoindex import GlobalExecutionOptions cocoindex.init( cocoindex.Settings( ..., global_execution_options = GlobalExecutionOptions( source_max_inflight_rows=256, source_max_inflight_bytes=1_048_576 ) ) ) Currently, CocoIndex uses 1024 as the default value of global max inflight rows, if you don't explicitly set it. Per-Source Concurrency: Granular Customization Set different limits for each source according to workload and data characteristics: @cocoindex.flow_def(name="DemoFlow") def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope): data_scope["documents"] = flow_builder.add_source( DemoSourceSpec(...), max_inflight_rows=10, max_inflight_bytes=100*1024*1024 # 100 MB ) @cocoindex.flow_def(name="DemoFlow") def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope): data_scope["documents"] = flow_builder.add_source( DemoSourceSpec(...), max_inflight_rows=10, max_inflight_bytes=100*1024*1024 # 100 MB ) 2. Nested Iteration Level Concurrency: Deep Structural Control When processing nested rows, such as processing each chunk of each document, you can configure the maximum concurrent rows and/or bytes: with data_scope["documents"].row() as doc: doc["chunks"] = doc["content"].transform(SplitRecursively(...)) with doc["chunks"].row(max_inflight_rows=100, max_inflight_bytes=100*1000*1000): # Process up to 100 chunks in parallel per document ... with data_scope["documents"].row() as doc: doc["chunks"] = doc["content"].transform(SplitRecursively(...)) with doc["chunks"].row(max_inflight_rows=100, max_inflight_bytes=100*1000*1000): # Process up to 100 chunks in parallel per document ... Summary Table: Concurrency Configuration in CocoIndex Level Configuration Path Applies To Global Environment variables, or pass GlobalExecutionOptions to cocoindex.init() All sources, all flows, added together Per-Source Arguments to FlowBuilder.add_source() Specific source/flow Row Iteration Arguments to DataSlice.row(max_inflight_rows=...) Nested iterations Level Configuration Path Applies To Global Environment variables, or pass GlobalExecutionOptions to cocoindex.init() All sources, all flows, added together Per-Source Arguments to FlowBuilder.add_source() Specific source/flow Row Iteration Arguments to DataSlice.row(max_inflight_rows=...) Nested iterations Level Configuration Path Applies To Level Level Configuration Path Configuration Path Applies To Applies To Global Environment variables, or pass GlobalExecutionOptions to cocoindex.init() All sources, all flows, added together Global Global Environment variables, or pass GlobalExecutionOptions to cocoindex.init() Environment variables, or pass GlobalExecutionOptions to cocoindex.init() GlobalExecutionOptions cocoindex.init() All sources, all flows, added together All sources, all flows, added together Per-Source Arguments to FlowBuilder.add_source() Specific source/flow Per-Source Per-Source Arguments to FlowBuilder.add_source() Arguments to FlowBuilder.add_source() FlowBuilder.add_source() Specific source/flow Specific source/flow Row Iteration Arguments to DataSlice.row(max_inflight_rows=...) Nested iterations Row Iteration Row Iteration Arguments to DataSlice.row(max_inflight_rows=...) Arguments to DataSlice.row(max_inflight_rows=...) row(max_inflight_rows=...) Nested iterations Nested iterations Best PracticesExpand commentComment on line R138ResolvedCode has comments. Press enter to view. In actual incremental pipelines, the processing bottleneck is usually at a few heavy operations, such as running inference using an AI model locally or via a remote API. It's common to keep more data in memory even if it cannot be processed immediately—in this way, once the busy backend becomes available, new workloads can be taken on right away to keep the backends busy. However, we need a reasonable bound on this to prevent memory exhaustion and similar issues. That's where concurrency control comes in. In many cases, the default global source max rows limit (1024) is already sufficient. It fits the situation described above: loading more than what heavy operations can consume, but still within a reasonable bound. You don't need to do anything. You can adjust the global source row limit if the default does not work perfectly. For example, if you observe memory overuse or timeouts in certain operations, reduce the limit; on the other hand, if the system is already stable but you want it to run faster, increase the limit to see if it helps. If the distribution of your input data size varies greatly (e.g., it follows a long-tail distribution rather than a normal distribution), setting the max bytes limit can help prevent a small number of abnormally large inputs from overloading the system. If you want to run multiple flows within the same process, or have multiple sources within the same flow, and they vary in processing complexity (e.g., one source goes through a very heavy and slow model, and another only does simple data movement), you may leverage per-source control to more strictly manage the heavy path. If you have a high number of nested rows to process, and the specific number varies significantly, tweak concurrency control options on nested iterations. In many cases, the default global source max rows limit (1024) is already sufficient. It fits the situation described above: loading more than what heavy operations can consume, but still within a reasonable bound. You don't need to do anything. You can adjust the global source row limit if the default does not work perfectly. For example, if you observe memory overuse or timeouts in certain operations, reduce the limit; on the other hand, if the system is already stable but you want it to run faster, increase the limit to see if it helps. If the distribution of your input data size varies greatly (e.g., it follows a long-tail distribution rather than a normal distribution), setting the max bytes limit can help prevent a small number of abnormally large inputs from overloading the system. If you want to run multiple flows within the same process, or have multiple sources within the same flow, and they vary in processing complexity (e.g., one source goes through a very heavy and slow model, and another only does simple data movement), you may leverage per-source control to more strictly manage the heavy path. If you have a high number of nested rows to process, and the specific number varies significantly, tweak concurrency control options on nested iterations. This concurrency control framework gives you safe, scalable, and customizable flow performance. You gain flexibility (configure per-flow), control (set global limits), and the confidence to scale cocoindex flows smoothly across diverse workloads. safe, scalable, and customizable flow performance Support us We’re constantly improving our runtime. Please ⭐ star CocoIndex on GitHub and share it with others. CocoIndex on GitHub Need help crafting a more detailed code snippet, or insight into using byte-based or default concurrency settings? Just let me know!