需求

首先从kafka中读取数据,然后从mysql中读取数据,然后将这两个数据进行合并处理。

环境

  • Flink 1.8.2

实现

public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

        String topic = "mytest";
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "swarm-manager:9092");
        properties.setProperty("group.id", "test");
        // NOTE: key/value serializer properties were removed — they are producer
        // settings and are ignored by a consumer; FlinkKafkaConsumer deserializes
        // records via the SimpleStringSchema passed below.
        FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties);
        // Kafka source
        DataStreamSource<String> kafkaDataSource = executionEnvironment.addSource(flinkKafkaConsumer);
        // Fix: map over kafkaDataSource — the original referenced an undefined variable `data`.
        SingleOutputStreamOperator<String> kafkaData = kafkaDataSource.map(new MapFunction<String, String>() {....});
        // MySQL source
        DataStreamSource<HashMap<String, String>> mysqlData = executionEnvironment.addSource(new MySqlSource());
}

上面可以实现分别从mysql和kafka中获取数据,并且都设置好数据源。
其中mysql数据源配置如下:

package com.vincent.project;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashMap;

public class MySqlSource extends RichParallelSourceFunction<HashMap<String, String>> {

    private PreparedStatement ps;
    private Connection connection;
    // Cooperative-cancellation flag; Flink invokes cancel() from another thread,
    // hence volatile.
    private volatile boolean isRunning = true;

    /**
     * Establishes the JDBC connection and prepares the query once per subtask.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        connection = getConnection();
        String sql = "select user_id, domain from user_domain_config";
        ps = this.connection.prepareStatement(sql);
        System.out.println("open");
    }

    /** Releases JDBC resources; statement is closed before the connection. */
    @Override
    public void close() throws Exception {
        if (ps != null) {
            ps.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    /**
     * Opens a JDBC connection to the MySQL instance.
     * Fails fast with the original cause instead of swallowing the exception
     * and returning null, which previously surfaced as a confusing
     * NullPointerException later in open().
     */
    private Connection getConnection() {
        try {
            Class.forName("com.mysql.cj.jdbc.Driver");
            String url = "jdbc:mysql://swarm-manager:3306/imooc_flink?useUnicode=true&characterEncoding=UTF-8";
            String username = "root";
            String password = "123456";
            return DriverManager.getConnection(url, username, password);
        } catch (Exception e) {
            // Preserve the cause; never return null from a connection factory.
            throw new RuntimeException("-----------mysql get connection has exception , msg = " + e.getMessage(), e);
        }
    }

    /**
     * Reads the whole user_domain_config table and emits it as a single
     * domain -> user_id map. The ResultSet is now closed via
     * try-with-resources (it leaked in the original).
     *
     * NOTE(review): this is a RichParallelSourceFunction, so with
     * parallelism > 1 every subtask emits its own copy of the map —
     * confirm parallelism 1 is intended for this source.
     *
     * @param sourceContext Flink context used to emit the lookup map
     * @throws Exception on any JDBC failure
     */
    @Override
    public void run(SourceContext<HashMap<String, String>> sourceContext) throws Exception {
        HashMap<String, String> map = new HashMap<>();
        try (ResultSet resultSet = ps.executeQuery()) {
            while (isRunning && resultSet.next()) {
                String user_id = resultSet.getString("user_id");
                String domain = resultSet.getString("domain");
                map.put(domain, user_id);
            }
        }
        sourceContext.collect(map);
    }

    @Override
    public void cancel() {
        // Stop the read loop in run() as soon as possible.
        isRunning = false;
    }
}

连接两个数据源:

// CoFlatMapFunction's first type parameter is the element type of logData,
// the second is the element type of the MySQL stream, the third is the output type.
        SingleOutputStreamOperator<String> connectData = logData.connect(mysqlData).flatMap(new CoFlatMapFunction<Tuple3<Long, String, String>, HashMap<String, String>, String>() {

            // Operator-local lookup table (domain -> user_id), replaced wholesale
            // whenever the MySQL stream delivers a new snapshot.
            // NOTE(review): plain field, not Flink managed state — it is lost on
            // failure/restart, and log records arriving before the first MySQL
            // snapshot will look up an empty map; confirm this is acceptable.
            HashMap<String, String> userDomainMap = new HashMap<>();

            // Handles elements from logData: enrich each record with the user id
            // resolved from its domain (f1); empty string when the domain is unknown.
            @Override
            public void flatMap1(Tuple3<Long, String, String> longStringStringTuple3, Collector<String> collector) throws Exception {
                String domain = longStringStringTuple3.f1;
                String userId = userDomainMap.getOrDefault(domain, "");
                System.out.println("userID:" + userId);
                collector.collect(longStringStringTuple3.f0 + "\t" + longStringStringTuple3.f1 + "\t" + longStringStringTuple3.f2 + "\t" + userId);

            }

            // Handles elements from the MySQL stream: swap in the latest lookup map.
            // Emits nothing downstream.
            @Override
            public void flatMap2(HashMap<String, String> stringStringHashMap, Collector<String> collector) throws Exception {
                userDomainMap = stringStringHashMap;

            }
        });
        connectData.setParallelism(1).print();

使用 connect 算子连接两个数据源

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐