Flink Reads Kafka Messages and Batch-Writes Them to MySQL 8.0
Preface
For environment setup, see 《Flink1.11.0+Kafka2.3.0+MySQL8的快速安装部署》.
This example implements the following:
Simulated message generation -> Kafka -> Flink -> MySQL
Flink collects the data stream and periodically batch-writes it to MySQL.
Environment and dependencies
This example uses IntelliJ IDEA as the development IDE.
First create a Maven project with groupId 'com.zuoan', artifactId 'flink-kafka-mysql' and version '1.0-SNAPSHOT'. The overall project structure is shown below:
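The original structure screenshot is not reproduced here; based on the package and classes listed later in this article, the layout is roughly:
flink-kafka-mysql
├── pom.xml
└── src/main/java/com/zuoan
    ├── Employee.java
    ├── KafkaProducerTest.java
    ├── KafkaSinkMysql.java
    └── SinkToMySQL.java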
Special note:
The MySQL used in this project is version 8.0 and is installed on a virtual machine, so its configuration has to be changed before the host machine can connect to the MySQL database on the VM.
Allowing external access to MySQL 8.0:
https://www.cnblogs.com/ningy1009/p/12806748.html
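The linked article walks through the details; in outline, the MySQL-side steps look roughly like the sketch below. The 'root'/'root' account and the test database are just the example values used by the sink code later in this article; adjust them to your own environment, and also make sure mysqld's bind-address allows remote connections (e.g. 0.0.0.0).
-- Run on the VM's MySQL 8.0 instance (example values only)
CREATE USER 'root'@'%' IDENTIFIED BY 'root';
GRANT ALL PRIVILEGES ON test.* TO 'root'@'%';
FLUSH PRIVILEGES;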
The complete project code has been pushed to GitHub:
https://github.com/peigenxiao/flink-kafka-mysql
The project code is as follows:
POM file:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.zuoan</groupId>
<artifactId>flink-kafka-mysql</artifactId>
<version>1.0-SNAPSHOT</version>
<description>flink+kafka+mysql实例</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <!-- Connector/J 8.x to match the MySQL 8.0 server used in this article -->
            <version>8.0.16</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-dbcp2</artifactId>
            <version>2.6.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.0</version>
                <configuration>
                    <createDependencyReducedPom>false</createDependencyReducedPom>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- When packaging the job, change this to the corresponding main class -->
                                    <mainClass>com.zuoan.KafkaSinkMysql</mainClass>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                            </transformers>
                            <filters>
                                <filter>
                                    <artifact>*:*:*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <!-- Flink 1.7 requires Java 8 -->
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>utf8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
Creating the MySQL table:
CREATE TABLE `employee` (
    `id` bigint(20) DEFAULT NULL,
    `name` varchar(50) DEFAULT NULL,
    `password` varchar(50) DEFAULT NULL,
    `age` int(11) DEFAULT NULL,
    `salary` int(11) DEFAULT NULL,
    `department` varchar(20) DEFAULT NULL
) ENGINE=MyISAM DEFAULT CHARSET=utf8mb4;
Simulated data generation
First, create an Employee entity class:
package com.zuoan;

public class Employee {
    public int id;
    public String name;
    public String password;
    public int age;
    public Integer salary;
    public String department;

    public Employee(int id, String name, String password, int age, Integer salary, String department) {
        this.id = id;
        this.name = name;
        this.password = password;
        this.age = age;
        this.salary = salary;
        this.department = department;
    }

    public Employee() {
    }

    @Override
    public String toString() {
        return "Employee{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", password='" + password + '\'' +
                ", age=" + age +
                ", salary=" + salary +
                ", department='" + department + '\'' +
                '}';
    }

    public Integer getSalary() {
        return salary;
    }

    public void setSalary(Integer salary) {
        this.salary = salary;
    }

    public String getDepartment() {
        return department;
    }

    public void setDepartment(String department) {
        this.department = department;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }
}
Simulate data generation; here one record per second is sent to Kafka:
package com.zuoan;

import com.google.gson.Gson;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.Random;

public class KafkaProducerTest {
    public static void main(String[] args) {
        Producer();
    }

    public static void Producer() {
        String broker = "localhost:9092";
        String topic = "demo";
        Properties props = new Properties();
        props.put("bootstrap.servers", broker);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        String[] depLists = new String[5];
        depLists[0] = "Administration";
        depLists[1] = "Finance";
        depLists[2] = "Marketing";
        depLists[3] = "Engineering";
        depLists[4] = "Sales";

        Random rand = new Random(300);
        Gson gson = new Gson();
        for (int i = 1; i <= 1000; i++) {
            Employee employee = new Employee(i, "user" + i, "password" + i, rand.nextInt(40) + 20,
                    (rand.nextInt(300) + 1) * 100, depLists[rand.nextInt(5)]);
            String temp = gson.toJson(employee);
            ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, null, "user" + i, temp);
            producer.send(record);
            System.out.println("Sent record: " + temp);
            try {
                Thread.sleep(1 * 1000); // sleep one second after each record
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        producer.flush();
    }
}
Custom sink: create a database connection pool and batch-write to MySQL
package com.zuoan;

import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.List;

/**
 * Writes batches of data to MySQL
 */
public class SinkToMySQL extends RichSinkFunction<List<Employee>> {
    PreparedStatement ps;
    BasicDataSource dataSource;
    private Connection connection;

    /**
     * The connection is established in open(), so it does not have to be
     * created and released on every invoke() call.
     *
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        dataSource = new BasicDataSource();
        connection = getConnection(dataSource);
        String sql = "insert into employee(id, name, password, age, salary, department) values(?, ?, ?, ?, ?, ?);";
        ps = this.connection.prepareStatement(sql);
    }

    @Override
    public void close() throws Exception {
        super.close();
        // release the statement and then the connection
        if (ps != null) {
            ps.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    /**
     * invoke() is called once per incoming element; here each element is a
     * whole window's worth of Employee records.
     *
     * @param value
     * @param context
     * @throws Exception
     */
    @Override
    public void invoke(List<Employee> value, Context context) throws Exception {
        // iterate over the collected records
        for (Employee employee : value) {
            ps.setInt(1, employee.getId());
            ps.setString(2, employee.getName());
            ps.setString(3, employee.getPassword());
            ps.setInt(4, employee.getAge());
            ps.setInt(5, employee.getSalary());
            ps.setString(6, employee.getDepartment());
            ps.addBatch();
        }
        int[] count = ps.executeBatch(); // execute the whole batch at once
        System.out.println("Successfully inserted " + count.length + " rows");
    }
    private static Connection getConnection(BasicDataSource dataSource) {
        // Connector/J 8.x driver class, matching the MySQL 8.0 server used here
        dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
        // Note: replace with your own MySQL address, user name and password
        dataSource.setUrl("jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC"); // 'test' is the database name
        dataSource.setUsername("root"); // database user
        dataSource.setPassword("root"); // database password
        // connection pool settings
        dataSource.setInitialSize(10);
        dataSource.setMaxTotal(50);
        dataSource.setMinIdle(2);
        Connection con = null;
        try {
            con = dataSource.getConnection();
            System.out.println("Created connection pool: " + con);
        } catch (Exception e) {
            System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage());
        }
        return con;
    }
}
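One optional tuning point that is not in the original code: MySQL Connector/J only rewrites a JDBC batch into real multi-row INSERT statements when rewriteBatchedStatements=true is set on the connection URL; without it, executeBatch() still sends one statement per row. A sketch of the setUrl(...) line in getConnection() with that flag added (host, database and credentials mirror the example values above):
// Same example URL as above, plus rewriteBatchedStatements so the driver
// turns ps.executeBatch() into multi-row INSERTs.
dataSource.setUrl("jdbc:mysql://localhost:3306/test"
        + "?useUnicode=true&characterEncoding=UTF-8"
        + "&serverTimezone=UTC&rewriteBatchedStatements=true");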
Main class
It reads the Kafka messages and uses map() with Gson to turn the JSON strings into a stream of Employee objects,
aggregates the Employee records in a one-minute window,
and finally writes them to MySQL through the custom sink.
package com.zuoan;

import com.google.gson.Gson;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class KafkaSinkMysql {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        //props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //props.put("enable.auto.commit", "true");
        props.put("auto.offset.reset", "latest");

        SingleOutputStreamOperator<Employee> empStream = env.addSource(new FlinkKafkaConsumer011<String>(
                "demo", // this Kafka topic must match the topic the producer writes to
                new SimpleStringSchema(),
                props)).setParallelism(1)
                .map(new MapFunction<String, Employee>() {
                    @Override
                    public Employee map(String string) throws Exception {
                        // parse the JSON string into an Employee object
                        Gson gson = new Gson();
                        return gson.fromJson(string, Employee.class);
                    }
                });

        // aggregate in a one-minute window
        empStream.timeWindowAll(Time.minutes(1)).apply(new AllWindowFunction<Employee, List<Employee>, TimeWindow>() {
            @Override
            public void apply(TimeWindow window, Iterable<Employee> values, Collector<List<Employee>> out) throws Exception {
                // collect the window's elements into a list (avoids relying on Flink's shaded Guava)
                List<Employee> employees = new ArrayList<>();
                for (Employee employee : values) {
                    employees.add(employee);
                }
                if (employees.size() > 0) {
                    System.out.println("Number of employee records collected in the last minute: " + employees.size());
                    out.collect(employees);
                }
            }
        }).addSink(new SinkToMySQL());

        //empStream.print(); // debug output
        env.execute("flink kafka to Mysql");
    }
}
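Optionally (this is not in the original code), checkpointing can be enabled in main(): when checkpointing is on, FlinkKafkaConsumer011 commits its offsets to Kafka as part of each completed checkpoint instead of relying on the client's auto-commit, so a restarted job resumes from where it left off. A minimal sketch, with an arbitrary 60-second interval:
// In main(), right after creating the StreamExecutionEnvironment:
env.enableCheckpointing(60 * 1000); // checkpoint (and commit Kafka offsets) every 60 seconds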
Running the test
Run the main class KafkaSinkMysql.
Then run KafkaProducerTest to send data.
Check the database.
The example runs successfully.
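To confirm the data landed, you can run a quick check against the table created earlier (a sample query, not from the original article):
SELECT department, COUNT(*) AS rows_received, AVG(salary) AS avg_salary
FROM employee
GROUP BY department;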
Summary
This article showed how to collect data in batches and write it to MySQL; the same pattern can of course also write to Elasticsearch, Redis and similar stores. It fits scenarios where real-time requirements on the database are relaxed, or where near-real-time analysis is sufficient. With large volumes, where a one-minute window can aggregate tens of thousands of records, such batch writes perform many times better than single-row inserts.
Copyright notice: this is an original article by the CSDN blogger 「左岸天涯」, licensed under the CC 4.0 BY-SA agreement; please include the original source link and this notice when reposting.
Original article: https://blog.csdn.net/weixin_44575542/article/details/88597378