使用 SASL/PLAIN 认证

SASL/PLAIN 认证是最简单的认证方式,网上也有很多大神写过相关的文章,不过关于docker-compose的几乎没有,所以就以下文章了,kafka使用的官方镜像,.

直接上dockercompose.yml配置

version: ‘2’

services:
zookeeper:
image: confluentinc/cp-zookeeper:5.1.2
hostname: zookeeper
restart: always
ports:
- 2181:2181
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_MAXCLIENTCNXNS: 0
ZOOKEEPER_AUTHPROVIDER.1: org.apache.zookeeper.server.auth.SASLAuthenticationProvider
ZOOKEEPER_REQUIRECLIENTAUTHSCHEME: sasl
ZOOKEEPER_JAASLOGINRENEW: 3600000
KAFKA_OPTS: -Djava.security.auth.login.config=/etc/kafka/secrets/zk_server_jaas.conf
volumes:
- /home/zx/project/secrets:/etc/kafka/secrets
kafka:
image: confluentinc/cp-kafka:5.1.2
hostname: broker
container_name: kafka
depends_on:
- zookeeper
ports:
- “9092:9092”
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: ‘zookeeper:2181’
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_LISTENERS: SASL_PLAINTEXT://172.18.0.3:9092
KAFKA_ADVERTISED_LISTENERS: SASL_PLAINTEXT://172.18.0.3:9092
KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SASL_PLAINTEXT
KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
KAFKA_OPTS: -Djava.security.auth.login.config=/etc/kafka/secrets/kafka_server_jaas.conf
KAFKA_SUPER_USERS: User:admin
volumes:
- /home/zx/project/secrets:/etc/kafka/secrets
depends_on:
- zookeeper

  1. volumes ,把kafka_server_jaas.conf,zk_server_jaas.conf文件所在的位置挂载出来,此处我本地存放的目录为/home/zx/project/secrets;
  2. KAFKA_ADVERTISED_LISTENERS,KAFKA_LISTENERS中的ip请修改为docker容器中的ip,可输入以下命令查看, sudo docker exec -it kafka容器id ip addr show

把kafka_server_jaas.conf,zk_server_jaas.conf 放入指定的路径

1.kafka_server_jaas.conf内容

KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username=“admin”
password=“admin-secret”
user_admin=“admin-secret”
user_alice=“alice-secret”;
};
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username=“admin”
password=“admin-secret”;
};

2.zk_server_jaas.conf内容

Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username=“admin”
password=“admin-secret”
user_admin=“admin-secret”;
};
##KafkaProducer代码

Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "SASL_PLAINTEXT://172.18.0.3:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 添加认证配置
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";");
        // 创建producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        List<Future<RecordMetadata>> sends = new ArrayList<>();
        for (int i = 0; i < 10; i ++) {
            ProducerRecord<String, String> producerRecord = new ProducerRecord("testtopic1", "aaa" + i);
            Future<RecordMetadata> send = producer.send(producerRecord, (m, e) -> {
                System.out.println(m.offset());
                if (e != null) {
                    System.out.println("send error: " + e);
                }
            });
            sends.add(send);
        }
        boolean exit = false;
        while (!exit) {
            for (Future<RecordMetadata> send : sends) {
                if (!send.isDone()) {
                    exit = false;
                    break;
                }
                exit = true;
            }
        }

##Consumer代码

Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "SASL_PLAINTEXT://172.18.0.3:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group1");
        // 添加认证配置
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";");
        // 创建consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("testtopic1"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record: records) {
                System.out.printf("offset = %d, key= %s , value = %s\n", record.offset(), record.key(), record.value());
            }
        }

链接`: https://blog.csdn.net/russle/article/details/81041135

链接`: http://orchome.com/270

链接`: https://github.com/confluentinc/cp-docker-images/tree/5.1.2-post/examples/kafka-cluster-sasl

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐