Spark Streaming consuming Kafka data and writing it to HDFS in real time (Java version)
Preface: this is a real-time data warehouse built on the Lambda architecture. Compared with the currently more mainstream Kappa architecture, Lambda is quite stable, but operating and maintaining the system is tedious. (Figure: comparison of the two architectures.)
The author's sample code is as follows:
package com.dataInPutHdfs;

import com.utils.DateUtil;
import com.utils.RedisUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;
import scala.Tuple2;

import java.util.*;

/**
 * Spark Streaming job that consumes Kafka data in 1-minute batches and writes it to HDFS.
 */
public class Hl_MWD_Data_InputHdfs {

    public static Boolean ExceptionFlag = true;
    private static String topics = "topics";                      // Kafka topic(s)
    private static String groupId = "groupId001";                 // consumer group id
    private static String Hl_MWD_InputHdfs = "InputHdfs_offset";  // Redis key used to store offsets
    private static String brokers = "IP:9092,IP:9092,IP:9092";    // Kafka broker addresses

    public static void main(String[] args) {
        // Set the HDFS replication factor to 1 (Hadoop defaults to 3) to reduce replication latency.
        Configuration hdfs = new Configuration();
        hdfs.set("dfs.replication", "1");

        // Initialize and tune Spark.
        SparkConf conf = new SparkConf().setAppName("Hl_MWD_Data_InputHdfs");
        conf.set("spark.dynamicAllocation.enabled", "false")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                // enable backpressure
                .set("spark.streaming.backpressure.enabled", "true")
                // minimum ingestion rate (records per second)
                .set("spark.streaming.backpressure.pid.minRate", "1")
                // maximum records per partition per second
                .set("spark.streaming.kafka.maxRatePerPartition", "2000")
                // enable speculative execution
                .set("spark.speculation", "true");
        System.setProperty("es.set.netty.runtime.available.processors", "false");

        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(60));

        Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", "false");

        HashMap<TopicPartition, Long> mapTopic = new HashMap<>();
        Boolean flag = RedisUtil.FlagExits(Hl_MWD_InputHdfs, 1);
        JavaInputDStream<ConsumerRecord<String, String>> messages = null;
        String[] s = null;
        String offsetlast = null;
        if (flag) {
            // Offsets were previously saved in Redis: resume from them.
            Map<String, String> offsets = RedisUtil.getAll(Hl_MWD_InputHdfs, 1);
            for (Map.Entry<String, String> entry : offsets.entrySet()) {
                String partition = entry.getKey();
                String offset = entry.getValue();
                // The stored value looks like "<offset>_<time>_<status>"; index 0 is the Kafka offset.
                s = offset.split("_", -1);
                offsetlast = s[0];
                TopicPartition topicPartition = new TopicPartition(topics, Integer.valueOf(partition));
                mapTopic.put(topicPartition, Long.valueOf(offsetlast));
            }
            messages = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<String, String>Subscribe(topicsSet, kafkaParams, mapTopic));
        } else {
            messages = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<String, String>Subscribe(topicsSet, kafkaParams));
        }

        // Convert the received Kafka records into key/value pairs.
        JavaPairDStream<String, String> lines = messages.mapToPair(new PairFunction<ConsumerRecord<String, String>, String, String>() {
            @Override
            public Tuple2<String, String> call(ConsumerRecord<String, String> record) throws Exception {
                String[] lst = record.key().split("_");
                if (record.key().startsWith("BASIC") || record.key().startsWith("DATA")) {
                    return new Tuple2<>(record.key(), lst[0] + "_" + lst[1] + "|" + record.value());
                } else {
                    return new Tuple2<>(record.key(), lst[0] + "_" + lst[1] + "_" + lst[2] + "|" + record.value());
                }
            }
        });

        // Save the records under the /origin_data/ directory, 2 files per output folder.
        lines.repartition(2).saveAsHadoopFiles("/origin_data/a", "", Text.class, Text.class, TextOutputFormat.class);

        messages.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
            @Override
            public void call(JavaRDD<ConsumerRecord<String, String>> v3) throws Exception {
                // Offset ranges must be read on the driver, before the partitions are processed.
                OffsetRange[] offsetRanges = ((HasOffsetRanges) v3.rdd()).offsetRanges();
                v3.repartition(1).foreachPartition(new VoidFunction<Iterator<ConsumerRecord<String, String>>>() {
                    @Override
                    public void call(Iterator<ConsumerRecord<String, String>> st) throws Exception {
                        HashMap<String, String> redisMapOk = new HashMap<>();
                        HashMap<String, String> redisMapErro = new HashMap<>();
                        String time = DateUtil.formatDateString(new Date(), DateUtil.DATE_FORMAT_12W);
                        for (OffsetRange offsetRange : offsetRanges) {
                            // On success, record the end of the range: its data has already been written,
                            // so the next run should start from there.
                            redisMapOk.put(String.valueOf(offsetRange.partition()), offsetRange.untilOffset() + "_" + time + "_OK");
                            // On failure, record the start of the range so the batch is re-read next time.
                            redisMapErro.put(String.valueOf(offsetRange.partition()), offsetRange.fromOffset() + "_" + time + "_ERROR");
                        }
                        // Skip empty batches so nothing is written to Redis and its load is reduced.
                        if (st.hasNext()) {
                            if (ExceptionFlag) {
                                RedisUtil.PutAll(Hl_MWD_InputHdfs, redisMapOk, 1);
                            } else {
                                RedisUtil.PutAll(Hl_MWD_InputHdfs, redisMapErro, 1);
                            }
                        }
                    }
                });
            }
        });

        ssc.start();
        try {
            ssc.awaitTermination();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
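The RedisUtil and DateUtil classes used above are the author's own helpers and are not included in the post; DateUtil simply formats the current timestamp, and RedisUtil keeps the partition-to-offset map in Redis. Below is a minimal sketch of what RedisUtil might look like, assuming the offsets live in a single Redis hash accessed through Jedis (which appears in the POM below); the method names match the calls above, while the host, port, and hash layout are assumptions:

package com.utils;

import redis.clients.jedis.Jedis;
import java.util.Map;

// Hypothetical re-implementation of the Redis helper; the original is not shown in the post.
// The second argument of each method selects the Redis DB index.
public class RedisUtil {

    private static final String REDIS_HOST = "IP";   // placeholder, like the broker addresses above
    private static final int REDIS_PORT = 6379;

    // Returns true if the offset hash already exists in the given DB.
    public static Boolean FlagExits(String key, int db) {
        try (Jedis jedis = new Jedis(REDIS_HOST, REDIS_PORT)) {
            jedis.select(db);
            return jedis.exists(key);
        }
    }

    // Reads all partition -> "offset_time_status" entries of the hash.
    public static Map<String, String> getAll(String key, int db) {
        try (Jedis jedis = new Jedis(REDIS_HOST, REDIS_PORT)) {
            jedis.select(db);
            return jedis.hgetAll(key);
        }
    }

    // Writes (or overwrites) the partition -> offset entries of the hash.
    public static void PutAll(String key, Map<String, String> values, int db) {
        try (Jedis jedis = new Jedis(REDIS_HOST, REDIS_PORT)) {
            jedis.select(db);
            jedis.hmset(key, values);
        }
    }
}

In production a JedisPool would normally replace the per-call connections, but the sketch keeps the same call signatures as the job above.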
The job above writes a new batch of files to HDFS every minute, so Azkaban is used to schedule an hourly task that merges the small files and appends them to a single large file; the script is below, followed by a sketch of the corresponding Azkaban job definition.
#!/bin/bash
# Merge the per-minute small files under /origin_data and append them to a temporary file on the target server.

# "hdfs dfs -ls" prints a "Found N items" header line, so subtract 1 to get the number of sub-directories.
num=`hdfs dfs -ls /origin_data | wc -l`
num=`expr $num - 1`
a=1
for i in `hdfs dfs -ls /origin_data | awk '{print $8}'`
do
    if [ $a -eq $num ]
    then
        # Skip the last (most recent) directory, which may still be being written.
        echo "=========================================== skipping the last directory"
        break
    else
        echo $i
        echo $a
        hdfs dfs -cat $i/* >> /home/work_space/combine/data_temp.txt
        hdfs dfs -rm -r -skipTrash $i
        a=`expr $a + 1`
    fi
done
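For the scheduling part, a minimal Azkaban job definition wrapping this script might look like the following; the job file name and script path are assumptions, and the hourly schedule itself is set in the Azkaban web UI:

# merge_small_files.job  (hypothetical job file)
type=command
command=sh /home/work_space/scripts/merge_small_files.sh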
When the daily offline data load runs, the merged file is uploaded to HDFS and then truncated, using the following script:
hadoop fs -put /home/work_space/combine/data_temp.txt /everyday_file
cd /home/work_space/combine/data/
cat /dev/null > /home/work_space/combine/data_temp.txt
The POM coordinates are as follows:
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<java.version>1.8</java.version>
<spark.version>2.3.0</spark.version>
<scala.version>2.11.8</scala.version>
<!--<scala.version>2.10.0</scala.version>-->
<elasticsearch.version>6.6.1</elasticsearch.version>
</properties>
<dependencies>
<!-- spark start -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<!--<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>-->
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.1.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>com.aliyun.hbase</groupId>
<artifactId>alihbase-client</artifactId>
<version>1.1.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.23</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>6.5.4</version>
</dependency>
</dependencies>