1. Kafka简介

1.1. 简介

Kafka是分布式的发布—订阅消息系统。它最初由LinkedIn(领英)公司发布,使用Scala语言编写,与2010年12月份开源,成为Apache的顶级项目。Kafka是一个高吞吐量的、持久性的、分布式发布订阅消息系统。它主要用于处理活跃live的数据(登录、浏览、点击、分享、喜欢等用户行为产生的数据)。

在这里插入图片描述

三大特点:

  • 高吞吐量
    可以满足每秒百万级别消息的生产和消费——生产消费。

  • 持久性
    有一套完善的消息存储机制,确保数据的高效安全的持久化——中间存储。

  • 分布式
    基于分布式的扩展和容错机制;Kafka的数据都会复制到几台服务器上。当某一台故障失效时,生产者和消费者转而使用其它的机器——整体

  • 健壮性。

1.2. 设计目标
  • 高吞吐率 :在廉价的商用机器上单机可支持每秒100万条消息的读写

  • 消息持久化 :所有消息均被持久化到磁盘,无消息丢失,支持消息重放

  • 完全分布式 :Producer,Broker,Consumer均支持水平扩展

  • 同时适应在线流处理和离线批处理

1.3. kafka核心的概念

Kafka的服务:

  • Topic:主题,Kafka处理的消息的不同分类。
  • Broker:消息服务器代理,Kafka集群中的一个kafka服务节点称为一个broker,主要存储消息数据。存在硬盘中。每个topic都是有分区的。
  • Partition:Topic物理上的分组,一个topic在broker中被分为1个或者多个partition,分区在创建topic的时候指定。
  • Message:消息,是通信的基本单位,每个消息都属于一个partition

Kafka服务相关:

  • Producer:消息和数据的生产者,向Kafka的一个topic发布消息。
  • Consumer:消息和数据的消费者,定于topic并处理其发布的消息。
  • Zookeeper:协调kafka的正常运行。

2. Kafka架构之道

2.1. Kafka基本架构

在这里插入图片描述
注意:

  1. Kafka中的broker对于调用者而言都是透明的,也就是说各个broker的地位都是一样的,但是在kafka内部有区分,主要就是controller和非controller之分,controller的角色我们可以在zookeeper的对应目录/kafka/controller中获取对应的brokerid。

  2. 在kafka1.0以下的版本中使用zk来保存kafka消费者的offset(目录为/kafka/consumers/**),但是在kafka1.0以上,不再使用zookeeper来保存,主要原因在于,避免zookeeper负载过高,造成相关联的框架无法使用,此时在kafka提供了一个系统级别的topic:__consumer_offsets来报错偏移量信息。

2.2. Topic & Partition
  • Topic

    逻辑概念,同一个Topic的消息可分布在一个或多个节点(Broker)上
    一个Topic包含一个或者多个Partition
    每条信息都属于且仅属于一个Topic
    Producer发布数据是,必须制定该消息发布到哪一个Topic
    Consumer订阅消息时,也必须制定订阅哪个Topic的消息

在这里插入图片描述

  • Partition

    物理概念,一个Partition只分布在一个Broker上(不考虑备份)
    一个partition物理上对应一个文件夹
    一个Partition包含多个Segment(Segment对用户透明)
    一个Segment对应一个文件,Segment由一个个不可变记录组成
    记录只会被append到Segment中,不会被单独删除或者修改
    清除过期日志时,直接删除一个或多个Segment
    segment文件(log文件)文件名规范: 这个文件里面第一条消息的offset - 1

在这里插入图片描述

2.3. Kafka消息flush和Retention策略
  • flush策略

    ############################# Log Flush Policy #############################
    # Messages are immediately written to the filesystem but by default we only fsync() to sync
    # the OS cache lazily. The following configurations control the flush of data to disk.
    # There are a few important trade-offs here:
    #    1. Durability: Unflushed data may be lost if you are not using replication.
    #    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
    #    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
    # The settings below allow one to configure the flush policy to flush data after a period of time or
    # every N messages (or both). This can be done globally and overridden on a per-topic basis.
    
    # The number of messages to accept before forcing a flush of data to disk
    ## 每当每一个topic接收到10000条message的时候,就会将数据flush到磁盘
    log.flush.interval.messages=10000
    
    # The maximum amount of time a message can sit in a log before we force a flush
    #每个1s flush一次数据
    log.flush.interval.ms=1000
    

    ​ 为了提供kafka的读写数据能力,首先接收数据到kafka内存,不可能无限制的保存在内存,所以必然会将数据flush到磁盘(partition的segement)文件,在flush的时候做了Durability和Latency和Throughput的权衡与取舍。

  • retention策略

    ############################# Log Retention Policy #############################
    # The following configurations control the disposal of log segments. The policy can
    # be set to delete segments after a period of time, or after a given size has accumulated.
    # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
    # from the end of the log.
    
    # The minimum age of a log file to be eligible for deletion due to age
    # 日志最小的保留时间:7天,超过这个时间,数据可能会被清理掉
    log.retention.hours=168
    
    # A size-based retention policy for logs. Segments are pruned(裁剪) from the log unless the remaining
    # segments drop below log.retention.bytes. Functions independently of log.retention.hours.
    ## segement文件如果超过log.retention.bytes的配置,将会可能被裁剪,直到小于log.retention.bytes配置
    #log.retention.bytes=1073741824
    
    # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    # 一个segment文件最大的大小,超过log.segment.bytes一个G,将会创建一个新的segment文件
    log.segment.bytes=1073741824
    
    # The interval at which log segments are checked to see if they can be deleted according
    # to the retention policies
    ## 每隔5分钟,检测一次retention策略是否达到
    log.retention.check.interval.ms=300000
    

    ​ partition对应的文件,就保存在一个个的segment文件中,每一个文件默认大小是1G,但是log.retention.check.interval.ms监测频率是5分钟一次,所以segment文件可能会超过1G,此时就会启动retion策略,将文件裁剪到log.retention.bytes配置,如果超过了log.segment.bytes=1G配置,将会创建一个新的segment文件;默认情况,segment文件会保留7天。

2.4. Kafka消息检索原理
  • message的物理结构

在这里插入图片描述

  • .index文件和.log文件说明

    partition分区目录下的文件列表,主要包含两种类型的文件 x.index索引文件和x.log segment文件,其中x.log保存的是message信息,x.index保存的是索引数据。

在这里插入图片描述

​ 这二者文件的大致结果如下:

在这里插入图片描述

为什么会出现消息offset和文件中的偏移量不一样的问题?

因为一个partition下面有多个segment文件,segment文件当达到retention策略之后将会被裁剪或删除,同时partition中的offset是单调递增的,从0开始增加,但是segment文件中的消息在该文件中的偏移量指的是文件开头到该文件走过的字节长度,显然这两个不一样。

​ 所以,直接根据msg的offset是无法直接读取到消息的,那怎么办?所以此时就需要俺们的x.index中保存的相对偏移量来帮忙了。

​ x.index中保存的内容:

  1. index文件的序号就是message在日志文件中的相对偏移量
  2. OffsetIndex是稀疏索引,也就是说不会存储所有的消息的相对offset和position

也就是说index文件的序号对应的是log文件中的消息偏移量;index文件中的地址栏对应的是log文件中文件中的便宜字节。

  • 通过命令查看segment文件内容
kafka-run-class.sh kafka.tools.DumpLogSegments \
--print-data-log \		--->打印读取到的segment日志文件内容
--files 00000000000000000000.log	--->指定读取的segment日志文件

读取到的数据格式如下:

在这里插入图片描述

其中的offset是该条message在该partition中的偏移量,position为该条消息在该文件中的字节偏移量。

  • 消息检索过程

    以这个partition目录下面,00000000001560140916为例,现在要定位offset 为1560140921的message

  1. 定位到具体的segment日志文件
    由于log日志文件的文件名是这个文件中第一条消息的(offset-1). 因此可以根据offset定位到这个消息所在日志文件:00000000001560140916.log

  2. 计算查找的offset在日志文件的相对偏移量
    segment文件中第一条消息的offset = 1560140917
    计算message相对偏移量:需要定位的offset - segment文件中第一条消息的offset + 1 = 1560140921 - 1560140917 + 1 = 5
    查找index索引文件, 可以定位到该消息在日志文件中的偏移字节为456. 综上, 直接读取文件夹00000000001560140916.log中偏移456字节的数据即可。
    1560140922 -1560140917 + 1 = 6
    如果查找的offset在日志文件的相对偏移量在index索引文件不存在, 可根据其在index索引文件最接近的上限偏移量,往下顺序查找

3.Kafka controller的选举过程

​ 说明:kafka的controller的选举是在所有kafka节点启动的时候发生的,或者当controller挂掉,再从其余的broker中选举出一台作为controller。

​ 所以查看controller的选举入口,最简单就是kafka的启动,通过kafka-server-start.sh脚本,发现该类为kafka.Kafka。

在这里插入图片描述

4. Kafka高性能之道

4.1. 高性能原因
  • 高效使用磁盘
  1. 顺序写磁盘 顺序写磁盘性能高于随机写内存
  2. Append Only 数据不更新,无记录级的数据删除(只会整个segment删除)
  3. 充分利用Page Cache
    I/O Scheduler将连续的小块写组装成大块的物理写从而提高性能
    I/O Scheduler会尝试将一些写操作重新按顺序排好,从而减少磁盘头的移动时间
  4. 充分利用所有空闲内存(非JVM内存)
    应用层cache也会有对应的page cache与之对应,直接使用page cache可增大可用cache,如使用heap内的cache,会增加GC负担
  5. 读操作可直接在page cache内进行。如果进程重启,JVM内的cache会失效,但page cache仍然可用
  6. 可通过如下参数强制flush,但并不建议这么做
    log.flush.interval.messages=10000
    log.flush.interval.ms=1000
  7. 支持多Directory
  • 零拷贝

    传统模式下数据从文件传输到网络需要4次数据拷贝,4次上下文切换和2次系统调用

在这里插入图片描述

通过NIO的transferTo/transferFrom调用操作系统的sendfile实现零拷贝。总共2次内核数据拷贝,2次上下文切换和1次系统调用,消除了CPU数据拷贝

在这里插入图片描述

  • 批处理和压缩

    1. Producer和Consumer均支持批量处理数据,从而减少了网络传输的开销

    2. Producer可将数据压缩后发送给broker,从而减少网络传输代价。目前支持Snappy,Gzip和LZ4压缩

  • Partition

    1. 通过Partition实现了并行处理和水平扩展

    2. Partition是Kafka(包括kafka Stream)并行处理的最小单位

    3. 不同Partition可处于不同的Broker,充分利用多机资源

    4. 同一Broker上的不同Partition可置于不同的Directory,如果节点上有多个Disk Drive,可将不同的Drive对应的Directory,从而是Kafka充分利用Disk Drive的磁盘优势

  • ISR

    1. ISR实现了可用性和一致性的动态平衡
      replica.log.time.max.ms=10000
      replica.log.max.messages=4000

    2. ISR可容忍更多的节点失败
      Majority Quorum如果要容忍f个节点失败,至少需要2f+1个节点
      ISR如果要容忍f个节点失败,至少需要f+1个节点

    3. 如何处理Replica Crach
      Leader crash后,ISR中的任何replica皆可竞选称为Leader
      如果所有replica都crash,可选择让第一个recover的replica或者第一个在ISR中的replica称为leader
      unclean.leader.election.enable=true

4.2. kafka性能影响因子
  • producer

在这里插入图片描述

producer和吞吐量成正比

  • consumer

在这里插入图片描述

consumer数据量在没有达到partition个数之前,和消费的吞吐量成正比。

  • partition

在这里插入图片描述

分区格式和生成的吞吐量,在一定范围内,先增长,当达到某一个值之后区域稳定,在上下浮动

  • message-size

在这里插入图片描述

随着message size的增大,生产者对应的每秒生成的记录数在成下降趋势,区里的数据体积成上升趋势。

  • replication

在这里插入图片描述

副本越大,自然需要同步数据的量就越多,自然kafka的生成的吞吐量就越低。

  • 借助kafka脚本来查看kafka集群性能

1.kafka-producer-perf-test.sh

在这里插入图片描述

bin/kafka-producer-perf-test.sh --topic spark \
--num-records 100000 \		-->测试生成多少条记录
--throughput 10000 \		--->生产这的吞吐量,约等于messages/sec
--record-size 10 \			-->每条消息的大小
--producer.config config/producer.properties

2.kafka-consumer-perf-test.sh
在这里插入图片描述

bin/kafka-consumer-perf-test.sh --topic spark \
--broker-list bigdata01:9092,bigdata02:9092,bigdata03:9092 \
--messages 100000 ---->总共要读取多少条记录

读取到的结果

start.time=2019-08-06 02:31:23:738	--->开始时间
end.time=2019-08-06 02:31:24:735	--->结束时间
data.consumed.in.MB=0.9534			--->总共消费的数据体积
MB.sec=0.9562						--->每秒钟消费数据体积
data.consumed.in.nMsg=100000		--->总共消费的数据记录数
nMsg.sec=100300.9027				--->每秒钟消费记录数
rebalance.time.ms=47				--->进行rebalance的时间
fetch.time.ms=950					--->抓取这10w条数据总共花费多长时间
fetch.MB.sec=1.0035					--->每秒钟抓取数据体积
fetch.nMsg.sec=105263.1579			--->每秒钟抓取数据记录数
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐