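Our company recently needed to build a new Kafka cluster secured with Kerberos. The Kerberos side had already been set up by another team, so this article only records how Kafka itself is configured for Kerberos.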

  1. Create the Kafka principal in Kerberos
    # Create the kafka/hadoop03 principal
    kadmin.local -q "addprinc -randkey kafka/hadoop03"
    
    # Export the keytab file
    kadmin.local -q "xst -k kafka.keytab -norandkey kafka/hadoop03"

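    Because this is a test environment with a single node, only one Kafka principal is created in Kerberos. Copy the exported keytab file into Kafka's config directory (/opt/kafka/config in this setup).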
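For a cluster with more than one broker, each broker host generally needs its own principal of the form kafka/<hostname> and its own keytab. The following is a minimal sketch that only prints the kadmin.local commands from step 1 for a list of hosts; it does not talk to the KDC. The class name, the per-host keytab file names, and the hosts hadoop04 and hadoop05 are hypothetical and not part of the original setup.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class KafkaPrincipalCommands {

    /** Builds the two kadmin.local commands from step 1 for every broker host. */
    public static List<String> commandsFor(List<String> hosts) {
        List<String> commands = new ArrayList<>();
        for (String host : hosts) {
            String principal = "kafka/" + host;
            commands.add("kadmin.local -q \"addprinc -randkey " + principal + "\"");
            // Hypothetical naming: one keytab per host, so each broker only holds its own key.
            commands.add("kadmin.local -q \"xst -k kafka-" + host
                    + ".keytab -norandkey " + principal + "\"");
        }
        return commands;
    }

    public static void main(String[] args) {
        // hadoop04 and hadoop05 are made-up hosts used only for illustration.
        for (String command : commandsFor(Arrays.asList("hadoop03", "hadoop04", "hadoop05"))) {
            System.out.println(command);
        }
    }
}
```

Each broker would then reference its own keytab and principal in its JAAS file.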

  2. Configure config/server.properties
    listeners=SASL_PLAINTEXT://hadoop03:9092
    # These settings come from the official Kafka documentation
    security.inter.broker.protocol=SASL_PLAINTEXT
    sasl.mechanism.inter.broker.protocol=GSSAPI
    sasl.enabled.mechanisms=GSSAPI
    # The service name must match the primary of the Kerberos principal; the principal created above is kafka/hadoop03, so the value here is kafka
    sasl.kerberos.service.name=kafka
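
To make the relationship between the principal and sasl.kerberos.service.name concrete: a service principal has the shape primary/instance@REALM, and the service name is the primary. The sketch below extracts it with plain string handling; the class and method names are hypothetical, and escaped separators inside a principal are not handled.

```java
public class PrincipalParts {

    /** Returns the primary, i.e. the part before the first '/' or '@' of a principal. */
    public static String primaryOf(String principal) {
        int end = principal.length();
        int slash = principal.indexOf('/');
        int at = principal.indexOf('@');
        if (slash >= 0) {
            end = Math.min(end, slash);
        }
        if (at >= 0) {
            end = Math.min(end, at);
        }
        return principal.substring(0, end);
    }

    public static void main(String[] args) {
        String brokerPrincipal = "kafka/hadoop03@CATCHER92.CN";
        // Prints the line that belongs in server.properties for this principal.
        System.out.println("sasl.kerberos.service.name=" + primaryOf(brokerPrincipal));
    }
}
```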

     

  3. Edit config/kafka_server_jaas.conf
    KafkaServer {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        storeKey=true
        keyTab="/opt/kafka/config/kafka.keytab"
        principal="kafka/hadoop03@CATCHER92.CN";
    };
    
    // Zookeeper client authentication
    Client {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        storeKey=true
        keyTab="/opt/kafka/config/kafka.keytab"
        principal="kafka/hadoop03@CATCHER92.CN";
    };
    
    // Kafka client authentication
    KafkaClient {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        storeKey=true
        keyTab="/opt/kafka/config/kafka.keytab"
        principal="kafka/hadoop03@CATCHER92.CN";
    };

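    kafka_server_jaas.conf consists of three sections. The purpose of the first (KafkaServer) and the third (KafkaClient) is clear from their names. The second, Client, holds the credentials the broker uses when it connects to ZooKeeper; if ZooKeeper is not configured with Kerberos, this section can be omitted.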
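The section names matter because JAAS looks entries up by name: the broker reads KafkaServer, the ZooKeeper client reads Client by default, and Kafka clients read KafkaClient. As a rough illustration, the sketch below writes the same three sections to a temporary file and parses them with the JDK's built-in JavaLoginConfig parser. It only parses the text and never contacts a KDC or opens the keytab. The class name, helper names, and the NoSuchSection lookup are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.NoSuchAlgorithmException;
import java.security.URIParameter;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

public class JaasSectionLookup {

    /** Renders one section with the same options as kafka_server_jaas.conf. */
    static String section(String name) {
        return String.join("\n",
            name + " {",
            "    com.sun.security.auth.module.Krb5LoginModule required",
            "    useKeyTab=true",
            "    storeKey=true",
            "    keyTab=\"/opt/kafka/config/kafka.keytab\"",
            "    principal=\"kafka/hadoop03@CATCHER92.CN\";",
            "};",
            "");
    }

    public static final String JAAS =
        section("KafkaServer") + section("Client") + section("KafkaClient");

    /** Parses JAAS text with the JDK's file-based parser; nothing is authenticated here. */
    public static Configuration load(String jaasText) {
        try {
            Path file = Files.createTempFile("jaas", ".conf");
            file.toFile().deleteOnExit();
            Files.write(file, jaasText.getBytes(StandardCharsets.UTF_8));
            return Configuration.getInstance("JavaLoginConfig", new URIParameter(file.toUri()));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Returns the principal configured under a section name, or null if the section is absent. */
    public static String principalOf(Configuration config, String sectionName) {
        AppConfigurationEntry[] entries = config.getAppConfigurationEntry(sectionName);
        return entries == null ? null : (String) entries[0].getOptions().get("principal");
    }

    public static void main(String[] args) {
        Configuration config = load(JAAS);
        for (String name : new String[] {"KafkaServer", "Client", "KafkaClient", "NoSuchSection"}) {
            System.out.println(name + " -> " + principalOf(config, name));
        }
    }
}
```

Parsing the file this way is also a quick check for syntax slips such as a missing semicolon, which the parser reports as an exception.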

  4. Configure bin/kafka-run-class.sh
    # Kerberos options
    KAFKA_KERBEROS_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
    
    # Launch mode
    if [ "x$DAEMON_MODE" = "xtrue" ]; then
      nohup $JAVA $KAFKA_KERBEROS_OPTS $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
    else
      exec $JAVA $KAFKA_KERBEROS_OPTS $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@"
    fi

    Add a KAFKA_KERBEROS_OPTS variable that points to the Kerberos configuration file and the Kafka JAAS file, then insert it directly after $JAVA in both launch commands.
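The two -D flags end up as ordinary JVM system properties, so it is easy to check from Java whether a process was launched with them. The sketch below is a hypothetical sanity check, not part of Kafka: it reports which of the two properties is unset or points to a file that does not exist.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class KerberosOptsCheck {

    // The two system properties that KAFKA_KERBEROS_OPTS is meant to set.
    static final String[] REQUIRED = {
        "java.security.krb5.conf",
        "java.security.auth.login.config"
    };

    /** Lists problems found with the two properties; an empty list means both look fine. */
    public static List<String> problems(Properties systemProps) {
        List<String> problems = new ArrayList<>();
        for (String key : REQUIRED) {
            String path = systemProps.getProperty(key);
            if (path == null) {
                problems.add(key + " is not set");
            } else if (!new File(path).isFile()) {
                problems.add(key + " points to a missing file: " + path);
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        List<String> problems = problems(System.getProperties());
        if (problems.isEmpty()) {
            System.out.println("Kerberos JVM options look complete");
        } else {
            for (String problem : problems) {
                System.out.println(problem);
            }
        }
    }
}
```

Since the launch line already includes $KAFKA_OPTS, exporting the same two flags through the KAFKA_OPTS environment variable is a commonly used alternative that avoids editing the script.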

  5. Start Kafka to test
    /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties

    At this point Kafka is configured with Kerberos.

  6. To make testing from the terminal easier, configure config/consumer.properties and config/producer.properties
    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=GSSAPI
    sasl.kerberos.service.name=kafka

    Append the settings above to the end of both files.
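The same three keys apply to Java clients as well. The sketch below layers them onto an existing Properties object using plain string keys, so it compiles without the Kafka client library; the class and method names are hypothetical.

```java
import java.util.Properties;
import java.util.TreeSet;

public class KerberosClientProps {

    /** Returns a copy of base with the three SASL/GSSAPI settings from step 6 added. */
    public static Properties withKerberos(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.mechanism", "GSSAPI");
        props.setProperty("sasl.kerberos.service.name", "kafka");
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("bootstrap.servers", "hadoop03:9092");
        Properties secured = withKerberos(base);
        // Sort the keys so the output order is stable.
        for (String key : new TreeSet<>(secured.stringPropertyNames())) {
            System.out.println(key + "=" + secured.getProperty(key));
        }
    }
}
```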

  7. Test creating a topic
    /opt/kafka/bin/kafka-topics.sh --zookeeper hadoop03:2181/kafka --create --topic c1p1_test --partitions 1 --replication-factor 1

     

  8. Test from the terminal
    # Start the producer
    /opt/kafka/bin/kafka-console-producer.sh --topic c1p1_test --broker-list hadoop03:9092 --producer.config /opt/kafka/config/producer.properties
    
    # Start the consumer
    /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server hadoop03:9092 --topic c1p1_test --consumer.config /opt/kafka/config/consumer.properties

     

  9. Connecting from a Java client

    Consumer

    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.util.Collections;
    import java.util.Properties;
    
    public class KafkaConsumerDemo {
    
        public static void main(String[] args) {
            // Point the JVM at the Kerberos config and the client JAAS file (see step 10)
            System.setProperty("java.security.krb5.conf", "/tmp/krb5.conf");
            System.setProperty("java.security.auth.login.config", "/tmp/kafka_client_jaas.conf");
    
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop03:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            // sasl.mechanism defaults to GSSAPI; the service name comes from serviceName in the JAAS file
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("c1p1_test"));
            while (true) {
                // Wait up to 100 ms for records; kafka-clients 2.0+ also offers poll(Duration)
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
            }
        }
    
    }

    Producer

    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.util.Properties;
    
    public class KafkaProducerDemo {
    
        public static void main(String[] args) {
            // Point the JVM at the Kerberos config and the client JAAS file (see step 10)
            System.setProperty("java.security.krb5.conf", "/tmp/krb5.conf");
            System.setProperty("java.security.auth.login.config", "/tmp/kafka_client_jaas.conf");
    
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop03:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.ACKS_CONFIG, "all");
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
            Producer<String, String> producer = new KafkaProducer<>(props);
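            for (int i = 0; i < 100; i++) {
                // send() is asynchronous; records are batched in the background
                producer.send(new ProducerRecord<>("c1p1_test", "key" + i, "value" + i));
            }
            // close() waits for outstanding sends to complete before returning
            producer.close();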
        }
    
    }

  10. kafka_client_jaas.conf

    KafkaClient {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        storeKey=true
        keyTab="/tmp/catcher92.keytab"
        serviceName=kafka
        principal="catcher92/user@CATCHER92.CN";
    };
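
As an alternative to a separate JAAS file, Kafka clients from version 0.10.2 onward accept the login module settings inline through the sasl.jaas.config client property. The sketch below only builds that property value as a string; the class and method names are hypothetical. With this approach the service name would be supplied through sasl.kerberos.service.name rather than serviceName in the file.

```java
public class InlineJaasConfig {

    /** Builds a value for the sasl.jaas.config client property. */
    public static String krb5JaasConfig(String keyTabPath, String principal) {
        return "com.sun.security.auth.module.Krb5LoginModule required "
            + "useKeyTab=true storeKey=true "
            + "keyTab=\"" + keyTabPath + "\" "
            + "principal=\"" + principal + "\";";
    }

    public static void main(String[] args) {
        // Same keytab and principal as in kafka_client_jaas.conf.
        String value = krb5JaasConfig("/tmp/catcher92.keytab", "catcher92/user@CATCHER92.CN");
        System.out.println("sasl.jaas.config=" + value);
        System.out.println("sasl.kerberos.service.name=kafka");
    }
}
```

When this property is set on a client, the java.security.auth.login.config system property is no longer needed for that client.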

     

  11. References

    The configuration section of the official Kafka documentation
