一、相关配置文件

  1. kafka_client_jaas.conf配置项
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="D:\\kafka-connect\\5.5.0\\kafka-schema-test\\src\\main\\resources\\test.service.keytab"
    useTicketCache=false
    principal="test/testuser@HENGHE.COM"
    serviceName=kafka;
};
  2. krb5.conf配置项
# Configuration snippets may be placed in this directory as well
includedir /etc/krb5.conf.d/

[logging]
 default = FILE:/var/log/krb5libs.log
 kdc = FILE:/var/log/krb5kdc.log
 admin_server = FILE:/var/log/kadmind.log

[libdefaults]
 dns_lookup_realm = false
 ticket_lifetime = 24h
 renew_lifetime = 7d
 forwardable = true
 rdns = false
 default_realm = HENGHE.COM
# default_ccache_name = KEYRING:persistent:%{uid}

[realms]
 HENGHE.COM = {
  kdc = henghe-047
  admin_server = henghe-047
  #kdc = ${kerberosSlaves}
 }

[domain_realm]
 .henghe.com = HENGHE.COM
 henghe.com = HENGHE.COM

  3. 密钥配置项相关导出的keytab文件

二、生产者代码示例

package com.yss.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.Scanner;

/**
 * Interactive Kafka producer demo that authenticates through SASL/GSSAPI (Kerberos).
 *
 * <p>Each line typed on standard input is sent as the value of one record to the
 * {@code ws_kerberos} topic. The program ends when standard input reaches end-of-file
 * or when the main thread is interrupted; in both cases the producer is closed so that
 * records still buffered in the client are flushed.
 *
 * @author:
 * @create: 2020-07-06 11:44
 **/
public class Producer {
    /**
     * Configures Kerberos authentication, then forwards console input to Kafka.
     *
     * @param args unused
     */
    public static void main(String[] args) {

        // Point the JVM at the JAAS login configuration and at the Kerberos configuration.
        // These two properties can also be supplied on the command line with -D, but note
        // that the calls below overwrite whatever was passed that way.
        // The paths here are Windows paths; on Linux replace them with the absolute paths
        // of kafka_client_jaas.conf and krb5.conf on that machine.
        System.setProperty("java.security.auth.login.config", "D:\\kafka-connect\\5.5.0\\kafka-schema-test\\src\\main\\resources\\kafka_client_jaas.conf");
        System.setProperty("java.security.krb5.conf", "D:\\kafka-connect\\5.5.0\\kafka-schema-test\\src\\main\\resources\\krb5.conf");

        Properties props = new Properties();
        props.put("bootstrap.servers", "henghe-020:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Kerberos over an unencrypted connection.
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.kerberos.service.name", "kafka");
        props.put("sasl.mechanism", "GSSAPI");

        String topic = "ws_kerberos";

        // try-with-resources closes the scanner and then the producer on every exit path.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props);
             Scanner scan = new Scanner(System.in)) {

            while (true) {
                System.out.print(">>");
                if (!scan.hasNextLine()) {
                    // End of input (Ctrl+D / Ctrl+Z or a closed pipe): stop instead of throwing.
                    break;
                }
                String message = scan.nextLine();

                // send() is asynchronous; the callback surfaces failures that would otherwise be lost.
                producer.send(new ProducerRecord<>(topic, message), (metadata, exception) -> {
                    if (exception != null) {
                        System.err.println("Failed to send message: " + exception);
                    }
                });
                System.out.println(message);

                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop so the producer can be closed.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }

    }
}

三、消费者代码示例

package com.yss.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * Kafka consumer demo that authenticates through SASL/GSSAPI (Kerberos).
 *
 * <p>Subscribes to the {@code ws_kerberos} topic, polls in an endless loop and prints every
 * record value decoded as UTF-8. When the loop is left because of an exception, the consumer
 * is closed and the number of records seen plus the elapsed seconds are printed.
 *
 * @author:
 * @create: 2020-07-06 12:24
 **/
public class Consumer {
    /**
     * Configures Kerberos authentication and consumes records until an exception occurs.
     *
     * @param args unused
     * @throws Exception any failure raised while creating the consumer or polling
     */
    public static void main(String[] args) throws Exception {

        Properties props = new Properties();

        // JAAS login configuration and Kerberos configuration used by the client.
        System.setProperty("java.security.auth.login.config", "D:\\kafka-connect\\5.5.0\\kafka-schema-test\\src\\main\\resources\\kafka_client_jaas.conf");
        System.setProperty("java.security.krb5.conf", "D:\\kafka-connect\\5.5.0\\kafka-schema-test\\src\\main\\resources\\krb5.conf");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.kerberos.service.name", "kafka");
        props.put("sasl.mechanism", "GSSAPI");


        props.put("bootstrap.servers", "henghe-020:9092");
        props.put("group.id", "test132");
        // NOTE(review): auto commit is disabled and nothing below commits offsets, so a restart
        // with the same group id presumably starts again from "earliest" - confirm this is intended.
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("ws_kerberos"));
        long timeMillis = System.currentTimeMillis();
        long count = 0;
        try {
            while (true) {
                ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, byte[]> record : records) {
                    // Count every record so the summary printed in finally is meaningful.
                    count++;
                    System.out.println("\n======================================我是分割符==============================================\n");
                    byte[] value = record.value();
                    // A record may carry a null value (tombstone); print "null" instead of failing.
                    String s = (value == null)
                            ? null
                            : new String(value, java.nio.charset.StandardCharsets.UTF_8);
                    System.out.println(s);
                }
            }
        } finally {
            consumer.close();
            System.out.println("总记录数:" + count + "   耗时:" + (System.currentTimeMillis() - timeMillis) / 1000);
        }
    }
}

简单的demo示例,具体根据业务实现。

四、动态参数示例

/**
 * Kafka consumer demo that supplies the Kerberos login settings dynamically.
 *
 * <p>Instead of pointing {@code java.security.auth.login.config} at a JAAS file, the login
 * module configuration is passed inline through the {@code sasl.jaas.config} client property.
 * Only the Kerberos configuration file ({@code krb5.conf}) is still set as a system property.
 */
public class KerberosTest {
    /**
     * Subscribes to {@code test-topic} and prints every record value in an endless loop.
     *
     * @param args unused
     * @throws InterruptedException declared by the original signature; kept for compatibility
     */
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        // No java.security.auth.login.config here: the JAAS entry is given inline below
        // via SaslConfigs.SASL_JAAS_CONFIG.
        System.setProperty("java.security.krb5.conf", "D:\\ysstech\\Medusa\\runtime\\src\\main\\resources\\krb5.conf");
        props.put("bootstrap.servers", "henghe-37:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test1");
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "qwe");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.kerberos.service.name", "kafka");
        props.put("sasl.mechanism", "GSSAPI");
        // Inline JAAS configuration: keytab location and principal used for the login.
        props.put(SaslConfigs.SASL_JAAS_CONFIG,
                "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"D:/ysstech/Medusa/runtime/src/main/resources/henghe.user.keytab\" principal=\"henghe@HENGHE.COM\";");

        // try-with-resources closes the consumer if poll() or the loop body throws.
        try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props)) {
            kafkaConsumer.subscribe(Collections.singletonList("test-topic"));
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        }
    }
}
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐