Flink数据写入Redis

   点关注不迷路,欢迎再访!		

精简博客内容,尽量已专业术语来分享。
努力做到对每一位认可自己的读者负责。
帮助别人的同时更是丰富自己的良机。

框架springboot ,redis 集群配置通过properties文件形式,@Value获取配置参数。
Source :fakfa
Sink :redis

作业内容:通过kafka 主题获取数据,将统计同一个身份证每天报价的次数,将统计结果实时写入redis。

思路:以身份证唯一性,通过keyBy(0) 身份证号分区可以实现统计对应身份证实时报价次数,最终数据落地sink到redis。

一.引入redis依赖

 <!-- redis -->
		<dependency> 
		<groupId>redis.clients</groupId> 
		<artifactId>jedis</artifactId> 
		<version>2.9.0</version> 
		</dependency> 
<!-- /redis -->

二.连接Redis集群配置类

/**
 *<p>
 *description:Redis属性相关配置
 *</p>
 * @author ex_sunqi
 * @since 2019年8月20日
 * @see 
 */
@Configuration
@EnableTransactionManagement
public class RedisConfig {
    Log logger = LogFactory.getLog(this.getClass());

    @Value("${redis.master}")
    String redisMaster;

    @Value("${redis.password}")
    String redisPassword;

    @Value("${redis.sentinel1}")
    String redisSentinel1;

    @Value("${redis.sentinel2}")
    String redisSentinel2;

    @Value("${redis.sentinel3}")
    String redisSentinel3;
    
    @Value("${redis.testOnBorrow}")
    String redisTestOnBorrow;
    
    @Value("${redis.maxIdle}")
    String redisMaxIdle;
    
    @Value("${redis.maxWaitMillis}")
    String redisMaxWaitMillis;
    
    @Value("${redis.maxTotal}")
    String redisMaxTotal;
    
    @Bean(name = "redisProps")
    public Properties RedisProps() {
        Properties props = new Properties();
        props.setProperty("redis.master", redisMaster);
        props.setProperty("redis.password", redisPassword);
        props.setProperty("redis.sentinel1", redisSentinel1);
        props.setProperty("redis.sentinel2", redisSentinel2);
        props.setProperty("redis.sentinel3", redisSentinel3);
        props.setProperty("redis.testOnBorrow", redisTestOnBorrow);
        props.setProperty("redis.maxIdle", redisMaxIdle);
        props.setProperty("redis.maxWaitMillis", redisMaxWaitMillis);
        props.setProperty("redis.maxTotal", redisMaxTotal);
        return props;
    }

}

三.连接Flink配置类,ICAScoreCalculation 自定义业务数据处理类

/**
 * @author ex_sunqi
 *
 */
@Component
public class KafkaFlinkJob implements ApplicationRunner {

    private final static Logger logger = LoggerFactory.getLogger(KafkaFlinkJob.class);

    @Autowired
    private Properties kafkaProps;
    
    @Autowired
    private Properties redisProps;
    

    @SuppressWarnings("all")
    @Override
    public void run(ApplicationArguments args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(6000);
        env.setStateBackend( new FsStateBackend("file:///opt/tpapp/flinkdata", true ));        
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);        
        env.setParallelism(1);
                     
        DataStream<String> sourceStream = readFromKafka(env,kafkaProps);        
										
		DataStream<Tuple5<String,List<ICAnomalyDetectionTuple>, List<String>,Integer,Date>> sourceStreamTra = sourceStream
				.map(new MapFunction<String, Tuple2<String,String>>() {
					private static final long serialVersionUID = 1L;
						@Override
						public Tuple2<String,String> map(String value)
							throws Exception {
							JSONObject object = JSONObject.parseObject(value);
							return new Tuple2<String,String>(object.getString("idard"),value);
						}
				})
				.name("source data prase")
				.setParallelism(1)
				.keyBy(0)
				.map(new ICAScoreCalculation())
				.name("score calculation")
				.setParallelism(1);

		//将数据写入RedisSink  redis
        sourceStreamTra.addSink(new RedisSink(redisProps))
        			   .name("score-data-mysink")
        			   .setParallelism(1);
        
        env.execute("flink-score-job");    	

    }
    

	public static DataStream<String> readFromKafka(StreamExecutionEnvironment env,Properties kafkaProps) {

        DataStream<String> stream = env
        		.addSource(new FlinkKafkaConsumer09<>("score-topic-test", new SimpleStringSchema(), kafkaProps))
                .name("kafka-source")
                .setParallelism(1);
        
        return stream;
    }
    
}

三.连接redis提供数据写入

/**
 * <p>
 * description:
 * </p>
 * 
 * @author ex_sunqi
 * @since 2019年8月8日
 * @see
 */
public class RedisSink extends RichSinkFunction<Tuple5<String, List<ICAnomalyDetectionTuple>, List<String>, Integer, Date>> {

    /**
     * 
     */
    private static final long serialVersionUID = -939279650400545711L;
    
    private final static Logger logger = LoggerFactory.getLogger(RedisSink.class);

    private static JedisSentinelPool pool = null;
    
    private Properties redisProps = null;

    public RedisSink(Properties redisProps) {
        this.redisProps = redisProps;
    }

    @Override
    public void invoke(Tuple5<String, List<ICAnomalyDetectionTuple>, List<String>, Integer, Date> value) throws Exception {
        if (pool == null) createJedisPool();
        Jedis jedis = pool.getResource();      
        Map<String,String> map = new HashMap<String,String>();
        map.put("ica", value.f1.toString());
        map.put("num", String.valueOf(value.f3));
        try {
            //value.f0 身份证号,通过HASH写入redis
            jedis.hmset(value.f0, map);
            logger.info("存入redis成功");
        }
        catch(Exception e) {
        	logger.error("存入redis失败:"+e.getMessage());
            System.out.println(e.getMessage());
        }
        finally {
            if(jedis!=null) {
                jedis.close();
            }
        }
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        this.createJedisPool();

    }

    @Override
    public void close() throws Exception {
        pool.close();
    }
    
    /**
     * 创建redis连接池
     *
     */
    private  void createJedisPool() {
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxTotal(Integer.parseInt(redisProps.getProperty("redis.maxTotal")));
        config.setMaxWaitMillis(Long.parseLong(redisProps.getProperty("redis.maxWaitMillis")));
        config.setMaxIdle(Integer.parseInt(redisProps.getProperty("redis.maxIdle")));
        config.setTestOnBorrow(redisProps.getProperty("redis.testOnBorrow").equals("false"));
        String password = redisProps.getProperty("redis.password");
        String masterName = redisProps.getProperty("redis.master");
        Set<String> sentinels = new HashSet<String>();
        sentinels.add(redisProps.getProperty("redis.sentinel1"));
        sentinels.add(redisProps.getProperty("redis.sentinel2"));
        sentinels.add(redisProps.getProperty("redis.sentinel3"));
        pool = new JedisSentinelPool(masterName, sentinels, config,3000,password);
    }
}

四.redis测试类

/**
 *<p>
 *description:
 *</p>
 * @author ex_sunqi
 * @since 2019年8月5日
 * @see 
 */
public class RedisTest {
    
    /**
     * 创建连接池
     *
     */
    private  JedisSentinelPool createJedisPool() {
        // 建立连接池配置参数
        JedisPoolConfig config = new JedisPoolConfig();
//        Properties prop = getJedisProperties();
        // 设置最大连接数
        config.setMaxTotal(Integer.parseInt("2"));
        // 设置最大阻塞时间,记住是毫秒数milliseconds
        config.setMaxWaitMillis(Long.parseLong("2"));
        // 设置空间连接
        config.setMaxIdle(Integer.parseInt("2"));
        // jedis实例是否可用
        config.setTestOnBorrow(false);
        //获取redis密码
        String password = "****";
 
        String masterName = "master1";
        Set<String> sentinels = new HashSet<String>();
        //模拟reids集群
        sentinels.add("x.x.x.x:xxx");
        sentinels.add("x.x.x.x:xxx");
        sentinels.add("x.x.x.x:xxx");
        JedisSentinelPool pool = new JedisSentinelPool(masterName, sentinels, config,3000,password);
        return pool;
    }
    
    @Test
    public void testRedisData() {
        JedisSentinelPool pool  = createJedisPool();
        Jedis jedis = null;
        try {
            jedis= pool.getResource();         
            List<String> list = jedis.hmget("**********","ica","num");
            list.stream().forEach(System.out::println);
        }
        catch(Exception e) {
            System.out.println(e.getMessage());
        }
        finally {
            if(jedis!=null) {
                jedis.close();
            }
        }
    }
}

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐