Flink实战 —— 读取Kafka数据并与MySQL数据关联【附源码】
需要导入 MySQL 驱动（Maven 坐标 mysql:mysql-connector-java:5.1.44），并在 MySQL 中建立表及插入数据。
·
目录
3.1 可以在idea本地启动 C01_QueryActivityName
一、功能需求说明
Flink 从 Kafka 消息中间件中读取数据，消息中只有活动 id、没有活动名称（name）；Flink 通过关联 MySQL 查询出对应的活动名称并返回。
二、前期准备工作
2.1 需要导入mysql驱动
<!-- MySQL JDBC driver: required by DriverManager.getConnection("jdbc:mysql://...") in the Flink map function that looks up activity names. -->
<!-- NOTE(review): 5.1.x is an old driver line; newer releases are published under a different artifact (com.mysql:mysql-connector-j) - confirm before upgrading. -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.44</version>
</dependency>
2.2 mysql建立表及插入数据
-- Setup script for the example: creates the database, the activity table and sample rows.
-- WARNING: the first statement drops any existing `flink_big_data` database, discarding all of its data.
DROP DATABASE IF EXISTS `flink_big_data`; -- database name kept the same as the project name
CREATE DATABASE IF NOT EXISTS `flink_big_data` DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci;
USE `flink_big_data`;
-- Activity list: maps an activity id (a_id) to its display name (name).
DROP TABLE IF EXISTS `t_activities`;
CREATE TABLE `t_activities` (
`id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '主键id, 必备字段',
`gmt_create` DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间, 必备字段',
`gmt_modified` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间, 必备字段',
`a_id` VARCHAR(100) NOT NULL COMMENT '活动id',
`name` VARCHAR(100) NOT NULL COMMENT '活动名称',
-- NOTE(review): last_update has exactly the same definition as gmt_modified; one of them is likely redundant.
`last_update` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间, 必备字段',
PRIMARY KEY (`id`),
-- The Flink job runs "SELECT name FROM t_activities WHERE a_id = ?" once per Kafka record.
-- Indexing a_id turns each lookup into an index seek instead of a full table scan, and
-- UNIQUE guarantees that an activity id maps to at most one name.
UNIQUE KEY `uk_a_id` (`a_id`)
) ENGINE=INNODB DEFAULT CHARSET=utf8;
-- Sample rows: activity ids A1-A4, referenced by the test messages sent to Kafka.
INSERT INTO `t_activities` (`a_id`, `name`) VALUES ('A1', '新人礼包');
INSERT INTO `t_activities` (`a_id`, `name`) VALUES ('A2', '月末活动');
INSERT INTO `t_activities` (`a_id`, `name`) VALUES ('A3', '周末促销');
INSERT INTO `t_activities` (`a_id`, `name`) VALUES ('A4', '年度促销');
2.3 创建 Kafka 的 topic 并启动控制台生产者
# Create topic activity10 with 3 partitions and replication factor 2, addressing the cluster via its ZooKeeper quorum.
# NOTE(review): newer Kafka releases replace --zookeeper with --bootstrap-server for kafka-topics; adjust for your Kafka version.
kafka-topics --create --zookeeper node-01:2181,node-02:2181,node-03:2181 --replication-factor 2 --partitions 3 --topic activity10
# Start an interactive console producer on activity10; type one message per line in the CSV form
# uid,aid,time,eventType,province, which is the layout the Flink map function splits on ",".
kafka-console-producer --broker-list node-01:9092,node-02:9092,node-03:9092 --topic activity10
2.4 编写 Flink相关应用代码
public class ActivityBean {
public String uid; // userId
public String aid; // activityId
public String activityName;
public String time;
public int eventType;
public double longitude;
public double latitude;
public String province;
public int count = 1;
public ActivityBean() {
}
public ActivityBean(String uid, String aid, String activityName, String time, int eventType, String province) {
this.uid = uid;
this.aid = aid;
this.activityName = activityName;
this.time = time;
this.eventType = eventType;
this.province = province;
}
public ActivityBean(String uid, String aid, String activityName, String time, int eventType, double longitude, double latitude, String province) {
this.uid = uid;
this.aid = aid;
this.activityName = activityName;
this.time = time;
this.eventType = eventType;
this.longitude = longitude;
this.latitude = latitude;
this.province = province;
}
@Override
public String toString() {
return "ActivityBean{" +
"uid='" + uid + '\'' +
", aid='" + aid + '\'' +
", activityName='" + activityName + '\'' +
", time='" + time + '\'' +
", eventType=" + eventType +
", longitude=" + longitude +
", latitude=" + latitude +
", province='" + province + '\'' +
", count=" + count +
'}';
}
public static ActivityBean of(String uid, String aid, String activityName, String time, int eventType, String province) {
return new ActivityBean(uid, aid, activityName, time, eventType, province);
}
public static ActivityBean of(String uid, String aid, String activityName, String time, int eventType, double longitude, double latitude, String province) {
return new ActivityBean(uid, aid, activityName, time, eventType, longitude, latitude, province);
}
}
/**
 * Turns one CSV line from Kafka ({@code uid,aid,time,eventType,province}) into an
 * {@link ActivityBean}, filling in the activity name by looking up {@code aid} in the MySQL table
 * {@code t_activities}.
 *
 * <p>Each function instance opens one JDBC connection and one prepared lookup statement in
 * {@link #open(Configuration)}, reuses them for every record, and releases both in
 * {@link #close()}. Earlier code created a new statement and result set per record without
 * closing them, leaking JDBC resources for as long as the job ran.
 */
public class C01_DataToActivityBeanFunction extends RichMapFunction<String, ActivityBean> {

    /** Name lookup by activity id; the id is bound per record. */
    private static final String QUERY_NAME_SQL = "SELECT name FROM t_activities WHERE a_id = ?";

    /** Number of comma-separated fields {@link #map(String)} reads from each line. */
    private static final int EXPECTED_FIELDS = 5;

    private Connection connection = null;

    /** Prepared once in open() and reused by every map() call on this instance. */
    private PreparedStatement queryName = null;

    /**
     * Opens the MySQL connection and prepares the name-lookup statement.
     *
     * <p>Exceptions are deliberately not caught here. If they were swallowed, Flink would never
     * see the failure; letting them propagate lets Flink fail the task and apply its restart
     * handling.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // NOTE(review): URL and credentials are hard-coded for the demo; move them to job
        // configuration or a secret store before using this outside a local setup.
        String url = "jdbc:mysql://localhost:3306/flink_big_data?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&failOverReadOnly=false";
        String user = "root";
        String password = "1234";
        connection = DriverManager.getConnection(url, user, password);
        queryName = connection.prepareStatement(QUERY_NAME_SQL);
    }

    /**
     * Parses one line and enriches it with the activity name.
     *
     * @param line CSV text {@code uid,aid,time,eventType,province}
     * @return the parsed event; {@code activityName} is {@code null} when {@code aid} is not in
     *     {@code t_activities}
     * @throws IllegalArgumentException if the line has fewer than five comma-separated fields
     * @throws NumberFormatException if the eventType field is not an integer
     */
    @Override
    public ActivityBean map(String line) throws Exception {
        String[] fields = line.split(",");
        if (fields.length < EXPECTED_FIELDS) {
            throw new IllegalArgumentException("Expected " + EXPECTED_FIELDS
                    + " comma-separated fields (uid,aid,time,eventType,province) but got "
                    + fields.length + ": " + line);
        }
        String uid = fields[0];
        String aid = fields[1];
        String time = fields[2];
        int eventType = Integer.parseInt(fields[3]);
        String province = fields[4];
        // A simple single-table lookup keyed by aid; joins could also be pushed down to MySQL.
        String name = lookupActivityName(aid);
        return ActivityBean.of(uid, aid, name, time, eventType, province);
    }

    /** Returns the name of the first t_activities row whose a_id equals {@code aid}, or null if none. */
    private String lookupActivityName(String aid) throws Exception {
        queryName.setString(1, aid);
        // The result set is closed after every lookup; the statement itself is reused.
        try (ResultSet resultSet = queryName.executeQuery()) {
            return resultSet.next() ? resultSet.getString(1) : null;
        }
    }

    /** Closes the prepared statement and then the connection; the connection is closed even if closing the statement fails. */
    @Override
    public void close() throws Exception {
        super.close();
        try {
            if (queryName != null) {
                queryName.close();
            }
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }
}
/**
 * Entry point of the example job. It reads CSV lines from Kafka, enriches each one with its
 * activity name from MySQL through {@code C01_DataToActivityBeanFunction}, and prints the
 * resulting {@code ActivityBean}s.
 */
public class C01_QueryActivityName {

    /**
     * Builds the pipeline and runs it under the job name "C01_QueryActivityName".
     *
     * @param args passed unchanged to {@code FlinkUtilsV1.createKafkaStream}. The example
     *     invocation is {@code activity10 group_id_flink node-01:9092,node-02:9092,node-03:9092},
     *     presumably topic, consumer group id and broker list (confirm in FlinkUtilsV1).
     */
    public static void main(String[] args) throws Exception {
        // Topic activity10 has 3 partitions and replication factor 2. Setup commands:
        //   kafka-topics --create --zookeeper node-01:2181,node-02:2181,node-03:2181 --replication-factor 2 --partitions 3 --topic activity10
        //   kafka-console-producer --broker-list node-01:9092,node-02:9092,node-03:9092 --topic activity10
        DataStream<String> kafkaLines = FlinkUtilsV1.createKafkaStream(args, new SimpleStringSchema());
        kafkaLines
                .map(new C01_DataToActivityBeanFunction())
                .print();
        // Presumably the same environment the Kafka source was registered on; confirm in FlinkUtilsV1.
        FlinkUtilsV1.getEnv().execute("C01_QueryActivityName");
    }
}
三、 启动Flink 应用程序及向Kafka生产数据
3.1 在 IDEA 中本地启动 C01_QueryActivityName（程序参数：activity10 group_id_flink node-01:9092,node-02:9092,node-03:9092）
3.2 在 Kafka 控制台生产者（kafka-console-producer）中逐行输入以下数据
u001,A1,2019-09-02 10:10:11,1,北京市
u001,A2,2019-09-02 10:10:11,1,北京市
u001,A3,2019-09-02 10:10:11,1,北京市
u001,A4,2019-09-02 10:10:11,1,北京市
u002,A1,2019-09-02 10:11:11,1,辽宁省
u001,A1,2019-09-02 10:11:11,2,北京市
u001,A1,2019-09-02 10:11:30,3,北京市
u002,A1,2019-09-02 10:12:11,2,辽宁省
u001,A1,2019-09-02 10:10:11,1,北京市
u001,A1,2019-09-02 10:10:11,1,北京市
u001,A1,2019-09-02 10:10:11,1,北京市
u001,A1,2019-09-02 10:10:11,1,北京市
运行结果：发送到 Kafka 的数据中只有活动 id，Flink 通过活动 id 从 MySQL 查询出对应的活动名称，打印出的每个 ActivityBean 的 activityName 字段即为该名称（例如 aid='A1' 对应 activityName='新人礼包'）。
文章最后,给大家推荐一些受欢迎的技术博客链接:
- JAVA相关的深度技术博客链接
- Flink 相关技术博客链接
- Spark 核心技术链接
- 设计模式 —— 深度技术博客链接
- 机器学习 —— 深度技术博客链接
- Hadoop相关技术博客链接
- 超全干货--Flink思维导图,花了3周左右编写、校对
- 深入JAVA 的JVM核心原理解决线上各种故障【附案例】
- 请谈谈你对volatile的理解?--最近小李子与面试官的一场“硬核较量”
- 聊聊RPC通信,经常被问到的一道面试题。源码+笔记,包懂
- 深入聊聊Java 垃圾回收机制【附原理图及调优方法】
欢迎扫描下方的二维码或 搜索 公众号“大数据高级架构师”,我们会有更多、且及时的资料推送给您,欢迎多多交流!
更多推荐
已为社区贡献2条内容
所有评论(0)