This example shows how to use the low-level (SimpleConsumer) API to read data from a specified topic, a specified partition, and a specified offset.

1) Main steps for a consumer using the low-level API:

| Step | Main task |
| ---- | --------- |
| 1 | Find the leader replica for the specified partition from the topic metadata |
| 2 | Get the latest consumed offset for the partition |
| 3 | Fetch the partition's messages from the leader replica |
| 4 | Detect leader changes and retry |

For a specified partition and offset, the client must:

(1) Find the leader for the partition

(2) Find the starting offset

(3) Save the leader's replica list, so that a later leader change can be detected

2) Method descriptions:

| Method | Description |
| ------ | ----------- |
| findLeader() | Sends a topic metadata request to the seed brokers and saves the partition's replica set as backup nodes |
| getLastOffset() | Sends an offset request to get the latest offset of the partition |
| run() | The main method in which the low-level API consumer fetches messages |
| findNewLeader() | When the partition's leader broker fails, finds the broker that has become the new leader |
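
The listing below uses the legacy Scala-core classes (kafka.javaapi.consumer.SimpleConsumer and friends), which ship in the broker-side kafka core artifact rather than in kafka-clients. A minimal Maven sketch of the dependency, assuming a Scala 2.11 build of a 0.10.x-era library (the version shown is an assumption; match it to your cluster):

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.2.1</version>
</dependency>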

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.BrokerEndPoint;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SimpleExample {

    // hosts of the current leader's replicas, saved for failover
    private List<String> m_replicaBrokers = new ArrayList<>();

    public static void main(String[] args) {
        SimpleExample example = new SimpleExample();
        // maximum number of messages to read
        long maxReads = Long.parseLong("3");
        // topic to read from
        String topic = "test1";
        // partition to read from
        int partition = Integer.parseInt("0");
        // IPs of the seed broker nodes
        List<String> seeds = new ArrayList<>();
        seeds.add("192.168.9.102");
        seeds.add("192.168.9.103");
        seeds.add("192.168.9.104");
        // broker port
        int port = Integer.parseInt("9092");
        try {
            example.run(maxReads, topic, partition, seeds, port);
        } catch (Exception e) {
            System.out.println("Oops:" + e);
            e.printStackTrace();
        }
    }

    public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
        // step 1: get the metadata (including the leader) of the given topic partition
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
        if (metadata == null) {
            System.out.println("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            System.out.println("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + a_topic + "_" + a_partition;

        // consumer that fetches data from the partition leader
        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
        // step 2: find the offset to start reading from (earliest available here)
        long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
        int numErrors = 0;
        while (a_maxReads > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
            }
            // step 3: fetch messages from the leader; addFetch() can be called
            // several times to request multiple topics/partitions in one request
            FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(a_topic, a_partition, readOffset, 100000).build();
            FetchResponse fetchResponse = consumer.fetch(req);

            if (fetchResponse.hasError()) {
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(a_topic, a_partition);
                System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
                if (numErrors > 5)
                    break;
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    // We asked for an invalid offset. For the simple case, ask for
                    // the last element to reset
                    readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                // step 4: the leader may have changed; find the new one and retry
                consumer.close();
                consumer = null;
                leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                continue;
            }
            numErrors = 0;

            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
                long currentOffset = messageAndOffset.offset();
                // with compressed message sets the fetch may return offsets older
                // than the one requested; skip them
                if (currentOffset < readOffset) {
                    System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                    continue;
                }
                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();

                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
                numRead++;
                a_maxReads--;
            }

            // nothing fetched: wait a moment before polling the leader again
            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        if (consumer != null)
            consumer.close();
    }

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    // after the old leader fails, ask the saved replica brokers which of them
    // has become the new leader
    private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // first time through, if the leader hasn't changed, give
                // ZooKeeper a second to recover;
                // second time, assume the broker did recover before failover,
                // or it was a non-broker issue
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                Thread.sleep(1000);
            }
        }
        System.out.println("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }

    private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        loop:
        for (String seed : a_seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                // consumer used only to look up the partition leader
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + ", " + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null)
                    consumer.close();
            }
        }
        // remember the replica brokers of this partition so that a new leader
        // can be located if the current one fails
        if (returnMetaData != null) {
            m_replicaBrokers.clear();
            for (BrokerEndPoint replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }
}
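
For comparison, everything the four steps above do by hand (leader lookup, offset lookup, fetching, leader-failover handling) is handled internally by the modern consumer. A minimal sketch using org.apache.kafka.clients.consumer.KafkaConsumer with assign() and seek(); the broker address, topic, and partition are taken from the example above, and a kafka-clients (0.10+) dependency is assumed:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Properties;

public class AssignSeekExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.9.102:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("test1", 0);
            // assign() pins the consumer to an explicit partition (no group rebalancing)
            consumer.assign(Collections.singletonList(tp));
            // seek() sets the explicit starting offset, like readOffset above
            consumer.seek(tp, 0L);
            // on kafka-clients 2.x and later, prefer poll(Duration.ofSeconds(1))
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.offset() + ": " + record.value());
            }
        }
    }
}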
