4.Kafka Broker 总体工作流程和文件存储
Kafka Broker 总体工作流程和文件存储1.broker启动后在zk上注册2.controller谁先注册,由哪个controller决定leader选举3.由选举出来的Controller监听brokers节点变化4.Controller决定leader选举5.Controller将节点的信息上传到zk6.其他Controller从zk上同步相关信息7.如果Broker中的leader挂
Kafka Broker 总体工作流程和文件存储
- 1.broker启动后在zk上注册
- 2.controller谁先注册,由哪个controller决定leader选举
- 3.由选举出来的Controller监听brokers节点变化
- 4.Controller决定leader选举
- 5.Controller将节点的信息上传到zk
- 6.其他Controller从zk上同步相关信息
- 7.如果Broker中的leader挂了
- 8.Controller监听到变化
- 9.获取zk上的ISR(存活的followers)
- 10.选举出新的leader,在ISR中存活且在AR中拍在前面的优先
具体流程如下图:
Kafka服役新节点
- 准备hadoop105机器,修改server.properties的broker.id 为 3
- 单独启动 hadoop105 中的 kafka
bin/kafka-server-start.sh -daemon ./config/server.properties
- 创建一个要均衡的主题
vim topics-to-move.json
#输入下面的内容
{
"topics": [
{"topic": "first"}
],
"version": 1
}
- 生成一个负载均衡的计划
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate
- 创建副本存储计划(所有副本存储在 broker0、broker1、broker2、broker3 中)
vim increase-replication-factor.json
#增加刚刚kafka生成的计划
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[0,3,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[1,0,2],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[2,1,3],"log_dirs":["any","any","any"]}]}
- 执行副本存储计划
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
- 验证
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --verify
退役旧节点
- 执行负载均衡操作
vim topics-to-move.json
#输入下面的内容
{
"topics": [
{"topic": "first"}
],
"version": 1
}
- 生成一个负载均衡的计划
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2" --generate
- 创建副本存储计划
vim increase-replication-factor.json
#增加刚刚kafka生成的计划
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[0,1,2],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[1,2,0],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[2,0,1],"log_dirs":["any","any","any"]}]}
- 执行副本存储计划
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
- 验证
Kafka 副本
- Kafka 副本作用:提高数据可靠性。
- Kafka 默认副本 1 个,生产环境一般配置为 2 个,保证数据可靠性;太多副本会增加磁盘存储空间,增加网络上数据传输,降低效率。
- Kafka 中副本分为:Leader 和 Follower。Kafka 生产者只会把数据发往 Leader,然后 Follower 找 Leader 进行同步数据。
- Kafka 分区中的所有副本统称为 AR(Assigned Repllicas)。
- AR = ISR + OSR
- ISR,表示和 Leader 保持同步的 Follower 集合。如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值由 replica.lag.time.max.ms参数设定,默认 30s。
- OSR,表示 Follower 与 Leader 副本同步时,延迟过多的副本。
kafka Leader 和 Follower 故障处理细节
follower故障
- Follower发生故障后会被临时踢出ISR
- 这个期间Leader和其他Follower继续接收数据
- 待该Follower恢复后,Follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向Leader进行同步。
- HW(Log End Offset):每个副本的最后一个offset,LEO其实就是最新的offset + 1。
- HW(High Watermark):所有副本中最小的LEO
- 等该Follower的LEO大于等于该Partition的HW,即Follower追上Leader之后,就可以重新加入ISR了。
Leader故障处理细节
- Leader发生故障之后,会从ISR中选出一个新的Leader
- 为保证多个副本之间的数据一致性,其余的Follower会先将各自的log文件高于HW的部分截掉,然后从新的Leader同步数据
注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。
手动调整分区副本存储
在生产环境中,每台服务器的配置和性能不一致,但是Kafka只会根据自己的代码规则创建对应的分区副
本,就会导致个别服务器存储压力较大。所有需要手动调整分区副本的存储。
调整步骤:
- 创建副本存储计划(所有副本都指定存储在 broker0、broker1 中)。
vim increase-replication-factor.json
#输入以下内容
{
"version":1,
"partitions":[{"topic":"three","partition":0,"replicas":[0,1]},
{"topic":"three","partition":1,"replicas":[0,1]},
{"topic":"three","partition":2,"replicas":[1,0]},
{"topic":"three","partition":3,"replicas":[1,0]}]
}
- 执行副本存储计划
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
- 查看是否更改成功
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic three
# 更改前
Topic: three TopicId: NPN8zL2fTWuRvaSaWFb7zQ PartitionCount: 4 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: three Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: three Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0
Topic: three Partition: 2 Leader: 0 Replicas: 0,2 Isr: 0,2
Topic: three Partition: 3 Leader: 2 Replicas: 2,0 Isr: 2,0
# 更改后
Topic: three TopicId: NPN8zL2fTWuRvaSaWFb7zQ PartitionCount: 4 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: three Partition: 0 Leader: 0 Replicas: 0,1 Isr: 1,0
Topic: three Partition: 1 Leader: 1 Replicas: 0,1 Isr: 1,0
Topic: three Partition: 2 Leader: 0 Replicas: 1,0 Isr: 0,1
Topic: three Partition: 3 Leader: 1 Replicas: 1,0 Isr: 0,1
kafka的文件存储
kafka的文件存储机制
Topic是逻辑上的概念,而partition是物理上的概念,每个partition对应于一个log文件,该log文件中存储的就是Producer生产的数据。Producer生产的数据会被不断追加到该log文件末端,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制, 将每个partition分为多个segment(每个segment大小1G)。每个segment包括:“.index”文件、“.log”文件和.timeindex等文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号,例如:first-0。
index 文件和 log 文件详解
-
怎样根据offset找到记录在log文件中的位置
- 根据目标offset定位到segment文件(segment文件名末尾是offset)
- 在index文件中找到小于等于目标offset的最大offset对应的索引项
- 根据索引项在log文件中找到对应的目标
如下面的例子:找offset=600的record。
- 首先根据offset=600找到位于segment1中
- 然后在segment1的index文件中,找到比600小的offset587,对应position6410
- 根据position6410在log中找到6410的位置开始往下找到offset=600的记录
-
Kafka 中默认的日志保存时间为 7 天,可以通过调整如下参数修改保存时间。
- log.retention.hours,最低优先级小时,默认 7 天
- log.retention.minutes,分钟。
- log.retention.ms,毫秒。
优先级:毫秒>分钟>小时(如设置毫秒则设置的分钟将失效)
- log.retention.check.interval.ms,负责设置检查周期,默认 5 分钟。(默认每5分钟判断数据有没有超时)
-
超时后,Kafka 中提供的日志清理策略有 delete 和 compact 两种,默认delete
delete 日志删除:将过期数据删除(log.cleanup.policy = delete 所有数据启用删除策略)
- 基于时间的delete策略。以一个segment中最大的时间作为这个segment的时间(如果一个segment,一部分数据超过7天,一部分数据没超过7天,则不会删除这个segment)
- 基于大小的delete策略,超过设置的日志大小则删除最早的segment。(一般不用)
compact 日志压缩
- 对于相同key的不同value值,只保留最后一个版本。
kafka读写速度快的原因
- Kafka 本身是分布式集群,可以采用分区技术,并行度高
- 读数据采用稀疏索引,可以快速定位要消费的数据
- 顺序写磁盘。记录追加到log文件中
更多推荐
所有评论(0)