Getting the key and message via MessageAndMetadata
The createDirectStream approach obtains the key and message in the following four steps (the full program below walks through each one):
1. Read the topic's last consumed offsets; they are stored in the ZooKeeper ensemble configured by the inputTopicLockMaster parameter, i.e. localhost:2181.
2. Create the direct Kafka stream with brokers and topics.
3. Using MessageAndMetadata, override the PairFunction<T, K, V> method to extract the key and message.
4. Read each RDD element's key and message and print them; the key may be null.
package com.reco.cmdata.batch;
import com.cloudera.oryx.lambda.Functions;
import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import kafka.utils.ZKGroupTopicDirs;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Tuple2;
import scala.collection.JavaConversions;
import java.io.Closeable;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
* Created by root on 9/9/17.
*/
public final class DirectStreamKeyMessageIT {
private static final Logger log = LoggerFactory.getLogger(DirectStreamKeyMessageIT.class);
private static final int ZK_TIMEOUT_MSEC =
(int) TimeUnit.MILLISECONDS.convert(30, TimeUnit.SECONDS);
public static void main(String[] args) throws Exception {
String brokers = "localhost:9092";
String topics = "KeyMessage";
String inputTopicLockMaster = "localhost:2181";
String groupID = getGroupID();
// Create context with a 2 seconds batch interval
SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount").setMaster("local[2]");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", brokers);
kafkaParams.put("group.id", groupID);
// Don't re-consume old messages from input by default
kafkaParams.put("auto.offset.reset", "largest");
kafkaParams.put("metadata.broker.list", brokers);
// Newer version of metadata.broker.list:
kafkaParams.put("bootstrap.servers", brokers);
// Read the topic's last consumed offsets from the ZooKeeper configured by inputTopicLockMaster
// (localhost:2181); on the first run every offset in the map is 0
Map<TopicAndPartition,Long> offsets =
getOffsets(inputTopicLockMaster,
groupID,
topics);
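// The double cast below works around Java's lack of parameterized class
// literals: there is no way to write MessageAndMetadata<String,String>.class
// directly, so the raw class literal is cast through Class<?>.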
Class<MessageAndMetadata<String,String>> streamClass =
(Class<MessageAndMetadata<String,String>>) (Class<?>) MessageAndMetadata.class;
// Create direct kafka stream with brokers and topics
JavaInputDStream<MessageAndMetadata<String,String>> messages = KafkaUtils.createDirectStream(
jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
streamClass,
kafkaParams,
offsets,
Functions.<MessageAndMetadata<String,String>>identity());
// Using MessageAndMetadata, override the PairFunction<T, K, V> method to extract the key and message
JavaPairDStream<String,String> pairDStream =
messages.mapToPair(new MMDToTuple2Fn<String,String>());
// Print pairDStream
pairDStream.print();
// Read each RDD element's key and message and print them; the key may be null
pairDStream.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
@Override
public Void call(JavaPairRDD<String, String> rdd) throws Exception {
if (rdd.isEmpty()) {
log.info("RDD was empty, no action");
} else {
//OffsetRange[] ranges = ((HasOffsetRanges) rdd).offsetRanges();
//log.info("ranges length {}", ranges.length);
rdd.foreach(new VoidFunction<Tuple2<String, String>>() {
public void call(Tuple2<String, String> x) throws Exception {
log.info("rdd element: {}", x);
}
});
}
return null;
}
});
// Start the computation
jssc.start();
jssc.awaitTermination();
}
public static final class MMDToTuple2Fn<K,M> implements PairFunction<MessageAndMetadata<K,M>,K,M> {
@Override
public Tuple2<K,M> call(MessageAndMetadata<K,M> km) {
return new Tuple2<>(km.key(), km.message());
}
}
public static Map<TopicAndPartition,Long> getOffsets(String zkServers,
String groupID,
String topic) {
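// Offsets follow the Kafka high-level consumer's ZooKeeper layout,
// /consumers/<groupID>/offsets/<topic>/<partition>, which is the path
// that ZKGroupTopicDirs.consumerOffsetDir() resolves to.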
ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupID, topic);
Map<TopicAndPartition,Long> offsets = new HashMap<>();
try (AutoZkClient zkClient = new AutoZkClient(zkServers)) {
List<Object> partitions = JavaConversions.seqAsJavaList(
ZkUtils.getPartitionsForTopics(
zkClient,
JavaConversions.asScalaBuffer(Collections.singletonList(topic))).head()._2());
for (Object partition : partitions) {
String partitionOffsetPath = topicDirs.consumerOffsetDir() + "/" + partition;
Option<String> maybeOffset = ZkUtils.readDataMaybeNull(zkClient, partitionOffsetPath)._1();
Long offset = maybeOffset.isDefined() ? Long.parseLong(maybeOffset.get()) : 0;
TopicAndPartition topicAndPartition =
new TopicAndPartition(topic, Integer.parseInt(partition.toString()));
offsets.put(topicAndPartition, offset);
}
}
return offsets;
}
// Just exists for Closeable convenience
private static final class AutoZkClient extends ZkClient implements Closeable {
AutoZkClient(String zkServers) {
super(zkServers, ZK_TIMEOUT_MSEC, ZK_TIMEOUT_MSEC, ZKStringSerializer$.MODULE$);
}
}
protected static String getLayerName() {
return "BatchLayer";
}
protected static final String getGroupID() {
return "OryxGroup-" + getLayerName();
}
}
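Note that the job above only ever reads offsets from ZooKeeper; it never writes them back, so every run resumes from the same positions. Below is a minimal sketch of the write-back direction, assuming the commented-out HasOffsetRanges lines inside foreachRDD are re-enabled (in Java the cast must go through rdd.rdd(), since JavaPairRDD itself does not implement HasOffsetRanges). saveOffsets is a hypothetical helper that could be added to the class above; it is not part of the original code:

import org.apache.spark.streaming.kafka.OffsetRange;

// Persist the end offset of each consumed partition back to the same
// ZooKeeper paths that getOffsets() reads, so the next run resumes there.
public static void saveOffsets(String zkServers, String groupID, String topic,
                               OffsetRange[] ranges) {
    ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupID, topic);
    try (AutoZkClient zkClient = new AutoZkClient(zkServers)) {
        for (OffsetRange range : ranges) {
            String path = topicDirs.consumerOffsetDir() + "/" + range.partition();
            ZkUtils.updatePersistentPath(zkClient, path, Long.toString(range.untilOffset()));
        }
    }
}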
The Functions class (com.cloudera.oryx.lambda.Functions, imported above) is as follows:
package com.cloudera.oryx.lambda;

import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
/**
* Utility {@link Function2} implementations.
*/
public final class Functions {
private Functions() {}
/**
* @return a function that returns the second of two values
* @param <T> element type
*/
public static <T> Function2<T,T,T> last() {
return new Function2<T,T,T>() {
@Override
public T call(T current, T next) {
return next;
}
};
}
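/**
 * @return a function that ignores its argument and does nothing
 * @param <T> element type
 */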
public static <T> VoidFunction<T> noOp() {
return new VoidFunction<T>() {
@Override
public void call(T t) {
// do nothing
}
};
}
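/**
 * @return a function that returns its argument unchanged
 * @param <T> element type
 */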
public static <T> Function<T,T> identity() {
return new Function<T,T>() {
@Override
public T call(T t) {
return t;
}
};
}
}
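As an aside (an assumption, not something the original post shows): because the last argument of createDirectStream is an arbitrary messageHandler, the identity() handler plus the separate mapToPair() step could be collapsed into a single handler that emits the (key, message) pair directly:

@SuppressWarnings("unchecked")
JavaInputDStream<Tuple2<String,String>> pairs = KafkaUtils.createDirectStream(
    jssc,
    String.class,
    String.class,
    StringDecoder.class,
    StringDecoder.class,
    (Class<Tuple2<String,String>>) (Class<?>) Tuple2.class,
    kafkaParams,
    offsets,
    new Function<MessageAndMetadata<String,String>, Tuple2<String,String>>() {
        @Override
        public Tuple2<String,String> call(MessageAndMetadata<String,String> mmd) {
            // Build the (key, message) pair in the handler itself,
            // so no downstream mapToPair() is needed.
            return new Tuple2<>(mmd.key(), mmd.message());
        }
    });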