kafka入门实操教程参考:https://blog.csdn.net/tttalk/article/details/121951552?spm=1001.2014.3001.5502

一、kafka简介

1.概念:分布式的发布订阅式的数据流消息系统。使用scala开发。

2.优点
(1)高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,实现高吞吐原理详见Kafka如何实现高性能IO;
(2)可扩展性:kafka集群支持热扩展;
(3)持久性、可靠性:消息被顺序写入到本地磁盘日志文件中(顺序的磁盘读写相比随机的内存更快而且开销更低),并且支持数据备份防止数据丢失;
(4)容错性:允许集群中节点故障(若副本数量为n,则允许n-1个节点故障);
(5)高并发:支持数千个客户端同时读写。

3.应用场景:
(1) 日志聚合:指将不同服务器上的日志收集起来并放入一个日志中心,通过kafka以统一接口服务的方式开放给各种consumer;
(2) 消息队列(MQ):解耦生产者和消费者、缓存消息等。MQ的常见使用场景如流量削峰、数据驱动的任务依赖等等。在MQ领域,除了Kafka外还有传统的消息队列如ActiveMQ和RabbitMQ等;
(3) 用户活动跟踪:kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后消费者通过订阅这些topic来做实时的监控分析,亦可保存到数据库;
(4)运营指标:kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告;
(5) 流式处理:比如spark streaming和storm。

二、kafka基本概念

1、Broker:消息中间件处理结点,生产者消费者通过broker进行topic交互,一个Kafka节点就是一个broker,多个broker可以组成一个Kafka集群。
2、Topic:一类消息,例如page view日志、click日志等都可以以topic的形式存在,Kafka集群能够同时负责多个topic的分发。
3、Partition:topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。
4、Segment:partition物理上由多个segment组成,后面有详细说明。
5、offset:每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中的每个消息都有一个连续的序列号叫做offset,用于partition唯一标识一条消息.

三、kafka消息传输流程

1.生产者发布消息后,到broker的leader中,接着多个leader进行消息同步,然后是每个broker中leader到follwer的同步。
2.消费者通过zookeeper进行注册,通过负载均衡策略挑取一个broker进行topic的消费。
在这里插入图片描述

四、kafka的存储机制

1.topic中partition存储分布
(1)基本原理
在Kafka文件存储中,同一个topic下有多个不同partition,每一个partition为一个文件夹,partiton命名规则为topic名称+有序序号,第一个partiton序号从0開始。如有两个topic:qq,wechat。partion一个为2,一个为3,则会产生|–qq-0|–qq-1|–wechat-0|–wechat-1|–wechat-2这5个文件夹。
(2)多broker、多副本情况
若有多个broker,可以理解平均每个broker分区数=partions*replication-factor/broker数。
这里解释下replication-factor:副本因子,用来设置主题的副本数。每个主题可以有多个副本,副本位于集群中不同的broker上,也就是说副本的数量不能超过broker的数量,否则创建主题时会失败。
若partions 设置为10,replicationFactor设置为1. Broker为2.分区会均匀在broker。broker1分区为13579,broker2为246810;
若partions 设置为10,replicationFactor设置为2. Broker为2.每个broker都有副本存在。broker1和broker2副本均为1到10;
若partions 设置为3,replicationFactor设置为1. Broker为3.每个broker都有副本存在。broker1分区为1,broker2为2,broker2为3,当一个broker宕机了,该topic就无法使用了;
若partions 设置为3,replicationFactor设置为2. Broker为3.每个broker都有副本存在。broker1分区为12,broker2为23,broker2为13,当一个broker宕机了,该topic还能使用了。

2.partiton中文件存储方式
(1)每一个partion(文件夹)相当于一个巨型文件被平均分配到多个大小相等segment(段)数据文件里。
(2)但每一个段segment file消息数量不一定相等,这样的特性方便old segment file高速被删除。(默认情况下每一个文件大小为1G)
(3)每一个partiton仅仅须要支持顺序读写即可了。segment文件生命周期由服务端配置参数决定。
这样做的优点就是能高速删除无用文件。有效提高磁盘利用率。

3.partiton中segment文件存储结构
(1)segment file组成:由2大部分组成。分别为index file和data file,此2个文件一一相应,成对出现,后缀”.index”和“.log”分别表示为segment索引文件、数据文件。根据索引文件可以精确定位数据文件。
(2)segment文件命名规则:partion全局的第一个segment从0開始,兴许每一个segment文件名称为上一个segment文件最后一条消息的offset值。数值最大为64位long大小。19位数字字符长度,没有数字用0填充。
offset:在parition(分区)内的每条消息都有一个有序的id号,这个id号被称为偏移(offset),它能够唯一确定每条消息在parition(分区)内的位置。即offset表示partiion的第多少message

4.在partition中怎样通过offset查找message
比如读取offset=888的message,须要通过以下2个步骤查找。
(1)第一步查找segment file
从刚刚我们了解的segment文件命名规则可以知道每个segment最后一条消息的offset值,知道了这个,使用二分法很快就能找到segment file。
(2)第二步通过segment file查找message
通过第一步定位到segment file,然后通过索引index文件就能定位到数据文件。

5.优点
(1)Kafka把topic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。
(2)通过索引信息可以快速定位message和确定response的最大大小。
(3)通过index元数据全部映射到memory,可以避免segment file的IO磁盘操作。
(4)通过索引文件稀疏存储,可以大幅降低index文件元数据占用空间大小。
稀疏索引:这类文件是将所有数据记录关键字值分成许多组,每组一个索引项,这种索引称为稀疏索引。这类文件的数据记录要求按关键字顺序排列。因此,其特点是索引项少,管理方便,但插入、删除记录代价较大。

五、kafka与传统MQ消息系统的区别

1.Kafka持久化日志,这些日志可以重复读取和无限期保存
2.kafka是一个分布式系统,以集群的方式运行,在内部通过复制数据提升容错能力和高可用性
3.kafka支持实时的流式处理

六、Kafka如何实现高性能IO

1.批量处理消息。使用send()发送消息时会先缓存起来批量发送给broker,消费也是以批为单位。
2.使用顺序读写。只需要寻址一次就可以连续的读写下去,相比随机读写省去了很多寻址时间
3.利用缓存页PageCache加速消息读写。通过缓存加快读写速度,缓存清理策略一般是LRU(Least recently used,最近最少使用),优先保留最近常使用的那些缓存,缓存命中的效率时比较高的。
LRU实现:
最常见的实现是使用一个链表保存缓存数据,详细算法实现如下:
(1)新数据插入到链表头部;
(2) 每当缓存命中(即缓存数据被访问),则将数据移到链表头部;
(4) 当链表满的时候,将链表尾部的数据丢弃。
在Java中可以使用LinkHashMap去实现LRU。
4.零拷贝技术。在消费端处理消费的过程中,需要把数据复制到用户内存空间,然后再复制到socket缓冲区,然后这个技术可以直接把数据复制到socket缓冲区。

七、Kafka丢数据

1.消息发送
Kafka消息发送有两种方式:同步(sync)和异步(async),默认是同步方式,可通过producer.type属性进行配置。Kafka通过配置request.required.acks属性来确认消息的生产:
0—表示不进行消息接收是否成功的确认;
1—表示当Leader接收成功时确认;
-1—表示Leader和Follower都接收成功时确认;
综上所述,有6种消息生产的情况,配置-1时,不会丢失消息,下面分情况来分析消息丢失的场景:
(1)acks=0,不和Kafka集群进行消息接收确认,则当网络异常、缓冲区满了等情况时,消息可能丢失;
(2)acks=1,同步模式下,只有Leader确认接收成功后但挂掉了,副本没有同步,数据可能丢失;

针对消息丢失:同步模式下,确认机制设置为-1,即让消息写入Leader和Follower之后再确认消息发送成功;异步模式下,为防止缓冲区满,可以在配置文件设置不限制阻塞超时时间,当缓冲区满时让生产者一直处于阻塞状态;

2.消息消费
Kafka消息消费有两个consumer接口,Low-level API和High-level API:
Low-level API:消费者自己维护offset等值,可以实现对Kafka的完全控制;
High-level API:封装了对parition和offset的管理,使用简单;
如果使用高级接口High-level API,可能存在一个问题就是当消息消费者从集群中把消息取出来、并提交了新的消息offset值后,还没来得及消费就挂掉了,那么下次再消费时之前没消费成功的消息就“诡异”的消失了;

3.消息重复解决
将消息的唯一标识保存到外部介质中,每次消费时判断是否处理过即可。

八、Zookeeper对于kafka的应用

(1)用于kafka的分布式应用。主要用于集群中不同节点进行通信。用于提交偏移量,节点失败时可以从之前的偏移量中获取,还用于leader检测、分布式同步、管理配置、集群、节点实时状态等

(2)zookeeper 是一个分布式的协调组件,早期版本的kafka用zk做meta信息存储,consumer的消费状态,group的管理以及 offset的值。考虑到zk本身的一些因素以及整个架构较大概率存在单点问题,新版本中逐渐弱化了zookeeper的作用。新的consumer使用了kafka内部的group coordination协议,也减少了对zookeeper的依赖,但是broker依然依赖于ZK,zookeeper 在kafka中还用来选举controller 和 检测broker是否存活等等。

九、ISR、AR、OSR

(1)概念介绍
ISR:In-Sync Replicas 副本同步队列。
AR:Assigned Replicas 所有副本。AR=ISR+OSR。
(2)运行原理
ISR是由leader维护,follower从leader同步数据有一些延迟,超过设定的阈值时都会把Broker(follower)剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。

十、为什么Kafka不支持读写分离

在 Kafka 中,生产者写入消息、消费者读取消息的操作都是与 leader 副本进行交互的,从 而实现的是一种主写主读的生产消费模型。
Kafka 并不支持主写从读,因为主写从读有 2 个很明显的缺点:
(1)数据一致性问题。数据从主节点转到从节点必然会有一个延时的时间窗口,这个时间 窗口会导致主从节点之间的数据不一致。某一时刻,在主节点和从节点中 A 数据的值都为 X, 之后将主节点中 A 的值修改为 Y,那么在这个变更通知到从节点之前,应用读取从节点中的 A 数据的值并不为最新的 Y,由此便产生了数据不一致的问题。
(2)延时问题。类似 Redis 这种组件,数据从写入主节点到同步至从节点中的过程需要经 历网络→主节点内存→网络→从节点内存这几个阶段,整个过程会耗费一定的时间。而在 Kafka 中,主从同步会比 Redis 更加耗时,它需要经历网络→主节点内存→主节点磁盘→网络→从节 点内存→从节点磁盘这几个阶段。对延时敏感的应用而言,主写从读的功能并不太适用。

十一、kafka事务

1.Kafka的幂等性
(1)使用
开启幂等性功能的方式很简单,将生产者客户端参数enable.idempotence设置为true(默认false)
(2)实现原理
①Kafka引入了producer id(以下简称PID)和序列号(sequence number)这两个概念。每个新的生产者实例在初始化的时候都会被分配一个PID,这个PID对用户而言是完全透明的。
②对于每个PID,消息发送到的每一个分区都有对应的序列号,这些序列号从0开始单调递增。生产者每发送一条消息就会将对应的序列号的值加1。
③broker端会在内存中为每一对维护一个序列号。对于收到的每一条消息,只有当它的序列号的值(SN_new)比broker端中维护的对应的序列号的值(SN_old)大1(即SN_new = SN_old + 1)时,broker才会接收它。
④如果SN_new< SN_old + 1,那么说明消息被重复写入,broker可以直接将其丢弃。如果SN_new> SN_old + 1,那么说明中间有数据尚未写入,出现了乱序,暗示可能有消息丢失,这个异常是一个严重的异常。

(2)kafka跨分区事务
引入序列号来实现幂等也只是针对每一对而言的,也就是说,Kafka的幂等只能保证单个生产者会话(session)中单分区的幂等。幂等性不能跨多个分区运作,而事务可以弥补这个缺陷。从生产者的角度分析,通过事务,Kafka可以保证跨生产者会话的消息幂等发送,以及跨生产者会话的事务恢复。
(1)使用
应用程序必须提供唯一的transactionalId,这个transactionalId通过客户端参数transactional.id来显式设置。事务要求生产者开启幂等特性,因此通过将transactional.id参数设置为非空从而开启事务特性的同时需要将enable.idempotence设置为true(如果未显式设置,则KafkaProducer默认会将它的值设置为true)。
(2)实现原理
①transactionalId与PID一一对应,两者之间所不同的是transactionalId由用户显式设置,而PID是由Kafka内部分配的。
②为了保证新的生产者启动后具有相同transactionalId的旧生产者能够立即失效,每个生产者通过transactionalId获取PID的同时,还会获取一个单调递增的producer epoch。如果使用同一个transactionalId开启两个生产者,那么前一个开启的生产者会报错。
③具有相同transactionalId的新生产者实例被创建且工作的时候,旧的且拥有相同transactionalId的生产者实例将不再工作。当某个生产者实例宕机后,新的生产者实例可以保证任何未完成的旧事务要么被提交(Commit),要么被中止(Abort),如此可以使新的生产者实例从一个正常的状态开始工作。
④KafkaProducer提供了5个与事务相关的方法,详细如下:

//初始化事务
void initTransactions();
//开启事务
void beginTransaction() throws ProducerFencedException;
//消费者提供在事务内的位移提交的操作
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                              String consumerGroupId)
        throws ProducerFencedException;
//提交事务
void commitTransaction() throws ProducerFencedException;
//中止事务,类似于事务回滚
void abortTransaction() throws ProducerFencedException;

⑤在消费端有一个参数isolation.level,与事务有着莫大的关联,这个参数的默认值为“read_uncommitted”,意思是说消费端应用可以看到(消费到)未提交的事务,当然对于已提交的事务也是可见的。这个参数还可以设置为“read_committed”,表示消费端应用不可以看到尚未提交的事务内的消息。

举个例子,如果生产者开启事务并向某个分区值发送3条消息msg1、msg2和msg3,在执行commitTransaction()或abortTransaction()方法前,设置为“read_committed”的消费端应用是消费不到这些消息的,不过在KafkaConsumer内部会缓存这些消息,直到生产者执行commitTransaction()方法之后它才能将这些消息推送给消费端应用。反之,如果生产者执行了abortTransaction()方法,那么KafkaConsumer会将这些缓存的消息丢弃而不推送给消费端应用。

十二、Lag机制

1.消息堆积
消息堆积是消息中间件的一大特色,消息中间件的流量削峰、冗余存储等功能正是得益于消息中间件的消息堆积能力。对于Kafka而言,虽然消息堆积影响其自身性能,但堆积过多有可能会造成磁盘爆满,或者触发日志清除策略而造成消息丢失的情况。
2.Lag
消息堆积是消费滞后(Lag)的一种表现形式,消息中间件服务端中所留存的消息与消费掉的消息之间的差值即为消息堆积量,也称之为消费滞后(Lag)量。
Partition结构:
在这里插入图片描述

①LogStartOffset:表示一个Partition的起始位移,初始为0,虽然消息的增加以及日志清除策略的影响,这个值会阶段性的增大。
②ConsumerOffset:消费位移,表示Partition的某个消费者消费到的位移位置。
③HighWatermark:简称HW,代表消费端所能“观察”到的Partition的最高日志位移,HW大于等于ConsumerOffset的值。
④LogEndOffset:简称LEO, 代表Partition的最高日志位移,其值对消费者不可见。比如在多个ISR副本数的情况下,消息发送到每个Leader后会更新LEO的值,HW就表示多个副本同时达到的日志位移,也就是LEO最小的那个值。由于Leader消息之间延时问题,所以HW必然不会一直与Leader的LEO相等,即LEO>=HW。

Lag=HW - ConsumerOffset

ConsumerOffset获取:
要么在Zookeeper里,要么在_consumer_offsets这个内部topic中

HW获取:
在Kafka的JMX中可以看到“kafka.log:type=Log,name=LogEndOffset,topic=[topic_name],partition=[partition_num]”这样一个属性,但是这个值不是LEO而是HW。
通过OffsetReuqest请求获取分区的LogEndOffset(简称为LEO,可见的LEO)

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐