查看已经创建的topic

./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

创建topic

创建分区和副本数为1的topic

./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic acltest --partitions 1 --replication-factor 1

创建kafka用户

创建writer用户

./bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter -add-config 'SCRAM-SHA-256=[iterations=8192,password=pwd],SCRAM-SHA-512=[password=pwd]' --entity-type users --entity-name kafkawriter

创建reader用户

./bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter -add-config 'SCRAM-SHA-256=[iterations=8192,password=pwd],SCRAM-SHA-512=[password=pwd]' --entity-type users --entity-name kafkareader

删除指定用户

./bin/kafka-configs.sh --zookeeper localhost:2181 --alter --delete-config 'SCRAM-SHA-512' --delete-config 'SCRAM-SHA-256' --entity-type users --entity-name admin

查看指定用户

./bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type users --entity-name kafkawriter

./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name kafkareader

给topic授权

./bin/kafka-acls.sh --authorizer kafka.security.authorizer.AclAuthorizer --authorizer-properties zookeeper.connect=192.168.5.54:2181 --add --allow-principal User:kafkawriter --operation Write --topic bigData

./bin/kafka-acls.sh --authorizer kafka.security.authorizer.AclAuthorizer --authorizer-properties zookeeper.connect=192.168.5.54:2181 --add --allow-principal User:admin --operation Read --topic bigData

查看topic权限

./bin/kafka-acls.sh --authorizer kafka.security.authorizer.AclAuthorizer --authorizer-properties zookeeper.connect=192.168.5.54:2181 --list --topic bigData

修改server.properties,开启sasl

listeners=SASL_PLAINTEXT://192.168.1.214:9092

advertised.listeners=SASL_PLAINTEXT://192.168.1.214:9092

authorizer.class.name=kafka.security.authorizer.AclAuthorizer

allow.everyone.if.no.acl.found=false

super.users=User:admin

# 启用SCRAM机制,采用SCRAM-SHA-512算法

sasl.enabled.mechanisms=SCRAM-SHA-256

# 为broker间通讯开启SCRAM机制,采用SCRAM-SHA-512算法

sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256

# broker间通讯使用PLAINTEXT,本例中不演示SSL配置

security.inter.broker.protocol=SASL_PLAINTEXT

修改bin/kafka-server-start.sh启动脚本

 -Djava.security.auth.login.config=/opt/apps/kafka_2.13-3.7.0/config/kafka_admin_jaas.conf 

kafka_admin_jaas.conf 文件内容

KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="pwd";
};
 

重启kafka服务

新建生产者配置

config/producer.conf

security.protocol=SASL_PLAINTEXT

sasl.mechanism=SCRAM-SHA-256

sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafkawriter" password="pwd";

给生产者授权

./bin/kafka-acls.sh --authorizer kafka.security.authorizer.AclAuthorizer --authorizer-properties zookeeper.connect=192.168.1.214:2181 --add --allow-principal User:kafkawriter --operation Write --topic bigData

生产者发送消息

./bin/kafka-console-producer.sh --broker-list 192.168.1.214:9092 --topic bigData --producer.config ./config/producer.conf

新建消费者配置

security.protocol=SASL_PLAINTEXT

sasl.mechanism=SCRAM-SHA-256

sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafkareader" password="pwd";

cat config/producer.conf

给消费者授权

./bin/kafka-acls.sh --authorizer kafka.security.authorizer.AclAuthorizer --authorizer-properties zookeeper.connect=192.168.1.214:2181 --add --allow-principal User:kafkareader --operation Read --topic bigData

授权group分组

./bin/kafka-acls.sh --authorizer kafka.security.authorizer.AclAuthorizer --authorizer-properties zookeeper.connect=192.168.1.214:2181 --add --allow-principal User:kafkareader --operation Read --topic bigData --group testgroup

消费者消费消息

./bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.214:9092 --topic bigData --consumer.config ./config/consumer.conf  --from-beginning --group testgroup

以上就是配置sasl的全过程了,kafka使用版本为kafka_2.13-3.7.0,文章中的ip地址经过变动,测试人员需保证ip地址一致

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐