Kafka 日志详解

Apache Kafka日志存储在物理磁盘上各种数据的集合,日志按照topic分区进行文件组织,每一个分区日志由一个或者多个文件组成。生产者发送的消息被顺序追加到日志文件的末尾。

在这里插入图片描述

如上图所述,Kafka主题被划分为3个分区。在Kafka中,分区是一个逻辑工作单元,其中记录被顺序附加分区上 (kafka只能保证分区消息的有序性,而不能保证消息的全局有序性)。但是分区不是存储单元,分区进一步划分为Segment - 段,这些段是文件系统上的实际文件。为了获得更好的性能和可维护性,可以创建多个段,而不是从一个巨大的分区中读取,消费者现在可以更快地从较小的段文件中读取。创建具有分区名称的目录,并将该分区的所有段作为各种文件进行维护。图中 topic主题分为3个分区

  • Partition 0 - 目前有三个segment文件段组成,Segment0、Segment1已经写满,目前Segment 2处于活跃状态
  • Partition 1
  • Partition 2

Segment

文件说明

|── my-topic-0
   ├── 00000000000000000000.index
   ├── 00000000000000000000.log
   ├── 00000000000000000000.timeindex
   ├── 00000000000000001007.index
   ├── 00000000000000001007.log
   ├── 00000000000000001007.snapshot
   ├── 00000000000000001007.timeindex

文件说明:

  • **.log 文件 - 此文件包含实际记录,并将记录保持到特定偏移量,文件名描述了添加到此文件的起始偏移量
  • .index 文件 - ** 索引文件,记录偏移量映射到 .log 文件的字节偏移量,此映射用于从任何特定偏移量读取记录
  • **.timeindex 文件- 时间戳索引文件,此文件包含时间戳到记录偏移量的映射,该映射使用.index文件在内部映射到记录的字节偏移量。这有助于从特定时间戳访问记录
  • .snapshot 文件 - 包含用于避免重复记录序列ID的生产者快照。出现Leader选举时使用,避免出现数据重复

数据长度

之前提到过,log文件的文件名表示该文件的起始偏移量。那么从上面的文件我们可以分析出,第一个日志段00000000000000000000.log包含从偏移量0到偏移量1006的记录。原因是下一个段00000000000000001007.log具有从偏移量1007开始的记录,这称为活动段。

在这里插入图片描述

当前活跃的Segment段文件是唯一可用于读取和写入的文件,而用户可以使用其他日志段(非活动)读取数据。当活动段变满(由log.segment.bytes配置,默认为1 GB)或配置的时间(log.roll.hours或log.roll.ms,默认为7天)过去时,该段将被滚动。这意味着活动段将以只读模式关闭并重新打开,并且将以读写模式创建新的段文件(活动段)。

log.roll.hours

Segment 日志保留的时间配置,单位为小时,默认168小时,即7天。

Type:int
Default:168
Valid Values:[1,…]
Importance:high
Update Mode:read-only

在生产环境中,该参数需要结合业务实际情况进行合理配置,否则就会出现磁盘爆满的问题。

我之前的工作经验中出现过此类问题,由于生产者产生数据频率较快,在2-3天之内就已经将500G的硬盘占满,但是kafka默认保留7天日志,导致数据没有及时清理,从而导致磁盘占满的问题。后面经过权衡,配置了48小时,之后再没有出现暴磁盘的现象。

位移索引

索引有助于消费者从任何指定偏移量或使用任何时间范围读取数据。如前所述,.index文件包含一个索引,该索引将逻辑偏移量映射到.log文件中记录的字节偏移量。您可能希望每个记录都可以使用此映射,但它不能以这种方式工作 。

如何在索引文件中生成新的索引项由log.index.interval.bytes参数定义,默认值为4096字节。这意味着在日志中每添加4096个字节后,就会向索引文件中添加一个索引项。假设生产者向Kafka主题发送消息,占100字节。在这种情况下,在日志文件中每追加41条记录(41*100=4100字节)后,将向.index文件中添加一个新的索引项。

在这里插入图片描述

消费者从指定偏移量位置读取数据的,步骤大致如下:

  1. 根据topic名称搜索.index文件。例如,如果偏移量为1191,将搜索其名称值小于1191的索引文件。索引文件的命名约定与日志文件相同(这一点非常重要)
  2. 在.index文件中搜索请求的偏移量所在的索引项
  3. 使用映射的字节偏移量访问**.log**文件,并开始使用该字节偏移量的记录

时间索引

消费者可能还希望从特定的时间戳读取记录。这就是使用到.timeindex索引文件。它维护一个时间戳和偏移量映射(映射到.index文件中的索引项),映射到.log文件中的实际字节偏移量。

在这里插入图片描述

如图索引,根据时间戳读取数据,比根据位移读取数据要复杂一些,多经历了一个数据查找的步骤。

指定读取

现在写两个按照指定位移/时间戳读取消息的demo。

位移读取

public class OffsetConsumer {

    public static void main(String[] args) {
        String bootstrapServers = "127.0.0.1:9092";
        String topic = "topic_t40";

        // create consumer configs
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // create consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        // assign
        TopicPartition partitionToReadFrom = new TopicPartition(topic, 0);
        long offsetToReadFrom = 200L;
        consumer.assign(Arrays.asList(partitionToReadFrom));

        // seek to offset 200
        consumer.seek(partitionToReadFrom, offsetToReadFrom);

        boolean keepOnReading = true;

        // poll for new data
        while(keepOnReading){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records){
                System.out.println("Message received " + record.value() + ", partition " + record.partition() + ", offset=" + record.offset());
            }
        }
    }
}

在这里插入图片描述

如上图所示,指定读取分区0,从偏移量200开始读取数据

时间戳读取

public class TimestampConsumer {

    public static void main(String[] args) throws Exception{
        String topicName = "topic_t40";

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "app_hp1");
        props.put("client.id", "client_01");
        props.put("enable.auto.commit", true);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,   StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        List<PartitionInfo> partitions = consumer.partitionsFor(topicName);
        List<TopicPartition> topicPartitionList = partitions
                .stream()
                .map(info -> new TopicPartition(topicName, info.partition()))
                .collect(Collectors.toList());
        consumer.assign(topicPartitionList);

        Map<TopicPartition, Long> partitionTimestampMap = topicPartitionList.stream()
                .collect(Collectors.toMap(tp -> tp, tp -> 1672239981330L));
        Map<TopicPartition, OffsetAndTimestamp> partitionOffsetMap = consumer.offsetsForTimes(partitionTimestampMap);
// Force the consumer to seek for those offsets
        partitionOffsetMap.forEach((tp, offsetAndTimestamp) -> consumer.seek(tp, offsetAndTimestamp.offset()));

        boolean keepOnReading = true;
        while(keepOnReading){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records){
                System.out.println("Message received " + record.value() + ", partition " + record.partition() + ", offset=" + record.offset() + ", timestamp=" + record.timestamp());
            }
        }
    }
}

在这里插入图片描述

如上图所示,打印出来的消息时间都大于指定时间。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐