Kafka Broker 总体工作流程和文件存储

  • 1.broker启动后在zk上注册
  • 2.controller谁先注册,由哪个controller决定leader选举
  • 3.由选举出来的Controller监听brokers节点变化
  • 4.Controller决定leader选举
  • 5.Controller将节点的信息上传到zk
  • 6.其他Controller从zk上同步相关信息
  • 7.如果Broker中的leader挂了
  • 8.Controller监听到变化
  • 9.获取zk上的ISR(存活的followers)
  • 10.选举出新的leader,在ISR中存活且在AR中拍在前面的优先

具体流程如下图:
在这里插入图片描述

Kafka服役新节点

  • 准备hadoop105机器,修改server.properties的broker.id 为 3
  • 单独启动 hadoop105 中的 kafka
bin/kafka-server-start.sh -daemon ./config/server.properties
  • 创建一个要均衡的主题
 vim topics-to-move.json
 #输入下面的内容
 {
 	"topics": [
 		{"topic": "first"}
 	],
 	"version": 1
}
  • 生成一个负载均衡的计划
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate
  • 创建副本存储计划(所有副本存储在 broker0、broker1、broker2、broker3 中)
vim increase-replication-factor.json
#增加刚刚kafka生成的计划
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[0,3,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[1,0,2],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[2,1,3],"log_dirs":["any","any","any"]}]}
  • 执行副本存储计划
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
  • 验证
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --verify

退役旧节点

  • 执行负载均衡操作
 vim topics-to-move.json
 #输入下面的内容
 {
 	"topics": [
 		{"topic": "first"}
 	],
 	"version": 1
}
  • 生成一个负载均衡的计划
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2" --generate
  • 创建副本存储计划
vim increase-replication-factor.json
#增加刚刚kafka生成的计划
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[1,2,0],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[2,0,1],"log_dirs":["any","any","any"]}]}
  • 执行副本存储计划
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
  • 验证

Kafka 副本

  • Kafka 副本作用:提高数据可靠性。
  • Kafka 默认副本 1 个,生产环境一般配置为 2 个,保证数据可靠性;太多副本会增加磁盘存储空间,增加网络上数据传输,降低效率。
  • Kafka 中副本分为:Leader 和 Follower。Kafka 生产者只会把数据发往 Leader,然后 Follower 找 Leader 进行同步数据。
  • Kafka 分区中的所有副本统称为 AR(Assigned Repllicas)。
    • AR = ISR + OSR
    • ISR,表示和 Leader 保持同步的 Follower 集合。如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值由 replica.lag.time.max.ms参数设定,默认 30s。
    • OSR,表示 Follower 与 Leader 副本同步时,延迟过多的副本。

kafka Leader 和 Follower 故障处理细节

follower故障

  • Follower发生故障后会被临时踢出ISR
  • 这个期间Leader和其他Follower继续接收数据
  • 待该Follower恢复后,Follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向Leader进行同步。
    • HW(Log End Offset):每个副本的最后一个offset,LEO其实就是最新的offset + 1。
    • HW(High Watermark):所有副本中最小的LEO
  • 等该Follower的LEO大于等于该Partition的HW,即Follower追上Leader之后,就可以重新加入ISR了。

在这里插入图片描述

Leader故障处理细节

  • Leader发生故障之后,会从ISR中选出一个新的Leader
  • 为保证多个副本之间的数据一致性,其余的Follower会先将各自的log文件高于HW的部分截掉,然后从新的Leader同步数据

注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。

手动调整分区副本存储

​ 在生产环境中,每台服务器的配置和性能不一致,但是Kafka只会根据自己的代码规则创建对应的分区副

本,就会导致个别服务器存储压力较大。所有需要手动调整分区副本的存储。

调整步骤:

  • 创建副本存储计划(所有副本都指定存储在 broker0、broker1 中)。
vim increase-replication-factor.json

#输入以下内容
{
"version":1,
"partitions":[{"topic":"three","partition":0,"replicas":[0,1]},
{"topic":"three","partition":1,"replicas":[0,1]},
{"topic":"three","partition":2,"replicas":[1,0]},
{"topic":"three","partition":3,"replicas":[1,0]}] 
}
  • 执行副本存储计划
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
  • 查看是否更改成功
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic three

# 更改前
Topic: three    TopicId: NPN8zL2fTWuRvaSaWFb7zQ PartitionCount: 4       ReplicationFactor: 2    Configs: segment.bytes=1073741824
        Topic: three    Partition: 0    Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: three    Partition: 1    Leader: 1       Replicas: 1,0   Isr: 1,0
        Topic: three    Partition: 2    Leader: 0       Replicas: 0,2   Isr: 0,2
        Topic: three    Partition: 3    Leader: 2       Replicas: 2,0   Isr: 2,0

# 更改后
Topic: three    TopicId: NPN8zL2fTWuRvaSaWFb7zQ PartitionCount: 4       ReplicationFactor: 2    Configs: segment.bytes=1073741824
        Topic: three    Partition: 0    Leader: 0       Replicas: 0,1   Isr: 1,0
        Topic: three    Partition: 1    Leader: 1       Replicas: 0,1   Isr: 1,0
        Topic: three    Partition: 2    Leader: 0       Replicas: 1,0   Isr: 0,1
        Topic: three    Partition: 3    Leader: 1       Replicas: 1,0   Isr: 0,1

kafka的文件存储

kafka的文件存储机制

​ Topic是逻辑上的概念,而partition是物理上的概念,每个partition对应于一个log文件,该log文件中存储的就是Producer生产的数据。Producer生产的数据会被不断追加到该log文件末端,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制, 将每个partition分为多个segment(每个segment大小1G)。每个segment包括:“.index”文件、“.log”文件和.timeindex等文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号,例如:first-0。

在这里插入图片描述

index 文件和 log 文件详解

  • 怎样根据offset找到记录在log文件中的位置

    • 根据目标offset定位到segment文件(segment文件名末尾是offset)
    • 在index文件中找到小于等于目标offset的最大offset对应的索引项
    • 根据索引项在log文件中找到对应的目标

    如下面的例子:找offset=600的record。

    • 首先根据offset=600找到位于segment1中
    • 然后在segment1的index文件中,找到比600小的offset587,对应position6410
    • 根据position6410在log中找到6410的位置开始往下找到offset=600的记录
  • Kafka 中默认的日志保存时间为 7 天,可以通过调整如下参数修改保存时间。

    • log.retention.hours,最低优先级小时,默认 7 天
    • log.retention.minutes,分钟。
    • log.retention.ms,毫秒。

    优先级:毫秒>分钟>小时(如设置毫秒则设置的分钟将失效)

    • log.retention.check.interval.ms,负责设置检查周期,默认 5 分钟。(默认每5分钟判断数据有没有超时)
  • 超时后,Kafka 中提供的日志清理策略有 delete 和 compact 两种,默认delete

    delete 日志删除:将过期数据删除(log.cleanup.policy = delete 所有数据启用删除策略)

    • 基于时间的delete策略。以一个segment中最大的时间作为这个segment的时间(如果一个segment,一部分数据超过7天,一部分数据没超过7天,则不会删除这个segment)
    • 基于大小的delete策略,超过设置的日志大小则删除最早的segment。(一般不用)

    compact 日志压缩

    • 对于相同key的不同value值,只保留最后一个版本。

kafka读写速度快的原因

  • Kafka 本身是分布式集群,可以采用分区技术,并行度高
  • 读数据采用稀疏索引,可以快速定位要消费的数据
  • 顺序写磁盘。记录追加到log文件中
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐