前言

很久没有更新Kafka API相关的文档了,因为笔者工作变动Kafka这部分内容在工作中接触的就相对于之前少了一些。但架不住kafka官方还是一如既往的勤奋,官方操作Scram账户的创建与删除这部分已经更新了好久了,这次也算是填坑吧,主要就是针对alterUserScramCredentials方法做一个功能说明和demo。给网上少之又少的Kafka API中文使用教程做个增补,本次基于Kafka API 2.8.0,同时适用于2.7.0版本,但是该版本API并不是很完善,创建的账户可能会有无法读写Topic的问题。如果你使用的Kafka还没有升级到这么高的版本,请参考【Apache Kafka API AdminClient Scram账户的创建与删除】这篇博客,其是针对2.7.0及其以下的Scram账户操作。更多内容请点击【Apache Kafka API AdminClient 目录】

操作Scram账户的方法

首先我们先看官方文档中对于操作Scram账户是怎么说的,查询官方的文档只提供了一个方法:

Modifier and TypeMethodDescription
default AlterUserScramCredentialsResultalterUserScramCredentials(List< UserScramCredentialAlteration> alterations)Alter SASL/SCRAM credentials for the given users.
AlterUserScramCredentialsResultalterUserScramCredentials(List< UserScramCredentialAlteration> alterations, AlterUserScramCredentialsOptions options)Alter SASL/SCRAM credentials.

根据官方给的文档描述,上表中第二个方法是对第一个方法的扩展,所以这里不做讨论。我们主要集中于alterUserScramCredentials()方法的使用上。根据描述,该方法只接受一个泛型为UserScramCredentialAlteration 的List参数,接着看下UserScramCredentialAlteration是什么样子。

Field Summary

Modifier and TypeFieldDescription
protectedString user

Constructor Summary

ModifierConstructorDescription
protectedUserScramCredentialAlteration(String user)

Method Summary

Modifier and TypeMethodDescription
Stringuser()

根据官方文档描述里面只有一个String类型的user字段,显然不可能只根据这一个字段构造一个能够使用的账户。其实UserScramCredentialAlteration是一个抽象的类,它还有两个子类UserScramCredentialDeletionUserScramCredentialUpsertion。从名字就可以看出来,创建大概是UserScramCredentialUpsertion,删除则应该是UserScramCredentialDeletion。所以我们传入的List中的泛型,也就是这两个类的具体实现了。

创建一个Scram账户

UserScramCredentialUpsertion

官方文档对UserScramCredentialUpsertion类的解释是:

A request to update/insert a SASL/SCRAM credential for a user.

看来这个类不仅仅能做创建,而且也能做更新,大方向是没问题了。首先我们还是看下创建UserScramCredentialUpsertion这个类要怎么构造:

ConstructorDescription
UserScramCredentialUpsertion(String user, ScramCredentialInfo credentialInfo, byte[] password)Constructor that generates a random salt
UserScramCredentialUpsertion(String user, ScramCredentialInfo credentialInfo, byte[] password, byte[] salt)Constructor that accepts an explicit salt
UserScramCredentialUpsertion(String user, ScramCredentialInfo credentialInfo, String password)Constructor that generates a random salt

从上述表格中可以看到这个类有三个构造方法,构建这个类除了需要父类提供的user名字以外,还需要另外两个参数ScramCredentialInfopassword。可以看到其实最后一个构造方法passwordString,这个可以说是最方便的了。第二个中为什么还要有一个salt参数,笔者深入代码只发现了一句话new Field("salt", Type.COMPACT_BYTES, "A random salt generated by the client.")。可能是给用户自定义什么功能用的,但是kafka自己也会生成这个东西,由于官方文档和源码都没有找到更多的解释,我们暂且忽略,如果哪位笔者知道这点,也行评论区不吝赐教。

ScramCredentialInfo

那么我们就转向ScramCredentialInfo这个类的构造:

ConstructorDescription
ScramCredentialInfo(ScramMechanism mechanism, int iterations)

可以看到这个类的构建就比较简单明了,两个参数一个是ScramMechanism也就是Scram认证机制,这是一个enum类型内置SCRAM_SHA_256SCRAM_SHA_512UNKNOWN三个类型,UNKNOWN类型会在创建时报异常。另外一个iterations是循环次数the number of iterations used when creating the credential,代表你要创建的账户个数,也就是List.size()吧。

示例代码
public void createAccount(String name, String pwd, String salt) throws ExecutionException, InterruptedException {
    //创建User列表
    List<UserScramCredentialAlteration> alterations = new ArrayList<>();
    //构造Scram认证机制信息,这里笔者选择了SCRAM_SHA_512,大家也可以选择 ScramMechanism.SCRAM_SHA_256
    //alterations.size()此时为0,或许会报错,可以试下传入数字构造,比如下面添加了一个认证信息,那么这里传入数字1。
    // ScramCredentialInfo info=new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_512, alterations.size()); //这里时间久远,忘记当时写例子的场景了
    ScramCredentialInfo info=new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_512, 10000);
    //三个UserScramCredentialAlteration构造方法,三选一笔者选了一个最简单的
    //UserScramCredentialAlteration userScramCredentialAdd=new UserScramCredentialUpsertion(name,info,pwd.getBytes());
    //UserScramCredentialAlteration userScramCredentialAdd=new UserScramCredentialUpsertion(name,info,pwd.getBytes(),salt.getBytes());
    UserScramCredentialAlteration userScramCredentialAdd=new UserScramCredentialUpsertion(name,info,pwd);
    //添加认证信息到列表
    alterations.add(userScramCredentialAdd);
    //执行方法,并拿到返回结果
    AlterUserScramCredentialsResult result = adminClient.alterUserScramCredentials(alterations);
    //阻塞等待结果完成
    result.all().get();
}

通过官网给出的解释,我们知道对账户的更新操作也是使用UserScramCredentialUpsertion类进行,因此只要保证user name一致即可对相应的user中的内容进行更新,这点大家自己验证吧。

删除一个Scram账户

删除账户用的则是子类UserScramCredentialDeletion,一样我们先看它的构造方法:

ConstructorDescription
UserScramCredentialDeletion(String user, ScramMechanism mechanism)

果然删除就不会像增加或者修改一样墨迹,只有一个构造方法可用,参数上面也都说过了,直接上示例代码:

public void deleteAccount(String name) throws ExecutionException, InterruptedException {
    //创建删除列表
    List<UserScramCredentialAlteration> alterations = new ArrayList<>();
    //构建删除用的UserScramCredentialAlteration
    UserScramCredentialAlteration userScramCredentialDel=new UserScramCredentialDeletion(name,ScramMechanism.SCRAM_SHA_512);
    //添加认证信息到列表
    alterations.add(userScramCredentialDel);
    //执行方法,并拿到返回结果
    AlterUserScramCredentialsResult result = adminClient.alterUserScramCredentials(alterations);
    //阻塞等待结果完成
    result.all().get();
}

查询Scram账户信息

说完对Scram账户的增删改以后,剩下的自然就剩下查询了,官方也提供了一个方法用于查询:

Modifier and TypeMethodDescription
default DescribeUserScramCredentialsResultdescribeUserScramCredentials()Describe all SASL/SCRAM credentials.
default DescribeUserScramCredentialsResultdescribeUserScramCredentials(List< String> users)Describe SASL/SCRAM credentials for the given users.
DescribeUserScramCredentialsResultdescribeUserScramCredentials(List< String> users, DescribeUserScramCredentialsOptions options)Describe SASL/SCRAM credentials.

可以看到大体上也是只有一个方法describeUserScramCredentials(),只不过对这个方法做了一个重载,用于指定查询某些用户的信息,参数很简单不用多说,直接上示例代码:

public void describeAccount() throws ExecutionException, InterruptedException {
	//***************************************查询所有用户信息*****************************************************
    //查询所有的账户,这也是默认方法
    DescribeUserScramCredentialsResult result = adminClient.describeUserScramCredentials();
    //执行方法,并拿到返回结果
    Map<String, UserScramCredentialsDescription> future = result.all().get();
    //输出
    future.forEach((name,info)-> System.out.println("[ScramUserName:"+name+"]:[ScramUserInfo:"+info.toString()+"]"));

    //***************************************这里是分割线*****************************************************
    //***************************************查询指定的用户信息*****************************************************
    //构造指定的用户列表
    List<String> userScramList=new ArrayList<>();
    //添加两个用户
    userScramList.add("user1");
    userScramList.add("user2");
    //传入特定用户列表执行方法,并拿到返回结果
    DescribeUserScramCredentialsResult targetResult = adminClient.describeUserScramCredentials(userScramList);
    //执行方法,并拿到返回结果
    Map<String, UserScramCredentialsDescription> targetFuture = targetResult.all().get();
    //输出
    targetFuture.forEach((name,info)-> System.out.println("[ScramUserName:"+name+"]:[ScramUserInfo:"+info.toString()+"]"));
}

结语

这一篇终于把之前的坑填上了,自从笔者不再需要对Kafka平台进行维护以后,Kafka的版本也发生了好多巨大的更新,一个最明显的变化就是Kafka的去zookeeper操作。笔者之前操纵2.7.0的时候,Kafka官方还是警告2.8版本最好不要用于生产环境,而今天官方的态度已经有了一个模糊的转变,可见Kafka已经大体上完成了对Zookeeper的解耦工作。由于笔者工作的变动不再管理Kafka相关的平台内容,所以这部分内容就会更新的比较慢,毕竟懒癌晚期的笔者能补上这个坑已经算是祖坟冒烟了。不过好在笔者已经够把大多数关键的Kafka管理操作的API使用方法更新完毕,希望这些内容对网上寥寥无几的Kafka文档有所补充,也希望这些博客能够帮助到那些正在管理Kafka平台的小伙伴们,希望以后还有机会对更新的版本进行更深一步的探究吧。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐