👉博主介绍: 博主从事应用安全和大数据领域,有8年研发经验,5年面试官经验,Java技术专家,WEB架构师,阿里云专家博主,华为云云享专家,51CTO TOP红人

Java知识图谱点击链接:体系化学习Java(Java面试专题)

💕💕 感兴趣的同学可以收藏关注下不然下次找不到哟💕💕

✊✊ 感觉对你有帮助的朋友,可以给博主一个三连,非常感谢 🙏🙏🙏

在这里插入图片描述

1、生产者写入分区的策略有哪些?

生产者写入分区的策略主要有以下几种:

  1. 轮询分区策略:生产者可以使用轮询策略将消息依次写入每个分区,实现负载均衡。在每次发送消息时,生产者会按照轮询的方式选择下一个可用的分区,并将消息写入该分区。这样可以确保消息均匀地分布在各个分区中。

  2. 随机分区策略:Kafka生产者随机的将消息写入分区,有可能会造成消息的分布不均,所以这个策略基本上也很少用。

  3. 按 key 分区策略:Kafka生产者基于消息的键(key)进行哈希计算,然后将消息写入对应的分区。这种策略可以保证具有相同键的消息被写入到相同的分区,从而保证消息的顺序性。

  4. 自定义分区策略:Kafka生产者可以使用自定义分区策略来决定将消息写入哪个分区。

2、轮询分区策略

轮询分区的代码如下:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import java.util.List;
import java.util.Map;

public class RoundRobinPartitioner implements Partitioner {
   
    private int currentPartition;
    
    @Override
    public void configure(Map<String, ?> configs) {
        // 初始化当前分区索引
        currentPartition = 0;
    }
    
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
         // 轮询选择下一个分区
        int selectedPartition = currentPartition;
        currentPartition = (currentPartition + 1) % numPartitions;
         return selectedPartition;
    }
    
    @Override
    public void close() {
        // 可选:清理资源
    }
}

partition 方法会使用一个变量 currentPartition 来记录当前选择的分区索引。每次调用 partition 方法时,会将 currentPartition 增加 1,并通过取模运算来确保选择的分区索引始终在分区数范围内。

要使用轮询分区策略,您需要在生产者配置中设置 partitioner.class 属性为您自定义分区器的类名:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("partitioner.class", "com.example.RoundRobinPartitioner");

3、随机分区策略

随机分区的代码如下:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
 import java.util.List;
import java.util.Map;
import java.util.Random;

public class RandomPartitioner implements Partitioner {
    
    private final Random random = new Random();
    
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        return random.nextInt(numPartitions);
    }
    
    @Override
    public void close() {
   
    }
    
    @Override
    public void configure(Map<String, ?> configs) {
     
    }
}

partition 方法会随机选择一个分区返回。 random.nextInt(numPartitions) 方法会生成一个小于分区数的随机数,作为分区的索引。

要使用随机分区策略,您需要在生产者配置中设置 partitioner.class 属性为您自定义分区器的类名:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("partitioner.class", "com.example.RandomPartitioner");

4、按 key 分区策略

按 key 分区的代码如下:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
 import java.util.List;
import java.util.Map;

public class KeyPartitioner implements Partitioner {
    
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
         if (keyBytes == null) {
            // 如果 key 为 null,则使用轮询分区策略
            return Math.abs(key.hashCode()) % numPartitions;
        } else {
            // 使用 key 的哈希码来确定分区
            return Math.abs(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }
    
    @Override
    public void close() {
        // 可选:清理资源
    }
    
    @Override
    public void configure(Map<String, ?> configs) {
        // 可选:配置方法
    }
}

partition 方法会检查 key 是否为 null。如果 key 为 null,就会使用轮询分区策略,通过计算 key 的哈希码并对分区数取模来确定分区。如果 key 不为 null,则使用 key 的字节数组的哈希码来确定分区。

要使用基于 key 的分区策略,您需要在生产者配置中设置 partitioner.class 属性为您自定义分区器的类名:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("partitioner.class", "com.example.KeyPartitioner");

5、自定义分区策略

自定义分区的代码如下:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import java.util.List;
import java.util.Map;

public class CustomPartitioner implements Partitioner {
    
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
         // 自定义分区逻辑
        // 根据消息的 key 或 value 来选择分区
        // 这里以 key 的哈希值作为分区选择依据
        int partition = Math.abs(key.hashCode()) % numPartitions;
         return partition;
    }
    
    @Override
    public void close() {
        // 可选:清理资源
    }
    
    @Override
    public void configure(Map<String, ?> configs) {
        // 可选:配置分区器
    }
}

partition 方法根据消息的 key 或 value 来选择分区。这里使用 key 的哈希值进行取模运算,以确保选择的分区索引在分区数范围内。

要使用自定义分区策略,您需要在生产者配置中设置 partitioner.class 属性为您自定义分区器的类名:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("partitioner.class", "com.example.CustomPartitioner");

写在最后

通过y以上这些实现,生产者将根据自定义的分区策略来选择分区来发送消息。您可以根据自己的需求,实现不同的分区逻辑。

💕💕 本文由激流原创,原创不易,希望大家关注、点赞、收藏,给博主一点鼓励,感谢!!!
🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃
在这里插入图片描述

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐