Blog: http://www.fanlegefan.com
Article: http://www.fanlegefan.com/archives/kafka-low-level-consumer


kafka

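Kafka is a high-throughput distributed publish-subscribe messaging system and an important part of many big-data systems. The latest release at the time of writing is kafka_2.11-0.10.2.0. Since 0.9.0 the consumer API has been unified and no longer distinguishes between high-level and low-level consumers, but many companies still use the old API, so today we take another look at the low-level API.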

Kafka SimpleConsumer data consumption flow chart

image

Topic logical structure diagram

image

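Using the topic structure diagram, here is the consumption flow in outline:
  • A topic usually has multiple partitions, which raises the topic's message throughput. Each partition also has several replicas (for example repl1-1, repl1-2, repl1-3), which improves availability: data is not lost when some nodes go down.
  • Because a partition has several replicas, reads and writes go to the leader (the active replica) first, and the follower replicas then sync by replicating from it. To read from a partition you therefore first have to find that partition's leader (steps 1 and 2 in the flow chart).
  • Once you have the leader, decide where to start reading, that is, from which offset. The two usual choices are listed below. You can also start from a specific offset, which means tracking the consumed offset yourself, for example in ZooKeeper, HDFS, or a database (steps 3 and 4).
    1. smallest: start from the smallest offset
    2. largest: start from the current largest offset
  • Fetch the messages (steps 5 and 6)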
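The flow above can be sketched with a small in-memory model. This is only an illustration and does not use Kafka's API: `PartitionModel`, `leader`, `resolveStartOffset`, and `fetch` are hypothetical names, and "largest" is modeled as the offset the next written message would receive.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical in-memory model of one partition; none of this is Kafka's API.
class PartitionModel {
    final List<String> replicas;  // hosts that hold a copy of the partition
    final int leaderIndex;        // which replica is currently the leader
    final long earliestOffset;    // smallest offset still available
    final long latestOffset;      // offset the next written message will receive

    PartitionModel(List<String> replicas, int leaderIndex, long earliestOffset, long latestOffset) {
        this.replicas = replicas;
        this.leaderIndex = leaderIndex;
        this.earliestOffset = earliestOffset;
        this.latestOffset = latestOffset;
    }

    // Steps 1 and 2: reads go to the leader replica.
    String leader() {
        return replicas.get(leaderIndex);
    }

    // Steps 3 and 4: turn "smallest" or "largest" into a concrete offset.
    long resolveStartOffset(String reset) {
        if ("smallest".equals(reset)) return earliestOffset;
        if ("largest".equals(reset)) return latestOffset;
        throw new IllegalArgumentException("unknown reset policy: " + reset);
    }

    // Steps 5 and 6: return the offsets a fetch starting at startOffset would cover.
    List<Long> fetch(long startOffset, int maxMessages) {
        if (startOffset < earliestOffset || startOffset > latestOffset) {
            throw new IllegalStateException("offset out of range: " + startOffset);
        }
        List<Long> offsets = new ArrayList<>();
        for (long o = startOffset; o < latestOffset && offsets.size() < maxMessages; o++) {
            offsets.add(o);
        }
        return offsets;
    }

    public static void main(String[] args) {
        PartitionModel p0 = new PartitionModel(
                Arrays.asList("repl1-1", "repl1-2", "repl1-3"), 0, 0L, 12L);
        long start = p0.resolveStartOffset("smallest");
        System.out.println("leader=" + p0.leader() + " offsets=" + p0.fetch(start, 5));
    }
}
```

Starting from "largest" returns an empty list here, which matches the idea that a consumer positioned at the end of the log only sees messages written afterwards.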
Code

In production, the complete procedure for consuming a topic is:
1. Get all of the topic's partitions
2. Iterate over the partitions and find the leader of each one
3. Consume the data

Here we consume partition 0 only, so the first step is skipped.
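For reference, steps 1 and 2 of the production procedure might look roughly like the following. It is a plain-Java sketch: `PartitionInfo` is a hypothetical stand-in for the per-partition metadata a broker returns, and the second broker address is made up for the example.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PartitionLeaders {
    // Hypothetical stand-in for the metadata of one partition.
    static final class PartitionInfo {
        final int partitionId;
        final String leaderHost; // null while no leader is elected

        PartitionInfo(int partitionId, String leaderHost) {
            this.partitionId = partitionId;
            this.leaderHost = leaderHost;
        }
    }

    // Steps 1 and 2: walk every partition and record its leader,
    // skipping partitions that currently have none.
    static Map<Integer, String> leadersByPartition(List<PartitionInfo> partitions) {
        Map<Integer, String> leaders = new LinkedHashMap<>();
        for (PartitionInfo p : partitions) {
            if (p.leaderHost != null) {
                leaders.put(p.partitionId, p.leaderHost);
            }
        }
        return leaders;
    }

    public static void main(String[] args) {
        List<PartitionInfo> partitions = Arrays.asList(
                new PartitionInfo(0, "192.168.1.115"),
                new PartitionInfo(1, null),              // leader election in progress
                new PartitionInfo(2, "192.168.1.116"));  // assumed second broker
        // Step 3 would open one consumer per entry and fetch from that leader.
        System.out.println(leadersByPartition(partitions));
    }
}
```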

Finding the leader of partition 0

Constructors

class SimpleConsumer(val host: String,
                     val port: Int,
                     val soTimeout: Int,
                     val bufferSize: Int,
                     val clientId: String)

import kafka.javaapi.TopicMetadataRequest;

def this(topics: java.util.List[String]) =
    this(kafka.api.TopicMetadataRequest.CurrentVersion, 0, kafka.api.TopicMetadataRequest.DefaultClientId, topics)

Getting the leader

String host = "192.168.1.115";
int port = 9092;
int soTimeout = 1000*6;
int bufferSize = 512 * 1024;
String clientId = "clientId";
String topic = "test";
int paritionid = 0;

// Build the consumer. The host here is a broker; a real cluster has many brokers,
// so you would iterate over all of them, get the partitions on each, and then find each partition's leader.
SimpleConsumer simpleConsumer
        = new SimpleConsumer(host,port,soTimeout,bufferSize,clientId);

List<String> topics = Collections.singletonList(topic);
// Build the TopicMetadataRequest
TopicMetadataRequest topicMetadataRequest = new TopicMetadataRequest(topics);

String leader = "";

TopicMetadataResponse topicMetadataResponse = simpleConsumer.send(topicMetadataRequest);
List<TopicMetadata> topicMetadatas = topicMetadataResponse.topicsMetadata();
for(TopicMetadata topicMetadata:topicMetadatas){
    List<PartitionMetadata> partitionMetadatas = topicMetadata.partitionsMetadata();
    for(PartitionMetadata partitionMetadata:partitionMetadatas){
        if(partitionMetadata.partitionId()==paritionid){
            leader = partitionMetadata.leader().host();
        }
    }
}

System.out.println(leader);

Result

192.168.1.115
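The snippet above asks a single broker for metadata. With several brokers, one common approach is to try each seed broker in turn until one answers. The sketch below shows that loop in plain Java; `lookup` is a hypothetical stand-in for the metadata request, and the broker names are invented for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class SeedBrokerLookup {
    // Tries each seed broker in order. `lookup` stands in for the metadata
    // request: it returns the leader host, or throws if the broker is unreachable.
    static String findLeader(List<String> seedBrokers, Function<String, String> lookup) {
        for (String seed : seedBrokers) {
            try {
                String leader = lookup.apply(seed);
                if (leader != null) {
                    return leader;
                }
            } catch (RuntimeException e) {
                System.out.println("Broker " + seed + " unavailable: " + e.getMessage());
            }
        }
        return null; // no seed broker could answer
    }

    public static void main(String[] args) {
        // Fake lookup: the first broker is down, the second one answers.
        Function<String, String> fakeLookup = seed -> {
            if (seed.equals("broker-a")) {
                throw new IllegalStateException("connection refused");
            }
            return "192.168.1.115";
        };
        System.out.println(findLeader(Arrays.asList("broker-a", "broker-b"), fakeLookup));
    }
}
```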

Getting the offset
kafka.javaapi.OffsetRequest
kafka.common.TopicAndPartition
kafka.api.PartitionOffsetRequestInfo

class OffsetRequest(requestInfo: java.util.Map[TopicAndPartition, PartitionOffsetRequestInfo],
                    versionId: Short,
                    clientId: String)

kafka.api.OffsetRequest.EarliestTime()
kafka.api.OffsetRequest.LatestTime()

Code

TopicAndPartition topicAndPartition = new TopicAndPartition(topic,paritionid);
long whichTime = kafka.api.OffsetRequest.LatestTime();

PartitionOffsetRequestInfo partitionOffsetRequestInfo = new PartitionOffsetRequestInfo(whichTime,1);
Map<TopicAndPartition,PartitionOffsetRequestInfo> infoMap
        = new HashMap<TopicAndPartition,PartitionOffsetRequestInfo>();
infoMap.put(topicAndPartition,partitionOffsetRequestInfo);

kafka.javaapi.OffsetRequest offsetRequest
        = new kafka.javaapi.OffsetRequest(infoMap,kafka.api.OffsetRequest.CurrentVersion(), clientId);

OffsetResponse offsetsBefore = simpleConsumer.getOffsetsBefore(offsetRequest);

long readFromOffset = offsetsBefore.offsets(topic,paritionid)[0];
System.out.println(readFromOffset);

Offset

69690
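If you track the consumed offset yourself, as mentioned earlier, the starting point is usually "the saved offset if it is still valid, otherwise a default". Here is a minimal sketch that uses a local file as the store. The `OffsetCheckpoint` class and its methods are hypothetical, and a real deployment would more likely use ZooKeeper, HDFS, or a database.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

class OffsetCheckpoint {
    private final Path file;

    OffsetCheckpoint(Path file) {
        this.file = file;
    }

    // Persist the offset of the next message to read.
    void save(long nextOffset) throws IOException {
        Files.write(file, Long.toString(nextOffset).getBytes(StandardCharsets.UTF_8));
    }

    // Returns the saved offset, or null when nothing has been saved yet.
    Long load() throws IOException {
        if (!Files.exists(file)) {
            return null;
        }
        String text = new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim();
        return text.isEmpty() ? null : Long.valueOf(text);
    }

    // Use the saved offset when it lies inside [earliest, latest]; otherwise
    // fall back to the latest offset (an assumption made for this sketch).
    static long chooseStart(Long saved, long earliest, long latest) {
        if (saved != null && saved >= earliest && saved <= latest) {
            return saved;
        }
        return latest;
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("offset", ".txt");
        OffsetCheckpoint checkpoint = new OffsetCheckpoint(file);
        checkpoint.save(69690L);
        System.out.println(chooseStart(checkpoint.load(), 0L, 70000L));
        Files.delete(file);
    }
}
```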

Reading data

Here we start reading from offset 0 (kafka.api.OffsetRequest.EarliestTime()), so readFromOffset = 0.

int fetchSize = 10000;

kafka.api.FetchRequest fetchRequest = new FetchRequestBuilder()
        .clientId(clientId)
        .addFetch(topic,paritionid,readFromOffset,fetchSize)
        .build();

FetchResponse fetch = simpleConsumer.fetch(fetchRequest);
ByteBufferMessageSet messageAndOffsets = fetch.messageSet(topic,paritionid);

Iterator<MessageAndOffset> iterator = messageAndOffsets.iterator();
while (iterator.hasNext()){
    MessageAndOffset messageAndOffset = iterator.next();
    ByteBuffer payload = messageAndOffset.message().payload();
    byte[] bytes = new byte[payload.limit()];
    payload.get(bytes);
    System.out.println("OFFSET:"+String.valueOf(messageAndOffset.offset()) + ",MESSAGE: " + new String(bytes));
}


OFFSET:0,MESSAGE: ssssssssssssssssssssss
OFFSET:1,MESSAGE: dddddddddddddddddddddddddddd
OFFSET:2,MESSAGE: 0
OFFSET:3,MESSAGE: 1
OFFSET:4,MESSAGE: 2
OFFSET:5,MESSAGE: 3
OFFSET:6,MESSAGE: 4
OFFSET:7,MESSAGE: 5
OFFSET:8,MESSAGE: 6
OFFSET:9,MESSAGE: 7
OFFSET:10,MESSAGE: 8
OFFSET:11,MESSAGE: 9
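The loop above sizes the byte array with `payload.limit()`, which works as long as the buffer's position is zero. A slightly more defensive way to decode a `ByteBuffer` is sketched below, using only the JDK. `PayloadDecoder` is a hypothetical helper, and UTF-8 is an assumption about how the messages were encoded.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class PayloadDecoder {
    // Copies the readable bytes of the payload into a String without
    // moving the caller's buffer position.
    static String decode(ByteBuffer payload) {
        ByteBuffer view = payload.duplicate();     // independent position and limit
        byte[] bytes = new byte[view.remaining()]; // bytes between position and limit
        view.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8); // assumes UTF-8 messages
    }

    public static void main(String[] args) {
        ByteBuffer payload = ByteBuffer.wrap("ssssssss".getBytes(StandardCharsets.UTF_8));
        System.out.println("MESSAGE: " + decode(payload));
    }
}
```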
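
Note

The code above does no error handling (leader changes, OffsetOutOfRangeCode, and so on); it is only meant to show the low-level consumption flow. The complete low-level consumer code is as follows: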

package com.fan.kafka;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

import java.nio.ByteBuffer;
import java.util.*;


public class SimpleConsumerAPI {

    public static void main(String args[]) throws Exception {
        SimpleConsumerAPI example = new SimpleConsumerAPI();
        long maxReads = Long.parseLong(args[0]);
        String topic = args[1];
        int partition = Integer.parseInt(args[2]);
        List<String> seeds = new ArrayList<String>();
        seeds.add(args[3]);
        int port = Integer.parseInt(args[4]);
        try {
            example.run(maxReads, topic, partition, seeds, port);
        } catch (Exception e) {
            System.out.println("Oops:" + e);
            e.printStackTrace();
        }
    }

    private List<String> m_replicaBrokers = new ArrayList<String>();

    public SimpleConsumerAPI() {
        m_replicaBrokers = new ArrayList<String>();
    }

    public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
        // find the meta data about the topic and partition we are interested in
        //
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
        if (metadata == null) {
            System.out.println("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            System.out.println("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + a_topic + "_" + a_partition;

        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
        // Start from the latest offset reported by the partition leader.
        long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
        int numErrors = 0;
        while (a_maxReads > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
            }
            FetchRequest req = new FetchRequestBuilder()
                    .clientId(clientName)
                    .addFetch(a_topic, a_partition, readOffset, 100000) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
                    .build();
            FetchResponse fetchResponse = consumer.fetch(req);

            if (fetchResponse.hasError()) {
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(a_topic, a_partition);
                System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
                if (numErrors > 5) break;
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    // We asked for an invalid offset. For simple case ask for the last element to reset
                    readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                continue;
            }
            numErrors = 0;

            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
                long currentOffset = messageAndOffset.offset();
                if (currentOffset < readOffset) {
                    System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                    continue;
                }
                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();

                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
                numRead++;
                a_maxReads--;
            }

            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        if (consumer != null) consumer.close();
    }

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                     long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            System.out.println("Error fetching offset data from the broker. Reason: " + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        System.out.println("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }

    private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        loop:
        for (String seed : a_seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic
                        + ", " + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null) consumer.close();
            }
        }
        if (returnMetaData != null) {
            m_replicaBrokers.clear();
            for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }
}
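
The error handling in the `run` loop comes down to a three-way decision: give up after too many consecutive errors, reset the offset when it is out of range, and otherwise look for a new leader. The sketch below isolates that decision in plain Java so it can be read on its own. `FetchErrorPolicy` and `Action` are hypothetical names, and the constant is a stand-in for `ErrorMapping.OffsetOutOfRangeCode()`.

```java
class FetchErrorPolicy {
    enum Action { RESET_OFFSET, FIND_NEW_LEADER, GIVE_UP }

    // Stand-in for ErrorMapping.OffsetOutOfRangeCode(); assumed to be 1 here.
    static final short OFFSET_OUT_OF_RANGE = 1;

    // Mirrors the order of checks in the run loop: the error counter is
    // checked first, then the out-of-range case, then everything else.
    static Action decide(short errorCode, int consecutiveErrors) {
        if (consecutiveErrors > 5) {
            return Action.GIVE_UP;
        }
        if (errorCode == OFFSET_OUT_OF_RANGE) {
            return Action.RESET_OFFSET;
        }
        return Action.FIND_NEW_LEADER;
    }

    public static void main(String[] args) {
        System.out.println(decide(OFFSET_OUT_OF_RANGE, 1));
        System.out.println(decide((short) 6, 2));
        System.out.println(decide((short) 6, 6));
    }
}
```

Keeping the decision separate from the network calls makes it easy to check the policy without a running broker.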