is one of the most versatile data streaming open-source solution that exists. It supports all the primary functions of a typical batch processing system such as SQL, Connectors to Hive, Group By, etc. while providing fault-tolerance and exactly-once semantics. Hence, you can create a multitude of push-based applications using it. Apache Flink However, one of the significant drawbacks of the Apache Flink has been the inability to modify the checkpointed state of the program. Let’s first see what I mean by that. Checkpointing Flink provides fault-tolerance by using a mechanism called checkpointing. It periodically takes a snapshot of all the stateful operators/functions of your programs and stores them in a highly durable store such as HDFS. Checkpointing allows the Flink program to resume from this snapshot. This is helpful in the cases of failures due to some error such as a simple exception not handled or a loss of data node in your YARN/Mesos/k8s cluster. This snapshot is stored in a binary format only understood by Flink, which makes it difficult to modify the state before restart. Why would you need to modify the data? There can be multiple cases where you might need only partial data from the checkpoint and may want to update the other. An example job being Reading numerical data from one Kafka topicAggregate over a window of 1 hourClassify using some statistical thresholds provided by config stored in your operator’s state. { { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); Properties kafkaConsumerProperties = Properties(); kafkaConsumerProperties.setProperty( , ); kafkaConsumerProperties.setProperty( , ); ObjectMapper objectMapper = ObjectMapper(); FlinkKafkaConsumer010<String> kafkaConsumer010 = FlinkKafkaConsumer010<>( , SimpleStringSchema(), kafkaConsumerProperties); DataStream<String> kafkaSource = env.addSource(kafkaConsumer010).name( ).uid( ); DataStream<TestData> aggregatedStream = kafkaSource .map(row -> objectMapper.readValue(row, TestData.class)) .keyBy(TestData::getKey) .timeWindow(Time.hours( )) .reduce((rowA, rowB) -> { TestData result = TestData(); result.setKey(rowA.getKey()); result.setValue(rowA.getValue() + rowB.getValue()); result.setCreatedAt(System.currentTimeMillis()); result; }).name( ).uid( ); DataStream<LabeledTestData> labeledTestDataDataStream = aggregatedStream.keyBy(TestData::getKey).flatMap( ClassifyData()).name( ).uid( ); labeledTestDataDataStream.map(row -> objectMapper.writeValueAsString(row)).print(); env.execute(); } } public class TestCheckpointJob Exception public static void main (String[] args) throws final new "bootstrap.servers" "localhost:9092" "group.id" "test_group_id" new new "test_topic" new "kafka_source" "kafka_source" 1 new return "aggregate_stream" "aggregate_stream" new "classify_data" "classify_data" { ValueState<Integer> threshold; { .open(parameters); threshold = getRuntimeContext().getState( ValueStateDescriptor<Integer>( , Integer.class)); } { LabeledTestData labeledTestData = LabeledTestData(); labeledTestData.setKey(testData.getKey()); labeledTestData.setValue(testData.getValue()); labeledTestData.setCreatedAt(testData.getCreatedAt()); String label = ; (threshold.value() != ){ label = (testData.getValue() > threshold.value()) ? : ; } labeledTestData.setLabel(label); collector.collect(labeledTestData); } } < , > class ClassifyData extends RichFlatMapFunction TestData LabeledTestData @Override Exception public void open (Configuration parameters) throws super new "thresholdState" @Override Exception public void flatMap (TestData testData, Collector<LabeledTestData> collector) throws new "UNCLASSIFIED" if null "L1" "L2" Suppose, your job gets killed, and now you want to restart it using a checkpoint, but you need to modify the config as well. Earlier, there was no way to do so other than to wait for the job to start with old config and than overwrite it using a stream from Kafka or filesystem. Now, however, you can easily do that using the new API. Let’s modify the above example to do so. Bootstrapping the state Following are the necessary steps required to bootstrap your state Add Dependency org.apache.flink flink-state-processor-api_2.11 1.9.0 < > dependency < > groupId </ > groupId < > artifactId </ > artifactId < > version </ > version </ > dependency This is not included in the default Flink dependency and needs to be added separately in pom.xml file. Create a Bootstrap function { ValueState<Integer> threshold; { threshold = getRuntimeContext().getState( ValueStateDescriptor<Integer>( , Integer.class)); } { threshold.update(testConfig.getThresholdValue()); } } < , > class ConfigBootstrapper extends KeyedStateBootstrapFunction String TestConfig @Override Exception public void open (Configuration parameters) throws new "thresholdState" @Override Exception public void processElement (TestConfig testConfig, Context context) throws This function tells Flink what state to update when it receives the data. In this example, we are updating the threshold state using the TestConfig data we collect. Flow the config data { TestConfig testConfig = TestConfig(); testConfig.setKey( ); testConfig.setThresholdValue( ); DataSet<TestConfig> configDataSet = executionEnvironment.fromElements(testConfig); BootstrapTransformation<TestConfig> transformation = OperatorTransformation .bootstrapWith(configDataSet) .keyBy(TestConfig::getKey) .transform( ConfigBootstrapper()); transformation; } BootstrapTransformation<TestConfig> getConfigTransformation (ExecutionEnvironment executionEnvironment) new "global" 10 new return Now, you need to flow the config data. The Flink state processor API works seamlessly with Dataset API. It doesn’t imply you can’t use bootstrapping in a Stream environment. It’s just that the data for bootstrapping can only be loaded using Batch API. You can create both Batch and Stream environment in a single job. Here, I have just created a single config object and then created a Dataset on top of it. We then create a transformation. It specifies what dataset to use with Bootstrap Function. Update the save point { { bootstrapConfig(); } } { ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment(); ExistingSavepoint existingSavepoint = Savepoint.load(executionEnvironment, , MemoryStateBackend()); BootstrapTransformation<TestConfig> configTransformation = getConfigTransformation(executionEnvironment); String newSavepointPath = ; existingSavepoint.withOperator( , configTransformation).write(newSavepointPath); } public class TestCheckpointJob Exception public static void main (String[] args) throws //Rest same as previous code IOException static void bootstrapConfig () throws "oldSavepointPath" new "newSavepointPath" "classify_data" Next, we load the savepoint from the old directory and then update the states of the operators. To update the state, we need to specify the UID of the operator in the streaming job and the transformation created in the step. Once that is done, we can rewrite this modified savepoint in a new path. Do note that the new path contains the shallow copies of pointers from the old path. It means, deleting the old savepoint path will corrupt the new one, and hence you should refrain from doing so. Now, you can resume your Flink job using this new savepoint path. bin/flink run -s newSavepointPath test- .jar checkpoint You can even create a new Savepoint instead of updating the old one. For that, you need to do instead of Savepoint.create() Savepoint.load() Flink’s State Processor API was one of the most requested features and now it’s finally here. The API is available only in 1.9.0 and above versions. You can explore the whole API in . the official documentation Connect with me on LinkedIn or Twitter or drop a mail to kharekartik@gmail.com