pom.xml dependencies:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc</artifactId>
            <version>1.6.1</version>
        </dependency>
        <!-- PostgreSQL JDBC driver, needed by org.postgresql.Driver in the sink below
             (version shown here is only an example) -->
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>42.2.5</version>
        </dependency>

Configuration file BaseConf:

package com.conf;

public class BaseConf {
    public static final String USERNAME = "postgres";
    public static final String PASSWORD = "passwd";
    public static final String DRIVERNAME = "org.postgresql.Driver";
    public static final String URL = "jdbc:postgresql://192.168.108.***:5432/***";
}
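
Before wiring up the Flink job, it can help to verify these settings with a quick standalone JDBC check. The sketch below is only illustrative (the class name ConnectionCheck is not part of the project); it uses nothing but the constants above and the PostgreSQL driver added to the pom:

package com.conf;

import java.sql.Connection;
import java.sql.DriverManager;

public class ConnectionCheck {
    public static void main(String[] args) throws Exception {
        // Load the driver and open a connection with the settings from BaseConf
        Class.forName(BaseConf.DRIVERNAME);
        try (Connection conn = DriverManager.getConnection(BaseConf.URL, BaseConf.USERNAME, BaseConf.PASSWORD)) {
            System.out.println("Connected to PostgreSQL: " + !conn.isClosed());
        }
    }
}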

Writing to the PostgreSQL database (reference):
https://blog.csdn.net/weixin_43315211/article/details/88354331

package com.sink;

import com.conf.BaseConf;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class PostSQLSink extends RichSinkFunction<Tuple5<String, String, String, String, String>> {
    private static final long serialVersionUID = 1L;
    private Connection connection;
    private PreparedStatement preparedStatement;

    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName(BaseConf.DRIVERNAME);
        connection = DriverManager.getConnection(BaseConf.URL, BaseConf.USERNAME, BaseConf.PASSWORD);
        String sql = "insert into public.log_info(ip,time,courseid,status_code,referer) values (?,?,?,?,?)";
        preparedStatement = connection.prepareStatement(sql);
        super.open(parameters);
    }

    @Override
    public void invoke(Tuple5<String, String, String, String, String> value, Context context) {
        try {
            preparedStatement.setString(1, value.f0);
            preparedStatement.setString(2, value.f1);
            preparedStatement.setString(3, value.f2);
            preparedStatement.setString(4, value.f3);
            preparedStatement.setString(5, value.f4);
            System.out.println("Start insert");
            preparedStatement.executeUpdate();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void close() throws Exception {

        if (preparedStatement != null) {
            preparedStatement.close();
        }

        if (connection != null) {
            connection.close();
        }

    }
}
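
Note that the pom also declares flink-jdbc, which provides JDBCOutputFormat as an alternative to a hand-written sink. A minimal sketch, assuming the same log_info table and the Flink 1.6 API (the helper class below is only illustrative, not part of the original project):

package com.sink;

import com.conf.BaseConf;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;

public class LogInfoJdbcOutput {
    // Builds a JDBCOutputFormat that writes Rows into public.log_info, flushing every 100 rows
    public static JDBCOutputFormat build() {
        return JDBCOutputFormat.buildJDBCOutputFormat()
                .setDrivername(BaseConf.DRIVERNAME)
                .setDBUrl(BaseConf.URL)
                .setUsername(BaseConf.USERNAME)
                .setPassword(BaseConf.PASSWORD)
                .setQuery("insert into public.log_info(ip,time,courseid,status_code,referer) values (?,?,?,?,?)")
                .setBatchInterval(100)
                .finish();
    }
}

To use it, the Tuple5 stream would first be mapped to org.apache.flink.types.Row (for example Row.of(value.f0, value.f1, value.f2, value.f3, value.f4)) and then written with writeUsingOutputFormat(LogInfoJdbcOutput.build()).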

Main program:

package com.source;

import com.sink.PostSQLSink;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;


import java.util.Properties;

public class FlinkCleanKafka {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);  // enable checkpointing every 5000 ms
//        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "master01:9092");// Kafka broker IPs or hostnames, comma-separated if more than one
        properties.setProperty("zookeeper.connect", "master01:2181");// ZooKeeper node IPs or hostnames, comma-separated if more than one
        properties.setProperty("group.id", "test22222");// group.id of the Flink consumer


        FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<String>("flink_test"
                , new SimpleStringSchema()
                , properties);
//        myConsumer.setStartFromEarliest();  // start from the earliest offset
        myConsumer.setStartFromLatest();  // start consuming from the latest offset

        DataStream<String> stream = env.addSource(myConsumer);

//        stream.print();
        DataStream<Tuple5<String, String, String, String, String>> cleanData = stream.map(new MapFunction<String, Tuple5<String, String, String, String, String>>() {
            @Override
            public Tuple5<String, String, String, String, String> map(String value) {
                String[] data = value.split("\\t");
//                for(String str:data){
//                    System.out.println(str);
//                }
                String courseId = null;
                String url = data[2].split(" ")[1];
//                System.out.println("url: "+url);
                if (url.startsWith("/class")) {
                    String courseHtml = url.split("/")[2];
                    courseId = courseHtml.substring(0, courseHtml.lastIndexOf("."));
//                    System.out.println("courseId: "+courseId);
                }

                return Tuple5.of(data[0], data[1], courseId, data[3], data[4]);
            }
        }).filter(new FilterFunction<Tuple5<String, String, String, String, String>>() {
            @Override
            public boolean filter(Tuple5<String, String, String, String, String> value) {
                return value.f2 != null;
            }
        });

        cleanData.print();
        cleanData.addSink(new PostSQLSink());

        env.execute("Flink kafka");
    }
}

For how the Kafka test data is generated, see the following article:
https://blog.csdn.net/weixin_43315211/article/details/88424903
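
For quick local testing, a producer along the following lines can push tab-separated log lines into the flink_test topic. The class name and sample record are only illustrative; it assumes kafka-clients is on the classpath (the Flink Kafka connector normally pulls it in) and uses the same broker address as the consumer above:

package com.source;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class LogProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "master01:9092");
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // One tab-separated record: ip \t time \t request \t status_code \t referer
        String line = "192.168.108.1\t2019-03-10 10:00:00\tGET /class/128.html HTTP/1.1\t200\t-";

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("flink_test", line));
        }
    }
}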
