kafka producer发送消息的时候,可以指定key,这个key的作用是为消息选择存储分区,key可以为空,当指定key且不为空的时候,kafka是根据key的hash值与分区数取模来决定数据存储到那个分区,那么当key为null的时候,kafka又是如何存储的呢?可能很多人都会说随机选择一个分区进行存储,但是具体是怎么实现的呢?虽然可以这么说,但是还不够严谨,下面通过kafka的源码来解读下key=null是怎么存储的:

从上面源码可以看出,当key=null时,kafka是先从缓存中取分区号,然后判断缓存的值是否为空,如果不为空,就将消息存到这个分区,否则重新计算要存储的分区,并将分区号缓存起来,供下次使用。

那么这个缓存时间是由什么决定呢,再来看下源码:

从上面源码可以看出,kafka定义了一个全局变量,这个变量值是配置参数中的topic.metadata.refresh.interval.ms设置的值,也就是说在这个时间内,key=null的消息都会往缓存起来的这个分区存储,当时缓存过时之后,就会重新计算分区号,将计算结果缓存起来。也就是说在key为null的情况下,Kafka并不是每条消息都随机选择一个Partition;而是每隔topic.metadata.refresh.interval.ms才会随机选择一次!

sarama指定key代码:

关于key和分区

在创建消息时既可以指定key也可以不指定。Key除了可以保存额外的信息之外,还用于决定消息将会写入哪个分区,也就是说具有相同key的消息都会保存在同一分区。

当key为空且使用默认的分区器时,消息会被随机发送到指定topic的其中一个可用分区,会使用round-robin算法均衡分区间的消息。

当key不为空且使用默认的分区器时,Kafka会计算该key的hash值(使用其自己的hash算法,因此当升级Java版本时hash值不会改变),并使用得到的hash值把消息映射到特定的分区。因为把一个key始终映射到同一分区是非常重要的,所以需要使用一个topic的所有分区来计算映射关系,而不仅仅是可用的分区。这意味着,如果当写入消息到一个不可用的分区时,会出现异常,但是这种情况很少见。

只要一个topic的分区数量不变,key与分区的映射关系就能保证一致。但是如果你添加一个新的分区到一个topic时,虽然存在的数据仍然会保存在原来的分区里,但具有相同key的新消息不能保证还会写入到原来的分区。所以在创建topic时最好预先定义好需要的分区数量,避免后期添加新的分区造成映射关系的不一致。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐