Flink数据写入Redis
Flink数据写入Redis。点关注不迷路,欢迎再访!框架springboot,redis 集群配置通过properties文件形式,@Value获取配置参数。Source :kafka;Sink :redis。作业内容:通过kafka 主题获取数据,统计同一个身份证每天报价的次数,并将统计结果实时写入redis。思路:以身份证唯一性,通过keyBy(0) 身份证号分区可以实现统...
·
Flink数据写入Redis
点关注不迷路,欢迎再访!
精简博客内容,尽量以专业术语来分享。
努力做到对每一位认可自己的读者负责。
帮助别人的同时更是丰富自己的良机。
框架springboot ,redis 集群配置通过properties文件形式,@Value获取配置参数。
Source :kafka
Sink :redis
作业内容:通过kafka 主题获取数据,统计同一个身份证每天报价的次数,并将统计结果实时写入redis。
思路:利用身份证号的唯一性,通过keyBy(0) 按身份证号分区,即可统计每个身份证的实时报价次数,最终将数据sink落地到redis。
一.引入redis依赖
<!-- redis: Jedis client library; supplies the Jedis, JedisPoolConfig and JedisSentinelPool classes used by the Java code in this article -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
<!-- /redis -->
二.连接Redis集群配置类
/**
 * Spring configuration that collects the Redis connection settings injected
 * through {@code @Value} placeholders and republishes them as one
 * {@link Properties} bean named {@code redisProps}.
 *
 * @author ex_sunqi
 * @since 2019-08-20
 */
@Configuration
@EnableTransactionManagement
public class RedisConfig {

    Log logger = LogFactory.getLog(this.getClass());

    // ---- connection target -------------------------------------------------

    @Value("${redis.master}")
    String redisMaster;

    @Value("${redis.password}")
    String redisPassword;

    // ---- sentinel addresses ------------------------------------------------

    @Value("${redis.sentinel1}")
    String redisSentinel1;

    @Value("${redis.sentinel2}")
    String redisSentinel2;

    @Value("${redis.sentinel3}")
    String redisSentinel3;

    // ---- connection-pool tuning (kept as strings, parsed by the consumer) ----

    @Value("${redis.testOnBorrow}")
    String redisTestOnBorrow;

    @Value("${redis.maxIdle}")
    String redisMaxIdle;

    @Value("${redis.maxWaitMillis}")
    String redisMaxWaitMillis;

    @Value("${redis.maxTotal}")
    String redisMaxTotal;

    /**
     * Builds the {@code redisProps} bean.
     *
     * <p>Every injected value is copied verbatim under the same key as its
     * placeholder, e.g. {@code redis.master} or {@code redis.maxTotal}.
     *
     * @return a new {@link Properties} holding all nine Redis settings
     */
    @Bean(name = "redisProps")
    public Properties RedisProps() {
        Properties redisSettings = new Properties();

        // Pool tuning values.
        redisSettings.setProperty("redis.maxTotal", redisMaxTotal);
        redisSettings.setProperty("redis.maxIdle", redisMaxIdle);
        redisSettings.setProperty("redis.maxWaitMillis", redisMaxWaitMillis);
        redisSettings.setProperty("redis.testOnBorrow", redisTestOnBorrow);

        // Sentinel addresses.
        redisSettings.setProperty("redis.sentinel1", redisSentinel1);
        redisSettings.setProperty("redis.sentinel2", redisSentinel2);
        redisSettings.setProperty("redis.sentinel3", redisSentinel3);

        // Master name and credentials.
        redisSettings.setProperty("redis.master", redisMaster);
        redisSettings.setProperty("redis.password", redisPassword);

        return redisSettings;
    }
}
三.连接Flink配置类,ICAScoreCalculation 自定义业务数据处理类
/**
 * Spring Boot {@link ApplicationRunner} that assembles and starts the Flink
 * streaming job: Kafka source, JSON keying step, {@code ICAScoreCalculation}
 * map, and {@code RedisSink}.
 *
 * @author ex_sunqi
 */
@Component
public class KafkaFlinkJob implements ApplicationRunner {

    private final static Logger logger = LoggerFactory.getLogger(KafkaFlinkJob.class);

    @Autowired
    private Properties kafkaProps;

    @Autowired
    private Properties redisProps;

    /**
     * Configures checkpointing, wires the pipeline and submits it under the
     * job name {@code flink-score-job}.
     *
     * @param args application arguments supplied by Spring Boot (not read here)
     * @throws Exception if building or executing the Flink job fails
     */
    @SuppressWarnings("all")
    @Override
    public void run(ApplicationArguments args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpointing: 6000 ms interval, state kept on the local file system.
        env.enableCheckpointing(6000);
        env.setStateBackend(new FsStateBackend("file:///opt/tpapp/flinkdata", true));

        CheckpointConfig checkpointing = env.getCheckpointConfig();
        checkpointing.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        checkpointing.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        checkpointing.setMinPauseBetweenCheckpoints(1000);
        checkpointing.setCheckpointTimeout(60000);
        checkpointing.setMaxConcurrentCheckpoints(1);

        env.setParallelism(1);

        // Step 1: pair every raw Kafka message with the key extracted from its JSON.
        DataStream<Tuple2<String, String>> keyedMessages = readFromKafka(env, kafkaProps)
                .map(new MapFunction<String, Tuple2<String, String>>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, String> map(String value) throws Exception {
                        // NOTE(review): "idard" looks like a misspelling of "idcard";
                        // confirm against the producer's JSON schema before changing it.
                        String key = JSONObject.parseObject(value).getString("idard");
                        return new Tuple2<String, String>(key, value);
                    }
                })
                .name("source data prase")
                .setParallelism(1);

        // Step 2: partition by tuple field 0 and run the scoring function.
        DataStream<Tuple5<String, List<ICAnomalyDetectionTuple>, List<String>, Integer, Date>> scored = keyedMessages
                .keyBy(0)
                .map(new ICAScoreCalculation())
                .name("score calculation")
                .setParallelism(1);

        // Step 3: hand the scored records to the Redis sink.
        scored.addSink(new RedisSink(redisProps))
                .name("score-data-mysink")
                .setParallelism(1);

        env.execute("flink-score-job");
    }

    /**
     * Creates the Kafka source reading topic {@code score-topic-test} as plain strings.
     *
     * @param env        execution environment the source is attached to
     * @param kafkaProps Kafka consumer properties
     * @return the stream of raw message strings
     */
    public static DataStream<String> readFromKafka(StreamExecutionEnvironment env, Properties kafkaProps) {
        return env
                .addSource(new FlinkKafkaConsumer09<>("score-topic-test", new SimpleStringSchema(), kafkaProps))
                .name("kafka-source")
                .setParallelism(1);
    }
}
三.连接redis提供数据写入
/**
 * Flink sink that stores each incoming record in Redis as a hash, using a
 * Sentinel-backed Jedis connection pool built from the supplied properties.
 *
 * <p>The pool is held per sink instance and is {@code transient}: it is
 * created in {@link #open(Configuration)} (or lazily on the first
 * {@code invoke}) and released in {@link #close()}, so one instance closing
 * never invalidates a pool that another instance is still using.
 *
 * @author ex_sunqi
 * @since 2019-08-08
 */
public class RedisSink extends RichSinkFunction<Tuple5<String, List<ICAnomalyDetectionTuple>, List<String>, Integer, Date>> {

    private static final long serialVersionUID = -939279650400545711L;

    private final static Logger logger = LoggerFactory.getLogger(RedisSink.class);

    /** Connection/socket timeout in milliseconds handed to the Jedis pool. */
    private static final int CONNECTION_TIMEOUT_MS = 3000;

    /** Per-instance connection pool; never serialized with the function. */
    private transient JedisSentinelPool pool;

    private Properties redisProps = null;

    /**
     * @param redisProps Redis settings; the keys read are {@code redis.master},
     *                   {@code redis.password}, {@code redis.sentinel1..3},
     *                   {@code redis.maxTotal}, {@code redis.maxIdle},
     *                   {@code redis.maxWaitMillis} and {@code redis.testOnBorrow}
     */
    public RedisSink(Properties redisProps) {
        this.redisProps = redisProps;
    }

    /**
     * Writes one record to Redis with {@code HMSET}: the hash key is
     * {@code value.f0} (the ID-card number), field {@code ica} holds
     * {@code value.f1.toString()} and field {@code num} holds {@code value.f3}.
     *
     * <p>Failures of the Redis command itself are logged and swallowed, so a
     * failed write does not fail the job. Failing to obtain a connection still
     * propagates to the caller.
     *
     * @param value the record to store
     * @throws Exception if the pool cannot be created or no connection can be borrowed
     */
    @Override
    public void invoke(Tuple5<String, List<ICAnomalyDetectionTuple>, List<String>, Integer, Date> value) throws Exception {
        if (pool == null) {
            createJedisPool();
        }

        // Build the payload BEFORE borrowing a connection, so that an exception
        // here (e.g. a null f1) can no longer leak a borrowed connection.
        Map<String, String> fields = new HashMap<String, String>();
        fields.put("ica", value.f1.toString());
        fields.put("num", String.valueOf(value.f3));

        Jedis jedis = pool.getResource();
        try {
            jedis.hmset(value.f0, fields);
            logger.info("存入redis成功");
        } catch (Exception e) {
            // Best-effort write: keep the job running, but log the full stack trace.
            logger.error("存入redis失败:" + e.getMessage(), e);
        } finally {
            jedis.close();
        }
    }

    /**
     * Creates the connection pool when the sink instance is opened.
     *
     * @param parameters Flink configuration (not read here)
     * @throws Exception if the pool cannot be created
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        // Guard so that a repeated open() never orphans an existing pool.
        if (pool == null) {
            this.createJedisPool();
        }
    }

    /**
     * Releases the connection pool, if one was created.
     *
     * @throws Exception if closing the pool fails
     */
    @Override
    public void close() throws Exception {
        if (pool != null) {
            pool.close();
            pool = null;
        }
    }

    /**
     * Builds the Sentinel connection pool from {@code redisProps}.
     */
    private void createJedisPool() {
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxTotal(Integer.parseInt(redisProps.getProperty("redis.maxTotal")));
        config.setMaxWaitMillis(Long.parseLong(redisProps.getProperty("redis.maxWaitMillis")));
        config.setMaxIdle(Integer.parseInt(redisProps.getProperty("redis.maxIdle")));
        // Previously `.equals("false")`, which inverted the configured flag and
        // threw a NullPointerException when the property was absent.
        config.setTestOnBorrow(Boolean.parseBoolean(redisProps.getProperty("redis.testOnBorrow")));

        String password = redisProps.getProperty("redis.password");
        String masterName = redisProps.getProperty("redis.master");

        Set<String> sentinels = new HashSet<String>();
        sentinels.add(redisProps.getProperty("redis.sentinel1"));
        sentinels.add(redisProps.getProperty("redis.sentinel2"));
        sentinels.add(redisProps.getProperty("redis.sentinel3"));

        pool = new JedisSentinelPool(masterName, sentinels, config, CONNECTION_TIMEOUT_MS, password);
    }
}
四.redis测试类
/**
 * Manual integration test that reads back the hash written by the sink from a
 * Redis Sentinel deployment. The addresses, password and key below are
 * placeholders and must be replaced with real values before running.
 *
 * @author ex_sunqi
 * @since 2019-08-05
 */
public class RedisTest {

    /**
     * Creates a small Sentinel-backed connection pool for the test.
     *
     * @return a new pool; the caller is responsible for closing it
     */
    private JedisSentinelPool createJedisPool() {
        // Pool configuration.
        JedisPoolConfig config = new JedisPoolConfig();
        // Maximum number of connections.
        config.setMaxTotal(2);
        // Maximum time to block waiting for a connection, in milliseconds.
        config.setMaxWaitMillis(2L);
        // Maximum number of idle connections.
        config.setMaxIdle(2);
        // Do not validate connections when they are borrowed.
        config.setTestOnBorrow(false);

        // Redis password (placeholder).
        String password = "****";
        String masterName = "master1";

        // Sentinel addresses (placeholders standing in for the real cluster).
        Set<String> sentinels = new HashSet<String>();
        sentinels.add("x.x.x.x:xxx");
        sentinels.add("x.x.x.x:xxx");
        sentinels.add("x.x.x.x:xxx");

        return new JedisSentinelPool(masterName, sentinels, config, 3000, password);
    }

    /**
     * Reads the {@code ica} and {@code num} fields of one hash and prints them.
     *
     * <p>Exceptions are deliberately NOT caught: a connection or command
     * failure must fail the test instead of being printed and ignored. Both
     * the connection and the pool are always released.
     */
    @Test
    public void testRedisData() {
        JedisSentinelPool pool = createJedisPool();
        try {
            Jedis jedis = pool.getResource();
            try {
                List<String> fieldValues = jedis.hmget("**********", "ica", "num");
                fieldValues.forEach(System.out::println);
            } finally {
                jedis.close();
            }
        } finally {
            // Previously the pool was never closed, leaking its connections.
            pool.close();
        }
    }
}
更多推荐
已为社区贡献2条内容
所有评论(0)