Configuring Kerberos for Kafka 0.10.2.2
The company recently needed to set up a new Kafka cluster secured with Kerberos. Kerberos itself had already been set up by another team, so this article only covers the Kafka side of the Kerberos configuration.
- Create a Kafka principal in Kerberos
```shell
# Create the kafka/hadoop03 principal
kadmin.local -q "addprinc -randkey kafka/hadoop03"
# Export the keytab file
kadmin.local -q "xst -k kafka.keytab -norandkey kafka/hadoop03"
```
Since this is a single-node test environment, only one kafka principal is requested from Kerberos. Copy the exported keytab file into Kafka's config directory.
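Before wiring the keytab into Kafka, it is worth confirming it was exported correctly. A minimal check with the standard MIT Kerberos tools might look like this (the path and realm follow the setup above; these commands only work on a host joined to the Kerberos realm):

```shell
# List the principals stored in the keytab
klist -k /opt/kafka/config/kafka.keytab

# Obtain a ticket using the keytab, then show the cached ticket
kinit -kt /opt/kafka/config/kafka.keytab kafka/hadoop03@CATCHER92.CN
klist
```

If `kinit` succeeds without prompting for a password, the keytab matches the principal registered in the KDC.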
- Configure config/server.properties
```properties
listeners=SASL_PLAINTEXT://hadoop03:9092
# These settings come from the official Kafka documentation
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.enabled.mechanisms=GSSAPI
# The service name must match the Kerberos principal requested earlier;
# the principal is kafka/hadoop03, so the service name here is kafka
sasl.kerberos.service.name=kafka
```
- Edit config/kafka_server_jaas.conf
```
KafkaServer {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/opt/kafka/config/kafka.keytab"
    principal="kafka/hadoop03@CATCHER92.CN";
};

// Zookeeper client authentication
Client {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/opt/kafka/config/kafka.keytab"
    principal="kafka/hadoop03@CATCHER92.CN";
};

// Kafka client authentication
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/opt/kafka/config/kafka.keytab"
    principal="kafka/hadoop03@CATCHER92.CN";
};
```
kafka_server_jaas.conf consists of three sections. The purpose of the first and third is clear from their names. The second section, Client, holds the credentials Kafka uses when connecting to ZooKeeper; if ZooKeeper is not configured with Kerberos, this section can be omitted.
- Configure bin/kafka-run-class.sh
```shell
# Kerberos options
KAFKA_KERBEROS_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"

# Launch mode
if [ "x$DAEMON_MODE" = "xtrue" ]; then
  nohup $JAVA $KAFKA_KERBEROS_OPTS $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
else
  exec $JAVA $KAFKA_KERBEROS_OPTS $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@"
fi
```
Add the KAFKA_KERBEROS_OPTS variable, which points to the Kerberos configuration file and Kafka's JAAS file, and insert it into the launch command right after $JAVA.
- Start Kafka and test
```shell
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
```
At this point the Kerberos configuration for Kafka is complete.
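A quick way to confirm the broker actually came up with the SASL listener is to check the broker log. As a sketch (the log path below assumes the default layout under /opt/kafka; adjust to your installation):

```shell
# A successful start logs the registered SASL_PLAINTEXT endpoint and a "started" line
grep -E "SASL_PLAINTEXT|started" /opt/kafka/logs/server.log | tail -n 5
```

If the broker failed to log in to Kerberos, the same log will instead show a `LoginException`, which usually points at a wrong keytab path or principal name in the JAAS file.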
- To make testing from the terminal easier, configure config/consumer.properties and config/producer.properties
```properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
```
Append the lines above to the end of both files.
- Test creating a topic
```shell
/opt/kafka/bin/kafka-topics.sh --zookeeper hadoop03:2181/kafka --create --topic c1p1_test --partitions 1 --replication-factor 1
```
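To verify the topic was created with the expected partition and replication settings, the same tool can describe it (this reuses the paths and the ZooKeeper chroot from the command above):

```shell
# Shows partition count, replication factor, leader, and ISR for the topic
/opt/kafka/bin/kafka-topics.sh --zookeeper hadoop03:2181/kafka --describe --topic c1p1_test
```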
- Terminal test
```shell
# Start the producer
/opt/kafka/bin/kafka-console-producer.sh --topic c1p1_test --broker-list hadoop03:9092 --producer.config /opt/kafka/config/producer.properties
# Start the consumer
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server hadoop03:9092 --topic c1p1_test --consumer.config /opt/kafka/config/consumer.properties
```
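Note that the console tools also need a JAAS configuration to authenticate via GSSAPI. On the broker host above this is already covered, because the modified kafka-run-class.sh injects kafka_server_jaas.conf (whose KafkaClient section applies). On a host where that script was not modified, a sketch of the equivalent is to export KAFKA_OPTS before running the tools (the client JAAS path here is an assumption, mirroring the file shown later in this article):

```shell
# Point the JVM-based console tools at the Kerberos config and a client JAAS file
export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/opt/kafka/config/kafka_client_jaas.conf"
```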
- Connecting from a Java client
Consumer side:
```java
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerDemo {
    public static void main(String[] args) {
        System.setProperty("java.security.krb5.conf", "/tmp/krb5.conf");
        System.setProperty("java.security.auth.login.config", "/tmp/kafka_client_jaas.conf");

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop03:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("c1p1_test"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
        }
    }
}
```
Producer side:
```java
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaProducerDemo {
    public static void main(String[] args) {
        System.setProperty("java.security.krb5.conf", "/tmp/krb5.conf");
        System.setProperty("java.security.auth.login.config", "/tmp/kafka_client_jaas.conf");

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop03:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<>("c1p1_test", "key" + i, "value" + i));
        }
        producer.close();
    }
}
```
- kafka_client_jaas.conf
```
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/tmp/catcher92.keytab"
    serviceName=kafka
    principal="catcher92/user@CATCHER92.CN";
};
```
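As an alternative to a keytab, the Krb5LoginModule can also reuse an existing ticket cache obtained with `kinit`. This variant is not part of the original setup, just a sketch for interactive client use:

```
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useTicketCache=true
    serviceName=kafka;
};
```

With this configuration the user runs `kinit` first, and the client picks up the cached ticket instead of logging in from a keytab.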
- Reference links

The security configuration section of the official Kafka documentation.