关于流处理框架Flink的入门使用
1、什么是flinkflink是一种流处理框架,通常使用场景是消费kafka数据进行分组聚合后发送到其他系统,分组与聚合是flink的核心,在本文中仅阐述单个使用场景。流数据相当于是连续不断的数据,生产上的kafka中的日志数据就可以理解为流数据,流数据还分为有界流和无界流,有界即文本数据作为datastream这种有固定大小的数据,无界即源源不断的数据。2、flink的界面下图为flink的界面
1、什么是flink
flink是一种流处理框架,通常使用场景是消费kafka数据进行分组聚合后发送到其他系统,分组与聚合是flink的核心,在本文中仅阐述单个使用场景。流数据相当于是连续不断的数据,生产上的kafka中的日志数据就可以理解为流数据,流数据还分为有界流和无界流,有界即文本数据作为datastream这种有固定大小的数据,无界即源源不断的数据。
2、flink的界面
下图为flink的界面,在界面中可以提交代码jar包,即可实时运行处理
3、flink结合代码案例讲解使用场景
在main入口函数中定义以下方法
//获取流环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//获取数据流
DataStream<String> stringDataStreamSource = env.socketTextStream("127.0.0.1", 6666);
//转pojo
SingleOutputStreamOperator<KafkaEntity> map = stringDataStreamSource.map(new MapFunction<String, KafkaEntity>() {
@Override
public KafkaEntity map(String value) throws Exception {
KafkaEntity kafkaEntity = new KafkaEntity();
if (!"".equals(value)){
String[] splitResult = value.split("1");
kafkaEntity.setCityId(splitResult[0]);
kafkaEntity.setAppId(splitResult[1]);
kafkaEntity.setProcessCode(splitResult[2]);
kafkaEntity.setStartTime(splitResult[3].substring(0,12));
kafkaEntity.setErrCode(splitResult[4]);
}
return kafkaEntity;
}
});
//分组,聚合
SingleOutputStreamOperator<Object> applyResult = map.keyBy("processCode", "appId", "cityId", "startTime")
.timeWindow(Time.seconds(15))//每隔15秒聚合一次
.apply(new WindowFunction<KafkaEntity, Object, Tuple, TimeWindow>() {
@Override
public void apply(Tuple tuple, TimeWindow window, Iterable<KafkaEntity> input, Collector<Object> out) throws Exception {
//调用总次数
KafkaEntity aggregateResult = input.iterator().next();
int reqAmount = IteratorUtils.toList(input.iterator()).size();
//成功次数
int successAmount = 0;
//总时长
long timeAll = 0;
//限流次数
int failAmount = 0;
List<KafkaEntity> list = IteratorUtils.toList(input.iterator());
for (int i = 0; i < list.size(); i++) {
KafkaEntity kafkaEntity = list.get(i);
timeAll += Long.parseLong(kafkaEntity.getDuration());
if ("0".equals(kafkaEntity.getErrCode())) {
successAmount += 1;
} else {
failAmount += 1;
}
}
//平均调用时长
long averageDuration = (timeAll / reqAmount);
//聚合结果
aggregateResult.setReqAmount(String.valueOf(reqAmount));
aggregateResult.setSuccessAmount(String.valueOf(successAmount));
aggregateResult.setAverageDuration(String.valueOf(averageDuration));
aggregateResult.setFailAmount(String.valueOf(failAmount));
aggregateResult.setInsertTime(new Date());
out.collect(aggregateResult);
}
});
applyResult.addSink(new RichSinkOperation());
env.execute();
4、代码解释
4.1
首先需要获取流环境
4.2
以socket文本流代替kafka消费者,在linux中使用nc -lk 6666 启动,然后写文本发送即可模拟kafka消费者读取数据,这里也是通过第一步的流环境来获取数据流
4.3
获取到数据流后,将datastream通过map方法(这也可以当作一种算子)转为pojo类,到此,数据准备完成
4.4
SingleOutputStreamOperator也是datastream的子类,我们将获取到的pojo流通过keyby分组,分组的维度是四个,即"processCode", “appId”, “cityId”, “startTime”,只要收到的数据中有一个元素与上一个不同,即为新的一个组
4.5
分组以后通过timewindow设置窗口大小为15秒,即15秒进行一次聚合,聚合方法为下面的apply
4.6
apply方法是对15秒内收到的数据根据用户自定义来做数据处理
KafkaEntity aggregateResult = input.iterator().next();代表按那四个维度来分组得到的pojo对象,同一组中那四个属性都是一样的,在本例中由此来计算同一组的总次数即按当前维度分组后,每组的数据个数,即list的大小,重新计算后放入pojo的一个属性中,最终通过out.collect方法将计算得到的结果汇总在一个对象的几个属性中输出
4.7
applyResult为聚合后的结果,最后一步为将聚合结果输出到外部系统,这里举例为入数据库(redis或hbase都一样)
4.8
public class RichSinkOperation extends RichSinkFunction {
@Override
public void invoke(Object value) throws Exception {
InputStream inputStream = Resources.getResourceAsStream("mybatis-config.xml");
//获取工厂
SqlSessionFactory factory = new SqlSessionFactoryBuilder().build(inputStream);
SqlSession sqlSession = factory.openSession();
FlinkDao flinkDao = sqlSession.getMapper(FlinkDao .class);
KafkaEntity kafkaEntity = (KafkaEntity) value;
flinkDao.insertRecord(kafkaEntity);
sqlSession.commit();
}
@Override
public void open(Configuration parameters) throws Exception {
}
}
此处集成了mybatis,该自定义类继承RichSinkFunction,主要实现invoke方法,将聚合结果的每一条进行入库处理
本例代码仅为很局限的场景使用,仅为打通整体流程,需要根据业务不同定义不同的apply处理办法,此处的sink操作中也不合理,生产中数据库连接应该放在open中并使用数据池,另外还需要考虑生产每分钟都是上亿的数据,如果开一分钟的窗口,聚合结果都在内存中内存会不会炸,聚合后一次性sink数据库操作会不会阻塞,需要压测来得到实际效果验证。
更多推荐
所有评论(0)