• 👏作者简介:大家好,我是爱敲代码的小黄,独角兽企业的Java开发工程师,Java领域新星创作者。
  • 📝个人公众号:爱敲代码的小黄
  • 📕系列专栏:Java设计模式、数据结构和算法、Kafka从入门到成神
  • 📧如果文章知识点有错误的地方,请指正!和大家一起学习,一起进步👀
  • 🔥如果感觉博主的文章还不错的话,请👍三连支持👍一下博主哦
  • 🍂博主正在努力完成2022计划中:以梦为马,扬帆起航,2022追梦人

一、生产者消息分区机制

当我们在使用 Kafka 时,我们肯定希望将数据均匀的分配到所有服务器上。这样,我们的负载均衡就变的及其完美。

1. 分区原因

简单来说,Kafka 的消息组织方式结构:主题 - 分区 - 副本 - 消息

一条消息,只能保存到一个分区内,不会在多个分区保存多份。

image-20220309235653221

简单想想,为什么我们的 Kafka 已经有 Topic 了,还需要做一个分区出来呢?

主要的原因在于:实现系统的高伸缩性,不同的分区能够放置到不同节点的机器上,我们可以通过添加机器增加整体系统的吞吐量

当然,这里也可以使用 AKF 来进行解释:

AKF 立方体也叫做scala cube,它在《The Art of Scalability》一书中被首次提出,旨在提供一个系统化的扩展思路。

AKF 把系统扩展分为以下三个维度:

  • X 轴:直接水平复制应用进程来扩展系统。
  • Y 轴:将功能拆分出来扩展系统。
  • Z 轴:基于用户信息扩展系统。

与我们 Kafka 相对应:

  • X 轴:使用可靠的副本机制
  • Y 轴:不同的功能拆分—Topic
  • Z轴:不同的用户消费数据—partition

分区的概念很早之前就已经引进了,比如:

  • MongoDBElasticsearch 叫做分片 Shard
  • HBase 叫做 Region
  • Cassandra 叫做 vnode

2. 分区策略

Kafka 的分区策略一般为 轮询、随机、按 key 值。当然,我们也可以自定义分区策略。

2.1 轮询策略

顺序分配。比如一个主题下面有 3 个分区,消息分配的格式如下:

image-20220310000621321

Kafka 默认是轮询策略。轮询策略有非常优秀的负载均衡表现,总能保证消息最大限度的平均分配到所有分区上。

2.2 随机策略

随机的将我们的消息放置到任意一个分区上。

image-20220310000744010

实现方式:

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
// 根据分区的大小,随机生成
return ThreadLocalRandom.current().nextInt(partitions.size());

如果想追求数据的均匀分布,还是使用轮询策略比较好。

2.3 按消息键保存策略

kafka 允许为每套消息定义消息键,简称 Key

这个 key 可以是一个有明确业务含义的字符串,如客户代码、部门编号、业务ID等,我们可以将相同的 key 分到一个分区。

image-20220310223832077

实现方式:

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
// 相同的hash分到一个分区
return Math.abs(key.hashCode()) % partitions.size();
2.4 其他分区策略

如果上述三种策略不能支撑你的业务发展,那么你可以尝试自定义你自己的策略。

我们需要实现 public interface Partitioner extends Configurable, Closeable {} 该接口,定义自己的分区策略

public class UserDefinePartitioner implements Partitioner {
    private AtomicInteger counter = new AtomicInteger(0);

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 获取该Topic下所有的分区数
        List<PartitionInfo> partitioners = cluster.partitionsForTopic(topic);
        int numPartitioners = partitioners.size();
        if (keyBytes == null) {
            // 原子性增加一
            int addIncrement = counter.getAndIncrement();
            // 取模
            return addIncrement % numPartitioners;
        } else {
            // 对keyBytes进行Hash然后取模
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitioners;
        }
    }

    @Override
    public void close() {
        System.out.println("close");
    }

    @Override
    public void configure(Map<String, ?> map) {
        System.out.println("configure");
    }
}

2.5 案例展示

目前我们有一批数据,这一批数据有因果关系,所以我们处理这一批数据必须要保持其 有序性。否则我们先处理了果,再处理因,肯定会出现业务问题。

我们使用最简单的方法,建立一个 Topic,Topic 只含有一个分区,将我们的因和果发到 Topic,实现顺序性。这样做虽然实现了顺序性,但是丧失了 Kafka 带来的高吞吐了和负载均衡能力。

我们将我们因的 key 设置为 because,将果的key 设置成 result,重写我们的分配策略 return Math.abs(key.hashCode()) % partitions.size();。这样,我们的生产者会将因和果分别发送到不同的分区,不同的消费者指定消费不同的分区(consumer.assign()),实现相应的业务逻辑。

二、生产者压缩算法

压缩(comparession),用时间换取空间的经典 trade-off 思想,具体来说,就是用 CPU 时间去换取磁盘空间或网络 I/O 传输量,希望以较小的 CPU 开销带来更少的磁盘占用或更少的网络 I/O 传输。

1. 怎样压缩

kafka 有两个类消息格式,分别为 v1 和 v2 版本。

Kafka 的消息层次分为两次:消息集合(message set)以及消息(message)。一个消息集合包含多个日志项(record item),日志项是真正封装消息的地方。

v1:message set、message

v2:record batch、record

image-20220310235102022

我们的 v2 版本将消息的公共部分抽取出来放到了外层消息集合里面,这样不需要每一条消息都保存这些数据了。

我们原来的 V1 版本中,每条消息都需要执行 CRC 校验,但有些情况下消息的 CRC 值是会变化的。比如在 Broker 端可能对时间戳字段进行更新或执行消息格式转化更新 CRC 值,所以,我们没条消息都执行 CRC 检验没必要,不仅浪费空间还浪费时间。因此在 V2 版本中,消息的 CRC 校验工作就被移动到了消息集合这一层。

V1 版本保存压缩消息的方法是把多条消息进行压缩然后保存到外层消息的消息字段中;V2 版本的做法是对整个消息集合进行压缩。

性能测试:

image-20220311000856633

2. 何时压缩

压缩可能发生的地方:生产者端、Broker 端

生产者可配置 props.put("compression.type", "gzip"); 即可开启 gzip 压缩

在生产者端启用压缩是很自然的想法,那为什么我说在 Broker 端也可能进行压缩呢?

大部分情况下 Broker 从 Producer 端接收到消息后仅仅是原封不动地保存。有两种例外情况就可能让 Broker 重新压缩消息。

  • Broker 指定了和 Producer 端不同的压缩算法
    • 这是是生产者和 Broker 的关联。一旦在 Broker 端设置了不同的 compression.type 值,就一定要小心了,因为可能会发生预料之外的压缩 / 解压缩操作,通常表现为 Broker 端 CPU 使用率飙升。
  • Broker 端发生了消息格式转换。一般是 V1 和 V2 消息版本转换带来的。会使其丧失零拷贝的功效。
    • 这里指的是消费者去 Broker 消费消息时,我们 Broker 消息版本和消费者的消息版本不一致,需要从内核态切换到用户态,将消息进行格式转换,随后发给我们的消费者。

3. 何时解压缩

解压缩通常发生在消费者。一般情况下,我们会将压缩算法封装到消息集合中,消费者通过压缩算法进行解压缩。

Producer 端压缩、Broker 端保持、Consumer 端解压缩。

Broker 也会进行解压缩,每个压缩过的消息集合在 Broker 端写入时,都要发生解压缩操作,目的是为了对消息执行各种验证。但这种操作对CPU性能严重降低。

国内京东的小伙伴们刚刚向社区提出了一个 bugfix,建议去掉因为做消息校验而引入的解压缩。据他们称,去掉了解压缩之后,Broker 端的 CPU 使用率至少降低了 50%。

4. 压缩算法对比

下面这张表是 Facebook Zstandard 官网提供的一份压缩算法 benchmark 比较结果:

image-20220311001712482

  • 吞吐量方面:LZ4 > Snappy > zstd 和 GZIP
  • 压缩比方面:zstd > LZ4 > GZIP > Snappy
  • 网络带宽:使用 Snappy 算法占用的网络带宽最多,zstd 最少
  • CPU使用率:压缩时 Snappy 算法使用的 CPU 较多一些,而在解压缩时 GZIP 算法则可能使用更多的 CPU。

当你的CPU资源充足、环境带宽有限时,我建议你开启压缩。毕竟带宽比 CPU 更稀缺。压缩算法一般选择 zstd

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐