下列代码涵盖了 Kafka 数据接入、Spark 算子数据处理、Kafka 偏移量记录、数据反压以及数据批量插入 MySQL 等全部操作步骤。

package com.e_data;

import com.util.DateUtil;
import com.util.JDBCUtils3;
import com.util.RedisUtil;
import org.apache.commons.collections.IteratorUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;
import scala.Tuple2;
import java.io.Serializable;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.*;

/**
 * 最稳定版本
 * 作者 liuwunan   可以保证数据一点不会丢失。
 */

public class HuaLong_RealTime_ETC implements Serializable {

    //设置异常标记,如果发生异常,则记录起始偏移量,如果正常 则记录解释偏移量
    public static Boolean ExceptionFlag = true;
    //配置kafka参数(节点、消费者组、topic)
    private static String topics = "topics"; //指定topic
    private static String groupId = "groupId";//指定消费者组id
    private static String offset = "offset";
    private static String brokers = "IP:9092,IP:9092,IP:9092";//指定kafka地

    public static void main(String[] args)  {
            //设置hadoop 文件备份为1,Hadoop 系统默认3份  减少数据同步延迟性
            //Configuration hdfs = new Configuration();
            //hdfs.set("dfs.replication","1");
            SparkConf conf = new SparkConf().setAppName("RealTimeE")
            .set("spark.dynamicAllocation.enabled", "false");//关闭资源动态添加
            conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer");
            conf.setMaster("local[5]"); 			//设置线程数 禁止使用1 可以使用 *
            conf.set("spark.streaming.backpressure.enabled", "true");//启用反压
            conf.set("spark.streaming.backpressure.pid.minRate","1");//最小条数
            conf.set("spark.streaming.kafka.maxRatePerPartition","2000");//最大条数
            conf.set("spark.speculation", "true");//开启资源动态调用
            JavaSparkContext sc = new JavaSparkContext(conf);//初始化sparkStreaming对象
            JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(60)); //每一分钟调用一批次数据
            //配置kafka参数(节点、消费者组、topic)
            Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
            Map<String, Object> kafkaParams = new HashMap<>();
            kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
            kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            //每次程序启动获取最新的消费者偏移量 earliest
            kafkaParams.put("auto.offset.reset", "latest");
            //消费之偏移量自动提交
            kafkaParams.put("enable.auto.commit", "false");
            HashMap<TopicPartition, Long> mapTopic = new HashMap<>();
            JavaInputDStream<ConsumerRecord<String, String>> messages =null;
            Boolean flag = RedisUtil.FlagExits(etc_data_offset, 1);
            if(flag){
                Map<String, String> offsets = RedisUtil.getAll(etc_data_offset, 1);
                for (Map.Entry<String, String> entry : offsets.entrySet()) {
                    String partition = entry.getKey();
                    String offset = entry.getValue();
                    //截取去掉时间 只传入偏移量 避免程序错误
                    String[] s = offset.split("_", -1);
                    String offset_last = s[0];
                    TopicPartition topicPartition = new TopicPartition(topics, Integer.valueOf(partition));
                    mapTopic.put(topicPartition, Long.valueOf(offset_last));
                }
                messages = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, String>Subscribe(topicsSet, kafkaParams, mapTopic));
            }else{
                System.out.println("重头消费 最新消费");
                messages = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, String>Subscribe(topicsSet, kafkaParams));
            }

            JavaDStream<ConsumerRecord<String, String>> v1 = messages.filter(new Function<ConsumerRecord<String, String>, Boolean>() {
                @Override
                public Boolean call(ConsumerRecord<String, String> v1) throws Exception {
                    //判断数据
                    if((v1.key().startsWith("E_data")) &&  (10 == v1.value().split(",",-1).length)) {
                        return true;
                    }
                    return false;
                }
            });

            //数据规范处理,抽取各数据源需要的字段
            JavaPairDStream<String, ETC_Data> v2 = v1.mapToPair(new PairFunction<ConsumerRecord<String, String>, String, ETC_Data>() {
                @Override
                public Tuple2<String, ETC_Data> call(ConsumerRecord<String, String> v) throws Exception {
                    E_Data e_data = new E_Data();
                    //iarea,device_id,x,y,car_color,serial_num,idcard_num,car_num,times
                    String[] split = v.value().split(",", -1);
                    etc_data.setArea(split[0]);
                    etc_data.setDevice_id(split[1]);
                    etc_data.setX(split[2]);
                    etc_data.setY(split[3]);
                    etc_data.setCar_color(split[4]);
                    etc_data.setSerial_num(split[5]);
                    etc_data.setIdcard_num(split[6]);
                    etc_data.setCar_num(split[7]);
                    etc_data.setUser_name(split[8]);
                    etc_data.setTimes(split[9]);
                    return new Tuple2("E_DATA",etc_data);
                }
            });

            JavaPairDStream<String, Iterable<ETC_Data>> V3 = v2.groupByKey();
            V3.foreachRDD(new VoidFunction<JavaPairRDD<String, Iterable<ETC_Data>>>() {
                @Override
                public void call(JavaPairRDD<String, Iterable<ETC_Data>> v4) throws Exception {
                    v4.repartition(1).foreachPartition(new VoidFunction<Iterator<Tuple2<String, Iterable<ETC_Data>>>>() {
                        @Override
                        public void call(Iterator<Tuple2<String, Iterable<ETC_Data>>> v5) throws Exception {
                            Connection conn = JDBCUtils3.getConnection();
                            PreparedStatement pstm = null;
                            while (v5.hasNext()) {
                                Tuple2<String, Iterable<ETC_Data>> next = v5.next();
                                List<ETC_Data> list = IteratorUtils.toList(next._2.iterator());
                                    String sql = "INSERT INTO punlic_xxxxxxxx.xxxxxxxx(xxxxxxxx,xxxxxxxx,xxxxxxxx,x,y,xxxxxxxx,xxxxxxxx,xxxxxxxx,xxxxxxxx,xxxxxxxx,xxxxxxxx) values(?,?,?,?,?,?,?,?,?,?,?)";
                                    ETC_Data ed = null;
                                    try {
                                        pstm = conn.prepareStatement(sql);
                                        // 设置事务为手动
                                        conn.setAutoCommit(false);
                                        // 批量添加
                                        for (int i = 0; i < list.size(); i++) {
                                            ed = list.get(i);
                                            pstm.setString(1, UUID.randomUUID().toString());
                                            pstm.setString(2, ed.getArea());
                                            pstm.setString(3, ed.getDevice_id());
                                            pstm.setString(4, ed.getX());
                                            pstm.setString(5, ed.getY());
                                            pstm.setString(6, ed.getCar_color());
                                            pstm.setString(7, ed.getSerial_num());
                                            pstm.setString(8, ed.getIdcard_num());
                                            pstm.setString(9, ed.getUser_name());
                                            pstm.setString(10, ed.getCar_num());
                                            pstm.setString(11, ed.getTimes());
                                            pstm.addBatch();
                                        }
                                        pstm.executeBatch();
                                        conn.commit();
                                    } catch (Exception e) {
                                        //如果发生异常将 标记切换 解决 数据重复问题
                                        ExceptionFlag = false;
                                        e.printStackTrace();
                                        throw new RuntimeException(e);
                                    } finally {
                                        JDBCUtils3.close(pstm, conn);
                                    }
                                }
                        }
                    });
                }
            });

        messages.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
            @Override
            public void call(JavaRDD<ConsumerRecord<String, String>> v3) throws Exception {
                v3.repartition(1).foreachPartition(new VoidFunction<Iterator<ConsumerRecord<String, String>>>() {
                    @Override
                    public void call(Iterator<ConsumerRecord<String, String>> st) throws Exception {
                        String time = com.util.DateUtil.formatDateString(new Date(), DateUtil.DATE_FORMAT_12W);
                        HashMap<String, String> redisMapOk = new HashMap<>();
                        HashMap<String, String> redisMapErro = new HashMap<>();
                        OffsetRange[] offsetRanges = ((HasOffsetRanges) v3.rdd()).offsetRanges();
                        for (OffsetRange offsetRange : offsetRanges) {
                            //将时间拼接 添加到redis 数据库可以准备记录当前数据 偏移量消费的时间
                            //记录正确的偏移量 如果没有发生错误,则记录当前偏移量的结束位置,因为起始位置已经数据入库, 下次从上次的结束开始
                            redisMapOk.put(String.valueOf(offsetRange.partition()), offsetRange.untilOffset()+"_"+time+"_OK");
                            //记录错误的的偏移量 因为异常插入, 所以记录当前偏移量的起始位置。
                            redisMapErro.put(String.valueOf(offsetRange.partition()), offsetRange.fromOffset()+"_"+time+"_ERROR");
                        }

                        //当数据为空 不对数据添加到redis 然后减少redis 的压力
                        if(st.hasNext()){
                            if (ExceptionFlag) {
                               // System.out.println("正确偏移量~~~~~~~~~~~ 提交" + redisMapOk);
                                RedisUtil.PutAll(offset, redisMapOk, 1);
                            } else {
                               // System.out.println("错误偏移量~~~~~~~~~~~" + redisMapErro);
                                RedisUtil.PutAll(offset, redisMapErro, 1);
                            }
                        }
                    }
                });
            }
        });

            ssc.start();
        try {
            ssc.awaitTermination();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐