运维给了账号, 密码,还有jks的秘钥, 现在jks文件上传不到服务器上去, 只能使用Base64编码成配置文件, 在项目启动后再从base64的字符串变成文件.

配置文件如下:

spring:
  # 开始配置
  kafka:
    bootstrap-servers: XXX
    producer:
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    ssl:
      trust-store-location: /u3+YQ=
      trust-store-password: XXX
    properties:
      sasl:
        jaas:
          config: org.apache.kafka.common.security.scram.ScramLoginModule required username="XX" password="0aQ";
        mechanism: SCRAM-SHA-512
      security:
        protocol: SASL_SSL
  # 配置结束

package config;

import com.FileToBase64;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * @author ke.zhang
 * @version 1.0
 * @description: the kafka produce config.
 * @date 2021/7/30 09:00
 */
@Slf4j
@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.ssl.trust-store-location}")
    private String trustStoreLocation;
    @Value("${spring.kafka.ssl.trust-store-password}")
    private String trustStorePassword;
    @Value("${spring.kafka.properties.sasl.jaas.config}")
    private String jaasConfig;
    @Value("${spring.kafka.properties.sasl.mechanism}")
    private String mechanism;
    @Value("${spring.kafka.properties.security.protocol}")
    private String protocol;

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * the producer factory config
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        String truststorePath = "./" + UUID.randomUUID().toString().replace("-", "");
        try {
            FileToBase64.decoderBase64File(trustStoreLocation, truststorePath);
        } catch (Exception e) {
            log.error("generate the trustStoreLocation file have a error, is:{},{}", e.getCause(), e.getMessage());
        }
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, protocol);
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePassword);
        props.put(SaslConfigs.SASL_MECHANISM, mechanism);
        props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);

        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        log.info("custom build the KafkaProducerFactory, config is:{}", props);
        return new DefaultKafkaProducerFactory<String, String>(props);
    }

}

使用kafka tools链接kafka:

1.

2. 

3.

 4.

就能链接上kafka 了. 个人觉得其实这个工具并不好用. 

添加消费者的配置:

Spring:
      kafka:
    bootstrap-servers: XX
    producer:
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    ssl:
      trust-store-location: XX
      trust-store-password: XX
    properties:
      sasl:
        jaas:
          config: org.apache.kafka.common.security.scram.ScramLoginModule required username="XX" password="XX";
        mechanism: SCRAM-SHA-512
      security:
        protocol: SASL_SSL
    consumer:
      group-id: XXX
      # 手动提交
      enable-auto-commit: false
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        session.timeout.ms: 60000

/**
     * consumer config
     */
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private Boolean enableAutoCommit;
    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    @Value("${spring.kafka.consumer.properties.session.timeout.ms}")
    private Integer timeOut;

    

    /**
     * consumer config
     *
     * @param configurer
     * @return
     */
    @Bean
    @ConditionalOnBean(ConcurrentKafkaListenerContainerFactoryConfigurer.class)
    public ConcurrentKafkaListenerContainerFactory<Object, Object> concurrentKafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, consumerFactory());
        return factory;
    }


    @Bean
    public ConsumerFactory<Object, Object> consumerFactory() {
        String truststorePath = "./" + UUID.randomUUID().toString().replace("-", "");
        try {
            FileToBase64.decoderBase64File(trustStoreLocation, truststorePath);
        } catch (Exception e) {
            log.error("generate the trustStoreLocation file have a error, is:{},{}", e.getCause(), e.getMessage());
        }
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, protocol);
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePassword);
        props.put(SaslConfigs.SASL_MECHANISM, mechanism);
        props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, timeOut);

        log.info("custom build the KafkaConsumerFactory, config is:{}", props);
        DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<>(props);
        return factory;
    }

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐