本文主要学记录了kafka3.x的一些知识。



一、概述

Kafka是分布式的基于发布/订阅模式的消息队列,主要应用于大数据实时处理领域。

消息队列的两种模式:

  • 点对点模式:

一个生产者对应一个消费者,消费者主动拉取消息,确认收到消息之后,删除对应的消息。

  • 发布订阅模式:

一个生产者可以对应多个消费者,多个消费者之间相互独立,消息按照主题(topic)分类,消费者主动拉取消息后不删除消息,其他消费者依旧可以再次消费这个消息。

二、Kafka基础架构

在这里插入图片描述

架构特点:

  • 为方便扩展,提高吞吐量,一个topic分为多个partition(分区)。
  • 为配合多个分区的设计,引入了消费者组(consumer group)的概念。
  • 为提高可用性,每个topic分区增引入了副本的机制,并且副本之间存在leater和follower区分。
  • Kafka2.8之前必须依赖zk,zk中记录着Kafka集群中所有的节点的信息,以及每个topic分区中的leater和follower。

三、kafka-3.1.0集群安装

下载kafka-3.1.0压缩包,解压后即可完成安装,修改以下配置文件,修改后,分发到node-01,node-02,node-03三台机器上,即可完成kafka集群的安装:

// kafka每个broker的身份标识,必须唯一
broker.id=2
// kafka数据存放地址
log.dirs=/orkasgb/data/kafka
// 指定zookeeper的地址
zookeeper.connect=node-01:2181,node-02:2181,node-03:2181

// 安装好集群之后,分别执行以下命令启动kafka
$KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

四、Kafka的基本命令

  • kafka-topics.sh:
基本参数说明
–bootstrap-server <String: server to connect to>指定kafka集群地址,可以有多个,必选,格式为:node-01:9092,node-02:9092
–topic <String: topic>指定链接的topic(topic)
–create创建主题
–delete删除主题
–alter修改主题,一般只能修改分区数
–list列出所有的主题
–describe描述当前主题的详细信息
–partitions <Integer: # of partitions>创建主题时指定分区数,必选
–replication-factor <Integer: replication factor>创建主题时指定副本数,必选
–config <String: name=value>指定配置文件位置
# 创建一个topic-orkasgb-test主题,分区数为2,副本数为3
[root@node-01 ~]# kafka-topics.sh --bootstrap-server node-01:9092,node-02:9092 --topic topic-orkasgb-test --create --partitions 2 --replication-factor 3
Created topic topic-orkasgb-test.
[root@node-01 ~]#

# 查看当前kafka集群中的topic
[root@node-01 ~]# kafka-topics.sh --bootstrap-server node-01:9092,node-02:9092 --list
__consumer_offsets
topic-orkasgb-test
[root@node-01 ~]# 

# 查看topic-orkasgb-test主题的详细信息
[root@node-01 ~]# kafka-topics.sh --bootstrap-server node-01:9092,node-02:9092 --topic topic-orkasgb-test --describe
Topic: topic-orkasgb-test       TopicId: k6OqKVFcQgCFrahkJOyWbQ PartitionCount: 2       ReplicationFactor: 3    Configs: segment.bytes=1073741824
        Topic: topic-orkasgb-test       Partition: 0    Leader: 2       Replicas: 2,1,3 Isr: 2,1,3
        Topic: topic-orkasgb-test       Partition: 1    Leader: 3       Replicas: 3,2,1 Isr: 3,2,1
[root@node-01 ~]# 
  • kafka-console-producer.sh :
基本参数说明
–bootstrap-server <String: server to connect to>指定kafka集群地址,可以有多个,必选,格式为:node-01:9092,node-02:9092
–topic <String: topic>指定链接的topic(topic)
–producer.config <String: config file>指定配置文件位置
# 创建一个消费者,将数据发送到topic-orkasgb-test主题中
[root@node-03 bin]# kafka-console-producer.sh --bootstrap-server node-01:9092,node-02:9092 --topic topic-orkasgb-test --producer.config /orkasgb/software/kafka-3.1.0/config/producer.properties
>hello kafka
>
  • kafka-console-consumer.sh :
基本参数说明
–bootstrap-server <String: server to connect to>指定kafka集群地址,可以有多个,必选,格式为:node-01:9092,node-02:9092
–topic <String: topic>指定链接的topic(topic)
–consumer.config <String: config file>指定配置文件位置
# 创建一个消费者,用于消费发往topic-orkasgb-test主题中的数据
[root@node-01 ~]# kafka-console-consumer.sh --bootstrap-server node-01:9092,node-02:9092 --topic topic-orkasgb-test --consumer.config /orkasgb/software/kafka-3.1.0/config/consumer.properties
hello kafka

  • kafka-consumer-groups.sh:
# 列出整个集群中的消费组
[root@node-02 bin]# kafka-consumer-groups.sh --bootstrap-server node-01:9092,node-02:9092 --list
test-consumer-group
[root@node-02 bin]# 
# 查看test-consumer-group属组中的主题消费情况
[root@node-02 bin]# kafka-consumer-groups.sh --bootstrap-server node-01:9092,node-02:9092 --group test-consumer-group  --describe

GROUP               TOPIC              PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST             CLIENT-ID
test-consumer-group topic-orkasgb-test 0          0               0               0               console-consumer-b234bb08-4374-498d-9cfe-192de9f9cdd3 /192.168.137.223 console-consumer
test-consumer-group topic-orkasgb-test 1          1               1               0               console-consumer-b234bb08-4374-498d-9cfe-192de9f9cdd3 /192.168.137.223 console-consumer
[root@node-02 bin]# 

五、Kafka生产者发送流程

在这里插入图片描述

  • batch.size:用来控制数据量大小,当数据量大小达到该值时,才会唤醒Sender线程去发送数据,默认16k。
  • linger.ms:数据发送延迟设置,当数据还未达到batch.size,但是已经到达延迟时间(linger.ms),也会唤醒Sender线程去发送数据,默认为0,没有延迟。
  • ack应答:
    • 0:生产者发送数据后,立即给客户端返回应答响应,不管leader是否已经落盘处理。
    • 1:生产者发送数据后,需要等待leader落盘处理后并ack应答成功后才给客户端返回应答响应。
    • -1(all):生产者发送数据后,需要等待leader和follower都落盘处理后并ack应答成功后才给客户端返回应答响应。

异步发送消息代码:

	@Test
	public void producter() throws ExecutionException, InterruptedException {
		// 创建生产者配置文件
		HashMap<String, Object> config = new HashMap<>();
		// 集群地址
		config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node-01:9092,node-02:9092,node-03:9092");
		// KEY的序列化器
		config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		// VALUE的序列化器
		config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

		// 创建Producter对象
		DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(config);
		Producer<String, String> producer = producerFactory.createProducer();
		for (int i = 0; i < 5; i++) {
            // 同步发送数据
//			RecordMetadata recordMetadata = producer.send(new ProducerRecord<>("topic-orkasgb-test", "test" + i)).get();
//			System.out.println(recordMetadata.toString());
            			// 同步发送数据
			// RecordMetadata recordMetadata = producer.send(new ProducerRecord<>("topic-orkasgb-test", "test" + i)).get();
			// System.out.println(recordMetadata.toString());

			// 异步发送数据
			Future<RecordMetadata> send = producer.send(new ProducerRecord<>("topic-orkasgb-test", "哈哈哈哈" + i));
            producer.flush();
			System.out.println(send.toString());

			// 带回调函数的异步发送数据
//			Future<RecordMetadata> send = producer.send(new ProducerRecord<>("topic-orkasgb-test", "哈哈哈哈" + i), new Callback() {
//				@Override
//				public void onCompletion(RecordMetadata metadata, Exception exception) {
//
//				}
//			});
//			System.out.println(send.toString());
		}	
		producer.close();
	}

六、生产者分区策略

kafka生产者存在默认的分区策略,如果记录中指定了分区,则使用指定的分区策略,如果未指定分区但是在发送消息时指定了key,则根据key的hash选择分区,如果未指定分区或key,则选择在批处理已满时才更改的粘性分区策略。

  • 指定分区号:当指定分区的时候,直接将消息发送到指定的分区中。例如,指定分区为0,那么直接会将消息发送到0号分区。
  • 未指定分区号但是指定了key:当未指定分区号但是指定了key的时候,会根据key的hash值与topic的partition数进行取余计算出分区号。例如,如果key1的hash值为4,key2的hash值为5,topic的partition数为2,那么key1对应的消息直接发送到0号分区,key2对应的消息直接发送到1号分区.
  • 未指定分区号也未指定key:当未指定分区号也未指定key的时候,kafka内部自行采用黏性分区策略,随机选择一个分区号,并且尽可能将本批次的所有消息都发送到该分区,直到该分区的bach.size已满或者innger.ms时间到,才会随机再次选择另外一个分区号(必须和上次选择的分区号不一致)。例如,如果上一次选择的分区号为1,那么直到该分区的bach.size已满或者innger.ms时间到,那么会随机再次选择另外一个分区号,如果选择的还是0,那么会一直选择,直到和上一次不一致为止。

自定义分区器:

// 在生产者配置文件中指定自定义分区器
config.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, UserPartition.class);

/**
 * 自定义分区器,继承Partitioner类,重写partition方法即可。
 */
public class UserPartition implements Partitioner {
	@Override
	public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
		// 如果消息中包含了orkasgb字符串,就将这条消息发送到0好分区,否则就发送到1号分区
		return StringUtils.contains(value.toString(), "orkasgb") ? 0 : 1;
	}

	@Override
	public void close() {

	}

	@Override
	public void configure(Map<String, ?> configs) {

	}
}

七、幂等性原理

幂等性是指,不管producter向broker发送多少条重复数据,broker只会持久化一条。幂等性判断数据重复的标准为以<PID,Partiton,SeqNumber>作为key相同的数据即可认为是重复数据。其中PID是kafka集群没启动一次,都会重新分配一个PID,Partiton指的是分区号,SeqNumber是一个单调递增的序列。按照此标准,幂等性可以保证单分区内数据不重复,幂等性(enable.idempotence)默认开启。
在这里插入图片描述

八、kafka事务原理

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-gsupDXmY-1649484313454)(/home/huanzi/Desktop/kafka_事务原理.png)]

  • 开启事务,必须开启幂等性,事务的底层就是幂等性。
  • _transaction_state-分区-Leader为存储事务信息的特殊主题,默认有50个分区。
  • kafka集群内部选举主事务协调器。选举方式为:根据事务ID(transational.id)的hash值%50,计算出该事物属于哪个分区。该分区leader所在的broker节点即为该事务ID(transational.id)对应的事务协调器。
  • producter开启事务的时候,需要指定一个唯一的事务ID,后续集群被重启,那么也会根据这个事务ID继续处理未完成的事务。
  • producter会向kafka集群请求一个producter id,该id主要用于幂等性。
  • producter向集群发送消息到topicA-partition0中,并发送commit请求。
  • 事务协调器接受到commit请求之后,将commit请求持久化到事务主题中,并且还会发送一个commit请求到topicA-partition0中,判断消息时候处理成功。
  • topicA-partition0返回处理成功的消息之后,事务协调器会将该消息处理成功的信息持久化到_transnnaction_state-分区-Leader中。

九、数据有序/乱序

  • kafka数据有序,保证单分区内部有序。
    1. 在kafka1.x之前,要保证数据单分区内部有序,只需要设置max.in.flight.requests.per.connection=1即可。
    2. 在kafka1.x之后,要保证数据单分区内部有序,如果没有开启幂等性,只需要设置max.in.flight.requests.per.connection=1即可。如果开启了幂等性,需要设置max.in.flight.requests.per.connection<=5,(默认为5)即可。因为在开启幂等性的之后,kafka集群会缓存5(max.in.flight.requests.per.connection)个请求,正常情况下,请求是按照顺序发送到kafka集群,但是如果顺序错乱了,也就是说的数据乱序,那么kafak集群会等待,直到5个请求全部发送过来,kafka集群内部会对这5个请求重新排序。排序时会利用幂等性中的SeqNumber单调递增的特性,所以,kafka能保证数据单分区内部有序。

十、kafka broker工作原理

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-K1GztRoa-1649484313455)(/home/huanzi/Desktop/kafka_broker工作原理.png)]

  1. broker启动之后,会去zookeeper注册信息,在zookeeper中生成一个/kafka/brokers/ids信息,里面存储的是所有的broker信息。
  2. 每一个主题的副本都对应一个controllor,所以必须选出一个controllor老大,最后由这个controllor老大监听各个broker节点的变化,在后续会辅助broker选举新的主题副本leader。选举controllor老大的方式就是去抢先注册zookeeper中的/kafka/controller节点,谁先注册到,谁就是controllor老大。
  3. controllor老大会将集群中的broker信息上传到zookeeper中,生成/kafka/brokers/topics/[对应的主题]/partitions/[对应的分区号]/state,里面存储的就是该主题该分区下副本的信息,包括leader以及存活的follower信息(isr)。
  4. 假如节点中的某一个主题的分区中的leader挂了,此时controllor老大会监听到这个变化,那么它会更新/kafka/brokers/topics/[对应的主题]/partitions/[对应的分区号]/state信息,并且开始重新选举leader。选举规则为,以ar中节点顺序为基准,在isr中存在的节点为新的leader。例如,leader为1,ar[1,3,2],is[1,3,2],如果leader为1挂掉了,那么新的leader将会为3。

十一、节点的服役/退役

  • 服役一台新的节点:

    1. 创建一台新的kafka节点或者从之前的机器上克隆一个kafka节点(注意,如果是克隆的机器,那么需要删除kafka的data和log目录,并修改broker.id),并启动。
    2. 创建一个xxxx.json文件,对原来的主题进行负载均衡。
    {
        "topics" : [
            {"topic" : "xxxx"}
        ],
        "version" : 1
    }
    
    1. 按照xxxx.json生成一个负载均衡的计划。
    kafka-reassign-partitions.sh --bootstrap-server node-01:9092 --topics-to-move-json-file xxxx.json --broker-list "0,1,2,3" --generate
    
    1. 将生成的负载均衡的计划保存成xx.json文件。
    2. 按照xx.json文件执行负载均衡的计划。
    kafka-reassign-partitions.sh --bootstrap-server node-01:9092 --reassignment-json-file xx.json --execute
    
    1. 验证负载均衡的计划是否执行完成。
    kafka-reassign-partitions.sh --bootstrap-server node-01:9092 --reassignment-json-file xx.json --verify
    
  • 退役一台旧节点:

思想,将旧节点上的数据转移到其他的节点上后删除该节点即可,方法类似于服役新节点的方式,在生成负载均衡计划的时候,将–broker-list "0,1,2,3"中要退役的的broker.id取消掉,其他步骤按照顺序依次执行即可将数据到其他的节点上。

十二、kafka故障处理

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-efk657Dm-1649484313456)(/home/huanzi/snap/typora/57/.config/Typora/typora-user-images/image-20220408231918549.png)]

Follower故障处理

  • LEO(Log End Offset):每一个副本的最后一个offset,也就是最新的offset + 1。
  • HW:所有副本中最小的LEO。

Follower故障处理步骤:

  1. 将故障的Follower节点从isr中剔除。
  2. 期间,leader和其他follower继续接收数据。
  3. 待该follower恢复后,开始恢复数据,首先从本地磁盘中加载数据,读取自己上次的HW,高于上次的HW的数据将会被删除,并且向leader请求同步数据,等到数据恢复达到新的HW时,就可以加入ISR中了。

Leader故障处理

  • LEO(Log End Offset):每一个副本的最后一个offset,也就是最新的offset + 1。
  • HW:所有副本中最小的LEO。

Leader故障处理步骤:

  1. 将故障的Leader节点从isr中剔除。
  2. 从其他follower中选举新的leader。
  3. 其他的follower必须和新的leader保持数据一致性,高于该leader的数据部分必须删除。所以,这样会存在数据丢失的问题,因为kafka保证的时数据一致性,并不能保证数据不丢失或者不重复。

十三、kafka文件存储机制

kafka的数据存储

kafka的数据存储是按照分区进行存储的,每个分区目录的命名格式为topic+分区号,每个分区目录下有三个类型的文件,.log,.index,.timeindex。因为kafka生产端为数据是不断的追加到.log文件中,所以.index用来记录数据的索引,.timeindex文件主要用来删除过期数据。kafka默认数据过期时间为7天。

.index为稀疏索引,文件中存储的是相对offset,.log文件中每写入大约4kb(log.index.interval.bytes)的数据,就会往.index文件中写入一条索引,这样能确保文件占用空间不会太大。

# 查看kafka的.log文件
[root@node-02 topic-orkasgb-test-1]# kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.log
Dumping ./00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1649067089350 size: 79 magic: 2 compresscodec: none crc: 384684249 isvalid: true
[root@node-02 topic-orkasgb-test-1]#

手动调整分区副本存储策略

  1. 创建分区副本存储策略计划xxxx.json。
{
  "partitions" : [
  	{"topic" : "xxx","partition": 0,"replicas": [1,3]},
    {"topic" : "xxx","partition": 1,"replicas": [2,3]},
    {"topic" : "xxx","partition": 2,"replicas": [1,2]}
  ],
  "version" : 1
}
  1. 按照xx.json文件执行负载均衡的计划。
kafka-reassign-partitions.sh --bootstrap-server node-01:9092 --reassignment-json-file xxxx.json --execute
  1. 验证负载均衡的计划是否执行完成。
kafka-reassign-partitions.sh --bootstrap-server node-01:9092 --reassignment-json-file xxxx.json --verify

日志清除策略

kafka中的文件清除,由以下参数控制:

  • log.retention.hours:默认7天,优先级最低
  • log.retention.minutes:分钟,默认为null,优先级2
  • log.retention.ms:毫秒,默认为null,优先级1
  • log.retention.check.interval.ms:检测数据文件过期时间的周期,默认每5分钟检测一次。
  • log.cleanup.policy:日志清楚策略设置,默认delete

日志清楚策略:

  1. delete删除日志:删除过期的数据,log.cleanup.policy = delete。

    • 基于时间:默认打开,以segment中所有记录中最大时间戳作为该文件的时间戳。
    • 基于大小:默认关闭,超过设置的所有的日志总大小,删除最早的segment。log.retention.bytes,默认为-1,表示无穷大。

    假如一个segment中的日志有一部分数据过期了,一部分没有过期(记录中最大时间戳的那部分),那么这个segment是不会被删除掉的。

  2. compact:日志压缩,log.cleanup.policy = compact。

    对于相同的key,只保留最后一个版本。压缩后会存在offset不是连续的,如果要找的某个offset不存在,那么就会返回一个最近的比它大的offset,然从这个offset开始消费。

十四、kafka消费者

消费者组的分区分配流程:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-iUHlBbUF-1649484313457)(/home/huanzi/Desktop/kafka_消费者分区分配原理.png)]

  1. kafka集群内部进行coordinator老大的选举,用于辅助consumer组的初始化和分区的分配。选举方式为group.id的hash值%50,计算出来的值对应的broker上的coordinator为coordinator老大。
  2. consumer组中的所有的consumer都向coordinator老大发送加入当前group组的请求,coordinator老大收到请求后将这些consumer编为一组,并从这些consumer中选举出来一个leader consumer,然leader consumer制定消费方案。
  3. leader consumer制定好消费方案后发送给coordinator老大,coordinator老大将任务分配给对应的consumer,然后各个consumer到分配给自己的分区上拉取数据。
  4. 每个消费者都会和coordinator老大保持心跳(默认3秒),一旦出现超时(session.timeout.ms=45s),那么coordinator老大就会将这个consumer移除,之后触发负载均衡,将其他分区的任务均衡到剩余的consumer。在拉取数据之后,如果消费者处理时间过长(max.poll.interval.ms=5分钟),也会触发负载均衡,将其他分区的任务均衡到剩余的consumer。

消费者组的数据拉取流程:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-OCqDFW4g-1649484313458)(/home/huanzi/snap/typora/57/.config/Typora/typora-user-images/image-20220407201426298.png)]

  1. 消费者组创建一个ConsumerNetworkClient网络链接,用于和kafka集群进行交互,consumer发送消费请求,并初始化一些参数:fatch.min.bytes,每批次抓取数据最小值,默认1k。fatch.max.wait.ms,一批次数据未被抓取的超时时间,默认500ms。fatch.max.bytes,每批次抓取数据大小,默认50m。max.poll.records,一次拉取数据的最大条数,默认500条。
  2. ConsumerNetworkClient将数据拉取过来之后,缓存到对应的队列中。
  3. consumer开始从缓存队列中拉取数据,默认一次拉取500条数据。
  4. 拉取数据之后开始进行反序列化,因为在生产者端,数据被序列化了。
  5. 数据还需要进过拦截器,之后才是真正的处理处理。

消费者分配分区策略:

kafka提供了四种消费者分配分区策略(partition.assignment.strategy),分别是:

  • org.apache.kafka.clients.consumer.RangeAssignor:按主题分配分区,所以RangeAssignor是针对每一个topic而言的,首先对分区按照序号进行排序,如下,consumer1将会消费topicA-p1,topicA-p2,consumer2将会消费topicA-p3。通过partition数/consumer数来决定每一个consumer应该消费多少个分区,除不尽的,排在前边的消费者将会多消费几个分区。这种方式很明显,如果有多个topic的情况下,所有topic多余出来的分区都会交给前边几个consumer,而导致前边几个consumer会很忙,后边的consumer会很悠闲,这就是数据倾斜问题。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YcPORu6W-1649484313459)(/home/huanzi/snap/typora/57/.config/Typora/typora-user-images/image-20220407205356108.png)]

  • org.apache.kafka.clients.consumer.RoundRobinAssignor:以循环方式将分区分配给消费者。即以轮询的方式将分区分配给各个消费者。如下,consumer1将会消费topicA-p1和topicA-p3两个分区,consumer2将会消费topicA-p2。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-vTlFYFyv-1649484313460)(/home/huanzi/snap/typora/57/.config/Typora/typora-user-images/image-20220407210542172.png)]

  • org.apache.kafka.clients.consumer.StickyAssignor:保证分配是最大平衡的,同时保留尽可能多的现有分区分配。有点类似于RangeAssignor,但是不同的是,分配是随机,consumer1将可能会消费topicA-p1,topicA-p3/或者topicA-p2,topicA-p3,consumer2将会消费topicA-p2或者topicA-p1。
  • org.apache.kafka.clients.consumer.CooperativeStickyAssignor:遵循相同的 StickyAssignor 逻辑,但允许合作再平衡。
    默认分配器是 [RangeAssignor, CooperativeStickyAssignor],默认情况下将使用 RangeAssignor,但允许升级到 CooperativeStickyAssignor,实现 org.apache.kafka.clients.consumer.ConsumerPartitionAssignor 接口允许您插入自定义分配策略。

消费者offset位置维护:

kafka0.9版本之前的offset存放在zookeeper中,kafka0.9版本之后的offset存放在kafka系统主题(__consumer_offset)中,在consumer.properties中设置参数exclude.internal.topic=false才可以查看系统主题,默认为true。__consumer_offset中是以key-value的形式存放数据,key就是group.id+topic+分区号,value就是当前主题所在分区号的消费到的offset。每日过一段时间,kafka会对主题中的内容进行compact,所以consumer_offset中保留的就是最新的offset。

# 查看系统主题__consumer_offset中的主题信息
kafka-console-consumer.sh --bootstrap-server node-01:9092,node-02:9092 --topic __consumer_offset --consumer.config /orkasgb/software/kafka-3.1.0/config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning

指定offset位置/时间消费:

kafka中提供了auto.offset.reset=earliest | latest | none三种offset消费方式,默认latest。

  • earliest:自动将偏移量重置为最早的偏移量,–from-beginning。
  • latest:自动将偏移量重置为最新的偏移量。
  • none:如果没有找到消费者组的先前的偏移量,则抛出异常。

将时间转化为对应的offset,即可做到按照时间消费。

漏消费和重复消费:

  • 重复消费:因自动提交offset引起,如果上一个offset自动提交成功,此时consumer继续往下消费,且消费成功,但是未等到本次offset自动提交就挂了,当再次启动消费者时,消费者会读取上次提交成功的offset,那么此时就会造成重复消费的问题。
  • 漏消费:因手动提交offset引起,如果刚好手动提交offset成功,但是此时消费者刚好被kill掉,那么再次启动会读取上次提交成功的offset,而这个offset是手动提交成功,但是却未真正消费的offset,那么就会造成漏消费。

要避免漏消费和重复消费,那么就需要使用事务的方式,并且要做到整个消费到下游的所有链路上都支持事务。比如消费到存储数据到mysql,都必须支持事务才可以做到精确一次性消费。

消费者提高吞吐量:

  • 如果kafka的消费能力不足,则可以考虑增加topic的分区数,并且同时提升消费组的消费者的数量,消费者数=topic分区数(两者缺一不可)。
  • 如果是下游处理数据不及时:提高每批次数据拉取的数量,由默认的500条,提高到1000条。

十五、生产调优

1、提高生产者的吞吐量:

  • batch.size:批次大小,默认16K。
  • linger.ms:等待时间,建议修改为5~10ms。
  • commpression-type:考虑使用snappy进行数据压缩后在发送。
  • RecordAccumulator:缓冲区大小,可以适当的调整成64M,默认32M。

2、ACKS应答级别调优:

  • 0:生产者将数据发送过来之后,不需要等待leader应答成功即可响应客户端。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-svkL3lev-1649484313461)(/home/huanzi/snap/typora/57/.config/Typora/typora-user-images/image-20220405155229104.png)]

**问题分析:**当数据发送之后,因为不用等待leader应答就立即返回,假如数据发送到leader之后,leader还未处理数据就发生故障,那么此时数据已经全部丢失。

**数据可靠性分析:**数据丢失,但是效率最高。

  • 1:生产者将数据发送过来之后,需要等待leader应答成功后才能响应客户端。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1WVHke2j-1649484313462)(/home/huanzi/snap/typora/57/.config/Typora/typora-user-images/image-20220405160043922.png)]

**问题分析:**当数据发送之后,leader应答成功后立即返回,假如数据发送到leader之后,leader还未处理数据就发生故障,此时根据内部的leader选择策略,某一个follower称为新的leader,生产者会立即和新的leader交互,但是因为之前的leader应答成功,producter会认为之前的数据已经发送完成,不会再次发送,那么此时数据已经全部丢失。

当数据发送之后,leade和ISR队列中的所有follower都已经处理完数据,正准备acks应答时,leader发生故障,此时根据内部的leader选择策略,某一个follower称为新的leader,生产者会立即和新的leader交互,但是因为之前的leader没有应答成功,producter会认为之前的数据没有发送完成,继续再次发送,那么此时数据已经重复处理。

**数据可靠性分析:**数据丢失,数据重复。

  • -1(all):生产者将数据发送过来之后,需要等待leader和所有在ISR中存在的follower应答成功后才能响应客户端。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wYHUZO18-1649484313462)(/home/huanzi/snap/typora/57/.config/Typora/typora-user-images/image-20220405160953214.png)]

**问题分析:**当数据发送之后,producter再等待leader和其他follower应答,但是其中一个follower发生故障,导致无法acks应答,此时就会发生producter发送完数据却无法返回的情况。kafka为解决这种问题,让leader维护了一个动态的leader和follower的同步信息集合,形式为:(leader:1,isr:[1,2,3])。如果某一个follower超过30s(replica.lag.time.max.ms,默认30s)没有和leader进行通信,那么leader就认为该follower已经发生故障,此时就会将该follower从ISR队列中移除。

**数据可靠性分析:**如果副本数为1,任然存在数据丢失。

数据可靠性保障:

至少一次 = (ACKS级别=-1) + (分区副本数>=2)+ (ISR队列中至少要保障存在两个节点信息)。

精确一次 = 幂等性 + 至少一次。

3、手动调整分区副本存储策略

  1. 创建分区副本存储策略计划xxxx.json。
{
  "partitions" : [
  	{"topic" : "xxx","partition": 0,"replicas": [1,3]},
    {"topic" : "xxx","partition": 1,"replicas": [2,3]},
    {"topic" : "xxx","partition": 2,"replicas": [1,2]}
  ],
  "version" : 1
}
  1. 按照xx.json文件执行负载均衡的计划。
kafka-reassign-partitions.sh --bootstrap-server node-01:9092 --reassignment-json-file xxxx.json --execute
  1. 验证负载均衡的计划是否执行完成。
kafka-reassign-partitions.sh --bootstrap-server node-01:9092 --reassignment-json-file xxxx.json --verify
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐