Kafka 开启 Kerberos 安全认证:Java 生产者与消费者编程示例
一、相关配置文件 kafka_client_jaas.conf 配置项:KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="D:\\kafka-connect\\5.5.0\\kafka-schema-test\\src\\main\\resources\\…
·
一、相关配置文件
- kafka_client_jaas.conf配置项
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="D:\\kafka-connect\\5.5.0\\kafka-schema-test\\src\\main\\resources\\test.service.keytab"
useTicketCache=false
principal="test/testuser@HENGHE.COM"
serviceName=kafka;
};
- krb5.conf配置项
# Configuration snippets may be placed in this directory as well
includedir /etc/krb5.conf.d/
[logging]
default = FILE:/var/log/krb5libs.log
kdc = FILE:/var/log/krb5kdc.log
admin_server = FILE:/var/log/kadmind.log
[libdefaults]
dns_lookup_realm = false
ticket_lifetime = 24h
renew_lifetime = 7d
forwardable = true
rdns = false
default_realm = HENGHE.COM
# default_ccache_name = KEYRING:persistent:%{uid}
[realms]
HENGHE.COM = {
kdc = henghe-047
admin_server = henghe-047
#kdc = ${kerberosSlaves}
}
[domain_realm]
.henghe.com = HENGHE.COM
henghe.com = HENGHE.COM
- keytab 密钥文件:即上述 JAAS 配置中 keyTab 项所指向的、已导出的 keytab 文件(如 test.service.keytab)
二、生产者代码示例
package com.yss.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.Scanner;
/**
 * Interactive Kafka producer demo for a Kerberos-secured cluster
 * (security protocol {@code SASL_PLAINTEXT}, SASL mechanism {@code GSSAPI}).
 *
 * <p>Reads lines from standard input and sends each one as a record value to the
 * {@code ws_kerberos} topic. The loop ends when standard input is closed or the
 * thread is interrupted; the producer is then closed so buffered records are flushed.
 *
 * <p>Created 2020-07-06 11:44.
 */
public class Producer {

    /** Default JAAS login configuration file, used when no -D override is given. */
    private static final String DEFAULT_JAAS_CONF =
            "D:\\kafka-connect\\5.5.0\\kafka-schema-test\\src\\main\\resources\\kafka_client_jaas.conf";

    /** Default Kerberos configuration file, used when no -D override is given. */
    private static final String DEFAULT_KRB5_CONF =
            "D:\\kafka-connect\\5.5.0\\kafka-schema-test\\src\\main\\resources\\krb5.conf";

    /** Topic every typed line is sent to. */
    private static final String TOPIC = "ws_kerberos";

    /**
     * Runs the interactive producer loop.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // The JAAS and krb5 locations may be passed with
        // -Djava.security.auth.login.config=... and -Djava.security.krb5.conf=...
        // (for example D:\resources\kafka_client_jaas.conf on Windows, or the
        // equivalent path on Linux). The defaults below only apply when the
        // property has not been set, so a -D value is no longer overwritten.
        setIfAbsent("java.security.auth.login.config", DEFAULT_JAAS_CONF);
        setIfAbsent("java.security.krb5.conf", DEFAULT_KRB5_CONF);

        Properties props = new Properties();
        props.put("bootstrap.servers", "henghe-020:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.kerberos.service.name", "kafka");
        props.put("sasl.mechanism", "GSSAPI");

        // try-with-resources closes the producer on every exit path, which also
        // flushes records that are still buffered.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props);
             Scanner scan = new Scanner(System.in)) {
            while (true) {
                System.out.print(">>");
                if (!scan.hasNextLine()) {
                    // End of input (e.g. Ctrl+D / Ctrl+Z or a closed pipe): stop cleanly
                    // instead of letting nextLine() throw NoSuchElementException.
                    break;
                }
                String message = scan.nextLine();
                producer.send(new ProducerRecord<>(TOPIC, message));
                System.out.println(message);
                try {
                    // Short pause between prompts, kept from the original demo.
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and leave the loop rather than swallowing it.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }

    /** Sets a system property only when it has no value yet. */
    private static void setIfAbsent(String key, String value) {
        if (System.getProperty(key) == null) {
            System.setProperty(key, value);
        }
    }
}
三、消费者代码示例
package com.yss.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
 * Kafka consumer demo for a Kerberos-secured cluster
 * (security protocol {@code SASL_PLAINTEXT}, SASL mechanism {@code GSSAPI}).
 *
 * <p>Subscribes to the {@code ws_kerberos} topic, polls forever, and prints each
 * record value decoded as UTF-8. When the loop terminates (only via an exception),
 * the consumer is closed and a summary with the record count and elapsed seconds
 * is printed.
 *
 * <p>Created 2020-07-06 12:24.
 */
public class Consumer {

    /** Default JAAS login configuration file, used when no -D override is given. */
    private static final String DEFAULT_JAAS_CONF =
            "D:\\kafka-connect\\5.5.0\\kafka-schema-test\\src\\main\\resources\\kafka_client_jaas.conf";

    /** Default Kerberos configuration file, used when no -D override is given. */
    private static final String DEFAULT_KRB5_CONF =
            "D:\\kafka-connect\\5.5.0\\kafka-schema-test\\src\\main\\resources\\krb5.conf";

    /**
     * Runs the polling loop.
     *
     * @param args unused
     * @throws Exception kept for signature compatibility; failures from the Kafka client propagate
     */
    public static void main(String[] args) throws Exception {
        // Only fall back to the defaults when the property was not supplied with -D.
        setIfAbsent("java.security.auth.login.config", DEFAULT_JAAS_CONF);
        setIfAbsent("java.security.krb5.conf", DEFAULT_KRB5_CONF);

        Properties props = new Properties();
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.kerberos.service.name", "kafka");
        props.put("sasl.mechanism", "GSSAPI");
        props.put("bootstrap.servers", "henghe-020:9092");
        props.put("group.id", "test132");
        // NOTE(review): auto-commit is disabled and this demo never commits offsets,
        // so a restart with the same group id starts again from "earliest" — confirm
        // that this is intended before reusing the code.
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("ws_kerberos"));

        long timeMillis = System.currentTimeMillis();
        long count = 0;
        try {
            while (true) {
                ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, byte[]> record : records) {
                    System.out.println("\n======================================我是分割符==============================================\n");
                    byte[] value = record.value();
                    // A record may carry a null value (tombstone); print "null" instead of failing.
                    String s = value == null
                            ? null
                            : new String(value, java.nio.charset.StandardCharsets.UTF_8);
                    System.out.println(s);
                    // The original never incremented the counter, so the summary always showed 0.
                    count++;
                }
            }
        } finally {
            consumer.close();
            System.out.println("总记录数:" + count + " 耗时:" + (System.currentTimeMillis() - timeMillis) / 1000);
        }
    }

    /** Sets a system property only when it has no value yet. */
    private static void setIfAbsent(String key, String value) {
        if (System.getProperty(key) == null) {
            System.setProperty(key, value);
        }
    }
}
以上为简单的 demo 示例,具体逻辑请根据业务需要实现。
四、动态参数示例(通过 sasl.jaas.config 在代码中传入 JAAS 配置,无需 kafka_client_jaas.conf 文件)
/**
 * Kafka consumer demo that supplies the Kerberos login configuration in code.
 *
 * <p>Instead of pointing {@code java.security.auth.login.config} at a JAAS file, the
 * login module options are passed through the {@code sasl.jaas.config} client
 * property. Only the krb5 configuration file is still provided as a system property.
 * The consumer subscribes to {@code test-topic} and prints every record value.
 */
public class KerberosTest {

    /** Default Kerberos configuration file, used when no -D override is given. */
    private static final String DEFAULT_KRB5_CONF =
            "D:\\ysstech\\Medusa\\runtime\\src\\main\\resources\\krb5.conf";

    /**
     * Runs the polling loop.
     *
     * @param args unused
     * @throws InterruptedException kept for signature compatibility
     */
    public static void main(String[] args) throws InterruptedException {
        // No JAAS file is configured here: the login module settings are passed
        // inline via sasl.jaas.config below. The krb5.conf default only applies
        // when the property was not supplied with -D.
        if (System.getProperty("java.security.krb5.conf") == null) {
            System.setProperty("java.security.krb5.conf", DEFAULT_KRB5_CONF);
        }

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "henghe-37:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test1");
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "qwe");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka");
        props.put(SaslConfigs.SASL_MECHANISM, "GSSAPI");
        // Inline JAAS entry: keytab-based login for the given principal.
        props.put(SaslConfigs.SASL_JAAS_CONFIG,
                "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"D:/ysstech/Medusa/runtime/src/main/resources/henghe.user.keytab\" principal=\"henghe@HENGHE.COM\";");

        // try-with-resources closes the consumer if the poll loop fails.
        try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props)) {
            kafkaConsumer.subscribe(Collections.singletonList("test-topic"));
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        }
    }
}
更多推荐
已为社区贡献3条内容
所有评论(0)