The Java library to be introduced, thriving-dev/kafka-streams-cassandra-state-store, is a Kafka Streams State Store implementation that persists data to Apache Cassandra.

It's a 'drop-in' replacement for the official Kafka Streams state store solutions, notably RocksDB (default) and InMemory.

By moving the state to an external datastore, the stateful streams app effectively becomes stateless (from a deployment point of view), which greatly improves elasticity and reduces rebalancing downtimes & failure recovery.

Cassandra/ScyllaDB is horizontally scalable and allows for huge amounts of data, which provides a boost to your existing Kafka Streams application with very little change to the source code.

In addition to the CassandraKeyValueStore, this post will also cover all out-of-the-box state store solutions and explain individual characteristics, benefits, drawbacks, and limitations in detail.

⏯️ Following the introduction and getting started guide, there's also a demo available. If you don't want to wait, feel free to head over to the Thriving.dev YouTube Channel.

The first public release was on 9 January 2023. When writing this blog post the latest version was 0.4.0, available on Maven Central!

Table of contents

- Basics Recap
- Purpose
- Usage Example
- Under the Hood
- Known Limitations
- Next Steps
- Conclusion

Basics Recap

(Feel free to skip straight to the next section if you're already familiar with Kafka Streams and Apache Cassandra…)

Kafka Streams

Quoting Apache Kafka - Wikipedia:

“Kafka Streams (or Streams API) is a stream-processing library written in Java. It was added in the Kafka 0.10.0.0 release. The library allows for the development of stateful stream-processing applications that are scalable, elastic, and fully fault-tolerant. The main API is a stream-processing domain-specific language (DSL) that offers high-level operators like filter, map, grouping, windowing, aggregation, joins, and the notion of tables.
Additionally, the Processor API can be used to implement custom operators for a more low-level development approach. The DSL and Processor API can be mixed, too. For stateful stream processing, Kafka Streams uses RocksDB to maintain local operator state. Because RocksDB can write to disk, the maintained state can be larger than available main memory. For fault-tolerance, all updates to local state stores are also written into a topic in the Kafka cluster. This allows recreating state by reading those topics and feed all data into RocksDB.”

In case you are entirely new to Kafka Streams, I recommend getting started by reading some official materials provided by Confluent, e.g. Introduction Kafka Streams API.

Apache Cassandra

Quoting Apache Cassandra - Wikipedia:

“Apache Cassandra is a free and open-source, distributed, wide-column store, NoSQL database management system designed to handle large amounts of data across many commodity servers, providing high availability with no single point of failure. Cassandra offers support for clusters spanning multiple datacenters, with asynchronous masterless replication allowing low latency operations for all clients.”

Purpose

While Wikipedia’s summary (see above) only mentions RocksDB, Kafka Streams ships with the following KeyValueStore implementations:

- org.apache.kafka.streams.state.internals.RocksDBStore
- org.apache.kafka.streams.state.internals.InMemoryKeyValueStore
- org.apache.kafka.streams.state.internals.MemoryLRUCache

Let’s look at the traits of each store implementation in more detail…

RocksDBStore

RocksDB is the default state store for Kafka Streams. The RocksDBStore is a persistent key-value store based on RocksDB (surprise!). State is flushed to disk, allowing the state to exceed the size of available memory.

Since the state is persisted to disk, it can be re-used and does not need to be restored (changelog topic replay) when the application instance comes up after a restart (e.g. following an upgrade, instance migration, or failure).

The RocksDB state store provides good performance and is well configured out of the box, but might need to be tuned for certain use cases (which is no small feat and requires an understanding of RocksDB configuration).

Writing to and reading from disk comes with I/O, so for performance reasons buffering and caching patterns are in place. The record cache (on heap) is particularly useful for optimizing writes by reducing the number of updates to local state and changelog topics. The RocksDB block cache (off heap) optimizes reads.

In a typical modern setup, stateful Kafka Streams applications run on Kubernetes as a StatefulSet with persistent state stores (RocksDB) on PersistentVolumes.

InMemoryKeyValueStore

The InMemoryKeyValueStore, as the name suggests, maintains state in-memory (RAM).

One obvious benefit is that the pure in-memory stores come with good performance (operates in RAM…). Further, hosting and operating are simpler compared to RocksDB, since there is no requirement to provide and manage disks.

Drawbacks to having the store in-memory are limitations in store size and increased infrastructure costs (RAM is more expensive than disk storage). Further, state is always lost on application restart and therefore first needs to be restored from changelog topics (recovery takes longer).

Where low rebalance downtimes / quick recovery are a concern, using standby replicas (num.standby.replicas) helps to reduce recovery time.

MemoryLRUCache (Stores.lruMap)

The MemoryLRUCache is an in-memory store based on HashMap. The term cache comes from the LRU (least recently used) behavior combined with the maxCacheSize cap (per streams task!).

It’s a rather uncommon choice but can be a valid fit for certain use cases. Same as the InMemoryKeyValueStore, state is always lost on application restart and is restored from changelog topics.

maxCacheSize applies client-side only (in-memory HashMap, per streams task state store -> the least recently used entry is dropped when the underlying HashMap’s capacity is breached) but does not ‘cleanup’ the changelog topic (send tombstones). The (compacted) changelog topic keeps growing in size while the state available to processing is constrained by maxCacheSize.

Therefore, it is recommended to use it in combination with custom changelog topic config cleanup.policy=[compact,delete] (also retention.ms) to have a time-based retention in place that satisfies your functional data requirements (if possible).

The maxCacheSize is applied per streams task (~input topic partitions), so take this into consideration when calculating total capacity, memory requirements per app instance, …

CassandraKeyValueStore

Now finally we get to the subject of this blog post, the custom implementation of a state store that persists data to Apache Cassandra.

With CassandraKeyValueStore, data is persistently stored in an external database -> Apache Cassandra <- *or compatible solutions (e.g. ScyllaDB). Apache Cassandra is a distributed, clustered data store that scales horizontally up to petabytes of data, thus very large Kafka Streams state can be accommodated.

Moving the state into an external data store - outside the application so to say - allows you to effectively run the app in a stateless fashion. Further, with logging disabled, there's no changelog topic -> no state restore required, which enables fluent rebalancing and helps reduce rebalance downtimes and recovery time.

This greatly improves the elasticity and scalability of your application, which opens up more possibilities such as efficient & fluent autoscaling...

It can also help ease/avoid known problems with the Kafka Streams specific task assignment such as 'uneven load distribution' and 'idle consumers' (I'm thinking about writing a separate blog post on these issues...).
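Circling back to the MemoryLRUCache caveat for a moment: the gap between the capped client-side state and the ever-growing changelog is easier to see in a toy model. The sketch below is plain Java with no Kafka dependencies. The class names `LruChangelogSketch` and `LruStore` are made up for illustration, the 'changelog' is just a map standing in for a compacted topic, and none of this is the actual Kafka Streams implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy model (not Kafka Streams code): an LRU-capped store next to a compacted changelog. */
final class LruChangelogSketch {

    /** Client-side state: an access-ordered map that evicts the least recently used entry. */
    static final class LruStore<K, V> extends LinkedHashMap<K, V> {
        private final int maxCacheSize;

        LruStore(int maxCacheSize) {
            super(16, 0.75f, true); // true = access order, which gives LRU behaviour
            this.maxCacheSize = maxCacheSize;
        }

        @Override
        protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
            // Eviction happens locally only; nothing is written to the changelog here.
            return size() > maxCacheSize;
        }
    }

    final LruStore<String, Long> store;

    /** Stand-in for a compacted changelog topic: the latest value per key. */
    final Map<String, Long> changelog = new LinkedHashMap<>();

    LruChangelogSketch(int maxCacheSize) {
        this.store = new LruStore<>(maxCacheSize);
    }

    /** A put updates the local state and is logged. */
    void put(String key, Long value) {
        store.put(key, value);
        changelog.put(key, value);
    }

    /** An explicit delete is the only path that models a tombstone. */
    void delete(String key) {
        store.remove(key);
        changelog.remove(key);
    }

    public static void main(String[] args) {
        LruChangelogSketch task = new LruChangelogSketch(2); // maxCacheSize = 2 for one task
        task.put("a", 1L);
        task.put("b", 2L);
        task.put("c", 3L); // evicts "a" from the store, but the changelog still holds it

        System.out.println("store keys:     " + task.store.keySet());     // [b, c]
        System.out.println("changelog keys: " + task.changelog.keySet()); // [a, b, c]
    }
}
```

In this model only an explicit delete shrinks the changelog, which is why a time-based retention on the real changelog topic is worth considering. Also keep in mind that one such capped map exists per streams task, so the total number of entries an application can hold is roughly maxCacheSize multiplied by the number of tasks.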
The Kafka Streams property internal.leave.group.on.close=true helps achieve low rebalance downtimes by telling the consumers to send a LeaveGroup request to the group coordinator on graceful shutdown. For more information on such Kafka internals I recommend watching/reading the following Confluent developer guide: Consumer Group Protocol. Note that this property is also used & explained in the demo.

⚠ Please be aware this is an unofficial property (not part of the public API), so it can be deprecated or dropped at any time.

Adding external, 3rd party software to the heart (or rather stomach?) of your stream processing application adds a new, additional single point of failure to your architecture.

Usage Example

Get it!

The artifact is available on Maven Central:

Maven

    <dependency>
        <groupId>dev.thriving.oss</groupId>
        <artifactId>kafka-streams-cassandra-state-store</artifactId>
        <version>${version}</version>
    </dependency>

Gradle (Groovy DSL)

    implementation 'dev.thriving.oss:kafka-streams-cassandra-state-store:${version}'

Classes of this library are in the package dev.thriving.oss.kafka.streams.cassandra.state.store.

Quick Start

High-level DSL <> StoreSupplier

When using the high-level DSL, i.e., StreamsBuilder, users create StoreSuppliers that can be further customized via Materialized.

For example, a topic read as KTable can be materialized into a Cassandra k/v store with custom key/value Serdes, with logging and caching disabled:

    StreamsBuilder builder = new StreamsBuilder();
    // 'session' is the Cassandra driver session handed to the store builder
    KTable<Long,String> table = builder.table(
            "topicName",
            Materialized.<Long,String>as(
                            CassandraStores.builder(session, "store-name")
                                    .keyValueStore()
                    )
                    .withKeySerde(Serdes.Long())
                    .withValueSerde(Serdes.String())
                    .withLoggingDisabled()   // no changelog topic
                    .withCachingDisabled());

Processor API <> StoreBuilder

When using the Processor API, i.e., Topology, users create StoreBuilders that can be attached to Processors.

For example, you can create a Cassandra KeyValueStore<String, Long> with custom Serdes, logging, and caching disabled like:

    Topology topology = new Topology();

    StoreBuilder<KeyValueStore<String, Long>> storeBuilder =
            Stores.keyValueStoreBuilder(
                            CassandraStores.builder(session, "store-name")
                                    .keyValueStore(),
                            Serdes.String(),
                            Serdes.Long())
                    .withLoggingDisabled()   // no changelog topic
                    .withCachingDisabled();

    topology.addStateStore(storeBuilder);

Demo

Features the notorious 'word-count example' (ref), written as a quarkus application, running in a fully clustered docker-compose localstack.

Source code for this demo: kafka-streams-cassandra-state-store/examples/word-count-quarkus (at 0.4.0)

https://www.youtube.com/watch?v=2Co9-8E-uJE&embedable=true

Store Types

kafka-streams-cassandra-state-store comes with 2 different store types:

- keyValueStore
- globalKeyValueStore

keyValueStore (recommended default)

A persistent KeyValueStore<Bytes, byte[]>. The underlying cassandra table is partitioned by the store context task partition. Therefore, all CRUD operations against this store always query by and return results for a single stream task.

globalKeyValueStore

A persistent KeyValueStore<Bytes, byte[]>. The underlying cassandra table uses the record key as sole /PRIMARY KEY/. Therefore, all CRUD operations against this store work from any streams task and are always “global”.

Due to the nature of cassandra tables having a single PK (no clustering key), this store supports only a limited number of operations.

If you're planning to use this store type, please make sure to get a full understanding of the specifics by reading the relevant docs.

Advanced

For more detailed documentation, please visit the GitHub project…

- Store types, supported operations
- Builder usage + config options

Under the hood

Implemented/compiled with

- Java 17
- kafka-streams 3.4
- datastax java-driver-core 4.15.0

Supported client-libs

- Kafka Streams 2.7.0+ (maybe even earlier versions, but wasn’t tested further back)
- Datastax java client (v4) 'com.datastax.oss:java-driver-core:4.15.0'
- ScyllaDB shard-aware datastax java client (v4) fork 'com.scylladb:java-driver-core:4.14.1.0'

Supported databases

- Apache Cassandra 3.11
- Apache Cassandra 4.0, 4.1
- ScyllaDB (should work from 4.3+)

Underlying CQL Schema

keyValueStore

Using defaults, for a state store named "word-count" the following CQL Schema applies:

    CREATE TABLE IF NOT EXISTS word_count_kstreams_store (
        partition int,
        key blob,
        time timestamp,
        value blob,
        PRIMARY KEY ((partition), key)
    ) WITH compaction = { 'class' : 'LeveledCompactionStrategy' }

globalKeyValueStore

Using defaults, for a state store named "clicks-global" the following CQL Schema applies:

    CREATE TABLE IF NOT EXISTS clicks_global_kstreams_store (
        key blob,
        time timestamp,
        value blob,
        PRIMARY KEY (key)
    ) WITH compaction = { 'class' : 'LeveledCompactionStrategy' }

Feat: Cassandra table with default TTL

Cassandra has a table option default_time_to_live (default expiration time (“TTL”) in seconds for a table) which can be useful for certain use cases where data (state) expires after a known period.

ℹ️ Please note that writes to Cassandra are made with system time. The table TTL is applied based on 'time of write' -> the time of the current record being processed (!= stream time).

The default_time_to_live can be defined via the builder method withTableOptions, e.g.:

    CassandraStores.builder(session, "word-grouped-count")
            .withTableOptions("""
                    compaction = { 'class' : 'LeveledCompactionStrategy' }
                    AND default_time_to_live = 86400
                    """)
            .keyValueStore()

Cassandra table partitioning (avoiding large partitions)

Kafka is persisting data in segments and is built for sequential r/w. As long as there’s sufficient disk storage space available to brokers, a high number of messages for a single topic partition is not a problem.

Apache Cassandra on the other hand can get inefficient (up to severe failures such as load shedding, dropped messages, and crashed and downed nodes) when the partition size grows too large. The reason is that searching within a large partition is slow. Also, it puts a lot of pressure on the (JVM) heap.

The community has offered a standard recommendation for Cassandra users to keep partitions under 400MB, and preferably under 100MB.

For the current implementation, the Cassandra table created for the ‘default’ key-value store is partitioned by the Kafka partition key (“wide partition pattern”). Please keep these issues in mind when working with relevant data volumes.

In case you don’t need range-style queries on your store (‘range’, ‘prefixScan’; ref Supported operations by store type) and only look up by key, it’s recommended to use globalKeyValueStore rather than keyValueStore since it is partitioned by the event key (:= primary key).

References

- blog post on Wide Partitions in Apache Cassandra 3.11
  Note: in case anyone has well-founded knowledge if/how this has changed with Cassandra 4, please share in the comments below!!
- stackoverflow question

Known Limitations

Adding additional infrastructure for data persistence external to Kafka comes with certain risks and constraints.

Consistency

Kafka Streams supports at-least-once and exactly-once processing guarantees. At-least-once semantics is enabled by default.

The Kafka Streams exactly-once processing guarantee uses Kafka transactions. These transactions wrap the entirety of processing a message throughout your streams topology, including messages published to outbound topic(s), changelog topic(s), and consumer offsets topic(s).

This is possible through transactional interaction with a single distributed system (Apache Kafka). Bringing an external system (Cassandra) into play breaks this pattern. Once data is written to the database it can’t be rolled back in the event of a subsequent error / failure to complete the current message processing.

=> If you need strong consistency, have exactly-once processing enabled (streams config: processing.guarantee="exactly_once_v2"), and/or your processing logic is not fully idempotent, then using kafka-streams-cassandra-state-store is discouraged!

ℹ️ Please note this is also the case when using kafka-streams with the native state stores (RocksDB/InMemory) with at-least-once processing.guarantee (default).

For more information on Kafka Streams processing guarantees, check the sources referenced below.

- https://medium.com/lydtech-consulting/kafka-streams-transactions-exactly-once-messaging-82194b50900a
- https://docs.confluent.io/platform/current/streams/developer-guide/config-streams.html#processing-guarantee
- https://docs.confluent.io/platform/current/streams/concepts.html#processing-guarantees

Incomplete Implementation of StateStore Interfaces

For now, only KeyValueStore is supported (vs. e.g. WindowStore / SessionStore). Also, not all methods have been implemented.

Next Steps

Here are some of the tasks (high level) in the current backlog:

Features

- Implement KIP-889: Versioned State Stores (coming soon with Kafka 3.5.0 release)
- Add a simple (optional) InMemory read cache -> Caffeine?
- Support WindowStore / SessionStore

Non-functional

- Benchmark
- Add metrics

Ops

- GitHub actions to release + publish to maven central (snapshot / releases)
- Add Renovate

Conclusion

It's been a fun journey so far, starting from an initial POC to a working library published to maven central - though still to be considered 'experimental', since it's not been production-tested yet.

The out-of-the-box state stores satisfy most requirements, so there's no need to switch without necessity. Still, it's a usable piece of software that may fill a gap for specific requirements.

I'm looking forward to working on the next steps such as benchmarking/load testing.

Feedback is very welcome. Also, if you are planning to, or have decided to use the library in a project, please leave a comment below.

Footnotes

At the time of writing this blog post, the latest versions of relevant libs were:

- Kafka / Streams API: 3.4.0
- Cassandra java-driver-core: 4.15.0
- kafka-streams-cassandra-state-store: 0.4.0

References

- 4. Stateful Processing - Mastering Kafka Streams and ksqlDB Book
- Kafka Streams Memory Management | Confluent Documentation

Also published here.