【手撕 Kafka】深入学习 Kafka 写入日志的原理
服务端将生产者产生的消息集存储到日志文件,要考虑对消息集进行分段存储。如图6-3所示,服务端将消息追加到日志文件,并不是直接写人底层的文件,具体步骤如下。1. 每个分区对应的日志对象管理了分区的所有日志分段。2. 将消息集追加到当前活动的日志分段,任何时刻,都只会有一个活动的日志分段3. 每个日志分段对应一个数据文件和索引文件,消息内容会追加到数据文件中。4. 操作底层数据的接口是文件通道,消息集
👉博主介绍: 博主从事应用安全和大数据领域,有8年研发经验,5年面试官经验,Java技术专家,WEB架构师,阿里云专家博主,华为云云享专家,51CTO 专家博主
⛪️ 个人社区:个人社区
💞 个人主页:个人主页
🙉 专栏地址:✅带你手撕 Kafka
🙉八股文专题:剑指大厂,手撕 Java 八股文
1、Kafka 写入日志的步骤
服务端将生产者产生的消息集存储到日志文件,要考虑对消息集进行分段存储。如图6-3所示,服务端将消息追加到日志文件,并不是直接写人底层的文件,具体步骤如下。
- 每个分区对应的日志对象管理了分区的所有日志分段。
- 将消息集追加到当前活动的日志分段,任何时刻,都只会有一个活动的日志分段
- 每个日志分段对应一个数据文件和索引文件,消息内容会追加到数据文件中。
- 操作底层数据的接口是文件通道,消息集提供一个writeFullyTo()方法,参数是文件通道
- 消息集(ByteBufferMessageSet)的writeFullyTo()方法,调用文件通道的write()方法,将底层包含消息内容的字节缓冲区(ByteBuffer)写到文件通道中。
- 字节缓冲区写到文件通道中,消息就持久化到日志分段对应的数据文件中了
2、消息集
生产者发送消息时,会在客户端将属于同一个分区的一批消息,作为一个生产请求发送给服务端。Java版本和Scala版本的生产者在客户端生成的消息集对象不一样,Java版本的消息内容本身就是字节缓冲区(ByteBuffer)Scala版本则是消息集(Message*)。为了兼容两个版本,两者都要转换为底层是字节缓冲区的ByteBufferMessageSet对象。
伪代码如下:
// Java版本的生产者客户端传递的消息内容是ByteBuffer,无需额外处理
class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet {
// Scala版本的客户端传递Message对象,要将消息集填充到字节缓冲区中
def this(codec: CompressionCodeccounter:LongRef,messages: Message*) {
// create()的返回值是ByteBuffer,通过this()再调用类级别的构造函数
this(create(0ffsetAssigner(counter,messages.size),messages:_*))
}
}
// Scala版本将消息集写到字节缓冲区中,并创建一个ByteBufferMessageSet对象
object ByteBufferMessageSet {
def create(offsetAssigner:OffsetAssigner ,messages;Message*):ByteBuffer={
val buffer = ByteBuffer,allocate(MessageSet,messageSetSize(messages))
// 将每条消息写入到字节缓冲区中
for(message<- messages)
writeMessage(buffer,message,offsetAssignernextAbsoluteOffset())
buffer.rewind()
buffer
}
def writeMessage(buffer: ByteBuffer, message: Message, offset: Long) {
buffer.putLong(offset)// 消息的偏移量
buffer.putInt(message.size)// 消息大小
buffer.put(message.buffer) // 消息内容
message.buffer.rewind()
}
}
消息集中的每条消息(Message)都会被分配一个相对偏移量,而每一批消息的相对偏移量都是从0开始的。
下图给出了一个示例,生产者写到分区P的第一批消息有4条消息,对应的偏移量是[0,1,2,3];第二批消息有3条消息,对应的偏移量是[0,1,2]。客户端每次发送给服务端的一批消息,它的字节缓冲区只属于这一批消息,字节缓冲区不是共享的数据结构。
消息集中的每条消息由3部分组成:偏移量、数据大小、消息内容。如所示,每条消息除了保存消息的键值内容外,还保存一些其他数据,比如校验值、魔数、键的长度、值的长度等。
注意:Scala版本的消息格式在Message.scala类中,Java版本的消息格式在Record,java中。
消息集中每条消息的第一部分内容是偏移量。Kafka存储消息时,会为每条消息都指定一个唯一的偏移量。同一个分区的所有日志分段,它们的偏移量从0开始不断递增。不同分区的偏移量之间没有关系,所以说Kaka只保证同一个分区的消息有序性,但是不保证跨分区消息的有序性。
消息集中每条消息的第二部分是当前这条消息的长度。消息长度通常不固定,而且在读取文件时客户端可能期望直接定位到指定的偏移量。记录消息长度的好处是:如果不希望读取这条消息,只需要读取出消息长度这个字段的值,然后跳过这些大小的字节,这样就可以定位到下一条数据的起始位置。
第三部分是消息的具体内容,和消息集的第二部分类似,每条消息的键值之前也都会先记录键的长度和值的长度。注意:消息格式是在客户端定义的消息集在传给服务端之前,就用ByteBufferMessageSet封装好。服务端接收的每个分区消息就是ByteBufferMessageSet。
消息集的writeMessage()方法将每条消息(Message)填充到字节缓冲区中,缓冲区会暂存每个分区的一批消息 这个方法实际上是在客户端调用的,填充消息 才,会为这批消息设置从开始递增的偏移量,如下所示,在服务端调用文件通道的写方法时,才会将消息集字节缓冲区的内容刷写到文件中。
2、Kafka 日志追加方式
服务端将每个分区的消息追加到日志中,是以日志分段为单位的。当日志分段累加的消息达到阙值大小(文件大小达到1GB)时,会新创建一个日志分段保存新的消息,而分区的消息总是追加到最新的日志分段中。每个日志分段都有一个基准偏移量(segmentBaseoffset,或者叫baseoffset),这个基准偏移量是分区级别的绝对偏移量,而且这个值在日志分段中是固定的。有了这个基准偏移量,就可以计算出每条消息在分区中的绝对偏移量,最后把消息以及对应的绝对偏移量写到日志文件中。
日志追加方法中的messages参数是客户端创建的消息集,这里面的偏移量是相对偏移量。在追加到日志分段时,validMessages变量已经是绝对偏移量了,具体步骤如下。
- 对客户端传递的消息集进行验证,确保每条消息的(相对)偏移量都是单调递增的。
- 删除消息集中无效的消息。如果大小一致,直接返回messages,否则会进行截断。
- 为有效消息集的每条消息分配(绝对)偏移量。
- 将更新了偏移量值的消息集追加到当前日志分段中。
- 更新日志的偏移量(下一个偏移量 nextOffsetMetadata )必要时调用flush()方法刷写磁盘。
nextOffsetMetadata 读写操作发生在 务端处理生产请求和拉取请求时,具体步骤如下:
- 生产者发送消息集给服务端,服务端会将这一批消息追加到日志中。
- 每条消息需要指定绝对偏移量,服务端会用nextoffsetMetadata的值作为起始偏移量。
- 服务端将每条带有偏移量的消息写入到日志分段中。
- 服务端会获取这一批消息中最后一条消息的偏移量,加上一后更新nextoffsetMetadata。
- 消费线程(消费者或备份副本)会根据这个变量的最新值拉取消息。一旦变量值发生变化消费线程就能拉取到新写入的消息。
nextoffsetMetadata变量是一个关于日志的偏移量元数据对象(LogoffsetMetadata)。日志的偏移量元数据都是从当前活动的日志分段(activeSegment)获取相关的信息:下一条消息的偏移量、当前日志分段的基准偏移量、当前日志分段的大小。
3、分析和验证消息集
对消息集进行分析和验证,主要利用了Kafka中“分区的消息必须有序”这个特性。分析和验证方法的返回值是一个日志追加信息(LogAppendInfo)对象,该对象的内容包括:消息集第一条和最后条消息的偏移量、消息集的总字节大小、偏移量是否单调递增。
日志追加信息表示消息集的概要信息,但并不包括消息内容。日志追加信息对象也是追加日志方法的最后返回值。服务端上层类(比如分区、副本管理器调用追加日志的方法,期望得到这一批消息的概要信息,比如第一个偏移量和最后一个偏移量。这样,它们就可以根据偏移量计算出一共追加了多少条消息(服务端接收的消息集和最后真正被追加的消息数量可能会不一样)。上层类甚至还可以做一些复杂的业务逻辑处理,比如根据最后一个偏移量判断被延迟的生产请求是否可以完成。相关代码如下:
//对要追加的消息集进行分析和验证,消息太大或者无效会被丢弃
def analyzeAndValidateMessageSet(messages:ByteBufferMessageSet)={
var shallowMessageCount = 0 //消息数量
var validBytesCount =0 //有效字节数
//第一条消息和最后一条 (循环时表示上一条消息的偏移量)消息的偏移量
var firstOffset,astoffset = -1L
var monotonic =true // 是否单调递增
for(messageAndOffset <- messages,shallowIterator) {
// 在第一条消息中更新firstoffset
if(firstOffset <0) firstOffset = messageAndOffset.offset
if(lastOffset >= messageAndOffset.offset) monotonic = false
//每循环一条消息,就更新
lastoffsetLastOffset = messageAndOffset.offset
val m= messageAndoffset.message
val messageSize = MessageSet.entrySize(m)
m.ensureValid()//检查消息是否有效
shallowMessageCount +=1
validBytesCount += messageSize
}
LogAppendInfo(firstOffset,lastOffset, sourceCodec,targetCodec,shallowMessageCount,validBytesCount,monotonic)
}
前面说过,消息集对象中消息的偏移量是从0开始的相对偏移量,并且它的底层是一个字节缓冲区。那么要获得消息集中第一条消息和最后一条消息的偏移量,只能再把字节缓冲区解析出来,读取每一条消息的偏移量。这里因为还要对每条消息进行分析和验证,所以读取消息是不可避免的。
分析消息集的每条消息时,都会更新最近的偏移量(lastoffset)但只会在分析第一条消息时更新起始偏移量(firstoffset)。判断消息集中所有消息的偏移量是否单调递增,只需要比较最近的偏移量和当前消息的偏移量。如果每次处理一条消息时,当前消息的偏移量都比最近的偏移量值(上-条消息的偏移量)大,说明消息集是单调递增的。
对消息集的每条消息都验证和分析后,下一步要为消息分配绝对偏移量,最后才能追加到日志分段
4、为消息集分配绝对偏移量
存储到日志文件中的消息必须是分区级别的绝对偏移量。为消息集分配绝对偏移量时,以nextoffsetMetadata的偏移量作为起始偏移量。分配完成后还要更新nextoffsetMetadata的偏移量值。为了保证在分配过程中,获取偏移量的值并加一是一个原子操作,起始偏移量会作为原子变量传人validateMessagesAndAssignoffsets()方法。相关代码如下:
// 消息集 加到 志,获取最近的偏移量作为初始佳
class Log{
def append(l'lessages : ByteBuffe MessageSet) {
// nextOffsetMetadata 表示最近 一条消息的偏移量
val offset = new AtomicLong(nextOffsetMetadata.messageOffset)
//offset参数作为原子变量,在分配偏移量时,先获取出值再加一
validMessages=validMessages.validateMessagesAndAssignOffsets(offset)
//offset的返回值是最后一条消息的偏移量再加一,那么最后一条消息就要减一
appendInfo.lastOffset = offset.get - 1
segment.append(appendInfo.firstoffset,validMessages) // 追加消息集
//更新nextoffsetMetadata,用最后一条消息的偏移量加一表示最近下一条
updateLogEndOffset(appendInfo.lastOffset +1)
}
}
//字节缓冲区消息集根据指定的偏移量计数器、更新每条消息的偏移量
class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet {
def validateMessagesAndAssignOffsets(offsetCounter:AtomicLong)={
var messagePosition =0
buffer.mark()//先标记
while(messagePosition < sizeInBytes - MessageSet.LogOverhead){
buffer.position(messagePosition)// 定位到每条消息的起始位置
//以最新的偏移量计数器为基础,每条消息的偏移量都在此基础上不断加一
buffer.putLong(offsetCounter.getAndIncrement())
val messageSize= buffer.getInt()
// 消息的大小//更新消息的起始位置,为下一条消息做准备 (12+消息大小,表示一条完整的消息)
messagePosition += MessageSet.LogOverhead + messageSize
buffer.reset()//重置的时候,回到最开始标记的地方
this// 还是返回字节缓冲区消息集。除了偏移量改了,其他均没有变化
}
}
根据“1.消息集”中消息集的格式,为消息分配偏移量,实际上是更新每条消息的偏移量数据(offset)。消息的大小(size)和消息内容(Message)都不需要变动。现在的问题主要是:如何在字节缓冲区中定位到每条消息的偏移量所在位置。定位消息偏移量的方式有两种:一种是按照顺序完整地读取每条消息,这种方式代价比较大,我们实际上只需要更改偏移量,不需要读取每条消息的实际内容;另一种是先读取出消息大小的值,然后计算下一条消息的起始偏移量,最后直接用字节缓冲区提供的定位方法(position())直接定位到下一条消息的起始位置。
因为底层字节缓冲区和消息集对象是一一对应的,所以消息集中第一条消息的偏移量一定是从字节缓冲区的位置0开始的。每条消息的长度计算方式是:8 + 4 + 消息大小。其中,消息大小的值可以从第二部分读取。如表6-2所示,第一条消息中“消息的大小”存的值是3,表示消息本身的内容长度是3,整个消息占用的大小就是:8+4+3=15。
假设偏移量计数器初始值为10(即nextoffsetMetadata的值)第一条消息的偏移量就等于10。分配第一条消息的偏移量时,修改前面8字节的内容为10。接下来要修改第二条消息的偏移量为11,通过读取第一条消息的大小(等于3)再加上12字节,就定位到第二条消息起始位置(等于15)。修改第三条消息的偏移量为12也是类似的,通过读取第二条消息的大小(等于5)再加上12字节(第二条消息总共占用了17字节)就可以定位到第三条消息的起始位置(15再加上17等于32)以此类推,第四条消息的起始位置等于第三条消息占用的12字节再加上32,等于48。在写人每条消息的绝对偏移量后,只会读取消息的大小,不会读取这条消息的实际内容。
消息集经过分配绝对偏移量后,才可以追加到日志分段中,日志分段接收消息集并写到文件中。
精彩专栏推荐订阅:在下方专栏👇🏻
✅ 2023年华为OD机试真题(A卷&B卷)+ 面试指导
✅ 精选100套 Java 项目案例
✅ 面试需要避开的坑(活动)
💕💕 本文由激流原创,原创不易,希望大家关注、点赞、收藏,给博主一点鼓励,感谢!!!
更多推荐
所有评论(0)