接上文:《架构设计:系统间通信(29)——Kafka及场景应用(中2)

4-5、Kafka原理:消费者

作为Apache Kafka消息队列,它的性能指标相当一部分取决于消费者们的性能——只要消息能被快速消费掉不在Broker端形成拥堵,整个Apache Kafka就不会出现性能瓶颈问题。

4-5-1、基本使用

我们首先使用Kafka Client For JAVA API为各位读者演示一下最简单的Kafka消费者端的使用。以下示例代码可以和上文中所给出的生产者代码相对应,形成一个完整的消息创建——接收——发送过程:

package kafkaTQ;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

/**
 * 这是Kafka的topic消费者
 * @author yinwenjie
 */
public class KafkaConsumer_GroupOne {
    public static void main(String[] args) throws RuntimeException {
        // ==============首先各种连接属性
        // Kafka消费者的完整连接属性在Apache Kafka官网http://kafka.apache.org/documentation.html#consumerconfigs
        // 有详细介绍(请参看Old Consumer Configs。New Consumer Configs是给Kafka V0.9.0.0+使用的)
        // 这里我们设置几个关键属性
        Properties props = new Properties();
        // zookeeper相关的,如果有多个zk节点,这里以“,”进行分割
        props.put("zookeeper.connect", "192.168.61.140:2181");
        props.put("zookeeper.connection.timeout.ms", "10000");
        // 还记得上文的说明吗:对于一个topic而言,同一用户组内的所有用户只被允许访问一个分区。
        // 所以要让多个Consumer实现对一个topic的负载均衡,每个groupid的名称都要一样
        String groupname = "group2";
        props.put("group.id", groupname);

        //==============
        ConsumerConfig consumerConfig = new ConsumerConfig(props);
        ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

        // 我们只创建一个消费者
        HashMap<String, Integer> map = new HashMap<String, Integer>();
        map.put("my_topic2", 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams = consumerConnector.createMessageStreams(map);

        // 获取并启动消费线程,注意看关键就在这里,一个消费线程可以负责消费一个topic中的多个partition
        // 但是一个partition只能分配到一个消费线程去
        KafkaStream<byte[], byte[]> stream = topicMessageStreams.get("my_topic2").get(0);
        new Thread(new ConsumerThread(stream)).start();

        // 接着锁住主线程,让其不退出
        synchronized (KafkaConsumer_GroupTwo.class) {
            try {
                KafkaConsumer_GroupTwo.class.wait();
            } catch (InterruptedException e) {
                e.printStackTrace(System.out);
            }
        }
    }

    /**
     * @author yinwenjie
     */
    private static class ConsumerThread implements Runnable {

        private KafkaStream<byte[], byte[]> stream;

        /**
         * @param stream
         */
        public ConsumerThread(KafkaStream<byte[], byte[]> stream) {
            this.stream = stream;
        }

        @Override
        public void run() {
            ConsumerIterator<byte[], byte[]> iterator =  this.stream.iterator();
            //============这个消费者获取的数据在这里
            while(iterator.hasNext()){  
                MessageAndMetadata<byte[], byte[]> message = iterator.next();
                int partition = message.partition();
                String topic = message.topic();
                String messageT = new String(message.message());
                System.out.println("接收到: " + messageT + "来自于topic:[" + topic + "] + 第partition[" + partition + "]"); 
            }
        }
    }
}

以上代码片段有几个关键点需要进行一下说明:

  • “map.put(“my_topic2”, 1);” 这句代码表示将会为名叫“my_topic2”的队列创建数量为1的消费者。在一个进程的连接中,您可以指定创建多个topic的消费者数量。例如:
......
# 为my_topic2的队列创建数量为1的消费者
# 并且为my_topic3的队列创建数量为4的消费者
map.put("my_topic2", 1);
map.put("my_topic3", 4);
......
  • 每一个消费者都需要一个独立的线程进行工作。您可以将工作任务放入已经创建好的线程池(推荐这样做),也可以像以上代码示例中那样创建一个线程并运行任务。
......
# 使用线程池
# 这里的参数就是消费者的总数量
ExecutorService threadPool = Executors.newFixedThreadPool(1);
threadPool.execute(new ConsumerThread(stream));
......
  • 在开发过程中,消费者端无需知道任何一个Broker的位置。但是必须至少知道一个zookeeper服务节点的位置。通过这个位置,消费者端首先会去zookeeper服务上查找指定的topic的分区情况和已有的消费者情况。

4-5-2、分区与消费者负载

Apache Kafka集群中的消费者以线程为单位,如在上一小节代码段所示:我们在一个进程中,为Topic为“my_topic2”的队列创建了一个线程,这个线程就是一个消费者——属于名为“group2”的用户组。这时,Topic中所有分区的消息都会交给这个消费线程进行消费。如下图所示:

这里写图片描述

虽然一个消费者可以同时消费Topic中多个分区(Partition)的消息,但在生产环境下为了获得更优的消费性能并不建议这样做。由于单个消费者线程的处理能力是有限的,一旦出现数据洪峰,消息就会堆积在Broker端无法被处理(如果消费者端使用了线程池,则可能堆积在消费者端,这要看您怎么编写代码)。所以上一个小节那样的消费者编码方式,最多就是用来做做“Hello World”那样的示例,没有更多的使用价值了。

4-5-3、优化 一:

第一种改进方法,就是让一个消费者只消费一个分区(Partition)中的消息,且整个系统中的消费者大于等于Topic中的分区数量。设计方案如下:

这里写图片描述

如上图所示,这个Topic下一共有四个分区(Partition),对应的消费者数量也有四个,但是这四个消费者同属于一个进程,存在于同一个物理节点上。我们根据这个设计方案,更改之前消费者端的代码,如下(为了节约篇幅,只给出主要的更改位置):

......
// 后续创建的所有消费者线程,都是属于group2的消费组
String groupname = "group2";
props.put("group.id", groupname);

......
// 在这个进程中,为topic名为my_topic2的队列创建了四个消费者
HashMap<String, Integer> map = new HashMap<String, Integer>();
map.put("my_topic2", 4);
Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams = consumerConnector.createMessageStreams(map);

......
// 为这四个消费者分配四个不同的线程
// 消费者线程1
KafkaStream<byte[], byte[]> stream = topicMessageStreams.get("my_topic2").get(0);
new Thread(new ConsumerThread(stream)).start();

// 消费者线程2
stream = topicMessageStreams.get("my_topic2").get(1);
new Thread(new ConsumerThread(stream)).start();

// 消费者线程3
stream = topicMessageStreams.get("my_topic2").get(2);
new Thread(new ConsumerThread(stream)).start();

// 消费者线程4
stream = topicMessageStreams.get("my_topic2").get(3);
new Thread(new ConsumerThread(stream)).start();

......
// 接着锁住主线程,让其不退出
synchronized (KafkaConsumer_GroupTwo.class) {
    try {
        KafkaConsumer_GroupTwo.class.wait();
    } catch (InterruptedException e) {
        e.printStackTrace(System.out);
    }
}
......

4-5-4、优化 二:

显然“优化方案一”中的做法虽然实现了4消费者分别对应4个分区的负载均衡方案,但是受限于单个物理节点的处理性能,所以这种方案的处理性能还有进一步优化的可能。我们可以在多个节点物理节点上均匀散步这些消费者,对Topic分区中的消息进行一一对应的消费,如下图所示:

这里写图片描述

上图所示的设计思路中,我们使用了2个物理节点完成消息的消费任务,每个服务节点上开启的消费进程上有两个消费者线程。这样Topic中4个分区的消息就会被均匀分布到2个物理节点中,且每一个物理节点处理两个分区中的消息。

注意:可能您在分别启动这些消费进程的时候,由于时间上存在差异,某一台服务节点上的消费进程将暂时被分配多个分区进行消息接收。但没有关系,当这个消费者性能到达瓶颈,分区中的消息出现拥堵的时候,这个分区就会被新的消费者所代替,直到10个消费者线程分别和10个分区建立一一对应关系为止

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐