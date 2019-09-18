Coronavirus Data Sources [CONTRIBUTE]
/**
 * Streaming job: reads JSON strings from Kafka topic "test_topic", parses them into
 * {@code TestData}, sums {@code value} per key over one-hour windows, labels each aggregate with
 * {@link ClassifyData}, and prints the labeled records re-serialized as JSON.
 *
 * <p>Every operator is given an explicit {@code uid} (not only the stateful ones) so operator IDs
 * stay stable across code changes and the job can be restored reliably from savepoints such as
 * the one produced by {@code bootstrapConfig()}.
 */
public class TestCheckpointJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        Properties kafkaConsumerProperties = new Properties();
        kafkaConsumerProperties.setProperty("bootstrap.servers", "localhost:9092");
        kafkaConsumerProperties.setProperty("group.id", "test_group_id");

        ObjectMapper objectMapper = new ObjectMapper();

        FlinkKafkaConsumer010<String> kafkaConsumer010 = new FlinkKafkaConsumer010<>("test_topic", new SimpleStringSchema(), kafkaConsumerProperties);
        DataStream<String> kafkaSource = env.addSource(kafkaConsumer010).name("kafka_source").uid("kafka_source");

        DataStream<TestData> aggregatedStream = kafkaSource
            .map(row -> objectMapper.readValue(row, TestData.class)).name("parse_test_data").uid("parse_test_data")
            .keyBy(TestData::getKey)
            .timeWindow(Time.hours(1))
            .reduce((rowA, rowB) -> {
                TestData result = new TestData();
                result.setKey(rowA.getKey());
                result.setValue(rowA.getValue() + rowB.getValue());
                // The aggregate is stamped with the wall-clock time of the reduction.
                result.setCreatedAt(System.currentTimeMillis());
                return result;
            }).name("aggregate_stream").uid("aggregate_stream");

        // "classify_data" is the uid bootstrapConfig() targets when writing the threshold state.
        DataStream<LabeledTestData> labeledTestDataDataStream = aggregatedStream
            .keyBy(TestData::getKey)
            .flatMap(new ClassifyData()).name("classify_data").uid("classify_data");

        labeledTestDataDataStream
            .map(row -> objectMapper.writeValueAsString(row)).name("serialize_labeled_data").uid("serialize_labeled_data")
            .print().name("print_sink").uid("print_sink");

        env.execute();
    }
}
/**
 * Keyed flat-map that copies each {@code TestData} into a {@code LabeledTestData} and labels it
 * against the threshold held in the keyed value state "thresholdState" for the current key:
 * "L1" when the record's value is strictly greater than the threshold, "L2" otherwise, and
 * "UNCLASSIFIED" when no threshold is stored for the key.
 *
 * <p>This function only reads the state and never updates it; presumably it is seeded from
 * outside the job (e.g. a bootstrapped savepoint using the same state name and type) — confirm.
 */
class ClassifyData extends RichFlatMapFunction<TestData, LabeledTestData>{
    // transient: the handle is obtained in open() on each task, not shipped with the function.
    transient ValueState<Integer> threshold;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        threshold = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("thresholdState", Integer.class));
    }

    @Override
    public void flatMap(TestData testData, Collector<LabeledTestData> collector) throws Exception {
        LabeledTestData labeledTestData = new LabeledTestData();
        labeledTestData.setKey(testData.getKey());
        labeledTestData.setValue(testData.getValue());
        labeledTestData.setCreatedAt(testData.getCreatedAt());

        // Read the state once: every value() call is a separate state-backend lookup.
        Integer currentThreshold = threshold.value();
        String label = "UNCLASSIFIED";
        if (currentThreshold != null) {
            label = (testData.getValue() > currentThreshold) ? "L1" : "L2";
        }
        labeledTestData.setLabel(label);
        collector.collect(labeledTestData);
    }
}
<!-- Presumably the artifact that provides the State Processor API classes used by the
     bootstrap code (Savepoint, ExistingSavepoint, OperatorTransformation,
     KeyedStateBootstrapFunction) - confirm.
     NOTE(review): keep the version (1.9.0) and the _2.11 Scala suffix in step with the
     other Flink artifacts of the project - confirm. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-state-processor-api_2.11</artifactId>
<version>1.9.0</version>
</dependency>
/**
 * Bootstrap function that stores each incoming {@code TestConfig}'s threshold value in the keyed
 * value state "thresholdState" under that config's key.
 */
class ConfigBootstrapper extends KeyedStateBootstrapFunction<String, TestConfig> {
    ValueState<Integer> threshold;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Name and type must be identical to the descriptor ClassifyData reads the threshold from.
        ValueStateDescriptor<Integer> thresholdDescriptor =
            new ValueStateDescriptor<>("thresholdState", Integer.class);
        threshold = getRuntimeContext().getState(thresholdDescriptor);
    }

    @Override
    public void processElement(TestConfig testConfig, Context context) throws Exception {
        Integer configuredThreshold = testConfig.getThresholdValue();
        threshold.update(configuredThreshold);
    }
}
/**
 * Builds the bootstrap transformation that seeds the threshold state with the default value 10.
 *
 * <p>Declared {@code static} so it can be called from the static {@code bootstrapConfig()}.
 *
 * @param executionEnvironment batch environment in which the config DataSet is created
 * @return the transformation to register on a savepoint
 */
static BootstrapTransformation<TestConfig> getConfigTransformation(ExecutionEnvironment executionEnvironment){
    return getConfigTransformation(executionEnvironment, 10);
}

/**
 * Builds a bootstrap transformation that writes {@code thresholdValue} into the keyed threshold
 * state through {@link ConfigBootstrapper}.
 *
 * <p>NOTE(review): the single config record is keyed by "global", so state is written only for
 * key "global". ClassifyData reads its threshold from keyed state under each record's
 * {@code TestData} key, so only records whose key is "global" will see this threshold.
 * Confirm this is intended.
 *
 * @param executionEnvironment batch environment in which the config DataSet is created
 * @param thresholdValue threshold to store
 * @return the transformation to register on a savepoint
 */
static BootstrapTransformation<TestConfig> getConfigTransformation(ExecutionEnvironment executionEnvironment, int thresholdValue) {
    TestConfig testConfig = new TestConfig();
    testConfig.setKey("global");
    testConfig.setThresholdValue(thresholdValue);
    DataSet<TestConfig> configDataSet = executionEnvironment.fromElements(testConfig);
    return OperatorTransformation
        .bootstrapWith(configDataSet)
        .keyBy(TestConfig::getKey)
        .transform(new ConfigBootstrapper());
}
/**
 * Variant of the job entry point that first writes the bootstrapped savepoint via
 * {@code bootstrapConfig()} and then runs the streaming pipeline (elided here; the same as the
 * earlier version of {@code TestCheckpointJob#main}).
 */
public class TestCheckpointJob {
public static void main(String[] args) throws Exception {
// NOTE(review): this runs on every launch of main, including the
// `bin/flink run -s newSavepointPath` restore launch, so the savepoint is regenerated each
// time the job starts. Confirm this is intended, or move bootstrapping to a separate entry point.
bootstrapConfig();
//Rest same as previous code
}
}
/**
 * Writes a new savepoint to "newSavepointPath" based on the one at "oldSavepointPath". In the new
 * savepoint, the state of operator uid "classify_data" is replaced by the threshold config from
 * {@code getConfigTransformation}.
 *
 * @throws IOException if the existing savepoint cannot be loaded or the batch job that writes the
 *     new savepoint fails
 */
static void bootstrapConfig() throws IOException {
    bootstrapConfig("oldSavepointPath", "newSavepointPath");
}

/**
 * Same as {@code bootstrapConfig()}, but with explicit savepoint locations.
 *
 * @param oldSavepointPath savepoint the new one is derived from
 * @param newSavepointPath location the new savepoint is written to
 * @throws IOException if the existing savepoint cannot be loaded or the batch job that writes the
 *     new savepoint fails
 */
static void bootstrapConfig(String oldSavepointPath, String newSavepointPath) throws IOException {
    ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
    ExistingSavepoint existingSavepoint = Savepoint.load(executionEnvironment, oldSavepointPath, new MemoryStateBackend());
    BootstrapTransformation<TestConfig> configTransformation = getConfigTransformation(executionEnvironment);
    existingSavepoint
        // A savepoint taken from the streaming job already holds an entry for uid "classify_data",
        // and withOperator rejects uids that are already present. Drop the old entry first; this
        // discards whatever state that operator had in the old savepoint.
        .removeOperator("classify_data")
        .withOperator("classify_data", configTransformation)
        .write(newSavepointPath);
    // write() only adds the savepoint sink to the DataSet plan. Nothing is written until the
    // batch job is executed.
    try {
        executionEnvironment.execute("bootstrap_config");
    } catch (IOException | RuntimeException e) {
        throw e;
    } catch (Exception e) {
        throw new IOException("Failed to write bootstrapped savepoint to " + newSavepointPath, e);
    }
}
# Launch the streaming job from the savepoint that bootstrapConfig() writes to "newSavepointPath".
# (-s names the savepoint to restore from; see the Flink CLI documentation.)
bin/flink run -s newSavepointPath test-checkpoint.jar