springboot+kafka+SparkStreaming
1.springboot+kafka的注解、客户端,消费数据为ConsumerRecord<?, ?> record对象,通过record获取:参考链接:https://blog.csdn.net/weixin_39249427/article/details/100012629kafka+SparkStreaming:首先说一下。kafka消费Stream...
·
1.springboot+kafka的注解、客户端,消费数据为ConsumerRecord<?, ?> record对象,通过record获取:
参考链接:https://blog.csdn.net/weixin_39249427/article/details/100012629
kafka+SparkStreaming:
首先说一下 Spark Streaming 消费 Kafka 数据的两种方式:Receiver 方式 KafkaUtils.createStream(),以及直连(Direct)方式 KafkaUtils.createDirectStream(jsc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics)。
Receiver 方式:接收固定时间间隔内的数据(放在内存中),使用 Kafka 高级 API,自动维护偏移量;要达到固定的时间间隔才能进行处理,效率低下,并且容易丢失数据。Direct 直连方式:相当于直接连接到 Kafka 的分区上,使用 Kafka 底层 API,效率高,但需要自己维护偏移量。两种方式的对比:如果这两种方式设置的时间间隔都是 5 秒,Receiver 方式会先等待 5 秒,然后将这 5 秒内的数据全部一次性拿过来进行处理;在这 5 秒的等待期间,如果数据量特别大,内存容量不够,就会造成数据丢失。Direct 直连方式则是直接连接到 Kafka 对应的分区上,只要 Kafka 中有数据就直接从 Kafka 中拉取,不必等满 5 秒才去拉取数据,只是每到 5 秒才执行一次数据处理。
参考理论文章: https://www.cnblogs.com/hdfs/p/9971761.html
推荐:
KafkaUtils.createDirectStream(jsc, String.class,String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics)直连方式
package com.wxrem.controller;
import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaCluster;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Predef;
import scala.Tuple2;
import scala.collection.JavaConversions;
import java.util.*;
/**
 * Demo driver that reads a Kafka topic through Spark Streaming's direct (receiver-less)
 * Kafka 0.8 integration and maintains the consumer group's offsets itself via
 * {@link KafkaCluster}.
 *
 * <p>Flow: resolve the starting offset of every partition, build the direct stream from
 * those offsets, print each batch, then commit the batch's end offsets back to Kafka.
 * Topic, batch interval, broker and group id are hard-coded for local debugging.
 */
public class KafkaSparkStreamDemo {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaSparkStreamDemo.class);

    /**
     * Starts the streaming job and blocks until it terminates.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        String topics = "hello"; // comma-separated list of topics
        long seconds = 10; // batch interval, in seconds
        SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount")
                // local debugging
                .setMaster("local[4]");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(seconds));
        HashSet<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
        Map<String, String> kafkaParams = getKafkaParams();
        // No auto-offset-reset policy is configured here: explicit starting offsets are
        // computed by fromOffsets(...) and handed to createDirectStream below.
        final String groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG);
        // Kafka metadata/offset management helper
        final KafkaCluster kafkaCluster = getKafkaCluster(kafkaParams);
        // Starting offset for every partition of the subscribed topics
        Map<TopicAndPartition, Long> startingOffsets =
                fromOffsets(topicsSet, kafkaParams, groupId, kafkaCluster, null);
        // Direct stream that yields only the message payload of each Kafka record
        JavaInputDStream<String> stream = KafkaUtils.createDirectStream(jssc,
                String.class, String.class, StringDecoder.class,
                StringDecoder.class, String.class, kafkaParams,
                startingOffsets,
                new Function<MessageAndMetadata<String, String>, String>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public String call(MessageAndMetadata<String, String> v1) throws Exception {
                        return v1.message();
                    }
                });
        stream.print();
        stream.foreachRDD(rdd -> {
            LOG.info("start consumer*****************!");
            // collect() pulls the whole batch to the driver; acceptable for a demo only
            List<String> collect = rdd.collect();
            System.out.println(collect.toString());
        });
        // Register the offset commit after the processing output operations above;
        // Spark Streaming runs output operations in the order they are defined.
        storeConsumerOffsets(groupId, kafkaCluster, stream);
        // Start the computation
        jssc.start();
        try {
            jssc.awaitTermination();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can observe it.
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted while waiting for the streaming context to terminate", e);
        }
    }

    /**
     * Builds the Kafka configuration used both by the direct stream and by {@link KafkaCluster}.
     *
     * @return mutable map holding the group id, the bootstrap servers and the ZooKeeper address
     */
    private static Map<String, String> getKafkaParams() {
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, "tpsc01");
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        kafkaParams.put("zookeeper.connect", "127.0.0.1:2181");
        return kafkaParams;
    }

    /**
     * Registers an output operation that, for every batch, commits the end offset
     * ({@code untilOffset}) of each partition for the given consumer group.
     *
     * <p>All partitions of a batch are committed in a single {@code setConsumerOffsets}
     * call; a failed commit is logged instead of being silently ignored. The elapsed time
     * is measured inside the per-batch callback, so it reflects the actual commit rather
     * than the (instant) registration of the callback.
     *
     * @param groupId      consumer group id the offsets are stored under
     * @param kafkaCluster Kafka management helper used to commit the offsets
     * @param stream       direct Kafka stream whose RDDs implement {@link HasOffsetRanges}
     * @param <T>          element type of the stream
     */
    private static <T> void storeConsumerOffsets(final String groupId, final KafkaCluster kafkaCluster, JavaInputDStream<T> stream) {
        stream.foreachRDD(new VoidFunction<JavaRDD<T>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(JavaRDD<T> javaRDD) throws Exception {
                long startMillis = System.currentTimeMillis();
                // Offset ranges consumed by this batch, one entry per topic partition
                OffsetRange[] offsetRanges = ((HasOffsetRanges) javaRDD.rdd()).offsetRanges();
                Map<TopicAndPartition, Object> untilOffsets = new HashMap<>();
                for (OffsetRange range : offsetRanges) {
                    untilOffsets.put(new TopicAndPartition(range.topic(), range.partition()), range.untilOffset());
                }
                if (untilOffsets.isEmpty()) {
                    return; // nothing to commit for this batch
                }
                // Commit every partition of the batch in one request
                scala.util.Either<?, ?> commitResult =
                        kafkaCluster.setConsumerOffsets(groupId, toImmutableScalaMap(untilOffsets));
                if (commitResult.isLeft()) {
                    LOG.error("storeConsumerOffsets failed for group {}: {}", groupId, commitResult.left().get());
                }
                LOG.info("storeConsumerOffsets time:" + (System.currentTimeMillis() - startMillis));
            }
        });
    }

    /**
     * Converts {@code kafkaParams} to a Scala immutable map and creates the {@link KafkaCluster}.
     *
     * @param kafkaParams Kafka configuration
     * @return Kafka management helper
     */
    private static KafkaCluster getKafkaCluster(Map<String, String> kafkaParams) {
        return new KafkaCluster(toImmutableScalaMap(kafkaParams));
    }

    /**
     * Resolves the partitions of the given topics and determines the starting offset of each.
     *
     * <ul>
     *   <li>If {@code offset} is non-null, every partition starts at that offset.</li>
     *   <li>Otherwise, if the group's committed offsets can be read, consumption resumes
     *       from them.</li>
     *   <li>Otherwise every partition starts at offset 0.</li>
     * </ul>
     *
     * @param topicsSet    all topics to consume
     * @param kafkaParams  Kafka configuration (kept for signature compatibility; the group id
     *                     is taken from {@code groupId})
     * @param groupId      consumer group id
     * @param kafkaCluster Kafka management helper
     * @param offset       custom starting offset for all partitions, or {@code null}
     * @return starting offset per topic partition
     * @throws IllegalStateException if the partition metadata of the topics cannot be fetched
     */
    private static Map<TopicAndPartition, Long> fromOffsets(HashSet<String> topicsSet, Map<String, String> kafkaParams, String groupId, KafkaCluster kafkaCluster, Long offset) {
        long startMillis = System.currentTimeMillis();
        // Starting offset of every partition
        Map<TopicAndPartition, Long> fromOffsets = new HashMap<>();
        // java.util.Set -> scala immutable Set
        scala.collection.immutable.Set<String> immutableTopics = JavaConversions
                .asScalaSet(topicsSet)
                .toSet();
        // Fail with the underlying errors instead of the opaque
        // "NoSuchElementException: Either.right.value on Left" of a blind right().get().
        scala.util.Either<?, ? extends scala.collection.immutable.Set<TopicAndPartition>> partitionLookup =
                kafkaCluster.getPartitions(immutableTopics);
        if (partitionLookup.isLeft()) {
            throw new IllegalStateException("Could not fetch partition metadata for topics "
                    + topicsSet + ": " + partitionLookup.left().get());
        }
        scala.collection.immutable.Set<TopicAndPartition> partitions = partitionLookup.right().get();

        // Committed offsets are only looked up (once) when no custom offset was requested.
        scala.collection.Map<TopicAndPartition, Object> committedOffsets = null;
        if (offset == null) {
            scala.util.Either<?, ? extends scala.collection.Map<TopicAndPartition, Object>> committedLookup =
                    kafkaCluster.getConsumerOffsets(groupId, partitions);
            if (committedLookup.isRight()) {
                committedOffsets = committedLookup.right().get();
            } else {
                LOG.warn("No usable committed offsets for group {}, starting from offset 0: {}",
                        groupId, committedLookup.left().get());
            }
        }

        if (committedOffsets == null) {
            // NOTE(review): offset 0 may be outside the retained log if old segments were
            // deleted; consider the earliest leader offsets instead - confirm with operators.
            Long startOffset = (offset == null) ? Long.valueOf(0L) : offset;
            scala.collection.Iterator<TopicAndPartition> partitionIterator = partitions.iterator();
            while (partitionIterator.hasNext()) {
                fromOffsets.put(partitionIterator.next(), startOffset);
            }
        } else {
            // Resume from where the group left off
            scala.collection.Iterator<Tuple2<TopicAndPartition, Object>> committedIterator =
                    committedOffsets.iterator();
            while (committedIterator.hasNext()) {
                Tuple2<TopicAndPartition, Object> entry = committedIterator.next();
                fromOffsets.put(entry._1(), (Long) entry._2());
            }
        }
        // Record how long the lookup took
        LOG.info("fromOffsets time:" + (System.currentTimeMillis() - startMillis));
        return fromOffsets;
    }

    /**
     * Converts a Java map into a Scala immutable map with the same entries.
     *
     * @param javaMap source map
     * @param <K>     key type
     * @param <V>     value type
     * @return Scala immutable copy of {@code javaMap}
     */
    private static <K, V> scala.collection.immutable.Map<K, V> toImmutableScalaMap(Map<K, V> javaMap) {
        // toMap needs evidence that the elements are pairs; from Java it has to be
        // supplied explicitly as an identity function over Tuple2.
        return JavaConversions
                .mapAsScalaMap(javaMap)
                .toMap(new Predef.$less$colon$less<Tuple2<K, V>, Tuple2<K, V>>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<K, V> apply(Tuple2<K, V> pair) {
                        return pair;
                    }
                });
    }
}
更多推荐
已为社区贡献2条内容
所有评论(0)