Kartik Khare

Software Developer by choice!!

The API to Bootstrap Your Flink Jobs Has Arrived

Apache Flink is one of the most versatile open-source data streaming solutions available. It supports all the primary functions of a typical batch processing system, such as SQL, connectors to Hive, and Group By, while providing fault tolerance and exactly-once semantics. Hence, you can build a multitude of push-based applications with it.
However, one significant drawback of Apache Flink has been the inability to modify the checkpointed state of a program. Let’s first see what I mean by that.


Flink provides fault-tolerance by using a mechanism called checkpointing. It periodically takes a snapshot of all the stateful operators/functions of your programs and stores them in a highly durable store such as HDFS.
Checkpointing allows a Flink program to resume from this snapshot. This is helpful after failures such as an unhandled exception or the loss of a data node in your YARN/Mesos/k8s cluster.
This snapshot is stored in a binary format only understood by Flink, which makes it difficult to modify the state before restart.

Why would you need to modify the data?

There are many cases where you need to keep part of the data in a checkpoint and update the rest. As an example, consider a job that does the following:
1. Reads numerical data from a Kafka topic
2. Aggregates it over a window of 1 hour
3. Classifies each aggregate using statistical thresholds provided by a config stored in your operator’s state
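import java.util.Properties;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

public class TestCheckpointJob {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties kafkaConsumerProperties = new Properties();
        kafkaConsumerProperties.setProperty("bootstrap.servers", "localhost:9092");
        kafkaConsumerProperties.setProperty("group.id", "test_group_id");

        ObjectMapper objectMapper = new ObjectMapper();
        FlinkKafkaConsumer010<String> kafkaConsumer010 = new FlinkKafkaConsumer010<>("test_topic", new SimpleStringSchema(), kafkaConsumerProperties);
        DataStream<String> kafkaSource = env.addSource(kafkaConsumer010).name("kafka_source").uid("kafka_source");
        DataStream<TestData> aggregatedStream = kafkaSource
                .map(row -> objectMapper.readValue(row, TestData.class))
                .keyBy(TestData::getKey)       // reduce needs a keyed stream
                .timeWindow(Time.hours(1))     // aggregate over a window of 1 hour
                .reduce((rowA, rowB) -> {
                    TestData result = new TestData();
                    result.setKey(rowA.getKey()); // assumes TestData has a setKey setter
                    result.setValue(rowA.getValue() + rowB.getValue());
                    return result;
                });

        DataStream<LabeledTestData> labeledTestDataDataStream = aggregatedStream.keyBy(TestData::getKey).flatMap(new ClassifyData()).name("classify_data").uid("classify_data");
        labeledTestDataDataStream.map(row -> objectMapper.writeValueAsString(row)).print();

        env.execute();
    }
}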
class ClassifyData extends RichFlatMapFunction<TestData, LabeledTestData> {

    ValueState<Integer> threshold;

    @Override
    public void open(Configuration parameters) throws Exception {
        threshold = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("thresholdState", Integer.class));
    }

    @Override
    public void flatMap(TestData testData, Collector<LabeledTestData> collector) throws Exception {
        LabeledTestData labeledTestData = new LabeledTestData();
        String label = "UNCLASSIFIED";
        // Without a threshold in state, the record stays unclassified.
        if (threshold.value() != null) {
            label = (testData.getValue() > threshold.value()) ? "L1" : "L2";
        }
        // LabeledTestData is not shown in this article; setLabel is an assumed setter.
        labeledTestData.setLabel(label);
        collector.collect(labeledTestData);
    }
}
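To make the labeling rule concrete, here is a minimal plain-Java sketch of the same decision with no Flink dependency. The class name ThresholdLabeler and the int value type are assumptions made for illustration; they are not part of the job above. It shows why the stored threshold matters: until the state holds a value, every record comes out as UNCLASSIFIED.

```java
public class ThresholdLabeler {

    // Same rule as ClassifyData.flatMap: a missing threshold means the
    // record cannot be classified yet. The int value type is an assumption.
    public static String label(int value, Integer threshold) {
        if (threshold == null) {
            return "UNCLASSIFIED";
        }
        return value > threshold ? "L1" : "L2";
    }

    public static void main(String[] args) {
        System.out.println(label(15, null)); // no threshold in state yet
        System.out.println(label(15, 10));   // value above the threshold
        System.out.println(label(5, 10));    // value at or below the threshold
    }
}
```

A job restored from a savepoint whose threshold state is empty behaves like the first call until a config arrives, which is the gap that bootstrapping the state closes.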
Suppose your job gets killed and you want to restart it from a checkpoint, but you also need to modify the config. Earlier, the only way to do so was to wait for the job to start with the old config and then overwrite it using a stream from Kafka or the filesystem.
Now, however, you can easily do that using the new API. Let’s modify the above example to do so.

Bootstrapping the state

The following steps are required to bootstrap your state.

Add Dependency

The State Processor API is not included in the default Flink dependencies and needs to be added separately to your pom.xml file.
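A minimal sketch of the dependency entry, assuming a Scala 2.11 build of Flink 1.9.0. Adjust the Scala suffix and the version to match your cluster; the provided scope is also an assumption and should follow whatever scope your other Flink dependencies use.

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-state-processor-api_2.11</artifactId>
    <version>1.9.0</version>
    <scope>provided</scope>
</dependency>
```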

Create a Bootstrap function

class ConfigBootstrapper extends KeyedStateBootstrapFunction<String, TestConfig> {
    ValueState<Integer> threshold;

    @Override
    public void open(Configuration parameters) throws Exception {
        threshold = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("thresholdState", Integer.class));
    }

    @Override
    public void processElement(TestConfig testConfig, Context context) throws Exception {
        // TestConfig is not shown in this article; getThresholdValue is an assumed getter.
        threshold.update(testConfig.getThresholdValue());
    }
}
This function tells Flink what state to update when it receives the data. In this example, we are updating the threshold state using the TestConfig data we collect.

Flow the config data

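static BootstrapTransformation<TestConfig> getConfigTransformation(ExecutionEnvironment executionEnvironment) {
    // Populate the key and threshold here; the key must match a key used by the streaming job.
    TestConfig testConfig = new TestConfig();

    DataSet<TestConfig> configDataSet = executionEnvironment.fromElements(testConfig);
    BootstrapTransformation<TestConfig> transformation = OperatorTransformation
            .bootstrapWith(configDataSet)
            .keyBy(TestConfig::getKey) // assumes TestConfig exposes a String getKey()
            .transform(new ConfigBootstrapper());
    return transformation;
}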
Now, you need to flow the config data. The Flink State Processor API works with the DataSet API. This doesn’t mean you can’t use bootstrapping in a stream environment; it’s just that the data for bootstrapping can only be loaded using the batch API. You can create both batch and stream environments in a single job.
Here, I have created a single config object and then a DataSet on top of it. We then create a transformation, which specifies what DataSet to use with the bootstrap function.

Update the savepoint

public class TestCheckpointJob {

    public static void main(String[] args) throws Exception {
        // Rest same as previous code
    }

    static void bootstrapConfig() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        ExistingSavepoint existingSavepoint = Savepoint.load(executionEnvironment, "oldSavepointPath", new MemoryStateBackend());
        BootstrapTransformation<TestConfig> configTransformation = getConfigTransformation(executionEnvironment);
        String newSavepointPath = "newSavepointPath";
        // If the old savepoint already holds state under this uid, you may need to call
        // removeOperator("classify_data") first, which drops the existing state for that operator.
        existingSavepoint.withOperator("classify_data", configTransformation).write(newSavepointPath);
        // write() only adds the savepoint output to the batch plan; executing the plan writes the files.
        executionEnvironment.execute();
    }
}
Next, we load the savepoint from the old directory and update the state of the operator. To do so, we specify the UID of the operator in the streaming job and the transformation created in the previous step.
Once that is done, we write the modified savepoint to a new path. Note that the new savepoint contains shallow copies of pointers to the files in the old path. Deleting the old savepoint will therefore corrupt the new one, so you should refrain from doing so.
Now, you can resume your Flink job using this new savepoint path.
bin/flink run -s newSavepointPath test-checkpoint.jar
You can also create a new savepoint from scratch instead of updating an old one. To do so, call Savepoint.create() instead of Savepoint.load().
Flink’s State Processor API was one of the most requested features, and now it’s finally here. The API is available in Flink 1.9.0 and later.
You can explore the whole API in the official documentation.
Connect with me on LinkedIn or Twitter or drop a mail to kharekartik@gmail.com


