FsStateBackend
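FsStateBackend keeps working state on the TaskManager's JVM heap and writes checkpoint snapshots to a file system such as HDFS. Note that it has been deprecated since Flink 1.13 in favor of HashMapStateBackend (covered below) combined with a file-system checkpoint storage.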

/**
 * Reads data from Kafka, processes it, and writes the result to Redis.
 */
public class KafkaToRedis {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //enable checkpointing with a 1000 ms interval
        env.enableCheckpointing(1000);
        //state backend: working state on the TaskManager heap, checkpoints written to HDFS
        env.setStateBackend(new FsStateBackend("hdfs://mydfs/checkpoint"));
        //externalized-checkpoint cleanup policy: retain checkpoint data when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        Properties properties = new Properties();
        //Kafka broker addresses
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux01:9092,linux02:9092,linux03:9092");
        //read from the earliest offset when the group has no committed offset
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        //disable automatic offset commits
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        //consumer group id (a random UUID, so every run starts as a new group)
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("wordcount", new SimpleStringSchema(), properties);
        //do not commit offsets back to Kafka's __consumer_offsets topic on checkpoints
        kafkaConsumer.setCommitOffsetsOnCheckpoints(false);
        DataStreamSource<String> kafkaSource = env.addSource(kafkaConsumer);

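        //split each incoming line into words and emit a (word, 1) tuple per word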
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = kafkaSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word,1));
                }
            }
        });

        SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne.keyBy(tp -> tp.f0).sum(1);
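        //Redis connection settings for the sink (Jedis pool pointing at host linux01, default port)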
        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder().setHost("linux01").build();
        //write the aggregated results to Redis
        result.addSink(new RedisSink<Tuple2<String, Integer>>(config, new KafkaToRedisMapper()));
        //launch the job
        env.execute();
    }
    private static class KafkaToRedisMapper implements RedisMapper<Tuple2<String,Integer>>{
        //use the HSET command; "wordcount" is the Redis key of the hash
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET,"wordcount");
        }
        //the field within the hash: the word itself
        @Override
        public String getKeyFromData(Tuple2<String, Integer> data) {
            return data.f0;
        }
        //the value within the hash: the count, serialized as a string
        @Override
        public String getValueFromData(Tuple2<String, Integer> data) {
            return data.f1.toString();
        }
    }
}
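
To verify what the sink wrote, a minimal read-back sketch can query the "wordcount" hash directly. This assumes the Jedis client is on the classpath and that Redis listens on linux01:6379 (the port is an assumption; the job above only sets the host):

import redis.clients.jedis.Jedis;

import java.util.Map;

public class WordCountReader {

    public static void main(String[] args) {
        //host matches the sink config above; port 6379 is an assumed default
        try (Jedis jedis = new Jedis("linux01", 6379)) {
            //HGETALL wordcount: every field (word) and value (count) written by the job
            Map<String, String> counts = jedis.hgetAll("wordcount");
            counts.forEach((word, count) -> System.out.println(word + " -> " + count));
        }
    }
}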

HashMapStateBackend

/**
 * HashMapStateBackend keeps working state as objects on the TaskManager's JVM heap and
 * checkpoints the full state to the configured checkpoint storage, e.g. an external HDFS.
 * (Spilling state that no longer fits in memory to local disk is what
 * EmbeddedRocksDBStateBackend does, not this backend.)
 *
 * Strengths: handles large state, long windows, and large per-key values; supports
 * high-availability setups; checkpoint data can be persisted in HDFS.
 *
 * A StateBackend can be configured in two ways:
 * 1. Per job, in code: env.setStateBackend(new HashMapStateBackend());
 * 2. Cluster-wide, as the global default, by adding to Flink's configuration file:
 *    state.backend: hashmap
 *    state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
 */
public class HashMapStateBackendDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //enable checkpointing with a 10000 ms interval
        env.enableCheckpointing(10000);
        //restart strategy: up to 5 restarts, 5 seconds apart
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.seconds(5)));
        //state backend: working state on the TaskManager heap
        env.setStateBackend(new HashMapStateBackend());
        //store checkpoint data in an external HDFS
        env.getCheckpointConfig().setCheckpointStorage("hdfs://mydfs/checkpoint03");
        //retain checkpoint data when the job is cancelled (default is DELETE_ON_CANCELLATION)
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        DataStreamSource<String> lines = env.socketTextStream("linux01", 7777);

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String input, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = input.split(" ");

                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });
        //key the stream by word and sum the counts
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne.keyBy(tp -> tp.f0).sum(1);

        result.print();

        env.execute();
    }
}
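
If the working state outgrows the TaskManager heap, the backend that actually spills to local disk is EmbeddedRocksDBStateBackend. A minimal sketch of the same setup on RocksDB (assuming Flink 1.13+ and the flink-statebackend-rocksdb dependency on the classpath):

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDBStateBackendDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10000);
        //working state lives in embedded RocksDB instances on local disk;
        //"true" enables incremental checkpoints (only changed SST files are uploaded)
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
        //checkpoints still go to the external HDFS path, as in the heap-based demo above
        env.getCheckpointConfig().setCheckpointStorage("hdfs://mydfs/checkpoint03");
        //same toy pipeline as above: echo whatever arrives on the socket
        env.socketTextStream("linux01", 7777).print();
        env.execute();
    }
}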
