
Kafka入门到入土——万字详解,图文并茂
万字Kafka详解!图文并茂!kafka架构、数据模型、消息队列MQ、数据请求机制、副本机制、生产者、消费者组详解、主题分区、broker、controller
目录
Kafka是一个由Scala和Java语言开发的,经典高吞吐量的分布式消息发布和订阅系统,也是大数据技术领域中用作数据交换的核心组件之一。它具有以下特点:
- 支持消息的发布和订阅,类似于 RabbtMQ、ActiveMQ 等消息队列;
- 支持数据实时处理;
- 能保证消息的可靠性投递;
- 支持消息的持久化存储,并通过多副本分布式的存储方案来保证消息的容错;
- 高吞吐率,单 Broker 可以轻松处理数千个分区以及每秒百万级的消息量;
消息队列(MQ)
Kafka软件最初的设计就是专门用于数据传输的消息系统,类似功能的软件有RabbitMQ、ActiveMQ、RocketMQ等,这些软件的核心功能是传输数据,而Java中如果想要实现数据传输功能,那么这个软件一般需要遵循Java消息服务技术规范JMS。前面提到的ActiveMQ软件就完全遵循了JMS技术规范,而RabbitMQ是遵循了类似JMS规范并兼容JMS规范的跨平台的AMQP规范。除了上面描述的JMS,AMQP外,还有一种用于物联网小型设备之间传输消息的MQTT通讯协议。
Kafka拥有作为一个消息系统应该具备的功能,但是却有着独特的设计。Kafka借鉴了JMS规范的思想,但是却并没有完全遵循JMS规范。这也恰恰是软件名称为Kafka,而不是KafkaMQ的原因。
消息队列一般应用场景
- 应用耦合:多应用间通过消息队列对同一消息进行处理,避免调用接口失败导致整个过程失败。
- 异步处理:多应用对消息队列中同一消息进行处理,应用间并发处理消息,相比串行处理,减少处理时间
- 限流削峰:广泛应用于秒杀或抢购活动中,避免流量过大导致应用系统挂掉的情况。该方法有如下优点:
- 1.请求先入消息队列,而不是由业务处理系统直接处理,做了一次缓冲,极 大地减少了业务处理系统的压力;
- 2.队列长度可以做限制,事实上,秒杀时,后入队列的用户无法秒杀到商品,这些请求可以直接被抛弃,返回活动已结束或商品已售完信息;
- 消息驱动的系统:系统分为消息队列、消息生产者、消息消费者,生产者 负责产生消息,消费者(可能有多个)负责对消息进行处理。具体场景:用户新上传了一批照片,人脸识别系统需要对这个用户的所有照片进行聚类,聚类完成后由对账系统重新生成用户的人脸索引(加快查询)。这三个子 系统间由消息队列连接起来,前一个阶段的处理结果放入队列中,后一个阶段从 队列中获取消息继续处理。
- 该方法有如下优点:1.避免了直接调用下一个系统导致当前系统失败; 2.每个子系统对于消息的处理方式可以更为灵活,可以选择收到消息时就处理,可以选择定时处理,也可以划分时间段按不同处理速度处理;
JMS
JMS类似于JDBC,是java平台的消息中间件通用规范,定义了系统和系统之间传输消息的接口。
为了实现系统和系统之间的数据传输,JMS规范中定义很多用于通信的组件:
- JMS Producer:JMS消息生产者。所谓的生产者,就是生产数据的客户端应用程序,这些应用通过JMS接口发送JMS消息。
- JMS Provider:JMS消息提供者。其实就是实现JMS接口和规范的消息中间件,也就是我们提供消息服务的软件系统,比如RabbitMQ、ActiveMQ、Kafka。
- JMS Message:JMS消息。这里的消息指的就是数据。一般采用Java数据模型进行封装,其中包含消息头,消息属性和消息主体内容。
- JMS Consumer:JMS消息消费者。所谓的消费者,就是从消息提供者中获取数据的客户端应用程序,这些应用通过JMS接口接收JMS消息。
JMS模型
点对点模型(peer to peer)
特点:
- 每个消息只有一个接收者(Consumer)(即一旦被消费,就会被删除);
- 发送者和接发收者间没有依赖性,发送者发送消息之后,不管有没有接收者在运行,都不会影响到发送者下次发送消息;
- 接收者在成功接收消息之后需向队列应答成功,以便消息队列删除当前接 收的消息
发布订阅模型
特点:
- 每个消息可以有多个订阅者,但是订阅者必须来自不同的消费者组;
- 针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息。
- 为了消费消息,订阅者需要提前订阅该角色主题,并保持在线运行;
Kafka采用就是这种模型。
Kafka架构
在 Kafka 2.8.0 版本,移除了对 Zookeeper 的依赖,通过 Kraft模式进行自己的集群管理,使用 Kafka 内部的 Quorum 控制器来取代 ZooKeeper管理元数据,这样我们无需维护zk集群,只要维护Kafka集群就可以了,节省运算资源。
kafka基本数据单元被称为 message(消息),为减少网络开销,提高效率,多个消息会被放入同一批次(Batch) 中后再写入。
Broker
- kafka 集群中包含多个服务实例(节点),这种服务实例被称为 broker(一个 broker 就是一个节点/一个服务器),每个 broker 都有一个唯一标识 broker.id,用于标识自己在集群中的身份,可以在配置文件 server.properties 中进行配置,或由程序自动生成。
- Broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。Broker 为消费者提供服务,对读取分区的请求做出响应,返回已经提交到磁盘的消息。
Controller选举
每一个集群都会选举出一个 Broker作为集群控制器 (Controller),它除了具备其他 broker 的功能外,还负责管理主题分区及其副本的状态。如果在运行过程中,Controller节点出现了故障,那么Kafka会依托于ZooKeeper软件选举其他的节点作为新的Controller,让Kafka集群实现高可用。
特殊情况
Controller节点并没有宕掉,而是因为网络的抖动,不稳定,导致和ZooKeeper之间的会话超时,那么此时,整个Kafka集群就会认为之前的Controller已经下线(退出)从而选举出新的Controller,而之前的Controller的网络又恢复了,以为自己还是Controller了,继续管理整个集群,那么此时,整个Kafka集群就有两个controller进行管理,那么其他的broker就懵了,不知道听谁的了,这种情况,我们称之为脑裂现象,为了解决这个问题,Kafka通过一个任期(epoch:纪元)的概念来解决,也就是说,每一个Broker当选Controller时,会告诉当前Broker是第几任Controller,一旦重新选举时,这个任期会自动增1,那么不同任期的Controller的epoch值是不同的,那么旧的controller一旦发现集群中有新任controller的时候,那么它就会完成退出操作(清空缓存,中断和broker的连接,并重新加载最新的缓存),让自己重新变成一个普通的Broker。
Broker上下线
Controller 在初始化时,会利用 ZK 的 watch 机制注册很多不同类型的监听器,当监听的事件被触发时,Controller 就会触发相应的操作。Controller 在初始化时,会注册多种类型的监听器,主要有以下几种:
- /kafka/admin/reassign_partitions 节点,用于分区副本迁移的监听
- /kafka/isr_change_notification 节点,用于 Partition ISR 变动的监听
- /kafka/admin/preferred_replica_election 节点,用于需要进行 Partition 最优 leader 选举的监听
- /kafka/brokers/topics 节点,用于 Topic 新建的监听
- /kafka/brokers/topics/TOPIC_NAME 节点,用于 Topic Partition 扩容的监听
- /kafka/admin/delete_topics 节点,用于 Topic 删除的监听
- /kafka/brokers/ids 节点,用于 Broker 上下线的监听,记录有哪些kafka服务器在线。
- /kafka/controller节点,辅助选举leader
每台 Broker 在上线时,都会与ZK建立一个建立一个session,并在 /brokers/ids下注册一个节点,节点名字就是broker id,这个节点是临时节点,该节点内部会有这个 Broker 的详细节点信息。Controller会监听/brokers/ids这个路径下的所有子节点,如果有新的节点出现,那么就代表有新的Broker上线,如果有节点消失,就代表有broker下线,Controller会进行相应的处理,Kafka就是利用ZK的这种watch机制及临时节点的特性来完成集群 Broker的上下线。无论Controller监听到的哪一种节点的变化,都会进行相应的处理,同步整个集群元数据。
Broker工作流程
Producer
一般情况下,生产者在把消息均衡地分布到在主题的所有分区上,而并不关心消息会被写到哪个分区。如果我们想要把消息写到指定的分区,可以通过自定义分区器来实现。
Consumer
消费者一定是归属于某个消费组中的,消费者可以订阅一或多个主题,并按照分区中消息的顺序来读取。消费者通过检查消息的偏移量 (offset) 来区分读取过的消息。偏移量是一个不 断递增的数值,在创建消息时,Kafka 会把它添加到其中,在给定的分区里,每个消息的偏移量都是唯一的。消费者把每个分区最后读取的偏移量保存在 Zookeeper 或 Kafka 上,如果消费者关闭或者重启,它还可以重新获取该偏移量,以保证读取状态不会丢失。
Consumer Group
消费者组由一个或者多个消费者组成,同一个组中的消费者对于同一条消息只消费一次。
每个消费者组都有一个 ID,即 group ID。组内的所有消费者协调在一起来消费 一个订阅主题的所有分区。当然,每个分区只能由同一个消费组内的一个消费者来消费,但可以由不同的消费组来消费。partition 数量决定了每个 consumer group 中并发消费者的最大数。
因此要合理设置消费者组中的消费者数量,避免出现消费者闲置。
Topic
Kafka 的消息通过 Topics(主题) 进行分类,Kafka中有两个固定的,用于记录消费者偏移量和事务处理的主题,一个主题可以被分为若干个 Partitions(分区),一个分区就是 一个提交日志 (commit log)。消息以追加的方式写入分区,然后以先入先出的顺序读取。Kafka 通过分区来实现数据的冗余和伸缩性,分区可以分布在不同的服务器上,这意味着一个 Topic 可以横跨多个服务器,以提供比单个服务器更强大的性能。
由于一个 Topic 包含多个分区,因此无法在整个 Topic 范围内保证消息的顺序性,但可以保证消息在单个分区内的顺序性。
Partition分区
Kafka消息传输采用发布、订阅模式,所以消息生产者必须将数据发送到一个主题,假如发送给这个主题的数据非常多,那么主题所在broker节点的负载和吞吐量就会受到极大的考验,甚至有可能因为热点问题引起broker节点故障,导致服务不可用解决方案就是分区。
topic是逻辑上的概念,而partition是物理上的概念,每个 topic 包含一个或者多个partition,每个分区保存部分 topic 的数据,所有的 partition 当中的数据全部合并起来, 就是一个 topic 当中的所有的数据。一个 broker 服务下,有多个Topic,每个Topic可以创建多个分区,broker 数与分区数没有关系; 在 kafka 中,每一个分区会有一个编号,编号从 0 开始。
单个分区的消息是有序的,而全局的topic的多个分区的消息是无序的。这就是为什么一条消息只能被同一个消费者组里面的一个消费者消费,这样就某种程度上保证了消息的不重复消费和乱序消费。
分区好处
- 合理使用存储资源:海量资源按照分区切割成一块块存储在多台broker,合理控制分区任务,实现负载均衡。
- 提高并行度:生产者以分区为单位发送数据,消费者以分区为单位消费数据。
生产者发送消息的分区策略
- 默认分区器DefaultPartitioner
- 自定义分区
自己创建类实现Partitioner接口,重写partition方法配置分区器package com.atguigu.test; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import java.util.Map; /** * TODO 自定义分区器实现步骤: * 1. 实现Partitioner接口 * 2. 重写方法 * partition : 返回分区编号,从0开始 * close * configure */ public class KafkaPartitionerMock implements Partitioner { /** * 分区算法 - 根据业务自行定义即可 * @param topic The topic name * @param key The key to partition on (or null if no key) * @param keyBytes The serialized key to partition on( or null if no key) * @param value The value to partition on or null * @param valueBytes The serialized value to partition on or null * @param cluster The current cluster metadata * @return 分区编号,从0开始 */ @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { return 0; } @Override public void close() { } @Override public void configure(Map<String, ?> configs) { } }
package com.atguigu.test; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.HashMap; import java.util.Map; import java.util.concurrent.Future; public class ProducerPartitionTest { public static void main(String[] args) { Map<String, Object> configMap = new HashMap<>(); configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); configMap.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); configMap.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); configMap.put( ProducerConfig.PARTITIONER_CLASS_CONFIG, KafkaPartitionerMock.class.getName()); KafkaProducer<String, String> producer = null; try { producer = new KafkaProducer<>(configMap); for ( int i = 0; i < 1; i++ ) { ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key" + i, "value" + i); final Future<RecordMetadata> send = producer.send(record, new Callback() { public void onCompletion(RecordMetadata recordMetadata, Exception e) { if ( e != null ) { e.printStackTrace(); } else { System.out.println("数据发送成功:" + record.key() + "," + record.value()); } } }); } } catch ( Exception e ) { e.printStackTrace(); } finally { if ( producer != null ) { producer.close(); } } } }
文件存储Segment
每个partition对应一个log文件,该log文件存储的就是生产的数据,生产的数据不断追加到log文件,为了防止log文件过大导致数据定位效率低下,kafka采取了分片和索引,把每个log文件分成多个 segment 文件段,每个segment包括index文件、log文件、timeindex文件等等,统一放在一个文件夹下,文件夹命名规则是:topic名+分区序号
kafka消费完数据之后不会立刻删除,而是有专门的清理机制,默认保存数据7天,7天后删除,而timeindex文件就记录了数据的时间
index是稀疏索引
偏移量是一个 64 位的长整形数,固定是20位数字,长度未达到,用 0 进行填补,索引文件和日志文件都由该作为文件名命名规则:
- 00000000000000000000.index:索引文件,记录偏移量映射到 .log 文件的字节偏移量,此映射用于从任何特定偏移量读取记录
- 0000000000000000000.timeindex:时间戳索引文件,此文件包含时间戳到记录偏移量的映射,该映射使用.index文件在内部映射到记录的字节偏移量。这有助于从特定时间戳访问记录
- 00000000000000000000.log:此文件包含实际记录,并将记录保持到特定偏移量,文件名描述了添加到此文件的起始偏移量,如果日志文件名为 00000000000000000004.log ,则当前日志文件的第一条数据偏移量就是4(偏移量从 0 开始)
分区的副本
在创建主题时,Kafka 会首先决定如何在 broker 间分配分区副本,它遵循以下原则:
- 在所有 broker 上尽可能均匀地分配分区副本,负载均衡;
- 确保分区的每个副本分布在不同的 broker 上;
- 如果使用了 broker.rack 参数为 broker 指定了机架信息,那么会尽可能的把每个分区的副本分配到不同机架的 broker 上,以避免一个机架不可用而导致整个分区不可用。
- 分区数可以>brokers数,但是副本因子必须<=可用broker数,这样才能保证每个副本分布在不同的broker上,进而保证数据的完整性。
Why分区副本
为了保证高可用(提高负载均衡和系统伸缩性),kafka 的分区是多副本的,如果一个副本丢失了,还可以从其他borker的副本中获取分区数据。但这要求对应副本数据必须是完整的,这是 Kafka 数据一致性的基础,所以才需要使用 controller broker 来进行专门的管理。
注意!follower副本会周期性地同步leader副本的数据,同步数据的过程是有一定延迟的,所以副本之间的数据可能是不同的。
Kafka 的单个主题被分为多个分区,每个分区可以有多个副本 (可以在创建主题时使用 replication-factor 参数进行指定)。其中一个副本是Leader副本,所有的读写请求都直接发送给Leader副本;其他副本是Follower副本,分布在不同的broker上,需要通过复制来保持与Leader副本数据一致,当Leader副本不可用时,会从ISR中选一个Follower副本成为新Leader。Leader选举依赖Controller。
手动调整分区副本存储

kafka默认均匀分布在所有broker上
手动调整副本存储,需要创建存储计划 json文件,然后在创建主题分区及副本的时候,指定存储计划文件。
副本Leader分区自动平衡
但是自动平衡会消耗大量资源,影响性能,不建议频繁触发自动平衡。
增加副本数量
增加副本因子不能通过命令行直接增加,需要创建副本存储计划json文件并执行副本计划。
副本Leader选举
副本Leader故障恢复
leader故障后会从ISR踢出,从follower产生新的leader,此时可能出现其他的follower数据比新的leader多,那么多的数据就会截掉,保证和leader数据一致。但是只能保证数据一致,不能保证不丢失。
副本Follower故障恢复
- follower2故障,被踢出ISR
- follower2恢复之后截掉HW之后的数据
- follower从HW位置开始向Leader同步数据,等follower的LEO>=该分区的HW,也就是追上Leader,就可以重新加入ISR。
ISR机制
每个分区都有一个 ISR(in-sync Replica) 列表,用于维护所有同步的、可用的副本。Leader副本必然是同步副本,而对于Follower副本来说,它需要满足以下条件才能被认为是同步副本:
- 与 Zookeeper 之间有一个活跃的会话,即必须定时向 Zookeeper 发送心跳;
- 在规定的时间(replica.lag.time.max.ms,默认30s)内从Leader副本那里低延迟地获取过消息。
如果副本不满足上面条件的话,就会被从 ISR 列表中移除,直到满足条件才会被再次加入。
OSR
不在ISR中的副本就在OSR,OSR和ISR统称AR(assigned Repllicas)
不完全首领选举
对于副本机制,在 broker 级别有一个可选的配置参数 unclean.leader.election.enable ,默认值 为 fasle,代表禁止不完全的首领选举。这是针对当Leader副本挂掉且 ISR 中没有其他可用副本时,是否允许某个不完全同步的副本成为Leader副本,这可能会导致数据丢失或者数据不一致,在某些对数据一致 性要求较高的场景 (如金融领域),这可能无法容忍的,所以其默认值为 false,如果你能够允许部分数据 不一致的话,可以配置为 true。
最少同步副本
ISR 机制的另外一个相关参数是 min.insync.replicas , 可以在 broker 或者主题级别进行配置,代表 ISR 列表中至少要有几个可用副本。这里假设设置为 2,那么当可用副本数量小于该值时,就认为整个分区处于不可用状态。此时客户端再向分区写入数据时候就会抛出异常 org.apache.kafka.common.errors.NotEnoughReplicasExceptoin: Messages are rejected since there are fewer in-sync replicas than required。
- 副本数(replication-factor):消息保存在几个 broker(服务器)上, 一般情况下副本数等于 broker 的个数。
- follower 通过拉的方式从 leader 同步数据。 消费者和生产者都是从 leader读写数据,不与 follower 交互。如果follower副本也对外提供服务那会怎么样呢?首先,性能是肯定会有所提升的。但同时,会出现一系列问题。类似数据库事务中的幻读,脏读。
- 副本因子的作用:让 kafka 读取数据和写入数据时的可靠性。 副本因子是包含本身,相同的副本因子不能放在同一个 broker。
- 如果某一个分区有三个副本因子,就算其中一个挂掉,那么只会剩下的两个中, 选择一个 leader。 如果所有的副本都挂了,生产者如果生产数据到指定分区的话,将写入不成功。 1sr 表示:当前可用的副本。
数据请求
请求机制
在所有副本中,只有领导副本才能进行消息的读写处理。由于不同分区的领导副本可能在不同的 broker 上,如果某个 broker 收到了一个分区请求,但是该分区的领导副本并不在该 broker 上,那么 它就会向客户端返回一个 Not a Leader for Partition 的错误响应。 为了解决这个问题,Kafka 提供了元数据请求机制。
- 首先集群中的每个 broker 都会缓存所有主题的分区副本信息,客户端会定期发送元数据请求,然后将获取的元数据进行缓存。定时刷新元数据的时间间隔可以通过为客户端配置 metadata.max.age.ms来进行指定。有了元数据信息后,客户端就知道了领导副本所在的broker,之后直接将读写请求发送给对应的broker即可。
- 如果在定时请求的时间间隔内发生的分区副本的选举,则意味着原来缓存的信息可能已经过时了,此时 还有可能会收到 Not a Leader for Partition 的错误响应,这种情况下客户端会再次求发出元数据请求,然后刷新本地缓存,之后再去正确的 broker 上执行对应的操作。
- 需要注意的是,并不是所有保存在分区首领上的数据都可以被客户端读取到,为了保证数据一致性,只有被所有同步副本 (ISR 中所有副本) 都保存了的数据才能被客户端读取到。
- Kafka 所有数据的写入和读取都是通过零拷贝来实现的
生产者详解
生产者发送消息的过程
- Kafka 的main线程会将发送消息包装为 ProducerRecord 对象, ProducerRecord 对象包含了目标主题和要发 送的内容,同时还可以指定键和分区。在发送 ProducerRecord 对象前,生产者会先把键和值对象序列化成字节数组,这样它们才能够在网络上传输。
- 接下来,数据被传给分区器。如果之前已经在 ProducerRecord 对象里指定了分区,那么分区器 就不会再做任何事情。如果没有指定分区 ,那么分区器会根据 ProducerRecord 对象的键来选择一个分区。
- 紧接着,这条记录被添加到一个记录批次里,该批次数据积累到一定值(batch.size)后,这个批次里的所有消息会被sender线程发送到相同的主题和分区上。如果在规定时间(linger.ms)内该批次没有达到规定的batch.size,sender线程同样会把数据发送。linger.ms单位是ms,默认0ms
- 服务器在收到这些消息时会返回一个响应。如果消息成功写入 Kafka,就返回一个 RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量。如果写入失败,则会返回一个错误。sender线程会重新执行写入请求,如果达到指定的重试次数后还没有成功,则直接抛出异常,不再重试。写入成功后,请求会从sender线程中删除。(后面消息可靠性会详解)
消息可靠性
对于生产者发送的数据,我们有的时候是不关心数据是否已经发送成功的,我们只要发送就可以了。在这种场景中,消息可能会因为某些故障或问题导致丢失,我们将这种情况称之为消息不可靠。虽然消息数据可能会丢失,但是在某些需要高吞吐,低可靠的系统场景中,这种方式也是可以接受的,甚至是必须的。
但是在更多的场景中,需要确定数据是否发送成功且Kafka是否接收到数据,也就是要保证数据不丢失,这就是所谓的消息可靠性保证。
而这个确定的过程一般是通过Kafka给我们返回的响应确认结果(Acknowledgement)来决定的,这里的响应确认结果简称为ACK应答。
根据场景,Kafka提供了3种应答处理,可以通过配置对象进行配置。
ACK应答
- ACK=0
当生产数据时,生产者将数据通过网络客户端将数据发送到网络数据流中的时候,Kafka就对当前的数据请求进行了响应(确认应答),如果是同步发送数据,此时就可以发送下一条数据了。如果是异步发送数据,回调方法就会被触发。但这其实并不能保证Kafka能正确地接收到数据,消息不可靠,但是通信效率高,消息吞吐量也高。- ACK=1
当生产数据时,Leader副本将数据接收到并写入到了日志文件后,就会对当前的数据请求进行响应(确认应答),如果是同步发送数据,此时就可以发送下一条数据了。如果是异步发送数据,回调方法就会被触发。这种方式消息可靠性较高,但是此时只有Leader节点存储了数据,还没有备份到follower副本,那么一旦当前存储数据的broker节点出现了故障,数据也依然会丢失。- ACK=-1(默认)
当生产数据时,Leader副本和Follower副本都已经将数据接收到并写入到了日志文件后,再对当前的数据请求进行响应(确认应答),如果是同步发送数据,此时就可以发送下一条数据了。如果是异步发送数据,回调方法就会被触发。这种是消息最可靠的,但是吞吐量有所下降。注意!这里同步的是ISR中的follower副本,只要ISR中的所有副本接收到了数据就会响应。
数据重试
由于网络或服务节点的故障,Kafka在传输数据时,可能会导致数据丢失,所以我们才会设置ACK应答机制,尽可能提高数据的可靠性。但其实在某些场景中,数据的丢失并不是真正地丢失,而是“虚假丢失”,比如ACK应答设置为1,也就是说一旦Leader副本将数据写入文件后,Kafka就可以对请求进行响应了。
此时,如果由于网络故障的原因,Kafka并没有成功将ACK应答信息发送给Producer,那么此时对于Producer来讲,以为kafka没有收到数据,所以就会一直等待响应,一旦超过某个时间阈值,就会发生超时错误,Producer就会认为数据已经丢了。
此时,Producer会尝试对超时的请求数据进行重试(retry)操作,将数据再次发送给Kafka,就可能出现数据重复。
数据乱序
数据重试(retry)功能除了可能会导致数据重复以外,还可能会导致数据乱序。假设需要将编号为1,2,3的三条连续数据发送给Kafka。每条数据会对应于一个连接请求,此时,如果第一个数据的请求出现了故障,而第二个数据和第三个数据的请求正常,那么Broker就收到了第二个数据和第三个数据,并进行了应答。
为了保证数据的可靠性,Producer会将第一条数据重新放回到缓冲区的第一个。进行重试操作,如果重试成功,Broker就会收到第一条数据,数据的顺序已经被打乱了。
如上图,在1.x版本之前,为了保证数据有序性,每个broker只能缓存一个请求,在1.x之后,开启幂等性,可以缓存多个请求,请求会在kafka服务端重新排序。
同步发送
异步发送
异步发送是生产者和RecordAccumulator的异步,上面的同步发送,是生产者把每批次数据发送给RecordAccumulator,该批次满足要求后由sender线程拉取发送到kafka集群,然后才是下一批次,按批次一批批发送给kafka集群,而异步则是不管上一批有没有发送到kafka集群,下一批直接由生产者发送到RecordAccumulator。
生产者提高吞吐量
压缩算法
配置参数compression.type,默认为none,支持snappy、gzip、lz4、zstd压缩算法
消费者详解
消费者通常是消费者群组的一部分,多个消费者群组共同读取同一个主题时,彼此之间互不影响。Kafka 之所以要引入消费者群组这个概念是因为 Kafka 消费者经常会做一些高延迟的操作,比如把数据写到数据库或 HDFS ,或者进行耗时的计算,在这些情况下,单个消费者无法跟上数据生成的速度。此时可以增加更多的消费者,让它们分担负载,分别处理部分分区的消息,这就是 Kafka 实现横向伸缩的主要手段。
push&pull
- 如果数据由Kafka进行推送(push),那么多个分区的数据同时推送给消费者进行处理,明显一个消费者的消费能力是有限的,那么消费者无法快速处理数据,就会导致数据的积压,从而导致网络,存储等资源造成极大的压力,影响吞吐量和数据传输效率。
- 如果kafka的分区数据在内部可以存储的时间更长一些,再由消费者根据自己的消费能力向kafka申请(拉取)数据,那么整个数据处理的通道就会更顺畅一些。Kafka的Consumer就采用的这种拉取数据的方式。
消费者组调度器
消费者想要拉取数据,首先必须要加入到一个组中,成为消费组中的一员,同样道理,如果消费者出现了问题,也应该从消费者组中剥离。而这种加入组和退出组的处理,都应该由专门的管理组件进行处理,这个组件在kafka中,我们称之为消费者组调度器(Group Coordinator)
Group Coordinator是Broker上的一个组件,用于管理和调度消费者组的成员、状态、分区分配、偏移量等信息。每个Broker都有一个Group Coordinator对象,负责管理多个消费者组,但每个消费者组只有一个Group Coordinator
消费者分配分区策略
- 同一个消费者组的消费者都订阅同一个主题,所以消费者组中的多个消费者可以共同消费一个主题中的所有数据。
- 为了避免数据被重复消费,一个分区的数据只能被同组中的一个消费者消费,但是反过来,一个消费者是可以消费多个分区数据的。
- 消费者组中的消费者数量最好不要超出主题分区数量,就会导致多出的消费者是无法消费数据的,造成了资源的浪费。
消费者Leader
消费者中的每个消费者到底消费哪一个主题分区,这个分配策略其实是由消费者的Leader决定的,这个Leader称之为群主。群主是多个消费者中,第一个加入组中的消费者,其他消费者称之为Follower,称呼上有点类似与分区副本的Leader和Follower。
当消费者加入群组的时候,会发送一个JoinGroup请求。群主负责给每一个消费者分配分区。每个消费者只知道自己的分配信息,只有群主知道群组内所有消费者的分配信息。
指定分配策略的基本流程:
- 第一个消费者设定group.id为test,向当前负载最小的节点发送请求查找消费调度器
- 找到消费调度器后,消费者向调度器节点发出JOIN_GROUP请求,加入消费者组。
- 当前消费者当选为群主后,根据消费者配置中分配策略设计分区分配方案,并将分配好的方案告知调度器
- 此时第二个消费者申请加入消费者组
- 加入成功后,kafka将消费者组状态切换到准备rebalance,关闭和消费者的所有链接,等待它们重新加入。客户端重新申请加入,kafka从消费者组中挑选一个作为leader,其它的作为follower。
- Leader会按照分配策略对分区进行重分配,并将方案发送给调度器,由调度器通知所有的成员新的分配方案。组成员会按照新的方案重新消费数据
分区再均衡
- 因为群组里的消费者共同读取主题的分区,所以当一个消费者被关闭或发生崩溃时,它就离开了群组, 原本由它读取的分区将由群组里的其他消费者来读取。同时在主题发生变化时 , 比如添加了新的分区,也会发生分区与消费者的重新分配,分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为再均衡。正是因为再均衡,所以消费费者群组才能保证高可用性和伸缩性。
- 消费者通过向群组协调器所在的 broker 发送心跳来维持它们和群组的从属关系以及它们对分区的所有权。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区里的消息。消费 者会在轮询消息或提交偏移量时发送心跳。如果消费者停止发送心跳的时间足够长,会话就会过期,群组协调器认为它已经死亡,就会触发再均衡。
监听分区再均衡
因为分区再均衡会导致分区与消费者的重新划分,有时候希望在再均衡前执行一些操作:比如提交已经处理但是尚未提交的偏移量,关闭数据库连接等。此时可以在订阅主题时候,调用 subscribe 的重载方法传入自定义的分区再均衡监听器。
偏移量Offset
Kafka 的每一条消息都有一个偏移量属性,记录了其在分区中的位置,偏移量是一个单调递增的整数。 消费者通过往一个叫作 _consumer_offset的特殊主题发送消息,消息里包含每个分区的偏移量。 如果消费者一直处于运行状态,那么偏移量就没有什么用处。不过,如果有消费者退出或者新分区加入,此时就会触发再均衡。完成再均衡之后,每个消费者可能分配到新的分区,而不是之前处理的那个。为了能够继续之前的工作,消费者需要读取每个分 区最后一次提交的偏移量,然后从偏移量指定的地方继续处理。 因为这个原因,所以如果不能正确提交偏移量,就可能会导致数据丢失或者重复出现消费。
LSO
起始偏移量(Log Start Offset),每个分区副本都有起始偏移量,用于表示副本数据的起始偏移位置,初始值为0。
LSO一般情况下无需更新,但是如果数据过期,或用户手动删除数据时,Leader的Log Start Offset可能发生变化,Follower副本的日志需要和Leader保持严格的一致,因此,如果Leader的该值发生变化,Follower自然也要发生变化保持一致。
LEO
日志末端位移(Log End Offset),表示下一条待写入消息的offset,每个分区副本都会记录自己的LEO。对于Follower副本而言,它能读取到Leader副本 LEO 值以下的所有消息。
HW
高水位值(High Watermark),定义了消息可见性(对于消费者而言),标识了一个特定的消息偏移量,消费者只能读取到水位线以下的的数据。同时这个偏移量还可以帮助Kafka完成副本数据同步操作。
这就是所谓的木桶理论:木桶中容纳水的高度,只能是水桶中最短的那块木板的高度。这里将整个分区看成一个木桶,其中的数据看成水,而每一个副本就是木桶上的一块木板,那么这个分区(木桶)可以被消费者消费的数据(容纳的水)其实就是数据最少的那个副本的最后数据位置(木板高度)。
HW高水位线会随着follower的数据同步操作,而不断上涨,也就是说,follower同步的数据越多,那么水位线也就越高,那么消费者能访问的数据也就越多。
(详见上面的follower故障)
手动提交偏移量
用户可以通过将 enable.auto.commit 设为 false ,然后手动提交偏移量。基于用户需求手动提交偏移量可以分为两大类:
手动提交当前偏移量:即手动提交当前轮询的最大偏移量;
手动提交固定偏移量:即按照业务需求,提交某一个固定的偏移量。
按照 Kafka API,手动提交偏移量又可以分为同步提交和异步提交。
同步提交
通过调用 consumer.commitSync() 来进行同步提交,不传递任何参数时提交的是当前轮询的最大偏 移量。
while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS)); for (ConsumerRecord<String, String> record : records) { System.out.println(record); } /*同步提交*/ consumer.commitSync(); }
如果某个提交失败,同步提交还会进行重试,这可以保证数据能够最大限度提交成功,但是同时也会降 低程序的吞吐量。基于这个原因,Kafka 还提供了异步提交的 API。
异步提交
异步提交可以提高程序的吞吐量,因为此时你可以尽管请求数据,而不用等待 Broker 的响应。代码如下:
while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS)); for (ConsumerRecord<String, String> record : records) { System.out.println(record); } /*异步提交并定义回调*/ consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { if (exception != null) { System.out.println("错误处理"); offsets.forEach((x, y) -> System.out.printf("topic = %s,partition = %d, offset = %s \n", x.topic(), x.partition(), y.offset())); } } }); }
- 异步提交存在的问题是,在提交失败的时候不会进行自动重试,实际上也不能进行自动重试。假设程序 同时提交了 200 和 300 的偏移量,此时 200 的偏移量失败的,但是紧随其后的 300 的偏移量成功了, 此时如果重试就会存在 200 覆盖 300 偏移量的可能。同步提交就不存在这个问题,因为在同步提交的 情况下,300 的提交请求必须等待服务器返回 200 提交请求的成功反馈后才会发出。基于这个原因,某 些情况下,需要同时组合同步和异步两种提交方式。
- 虽然程序不能在失败时候进行自动重试,但是我们是可以手动进行重试的,你可以通过一个 Map offsets 来维护你提交的每个分区的偏移量,然后当失败时候,你 可以判断失败的偏移量是否小于你维护的同主题同分区的最后提交的偏移量,如果小于则代表你 已经提交了更大的偏移量请求,此时不需要重试,否则就可以进行手动重试。
自动提交偏移量
将消费者的 enable.auto.commit 属性配置为 true 即可完成自动提交的配置。 此时每隔固定 的时间,消费者就会把 poll() 方法接收到的最大偏移量进行提交,提交间隔由auto.commit.interval.ms 属性进行配置,默认值是 5s。
使用自动提交是存在隐患的,假设我们使用默认的 5s 提交时间间隔,在最近一次提交之后的 3s 发生了 再均衡,再均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后了 3s ,所以在这 3s 内到达的消息会被重复处理。可以通过修改提交时间间隔来更频繁地提交偏移量,减 小可能出现重复消息的时间窗,不过这种情况是无法完全避免的。基于这个原因,Kafka 也提供了手动提交偏移量的 API,使得用户可以更为灵活的提交偏移量。
截至 尚硅谷kafka3.x P39
更多推荐
所有评论(0)