持续总结中!2024年面试必问 20 道 Kafka面试题(三)
1、Kafka 的 ISR (In-Sync Replica) 是什么?2、如何获取 Kafka 中 topic 主题的列表?
上一篇地址:持续总结中!2024年面试必问 20 道 Kafka面试题(二)-CSDN博客
五、Kafka 的 ISR (In-Sync Replica) 是什么?
Kafka的ISR(In-Sync Replicas)是Kafka中用于维护数据一致性和高可用性的关键概念之一。ISR列表包含了一个领导者副本(Leader)和与之保持完全同步的追随者副本(Followers)。以下是ISR的详细说明:
-
领导者副本(Leader):
- 在Kafka中,每个分区都有一个领导者副本,负责处理所有的读写请求。生产者发送的消息首先会被写入领导者副本,然后由领导者副本负责将消息复制到追随者副本。
-
追随者副本(Follower):
- 追随者副本的作用是从领导者副本那里同步数据。追随者副本会尝试不断从领导者那里拉取新的消息,并将其写入自己的日志中。
-
同步状态:
- 当追随者副本成功从领导者副本同步消息后,它会更新自己的日志,并发送一个确认给领导者。只有当追随者副本的消息落后于领导者副本的消息不超过特定的配置值(
replica.lag.time.max.ms
)时,它才会被认为是同步的。
- 当追随者副本成功从领导者副本同步消息后,它会更新自己的日志,并发送一个确认给领导者。只有当追随者副本的消息落后于领导者副本的消息不超过特定的配置值(
-
ISR列表:
- ISR列表是由领导者维护的,包含了当前与领导者保持完全同步的所有追随者副本。如果追随者副本落后于领导者副本的消息太多,它将从ISR列表中被移除。
-
数据一致性:
- Kafka确保在任何时候,只有ISR列表中的副本才会被用于处理读写请求。这意味着,即使有追随者副本因为网络问题或故障而落后,它们也不会影响数据的一致性。
-
故障转移:
- 如果领导者副本发生故障,Kafka会从ISR列表中选举一个新的领导者。由于ISR列表中的追随者副本都是与原领导者完全同步的,因此可以保证数据不会丢失。
-
性能考虑:
- 通过维护ISR列表,Kafka可以确保只有同步的副本参与数据复制,这有助于提高集群的性能。如果允许所有追随者副本都参与复制,那么可能会因为网络延迟或追随者副本的性能问题而影响整体性能。
-
动态调整:
- ISR列表是动态调整的。如果追随者副本能够赶上领导者副本的进度,它可以重新加入ISR列表。同样,如果追随者副本落后太多,它将被从ISR列表中移除。
-
监控和配置:
- Kafka提供了监控工具来查看每个分区的ISR列表状态。管理员可以根据需要调整相关配置,如
min.insync.replicas
,以确保数据的一致性和可用性。
- Kafka提供了监控工具来查看每个分区的ISR列表状态。管理员可以根据需要调整相关配置,如
通过ISR机制,Kafka能够在保证数据一致性的同时,提供高可用性和良好的性能。这种机制使得Kafka成为一个非常强大的分布式消息队列系统。
六、如何获取 Kafka 中 topic 主题的列表?
在Kafka中,获取topic主题列表可以通过几种不同的方法实现,包括使用Kafka命令行工具、Kafka AdminClient API或通过编写自定义的客户端应用程序。以下是一些常用的方法:
1. Kafka命令行工具(kafka-topics.sh脚本)
Kafka提供了一个命令行工具kafka-topics.sh
,它可以用来管理topic,包括列出所有的topic。
基本命令格式:
bin/kafka-topics.sh --list --zookeeper <zookeeper_host>:<zookeeper_port>
--zookeeper
:指定ZooKeeper的连接字符串。如果你的Kafka版本是2.8.0或更高,使用--bootstrap-server
来指定Kafka集群的bootstrap服务器。<zookeeper_host>
:ZooKeeper服务的主机名或IP地址。<zookeeper_port>
:ZooKeeper服务的端口号。
示例:
bin/kafka-topics.sh --list --zookeeper localhost:2181
2. Kafka AdminClient API
Kafka的AdminClient API提供了编程方式来管理topic,包括获取topic列表。
Java示例代码:
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import java.util.Collections;
import java.util.Properties;
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Kafka集群地址
AdminClient admin = AdminClient.create(props);
ListTopicsOptions options = new ListTopicsOptions();
// 如果需要,可以设置options来过滤topics
admin.listTopics(options).names().whenComplete((names, exception) -> {
if (exception == null) {
names.forEach(System.out::println);
} else {
exception.printStackTrace();
}
});
admin.close();
3. Kafka Consumer API
通过创建一个Kafka消费者,可以订阅一个空的主题列表,然后使用listTopics()
方法获取所有主题的列表。
Java示例代码:
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import java.util.Collections;
import java.util.Set;
import java.util.Properties;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
try {
// 订阅空的主题列表
consumer.subscribe(Collections.emptyList());
// 获取主题列表
Set<String> topics = consumer.listTopics();
topics.forEach(System.out::println);
} catch (KafkaException e) {
e.printStackTrace();
} finally {
if (consumer != null) {
consumer.close();
}
}
注意事项
- 确保你有权限访问Kafka集群。
- 如果Kafka集群启用了安全特性,如SSL/TLS或SASL,需要在配置中添加相应的安全设置。
- 使用AdminClient API时,记得在操作完成后关闭AdminClient以释放资源。
通过以上任一方法,你都可以获取Kafka中topic主题的列表。
更多推荐
所有评论(0)