目录

消息的设计思想

Kafka 日志存储结构

日志删除

日志压缩 


消息的设计思想

消息是服务的源头,一切的设计都是为了将消息从一端送到另一端。

        这里面涉及到消息的结构,消息体不能太大,太大容易造成存储成本上升,网络传输开销变大,所以消息体只需要包含必要的信息,最好不要冗余。

        消息最好也支持压缩,通过压缩可以在消息体本身就精简的情况下变的更小,那么存储和网络开销可以进一步降低。

        消息是要持久化的,被消费掉的消息不能一直存储,或者说非常老的消息被再次消费的可能性不大,需要一套机制来清理老的消息,释放磁盘空间,如何找出老的消息是关键,所以每个消息最好带个消息生产时的时间戳,通过时间戳计算出老的消息,在合适的时候进行删除。

        消息也是需要编号的,编号一方面代表了消息的位置,另一方面消费者可以通过编号找到对应的消息。

        大量的消息如何存储也是个问题,全部存储在一个文件中,查询效率低且不利于清理老数据,所以采用分段,通过分段的方式把大的日志文件切割成多个相对小的日志文件来提升维护性,这样当插入消息的时候只要追加在段的最后就行,但是在查找消息的时候如果把整个段加载到内存中一条一条找,似乎也需要很大的内存开销,所以需要一套索引机制,通过索引来加速访问对应的Message。

总结一条kafka的消息包含创造时间消息的序号支持消息压缩,存储消息的日志是分段存储,并且是有索引的。

Kafka 日志存储结构

来看一个整体的kafka日志目录结构图:

Kafka 将消息存储到磁盘中,随着写入数据不断增加,磁盘占用空间越来越大,为了控制占用空间就需要对消息做一定的清理操作。 Kafka 存储日志结构分析中每一个分区副本(Replica)都对应一个 Log,而 Log 又可以分为多个日志分段(LogSegment),这样就便于 Kafka 对日志的清理操作。

Kafka提供了两种日志清理策略:

  1. 日志删除(Log Retention):按照一定的保留策略直接删除不符合条件的日志分段(LogSegment)。
  2. 日志压缩(Log Compaction):针对每个消息的key进行整合,对于有相同key的不同value值,只保留最后一个版本。

这里我们可以通过 Kafka Broker 端参数 log.cleanup.policy 来设置日志清理策略,默认值为 “delete”,即采用日志删除的清理策略。如果要采用日志压缩的清理策略,就需要将 log.cleanup.policy 设置为 “compact”,这样还不够,必须还要将log.cleaner.enable(默认值为 true)设为 true

如果想要同时支持两种清理策略, 可以直接将 log.cleanup.policy 参数设置为“delete,compact”。

日志删除

Kafka 的日志管理器(LogManager)中有一个专门的日志清理任务通过周期性检测和删除不符合条件的日志分段文件(LogSegment),这里我们可以通过 Kafka Broker 端的参数 log.retention.check.interval.ms 来配置,默认值为300000,即5分钟。

在 Kafka 中一共有3种保留策略:

1.基于时间策略

日志删除任务会周期检查当前日志文件中是否有保留时间超过设定的阈值(retentionMs) 来寻找可删除的日志段文件集合(deletableSegments)

其中retentionMs可以通过 Kafka Broker 端的这几个参数的大小判断的

log.retention.ms > log.retention.minutes > log.retention.hours优先级来设置,默认情况只会配置 log.retention.hours 参数,值为168即为7天。

这里需要注意:删除过期的日志段文件,并不是简单的根据该日志段文件的修改时间计算的,而是要根据该日志段中最大的时间戳 largestTimeStamp 来计算的,首先要查询该日志分段所对应的时间戳索引文件,查找该时间戳索引文件的最后一条索引数据,如果时间戳值大于0,则取值,否则才会使用最近修改时间(lastModifiedTime)。

【删除步骤】:

  1. 首先从 Log 对象所维护的日志段的跳跃表中移除要删除的日志段,用来确保已经没有线程来读取这些日志段。
  2. 将日志段所对应的所有文件,包括索引文件都添加上“.deleted”的后缀。
  3. 最后交给一个以“delete-file”命名的延迟任务来删除这些以“ .deleted ”为后缀的文件。默认1分钟执行一次, 可以通过 file.delete.delay.ms 来配置。

 基于时间保留策略示意图

2.基于日志大小策略

日志删除任务会周期检查当前日志大小是否超过设定的阈值(retentionSize) 来寻找可删除的日志段文件集合(deletableSegments)

其中 retentionSize 这里我们可以通过 Kafka Broker 端的参数log.retention.bytes来设置, 默认值为-1,即无穷大。

这里需要注意的是 log.retention.bytes 设置的是Log中所有日志文件的大小,而不是单个日志段的大小。单个日志段可以通过参数 log.segment.bytes 来设置,默认大小为1G。

【删除步骤】:

  1. 首先计算日志文件的总大小Size和retentionSize的差值,即需要删除的日志总大小。
  2. 然后从日志文件中的第一个日志段开始进行查找可删除的日志段的文件集合(deletableSegments)
  3. 找到后就可以进行删除操作了。

基于日志大小保留策略示意图

3.基于日志起始偏移量 

该策略判断依据是日志段的下一个日志段的起始偏移量 baseOffset 是否小于等于 logStartOffset,如果是,则可以删除此日志分段。

【如下图所示 删除步骤】:

  1. . 首先从头开始遍历每个日志段,日志段 1 的下一个日志分段的起始偏移量为20,小于logStartOffset的大小,将日志段1加入deletableSegments。
  2. 日志段2的下一个日志偏移量的起始偏移量为35,也小于logStartOffset的大小,将日志分段2页加入deletableSegments。
  3. 日志段3的下一个日志偏移量的起始偏移量为50,也小于logStartOffset的大小,将日志分段3页加入deletableSegments。
  4. 日志段4的下一个日志偏移量通过对比后,在logStartOffset的右侧,那么从日志段4开始的所有日志段都不会加入deletableSegments。
  5. 待收集完所有的可删除的日志集合后就可以直接删除了。

基于日志起始偏移量保留策略示意图

日志压缩 

日志压缩 Log Compaction 对于有相同key的不同value值,只保留最后一个版本。如果应用只关心 key 对应的最新 value 值,则可以开启 Kafka 相应的日志清理功能,Kafka会定期将相同 key 的消息进行合并,只保留最新的 value 值。

Log Compaction 可以类比 Redis 中的 RDB 的持久化模式。我们可以想象下,如果每次消息变更都存 Kafka,在某一时刻, Kafka 异常崩溃后,如果想快速恢复,可以直接使用日志压缩策略, 这样在恢复的时候只需要恢复最新的数据即可,这样可以加快恢复速度。

日志压缩策略示意图 

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐