前言

好久没有更新AdminClient的博客,主要是一直在摸索怎么用的,官网写的太简略,只能一边研究官网,一边研究源码注释。而且KafkaAPI更新速度飞一样,版本兼容也是个问题,刚刚知道怎么用的,版本过时了又得看新的。而且有些功能API没有提供,就只能去钻研Scala源码,好在和Java差不了很多,虽然不会写,但是勉勉强强能读懂,对于笔者正在开发的项目来说真是崩溃了。所以等笔者把这些内容整理整理,让大家操作Kafka更加顺手。下面就是开始博客正文了,更多内容请点击【Apache Kafka API AdminClient 目录】

查看Topic Partition信息

笔者之前的帖子已经介绍过如何查看一个Topic的信息,其中就包括Partition的信息在内,因此如何查看这些信息,直接点击【Apache Kafka API AdminClient Topic信息的查询】这篇博客即可。在开始之前还是要说下官网对于不同版本对Kafka的处理,还是从2.3.x开始的区分,2.3.x以下版本是一个样,2.3.x以上版本是另一个样。笔者的例子将会份别介绍2.3.x和2.5.x版本是如何使用的。至于最新的2.7.x版本,等笔者的Kafka版本升级到2.7以后再做一次测试,然后会把测试结果更新到相关博客里。

Partition的修改2.3.x以下版本

在开始之前还是要说下官网提供的方法:

Modifier and TypeMethodDescription
CreatePartitionsResultcreatePartitions(Map<String,NewPartitions> newPartitions)Increase the number of partitions of the topics given as the keys of newPartitions according to the corresponding values.
abstract CreatePartitionsResultcreatePartitions(Map<String,NewPartitions> newPartitions, CreatePartitionsOptions options)Increase the number of partitions of the topics given as the keys of newPartitions according to the corresponding values.

Kafka API 2.3.x的版本主要提供了一个方法createPartitions()方法去增加Partition数量,另外一个是抽象方法应该是用来给大家扩展用的,后面的参数CreatePartitionsOptions,也会在前面的方法里new出来一个空的类,所以我们只要关注第一个直接使用的方法就好了。但是要注意笔者这里所写的是增加Partition数量,官网的代码注释也是这样说的,如果减少是会报错的。

/**
 * <p>Increase the number of partitions of the topics given as the keys of {@code newPartitions}
 * according to the corresponding values. <strong>If partitions are increased for a topic that has a key,
 * the partition logic or ordering of the messages will be affected.</strong></p>
 *
 * <p>This is a convenience method for {@link #createPartitions(Map, CreatePartitionsOptions)} with default options.
 * See the overload for more details.</p>
 *
 * @param newPartitions The topics which should have new partitions created, and corresponding parameters
 *                      for the created partitions.
 * @return              The CreatePartitionsResult.
 */
public CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions) {
    return createPartitions(newPartitions, new CreatePartitionsOptions());
}

NewPartitions

和Kafka API中其他的方法一样,最终我们也是获取到了一个名为Result的类。那么我们看下这个方法的参数Map<String, NewPartitions> newPartitions要怎么用。参数是Map类型,前面的String泛型就是指Topic的名字,后面的NewPartition那就是用来增加的数量了。NewPartitions要怎么来呢?这里有一个静态方法increaseTo(),他的返回值就是NewPartitions类型,这就好办了。万事俱备,我们就可以写个Sample了:假设现在有一个topic:java_kafka_tst2,我现在要把partition的数量从1增加到3。

Sample

public void changePartitions230() throws ExecutionException, InterruptedException {
    //构建Map
    Map<String,NewPartitions> newPartitions =new HashMap<>();
    //给Map存入Topic名字和想要增加到的partition数量。
    // 这里注意increaseTo,参数传入多少就是增加到多少,3就是增加到3,不是1+3=4
    newPartitions.put("java_kafka_tst2",NewPartitions.increaseTo(3));
    //拿到结果
    CreatePartitionsResult result = adminClient.createPartitions(newPartitions);
    //执行阻塞方法,等待结果完成
    result.all().get();
    System.out.println("done");
}

Partition的修改2.5.x以及以上版本

这里说是2.5.x版本,这个版本只能用来修改kafka 2.5及其以上的版本用的,而且如果对2.5.x版本以上的方法使用,那么2.5.x的版本能不能用createPartitions()方法呢?是可以的,API都会向下兼容,这点无用质疑。Kafka API的2.5.x做了什么修改呢?这个高版本的API可以做到重新分配Partition的关系,通过这种方式变相的减少Partition的数量,并不能真正的减少Partition数量,此方法对于2.7.0依然有效,官网方法如下:

Modifier and TypeMethodDescription
default AlterPartitionReassignmentsResultalterPartitionReassignments(Map<TopicPartition,Optional<NewPartitionReassignment>> reassignments)Change the reassignments for one or more partitions.
AlterPartitionReassignmentsResultalterPartitionReassignments(Map<TopicPartition,Optional<NewPartitionReassignment>> reassignments, AlterPartitionReassignmentsOptions options)Change the reassignments for one or more partitions.

一共有2个方法,要做Sample的是上表中的第一个方法alterPartitionReassignments(Map<TopicPartition,Optional<NewPartitionReassignment>> reassignments),其内部也是调用了第二个方法,后一个参数直接new AlterPartitionReassignmentsOptions(),其实也是同一个方法。

TopicPartition

看到这些方法,其实最重要的类也就是TopicPartition类了,这个关系到修改的TopicPartition。首先还是要把它构造出来,官网对这个构造方法写的也是简单明了。但是这个类却设计的非常奇葩,这个类传入的第一个参数的值确实是Topic的名字;但是第二个参数partition,并不是TopicPartition数量,而是该Topic对应的Partition编号。比如topic1有四个独立的partition(1、2、3、4),如果我们构造TopicPartition("topic1",3),其意义为:选择了topic1的第3号partition,其构造方法如下。

Constructor and Description
TopicPartition(String topic, int partition)

NewPartitionReassignment

NewPartitionReassignment这个类是除了TopicPartition以外,第二个值得一说的类了。这个参数构造的并不是新的Partition,而是把Partition进行重新分配,比如这样一个例子:topic1有四个独立的partition(1、2、3、4),通过这个类能够把topic1中的partition(1、2、3)全部重新Reassignment成为partition(4)的副本,这样就变相的减少了能够使用的Partition数量,而不是直接将某一个Partition删除,它的构造方法如下:

Constructor and Description
NewPartitionReassignment(List<Integer> targetReplicas) )

Sample

首先我们先把Sample写出来,然后再分析这个方法的优劣。

	public void changePartitions250() throws ExecutionException, InterruptedException {
		// 构造要修改的topic和对应的partition id
		TopicPartition topicPartition = new TopicPartition("java_kafka_tst1", 1);
		//构造目标副本的partition id,其实就是broker id
		List<Integer> targetReplicas = new ArrayList<>();
		//把partition id添加进去
		targetReplicas.add(1);
		//使用targetReplicas构造newPartitionReassignment
		NewPartitionReassignment newPartitionReassignment=new NewPartitionReassignment(targetReplicas);
		//构造重新分配的Map
		Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments = new HashMap<>();
		//传入构造好的参数topicPartition和newPartitionReassignment
		reassignments.put(topicPartition, Optional.of(newPartitionReassignment));
		//调用重新分配方法
		AlterPartitionReassignmentsResult result = adminClient.alterPartitionReassignments(reassignments);
		// 执行方法
		result.all().get();
		// 等待结果
		if (result.all().isDone()) {
			System.out.println("done");
		} else {
			System.out.println("not done");
		}
	}

效果演示

这个Sample好写,但是比较难以演示,首先我们先看下topic1最初是什么状态:
在这里插入图片描述

可以看到现在一共有四个partition(0、1、2、3),分别在Broker(1、2、3、5)上。现在我们要打散关系,把partition1partition2互为主从,其余的不变,就要在构造TopicPartitiontargetReplicas的时候如下设置:

	public void changePartitions250() throws ExecutionException, InterruptedException {
		// 构造要修改的topic和想要Reassign的partition1 构造出来
		TopicPartition topicPartition = new TopicPartition("java_kafka_tst1", 1);
		List<Integer> targetReplicas = new ArrayList<>();
		//指定目标的id,这里添加的2代表partition2,意思就是partition1 ---指向---> partition2 
		targetReplicas.add(2);
		//...code...
	}

执行的结果如下:
在这里插入图片描述

减少Partition

基于上面的特性,我们就可以把上面的一个一个的合并起来,把partition(0、1、2)全部Reassignpartition3下面,成为partition3的副本,那么就变相的减少了可以使用的partition数量了,所以我们针对这样做如下修改:

	public void changePartitions250() throws ExecutionException, InterruptedException {
		// 构造要修改的topic和对应的partition id
		TopicPartition topicPartition0 = new TopicPartition("java_kafka_tst1", 0);
		TopicPartition topicPartition1 = new TopicPartition("java_kafka_tst1", 1);
		TopicPartition topicPartition2 = new TopicPartition("java_kafka_tst1", 2);
		//构造目标副本的partition id,其实就是broker id
		List<Integer> targetReplicas = new ArrayList<>();
		targetReplicas.add(3);
		//...code...
		reassignments.put(topicPartition0, Optional.of(newPartitionReassignment));
		reassignments.put(topicPartition1, Optional.of(newPartitionReassignment));
		reassignments.put(topicPartition2, Optional.of(newPartitionReassignment));
		//...code...
	}

执行的结果如下,变相的减少了partition的数量:
在这里插入图片描述

多个Partition公用一个副本

同样我们利用这个特性,也可以把一个partition作为多个partition的副本,比如下面这样执行,就会使得partition3成为partition(1、2)的副本。

public void changePartitions270() throws ExecutionException, InterruptedException {
		// 构造要修改的topic和对应的partition id
		TopicPartition topicPartition = new TopicPartition("java_kafka_tst1", 3);
		//构造目标副本的partition id,其实就是broker id
		List<Integer> targetReplicas = new ArrayList<>();
		targetReplicas.add(2);
		targetReplicas.add(1);
		//...code...
	}

在这里插入图片描述

注意项

1. TopicPartition
这里不可以输入无效的partition id。
比如topic1一共有partition(0123)四个partition,那么只有0123是可接受的数值,如果输入的是4则会报错。
new TopicPartition("java_kafka_tst1", 9); ——> 这样的写法不可接受。

2. NewPartitionReassignment不接受不存在的Broker id
也就是说在我们构建targetReplicas这个list的时候,一定要选择创建的Broker id。
一共有partition(0123)对应broker(1235),那么除了1235其余的数值都会报错。
new NewPartitionReassignment(targetReplicas);
targetReplicas.add(0); ——> 这样的写法不可接受。

总结

虽然alterPartitionReassignments()方法给我们提供了修改Partition数量的可能性,但是由于其设计的太过奇葩,导致使用起来也十分的复杂。所以还是老老实实用老版本的createPartitions()方法吧,别想着折腾这东西了。毕竟Partition数量多也不是什么坏事,拿空间换时间在一定程度上还是比较划算的。

附:混用报错

这里笔者把研究过程中报错的例子贴出来。

Kafka 服务器版本2.3.x,Kafka API版本2.5.x 用listPartitionReassignments()方法报错:

Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support LIST_PARTITION_REASSIGNMENTS
	at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
	at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
	at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
	at com.company.AlterConfigDemo.changePartitions250(AlterConfigDemo.java:124)
	at com.company.Main.main(Main.java:71)
Caused by: org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support LIST_PARTITION_REASSIGNMENTS

Kafka 服务器版本2.3.x,Kafka API版本2.3.x 试图减少topic的partition报错:

Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 3 partitions, which is higher than the requested 2.
	at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
	at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
	at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
	at com.company.AlterConfigDemo.changePartitions230(AlterConfigDemo.java:114)
	at com.company.Main.main(Main.java:70)
Caused by: org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 3 partitions, which is higher than the requested 2.
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐