文章目录

一、分区和副本机制:

1. 生产者分区写入策略:

生产者写入消息到topic, kafak将依据不同的策略将数据分配到不同的分区中:

  • 轮询分区策略;
  • 随机分区策略;
  • 按key分区分配策略;
  • 自定义分区策略;

1.1 轮询分配策略:

在这里插入图片描述

  • 默认的策略, 也是使用最多的策略, 可以最大限度的保证所有消息平均分配到一个分区;
  • 如果在生产消息时, key为null, 则使用轮询算法均衡的分配分区;

1.2 随机策略(不用):

随机策略, 每次都随机地将消息分配到每个分区; 在较早的版本, 默认的分区策略师随机策略; 也是为了将消息均衡的写入每个分区; 但后续轮询策略表现更佳, 所以基本上很少使用随机策略;
在这里插入图片描述

1.3 按key分配策略:

按key分配策略, 有可能出现 数据倾斜; 例如: 某个key包含大量的数据, 因为key值一样, 所以, 所有的数据都将分配到一个分区中, 造成改分区的消息数量远大于其他分区;
在这里插入图片描述

1.4 乱序问题:

轮询策略, 随机策略都会产生一个问题, 生产到kafka中的数据是乱序存储的, 而按key分区可以在一定程度上实现数据有序存储–也就是局部有序, 但这又可能导致数据倾斜, 所以在实际生产环境中要结合实际情况来取舍; 即: kafka中的消息是全局乱序, 局部partition是有序的; 如果要实现消息总是有序的, 可以将连续的消息放到一个partition, 但kafka失去了分布式的意义;

在这里插入图片描述

1.5 自定义分区策略:

在这里插入图片描述
实现步骤:

  • 创建自定义分区器;
  • 在kafka生产者中, 自定的使用自定义分区器;

2. 消费者组Rebalance机制:

2.1 Rebalance 再平衡:

kafka中rebalance称之为"再平衡’, 是kafka中确保Consumer Group下所有的consumer如何达成一致, 分配订阅的topic的每个分区的机制;
Rebalance触发时机有:

  • 消费者组中consumer的个数发生变化; 例如: 有新的消费者加入, 或者是某个consumer停止;
    在这里插入图片描述
  • 订阅的topic个数发生变化:
    消费者可以订阅多个主题, 假设当前的消费者组订阅了三个topic, 但有一个topic突然被删除了, 此时需要再平衡;
    在这里插入图片描述
  • 订阅的topic分区数发生变化
    在这里插入图片描述

2.2 rebalance的不良影响:

  • 发生rebalance时, consumer group下的所有consumer都会协调在一起共同参与, kafka使用分配策略尽可能达到公平分配;
  • rebalance过程会对consumer group产生非常严重的影响, rebalance的过程中所有的consumer都将停止工作, 直到rebalance完成;

3. 消费者分区消费策略:

3.1 Range范围分配策略:

Range范围分配策略是kafka默认的分配策略, 它可以确保每个消费者消费的分区数量是均衡的;
注意: Range范围分配策略是针对每个topic的

配置
配置消费者的partition.assignment.strategyRangeAssignor

算法公式
n = 分区数量 / 消费者数量
m = 分区数量 % 消费者数量

前m个消费者消费n+1, 剩余消费者消费n个
在这里插入图片描述
在这里插入图片描述

3.2 RoundRobin轮询策略:

RoundRobinAssignor轮询策略是将消费者组所有消费者以及消费者所订阅的所有的topic的partition按照字典顺序排序(topic和partition的hashcode进行排序), 然后通过轮询方式逐个将分区一次分配给每个消费者;

配置
配置消费者的partition.assignment.strategyRoundRobinAssignor
在这里插入图片描述

3.3 Stricky粘性分配策略:

从kafak 0.11.x开始, 引入此类分配策略; 主要目的:

  • 分区分配尽可能均匀;
  • 在发生rebalance的时候, 分区的分配尽可能与上一次分配保持相同;
    • 没有发生rebalance时, stricky粘性分配策略和RoundRobin分配策略类似;

在这里插入图片描述

如果consumer2崩溃了, 此时需要进行rebalance; 如果是Range分配和轮询分配都会重新进行分配;
在这里插入图片描述
此时, consumer0和consumer1原来消费的分区大多发生了改变;

粘性分配方式:
在这里插入图片描述

粘性分配策略, 保留rebalance之前的分配结果, 只是将原先的consumer2负责的两个分区再均匀分配给consumer0、consumer1; 这样可以明显减少系统资源的浪费, 例如: 之前consumer0、consumer1之前正在消费某几个分区, 但由于rebalance发生, 导致consumer0和consumer1需要重新消费之前正在处理的分区, 导致不必要的系统开销;

4. 副本机制:

副本的目的就是冗余备份, 当某个Broker上的分区数据丢失时, 依然可以保障数据可用; 因为在其他的broker上的副本是可用的;

4.1 producer的ACKs参数

对副本关系较大的就是, producer配置的acks参数;
acks参数表示当生产者生产消息的时候, 写入到副本的要求严格程度; 它决定了生产者如何在性能和可靠性之间的取舍:

配置

conf := sarama.NewConfig()
conf.Producer.RequiredAcks = sarama.WaitForAll
const (
	// NoResponse doesn't send any response, the TCP ACK is all you get.
	NoResponse RequiredAcks = 0
	// WaitForLocal waits for only the local commit to succeed before responding.
	WaitForLocal RequiredAcks = 1
	// WaitForAll waits for all in-sync replicas to commit before responding.
	// The minimum number of in-sync replicas is configured on the broker via
	// the `min.insync.replicas` configuration key.
	WaitForAll RequiredAcks = -1
)

4.2 ack配置为0

在这里插入图片描述

4.3 ack配置为1

在这里插入图片描述
当生产者的ACK配置为1时, 生产者会等待leader副本确认接收后, 才会发送下一条数据, 性能中等;

4.4 ack配置为-1或all

在这里插入图片描述

bin/kafka-producer-perf-test.sh --topic benchmark --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=node1:9092,node2:9092,node3:9092 acks=all
指标单分区单副本(ack=0)单分区单副本(ack=1)单分区单副本(ack=-1/all)
吞吐量16.5W/s9.3W/s7.3W/s
吞吐速率158.19 MB/sec88.78 MB/sec70.18 MB/sec
平均延迟时间192.43 ms346.62 ms438.77 ms
最大延迟时间670.00 ms1003.00 ms1884.00 ms

二、高级API和低级API

1. 高级API

1.1 优点:

  • 不需要执行管理offset, 直接通过zk管理, 也不需要管理分区、副本, 由kafka统一管理;
  • 消费者会自动根据上一次在zk中保存的offset去接着获取数据;
  • 在zk中, 不同的消费者组, 同一个topic记录不同的offset, 这样不同程序读取同一个topic, 不会受到offset的影响;

1.2 缺点:

  • 不能控制offset, 例如: 从指定的位置读取数据;
  • 不能细化控制分区、副本、zk等;

2. 低级API

通过使用低级API, 可以自己来控制offset, 想从哪读, 就从哪读; 而且, 可以自己控制连接分区, 对分区自定义负载均衡;
之前offset是自动保存在ZK中, 使用低级API, 可以将offset不一定使用zk存储, 可以自己来存储offset, 例如: 存储文件、Mysql、内存等;
但是低级API比较复杂, 需要执行控制offset连接到那个分区, 并找到分区的leader;

三、监控工具: kafka-eagle

在这里插入图片描述

1. kafka-eagle介绍:

在开发工作中, 当业务前提不复杂时, 可以使用Kafka命令来进行一些集群的管理工作; 但如果业务变得复杂, 例如: 需要增加group、topic分区, 此时, 再使用命令行就感觉很不方便, 此时, 如果使用一个可视化的工具帮助完成日常的管理工作, 将会大大提高对于Kafka集群管理的效率,而且使用工具来监控消费者在Kafka中消费情况;

早期, 要监控Kafka集群我们可以使用Kafka Monitor以及Kafka Manager, 但随着我们对监控的功能要求、性能要求的提高, 这些工具已经无法满足;

Kafka Eagle是一款结合了目前大数据Kafka监控工具的特点, 重新研发的一块开源免费的Kafka集群优秀的监控工具; 它可以非常方便的监控生产环境中的offset、lag变化、partition分布、owner等;

官网地址: https://www.kafka-eagle.org/

2. 安装Kafka-Eagle

2.1 开启Kafka JMX端口

JMX接口
JMX(Java Management Extensions)是一个为应用程序植入管理功能的框架;
JMX是一套标准的代理和服务, 实际上, 用户可以在任何Java应用程序中使用这些代理和服务实现管理; 很多的一些软件都提供了JMX接口, 来实现一些管理、监控功能;

在启动Kafka的脚本前, 添加:

cd ${KAFKA_HOME}
export JMX_PORT=9988
nohup bin/kafka-server-start.sh config/server.properties &

PS: 如果是docker启动, 需要添加 -e JMX_PORT=9988

2.2 安装Kafka-Eagle

  • 安装JDK, 并配置好JAVA_HOME;
  • 将kafka_eagle上传, 并解压到 /export/server 目录中;
cd cd /export/software/
tar -xvzf kafka-eagle-bin-x.x.tar.gz -C ../server/
cd /export/server/kafka-eagle-bin-x.x/ 
tar -xvzf kafka-eagle-web-x.x-bin.tar.gz
cd /export/server/kafka-eagle-bin-x.x/kafka-eagle-web-x.x
  • 配置 kafka_eagle: 使用vi打开conf目录下的system-config.properties
vim conf/system-config.properties

# 修改第4行,配置kafka集群别名
kafka.eagle.zk.cluster.alias=cluster1
# 修改第5行,配置ZK集群地址
cluster1.zk.list=node1:2181,node2:2181,node3:2181
# 注释第6行
#cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181

# 修改第32行,打开图标统计
kafka.eagle.metrics.charts=true
kafka.eagle.metrics.retain=30

# 注释第69行,取消sqlite数据库连接配置
#kafka.eagle.driver=org.sqlite.JDBC
#kafka.eagle.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
#kafka.eagle.username=root
#kafka.eagle.password=www.kafka-eagle.org

# 修改第77行,开启mys
kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://node1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=123456
  • 配置 kafka_eagle 环境变量;
vim /etc/profile
export KE_HOME=/export/server/kafka-eagle-bin-x.x/kafka-eagle-web-x.x
export PATH=$PATH:$KE_HOME/bin

source /etc/profile
  • 配置JAVA_HOME
    操作系统配置JAVA_HOME的环境变量

PS:
- 配置kafka_eagle和JAVA_HOME均可在bin/ke.sh脚本中完成

...
# Update by Dec 11, 2021 -- add grep KafkaEagle
export JAVA_HOME=/Users/chengfei/Documents/学习/kafka/Home
export KE_HOME=/Users/chengfei/Documents/学习/kafka/kafka-eagle
export MALLOC_ARENA_MAX=1
export KE_JAVA_OPTS="-server -Xmx2g -Xms2g -XX:MaxGCPauseMillis=20 -XX:+UseG1GC -XX:MetaspaceSize=128m -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80"
...
  • 修改Kafka eagle可执行权限
cd /export/server/kafka-eagle-bin-x.x/kafka-eagle-web-x.x/bin
chmod +x ke.sh
  • 启动 kafka_eagle:
./ke.sh start
  • 访问Kafka eagle, 默认用户为admin, 密码为: 123456
    http://localhost:8048
    在这里插入图片描述
    在这里插入图片描述

在这里插入图片描述

3 Kafka度量指标

3.1 topic list

点击Topic下的List菜单, 就可以展示当前Kafka集群中的所有topic;
在这里插入图片描述

指标意义
Brokers Spreadbroker使用率
Brokers Skew分区是否倾斜
Brokers Leader Skewleader partition是否存在倾斜

四、kafka原理

1. 副本:

1.1 副本的Leader与Follower:

在kafka中, 每个topic都可以配置多个分区以及多个副本; 每个分区都有一个leader副本以及0个及以上个follower副本, 在创建topic时, kafak会将每个分区的leader副本均匀的分配在每个borker上;
正常使用kafka时, 是感觉不到leader、follower存在的, 但其实, 所有的读写操作都是由leader处理的, 而所有的follower都复制leader的日志数据文件;
如果leader出现故障, follower就会被选举为新leader;

在这里插入图片描述

所以:

  • kafak中leader负责处理读写操作, 而follower只负责副本数据同步;
  • 如果leader出现故障, 其他follower会被重新选举为leader;
  • follower像一个consumer一样, 拉取leader对应分区的数据, 并保存到日志数据文件中;

kafka会自动将leader均匀地分配在不同的borker中; 如果一个borker有多个分区的leader, 就会出现不均衡的情况, 应该尽量让leader均匀分配;

在这里插入图片描述

  • topic: topic name;
  • partition: 当前行表示的partition ID;
  • Log Size: 数据条数;
  • leader: 当前行表示的partition的leader所在的replica ID;
  • replicas: 当前行表示的partition的replica ID列表;
  • In Sync Replicas: 可用的副本
  • Preferred Leader: 当前为首选leader(没有发生borker挂掉的情况)
  • Under Replicated: 正在同步副本;

1.2 AR、ISR、OSR

在实际环境中, leader有可能会出现一些故障, 所以, kafak一定会选举出新的leader; kafak中, 把follower按照不同的状态分为三类: AR、ISR、OSR;

  • AR: 分区的所有副本; (Assigned Replicas – 已分配的副本);
  • ISR: 所有与leader副本保持一定程度同步的副本(包括leader); (In Sync Replicas – 正在同步的副本)
  • OSR: 与leader副本同步滞后过多的副本(不包括leader); (Out-Of-Sync Replicas)
  • AR = ISR + OSR
  • 正常情况下, 所有的follower副本都应该与leader副本保持同步, 即AR = ISR, OSR为空


当第一个borker挂掉之后;
在这里插入图片描述
重启第一个borker:

在这里插入图片描述
此时. 发现, leader并没有恢复为推荐配置;

1.3 Leader选举

leader对于消息的写入以及读取是非常关键的, 此时有2个问题:

  • kafak如何确定某个partition的副本是leader, 哪个是follower呢?
  • 某个leader崩溃了, 如何快速确定另一个是leader呢? 因为kafak的吞吐量很高、延迟很低; 所以选举leader必须很快
1.3.1 Controller介绍
  • kafak启动时, 会在所有的broker中选择一个controller, controller是高可用的;
  • leader和follower是针对partition副本的, 而controller是针对broker的;
  • 创建topic, 添加分区, 修改副本数据量之类的任务管理都是由controller完成的;
  • kafka分区副本keader的选举, 也是有controller决定的;
1.3.2 controller的选举:
  • 在kafka集群启动的时候, 每个broker都会尝试去Zookeeper上注册称为COntroller(zk临时节点);
  • 但只有一个竞争成功, 其他的broker会注册该节点的监视器;
  • 一个节点状态发生变化, 就可以进行相应的处理;
  • controller也是高可用的, 一旦某个broler崩溃, 其他的broker会重新注册为controller;
1.3.3 找到当前Kafka集群的controller
  • 点击Kafka Tools的「Tools」菜单, 找到「ZooKeeper Brower…」;
  • 点击左侧树形结构的controller节点, 就可以查看到哪个broker是controller了;

在这里插入图片描述

1.3.4 controller选举partition leader
  • 所有partition的leader选举都由controller决定的;
  • controller会将leader的改变直接通过RPC的方式通知需要为此做出响应的broker;
  • controller读取到当前分区ISR, 只要有一个Replica还幸存,就选择这个作为leader, 否则, 则任意选择一个replica作为leader;
  • 如果该partition的所有replica都已经宕机, 则新的leader为-1;

为什么不能通过ZK的方式选举partition的leader?

  • 如果业务很多, kafka集群会有很多partiton;
  • 假设某个broker宕机, 就会出现很多个partition都需要重新选举leader;
  • 如果使用zookeeper选举leader, 会给zookeeper带来巨大的压力; 所以, kafka中leader的选举不能使用zk实现;

1.4 leader的负载均衡

  • kafka中引入[preferred-replica]的概念, 即: 优先的replic;
  • 在ISR列表中, 第一个replica就是preferred-replica;
  • 使用一下脚本可以将preferred-replica设置为leader, 均匀分配每个分区副本的leader;
./kafka-leader-election.sh --bootstrap-server node1:9092 --topic 主题 --partition=1 --election-type preferred
  • --partition: 指定需要重新分配leader的partition编号;

如果某个broker crash之后, 就可能会导致副本的leader分布不均匀, 就是一个broker上存一个topic下不同partition的leader副本;

2. kafka生产、消费数据工作流程

2.1 kafka数据写入的流程

在这里插入图片描述

  • 生产者先从 zookeeper 的 "/brokers/topics/主题名/partitions/分区名/state"节点找到该 partition 的leader
    在这里插入图片描述
  • 生产者在ZK中找到该ID找到对应的broker
    在这里插入图片描述
  • broker进程上的leader将消息写入到本地log中;
  • follower从leader上拉取消息, 写入到本地log, 并向leader发送ACK;
  • leader接收到所有的ISR中的Replica的ACK后, 并向生产者返回ACK;

2.2 kafka数据消费流程

2.2.1 两种消费模式

在这里插入图片描述

  • kafka采用拉取模型, 由消费者自己记录消费状态, 每个消费者互相独立地顺序拉取每个分区的消息;
  • 消费者可以按照任意的顺序消费消息; 比如, 消费者可以重置到旧的偏移量, 重新处理之前已经消费过的消息; 或者直接跳到最近的位置, 从当前的时刻开始消费;
2.2.2 Kafka消费数据流程

在这里插入图片描述

  • 每个consumer都可以根据分配策略(默认RangeAssignor), 获得要消费的分区;
  • 获取到consumer对应的offset(默认从ZK中获取上一次消费的offset);
  • 找到该分区的leader, 拉取数据;
  • 消费者提交offset;

3. kafka的数据存储形式:

在这里插入图片描述

  • 一个topic由多个分区组成;
  • 一个分区(partition)由多个segment(段)组成;
  • 一个segment(段)由多个文件组成(log-数据、index-稀疏索引、timeindex);

3.1 存储日志

  • kafka中的数据是保存在kafka_xxx-xxx/data中;
  • 消息是保存在以topic-partitionID 的文件家中
  • 数据文件夹中包含一下内容:
    在这里插入图片描述
    这些分别对应:
文件名说明
00000000000000000000.index索引文件,根据offset查找数据就是通过该索引文件来操作的
00000000000000000000.log日志数据文件
00000000000000000000.timeindex时间索引
leader-epoch-checkpoint持久化每个partition leader对应的LEO(log end offset、日志文件中下一条待写入消息的offset)
  • 每个日志文件的文件名为起始偏移量, 因为每个分区的起始偏移量是0, 所以, 分区的日志文件都以0000000000000000000.log开始;
  • 默认的每个日志文件最大为log.segment.bytes =1024*1024*1024=1G
  • 为了简化根据offset查找消息, Kafka日志文件名设计为开始的偏移量

创建一个topic: test_10m, 该topic每个日志数据文件最大为10M

bin/kafka-topics.sh --create --zookeeper node1.cn:9092 --topic test_10m --replication-factor 2 --partitions 3 --config segment.bytes=10485760

3.2 读取消息

kafka日志的存储格式:
在这里插入图片描述

  • 根据 offset, 首先需要找到存储数据的segment段(ps: offset指分区的全局偏移量);
  • 然后根据这个"全局分区offset"找到相对于文件的"segment段offset";

在这里插入图片描述

  • 最后在根据 segment段offset"读取消息;
  • 为了提高查询效率, 每个文件都会维护对应的范围内存, 查找的时候就是使用简单的二分查找;

3.3 删除消息

  • kafka中, 消息是会被定期清理的, 一次删除一个segment段的日志文件;
  • kafka的日志管理器, 会根据kafke的配置, 来决定那些文件可以被删除;

4. 消息不丢失机制:

4.1 broker数据不丢失:

生产者通过分区的leader写入数据后, 所有在ISR中follower都会从leader中赋值数据, 这样, 可以确保及时leader崩溃了, 其他的follower的数据仍然是可用的;

4.2 生产者数据不丢失:

  • 生产者连接leader写入数据时, 可以通过ACK机制来确保数据已成功写入;
  • ACK机制有三个可选配置:
    • -1: 表示所有的节点都收到数据(leader和follower);
    • 1: 表示leader收到数据;
    • 0: 生产者只负责生产数据, 不关心数据是否丢失(可能会丢失数据, 但是性能最好);
  • 生产者可以采用同步异步两种方式发送结果:
    • 同步: 发送一批数据给kafka后, 等待kafka返回结果;
    • 异步: 发送一批数据给kafka后, 只是提供一个回调函数;

ps: 如果broker迟迟不给ACK, 而buffer又满了, 可以设置是否直接清空buffer中的数据

4.3 消费者数据不丢失:

在消费者消费数据的时候, 只要每个消费者记录好offset值即可, 就能保证数据不丢失;

4.3.1 一般消费丢失情况:
  • 消费者从ZK中拉取offset, 开始读取消息;
  • 在业务程序中处理这条消息, 并将处理后的结果写入到存储中;
  • 在写入存储的时候, 出现了故障, 导致写入失败;
  • 但是, 消费者提交了offset到ZK中;
  • 下一次消费就会从新的offset开始消费, 导致了数据丢失;

消息传递的语义性

  • At-most once : 最多一次 (只管把数据消费到, 不管有没有成功, 可能会有数据丢失);
  • At-least once: 最少一次(有可能会出现重复消费)
  • Exactly-Once: 仅有一次(事务性的保障, 保证消息有且仅被处理一次 )
4.3.2 重复消费:
  • 根据offset来消费partition中的数据;
  • 消费者业务程序处理数据, 并将结果成功写入到DB中;
  • 将offset提交到zookeeper, 但是是失败的;
  • 此时会出现重复消费;

使用kafka的事务没法解决"只消费一次";
kafka的事务是针对当前kafka集群中的消费者、生产者操作的;

解决方案:

  • 通过lowlevel API从mysql中读取offset;
  • 通过mysql的事务, 将写入到mysql的数据和offset放在一个mysql事务里, 要么全部成功, 要么全部失败, 就可以实现"只消费一次";

5. 数据积压:

kafka消费者消费数据的速度是非常快的, 但如果由于处理kafka消息时, 由于有一些外部I/O或者网络拥堵, 就会造成kafak中的数据积压; 如果数据一致积压, 会导致出来的数据的实时性受到较大影响;

5.1 使用工具查看积压情况

5.1.1 kafka Tools

在这里插入图片描述

5.1.2 kafka-Eagle:

在这里插入图片描述

5.2 解决数据积压问题:

当kafka出现数据积压时, 首先要找到数据积压的原因

常见的场景:

数据持久化时出错;
消费者超时失败, 导致数据消费缓慢; ⇒ 将消费时间修改的大一些;

在实际生产中, 要有监控系统, 如果出现这种情况, 需要尽快处理;
虽然 Spark Streaming / Flink等流式处理中间件可以实现背压, 但是数据积累太多一定会对实时系统的实时性造成影响;

五、kafka中数据清理(Log Deletion):

kafka的消息存储在磁盘中, 为了控制磁盘占用空间, kafka需要不断地对过去的一些消息进行清理; kafka的每个分区都有很多的日志文件, 这样也是为了方便的进行日志清理;
在kafka中, 提供两种日志清理方式:

  • 日志删除(Log Deletion): 按照指定的策略直接删除不符合条件的日志;
  • 日志压缩(Log Compaction): 按照消息的key进行整合, 有相同key的但有不同value值, 只保留最后一个版本;

在kafka的broker或topic中配置

配置项配置值说明
log.cleaner.enabletrue(默认)开启自动清理日志功能
log.cleanup.policydelete(默认)删除日志
log.cleanup.policyompaction压缩日志
log.cleanup.policydelete,compaction同时支持删除和压缩

1. 日志删除

日志删除是以段(segment日志)为单位来进行定期清理的;

1.1 定时日志删除任务

kafka日志管理器中会有一个专门的日志删除任务来定期检测和删除不符合保留条件的日志分段文件, 这个周期可以通过broker端参数log.retention.check.interval.ms来配置, 默认为 300 000, 即 5分钟;
当前日志分段的保留策略有3中:

  • 基于时间的保留策略;
  • 基于日志大小的保留策略;
  • 基于日志起始偏移量的保存策略;

在这里插入图片描述

1.2基于时间的保留策略

如果kafka中的消息超过指定的阈值, 就会将日志进行自动化清理:

  • log.retention.hours
  • log.retention.minutes
  • log.retention.ms

其中, 优先级为 log.retention.ms > log.retention.minutes > log.retention.hours;
默认情况, 在borker中, 配置为:

log.retention.hours=168

也就是, 默认日志的保留时间为168小时, 即7天;

1.3 删除日志分段:

删除日志分段是:

  • 从日志文件对象中所维护日志分段的跳跃表中移除待删除的日志分段, 以保证没有线程对这些日志分段进行读取操作;
  • 将日志分段文件添加上.deleted的后缀(包括日志分段对应的索引文件)
  • kafka的后台定时任务会定期删除这些.deleted为后缀的文件, 这个任务的延迟执行时间可以通过file.delete.delay.ms参数来设置, 默认为 60000, 即 1分钟;
1.3.1 设置 topic 5秒删除一次:
  • 为了方便观察, 设置段文件的大小为1M
    在这里插入图片描述

key: segment.bytes
value: 1048576

在这里插入图片描述

  • 设置topic的删除策略

key: retention.ms
value: 5000

在这里插入图片描述

  • 尝试往topic中添加一些数据, 等待一会, 观察日志的删除情况; 发现, 日志会定期被标记为删除, 然后被删除;

1.4 基于日志大小的保留策略

日志删除任务会检查当前日志的大小是否超过设定的阈值来寻找可删除的日志分段的文件集合;
可以通过broker端参数 log.retention.bytes来配置, 默认值为-1, 表示无穷大; 如果超过该大小, 会自动将超出部分删除;

log.retention.bytes 配置的是日志文件的总大小, 而不是单个的日志分段的大小; 一个日志文件包含多个日志分段;

1.5 基于日志起始偏移量保留策略

每个segment日志都有它的起始偏移量, 如果起始偏移量小于 logStartOffset, 那么这些日志文件将会标记为删除;

2. 日志压缩(Log Compaction)

Log Compaction是默认的日志删除之外的清理过时数据的方式; 它会将相同的key对应的数据只保留一个版本;

在这里插入图片描述

  • Log Compaction执行后, offset将不再连续, 但依然可以查询Segment;
  • Log Compaction执行前后, 日志分段中的每条消息偏移量保持不变; Log Compaction会生成一个新的Segment文件;
  • Log Compaction是针对key的, 在使用的时候注意每个消息的key不为空
  • 基于Log Compaction可以保留key的最新更新, 可以基于Log Compaction来恢复消费者的最新状态;

六、Kafka配额限速机制(Quotas)

生产者和消费者以极高的速度生产/消费大量数据或产生请求, 从而占用broker上的全部资源, 造成网络IO饱和; 有了配额(Quotas)就可以避免这些问题;
Kafka支持配额管理, 从而可以对Producer和Consumer的produce&fetch操作进行流量限制, 防止个别业务压爆服务器;

1. 限制producer端速率

为所有client id设置默认值, 以下为所有producer程序设置其TPS不超过1MB/s, 即1048576‬/s;
命令如下:

bin/kafka-configs.sh --zookeeper node1:2181 --alter --add-config 'producer_byte_rate=1048576' --entity-type clients --entity-default

运行基准测试, 观察生产消息的速率:

bin/kafka-producer-perf-test.sh --topic test --num-records 500000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=node1:9092,node2:9092,node3:9092 acks=1

2. 限制consumer端速率

为指定的topic进行限速, 以下为所有consumer程序设置topic速率不超过1MB/s, 即1048576/s;
命令如下:

bin/kafka-configs.sh --zookeeper node1:2181 --alter --add-config 'consumer_byte_rate=1048576' --entity-type clients --entity-default

运行基准测试:

bin/kafka-consumer-perf-test.sh --broker-list node1:9092,node2:9092,node3:9092 --topic test --fetch-size 1048576 --messages 500000

3. 取消Kafka的Quota配置

使用以下命令, 删除Kafka的Quota配置:
取消生产者:

bin/kafka-configs.sh --zookeeper node1:2181 --alter --delete-config 'producer_byte_rate' --entity-type clients --entity-default

取消消费者

bin/kafka-configs.sh --zookeeper node1:2181 --alter --delete-config 'consumer_byte_rate' --entity-type clients --entity-default
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐