Kafka 架构及原理分析

Kafka适合什么样的场景?

它可以用于两大类别的应用:

  1. 构造实时流数据管道,它可以在系统或应用之间可靠地获取数据。 (相当于message queue)
  2. 构建实时流式应用程序,对这些流数据进行转换或者影响。 (就是流处理,通过kafka stream topic和topic之间内部进行变化)

为了理解 Kafka 是如何做到以上所说的功能,从下面开始,我们将深入探索Kafka 的特性。

定位

  • 消息中间件
  • 消息引擎
  • 分布式实时流处理平台

简介

使用场景

  • 大数据领域
    • 网站行为分析
    • 日志聚合
    • 应用监控
    • 流式数据处理
    • 在线和离线数据分析
  • 数据集成
    • 消息导入 MaxCompute、OOS、RDS、Hadoop、HBase 等离线数据仓库
  • 流计算集成
    • 集成流计算引擎
      • StreamCompute
      • E-MapReduce
      • Spark
      • Storm

架构

依赖 Zookeeper 实现配置和节点管理

Kafka架构

如上图所示,一个 Kafka 集群架构中:

  • 3 台 Broker 。
  • 两个 Topic : Topic0 和 Topic1 。
    • Topic0 有 2 个分区: partition0 和 partition1 ,每个分区一共 3 个副本。
    • Topic1 只有 1 个分区: partition0 ,每个分区一共 3 个副本。
  • 图中红色字体的副本代表是 Ieader ,黑色字体的副本代表是 follower 。
  • 绿色的线代表是数据同步。蓝色的线是写消息,橙色的线是读消息,都是针对 leader 节点。
  • 有两个消费者组的两个分区。
    • 第一个消费者组,消费了 topic0 的两个分区。
    • 第二个消费者组,既消费 topic0 ,又消费 topic1 :
      • 第 1 个消费者,消费 topic0 的 partition0 ,还消费 topic1 的 partition0 。
      • 第 2 个消费者,消费 topic0 的 partition1。
      • 第 3 个消费者,没有 partition 可以消费。
Broker
message
message
Producer
Broker
Consumer
Broker
  • 只支持一种消费模式poll

  • 支持一次性获取多条(参数传入数目)

Topic
  • 使用Topic的订阅和发送,来实现生产者和消费者的关联,多对多
message
message
message
Producer
Broker
Topic
Topic
Consumer
Topic
  • Topic 分区扩展,增强并发访问能力
message
message
message
Producer
Broker
Topic
Topic
Partition1
Partition2
Partition3
Partition4
Consumer
Topic
副本机制
message
message
message
message
message
message
message
message
Producer
Broker
Broker
Partition0
Partition1
Partition0
Partition1
Consumer
副本数目一定小于等于节点数目
  • 副本同步主节点数据,但是不允许读 follow 节点,避免读写不一致的问题,降低延迟。
存储

存储文件

  • *.index 索引文件
  • *.log 数据文件
  • *.timeindex 时间戳索引文件

数据分段(针对文件过大,超出 1G)

  • segment
消费分组

消费分组

  • 消费组数目小于等于 Topic 数目
  • 消费者可以消费多个分区
消费编号

消费编号

  • 连续消费
  • 切换消费者
  • consumer_offset-[0~49] 保存消费者消费的偏移量

数据多写支持

业务场景:数据同步存储到 mysql、ES

基于 binlog 实现主从复制

主从复制

canal

伪装为 slave 节点,进行数据同步,解析 binlog,可以对接 kafka,实现数据的多写。

数据多写

MYSQL的数据修改,通过 kafka 完成数据变更的自动推送,实现多写操作。

Kafka 的进阶功能

消息幂等性

enable.idempotence=true

  • PID Producer ID 生产者编号

  • sequence number 消费者编号

  • 只能保证单分区、单一会话

事务

操作 API

  • 初始化producer.initTransactions()
  • 开启producer.beginTransaction()
  • 提交producer.commitTransaction()

场景:

  • 发送多条消息

  • 发送消息到多个 topic 或多个 partition

  • 消费以后发出消息 consume-process-produce

实现原理:

  • 2PC
  • Transaction Coordinator 协调者
  • 事务日志:topic记录 __transaction_state
  • 生产者事务 ID:transaction.id 发送前进行参数设置,可以使用 UUID

事务原理

1.initTansactions API 注册事务 ID
3.生产者把消息写入
4.事务完成后更新消息状态为已提交
4'.事务回滚
Producer
Coordinator
2.记录事务日志
目标分区
事务状态
删除分区中冗余数据
消费者消费消息

特性

  • 高吞吐、低延迟
  • 高伸缩性(Partition 分区)
  • 持久性、可靠性(副本机制,持久化方案)
  • 容错性(副本机制、节点选举)
  • 高并发

原理分析

生产者原理

KafkaProducer

生产者

  • ProducerInterceptor 拦截器
    • onSend
    • onAcknowledgement
    • close
    • configure
  • Serializer 序列化器
    • key、value
    • Protobufer
  • Partition 分区器
分区路由
  • 指定分区:使用指定的分区
  • 没有指定分区
    • 定义了分区器:按自定义的规则
    • 未定义分区器:
      • key 非空:hash 以后取余
      • key 为空: 整数自增取模
累加器

累加器

  • 先放入累加器
  • 满了或新建状态,唤醒发送线程

服务端响应

ack

  • 奇数副本节点,确保投票、同步成功(半数工作正常的节点确定 )

    • Leader 选举
    • 数据同步是否完成

    哪些节点等待同步完成?

    • 服务端 ISR: in-sync replica set
      • 动态节点,保留所有工作正常的节点信息
      • 移除规则:
        • 和 Leader 节点保持同步的最大时间间隔 replica.lag.time.max.ms
        • 大于间隔,移除;反之,加入。

    如何判断消息发送成功?

    => 见下一节 服务端 ACK

服务端 ACK

不等待 ACK

=> props.put(“acks”,“0”)

效率最高,可靠性低

不等待 ack

默认:Leader 落盘成功则返回 ACK

=> props.put(“acks”,“1”)

可靠性较低,还是有丢失消息风险

落盘 ack

Leader 和全部 Follpwers 落盘返回 ACK

=> props.put(“acks”,"-1") 或 props.put(“acks”,“all”)

性能最差,可靠性高,但是还是有可能会带来问题,最后环节响应 ACK 失败,发送端如果设置了 retries 重发参数,会发生消息重复的问题。

可靠 ack

分区存储

存储形式

特点

  • 分区存储对应一个目录 xxx-[分区下标]

分区

副本的存储

如图:topic 对应 3 个分区,每个分区一共 3 个副本。

副本

  • 为什么没有像 MySQL 那样设置读写分离?
    • 单调的读写一致性
    • 不用考虑复杂的读写一致性问题

查看副本情况

查看副本

  • 3个分区

  • 3个副本

  • ISR: 当前和 Leader 节点保持同步的节点集合

如图:topic 对应 4 个分区,每个分区 2 个副本

副本2

查看副本情况

副本2

  • 4个分区
  • 2个副本
副本分配规则

AdminUtils.scala -> assignReplicasToBrokers

  • 副本因子不能大于 Broker 的个数
  • 第一个分区(编号为0)的第一个副本放置位置式随机从 brokerList 选择的
  • 其他分区的第一个副本放置位置相对于第 0 个分区依次往后移 nextReplicaShift

分配规则

日志存储格式

日志格式

segment 分段

分割方式

  • 大小分割:log.segment.bytes 单个日志段的最大大小,默认 1073741824 -> 1G

  • 时间分割:

    • log.roll.hours 新日志段轮转时间间隔(小时为单位),次要配置为log.roll.ms
    • log.roll.ms 新日志段轮转时间间隔(毫秒为单位),如果未设置,则使用log.roll.hours
  • 索引写满:log.index.size.max.bytes offset 索引的最大字节数,默认10485760 -> 10M

索引

offset index 偏移量索引

索引

如上图,通过kafka-dump-log.sh脚本查看索引文件。

索引特点:

  • Hash 索引结构
  • 稀疏索引:
    • 并非为每一条数据建立索引
    • 建立条件:log.index.interval.bytes 添加 offset 索引字段大小间隔,默认 4096, 4KB
      • 设置的越大,代表扫描的速度越快,索引越稀疏,也更加耗内存(查找数据和维护索引的开销增大)
      • 设置的越小,代表扫描的速度越慢,索引越密集,也更加省内存
      • 时间复杂度:O(log2n)+O(m) n 表示文件个数,m 表示稀疏程度

索引结构

time index 时间戳索引

  • 定义消息的时间戳类型:log.message.timestamp.type=CreateTime/logAppendTime
    • 消息创建时间
    • 日志追加时间

索引检索过程

  • 根据 offset 匹配 segment

  • 根据 index 索引文件中的 offset 找到消息的 postion

  • 根据 position 从 log 文件中比较,最终找到消息

为什么不用 B+tree 做索引结构?

  • 业务场景决定,写消息远大于读消息频次
  • 数据量过大,大量写消息对 B+tree 压力过大
消息清理策略
  • 开关:log.cleaner.enable=true 默认为true。这意味着cleanup.policy = compact的主题默认被压缩,根据 log.cleaner.dedupe.buffer.size,128 MB的堆将被分配给清理进程。您可以根据您使用的压缩主题来查看 log.cleaner.dedupe.buffer.size和其他log.cleaner配置值。(0.9.0.1中的显著变化

  • 策略:log.cleanup.policy=delete/compact 超出保留窗口期的日志段的默认清理策略。用逗号隔开有效策略列表。有效策略:deletecompact

    • 删除
    • 压紧,删除重复的 key:Log Cleaner默认启用。这会启动清理的线程池。

    压紧

  • 周期:log.retention.check.interval.ms=300 000 日志清理器检查是否有日志符合删除的频率(以毫秒为单位)

  • 过期定义:

    • log.retention.hours 日志删除的时间阈值(小时为单位) 默认 168 小时,即 1 个星期
    • log.retention.minutes 日志删除的时间阈值(分钟为单位),如果未设置,将使用log.retention.hours的值。
    • log.retention.ms 日志删除的时间阈值(毫秒为单位),如果未设置,将使用log.retention.minutes的值。
  • 文件限制:

    • log.retention.bytes 日志删除的大小阈值。
    • log.segment.bytes 单个日志段文件最大大小。

Leader 选举

  • 谁主持选举?Broker Controller
  • 谁参与选举?AR = ISR + OSR
    • AR :所有副本
    • ISR:保持同步的副本
    • OSR:没有保持同步的副本
    • 参数配置:unclean.leader.election.enable 指定副本是否能够不在 ISR 中选举为 Leader,会导致数据丢失,默认为 false。
  • 主从如何同步?
    • PacificA (Microsoft)
PacificA
  • 优先算法:默认设置 ISR 的第一个副本为 Leader
主从同步

主从

  • LEO:Log End Offset,下一条等待写入的消息的 offset(最新的 offset + 1)
  • HW:High Watermark,ISR 中最小的 LEO => 限制消费者最后可以消费的消息,小于 HW 的消息才可以被消费,确保一致性

LEO1

主从同步

LEO2
LEO3

  • Follower 节点会向 Leader 发送一个 fetch 请求,leader 向 follower 发送数据后,需要更新 follower 的 LEO
  • Follower 接收到数据响应后,依次写入消息并更新 LEO
  • Leader 更新 HW (ISR 最小的 LEO)
故障处理

​ Follower 故障

  • 之前记录的 HW,删除 高于 HW 标识的数据,恢复后重新从 HW 之前的 offset 开始同步数据

Leader 故障

  • 之前记录的 HW,删除 高于 HW 标识的数据,重新从新 Leader 同步数据

  • 问题:

    • 保证了副本间的数据一致性
    • 会发生消息丢失和重复

消费者原理

根据 offset 和时间戳进行消费

消费者

offset 的存储

__consumer_offsets => topic 的存储结构

  • GroupMetadata:保存了消费组中各个消费者的信息(每个消费者有编号)
  • OffsetAndMetadata:保存了消费组和各个 partition 的 offset 位移信息元数据

存储结构

  • group.id 取 hash 后 ,对50取模,获取消费组对应绑定分区的下标

  • 消费策略:auto.offset.reset,默认值为 lastest

    • none: 当前没有找到之前的 offset 时抛出异常
    • earliest: 自动从最早的消息开始消费
    • lastest:最近的 offset 开始消费
  • 提交偏移量,commit 后更新消费组的 offset

    • 自动:enable.auto.commit=true

    • 手动:enable.auto.commit=false

      • API:consumer.commitSync();

消费者分配

消费者分配在这里插入图片描述

  • RangeAssignor:默认分配原则,范围固定分配

在这里插入图片描述

  • RoundRobinAssignor:轮询方式分配


  • StickyAssignor :粘滞策略(相对均匀策略,每次基本都不一样)

分区重分配

rebalance 针对分区少,消费者多的情况

性能分析

  • 无需第三方进行消息存储,采用磁盘文件直接存储
  • 磁盘 I/O 本身速度很慢,Kafka 如何优化实现低延迟、高吞吐的目标?
    • 顺序读写 I/O
    • 索引
    • 批量读写和文件压缩
    • 零拷贝

磁盘寻址

  • 盘面旋转
  • 磁头
  • 磁道
  • 扇区
  • 扇面

Kafka 日志文件顺序存放 -> 磁盘顺序读写。

Sequential disk (磁盘顺序读写) 比 Random SSD (固态的随机读写)要更快。

零拷贝

  • 内核空间、用户空间

  • DMA (Direct Memory Access) 直接内存访问

  • 传统 I/O

    • 四次拷贝
    • 四次用户态和内核台的切换
    • 系统方法的2次调用 read、write
    • 2次 CPU 数据拷贝
  • 零拷贝:Linux sendfile 方法,省去 CPU 拷贝的过程,提升至少一倍的性能。

零拷贝实现代码

传输层封装代码:PlaintextTransportLayer

技术总结

  • 主从选举:会选择一个 broker 作为 “controller”节点。controller 节点负责检测 brokers 级别故障,并负责在 broker 故障的情况下更改这个故障 Broker 中的 partition 的 leadership 。这种方式可以批量的通知主从关系的变化,使得对于拥有大量partition 的broker ,选举过程的代价更低并且速度更快。如果 controller 节点挂了,其他 存活的 broker 都可能成为新的 controller 节点。

  • 分布式:

    • 日志的分区partition (分布)在Kafka集群的服务器上。每个服务器在处理数据和请求时,共享这些分区。每一个分区都会在已配置的服务器上进行备份,确保容错性。
    • 每个分区都有一台 server 作为 “leader”,零台或者多台server作为 follwers 。leader server 处理一切对 partition (分区)的读写请求,而follwers只需被动的同步leader上的数据。当leader宕机了,followers 中的一台服务器会自动成为新的 leader。每台 server 都会成为某些分区的 leader 和某些分区的 follower,因此集群的负载是平衡的。
  • 消费者:

    • 消费者使用一个 消费组 名称来进行标识,发布到topic中的每条记录被分配给订阅消费组中的一个消费者实例.消费者实例可以分布在多个进程中或者多个机器上。
    • 如果所有的消费者实例在同一消费组中,消息记录会负载平衡到每一个消费者实例。
    • 如果所有的消费者实例在不同的消费组中,每条消息记录会广播到所有的消费者进程。
  • Kafka 建立在 JVM 之上:

    • 对象的内存开销非常高,通常是所存储的数据的两倍(甚至更多)。
    • 随着堆中数据的增加,Java 的垃圾回收变得越来越复杂和缓慢。
  • Kafka 对消息的存储和缓存严重依赖于文件系统:

  • 日志存储:

    • 当旧的数据保留时间超过指定时间、日志大达到规定大小后就丢弃
    • 至少保证日志包含每一个key的最终值而不只是最近变更的完整快照。这意味着下游的消费者可以获得最终的状态而无需拿到所有的变化的消息信息。
  • 高速读写:

    • 网络层相当于一个 NIO 服务, sendfile(零拷贝) 的实现是通过 MessageSet 接口的 writeTo 方法完成的.这样的机制允许 file-backed 集使用更高效的 transferTo 实现,而不在使用进程内的写缓存。线程模型是一个单独的接受线程和 N 个处理线程,每个线程处理固定数量的连接。这种设计方式经过大量的测试,发现它是实现简单而且快速的。协议保持简单以允许未来实现其他语言的客户端.

REFERENCES


image-20200927235342666

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐