• 👏作者简介:大家好,我是爱敲代码的小黄,独角兽企业的Java开发工程师,Java领域新星创作者。
  • 📝个人公众号:爱敲代码的小黄(回复 “技术书籍” 可获千本电子书籍)
  • 📕系列专栏:Java设计模式、数据结构和算法、Kafka从入门到成神
  • 📧如果文章知识点有错误的地方,请指正!和大家一起学习,一起进步👀
  • 🔥如果感觉博主的文章还不错的话,请👍三连支持👍一下博主哦
  • 🍂博主正在努力完成2022计划中:以梦为马,扬帆起航,2022追梦人

一、引言

往期推荐:

在这里插入图片描述


二、多线程开发消费者

1. 消费者设计原理

Kafka consumer 是一个单线程的设计方案,从 Kafka Consumer 的入口类 KafkaConsumerKafkaConsumer 是一个双线程的操作,既用户线程和心跳线程

  • 用户线程:启动 Consumer 应用程序 main 方法的线程

  • 心跳线程:负责定期给对应的 Broker 发送心跳请求,以标识消费者的存活性。

2. 多线程设计方案

我们要明确的是:KafkaConsumer 类不是线程安全的,所有的 I/O 操作都是发生在用户主线程中。多个线程不能共享一个 KafkaConsumer

我们制定两套设计方案:

  • 消费者程序启动多个线程,每个线程维护专属的 KafkaConsumer 实例,负责完整消息获取,处理消息逻辑。

    image-20220319224940677

  • 消息者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑。

    image-20220319225032457

这里我们解释一下这两个方案分别代表什么意思:

  • 第一种:每一个线程创建一个 kafkaConsumer,来执行消息获取和消息处理逻辑
  • 第二种:多个线程拉取消息,交于线程池进行消息的处理

两个方案的优缺点:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cAIibrNR-1647873783964)(C:\Users\黄良帅\AppData\Roaming\Typora\typora-user-images\image-20220320000129315.png)]

方案一:

优点:

  • 实现简单,符合我们当前使用 consumer 的习惯。
  • 多个线程间没有交互,省去很多线程安全保障的开销
  • Kafka 每个线程使用专属的 KafkaConsumer 实例来执行消息获取和消息处理逻辑,因此 Kafka 主题中的每个分区都能保证只被一个线程处理。容易实现分区内的消息消费顺序。

缺点:

  • 每个线程维护自己的 KafkaConsumer 实例,占用更多的系统资源,如:内存、TCP连接等
  • 方案的线程数受限于订阅主题的分区数。一个消费组中,一个分区只能由一个消费者消费。
  • 每个线程完整的执行消息获取和消息处理逻辑。一旦消息处理逻辑很重,消息处理过慢,容易出现重平衡

方案二:

优点:

  • 将任务切分为消息获取和消息处理两个部分,拥有极高的可伸缩性

缺点:

  • 实现难度过大,两组线程管理
  • 无法保证分区消息消费的顺序性
  • 多线程提交位移,可能导致正确提交位移较困难,可能会出现重复性消费。

3. 实战演示

方案一:

public class FirstMultiConsumerThreadDemo {
	// 集群的地址
    public static final String brokerList = "cluster1:9092";
    // 订阅的主题
    public static final String topic = "test.topic";
    // 消费组的ID
    public static final String groupId = "group.demo";
	
    // 配置文件
    public static Properties initConfig(){
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
        properties.put("key.deserializer", StringDeserializer.class.getName());
        properties.put("value.deserializer",StringDeserializer.class.getName());
        return properties;
    }

    public static void main(String[] args) {
        Properties props = initConfig();
        int consumerThreadNum = 4;
        // 启动 4 个 KafkaConsumer 实例
        for (int i = 0; i < consumerThreadNum; i++) {
            new KafkaConsumerThread(props,topic).start();
        }
    }

    public static class KafkaConsumerThread extends Thread{
        private KafkaConsumer<String,String> kafkaConsumer;


        public KafkaConsumerThread(Properties props, String topic) {
            this.kafkaConsumer = new KafkaConsumer<>(props);
            this.kafkaConsumer.subscribe(Arrays.asList(topic));
        }

        @Override
        public void run() {
            try {
                while (true){
                    // 获取消息
                    ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String,String> record : records){
                        //实现处理逻辑
                        System.out.println(record.value());
                    }
                }
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                kafkaConsumer.close();
            }
        }
    }
}

方案二:

private final KafkaConsumer<String, String> consumer;
private ExecutorService executors;
...
 
 
private int workerNum = ...;
// 创建一个线程陈
executors = new ThreadPoolExecutor(
	workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
	new ArrayBlockingQueue<>(1000), 
	new ThreadPoolExecutor.CallerRunsPolicy());
 
 
...
while (true)  {
	ConsumerRecords<String, String> records = 
		consumer.poll(Duration.ofSeconds(1));
    // 获取消息后,使用线程池来处理消息
	for (final ConsumerRecord record : records) {
		executors.submit(new Worker(record));
	}
}
..

三、消费者TCP连接

我们之前讲过,生产者是如何管理TCP连接的,同样,我们的消费者也可以管理TCP连接。

1. 何时创建TCP连接

和我们的生产者类型,消费者也需要构建一个 KafkaConsumer 实例,但是消费者在构建 KafkaConsumer 实例的时候,是不会创建TCP连接的。

原因在于:生产者在进行创建的时候,会启动一个 Sender 线程,这个线程负责 Socket 连接的创建。

// 会 new 一个 Sender 的类,这个类继承 Runnable 接口
this.sender = newSender(logContext, kafkaClient, this.metadata);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
// KafkaThread 继承 Thread,启动 Sender 线程
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();

我们的 KafkaConsumer TCP 连接是在调用 KafkaConsumer.poll() 方法是被创建的

从细粒度来说,在 poll 方法的内部有三个时机可以创建 TCP 连接。

  1. 发起 FindCoordinator 请求时
  • 消费者端有个组件叫做协调者(Coordinator),在我们的 Broker 里面,负责消费组的成员管理和各个消费者位移提交管理。
    • 消费者首次调用 poll 方法时,向 Kafka 集群发送一个名为 FindCoordinator 的请求,希望 Kafka 集群告诉消费者哪一个 Borker 是他的协调者。请求会发给当前负载量最小的那台 Broker
    • 这个时候会创建一个 TCP 连接
  1. 连接协调者时
  • 当我们的 Broker 处理完 FindCoordinator 请求时,会给我们的消费者返回一个响应结果,告诉我们消费者哪一台 Broker 是真正的协调者。
    • 当消费者知道真正的协调者时,创建与该 Broker 的 TCP 连接。这个时候,我们的协调者可以做一些组协调操作,比如: 加入组、等待组分配、心跳请求处理、位移获取、位移提交等
  1. 消费数据时
    • 消费者会为每个要消费的分区创建与该分区领导者的 TCP 连接。如果当前消费者需要消费 5 个分区的数据,这 5个分区的领导者副本分布在 4 个 Broker 上,那么我们需要创建与这 4 台 Broker 的连接

2. 创建多少个 TCP 连接

  • 第一类连接:消费者向 Kafka 集群发送元数据请求以获取整个集群的信息。然后发送 FindCoordinator 请求获取协调者所在的 Broker。
  • 第二类连接:连接协调者所在的 Broker,消费者进程开启消费者组的各种功能以及后续的消息消费。
  • 第三类连接:消费者去向所有分区的领导者副本所在的 Broker 获取数据,创建每个的 TCP 连接。

这里有一个需要注意的点:当我们第三类TCP连接创建成功后,消费者程序将会抛弃第一类TCP连接。之后定期请求元数据,也是使用的第三类。

第一类 TCP 连接仅仅是为了首次获取元数据而创建的,后面就会被废弃掉。最根本的原因是,消费者在启动时还不知道 Kafka 集群的信息,只能使用一个“假”的 ID 去注册,即使消费者获取了真实的 Broker ID,它依旧无法区分这个“假”ID 对应的是哪台 Broker,因此也就无法重用这个 Socket 连接,只能再重新创建一个新的连接

3. 何时关闭 TCP 连接

消费者关闭 TCP 连接分为主动关闭和自动关闭

主动关闭:

  • 手动调用 KafkaConsumer.close() 方法,或者执行 kill 命令

自动关闭:

  • 消费者端参数 connection.max.idle.ms ,如果在规定的时间该 TCP 连接没有请求的话,则自动关闭。
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐