一、Kafka概述

        Kafka作为一个商业级消息中间件 ,发布和订阅记录流,它类似于一个消息队列

先了解下Kafka的基本原理,然后通过对kakfa的存储机制、复制原理、同步原理、可靠性和持久性保证等等一步步对其可靠性进行分析

Kafka 架构图

 一、Kafka 中的术语

1、Broker

        每个kafka server称为一个Broker,多个borker组成kafka cluster。一个机器上可以部署一个或者多个Broker(一般一个Broker就是一个节点),这多个Broker连接到相同的ZooKeeper就组成了Kafka集群。

 2 、主题Topic

        主题是一种分类或发布的一系列记录的名义上的名字。Kafka的主题始终是支持多用户订阅的; 也就是说,一个主题可以有零个,一个或多个消费者订阅写入的数据。

Topic 与broker

一个Broker上可以创建一个或者多个Topic。同一个topic可以在同一集群下的多个Broker中分布。

#创建主题时,指定分区数和副本数
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test

这条命令会创建一个名为test的topic,有3个分区,每个分区需分配3个副本。
具体消息发送到哪个partition,由producer进行消息路由,采用轮询round-robin或者hash算法。

3、分区-Partitions

       每个Topics划分为一个或者多个Partition,每个分区在物理上对应一个文件夹,以”topicName_partitionIndex”的命名方式命名,该dir包含了这个分区的所有消息(.log)和索引文件(.index),这使得Kafka的吞吐率可以水平扩展。

第一个segment
00000000000000000000.index 
00000000000000000000.log    
第二个segment,文件命名以第一个segment的最后一条消息的offset组成
00000000000000170410.index 
00000000000000170410.log 

        每个分区都是一个顺序的、不可变的消息队列, 并且可以持续的添加;分区中的消息都被分了一个序列号sequential id,也就是offset,称之为偏移量(offset),在每个分区中此偏移量都是唯一的。

Topic和Partition

一个topic可以认为一个一类消息,每个topic将被分成多个partition。

在上图中我们的生产者会决定发送到哪个 Partition:

如果没有 Key 值则进行轮询发送。

如果有 Key 值,对 Key 值进行 Hash,然后对分区数量取余,保证了同一个 Key 值的会被路由到同一个分区。(所有系统的partition都是同一个路数)

在上图我们也可以看到,offset是跟partition走的,每个partition都有自己的offset。

4、Controller控制器

        Controller,是Apache Kafka的核心组件。它的主要作用是在Apache Zookeeper的帮助下管理和协调控制整个Kafka集群。

        集群中的任意一台Broker都能充当Controller的角色,但是,在整个集群运行过程中,只能有一个Broker成为Controller。也就是说,每个正常运行的Kafka集群,在任何时刻都有且只有一个Controller

        Kafka中的控制器选举的工作依赖于Zookeeper,成功竞选为控制器的broker会在Zookeeper中创建/controller这个临时(EPHEMERAL)节点,此临时节点的内容参考如下:{"version":1,"brokerid":0,"timestamp":"1529210278988"}

图片

 Controller保存的数据

图片

其中比较重要的数据有:

  1. 所有主题信息。包括具体的分区信息,比如领导者副本是谁,ISR集合中有哪些副本等。
  2. 所有Broker信息。包括当前都有哪些运行中的Broker,哪些正在关闭中的Broker等。
  3. 所有涉及运维任务的分区。包括当前正在进行Preferred领导者选举以及分区重分配的分区列表。

 3.1 控制器选举、Broker如何成为Controller

图片

        在任意时刻,集群中有且仅有一个控制器。每个broker启动的时候会去尝试去读取/controller节点的brokerid的值,如果读取到brokerid的值不为-1,则表示已经有其它broker节点成功竞选为控制器,所以当前broker就会放弃竞选;

        如果Zookeeper中不存在/controller这个节点,或者这个节点中的数据异常,那么就会尝试去创建/controller这个节点,当前broker去创建节点的时候,也有可能其他broker同时去尝试创建这个节点,第一个成功创建 /controller 节点的 broker 则会被指定为控制器,而创建失败的broker则表示竞选失败。其他 broker 则会监听该节点的变化每个broker都会在内存中保存当前控制器的brokerid值,这个值可以标识为activeControllerId。

3.2 控制器功能、Controller职责

        具备控制器身份的broker需要比其他普通的broker多一份职责,具体细节如下:

Controller职责大致分为5种:

主题管理,分区重分配,Preferred leader选举,集群成员管理(Broker上下线),数据服务(向其他Broker提供数据服务) 

控制器主要作用是管理和协调 Kafka 集群,负责管理整个集群中所有分区和副本的状态。具体如下:

1、主题topic管理:创建、删除 topic,以及增加 topic 分区等操作都是由控制器执行。

监听topic相关的变化。为Zookeeper中的/brokers/topics节点添加TopicChangeListener,用来处理topic增减的变化;为Zookeeper中的/admin/delete_topics节点添加TopicDeletionListener,用来处理删除topic的动作。

2、分区partition重分配:执行 Kafka 的 reassign 脚本对 topic 分区重分配的操作,也是由控制器实现。

监听partition相关的变化。为Zookeeper中的/admin/reassign_partitions节点注册PartitionReassignmentListener,用来处理分区重分配的动作。为Zookeeper中的/isr_change_notification节点注册IsrChangeNotificetionListener,用来处理ISR集合变更的动作。为Zookeeper中的/admin/preferred-replica-election节点添加PreferredReplicaElectionListener,用来处理优先副本的选举动作。

当使用kafka-topics.sh脚本为某个topic增加分区数量时,同样还是由控制器负责分区的重新分配。 

3、Preferred leader 选举:这里有一个概念叫 Preferred replica 即优先副本,表示的是分配副本中的第一个副本。Preferred leader 选举就是指 Kafka 在某些情况下出现 leader 负载不均衡时,会选择 preferred 副本作为新 leader 的一种方案。这也是控制器的职责范围。

4、集群成员broker管理:控制器能够监听broker相关的变化,新 broker 的增加,broker 的主动关闭与被动宕机。这里也是利用前面所说的 Zookeeper 的 ZNode 模型和 Watcher 机制,控制器会监听 Zookeeper 中 /brokers/ids 下临时节点的变化。为Zookeeper中的/brokers/ids/节点添加BrokerChangeListener,用来处理broker增减的变化。

当某个分区的leader副本出现故障时,由控制器负责为该分区选举新的leader副本。当检测到某个分区的ISR集合发生变化时,由控制器负责通知所有broker更新其元数据信息。

5、更新集群的元数据信息:控制器上保存了最全的集群元数据信息,其他所有 broker 会定期接收控制器发来的元数据更新请求,从而更新其内存中的缓存数据。

  1. 如果参数auto.leader.rebalance.enable设置为true,则还会开启一个名为“auto-leader-rebalance-task”的定时任务来负责维护分区的优先副本的均衡。

6、broker的退出和加入过程:

Controller在Zookeeper注册Watch,一旦有Broker宕机(这是用宕机代表任何让系统认为其die的情景,包括但不限于机器断电,网络不可用,GC导致的Stop The World,进程crash等)

其在Zookeeper对应的znode会自动被删除,Zookeeper会fire Controller注册的watch,Controller读取最新的幸存的Broker

(1)、如果集群中有一个broker发生异常退出了,那么控制器Controller就会检查这个broker是否有分区的副本leader,如果有那么这个分区就需要一个新的leader,此时控制器就会去遍历其他副本,决定哪一个成为新的leader,同时更新分区的ISR集合。

将新的Leader,ISR和新的leader_epoch及controller_epoch(选举版本号)写入/brokers/topics/[topic]/partitions/[partition]/state。

(2)、如果有一个broker加入集群中,那么控制器Controller就会通过Broker ID去判断新加入的broker中是否含有现有分区的副本,如果有,就会从分区副本中去同步数据。

集群中每选举一次控制器,就会通过zookeeper创建一个controller epoch,每一个选举都会创建一个更大,包含最新信息的epoch,如果有broker收到比这个epoch旧的数据,就会忽略它们,kafka也通过这个epoch来防止集群产生“脑裂”。

(二)如何避免Controller出现裂脑

        如果Controller所在的Broker故障,Kafka集群必须有新的Controller,否则集群将无法正常工作。这儿存在一个问题。很难确定Broker是宕机还是只是暂时的故障。但是,为了使集群正常运行,必须选择新的Controller。如果之前更换的Controller又正常了,不知道自己已经更换了,那么集群中就会出现两个Controller。

        其实这种情况是很容易发生的。例如,由于垃圾回收(GC),一个Controller被认为是死的,并选择了一个新的控制器。在GC的情况下,在原Controller眼里没有任何变化,Broker甚至不知道自己已经被暂停了。因此,它将继续充当当前Controller,这在分布式系统中很常见,称为裂脑

        现在,集群中有两个Controller,可能会一起发出相互冲突的事件,这会导致脑裂。可能会导致严重的不一致。所以需要一种方法来区分谁是集群的最新Controller。

        Kafka是通过使用epoch number来处理,epoch number只是一个单调递增的数。第一次选择控制器时,epoch number值为1。如果再次选择新控制器,epoch number为2,依次单调递增。

        每个新选择的Controller通过zookeeper的条件递增操作获得一个新的更大的epoch number。当其他Broker知道当前的epoch number时,如果他们从Controller收到包含旧(较小)epoch number的消息,则它们将被忽略。即Broker根据最大的epoch number来区分最新的Controller。

        epoch number记录在Zookeepr的一个永久节点controller_epoch。

        上图中,Broker3向Broker1下发命令:将Broker1上的partitionA做为leader,消息的epoch number值为1,同时Broker2也向Broker1发送同样的命令。不同的是,消息的epoch number值为2,此时broker1只监听broker2的命令(由于其epoch号大),而会忽略broker3的命令,以免发生脑裂。

5、生产者-Producer发送到哪个Partitions

        向Kafka发送消息,生产者会根据topic分发消息。生产者也负责把消息关联到Topic上的哪一个分区。最简单的方式从分区列表中轮流选择。也可以根据某种算法依照权重选择分区。算法可由开发者定义。

   // 指定分区器
props.put("partitioner.class", "kafka.producer.DefaultPartitioner")
    // 自定义分区器
props.put("partitioner.class", "day01.kafka.CustomPartitioner")

//设定依据key将当前这条消息发送到哪个partition的规则
public class CustomPartitioner implements Partitioner {
    public CidPartitioner(VerifiableProperties props) {  
//注意 : 构造函数的函数体没有东西,但是不能没有构造函数          
    }  
     @Override
    public int partition(Object key, int numPartitions) {
        try {            
            long partitionNum = Long.parseLong((String) key);
            return (int) Math.abs(partitionNum % numPartitions);
        } catch (Exception e) {
            return Math.abs(key.hashCode() % numPartitions);
        }
    }
}

org.apache.kafka.clients.producer.internals.DefaultPartitioner 默认的分区策略是:

1、如果在发消息的时候指定了分区,则消息投递到指定的分区  

2、没有指定分区,根据key分发,对key求hash,然后对partition数量求模,来选择一个分区

Utils.abs(key.hashCode) % numPartitions

Utils.toPositive(nextValue) % numPartitions;

3、没有key时的分发逻辑

则用轮询的方式选择一个分区,每隔 topic.metadata.refresh.interval.ms 的时间,随机选择一个partition。这个时间窗口内的所有记录发送到这个partition。发送数据出错后也会重新选择一个partition。

请求首先被分配到kafka的Controller。Controller会根据请求中的哪个topic的哪个partition,由PartitionLeaderSelector确认leader,然后由handler将请求路由到leader所在的broker处理。决定好发送到哪个Partition后,需要明确该Partition的leader是哪台broker才能决定发送到哪里。

如何获取Partition的leader信息(元数据)  

1、从broker获取Partition的元数据。

由于Kafka所有broker存有所有的元数据,所以任何一个broker都可以返回所有的元数据

2、broker选取策略:将broker列表随机排序,从首个broker开始访问,如果出错,访问下一个

注意 Producer是从broker获取元数据的,并不关心zookeeper。

broker发生变化后,producer获取元数据的功能不能动态变化。

获取元数据时使用的broker列表由producer的配置中的 metadata.broker.list 决定。 该列表中的机器只要有一台正常服务,producer就能获取元数据。获取元数据后,producer可以写数据到非 metadata.broker.list 列表中的broker

5、 Kafka集群partition replication分配逻辑

partition leader与follower的信息受Zookeeper控制,每个partition都有一个唯一的leader,所有的读写操作都在leader上完成,而follower从leader复制写入的数据。

下面以一个Kafka集群中4个Broker举例,创建1个topic包含4个Partition,2 Replication;数据Producer流动如图所示:

上述图Broker Partition中,箭头指向为副本,以Partition-0为例:broker1中parition-0为Leader,Broker2中Partition-0为副本。
上述图种每个Broker(按照BrokerId有序)依次分配主Partition,下一个Broker为副本,如此循环迭代分配,多副本都遵循此规则。

4、副本分配逻辑规则如下:

在Kafka集群中,每个Broker都有均等分配Partition的Leader机会。

broker负载均衡,Topic分配partition和partition replica的算法:
将所有N Broker和待分配的i个Partition排序.
将第i个Partition分配到第(i mod n)个Broker上.
将第i个Partition的第j个副本分配到第((i + j) mod n)个Broker上.

6、协调器Coordinator、再均衡Rebalance

        在介绍如何避免 Rebalance 问题之前,先来认识下 Kafka 的协调器 Coordinator,和之前 Kafka 控制器类似,Coordinator 也是 Kafka 的核心组件。Kafka 为了更好的实现消费组成员管理、offset位移管理,以及 Rebalance 等,

1、broker 服务端引入了组协调器(Group Coordinator),在 kafka-0.10 版本,Kafka 在服务端引入了组协调器(GroupCoordinator),每个 Kafka Server 启动时都会创建一个 GroupCoordinator 实例,用于管理部分消费者组和该消费者组下的每个消费者的消费偏移量。每个 broker 都有自己的 Coordinator 组件。

2、消费端引入了消费者协调器(Consumer Coordinator)。每个 Consumer 实例化时,同时会创建一个 ConsumerCoordinator 实例,负责消费组下各个消费者和服务端组协调器之前的通信。

2、消费者协调器-Coordinator

Coordinator一般指的是运行在broker上的group Coordinator,用于管理Consumer Group中各个成员,每个KafkaServer都有一个GroupCoordinator实例,管理多个消费者组,主要用于offset位移管理Consumer Rebalance

rebalance的触发条件有三种:

  • 组成员发生变更(新consumer加入组、已有consumer主动离开组或已有consumer崩溃了,组协调器会立即触发一次再均衡)
  • 订阅主题数发生变更——如果你使用了正则表达式的方式进行订阅,那么新建匹配正则表达式的topic就会触发rebalance,比如管理员添加了新的分区,会发生分区重分配。
  • 订阅主题的分区数发生变更

Rebalance的过程如下:

第一步:所有成员都向coordinator发送请求,请求入组。一旦所有成员都发送了请求,coordinator会从中选择一个consumer担任leader的角色,并把组成员信息以及订阅信息发给leader。

第二步:leader开始分配消费方案,指明具体哪个consumer负责消费哪些topic的哪些partition。一旦完成分配,leader会将这个方案发给coordinator。coordinator接收到分配方案之后会把方案发给各个consumer,这样组内的所有成员就都知道自己应该消费哪些分区了。
所以对于Rebalance来说,Coordinator起着至关重要的作用

对于每个Consumer Group,Coordinator会存储以下信息:

  1. 对每个存在的topic,可以有多个消费组group订阅同一个topic  (对应消息系统中的广播)
  2. 对每个Consumer Group,元数据如下:
    订阅的topics列表
    Consumer Group配置信息,包括session timeout等
    组中每个Consumer的元数据。包括主机名,consumer id
    每个正在消费的topic partition的当前offsets
    Partition的ownership元数据,包括consumer消费的partitions映射关系

consumer group如何确定自己的coordinator是谁呢? 简单来说分为两步:

  1. 确定consumer group位移信息写入__consumers_offsets这个topic的哪个分区。
  2. 具体计算公式:
    __consumers_offsets partition# = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)                    注意:groupMetadataTopicPartitionCount由offsets.topic.num.partitions指定,默认是50个分区。
  3. 该分区leader所在的broker就是被选定的coordinator

1)、offset位移管理

消费者在消费的过程中需要记录自己消费了多少数据,即消费位置信息。位移(offset)。这部分信息保存在服务器端(broker端)每个consumer group管理自己的位移信息,那么只需要简单的一个整数表示位置就够了;同时可以引入checkpoint机制定期持久化,简化了应答机制的实现。

7、消费者-Cousumer

Consermer实例可以是独立的进程,负责订阅和消费消息。

消费者用consumerGroup来标识自己。 同一个消费组可以并发地消费多个分区的消息,同一个partition也可以由多个consumerGroup并发消费,但是同一时刻,一条消息只能被consumerGroup组中的一个消费者实例消费

Consumer Group:同一个Consumer Group中的Consumers,Kafka将相应Topic中的每个消息只发送给其中一个Consumer

消费者分区分配策略(Consumer Reblance)

1.  range策略

对应的实现类是org.apache.kafka.clients.consumer.RangeAssignor 这是默认的分配策略

可以通过消费者配置中partition.assignment.strategy参数来指定分配策略, 

1、range分配策略针对的是主题

2、首先,将分区按数字顺序排行序,消费者按消费者名称的字典序排好序

3、然后,用分区总数除以消费者总数。如果能够除尽,则皆大欢喜,平均分配;若除不尽,则位于排序前面的消费者将多负责一个分区

//    获取主题下有多少个分区
Integer numPartitionsForTopic = partitionsPerTopic.get(topic);

//    消费者按字典序排序
Collections.sort(consumersForTopic);

//    分区数量除以消费者数量
int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
//    取模,余数就是额外的分区
int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();

List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
    int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
    int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
    //    分配分区
    assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
}

2.  roundrobin(轮询)

roundronbin分配策略的具体实现是org.apache.kafka.clients.consumer.RoundRobinAssignor

轮询分配策略是基于所有可用的消费者和所有可用的分区的

与前面的range策略最大的不同就是它不再局限于某个主题 ,可用统一分配,均衡分配

二、Kafka文件的存储机制

        同一个topic下有多个不同的partition,每个partition为一个目录,partition命名的规则是topic的名称加上一个序号,序号从0开始。

xxx/message-folder(kafkalogs)为数据文件存储根目录,在Kafka broker中server.properties文件配置(参数log.dirs=xxx/message-folder)

假设实验环境中Kafka集群只有一个broker,xxx/message-folder为数据文件存储根目录,

例如创建2个topic名称分别为eshop、kafka-test,partitions数量都为partitions=3 存储路径和目录规则为:

每个文件是一个segment。 在broker的log存储文件下,除了存储这各个topic的文件夹,还存在这几个checkpoint文件。

分别是

  • recovery-point-offset-checkpoint :(文件)负责记录topic已经被写入磁盘的offset
  • replication-offset-checkpoint :(文件)负责记录已经被复制到别的topic上的文件
  • __consumer_offsets:(文件夹)存储各个topic的offset。但是,他的只有一份。
  • logStartOffset 日志段集合中第一个日志段(segment)的基础位移,也就是这个日志对象的基础位移
  • LogEndOffset 下一条将要被加入到日志的消息的位移
cat  recovery-point-offset-checkpoint

HelloWorld 2 0
__consumer_offsets 13 0
__consumer_offsets 2 0
__consumer_offsets 43 0
__consumer_offsets 6 0
__consumer_offsets 14 0
kafka-test 1 0
__consumer_offsets 20 0
__consumer_offsets 0 0
__consumer_offsets 44 0
__consumer_offsets 39 0
kafka-test 0 0
__consumer_offsets 12 0
__consumer_offsets 45 0
__consumer_offsets 1 0
__consumer_offsets 5 0
__consumer_offsets 26 0
kafka-test 2 0
__consumer_offsets 29 0
__consumer_offsets 34 0
__consumer_offsets 10 0
__consumer_offsets 32 0
__consumer_offsets 40 0

2、partiton中文件存储方式  

 

每一个partition目录下的文件被平均切割成大小相等(默认一个文件是500兆,可以手动去设置)的数据文件,
每一个数据文件都被称为一个段(segment file),但每个段消息数量不一定相等,这种特性能够使得老的segment可以被快速清除。
默认保留7天的数据。

每个partition下都会有这些每500兆一个每500兆一个(当然在上面的测试中我们将它设置为了1G一个)的segment段。

 

一对segment file文件为例,说明segment中index<—->data file对应关系物理结构如下: 

       ------   image

上述图3中:索引文件存储大量元数据,数据文件存储大量消息,索引文件中元数据指向对应数据文件中message的物理偏移地址。  

4、 在partition中如何通过offset查找message

例如读取offset=368772的message, 索引文件中元数据[3, 497]为例

需要通过下面2个步骤查找。

  • 第一步查找segment file 上述图为例,其中00000000000000000000.index表示最开始的文件,起始偏移量(offset)为0.
  • 第二个文件00000000000000368769.index的消息量起始偏移量为368770 = 368769 + 1.
  • 第三个文件00000000000000737337.index的起始偏移量为737338=737337 + 1,其他后续文件依次类推,以起始偏移量命名并排序这些文件, 只要根据offset **二分查找**文件列表,就可以快速定位到具体文件。
  • 当offset=368772时,依次定位到00000000000000368769.index, 元数据[3, 497],第3个message、以及该消息在00000000000000368769.log的物理偏移地址为497。 
  • 第二步通过segment file查找message 通过第一步定位到segment file,
  • 然后再通过00000000000000368769.log顺序查找直到offset=368776为止。

从上述图3可知这样做的优点,segment index file采取稀疏索引存储方式,它减少索引文件大小,通过mmap可以直接内存操作,稀疏索引为数据文件的每个对应message设置一个元数据指针,它比稠密索引节省了更多的存储空间,但查找起来需要消耗更多的时间。

三、kafka副本同步机制

        Kafka提供了数据复制算法保证,如果leader发生故障或挂掉,一个新leader被选举并被接受客户端的消息成功写入。

当一个leader宕机,kafka Controller从分区的ISR中选一个作为leader。controller会将Leader的改变直接通过RPC的方式(比Zookeeper Queue的方式更高效)通知需为此作出响应的Broker。同时controller也负责增删Topic以及Replica的重新分配。

所有的副本(replicas)统称为Assigned Replicas,即AR。   AR=ISR+OSR。 

ISR策略  (in-sync Replica 同步副本)

ISR是AR中的一个子集,由leader维护ISR列表,Leader会维护一个与其基本保持同步的Replica列表,该列表称为ISR(in-sync Replica 同步副本集合),在ISR中有成员存活时,只有这个组的成员才可以成为leader, 每当leader挂掉时,在ISR集合中选举出一个follower作为leader提供服务。

当producer发送一条消息到broker后,leader写入消息并复制到所有follower。内部保存的为每次提交信息时必须同步的副本(acks = all时),消息复制延迟受最慢的follower限制,重要的是快速检测慢副本,follower从leader同步数据有一些延迟,滞后,任意一个超过阈值,leader都会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。 当重新跟上leader的消息数据时,重新进入ISR。

OSR(out sync replica): 保存的副本不必保证必须同步完成才进行确认,OSR内的副本是否同步了leader的数据,不影响数据的提交,OSR内的follower尽力的去同步leader,可能数据版本会落后。

副本同步队列(ISR)
所谓同步,必须满足如下两个条件(kafka节点活着的条件):

  • 副本节点必须能与zookeeper保持会话(心跳机制)
  • 副本能复制leader上的所有写操作,并且不能落后太多。(卡住或滞后的副本控制是由 replica.lag.time.max.ms 配置)

这里的落后可以指两种情况
1:数据复制落后,slave节点和leader节点的数据相差较大,这种情况有一个缺点,在生产者突然发送大量消息导致网络堵塞后,大量的slave复制受阻,导致数据复制落后被大量的踢出ISR。
2:时间相差过大,指的是slave向leader请求复制的时间距离上次请求相隔时间过大。通过配置replica.lag.time.max就可以配置这个时间参数。 


   Kafka中的消息以一下方式存储到文件中。

  

HW是HighWatermark的缩写,俗称高水位,取一个partition对应的ISR中最小的LEO作为HW(LEO:LogEndOffset的缩写,表示每个partition的log最后一条Message的位置)。consumer最多只能消费到HW所在的位置。另外每个replica都有HW,leader和follower各自负责更新自己的HW的状态。

对于leader新写入的消息,consumer不能立刻消费,leader会等待该消息被所有ISR中的replicas同步后更新HW,此时消息才能被consumer消费。这样就保证了如果leader所在的broker失效,该消息仍然可以从新选举的leader中获取。对于来自内部broker的读取请求,没有HW的限制。

当leader挂了之后,现在B成为了leader,A重新恢复之后需要进行消息的同步,如果使用追加的方式那么就会有冗余消息,所以A将自己的消息截取到HW的位置在进行同步。
 

那么ISR是如何实现同步的呢?

broker 收到producer的请求
leader 收到消息,并成功写入,LEO 值+1
broker 将消息推给follower replica,follower 成功写入 LEO +1

所有LEO 写入后,leader HW +1
消息可被消费,并成功响应
上述过程从下面的图便可以看出:

 

Kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制。

事实上,同步复制要求所有能工作的follower都复制完,这条消息才会被commit,这种复制方式极大的影响了吞吐率。

而异步复制方式下,follower异步的从leader复制数据,数据只要被leader写入log就被认为已经commit,这种情况下如果follower都还没有复制完,落后于leader时,突然leader宕机,则会丢失数据。

而Kafka的这种使用ISR的方式则很好的均衡了确保数据不丢失以及吞吐率。

kafka分区partition挂掉之后如何恢复?

在kafka中有一个partition recovery机制用于恢复挂掉的partition。

每个Partition会在磁盘记录一个RecoveryPoint(恢复点), 记录已经flush到磁盘的最大offset。当broker fail 重启时,会进行loadLogs。 首先会读取该Partition的RecoveryPoint,找到包含RecoveryPoint点上的segment及以后的segment, 这些segment就是可能没有完全flush到磁盘segments。然后调用segment的recover,重新读取各个segment的msg,并重建索引。

优点:

  1. 以segment为单位管理Partition数据,方便数据生命周期的管理,删除过期数据简单
  2. 在程序崩溃重启时,加快recovery速度,只需恢复未完全flush到磁盘的segment即可

四、消息的可靠性与一致性

可靠性:保证数据不丢失

1、Producer端 (ack=all 代表至少成功发送一次)

        Kafka的高可靠性的保障来源于其健壮的副本(replication)策略。通过调节其副本相关参数,可以使得Kafka在性能和可靠性之间运转的游刃有余。

Producer发送消息的配置

1 同步模式

kafka有同步(sync)、异步(async)以及oneway这三种发送方式

producer.type的默认值是sync,即同步的方式。这个参数指定了在后台线程中消息的发送方式是同步的还是异步的。如果设置成异步的模式,可以运行生产者以batch的形式push数据,这样会极大的提高broker的性能,但是这样会增加丢失数据的风险。

(1)Producer

当producer向leader发送数据时,可以通过request.required.acks参数来设置数据可靠性的级别:

生产者配置:
props.put(“request.required.acks”, -1);//保证消息必须发送到Kafka 

一个消息如何算投递成功,Kafka提供了三种模式:

1. request.required.acks=1 (默认)

        当ack=1,表示producer写partition leader成功后,broker就返回成功,发送下一条message,无论其他的partition follower是否写成功。 

只要Master确认收到消息就算投递成功; 若此时ISR中的副本还没有来得及拉取该消息,leader就宕机了,那么此次发送的消息就会丢失。

2. request.required.acks=-1 (all

        只有当Master和所有Slave都接收到消息时,才算投递成功,这种模型提供了最高的投递可靠性,但是损伤了性能;

同步(Kafka默认为同步,即producer.type=sync)的发送模式,数据发送到leader, ISR的follower全部完成数据同步后,表示只有producer全部写成功的时候,才算成功,kafka broker才返回成功信息。leader此时挂掉,那么会选举出新的leader,数据不会丢失。

3. request.required.acks=2

        当ack=2,表示producer写partition leader和其他一个follower成功的时候,broker就返回成功,无论其他的partition follower是否写成功。

4. request.required.acks=0

        这意味着producer无需等待来自broker的确认而继续发送下一批消息。这种情况下数据传输效率最高,但是数据可靠性确是最低的。啥都不管,发送出去就当作成功,这种情况当然不能保证消息成功投递到broker;

Producer要在吞吐率和数据可靠性之间做一个权衡。 实际使用时,根据应用特性选择,绝大多数情况下都会中和可靠性和性能选择第三种模型

要保证数据不丢除了设置acks=-1, 还要保证ISR的大小大于等于2,具体参数设置:

(1)request.required.acks: 设置为-1 等待所有ISR列表中的Replica接收到消息后采算写成功;
(2)min.insync.replicas: 设置为大于等于2,保证ISR中至少有两个Replica

2、Broker消息的存储可靠性,参数设置

1、刷盘时机

     broker的刷盘时机主要是以下两个参数控制:

     log.flush.interval.ms                  日志刷盘的时间间隔,每隔多少时间将消息刷到磁盘上

     log.flush.interval.messages      日志刷盘的消息量,每积累多少条消息将消息刷到磁盘上

2、副本数

    在创建消息Topic的时候需要指定消息的副本数replicas一般建议设置成3保证消息的可靠,再结合客户端发送方的ack参数,

  • 1、acks=all      所有副本都写入成功并确认。
  • 2、retries=一个合理值        kafka 发送数据失败后的重试值。默认为 0,即不重试,立即失败。设置一个大于 0 的值,表示重试次数。(如果总是失败,则可能是网络原因)
  • 3、min.insync.replicas=2     消息至少要被写入到这么多副本才算成功。
  • 4、unclean.leader.election.enable=false      关闭 unclean leader 选举,即不允许非 ISR 中的副本被选举为 leader,以避免数据丢失。
  • 5、buffer.memory:指定 producer 端用于缓存消息的缓冲区的大小,默认32M;适当提升该参数值,可以增加一定的吞吐量。
  • 6、batch.size:producer 会将发送分区的多条数据封装在一个 batch 中进行发送,这里的参数指的就是 batch 的大小。该参数值过小的话,会降低吞吐量,过大的话,会带来较大的内存压力。默认为 16K,建议合理增加该值。

各场景测试总结:

  • 当acks=-1时,Kafka发送端的TPS(吞吐量)   受限于topic的副本数量(ISR中),副本越多TPS越低;   
  • acks=0时,TPS最高,其次为1,最差为-1,即TPS:acks_0 > acks_1 > ack_-1
  • min.insync.replicas参数不影响TPS;
  • partition的不同会影响TPS,随着partition的个数的增长TPS会有所增长,但并不是一直成正比关系,到达一定临界值时,partition数量的增加反而会使TPS略微降低;
  • Kafka在acks=-1,min.insync.replicas>=1时,具有高可靠性,所有成功返回的消息都可以落盘。 但效率很低

3、Consumer端 消费者 (offset手动提交,业务逻辑成功处理后,提交offset)

SparkStreaming +kafka 的offset保存MySQL、hbase、redis、zookeeper_.-CSDN博客

2、一致性:数据重复消费 

问题场景:

  1. 当消费者重新分配partition的时候,可能出现从头开始消费的情况,导致重发问题。
  2. 设置offset为自动提交,正在消费数据,kill消费者线程;
  3. 设置offset为自动提交,关闭kafka时,如果在close之前,调用 consumer.unsubscribe() 则有可能部分offset没提交,下次重启会重复消费;
  4. 消费kafka与业务逻辑在一个线程中处理,可能出现消费程序业务处理逻辑阻塞超时,导致一个周期内,offset还未提交;消费的速度很慢的时候,导致心跳机制检测报告出问题。,继而重复消费,但是业务逻辑可能采用发送kafka或者其他无法回滚的方式;

        重复消费最常见的原因:re-balance问题,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的session timeout时间(0.10.x版本默认是30秒),那么就会re-balance重平衡,此时有一定几率offset没提交,会导致重平衡后重复消费。 

底层根本原因:已经消费了数据,但是offset没提交。或者消费了,又从头消费
配置问题:设置了offset自动提交

解决方案:至少发一次+去重操作(幂等性),去重问题,消息可以使用唯一id标识

        保证不重复消费:落表(主键或者唯一索引的方式,避免重复数据)
业务逻辑处理(选择唯一主键存储到Hbase、Redis或者mongdb中,先查询是否存在,若存在则不处理;若不存在,先插入Redis或Mongdb,再进行业务逻辑处理)

五、消息的顺序消费问题

(1) Producer

        发送端不能异步发送,异步发送在发送失败的情况下,就没办法保证消息顺序。比如你连续发了1,2,3。 过了一会,返回结果1失败,2, 3成功。你把1再重新发送1遍,这个时候顺序就乱掉了。

        解决:request.required.acks:=1,表示producer写 leader成功后,broker就返回成功,发送下一条message

(2) 存储端,数据备份可靠性

        在Kafka中,它叫做partition; 如果你有多个partition队列,那同1个topic的消息,会分散到多个分区里面,自然不能保证顺序。消息不能分区。也就是1个topic,只能有1个队列。

        即使只有1个队列的情况下,会有第2个问题。该机器挂了之后,能否切换到其他机器?也就是高可用问题。比如你当前的机器挂了,上面还有消息没有消费完。此时切换到其他机器,可用性保证了。但消息顺序就乱掉了。要想保证,一方面要同步复制,不能异步复制;另一方面得保证,切机器之前,挂掉的机器上面,所有消息必须消费完了,不能有残留。很明显,这个很难。

(3) Consumer消费者

1、全局顺序
  a、全局使用一个生产者,一个分区,一个消费者。
2、局部顺序
  a、每个分区是有序的,根据业务场景制定不同的 key 进入不同的分区。

        Consumer处理partition里面的message的时候是顺序读取的。所以必须维护着上一次读到哪里的offsite信息。(offset手动提交,业务逻辑成功处理后,提交offset),落表(选择唯一主键存储到Redis(等数据库),先查询是否存在,若存在则不处理;若不存在,先插入Redis ,再进行业务逻辑处理 ,避免重复数据) 

5、Kafka如何做到全局(局部,分区)有序:自定义分区

1、一个topic使用一个分区,但是这样会降低性能

2、针对单分区有序,我们想办法发同一个特征的数据写到一个分区

只要把同一个表的同一个主键的数据发到同一个分区即可,传入的参数 tableName+主键(如果多数据库得加入数据库名)
这样,消费到的数据就是有序的。不同的场景灵活运用即可。

Kafka如何做到全局有序_墨卿风竹的博客-CSDN博客_kafka保证全局有序

private int partitionDefine(String keyToPartition) {
	if (keyToPartition == null) {
		return new Random().nextInt(numPartitions);
	} else {
		return Math.abs(keyToPartition.hashCode()) % numPartitions;
	}
}

六、Zookeeper在Kafka中的作用

 Zookeeper 协调控制

1. 管理broker与consumer的动态加入与离开。(Producer不需要管理,随便一台计算机都可以作为Producer向Kakfa Broker发消息)

2. 触发负载均衡,当broker或consumer加入或离开时会触发负载均衡算法,使得一个consumer group内的多个consumer的消费负载平衡。(因为一个comsumer消费一个或多个partition,一个partition只能被一个consumer消费)

3.  维护消费关系及每个partition的消费信息。

Kafka集群中有一个broker会被选举为Controller,负责管理集群broker的上下线,所有topic的分区副本分配和leader选举等工作。

Controller的管理工作都是依赖于Zookeeper的。

以下为partition的leader选举过程:

kafka在zookeeper的存储

备注:

kafka要依赖外部的Zookeeper服务。
Broker注册:是分布式部署并且相互之间相互独立,但是需要有一个注册系统能够将整个集群中的Broker管理起来,此时就使用到了Zookeeper。在Zookeeper上会有一个专门用来进行Broker服务器列表记录的节点:
/brokers/ids
管理topic的分区信息及与broker的对应关系:在Kafka中,同一个Topic的消息会被分成多个分区并将其分布在多个Broker上,这些分区信息及与Broker的对应关系也都是由Zookeeper在维护,由专门的节点来记录,如:
/borkers/topics
生产者负载均衡:由于同一个Topic消息会被分区并将其分布在多个Broker上,因此,生产者需要将消息合理地发送到这些分布式的Broker上,那么如何实现生产者的负载均衡,Kafka支持传统的四层负载均衡,也支持Zookeeper方式实现负载均衡。
消费者负载均衡:与生产者类似,Kafka中的消费者同样需要进行负载均衡来实现多个消费者合理地从对应的Broker服务器上接收消息,每个消费者分组包含若干消费者,每条消息都只会发送给分组中的一个消费者,不同的消费者分组消费自己特定的Topic下面的消息,互不干扰。
管理分区与消费者的关系:在Kafka中,规定了每个消息分区 只能被同组的一个消费者进行消费,因此,需要在 Zookeeper 上记录 消息分区 与 Consumer 之间的关系,每个消费者一旦确定了对一个消息分区的消费权力,需要将其Consumer ID 写入到 Zookeeper 对应消息分区的临时节点上,例如:
/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]
消息消费进度Offset记录
消费者Consumer注册

Kafka(四)Kafka在zookeeper中的存储 - Frankdeng - 博客园

 

6.2 Zookeeper上的细节:

1. 每个broker启动后会在zookeeper上注册一个临时的broker registry,包含broker的ip地址和端口号,所存储的topics和partitions信息。

2. 每个consumer启动后会在zookeeper上注册一个临时的consumer registry:包含consumer所属的consumer group以及订阅的topics。

3. 每个consumer group关联一个临时的owner registry和一个持久的offset registry。对于被订阅的每个partition包含一个owner registry,内容为订阅这个partition的consumer id;同时包含一个offset registry,内容为上一次订阅的offset。

七、kafka生产者与消费者相关命令行

 Kafka常用命令之kafka-topics.sh  Kafka常用命令之kafka-topics.sh_Ernest-CSDN博客_kafka-topics.sh

1,开启zookeeper集群 startzk.sh
2,开启kafka集群  start-kafka.sh #节点都要开启
2,开启kafka可视化界面 kafka-manager : start-kafka-manager.sh
 
3,生产者操作:
kafka-console-producer.sh --broker-list node1:9092 --topic my-kafka-topic    //my-kafka-topic时topic的名字
 
4,消费者操作:
kafka-console-consumer.sh --bootstrap-server node1:9092 --topic my-kafka-topic
# 通过以上命令,可以看到消费者可以接收生产者发送的消息
 
# 如果需要从头开始接收数据,需要添加--from-beginning参数
kafka-console-consumer.sh --bootstrap-server node01:9092 --from-beginning --topic my-kafka-topic
  
5,a.创建topic
kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 1 --partitions 1 --topic my-kafka-topic
 
b.查看topic列表
kafka-topics.sh --list --zookeeper node01:2181
 
c.如果需要查看topic的详细信息,需要使用describe命令
kafka-topics.sh --describe --zookeeper node1:2181 --topic test-topic
 
d.#若不指定topic,则查看所有topic的信息
kafka-topics.sh --describe --zookeeper node1:2181
 
e.删除topic
kafka-topics.sh --delete --zookeeper node1:2181 --topic my-kafka-topic

1、对于生产者,在使用的时候会用到bootstrap ,本以为是两个参数,其实是实现一个功能,查看源代码后发现broker.list是旧版本命令,

2、对于消费者,kafka中有两个设置的地方:

对于老的消费者(0.8 以前),由--zookeeper参数设置;

新的消费者(0.9 ),由--bootstrap-server参数设置,

如果使用了--zookeeper参数,那么consumer的信息(消费进度)将会存放在zk之中,consumer 必须知道 zookeeper 的地址。这个方案有性能问题。

查看的方法是使用./zookeeper-client,然后 ls /consumers/[group_id]/offsets/[topic]/[broker_id-part_id],这个是查看某个group_id的某个topic的offset

0.9 的时候整体大改了一次,brokers 接管了消费进度,使用了--bootstrap-server参数,那么consumer的信息将会存放在kafka之中,consumer 不再需要和 zookeeper 通信了,新版的 Kafka 使用一个选举出来的controller 来监听 zookeeper,其他 node 再去和 controller 通信,这么做的目的是为了减少 zookeeper 的压力。bootstrap servers 只需要配个两三个就行了,会自动发现其他 broker。bootstrap-servers 会自动发现其他 broker,producer传进去的参数是 broker list, 是不会自动发现其它 brokers 的。

八、kafka使用场景

广泛应用于大数据领域:如网站行为分析、日志聚合、Apps监控等场景;

让数据集成变得简单:能将 Kafka 中的消息导入到 ODPS、HBase、HBASE 等离线数据仓库;

可广泛的与流计算引擎集成:包括阿里云平台的 StreamCompute、E-MapReduce 和开源产品 Spark、Storm 等;

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐