之前我们讲过kafka-flink-es的场景,本次我们讲解kafka->flink-mySql,即数据采集存储到kafka,通过flink消费kafka数据,实时计算,结果存储到mySql,这个场景项目接处也是非常多,因为数据很多时候要存储到数据库,下面介绍具体实现过程。

环境搭建

flink参考 Flink环境搭建,令人惊愕的HA,mySql自行安装。

代码实现

1、pom.xml 引入下面的包

<properties>    <flink.version>1.13.0</flink.version>     <scala.binary.version>2.11</scala.binary.version>    <java.version>1.8</java.version></properties><dependency>    <groupId>org.apache.commons</groupId>    <artifactId>commons-dbcp2</artifactId>    <version>2.1.1</version></dependency><dependency>    <groupId>mysql</groupId>    <artifactId>mysql-connector-java</artifactId>    <version>6.0.6</version></dependency><dependency>    <groupId>org.apache.kafka</groupId>    <artifactId>kafka-clients</artifactId>    <version>2.4.0</version></dependency><dependency>    <groupId>org.apache.flink</groupId>    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>    <version>${flink.version}</version></dependency>

2、启动kafka后,在kafka服务端产生消息。

图片

3、Flink读取kafka写入mySql,读取kafka我们用,flink-kafka-connector用来连接kafka,用于消费kafka的数据, 并传入给下游的算子,flink-kafka-connector官方文档,

(https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/)已经有介绍,传入相关的配置, 创建consumer对象, 并调用addsource即可。

现在整体的实现过程如下:首先是读取kafka

package operator.mysql;import org.apache.commons.lang3.StringUtils;import org.apache.flink.api.common.functions.FilterFunction;import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.common.restartstrategy.RestartStrategies;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.api.java.tuple.Tuple3;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.CheckpointConfig;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import org.apache.flink.util.Collector;import java.util.Properties;public class MysqlSinkTest {   //kafka to mysql    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.enableCheckpointing(5000);        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);        env.setParallelism(3);        Properties properties = new Properties();        properties.setProperty("bootstrap.servers", "192.168.244.129:9092");        properties.setProperty("group.id", "test1");        properties.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");        properties.setProperty("max.poll.records","1000");        properties.setProperty("max.partition.fetch.bytes","5242880");        //创建kafak消费者,获取kafak中的数据        DataStream<String> stream = env                .addSource(new FlinkKafkaConsumer<>("kafkaInfo", new SimpleStringSchema(), properties));        DataStream<String> sum = stream.flatMap(new FlatMapFunction<String,String>() {            @Override            public void flatMap(String str, Collector<String> collector) throws Exception {                String[] arr = str.split(" ");                for (String s : arr) {                    collector.collect(s);                }            }        });        sum.print();        sum.addSink(new MysqlSink());        env.execute("data to mysql start ");    }}

sink到mySql

package operator.mysql;import operator.dt.Student;import org.apache.commons.dbcp2.BasicDataSource;import org.apache.flink.api.java.tuple.Tuple3;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import java.sql.Connection;import java.sql.DriverManager;import java.sql.PreparedStatement;import java.sql.SQLException;import java.util.List;
public class MysqlSink extends        RichSinkFunction<String> {    private Connection connection;    private String username = "root";    private String password = "123456";    private PreparedStatement ps;    private BasicDataSource dataSource;    /**     * open() 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接     *     * @param parameters     * @throws Exception     */    @Override    public void open(Configuration parameters) throws Exception {        super.open(parameters);        dataSource = new BasicDataSource();        connection = getConnection(dataSource);        String sql = "insert into t_kafka_info(num) values(?);";        ps = this.connection.prepareStatement(sql);    }    @Override    public void close() throws Exception {        super.close();        //关闭连接和释放资源        if (connection != null) {            connection.close();        }        if (ps != null) {            ps.close();        }    }    /**     * 每条数据的插入都要调用一次 invoke() 方法     * @param value     * @param context     * @throws Exception     */    @Override    public void invoke(String value, Context context){        System.out.println("--invoke--"+value);        int[] count = new int[0];//批量后执行        try {            //遍历数据集合            ps.setString(1, value);            ps.addBatch();            count = ps.executeBatch();        } catch (SQLException e) {            e.printStackTrace();        }        System.out.println("成功了插入了" + count.length + "行数据");    }
    private static Connection getConnection(BasicDataSource dataSource) {        dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");        dataSource.setUrl("jdbc:mysql://localhost:3307/test?serverTimezone=UTC&characterEncoding=utf-8");        dataSource.setUsername("root");        dataSource.setPassword("123456");        //设置连接池的一些参数        dataSource.setInitialSize(10);        dataSource.setMaxTotal(50);        dataSource.setMinIdle(2);        Connection con = null;        try {            con = dataSource.getConnection();            System.out.println("创建连接池:" + con);        } catch (Exception e) {            System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage());        }        return con;    }}

下面类图展示了RichSinkFunction的继承和实现类图,RichFunction

图片

    从测试代码中可以很清晰的看出Flink的逻辑:Source->Transformation->Sink,可以在addSource到addSink之间加入我们的业务逻辑算子。同时这里必须注意env.execute("Flink cost DB data to write Database");这个必须有而且必须要放到结尾,否则整个代码是不会执行的。

   open,invoke,close,其中open方法是建立与关系型数据库的链接,这里其实就是普通的jdbc链接及mysql的地址,端口,库等信息。invoke方法是读取数据存入mysql,close就是关闭连接。

如果觉得文章能帮到您,欢迎关注微信公众号:“蓝天Java大数据” ,共同进步!

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐