StateBackend options in Flink
FsStateBackend
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;
import java.util.UUID;

/**
 * Reads data from Kafka, processes it, and writes the results to Redis.
 */
public class KafkaToRedis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Enable checkpointing (every 1000 ms)
        env.enableCheckpointing(1000);
        // Set the StateBackend so checkpoint data is stored in HDFS
        env.setStateBackend(new FsStateBackend("hdfs://mydfs/checkpoint"));
        // Retain the checkpoint data when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        Properties properties = new Properties();
        // Kafka broker addresses
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux01:9092,linux02:9092,linux03:9092");
        // If no committed offset exists, start reading from the earliest offset
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Do not auto-commit offsets
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // Consumer group id
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("wordcount", new SimpleStringSchema(), properties);
        // Do not commit offsets back to Kafka's internal __consumer_offsets topic on checkpoint
        kafkaConsumer.setCommitOffsetsOnCheckpoints(false);
        DataStreamSource<String> kafkaSource = env.addSource(kafkaConsumer);
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = kafkaSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne.keyBy(tp -> tp.f0).sum(1);
        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder().setHost("linux01").build();
        // Write the results to Redis
        result.addSink(new RedisSink<Tuple2<String, Integer>>(config, new KafkaToRedisMapper()));
        // Start the job
        env.execute();
    }

    private static class KafkaToRedisMapper implements RedisMapper<Tuple2<String, Integer>> {
        // The Redis command and key: write into a hash named "wordcount"
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "wordcount");
        }
        // The field of the hash entry (the word)
        @Override
        public String getKeyFromData(Tuple2<String, Integer> data) {
            return data.f0;
        }
        // The value of the hash entry (the count)
        @Override
        public String getValueFromData(Tuple2<String, Integer> data) {
            return data.f1.toString();
        }
    }
}
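Because externalized checkpoints are retained on cancellation here, the job can later be resumed from a retained checkpoint with the Flink CLI. A minimal sketch, assuming the checkpoint directory configured above; the <job-id>, the chk-<n> directory, the main class and the jar name are placeholders rather than values taken from this example:

bin/flink run -s hdfs://mydfs/checkpoint/<job-id>/chk-<n> -c KafkaToRedis kafka-to-redis.jar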
HashMapStateBackend
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * HashMapStateBackend keeps working state as objects on the TaskManager JVM heap and can take a
 * full checkpoint of that state to external storage such as HDFS.
 *
 * Well suited for jobs with large state, long windows, and large per-key values, as well as
 * high-availability setups, since the checkpoint data can be kept in HDFS.
 *
 * The StateBackend can be configured in two ways:
 * 1. Per job, in code: env.setStateBackend(new HashMapStateBackend());
 * 2. As a cluster-wide default, in the Flink configuration file:
 *      state.backend: hashmap
 *      state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
 *    (see the configuration sketch after this class)
 */
public class HashMapStateBackendDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Enable checkpointing (every 10 seconds)
        env.enableCheckpointing(10000);
        // Restart strategy: up to 5 attempts, 5 seconds apart
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.seconds(5)));
        // Use HashMapStateBackend for working state
        env.setStateBackend(new HashMapStateBackend());
        // Store checkpoint data in external HDFS
        env.getCheckpointConfig().setCheckpointStorage("hdfs://mydfs/checkpoint03");
        // Retain checkpoint data when the job is cancelled (the default is DELETE_ON_CANCELLATION)
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        DataStreamSource<String> lines = env.socketTextStream("linux01", 7777);
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String input, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = input.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });
        // keyBy on the word and sum the counts
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne.keyBy(tp -> tp.f0).sum(1);
        result.print();
        env.execute();
    }
}
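For the cluster-wide configuration mentioned in the class comment above (option 2), the same setup as this demo could be expressed in flink-conf.yaml. A minimal sketch, reusing the HDFS path from this example; the externalized-checkpoint-retention key is assumed to be available (Flink 1.11+):

state.backend: hashmap
state.checkpoints.dir: hdfs://mydfs/checkpoint03
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION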