1 Kafka Broker Workflow

1.1 Kafka Information Stored in ZooKeeper

[vagrant@localhost bin]$ ./zkCli.sh 
....

[zk: localhost:2181(CONNECTED) 1] ls /kafka
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification]

Key entries:

/kafka/brokers/ids

Records which brokers are in the cluster.

[zk: localhost:2181(CONNECTED) 0] ls /kafka/brokers/ids
[0, 1, 2]

[zk: localhost:2181(CONNECTED) 1] get /kafka/brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.3.51:9092"],"jmx_port":9999,"features":{},"host":"192.168.3.51","timestamp":"1647170725662","port":9092,"version":5}

[zk: localhost:2181(CONNECTED) 2] get /kafka/brokers/ids/1
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.3.52:9092"],"jmx_port":9999,"features":{},"host":"192.168.3.52","timestamp":"1646125425746","port":9092,"version":5}

[zk: localhost:2181(CONNECTED) 4] get /kafka/brokers/ids/2
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.3.53:9092"],"jmx_port":9999,"features":{},"host":"192.168.3.53","timestamp":"1646125296667","port":9092,"version":5}

/kafka/brokers/topics/first/partitions/0/state

Records which replica is the leader and which replicas are in sync (the ISR).

[zk: localhost:2181(CONNECTED) 7] ls /kafka/brokers/topics
[__consumer_offsets, first]

[zk: localhost:2181(CONNECTED) 17] ls /kafka/brokers/topics/first/partitions
[0, 1, 2]

[zk: localhost:2181(CONNECTED) 13] get /kafka/brokers/topics/first/partitions/0/state
{"controller_epoch":1,"leader":1,"version":1,"leader_epoch":0,"isr":[1]}

[zk: localhost:2181(CONNECTED) 14] get /kafka/brokers/topics/first/partitions/1/state
{"controller_epoch":1,"leader":2,"version":1,"leader_epoch":0,"isr":[2]}

[zk: localhost:2181(CONNECTED) 15] get /kafka/brokers/topics/first/partitions/2/state
{"controller_epoch":1,"leader":0,"version":1,"leader_epoch":0,"isr":[0]}

/kafka/controller

Assists with leader election: the first broker to create this ephemeral node becomes the cluster controller.

[zk: localhost:2181(CONNECTED) 20] get /kafka/controller
{"version":1,"brokerid":0,"timestamp":"1647170725901"}

/kafka/consumers

Before version 0.9, consumer offsets were stored here.

From version 0.9 on, offsets are stored in an internal Kafka topic instead; see /kafka/brokers/topics/__consumer_offsets/partitions.
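To peek at what is stored there, the internal topic can be consumed directly. A minimal sketch, assuming broker 192.168.3.51 and Kafka 3.0 (where this formatter class still exists; internal topics must not be excluded for this to work):

# Print the (group, topic-partition) -> offset records kept in __consumer_offsets.
bin/kafka-console-consumer.sh --bootstrap-server 192.168.3.51:9092 \
  --topic __consumer_offsets \
  --consumer-property exclude.internal.topics=false \
  --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
  --from-beginning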


1.2 Overall Kafka Broker Workflow

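In outline (recoverable from the ZooKeeper paths in section 1.1):

  1. On startup, every broker registers itself under /kafka/brokers/ids.
  2. The brokers race to create /kafka/controller; whichever succeeds first becomes the Controller.
  3. The Controller watches /kafka/brokers/ids for changes, elects partition leaders (from replicas alive in the ISR, preferring the order they appear in the AR), and writes the result to /kafka/brokers/topics/&lt;topic&gt;/partitions/&lt;n&gt;/state.
  4. If a broker fails, the Controller notices via ZooKeeper, elects new leaders for the affected partitions, and updates the state nodes; the other brokers pick up the new metadata from there.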

1.3 Important Broker Parameters

Parameter: Description
replica.lag.time.max.ms: In the ISR, if a follower has not sent a fetch or sync request to the leader for longer than this threshold, it is kicked out of the ISR. Default 30s.
auto.leader.rebalance.enable: Default true. Automatic leader partition balancing.
leader.imbalance.per.broker.percentage: Default 10%. The ratio of imbalanced leaders allowed per broker; if a broker exceeds it, the controller triggers a leader rebalance.
leader.imbalance.check.interval.seconds: Default 300 seconds. The interval at which leader balance is checked.
log.segment.bytes: Kafka logs are stored in segments; this sets the size of each segment. Default 1G.
log.index.interval.bytes: Default 4kb. Every 4kb written to the log (.log) adds one entry to the index file.
log.retention.hours: How long Kafka keeps data, in hours. Default 7 days.
log.retention.minutes: Retention at minute granularity. Unset by default.
log.retention.ms: Retention at millisecond granularity. Unset by default.
log.retention.check.interval.ms: How often to check whether data has expired. Default 5 minutes.
log.retention.bytes: Default -1, meaning unlimited. When the total log size exceeds this value, the earliest segment is deleted.
log.cleanup.policy: Default delete, which enables the deletion policy for all data; set to compact to enable log compaction.
num.io.threads: Default 8. Threads that write to disk. Should be about 50% of total cores.
num.replica.fetchers: Replica fetch threads. Should be about 1/3 of that 50% of cores.
num.network.threads: Default 3. Threads for network data transfer. Should be about 2/3 of that 50% of cores.
log.flush.interval.messages: Number of messages that force a page-cache flush to disk. Default is Long.MAX_VALUE (9223372036854775807). Usually left alone; the OS manages flushing.
log.flush.interval.ms: How often to flush data to disk. Default null. Usually left alone; the OS manages flushing.
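These are broker-level settings in config/server.properties. A minimal sketch of overriding a few of them and restarting the broker (the path matches the install layout used in this article; the values shown are just the defaults made explicit):

# Append overrides to the broker config, then restart.
cd /home/vagrant/kafka_2.12-3.0.0
cat >> config/server.properties <<'EOF'
replica.lag.time.max.ms=30000
log.segment.bytes=1073741824
log.retention.hours=168
EOF
bin/kafka-server-stop.sh
bin/kafka-server-start.sh -daemon config/server.properties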

2 Commissioning and Decommissioning Nodes

Prepare a new machine: 192.168.3.54.

Connect to ZooKeeper and check the stored configuration of topic first:

[vagrant@localhost bin]$ ./zkCli.sh
[zk: localhost:2181(CONNECTED) 2] get /kafka/config/topics/first
{"version":1,"config":{}}

View the details of topic first:

/home/vagrant/kafka_2.12-3.0.0/bin/kafka-topics.sh --bootstrap-server 192.168.3.51:9092,192.168.3.52:9092,192.168.3.53:9092,192.168.3.54:9092 --describe --topic first
Topic: first    TopicId: zeecMSE5QSyIWnZWGxhJZQ PartitionCount: 3       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: first    Partition: 0    Leader: 1       Replicas: 1     Isr: 1
        Topic: first    Partition: 1    Leader: 2       Replicas: 2     Isr: 2
        Topic: first    Partition: 2    Leader: 0       Replicas: 0     Isr: 0

Commissioning the new node
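Before any rebalancing, the new broker has to join the cluster. A minimal sketch, run on 192.168.3.54, assuming the same install path as the other nodes (the key points are a unique broker.id and the shared zookeeper.connect):

# Give the new broker the next free id and start it.
cd /home/vagrant/kafka_2.12-3.0.0
sed -i 's/^broker.id=.*/broker.id=3/' config/server.properties
# zookeeper.connect in server.properties must point at the shared
# ZooKeeper chroot (the /kafka path used by brokers 0-2).
bin/kafka-server-start.sh -daemon config/server.properties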

2.1 Create a file naming the topics to rebalance

vi topics-to-move.json
{
    "topics": [
        {
            "topic": "first"
        }
    ],
    "version": 1
}

2.2 Generate a rebalancing plan

bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.3.51:9092,192.168.3.52:9092,192.168.3.53:9092,192.168.3.54:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate

Current partition replica assignment
{
    "version": 1,
    "partitions": [
        {
            "topic": "first",
            "partition": 0,
            "replicas": [
                1
            ],
            "log_dirs": [
                "any"
            ]
        },
        {
            "topic": "first",
            "partition": 1,
            "replicas": [
                2
            ],
            "log_dirs": [
                "any"
            ]
        },
        {
            "topic": "first",
            "partition": 2,
            "replicas": [
                0
            ],
            "log_dirs": [
                "any"
            ]
        }
    ]
}

Proposed partition reassignment configuration
{
    "version": 1,
    "partitions": [
        {
            "topic": "first",
            "partition": 0,
            "replicas": [
                1
            ],
            "log_dirs": [
                "any"
            ]
        },
        {
            "topic": "first",
            "partition": 1,
            "replicas": [
                2
            ],
            "log_dirs": [
                "any"
            ]
        },
        {
            "topic": "first",
            "partition": 2,
            "replicas": [
                3
            ],
            "log_dirs": [
                "any"
            ]
        }
    ]
}

2.3 Create the replica storage plan (replicas spread across broker0, broker1, broker2, and broker3)

Save the proposed configuration above as the plan file. Comparing the two JSON blocks, the proposal only moves partition 2 from broker 0 to the new broker 3.

vi increase-replication-factor.json

{
    "version": 1,
    "partitions": [
        {
            "topic": "first",
            "partition": 0,
            "replicas": [
                1
            ],
            "log_dirs": [
                "any"
            ]
        },
        {
            "topic": "first",
            "partition": 1,
            "replicas": [
                2
            ],
            "log_dirs": [
                "any"
            ]
        },
        {
            "topic": "first",
            "partition": 2,
            "replicas": [
                3
            ],
            "log_dirs": [
                "any"
            ]
        }
    ]
}

2.4 Execute the replica storage plan

bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.3.51:9092,192.168.3.52:9092,192.168.3.53:9092,192.168.3.54:9092 --reassignment-json-file increase-replication-factor.json --execute

Current partition replica assignment

{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[1],"log_dirs":["any"]},{"topic":"first","partition":1,"replicas":[2],"log_dirs":["any"]},{"topic":"first","partition":2,"replicas":[0],"log_dirs":["any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for first-0,first-1,first-2

2.5 Verify the replica storage plan

bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.3.51:9092,192.168.3.52:9092,192.168.3.53:9092,192.168.3.54:9092 --reassignment-json-file increase-replication-factor.json --verify

Status of partition reassignment:
Reassignment of partition first-0 is complete.
Reassignment of partition first-1 is complete.
Reassignment of partition first-2 is complete.

Clearing broker-level throttles on brokers 0,1,2,3
Clearing topic-level throttles on topic first

View the details of topic first again; partition 2 now lives on broker 3:

/home/vagrant/kafka_2.12-3.0.0/bin/kafka-topics.sh --bootstrap-server 192.168.3.51:9092,192.168.3.52:9092,192.168.3.53:9092,192.168.3.54:9092 --describe --topic first
Topic: first	TopicId: zeecMSE5QSyIWnZWGxhJZQ	PartitionCount: 3	ReplicationFactor: 1	Configs: segment.bytes=1073741824
	Topic: first	Partition: 0	Leader: 1	Replicas: 1	Isr: 1
	Topic: first	Partition: 1	Leader: 2	Replicas: 2	Isr: 2
	Topic: first	Partition: 2	Leader: 3	Replicas: 3	Isr: 3

Decommissioning an old node

To decommission a node, first generate an execution plan that leaves the retiring broker out of the target broker list, then run the same rebalancing steps used when commissioning.

The procedure otherwise mirrors commissioning a new node, so it is not repeated in full; the only real difference is the broker list, as sketched below.
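A sketch of the plan generation, assuming broker 3 is the one being retired; the resulting proposed JSON is then fed to --execute and --verify exactly as in sections 2.4 and 2.5:

# The target list "0,1,2" excludes broker 3, so the generated plan
# moves first's replicas off the retiring node.
bin/kafka-reassign-partitions.sh \
  --bootstrap-server 192.168.3.51:9092,192.168.3.52:9092,192.168.3.53:9092 \
  --topics-to-move-json-file topics-to-move.json \
  --broker-list "0,1,2" \
  --generate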

3 Kafka Replicas

3.1 Replica Basics

  1. Purpose of replicas: improve data reliability.
  2. Kafka defaults to 1 replica; production usually uses 2 to guarantee reliability. More replicas cost extra disk space and network transfer, lowering efficiency.
  3. Replicas are divided into Leader and Follower. Producers send data only to the Leader; Followers then fetch from the Leader to stay in sync.
  4. All replicas of a partition are collectively called the AR (Assigned Replicas).
    AR = ISR + OSR
    ISR: the set of followers in sync with the Leader. If a follower sends no fetch or sync requests to the Leader for too long, it is kicked out of the ISR; the threshold is set by replica.lag.time.max.ms, default 30s. When the Leader fails, a new Leader is elected from the ISR.
    OSR: followers that have fallen too far behind the Leader.

3.2 Leader Election Process

One broker in the cluster has its controller module elected as the active Controller; it manages broker registration and deregistration, partition replica assignment for every topic, and leader election.

The Controller relies on ZooKeeper to synchronize this metadata.


  1. Create a new topic with 4 partitions and 4 replicas:
bin/kafka-topics.sh --bootstrap-server 192.168.3.51:9092,192.168.3.52:9092,192.168.3.53:9092,192.168.3.54:9092 --create --topic second --partitions 4 --replication-factor 4

Created topic second.
  2. Check the leader distribution:
bin/kafka-topics.sh --bootstrap-server 192.168.3.51:9092,192.168.3.52:9092,192.168.3.53:9092,192.168.3.54:9092 --describe --topic second 

Topic: second	TopicId: -DB-j_GCR0qOiVOkMGdzLg	PartitionCount: 4	ReplicationFactor: 4	Configs: segment.bytes=1073741824
	Topic: second	Partition: 0	Leader: 0	Replicas: 0,3,1,2	Isr: 0,3,1,2
	Topic: second	Partition: 1	Leader: 2	Replicas: 2,1,0,3	Isr: 2,1,0,3
	Topic: second	Partition: 2	Leader: 3	Replicas: 3,0,2,1	Isr: 3,0,2,1
	Topic: second	Partition: 3	Leader: 1	Replicas: 1,2,3,0	Isr: 1,2,3,0
  3. Stop the Kafka process on 192.168.3.54 (id=3) and check the leader distribution again; partition 2's leadership fails over and broker 3 drops out of every ISR:
bin/kafka-server-stop.sh
[vagrant@localhost kafka_2.12-3.0.0]$ bin/kafka-topics.sh --bootstrap-server 192.168.3.51:9092,192.168.3.52:9092,192.168.3.53:9092,192.168.3.54:9092 --describe --topic second 
[2022-03-24 14:02:28,273] WARN [AdminClient clientId=adminclient-1] Connection to node -4 (/192.168.3.54:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
Topic: second	TopicId: -DB-j_GCR0qOiVOkMGdzLg	PartitionCount: 4	ReplicationFactor: 4	Configs: segment.bytes=1073741824
	Topic: second	Partition: 0	Leader: 0	Replicas: 0,3,1,2	Isr: 0,1,2
	Topic: second	Partition: 1	Leader: 2	Replicas: 2,1,0,3	Isr: 2,1,0
	Topic: second	Partition: 2	Leader: 0	Replicas: 3,0,2,1	Isr: 0,2,1
	Topic: second	Partition: 3	Leader: 1	Replicas: 1,2,3,0	Isr: 1,2,0
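Restarting broker 3 brings it back as a follower; once its LEO catches up to the HW it re-enters the ISR, though leadership stays where it is until a preferred-leader rebalance. A sketch, run on 192.168.3.54:

cd /home/vagrant/kafka_2.12-3.0.0
bin/kafka-server-start.sh -daemon config/server.properties
# After it catches up, broker 3 reappears in the Isr column:
bin/kafka-topics.sh --bootstrap-server 192.168.3.51:9092 --describe --topic second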

3.3 Leader and Follower Failure Handling Details

LEO (Log End Offset): per replica, the latest offset + 1, i.e. the offset the next written message will get.
HW (High Watermark): the smallest LEO among the partition's replicas; consumers can only see offsets below the HW. For example, with replica LEOs of 7, 5, and 6, HW = 5 and only offsets 0 through 4 are visible to consumers.

3.3.1 Follower Failure


When a follower fails, it is temporarily kicked out of the ISR; the leader and remaining followers keep accepting data in the meantime. Once the failed follower recovers, it reads the last HW it recorded on local disk, truncates the part of its log file above that HW, and starts syncing from the leader at the HW. When the follower's LEO has caught up to the partition's HW, i.e. it has caught up with the leader, it can rejoin the ISR.

3.3.2 Leader Failure

When the leader fails, a new leader is elected from the ISR. To keep the data consistent across replicas, the remaining followers first truncate the parts of their log files above the HW, then sync from the new leader.

Note: this only guarantees consistency between replicas; it does not guarantee that data is never lost or never duplicated.

3.4 Partition Replica Assignment

If the Kafka cluster has only 4 nodes and a topic is created with more partitions than there are servers, how does Kafka assign the replicas under the hood?
1) Create a topic with 16 partitions and 3 replicas:

bin/kafka-topics.sh --bootstrap-server 192.168.3.51:9092,192.168.3.52:9092,192.168.3.53:9092,192.168.3.54:9092 --create --partitions 16 --replication-factor 3 --topic third

Created topic third.

Check the partition and replica layout. Note the pattern in the output below: leaders rotate round-robin across the four brokers, and each round of four partitions shifts the follower positions one step further, so replicas end up spread evenly.

bin/kafka-topics.sh --bootstrap-server 192.168.3.51:9092,192.168.3.52:9092,192.168.3.53:9092,192.168.3.54:9092 --describe --topic third

Topic: third    TopicId: RUzXbc22ThiH6usi2vux5Q PartitionCount: 16      ReplicationFactor: 3    Configs: segment.bytes=1073741824
        Topic: third    Partition: 0    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2
        Topic: third    Partition: 1    Leader: 0       Replicas: 0,2,3 Isr: 0,2,3
        Topic: third    Partition: 2    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1
        Topic: third    Partition: 3    Leader: 3       Replicas: 3,1,0 Isr: 3,1,0
        Topic: third    Partition: 4    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
        Topic: third    Partition: 5    Leader: 0       Replicas: 0,3,1 Isr: 0,3,1
        Topic: third    Partition: 6    Leader: 2       Replicas: 2,1,0 Isr: 2,1,0
        Topic: third    Partition: 7    Leader: 3       Replicas: 3,0,2 Isr: 3,0,2
        Topic: third    Partition: 8    Leader: 1       Replicas: 1,3,0 Isr: 1,3,0
        Topic: third    Partition: 9    Leader: 0       Replicas: 0,1,2 Isr: 0,1,2
        Topic: third    Partition: 10   Leader: 2       Replicas: 2,0,3 Isr: 2,0,3
        Topic: third    Partition: 11   Leader: 3       Replicas: 3,2,1 Isr: 3,2,1
        Topic: third    Partition: 12   Leader: 1       Replicas: 1,0,2 Isr: 1,0,2
        Topic: third    Partition: 13   Leader: 0       Replicas: 0,2,3 Isr: 0,2,3
        Topic: third    Partition: 14   Leader: 2       Replicas: 2,3,1 Isr: 2,3,1
        Topic: third    Partition: 15   Leader: 3       Replicas: 3,1,0 Isr: 3,1,0

3.5 Manually Adjusting Replica Placement

In production, servers differ in configuration and performance, but Kafka places partition replicas purely by its own built-in rules, which can leave individual servers under heavier storage pressure. Replica placement therefore sometimes needs to be adjusted by hand.

Goal: create a new topic named three with 4 partitions and 2 replicas, and store all of its replicas on broker0 and broker1.

bin/kafka-topics.sh --bootstrap-server 192.168.3.51:9092,192.168.3.52:9092,192.168.3.53:9092,192.168.3.54:9092 --create --partitions 4 --replication-factor 2 --topic three

Created topic three.

Check the replica placement:

bin/kafka-topics.sh --bootstrap-server 192.168.3.51:9092,192.168.3.52:9092,192.168.3.53:9092,192.168.3.54:9092 --describe --topic three

Topic: three    TopicId: DWrKh6rKRJ6Y2NEtKgCkOw PartitionCount: 4       ReplicationFactor: 2    Configs: segment.bytes=1073741824
        Topic: three    Partition: 0    Leader: 3       Replicas: 3,1   Isr: 3,1
        Topic: three    Partition: 1    Leader: 1       Replicas: 1,0   Isr: 1,0
        Topic: three    Partition: 2    Leader: 0       Replicas: 0,2   Isr: 0,2
        Topic: three    Partition: 3    Leader: 2       Replicas: 2,3   Isr: 2,3

Create the replica storage plan (all replicas pinned to broker0 and broker1):

vi increase-replication-factor.json

{
    "version": 1,
    "partitions": [
        {
            "topic": "three",
            "partition": 0,
            "replicas": [
                0,
                1
            ]
        },
        {
            "topic": "three",
            "partition": 1,
            "replicas": [
                0,
                1
            ]
        },
        {
            "topic": "three",
            "partition": 2,
            "replicas": [
                1,
                0
            ]
        },
        {
            "topic": "three",
            "partition": 3,
            "replicas": [
                1,
                0
            ]
        }
    ]
}

Execute the replica storage plan:

bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.3.51:9092,192.168.3.52:9092,192.168.3.53:9092,192.168.3.54:9092 --reassignment-json-file increase-replication-factor.json --execute

Current partition replica assignment

{"version":1,"partitions":[{"topic":"three","partition":0,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"three","partition":1,"replicas":[1,0],"log_dirs":["any","any"]},{"topic":"three","partition":2,"replicas":[0,2],"log_dirs":["any","any"]},{"topic":"three","partition":3,"replicas":[2,3],"log_dirs":["any","any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for three-0,three-1,three-2,three-3

Verify the replica storage plan:

bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.3.51:9092,192.168.3.52:9092,192.168.3.53:9092,192.168.3.54:9092 --reassignment-json-file increase-replication-factor.json --verify

Status of partition reassignment:
Reassignment of partition three-0 is complete.
Reassignment of partition three-1 is complete.
Reassignment of partition three-2 is complete.
Reassignment of partition three-3 is complete.

Clearing broker-level throttles on brokers 0,1,2,3
Clearing topic-level throttles on topic three

Check the replica placement again; everything now sits on broker0 and broker1:

bin/kafka-topics.sh --bootstrap-server 192.168.3.51:9092,192.168.3.52:9092,192.168.3.53:9092,192.168.3.54:9092 --describe --topic three

Topic: three    TopicId: DWrKh6rKRJ6Y2NEtKgCkOw PartitionCount: 4       ReplicationFactor: 2    Configs: segment.bytes=1073741824
        Topic: three    Partition: 0    Leader: 0       Replicas: 0,1   Isr: 1,0
        Topic: three    Partition: 1    Leader: 1       Replicas: 0,1   Isr: 1,0
        Topic: three    Partition: 2    Leader: 0       Replicas: 1,0   Isr: 0,1
        Topic: three    Partition: 3    Leader: 1       Replicas: 1,0   Isr: 0,1

3.6 Leader Partition Load Balancing

Normally, Kafka spreads leader partitions evenly across the machines on its own, so each machine carries a uniform read/write load. But if some brokers go down, leader partitions become concentrated on the few remaining brokers, which then face disproportionately high read/write pressure, while the failed brokers come back up holding only follower partitions and serve very little traffic, leaving the cluster unbalanced.


  • auto.leader.rebalance.enable: default true. Automatic leader partition balancing.
  • leader.imbalance.per.broker.percentage: default 10%. The ratio of imbalanced leaders allowed per broker; if a broker exceeds it, the controller triggers a leader rebalance.
  • leader.imbalance.check.interval.seconds: default 300 seconds. The interval at which leader balance is checked.

Take one topic as an example; assume the cluster has only this topic, laid out as follows:

Topic: second	TopicId: -DB-j_GCR0qOiVOkMGdzLg	PartitionCount: 4	ReplicationFactor: 4	Configs: segment.bytes=1073741824
	Topic: second	Partition: 0	Leader: 0	Replicas: 1,0,3,2	Isr: 0,3,1,2
	Topic: second	Partition: 1	Leader: 2	Replicas: 2,1,0,3	Isr: 2,1,0,3
	Topic: second	Partition: 2	Leader: 3	Replicas: 0,3,2,1	Isr: 3,0,2,1
	Topic: second	Partition: 3	Leader: 1	Replicas: 3,2,1,0	Isr: 1,2,3,0

For broker0: the preferred replica of partition 2 is broker 0 (the first entry of its AR, 0,3,2,1), but the actual leader is broker 3, so broker0's imbalance count is 1. Broker0 sits in the AR of 4 partitions, so its imbalance ratio is 1/4 > 10% and a rebalance is triggered.

By the same reasoning, broker1 (preferred leader of partition 0, whose actual leader is 0) and broker3 (preferred leader of partition 3, whose actual leader is 1) are also above the threshold and need rebalancing.

Broker2 leads partition 1, where it is the preferred replica, so its imbalance count is 0 and it needs no rebalancing.

Parameter: Description
auto.leader.rebalance.enable: Default true. Automatic leader partition balancing. In production, leader re-election is fairly expensive and can hurt performance, so setting this to false is recommended.
leader.imbalance.per.broker.percentage: Default 10%. The ratio of imbalanced leaders allowed per broker; if a broker exceeds it, the controller triggers a leader rebalance.
leader.imbalance.check.interval.seconds: Default 300 seconds. The interval at which leader balance is checked.

3.7 Increasing the Replication Factor

In production, a topic's importance may rise and call for more replicas. kafka-topics.sh cannot change the replication factor of an existing topic directly, so the increase is done by drafting a reassignment plan and then executing it.

Create the topic:

bin/kafka-topics.sh --bootstrap-server 192.168.3.51:9092,192.168.3.52:9092,192.168.3.53:9092,192.168.3.54:9092 --create --partitions 3 --replication-factor 1 --topic four

Created topic four.
bin/kafka-topics.sh --bootstrap-server 192.168.3.51:9092,192.168.3.52:9092,192.168.3.53:9092,192.168.3.54:9092 --describe --topic four

Topic: four     TopicId: 3BdiSOnBSjiM9gXsHEpfpQ PartitionCount: 3       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: four     Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: four     Partition: 1    Leader: 2       Replicas: 2     Isr: 2
        Topic: four     Partition: 2    Leader: 3       Replicas: 3     Isr: 3

Manually add replicas

(1) Create the replica storage plan (all replicas pinned to broker0, broker1, and broker2).

vi increase-replication-factor.json

{
    "version": 1,
    "partitions": [
        {
            "topic": "four",
            "partition": 0,
            "replicas": [
                0,
                1,
                2
            ]
        },
        {
            "topic": "four",
            "partition": 1,
            "replicas": [
                0,
                1,
                2
            ]
        },
        {
            "topic": "four",
            "partition": 2,
            "replicas": [
                0,
                1,
                2
            ]
        }
    ]
}

(2) Execute the replica storage plan.

bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.3.51:9092,192.168.3.52:9092,192.168.3.53:9092,192.168.3.54:9092 --reassignment-json-file increase-replication-factor.json --execute

Current partition replica assignment

{"version":1,"partitions":[{"topic":"four","partition":0,"replicas":[0],"log_dirs":["any"]},{"topic":"four","partition":1,"replicas":[2],"log_dirs":["any"]},{"topic":"four","partition":2,"replicas":[3],"log_dirs":["any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for four-0,four-1,four-2

Check again; each partition now has replicas on brokers 0, 1, and 2:

bin/kafka-topics.sh --bootstrap-server 192.168.3.51:9092,192.168.3.52:9092,192.168.3.53:9092,192.168.3.54:9092 --describe --topic four

Topic: four     TopicId: 3BdiSOnBSjiM9gXsHEpfpQ PartitionCount: 3       ReplicationFactor: 3    Configs: segment.bytes=1073741824
        Topic: four     Partition: 0    Leader: 0       Replicas: 0,1,2 Isr: 0,2,1
        Topic: four     Partition: 1    Leader: 2       Replicas: 0,1,2 Isr: 2,0,1
        Topic: four     Partition: 2    Leader: 0       Replicas: 0,1,2 Isr: 0,2,1

4 File Storage

4.1 File Storage Mechanism

How topic data is stored

A topic is a logical concept, whereas a partition is physical: each partition corresponds to a log file that stores the data produced into it, and producers keep appending to the end of that log. To keep a growing log from slowing down data lookup, Kafka uses sharding plus indexing: each partition's log is split into multiple segments, and each segment consists of an ".index" file, a ".log" file, a ".timeindex" file, and a few others. All of them sit in one directory named after the topic plus the partition number, for example first-0.


Where exactly is topic data stored?

Start a producer and send a few messages:

# Create a new topic named five
> bin/kafka-topics.sh --bootstrap-server 192.168.3.52:9092 --create --partitions 1 --replication-factor 1 --topic five
# Start a console producer
> bin/kafka-console-producer.sh --bootstrap-server 192.168.3.52:9092 --topic five
>hell01
>hello2  
>hello3
>hello4
>hello5
> cd /home/vagrant/kafka_2.12-3.0.0/data/five-0

> ll

total 12
-rw-r--r--. 1 root root 10485760 Mar 15 06:13 00000000000000000000.index
-rw-r--r--. 1 root root      222 Mar 15 06:26 00000000000000000000.log
-rw-r--r--. 1 root root 10485756 Mar 15 06:13 00000000000000000000.timeindex
-rw-r--r--. 1 root root        8 Mar 15 06:13 leader-epoch-checkpoint
-rw-r--r--. 1 root root       43 Mar 15 06:13 partition.metadata
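The .index and .timeindex files look large because they are preallocated (10 MB by default, per log.index.size.max.bytes); the data actually written so far is the 222-byte .log file. leader-epoch-checkpoint and partition.metadata are small bookkeeping files. Note that the dumps below come from a later, different segment of the log (base offset 5), not the freshly created one listed above.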

# Dump an index file
> kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000005.index

Dumping 00000000000000000005.index
offset: 63 position: 4102
offset: 118 position: 8246
# Dump a log file
> kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000005.log

Dumping 00000000000000000005.log
Starting offset: 5
baseOffset: 5 lastOffset: 5 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1648298105592 size: 74 magic: 2 compresscodec: none crc: 2922802256 isvalid: true
baseOffset: 6 lastOffset: 7 count: 2 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 74 CreateTime: 1648298105628 size: 87 magic: 2 compresscodec: none crc: 2510574599 isvalid: true
baseOffset: 8 lastOffset: 11 count: 4 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 161 CreateTime: 1648298105639 size: 116 magic: 2 compresscodec: none crc: 2793496984 isvalid: true
baseOffset: 12 lastOffset: 12 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 277 CreateTime: 1648298105642 size: 75 magic: 2 compresscodec: none crc: 1332889768 isvalid: true
baseOffset: 13 lastOffset: 13 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 352 CreateTime: 1648298105645 size: 75 magic: 2 compresscodec: none crc: 2515862546 isvalid: true
baseOffset: 14 lastOffset: 14 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 427 CreateTime: 1648298105648 size: 75 magic: 2 compresscodec: none crc: 3992299684 isvalid: true
baseOffset: 15 lastOffset: 15 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 502 CreateTime: 1648298105651 size: 75 magic: 2 compresscodec: none crc: 4170130861 isvalid: true
baseOffset: 16 lastOffset: 16 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 577 CreateTime: 1648298105654 size: 75 magic: 2 compresscodec: none crc: 2307576682 isvalid: true
baseOffset: 17 lastOffset: 17 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 652 CreateTime: 1648298105657 size: 75 magic: 2 compresscodec: none crc: 3373951559 isvalid: true
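Two things are visible in these dumps: index entries are sparse, roughly one per log.index.interval.bytes of log data (the dumped positions 4102 and 8246 are about 4 KB apart), and each entry maps an offset to a byte position in the .log file, whose own dump shows the record batches with their baseOffset, position, and size.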

Log and index file details

How a lookup by offset works: segment files are named after the base offset of the segment, so Kafka first locates the right segment by file name, then searches that segment's .index for the largest indexed offset not greater than the target, jumps to the recorded byte position in the .log file, and scans forward from there. Because the index is sparse, it stays small while still bounding the scan.

Log storage parameter configuration:

Parameter: Description
log.segment.bytes: Kafka logs are stored in segments; this sets the size of each segment. Default 1G.
log.index.interval.bytes: Default 4kb. Every 4kb written to the log (.log) adds one entry to the index file. This makes the index sparse.

4.2 File Cleanup Policies

Kafka keeps log data for 7 days by default. Retention is controlled by the parameters below (a per-topic override sketch follows the list).

  • log.retention.hours: hours, lowest priority. Default 7 days.
  • log.retention.minutes: minutes.
  • log.retention.ms: milliseconds, highest priority.
  • log.retention.check.interval.ms: how often to check for expired data. Default 5 minutes.
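These are broker-wide defaults; retention can also be overridden per topic with kafka-configs.sh. A minimal sketch (the topic name and value are illustrative):

# Keep data in topic five for 3 days (259200000 ms) instead of the broker default.
bin/kafka-configs.sh --bootstrap-server 192.168.3.51:9092 \
  --alter --entity-type topics --entity-name five \
  --add-config retention.ms=259200000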

So what happens once a log exceeds the configured retention?

Kafka provides two cleanup policies: delete and compact.
1) delete, log deletion: expired data is removed

  • log.cleanup.policy = delete enables the deletion policy for all data
    (1) Time-based: on by default. A segment's timestamp is the largest timestamp of any record inside it.
    (2) Size-based: off by default. When the total size of all logs exceeds the limit, the earliest segment is deleted.
    log.retention.bytes: default -1, meaning unlimited.

Question: if some records in a segment have expired and some have not, what happens?

A segment's timestamp is the largest timestamp of any record it contains, so the file only expires as a whole once that largest timestamp has expired.

2) compact, log compaction

Log compaction: for different values written under the same key, only the latest version is kept.

  • log.cleanup.policy = compact enables the compaction policy for all data

(Figure: the log before and after compaction.)

After compaction, offsets may no longer be contiguous; offset 6, say, may be gone. A consumer that asks for a missing offset receives the message at the next larger offset instead, here the message at offset 7, and consumption continues from that position.

This policy only suits special scenarios. For example, if the message key is a user ID and the value is that user's profile, compaction leaves the topic holding the latest profile of every user; a sketch of enabling it follows.
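A minimal sketch of turning compaction on for such a topic with kafka-configs.sh (the topic name user-profiles is hypothetical; both configs are standard topic-level settings):

# Compact a profile-style topic keyed by user ID.
# min.cleanable.dirty.ratio controls how eagerly the log cleaner runs.
bin/kafka-configs.sh --bootstrap-server 192.168.3.51:9092 \
  --alter --entity-type topics --entity-name user-profiles \
  --add-config cleanup.policy=compact,min.cleanable.dirty.ratio=0.5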

5 Efficient Reads and Writes

  1. Kafka itself is a distributed cluster and uses partitioning, so parallelism is high.
  2. Reads use a sparse index, so the data to consume can be located quickly.
  3. Sequential disk writes.
    When Kafka's producer writes data into the log file, it always appends to the end of the file, i.e. writes sequentially. Figures published on the official site show that on the same disk, sequential writes reach about 600 MB/s while random writes manage only about 100 KB/s. This comes down to the disk's mechanics: sequential writes are fast because they avoid most of the head-seek time.


  4. Page cache + zero-copy.
    Zero-copy: any processing of the data is left to Kafka producers and consumers. The broker's application layer does not care about the stored data, so transfers can bypass the application layer entirely, which makes them very efficient.
    PageCache: Kafka relies heavily on the page cache provided by the underlying OS. On a write, the OS merely puts the data into the PageCache; on a read, the PageCache is checked first and the disk is only hit on a miss. In effect, as much free memory as possible is used as a disk cache.

Parameter: Description
log.flush.interval.messages: Number of messages that force a page-cache flush to disk. Default is Long.MAX_VALUE (9223372036854775807). Usually left alone; the OS manages flushing.
log.flush.interval.ms: How often to flush data to disk. Default null. Usually left alone; the OS manages flushing.