Kafka Producer Acks

设置ACK

 props.put("acks", "all");

通过上述代码,配置kafka生产者发送消息后,是否等待Broker的回执信息。在集群环境下,该配置是kafka保证数据不丢的重要的参数之一,今天来学习一下,里面隐藏在该参数背后的原理和逻辑。

Kafka 生产者将消息发送至topic的Leader分区。由于Leader分区和Replica分区是异步复制,存在时间差。极端情况下,当消息被写入到Leader分区后,Leader分区奔溃,此时数据就会出现丢失。

因此kafka 生产者还可以指定ack的级别,以确保消息被成功写入之前必须写入最少的副本数。

acks=0

设置为0,生产者只管发送消息,不等待broker返回响应,这种设置安全级别最低。如果broker下线或发生异常,系统并不知道将丢失数据。该设置适用于允许数据丢失的场景,优势是producer吞吐量高,网络开销小

在这里插入图片描述

acks=1

设置为1,生产者发送数据后,等待Leader 分区返回结果确认。Leader分区返回响应,但是不保证消息复制到Replica分区。如果生产者没有收到Leader分区返回的响应结果, 生产者可以通过重试请求,保证数据不丢。如果Leader分区在消息复制之前发生意外,则存在丢失数据的风险

在这里插入图片描述

acks=all

生产者默认值设置为all,当消息被所有Replica副本同步完成时,生产者才认为消息已经写入成功。也可以将acks设置为-1,二者作用是相等的。

在这里插入图片描述

分区的Leader副本检查是否有足够的同步副本用于安全写入消息(由参数min.insync.replicates控制)。请求将存储在缓冲区中,直到Leader副本观察到跟随者副本复制了消息,此时成功的确认被发送回客户端。

可以在主题和broker级别配置min.insync.replicas。当数据写入所有同步副本-min.sync.replicas时,数据被视为已提交。值2表示至少有2个ISR副本(包括Leader副本)必须同时拥有数据。

如果希望确保已提交的数据写入多个副本,则需要将同步副本的最小数量设置为更高的值。例如主题有三个副本,并且将min.insync.replicas设置为2,则只有在三个副本中至少有两个副本同步时,才能写入主题中的分区。当所有三个副本都同步时,一切正常进行。如果其中一个副本不可用,也是如此。但是,如果三个副本中有两个不可用,代理将不再接受生产请求。相反,尝试发送数据的生产者将收到NotEnoughReplicasException。

在这里插入图片描述

高可用性

Kafka topic的副本设置为3,最多可以忍受2个broker失效,而保证数据不丢失。所以,通常来讲,N个副本,可以在N-1个副本丢失的情况下,仍然保证数据整体可用。

对于3个副本的topic,从读写两个方面分析下数据的高可用性

  • 读数据 - 只要有一个分区启动并被视为ISR,该主题将可供读取

  • 写数据

  • acks=0 & acks=1 - 只要有一个分区启动并被视为ISR,该主题将可以继续写入

  • acks=all

    min.insync.replicas=1 默认情况: topic必须有一个分区作为ISR,因此可以容忍2个broker关闭

    min.insync.replicas=2 : 主题必须至少有2个ISR启动,因此我们最多可以容忍一个代理关闭(在复制因子为3的情况下),并且我们保证每次写入时,数据至少会被写入两次

    min.insync.replicas=3: 这种场景要求最严格,不允许任何broker宕机

min.insync.replicas 参数在集群环境中是如此的重要,接下来学习下如何设置该参数。

Topic级别配置

min.insync.replicas默认配置为1,可以通过CLI工具更改topic的配置进行覆盖。接下来学习下如何使用工具进行个性化的配置

创建主题

使用工具创建一个3个分区,一个副本的topic

./bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic configured-topic --partitions 3 --replication-factor 1

查看主题

查看主题是否创建成功

./bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic configured-topic
Topic: configured-topic	TopicId: CDU7SBxBQ1mzJGnuH68-cQ	PartitionCount: 3	ReplicationFactor: 1	Configs:
	Topic: configured-topic	Partition: 0	Leader: 2	Replicas: 2	Isr: 2
	Topic: configured-topic	Partition: 1	Leader: 3	Replicas: 3	Isr: 3
	Topic: configured-topic	Partition: 2	Leader: 1	Replicas: 1	Isr: 1

修改配置

将 min.insync.replicas 参数设置为 2

./bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name configured-topic --add-config min.insync.replicas=2

重新查看主题信息

./bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic configured-topic
Topic: configured-topic	TopicId: CDU7SBxBQ1mzJGnuH68-cQ	PartitionCount: 3	ReplicationFactor: 1	Configs: min.insync.replicas=2
	Topic: configured-topic	Partition: 0	Leader: 2	Replicas: 2	Isr: 2
	Topic: configured-topic	Partition: 1	Leader: 3	Replicas: 3	Isr: 3
	Topic: configured-topic	Partition: 2	Leader: 1	Replicas: 1	Isr: 1

删除配置

可以通过 --delete-config 参数选项 删除配置

./bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name configured-topic --delete-config min.insync.replicas

Broker级别配置

静态配置

min.insync.replicas默认配置为1,可以通过修改配置文件config/server.properties修改默认属性。配置修改需要重启服务才能生效

min.insync.replicas=2

动态配置

kafka-configs CLI工具脚本可以动态更新broker配置,无需重启应用。使用方式如下:

  • 修改配置
$ ./bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type brokers --entity-default --add-config min.insync.replicas=2 
Completed updating default config for brokers in the cluster.
  • 查看配置
$ ./bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type brokers --entity-default
Default configs for brokers in the cluster are:
  min.insync.replicas=2 sensitive=false synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:min.insync.replicas=2}
  • 删除配置
./bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type brokers --entity-default  --delete-config min.insync.replicas
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐