前言

由于Apache官方一直没有提供AdminClient中对账户这一块的操作,因此这部分大多数时候都是用命令行去操作的,但是命令行毕竟不是很方便。为了解决这部分问题,笔者去读了Kafka Scala的源码,从中梳理出来这部分内容供给大家参考。重要:如果你的版本升级到2.7.0及其以上,请参考【Apache Kafka API AdminClient Scram账户的操作(增删改查)】。更多内容请点击【Apache Kafka API AdminClient 目录】

Scala版本

为了操作Scala源码,必须有相应版本的包,怎么看你的Scala版本呢?这个就是在Kafka核心包的<artifactId>键值对里面,如下kafka_2.13后面对应的2.13就是Scala的版本,这个2.13版本同样也是Kafka官方推荐使用的版本,因此我们也就以这个版本为例子去操作账户的创建与删除。要提醒的是如果你使用的版本是Scala 2.12大概率会报错。

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.13</artifactId> <!--此处对应的就是Scala版本-->
    <version>2.7.0</version>
</dependency>

获取JAAS认证文件

在开始之前首先得知道什么是Scram账户认证,如果不清楚建议参考【Kafka 如何给集群配置Scram账户认证】,这篇帖子里对Scram认证以及配置有很详细的介绍,这里就不多说废话了,我们已经有了一个kafka-broker-jaas.conf文件用来登陆Zookeeper。

Client{
	org.apache.zookeeper.server.auth.DigestLoginModule required
	username="kafka"
	password="kafka1234";
};

为什么要登陆Zookeeper呢?因为Kafka把认证机制做到了Zookeeper里,而要操作这些需要一个Zookeeper的登陆认证。在这里Zookeeper相当于一个分布式注册中心,随者Kafka的不断升级,Kafka官方也在不断地减少对Zookeeper的依赖。截止到2.7.0版本,当使用命令行创建账户的时候就会收到提示说未来版本可能会下线--zookeeper参数,转而使用参数--broker.server,但是目前还是兼容的。而且Kafka API的2.7.0版本,似乎也引入了操作Scram的API,但是之前的版本还是需要操作Zookeeper,因此我们还是需要这样进行账号的操作。希望2.7.0版本的Scram API会比较好用吧,等笔者弄明白了再分享出来。言归正传,我们可以通过下面几行代码先行把登陆Zookeeper的认证文件kafka-broker-jaas.conf加载到系统中来。

static {
    //获取文件路径,这里笔者使用的是项目路径,也可以用绝对路径,目的是访问到文件,什么方法自由选择
    String path = KafkaCreateUser.class.getClass().getResource("/").getPath();
    //拿到文件对象
    File f = new File(path+"kafka-broker-jaas.conf");
    //存储到系统参数对象中,以备后续使用
    System.setProperty("java.security.auth.login.config", f.getAbsolutePath());
}

使用Scala方法创建账户

获取了文件对象以后,就可以调用Scala中的方法了,我们主要调用的方法是AdminZkClient.changeConfigs(entityType: String, entityName: String, configs: Properties)方法,这个语法是Scala中的语法,有点类似Java,我们直接用就可以了,Sample如下。

public void createAccount() throws NoSuchAlgorithmException {
	//获取ZookeeperClient对象,这里的/kafka是笔者建了一个zk上的目录,如果直接用192.168.33.101:2181
    ZooKeeperClient zooKeeperClient=new ZooKeeperClient("192.168.33.101:2181/kafka",30000, 30000, Int.MaxValue(), Time.SYSTEM,"","");
    //通过ZookeeperClient对象和JDK自带的JaasUtils加载Sasl认证规则
    KafkaZkClient kafkaZkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSaslEnabled(), Time.SYSTEM);
    //获取Kafka Scala AdminZkClient对象
    AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
    //构造Properties
    Properties properties=new Properties();
    //构造Scram认证机制ScramMechanism为SCRAM_SHA_512
    ScramMechanism scramMechanism=ScramMechanism.valueOf("SCRAM_SHA_512");
    //构造Scram证书credential,这里"password_1234"就是真实的密码
    ScramCredential credential = new ScramFormatter(scramMechanism).generateCredential("password_1234", scramMechanism.minIterations());
    //转化为认证串
    String credentialString=ScramCredentialUtils.credentialToString(credential);
    //添加到properties中备用
    properties.put(scramMechanism.mechanismName(),credentialString);
    //创建名为kaf_aaa的账户,并且把properties传递进去,那么kaf_aaa账户的密码就是上面设置的password_1234字符串
    adminZkClient.changeConfigs(ConfigType.User(),"kaf_aaa",properties);
}

使用Scala方法查询存在的账户

和创建一样也需要使用Scala中的方法,这次使用的是依然是AdminZkClient类中的方法,一个是fetchEntityConfig(rootEntityType: String, sanitizedEntityName: String)去查找指定的账户信息,其次是用fetchAllEntityConfigs(entityType: String)查找Kafka服务器中所有的账户信息,Sample如下。

public void findAccount() {
	 //获取ZookeeperClient对象
     ZooKeeperClient zooKeeperClient=new ZooKeeperClient("192.168.33.101:2181/kafka",30000, 30000, Int.MaxValue(), Time.SYSTEM,"","");
     //通过ZookeeperClient对象和JDK自带的JaasUtils加载Sasl认证规则
     KafkaZkClient kafkaZkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSaslEnabled(), Time.SYSTEM);
     //获取Kafka Scala AdminZkClient对象
     AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
 	 //为了查看结果,构造一个Properties对象用来承接返回值
     Properties properties=new Properties();
     //指定账号查询
     properties=adminZkClient.fetchEntityConfig(ConfigType.User(),"kaf_aaa");
	 //构建一个接收参数
     Map<String, Properties> propertiesAll=new HashMap<>();
     //查询所有信息
     propertiesAll =adminZkClient.fetchAllEntityConfigs(ConfigType.User());
}

使用Scala方法删除账户

说完创建和查找,那就剩删除了。删除对象其实分为两步,第一步清空Kafka集群上保存的信息,第二部删除Zookeeper上对应的节点。清空信息用的还是changeConfigs()方法,删除节点用的则是Zookeeper包里的delete(final String path, int version)方法,Sample如下。

public void deleteAccount() throws InterruptedException, KeeperException {
	//获取ZookeeperClient对象
    ZooKeeperClient zooKeeperClient=new ZooKeeperClient("192.168.33.101:2181/kafka",30000, 30000, Int.MaxValue(), Time.SYSTEM,"","");
    //通过ZookeeperClient对象和JDK自带的JaasUtils加载Sasl认证规则
    KafkaZkClient kafkaZkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSaslEnabled(), Time.SYSTEM);
    //获取Kafka Scala AdminZkClient对象
    AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
    adminZkClient.changeConfigs(ConfigType.User(),"kaf_aaa",new Properties()); 
    //获取Zookeeper对象
    ZooKeeper zooKeeper=kafkaZkClient.currentZooKeeper();
    //调用delete方法把对应的节点删除,版本设置为-1是不检测版本号
	//注意"/config/users/"这个路径是固定的,Zk里面就是这样存的,"kaf_aaa"是自己拼的账号名字
    zooKeeper.delete("/config/users/" +"kaf_aaa", -1);
}

总结

到此Kafka Scram账户相关的操作告一段落,如果想要删除账户同时清除账号下的权限,可以参考【Apache Kafka API AdminClient 账号对Topic权限赋予与移除】,自己做一个循环删除即可。

附:完整的Sample和注释

public class KafkaUserOperation {
    //加载zookeeper sasl机制授权登陆的配置文件
    static {
        String path = KafkaUserOperation.class.getClass().getResource("/").getPath();
        File f = new File(path+"zk-client-jaas.conf");
        System.setProperty("java.security.auth.login.config", f.getAbsolutePath());
    }
    
    public void createAccount() throws NoSuchAlgorithmException {
    	//获取ZookeeperClient对象
        ZooKeeperClient zooKeeperClient=new ZooKeeperClient("192.168.33.101:2181/kafka",30000, 30000, Int.MaxValue(), Time.SYSTEM,"","");
        //通过ZookeeperClient对象和JDK自带的JaasUtils加载Sasl认证规则
        KafkaZkClient kafkaZkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSaslEnabled(), Time.SYSTEM);
        //获取Kafka Scala AdminZkClient对象
        AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
        //构造Properties
        Properties properties=new Properties();
        //构造Scram认证机制ScramMechanism为SCRAM_SHA_512
        ScramMechanism scramMechanism=ScramMechanism.valueOf("SCRAM_SHA_512");
        //构造Scram证书credential,这里"password_1234"就是真实的密码
        ScramCredential credential = new ScramFormatter(scramMechanism).generateCredential("password_1234", scramMechanism.minIterations());
        //转化为认证串
        String credentialString=ScramCredentialUtils.credentialToString(credential);
        //添加到properties中备用
        properties.put(scramMechanism.mechanismName(),credentialString);
        //创建名为kaf_aaa的账户,并且把properties传递进去,那么kaf_aaa账户的密码就是上面设置的password_1234字符串
        adminZkClient.changeConfigs(ConfigType.User(),"kaf_aaa",properties);
	}
    
    public void findAccount() {
    	//获取ZookeeperClient对象
        ZooKeeperClient zooKeeperClient=new ZooKeeperClient("192.168.33.101:2181/kafka",30000, 30000, Int.MaxValue(), Time.SYSTEM,"","");
        //通过ZookeeperClient对象和JDK自带的JaasUtils加载Sasl认证规则
        KafkaZkClient kafkaZkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSaslEnabled(), Time.SYSTEM);
        //获取Kafka Scala AdminZkClient对象
        AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
    	//为了查看是否成功,构造一个Properties对象用来承接返回值
        Properties properties=new Properties();
        //指定账号查询
        properties=adminZkClient.fetchEntityConfig(ConfigType.User(),"kaf_aaa");
        Map<String, Properties> proAll=new HashMap<>();
        //查询所有信息
        proAll=adminZkClient.fetchAllEntityConfigs(ConfigType.User());
	}
	
    public void deleteAccount() throws InterruptedException, KeeperException {
    	//获取ZookeeperClient对象
        ZooKeeperClient zooKeeperClient=new ZooKeeperClient("192.168.33.101:2181/kafka",30000, 30000, Int.MaxValue(), Time.SYSTEM,"","");
        //通过ZookeeperClient对象和JDK自带的JaasUtils加载Sasl认证规则
        KafkaZkClient kafkaZkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSaslEnabled(), Time.SYSTEM);
        //获取Kafka Scala AdminZkClient对象
        AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
        adminZkClient.changeConfigs(ConfigType.User(),"kaf_aaa",new Properties()); 
        //获取Zookeeper对象
        ZooKeeper zooKeeper=kafkaZkClient.currentZooKeeper();
        //调用delete方法把对应的节点删除,版本设置为-1是不检测版本号
        zooKeeper.delete("/config/users/"+"kaf_aaa", -1);
	}
}
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐