import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;

import java.util.*;
import java.util.Map.Entry;

public class KafkaOffsetTools {

    public final static String KAFKA_TOPIC_NAME_ADAPTER = "sample";
    public final static String KAFKA_TOPIC_NAME_EXCEPTION = "exception";
    public final static String KAFKA_TOPIC_NAME_AUDIT = "audit";

    private static final String rawTopicTotal = "rawTopicTotalRecordCounter";
    private static final String avroTopicTotal = "avroTopicTotalRecordCounter";
    private static final String exceptionTopicTotal = "exceptionTopicTotalRecordCounter";

    public KafkaOffsetTools() {
    }

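    // Fetch the latest offset of a single partition from its lead broker via an
    // OffsetRequest; whichTime selects which offset to return (e.g.
    // kafka.api.OffsetRequest.LatestTime() for the current log-end offset).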
    public static long getLastOffset(SimpleConsumer consumer, String topic,
            int partition, long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        if (response.hasError()) {
            System.err.println("Error fetching offset data from the broker. Reason: "
                    + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

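    // Ask each seed broker for the topic's metadata and collect the
    // PartitionMetadata (which includes the current leader) for every partition.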
    private TreeMap<Integer, PartitionMetadata> findLeader(List<String> a_seedBrokers, String a_topic) {
        TreeMap<Integer, PartitionMetadata> map = new TreeMap<Integer, PartitionMetadata>();
        for (String seed : a_seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                String[] hostAndPort = seed.split(":");
                consumer = new SimpleConsumer(hostAndPort[0], Integer.valueOf(hostAndPort[1]),
                        100000, 64 * 1024, "leaderLookup" + new Date().getTime());
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        map.put(part.partitionId(), part);
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with broker [" + seed
                        + "] to find leader for [" + a_topic + "]. Reason: " + e);
            } finally {
                if (consumer != null)
                    consumer.close();
            }
        }
        return map;
    }

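    // Read the broker list from the environment, sum the latest offsets across
    // all partitions of each monitored topic, and print the totals as counters.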
    public static void main(String[] args) {
        String kafkaBrokerList = System.getenv("metadata.broker.list");
        if (kafkaBrokerList == null || kafkaBrokerList.length() == 0) {
            System.err.println("metadata.broker.list is not configured; it is null or empty.");
            // for test
            kafkaBrokerList = "localhost:9092,localhost:9093";
            System.err.println("Using this broker list for test, metadata.broker.list=" + kafkaBrokerList);
        }

        // init topics, logSize = 0
        Map<String, Integer> topics = new HashMap<String, Integer>();
        topics.put(KAFKA_TOPIC_NAME_ADAPTER, 0);
        topics.put(KAFKA_TOPIC_NAME_EXCEPTION, 0);
        topics.put(KAFKA_TOPIC_NAME_AUDIT, 0);

        // init kafka broker list
        String[] kafkaHosts = kafkaBrokerList.split(",");
        if (kafkaHosts == null || kafkaHosts.length == 0) {
            System.err.println("metadata.broker.list is not configured; it is null or empty.");
            System.exit(1);
        }
        List<String> seeds = new ArrayList<String>();
        for (int i = 0; i < kafkaHosts.length; i++) {
            seeds.add(kafkaHosts[i]);
        }

        KafkaOffsetTools kot = new KafkaOffsetTools();
        for (String topicName : topics.keySet()) {
            TreeMap<Integer, PartitionMetadata> metadatas = kot.findLeader(seeds, topicName);
            // Sum the latest offsets of all partitions; note that an int total
            // will overflow once a topic exceeds Integer.MAX_VALUE records.
            int logSize = 0;
            for (Entry<Integer, PartitionMetadata> entry : metadatas.entrySet()) {
                int partition = entry.getKey();
                String leadBroker = entry.getValue().leader().host();
                String clientName = "Client_" + topicName + "_" + partition;
                SimpleConsumer consumer = new SimpleConsumer(leadBroker,
                        entry.getValue().leader().port(), 100000, 64 * 1024, clientName);
                long readOffset = getLastOffset(consumer, topicName, partition,
                        kafka.api.OffsetRequest.LatestTime(), clientName);
                logSize += readOffset;
                if (consumer != null) consumer.close();
            }
            topics.put(topicName, logSize);
        }

        System.out.println(topics.toString());
        System.out.println(rawTopicTotal + "=" + topics.get(KAFKA_TOPIC_NAME_ADAPTER) + " " + System.currentTimeMillis());
        System.out.println(avroTopicTotal + "=" + topics.get(KAFKA_TOPIC_NAME_AUDIT) + " " + System.currentTimeMillis());
        System.out.println(exceptionTopicTotal + "=" + topics.get(KAFKA_TOPIC_NAME_EXCEPTION) + " " + System.currentTimeMillis());
    }

}
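With the legacy Kafka 0.8.x kafka-core jar (which provides kafka.javaapi.consumer.SimpleConsumer) and its dependencies on the classpath, the class can be run directly. The broker list is read from the metadata.broker.list environment variable, falling back to the hard-coded localhost pair for testing. A run prints the per-topic totals followed by the three counters; the figures below are illustrative only:

{audit=0, exception=3, sample=1024}
rawTopicTotalRecordCounter=1024 1466589134000
avroTopicTotalRecordCounter=0 1466589134000
exceptionTopicTotalRecordCounter=3 1466589134000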
