我要做什么?

实现Nacos动态配置kafka认证信息,使每个微服务读取同一个kafka配置,并生成文件注入到环境变量中。

为什么要这么做?

首先我们看下

Kafka-java接入demo,如图:

1.prod_client_jaas.conf文件

KafkaClient{
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="kafka_1"
  password="密码";
  };

2.cons_client_jaas.conf

KafkaClient{
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="kafka_1"
  password="密码";
  };

3.producer

package com.sensetime.kafka;
 
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
 
 
 
/**
 * Producer
 *
 */
public class App
{
    public static void main( String[] args )
    {
 
 
        String fsPath=System.getProperty("user.dir");
        System.out.println(fsPath);
        System.setProperty("java.security.auth.login.config", ""+fsPath+"/conf/prod_client_jaas.conf"); // 环境变量添加,需要输入配置文件的路径
        System.out.println("===================配置文件地址"+fsPath+"/conf/prod_client_jaas.conf");
        Properties props = new Properties();
        props.put("bootstrap.servers", "ip:9092,ip:9092,ip:9092");           //此处为kafka接入点
        props.put("acks", "1");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty ("security.protocol", "SASL_PLAINTEXT");
        props.setProperty ("sasl.mechanism", "PLAIN");
        Producer producer = null;
        try {
            producer = new KafkaProducer<>(props);
            for (int i = 0; i < 10; i++) {
                String msg = "Message " + i;
                producer.send(new ProducerRecord("testtime", msg));                  //此处为创建的topic
                System.out.println("Sent:" + msg);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}

4.consumer

package com.sensetime.kafka;
 
import java.util.Arrays;import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
 
 
 
public class Consumer {
    public static void main(String[] args) {
    
        String fsPath=System.getProperty("user.dir");
        System.setProperty("java.security.auth.login.config", ""+fsPath+"/conf/cons_client_jaas.conf"); // 环境变量添加,需要输入配置文件的路径
        System.out.println("===================配置文件地址"+fsPath+"/conf/cons_client_jaas.conf");
        Properties props = new Properties();
        props.put("bootstrap.servers", "ip:9092,ip:9092,ip:9092");           //kafka接入点
        props.put("group.id", "group1");                                                                //创建的group
        props.put("group.name", "1");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty ("security.protocol", "SASL_PLAINTEXT");
        props.setProperty ("sasl.mechanism", "PLAIN");
        KafkaConsumer kafkaConsumer = new Kafkansumer<>(props);
        kafkaConsumer.subscribe(Arrays.asList("testtime"));                                           //此处为订阅的topic
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("线程1"+":"+"Partition: " + record.partition() + " Offset: " + record.offset() + " Value: " + record.value() + " ThreadID: " + Thread.currentThread().getId());
            }
 
        }
    }
 
}

这个配置认证方式的痛点在于1: 需要在springboot刚启动还未进行kafka建立连接之前,将认证信息注入到环境变量里边

2:需要每个微服务都要配置认证信息的文件。考虑到我们使用Nacos作为配置中心,我的想法是利用Nacos进行配置认证信息,并在springboot启动后kafka实例化前,读取认证信息,设置到环境变量里。

我是怎么做的?

首先看图所示,springboot启动时候会在refreshContext(context)之前执行初始化applyInitializers,当spring执行这个类PropertySourceBootstrapConfiguration的时候,会执行Nacos的相关获取配置解析配置的方法,所以,我只也搞一个同样的initalizer,并且改initalizer排序在Nacos执行之后,不就解决了吗,对的,就是这样的

 

第一种:手动设置initializer方式实现代码如下:

注意:如果我们自己定义启动执行前的类需如下防范
public class MyApiApplication {
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(MyApiApplication.class);
        application.addInitializers(new KafkaSaslConfiguration());
        application.run(args);
    }
}

public class KafkaSaslConfiguration implements
        ApplicationContextInitializer<ConfigurableApplicationContext>, Ordered {

    private static Logger log = LoggerFactory.getLogger(KafkaSaslConfiguration.class);

    private static final String STORE_CONS_CLIENT_JAAS_PATH = "./conf/cons_client_jaas.conf";

    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE + 11;
    }

    @Override
    public void initialize(ConfigurableApplicationContext applicationContext) {
        ConfigurableEnvironment environment = applicationContext.getEnvironment();
        String username = environment.getProperty("kafka-sasl-username");
        String password = environment.getProperty("kafka-sasl-password");
        if (StringUtils.isBlank(username) || StringUtils.isBlank(password)) {
            log.error("kafka sasl need kafka-sasl-username and kafka-sasl-password,please set value for Nacos.");
            System.exit(-1);
        }
        String kafkaClient = "KafkaClient{\n" +
                "  org.apache.kafka.common.security.plain.PlainLoginModule required\n" +
                "  username=\"" + username + "\"\n" +
                "  password=\"" + password + "\";\n" +
                "  };";

        String clientPath = System.getProperty("user.dir") + "/conf/cons_client_jaas.conf";
        System.setProperty("java.security.auth.login.config", clientPath);
        ResourceUtils.readStringToDisk(kafkaClient, STORE_CONS_CLIENT_JAAS_PATH);
        log.info("java.security.auth.login.config: {}", clientPath);
    }

}

第二种方法,是利用springboot自动配置原理,进行配置,自动配置原理https://blog.csdn.net/luman1991/article/details/106668582

# Initializers
org.springframework.context.ApplicationContextInitializer=\
com.项目包.KafkaSaslConfiguration

认真写写博客,写写生活点滴

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐