Apache Kafka API AdminClient Scram账户的创建与删除
由于Apache官方一直没有提供AdminClient中对账户这一块的操作,因此这部分大多数时候都是用命令行去操作的,但是命令行毕竟不是很方便。为了解决这部分问题,笔者去读了Kafka Scala的源码,从中梳理出来这部分内容供给大家参考。
前言
由于Apache官方一直没有提供AdminClient中对账户这一块的操作,因此这部分大多数时候都是用命令行去操作的,但是命令行毕竟不是很方便。为了解决这部分问题,笔者去读了Kafka Scala的源码,从中梳理出来这部分内容供给大家参考。重要:如果你的版本升级到2.7.0及其以上,请参考【Apache Kafka API AdminClient Scram账户的操作(增删改查)】。更多内容请点击【Apache Kafka API AdminClient 目录】。
Scala版本
为了操作Scala源码,必须有相应版本的包,怎么看你的Scala版本呢?这个就是在Kafka核心包的<artifactId>
键值对里面,如下kafka_2.13后面对应的2.13就是Scala的版本,这个2.13版本同样也是Kafka官方推荐使用的版本,因此我们也就以这个版本为例子去操作账户的创建与删除。要提醒的是如果你使用的版本是Scala 2.12大概率会报错。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.13</artifactId> <!--此处对应的就是Scala版本-->
<version>2.7.0</version>
</dependency>
获取JAAS认证文件
在开始之前首先得知道什么是Scram账户认证,如果不清楚建议参考【Kafka 如何给集群配置Scram账户认证】,这篇帖子里对Scram认证以及配置有很详细的介绍,这里就不多说废话了,我们已经有了一个kafka-broker-jaas.conf
文件用来登陆Zookeeper。
Client{
org.apache.zookeeper.server.auth.DigestLoginModule required
username="kafka"
password="kafka1234";
};
为什么要登陆Zookeeper呢?因为Kafka把认证机制做到了Zookeeper里,而要操作这些需要一个Zookeeper的登陆认证。在这里Zookeeper相当于一个分布式注册中心,随者Kafka的不断升级,Kafka官方也在不断地减少对Zookeeper的依赖。截止到2.7.0版本,当使用命令行创建账户的时候就会收到提示说未来版本可能会下线--zookeeper
参数,转而使用参数--broker.server
,但是目前还是兼容的。而且Kafka API的2.7.0版本,似乎也引入了操作Scram的API,但是之前的版本还是需要操作Zookeeper,因此我们还是需要这样进行账号的操作。希望2.7.0版本的Scram API会比较好用吧,等笔者弄明白了再分享出来。言归正传,我们可以通过下面几行代码先行把登陆Zookeeper的认证文件kafka-broker-jaas.conf
加载到系统中来。
static {
//获取文件路径,这里笔者使用的是项目路径,也可以用绝对路径,目的是访问到文件,什么方法自由选择
String path = KafkaCreateUser.class.getClass().getResource("/").getPath();
//拿到文件对象
File f = new File(path+"kafka-broker-jaas.conf");
//存储到系统参数对象中,以备后续使用
System.setProperty("java.security.auth.login.config", f.getAbsolutePath());
}
使用Scala方法创建账户
获取了文件对象以后,就可以调用Scala中的方法了,我们主要调用的方法是AdminZkClient.changeConfigs(entityType: String, entityName: String, configs: Properties)
方法,这个语法是Scala中的语法,有点类似Java,我们直接用就可以了,Sample如下。
public void createAccount() throws NoSuchAlgorithmException {
//获取ZookeeperClient对象,这里的/kafka是笔者建了一个zk上的目录,如果直接用192.168.33.101:2181
ZooKeeperClient zooKeeperClient=new ZooKeeperClient("192.168.33.101:2181/kafka",30000, 30000, Int.MaxValue(), Time.SYSTEM,"","");
//通过ZookeeperClient对象和JDK自带的JaasUtils加载Sasl认证规则
KafkaZkClient kafkaZkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSaslEnabled(), Time.SYSTEM);
//获取Kafka Scala AdminZkClient对象
AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
//构造Properties
Properties properties=new Properties();
//构造Scram认证机制ScramMechanism为SCRAM_SHA_512
ScramMechanism scramMechanism=ScramMechanism.valueOf("SCRAM_SHA_512");
//构造Scram证书credential,这里"password_1234"就是真实的密码
ScramCredential credential = new ScramFormatter(scramMechanism).generateCredential("password_1234", scramMechanism.minIterations());
//转化为认证串
String credentialString=ScramCredentialUtils.credentialToString(credential);
//添加到properties中备用
properties.put(scramMechanism.mechanismName(),credentialString);
//创建名为kaf_aaa的账户,并且把properties传递进去,那么kaf_aaa账户的密码就是上面设置的password_1234字符串
adminZkClient.changeConfigs(ConfigType.User(),"kaf_aaa",properties);
}
使用Scala方法查询存在的账户
和创建一样也需要使用Scala中的方法,这次使用的是依然是AdminZkClient
类中的方法,一个是fetchEntityConfig(rootEntityType: String, sanitizedEntityName: String)
去查找指定的账户信息,其次是用fetchAllEntityConfigs(entityType: String)
查找Kafka服务器中所有的账户信息,Sample如下。
public void findAccount() {
//获取ZookeeperClient对象
ZooKeeperClient zooKeeperClient=new ZooKeeperClient("192.168.33.101:2181/kafka",30000, 30000, Int.MaxValue(), Time.SYSTEM,"","");
//通过ZookeeperClient对象和JDK自带的JaasUtils加载Sasl认证规则
KafkaZkClient kafkaZkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSaslEnabled(), Time.SYSTEM);
//获取Kafka Scala AdminZkClient对象
AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
//为了查看结果,构造一个Properties对象用来承接返回值
Properties properties=new Properties();
//指定账号查询
properties=adminZkClient.fetchEntityConfig(ConfigType.User(),"kaf_aaa");
//构建一个接收参数
Map<String, Properties> propertiesAll=new HashMap<>();
//查询所有信息
propertiesAll =adminZkClient.fetchAllEntityConfigs(ConfigType.User());
}
使用Scala方法删除账户
说完创建和查找,那就剩删除了。删除对象其实分为两步,第一步清空Kafka集群上保存的信息,第二部删除Zookeeper上对应的节点。清空信息用的还是changeConfigs()
方法,删除节点用的则是Zookeeper包里的delete(final String path, int version)
方法,Sample如下。
public void deleteAccount() throws InterruptedException, KeeperException {
//获取ZookeeperClient对象
ZooKeeperClient zooKeeperClient=new ZooKeeperClient("192.168.33.101:2181/kafka",30000, 30000, Int.MaxValue(), Time.SYSTEM,"","");
//通过ZookeeperClient对象和JDK自带的JaasUtils加载Sasl认证规则
KafkaZkClient kafkaZkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSaslEnabled(), Time.SYSTEM);
//获取Kafka Scala AdminZkClient对象
AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
adminZkClient.changeConfigs(ConfigType.User(),"kaf_aaa",new Properties());
//获取Zookeeper对象
ZooKeeper zooKeeper=kafkaZkClient.currentZooKeeper();
//调用delete方法把对应的节点删除,版本设置为-1是不检测版本号
//注意"/config/users/"这个路径是固定的,Zk里面就是这样存的,"kaf_aaa"是自己拼的账号名字
zooKeeper.delete("/config/users/" +"kaf_aaa", -1);
}
总结
到此Kafka Scram账户相关的操作告一段落,如果想要删除账户同时清除账号下的权限,可以参考【Apache Kafka API AdminClient 账号对Topic权限赋予与移除】,自己做一个循环删除即可。
附:完整的Sample和注释
public class KafkaUserOperation {
//加载zookeeper sasl机制授权登陆的配置文件
static {
String path = KafkaUserOperation.class.getClass().getResource("/").getPath();
File f = new File(path+"zk-client-jaas.conf");
System.setProperty("java.security.auth.login.config", f.getAbsolutePath());
}
public void createAccount() throws NoSuchAlgorithmException {
//获取ZookeeperClient对象
ZooKeeperClient zooKeeperClient=new ZooKeeperClient("192.168.33.101:2181/kafka",30000, 30000, Int.MaxValue(), Time.SYSTEM,"","");
//通过ZookeeperClient对象和JDK自带的JaasUtils加载Sasl认证规则
KafkaZkClient kafkaZkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSaslEnabled(), Time.SYSTEM);
//获取Kafka Scala AdminZkClient对象
AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
//构造Properties
Properties properties=new Properties();
//构造Scram认证机制ScramMechanism为SCRAM_SHA_512
ScramMechanism scramMechanism=ScramMechanism.valueOf("SCRAM_SHA_512");
//构造Scram证书credential,这里"password_1234"就是真实的密码
ScramCredential credential = new ScramFormatter(scramMechanism).generateCredential("password_1234", scramMechanism.minIterations());
//转化为认证串
String credentialString=ScramCredentialUtils.credentialToString(credential);
//添加到properties中备用
properties.put(scramMechanism.mechanismName(),credentialString);
//创建名为kaf_aaa的账户,并且把properties传递进去,那么kaf_aaa账户的密码就是上面设置的password_1234字符串
adminZkClient.changeConfigs(ConfigType.User(),"kaf_aaa",properties);
}
public void findAccount() {
//获取ZookeeperClient对象
ZooKeeperClient zooKeeperClient=new ZooKeeperClient("192.168.33.101:2181/kafka",30000, 30000, Int.MaxValue(), Time.SYSTEM,"","");
//通过ZookeeperClient对象和JDK自带的JaasUtils加载Sasl认证规则
KafkaZkClient kafkaZkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSaslEnabled(), Time.SYSTEM);
//获取Kafka Scala AdminZkClient对象
AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
//为了查看是否成功,构造一个Properties对象用来承接返回值
Properties properties=new Properties();
//指定账号查询
properties=adminZkClient.fetchEntityConfig(ConfigType.User(),"kaf_aaa");
Map<String, Properties> proAll=new HashMap<>();
//查询所有信息
proAll=adminZkClient.fetchAllEntityConfigs(ConfigType.User());
}
public void deleteAccount() throws InterruptedException, KeeperException {
//获取ZookeeperClient对象
ZooKeeperClient zooKeeperClient=new ZooKeeperClient("192.168.33.101:2181/kafka",30000, 30000, Int.MaxValue(), Time.SYSTEM,"","");
//通过ZookeeperClient对象和JDK自带的JaasUtils加载Sasl认证规则
KafkaZkClient kafkaZkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSaslEnabled(), Time.SYSTEM);
//获取Kafka Scala AdminZkClient对象
AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
adminZkClient.changeConfigs(ConfigType.User(),"kaf_aaa",new Properties());
//获取Zookeeper对象
ZooKeeper zooKeeper=kafkaZkClient.currentZooKeeper();
//调用delete方法把对应的节点删除,版本设置为-1是不检测版本号
zooKeeper.delete("/config/users/"+"kaf_aaa", -1);
}
}
更多推荐
所有评论(0)