【Kafka从入门到成神系列 八】Kafka 多线程消费者及TCP连接
大家好,我是爱敲代码的小黄,独角兽企业的Java开发工程师,Java领域新星创作者。这是我的系列专栏:Kafka 从入门到成神。如果文章知识点有错误的地方,请指正!和大家一起学习,一起进步。如果感觉博主的文章还不错的话,请三连支持一下博主哦。
- 👏作者简介:大家好,我是爱敲代码的小黄,独角兽企业的Java开发工程师,Java领域新星创作者。
- 📝个人公众号:爱敲代码的小黄(回复 “技术书籍” 可获千本电子书籍)
- 📕系列专栏:Java设计模式、数据结构和算法、Kafka从入门到成神
- 📧如果文章知识点有错误的地方,请指正!和大家一起学习,一起进步👀
- 🔥如果感觉博主的文章还不错的话,请👍三连支持👍一下博主哦
- 🍂博主正在努力完成2022计划中:以梦为马,扬帆起航,2022追梦人
文章目录
一、引言
往期推荐:
二、多线程开发消费者
1. 消费者设计原理
Kafka consumer 是一个单线程的设计方案,从 Kafka Consumer 的入口类 KafkaConsumer 。KafkaConsumer 是一个双线程的操作,既用户线程和心跳线程。
-
用户线程:启动 Consumer 应用程序 main 方法的线程
-
心跳线程:负责定期给对应的 Broker 发送心跳请求,以标识消费者的存活性。
2. 多线程设计方案
我们要明确的是:KafkaConsumer 类不是线程安全的,所有的 I/O 操作都是发生在用户主线程中。多个线程不能共享一个 KafkaConsumer 。
我们制定两套设计方案:
-
消费者程序启动多个线程,每个线程维护专属的 KafkaConsumer 实例,负责完整消息获取,处理消息逻辑。
-
消息者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑。
这里我们解释一下这两个方案分别代表什么意思:
- 第一种:每一个线程创建一个 kafkaConsumer,来执行消息获取和消息处理逻辑
- 第二种:多个线程拉取消息,交于线程池进行消息的处理
两个方案的优缺点:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cAIibrNR-1647873783964)(C:\Users\黄良帅\AppData\Roaming\Typora\typora-user-images\image-20220320000129315.png)]
方案一:
优点:
- 实现简单,符合我们当前使用 consumer 的习惯。
- 多个线程间没有交互,省去很多线程安全保障的开销
- Kafka 每个线程使用专属的 KafkaConsumer 实例来执行消息获取和消息处理逻辑,因此 Kafka 主题中的每个分区都能保证只被一个线程处理。容易实现分区内的消息消费顺序。
缺点:
- 每个线程维护自己的 KafkaConsumer 实例,占用更多的系统资源,如:内存、TCP连接等
- 方案的线程数受限于订阅主题的分区数。一个消费组中,一个分区只能由一个消费者消费。
- 每个线程完整的执行消息获取和消息处理逻辑。一旦消息处理逻辑很重,消息处理过慢,容易出现重平衡
方案二:
优点:
- 将任务切分为消息获取和消息处理两个部分,拥有极高的可伸缩性
缺点:
- 实现难度过大,两组线程管理
- 无法保证分区消息消费的顺序性
- 多线程提交位移,可能导致正确提交位移较困难,可能会出现重复性消费。
3. 实战演示
方案一:
public class FirstMultiConsumerThreadDemo {
// 集群的地址
public static final String brokerList = "cluster1:9092";
// 订阅的主题
public static final String topic = "test.topic";
// 消费组的ID
public static final String groupId = "group.demo";
// 配置文件
public static Properties initConfig(){
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
properties.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
properties.put("key.deserializer", StringDeserializer.class.getName());
properties.put("value.deserializer",StringDeserializer.class.getName());
return properties;
}
public static void main(String[] args) {
Properties props = initConfig();
int consumerThreadNum = 4;
// 启动 4 个 KafkaConsumer 实例
for (int i = 0; i < consumerThreadNum; i++) {
new KafkaConsumerThread(props,topic).start();
}
}
public static class KafkaConsumerThread extends Thread{
private KafkaConsumer<String,String> kafkaConsumer;
public KafkaConsumerThread(Properties props, String topic) {
this.kafkaConsumer = new KafkaConsumer<>(props);
this.kafkaConsumer.subscribe(Arrays.asList(topic));
}
@Override
public void run() {
try {
while (true){
// 获取消息
ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String,String> record : records){
//实现处理逻辑
System.out.println(record.value());
}
}
}catch (Exception e){
e.printStackTrace();
}finally {
kafkaConsumer.close();
}
}
}
}
方案二:
private final KafkaConsumer<String, String> consumer;
private ExecutorService executors;
...
private int workerNum = ...;
// 创建一个线程陈
executors = new ThreadPoolExecutor(
workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1000),
new ThreadPoolExecutor.CallerRunsPolicy());
...
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
// 获取消息后,使用线程池来处理消息
for (final ConsumerRecord record : records) {
executors.submit(new Worker(record));
}
}
..
三、消费者TCP连接
我们之前讲过,生产者是如何管理TCP连接的,同样,我们的消费者也可以管理TCP连接。
1. 何时创建TCP连接
和我们的生产者类型,消费者也需要构建一个 KafkaConsumer
实例,但是消费者在构建 KafkaConsumer
实例的时候,是不会创建TCP连接的。
原因在于:生产者在进行创建的时候,会启动一个 Sender
线程,这个线程负责 Socket 连接的创建。
// 会 new 一个 Sender 的类,这个类继承 Runnable 接口
this.sender = newSender(logContext, kafkaClient, this.metadata);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
// KafkaThread 继承 Thread,启动 Sender 线程
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
我们的 KafkaConsumer TCP 连接是在调用 KafkaConsumer.poll() 方法是被创建的。
从细粒度来说,在 poll 方法的内部有三个时机可以创建 TCP 连接。
- 发起 FindCoordinator 请求时
- 消费者端有个组件叫做协调者(Coordinator),在我们的 Broker 里面,负责消费组的成员管理和各个消费者位移提交管理。
- 消费者首次调用 poll 方法时,向 Kafka 集群发送一个名为 FindCoordinator 的请求,希望 Kafka 集群告诉消费者哪一个 Borker 是他的协调者。请求会发给当前负载量最小的那台 Broker。
- 这个时候会创建一个 TCP 连接
- 连接协调者时
- 当我们的 Broker 处理完 FindCoordinator 请求时,会给我们的消费者返回一个响应结果,告诉我们消费者哪一台 Broker 是真正的协调者。
- 当消费者知道真正的协调者时,创建与该 Broker 的 TCP 连接。这个时候,我们的协调者可以做一些组协调操作,比如: 加入组、等待组分配、心跳请求处理、位移获取、位移提交等
- 消费数据时
- 消费者会为每个要消费的分区创建与该分区领导者的 TCP 连接。如果当前消费者需要消费 5 个分区的数据,这 5个分区的领导者副本分布在 4 个 Broker 上,那么我们需要创建与这 4 台 Broker 的连接
2. 创建多少个 TCP 连接
- 第一类连接:消费者向 Kafka 集群发送元数据请求以获取整个集群的信息。然后发送 FindCoordinator 请求获取协调者所在的 Broker。
- 第二类连接:连接协调者所在的 Broker,消费者进程开启消费者组的各种功能以及后续的消息消费。
- 第三类连接:消费者去向所有分区的领导者副本所在的 Broker 获取数据,创建每个的 TCP 连接。
这里有一个需要注意的点:当我们第三类TCP连接创建成功后,消费者程序将会抛弃第一类TCP连接。之后定期请求元数据,也是使用的第三类。
第一类 TCP 连接仅仅是为了首次获取元数据而创建的,后面就会被废弃掉。最根本的原因是,消费者在启动时还不知道 Kafka 集群的信息,只能使用一个“假”的 ID 去注册,即使消费者获取了真实的 Broker ID,它依旧无法区分这个“假”ID 对应的是哪台 Broker,因此也就无法重用这个 Socket 连接,只能再重新创建一个新的连接
3. 何时关闭 TCP 连接
消费者关闭 TCP 连接分为主动关闭和自动关闭
主动关闭:
- 手动调用 KafkaConsumer.close() 方法,或者执行 kill 命令
自动关闭:
- 消费者端参数 connection.max.idle.ms ,如果在规定的时间该 TCP 连接没有请求的话,则自动关闭。
更多推荐
所有评论(0)