Apache Kafka API AdminClient 修改Topic的Partition
前言好久没有更新AdminClient的博客,主要是一直在摸索怎么用的,官网写的太简略,只能一边研究官网,一边研究源码注释。而且KafkaAPI更新速度飞一样,版本兼容也是个问题,刚刚知道怎么用的,版本过时了又得看新的。而且有些功能API没有提供,就只能去钻研Scala源码,好在和Java差不了很多,虽然不会写,但是勉勉强强能读懂,对于笔者正在开发的项目来说真是崩溃了。所以等笔者把这些内容整理整理
前言
好久没有更新AdminClient的博客,主要是一直在摸索怎么用的,官网写的太简略,只能一边研究官网,一边研究源码注释。而且KafkaAPI更新速度飞一样,版本兼容也是个问题,刚刚知道怎么用的,版本过时了又得看新的。而且有些功能API没有提供,就只能去钻研Scala源码,好在和Java差不了很多,虽然不会写,但是勉勉强强能读懂,对于笔者正在开发的项目来说真是崩溃了。所以等笔者把这些内容整理整理,让大家操作Kafka更加顺手。下面就是开始博客正文了,更多内容请点击【Apache Kafka API AdminClient 目录】。
查看Topic Partition信息
笔者之前的帖子已经介绍过如何查看一个Topic的信息,其中就包括Partition的信息在内,因此如何查看这些信息,直接点击【Apache Kafka API AdminClient Topic信息的查询】这篇博客即可。在开始之前还是要说下官网对于不同版本对Kafka的处理,还是从2.3.x开始的区分,2.3.x以下版本是一个样,2.3.x以上版本是另一个样。笔者的例子将会份别介绍2.3.x和2.5.x版本是如何使用的。至于最新的2.7.x版本,等笔者的Kafka版本升级到2.7以后再做一次测试,然后会把测试结果更新到相关博客里。
Partition的修改2.3.x以下版本
在开始之前还是要说下官网提供的方法:
Modifier and Type | Method | Description |
---|---|---|
CreatePartitionsResult | createPartitions(Map<String,NewPartitions> newPartitions) | Increase the number of partitions of the topics given as the keys of newPartitions according to the corresponding values. |
abstract CreatePartitionsResult | createPartitions(Map<String,NewPartitions> newPartitions, CreatePartitionsOptions options) | Increase the number of partitions of the topics given as the keys of newPartitions according to the corresponding values. |
Kafka API 2.3.x的版本主要提供了一个方法createPartitions()
方法去增加Partition
数量,另外一个是抽象方法应该是用来给大家扩展用的,后面的参数CreatePartitionsOptions
,也会在前面的方法里new
出来一个空的类,所以我们只要关注第一个直接使用的方法就好了。但是要注意笔者这里所写的是增加Partition数量,官网的代码注释也是这样说的,如果减少是会报错的。
/**
* <p>Increase the number of partitions of the topics given as the keys of {@code newPartitions}
* according to the corresponding values. <strong>If partitions are increased for a topic that has a key,
* the partition logic or ordering of the messages will be affected.</strong></p>
*
* <p>This is a convenience method for {@link #createPartitions(Map, CreatePartitionsOptions)} with default options.
* See the overload for more details.</p>
*
* @param newPartitions The topics which should have new partitions created, and corresponding parameters
* for the created partitions.
* @return The CreatePartitionsResult.
*/
public CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions) {
return createPartitions(newPartitions, new CreatePartitionsOptions());
}
NewPartitions
和Kafka API中其他的方法一样,最终我们也是获取到了一个名为Result
的类。那么我们看下这个方法的参数Map<String, NewPartitions> newPartitions
要怎么用。参数是Map类型,前面的String
泛型就是指Topic
的名字,后面的NewPartition
那就是用来增加的数量了。NewPartitions
要怎么来呢?这里有一个静态方法increaseTo()
,他的返回值就是NewPartitions
类型,这就好办了。万事俱备,我们就可以写个Sample了:假设现在有一个topic:java_kafka_tst2
,我现在要把partition
的数量从1增加到3。
Sample
public void changePartitions230() throws ExecutionException, InterruptedException {
//构建Map
Map<String,NewPartitions> newPartitions =new HashMap<>();
//给Map存入Topic名字和想要增加到的partition数量。
// 这里注意increaseTo,参数传入多少就是增加到多少,3就是增加到3,不是1+3=4
newPartitions.put("java_kafka_tst2",NewPartitions.increaseTo(3));
//拿到结果
CreatePartitionsResult result = adminClient.createPartitions(newPartitions);
//执行阻塞方法,等待结果完成
result.all().get();
System.out.println("done");
}
Partition的修改2.5.x以及以上版本
这里说是2.5.x版本,这个版本只能用来修改kafka 2.5及其以上的版本用的,而且如果对2.5.x版本以上的方法使用,那么2.5.x的版本能不能用createPartitions()
方法呢?是可以的,API都会向下兼容,这点无用质疑。Kafka API的2.5.x做了什么修改呢?这个高版本的API可以做到重新分配Partition
的关系,通过这种方式变相的减少Partition
的数量,并不能真正的减少Partition
数量,此方法对于2.7.0依然有效,官网方法如下:
Modifier and Type | Method | Description |
---|---|---|
default AlterPartitionReassignmentsResult | alterPartitionReassignments(Map<TopicPartition,Optional<NewPartitionReassignment> > reassignments) | Change the reassignments for one or more partitions. |
AlterPartitionReassignmentsResult | alterPartitionReassignments(Map<TopicPartition,Optional<NewPartitionReassignment> > reassignments, AlterPartitionReassignmentsOptions options) | Change the reassignments for one or more partitions. |
一共有2个方法,要做Sample的是上表中的第一个方法alterPartitionReassignments(Map<TopicPartition,Optional<NewPartitionReassignment>> reassignments)
,其内部也是调用了第二个方法,后一个参数直接new AlterPartitionReassignmentsOptions()
,其实也是同一个方法。
TopicPartition
看到这些方法,其实最重要的类也就是TopicPartition
类了,这个关系到修改的Topic
的Partition
。首先还是要把它构造出来,官网对这个构造方法写的也是简单明了。但是这个类却设计的非常奇葩,这个类传入的第一个参数的值确实是Topic
的名字;但是第二个参数partition
,并不是Topic
的Partition
数量,而是该Topic
对应的Partition
编号。比如topic1
有四个独立的partition(1、2、3、4)
,如果我们构造TopicPartition("topic1",3)
,其意义为:选择了topic1
的第3号partition
,其构造方法如下。
Constructor and Description |
---|
TopicPartition(String topic, int partition) |
NewPartitionReassignment
NewPartitionReassignment
这个类是除了TopicPartition
以外,第二个值得一说的类了。这个参数构造的并不是新的Partition
,而是把Partition
进行重新分配,比如这样一个例子:topic1
有四个独立的partition(1、2、3、4)
,通过这个类能够把topic1
中的partition(1、2、3)
全部重新Reassignment
成为partition(4)
的副本,这样就变相的减少了能够使用的Partition
数量,而不是直接将某一个Partition
删除,它的构造方法如下:
Constructor and Description |
---|
NewPartitionReassignment(List<Integer> targetReplicas) ) |
Sample
首先我们先把Sample写出来,然后再分析这个方法的优劣。
public void changePartitions250() throws ExecutionException, InterruptedException {
// 构造要修改的topic和对应的partition id
TopicPartition topicPartition = new TopicPartition("java_kafka_tst1", 1);
//构造目标副本的partition id,其实就是broker id
List<Integer> targetReplicas = new ArrayList<>();
//把partition id添加进去
targetReplicas.add(1);
//使用targetReplicas构造newPartitionReassignment
NewPartitionReassignment newPartitionReassignment=new NewPartitionReassignment(targetReplicas);
//构造重新分配的Map
Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments = new HashMap<>();
//传入构造好的参数topicPartition和newPartitionReassignment
reassignments.put(topicPartition, Optional.of(newPartitionReassignment));
//调用重新分配方法
AlterPartitionReassignmentsResult result = adminClient.alterPartitionReassignments(reassignments);
// 执行方法
result.all().get();
// 等待结果
if (result.all().isDone()) {
System.out.println("done");
} else {
System.out.println("not done");
}
}
效果演示
这个Sample好写,但是比较难以演示,首先我们先看下topic1
最初是什么状态:
可以看到现在一共有四个partition(0、1、2、3)
,分别在Broker(1、2、3、5)
上。现在我们要打散关系,把partition1
和partition2
互为主从,其余的不变,就要在构造TopicPartition
和targetReplicas
的时候如下设置:
public void changePartitions250() throws ExecutionException, InterruptedException {
// 构造要修改的topic和想要Reassign的partition1 构造出来
TopicPartition topicPartition = new TopicPartition("java_kafka_tst1", 1);
List<Integer> targetReplicas = new ArrayList<>();
//指定目标的id,这里添加的2代表partition2,意思就是partition1 ---指向---> partition2
targetReplicas.add(2);
//...code...
}
执行的结果如下:
减少Partition
基于上面的特性,我们就可以把上面的一个一个的合并起来,把partition(0、1、2)
全部Reassign
到partition3
下面,成为partition3
的副本,那么就变相的减少了可以使用的partition
数量了,所以我们针对这样做如下修改:
public void changePartitions250() throws ExecutionException, InterruptedException {
// 构造要修改的topic和对应的partition id
TopicPartition topicPartition0 = new TopicPartition("java_kafka_tst1", 0);
TopicPartition topicPartition1 = new TopicPartition("java_kafka_tst1", 1);
TopicPartition topicPartition2 = new TopicPartition("java_kafka_tst1", 2);
//构造目标副本的partition id,其实就是broker id
List<Integer> targetReplicas = new ArrayList<>();
targetReplicas.add(3);
//...code...
reassignments.put(topicPartition0, Optional.of(newPartitionReassignment));
reassignments.put(topicPartition1, Optional.of(newPartitionReassignment));
reassignments.put(topicPartition2, Optional.of(newPartitionReassignment));
//...code...
}
执行的结果如下,变相的减少了partition
的数量:
多个Partition公用一个副本
同样我们利用这个特性,也可以把一个partition
作为多个partition
的副本,比如下面这样执行,就会使得partition3
成为partition(1、2)
的副本。
public void changePartitions270() throws ExecutionException, InterruptedException {
// 构造要修改的topic和对应的partition id
TopicPartition topicPartition = new TopicPartition("java_kafka_tst1", 3);
//构造目标副本的partition id,其实就是broker id
List<Integer> targetReplicas = new ArrayList<>();
targetReplicas.add(2);
targetReplicas.add(1);
//...code...
}
注意项
1. TopicPartition
这里不可以输入无效的partition id。
比如topic1一共有partition(0、1、2、3)四个partition,那么只有0、1、2、3是可接受的数值,如果输入的是4则会报错。
new TopicPartition("java_kafka_tst1", 9); ——> 这样的写法不可接受。
2. NewPartitionReassignment不接受不存在的Broker id
也就是说在我们构建targetReplicas这个list的时候,一定要选择创建的Broker id。
一共有partition(0、1、2、3)对应broker(1、2、3、5),那么除了1、2、3、5其余的数值都会报错。
new NewPartitionReassignment(targetReplicas);
targetReplicas.add(0); ——> 这样的写法不可接受。
总结
虽然alterPartitionReassignments()
方法给我们提供了修改Partition数量的可能性,但是由于其设计的太过奇葩,导致使用起来也十分的复杂。所以还是老老实实用老版本的createPartitions()
方法吧,别想着折腾这东西了。毕竟Partition数量多也不是什么坏事,拿空间换时间在一定程度上还是比较划算的。
附:混用报错
这里笔者把研究过程中报错的例子贴出来。
Kafka 服务器版本2.3.x,Kafka API版本2.5.x 用listPartitionReassignments()方法报错:
Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support LIST_PARTITION_REASSIGNMENTS
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at com.company.AlterConfigDemo.changePartitions250(AlterConfigDemo.java:124)
at com.company.Main.main(Main.java:71)
Caused by: org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support LIST_PARTITION_REASSIGNMENTS
Kafka 服务器版本2.3.x,Kafka API版本2.3.x 试图减少topic的partition报错:
Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 3 partitions, which is higher than the requested 2.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at com.company.AlterConfigDemo.changePartitions230(AlterConfigDemo.java:114)
at com.company.Main.main(Main.java:70)
Caused by: org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 3 partitions, which is higher than the requested 2.
更多推荐
所有评论(0)