Connecting to Kafka over SSL in Spring Boot
The ops team provided a username, a password, and a JKS truststore. The JKS file cannot be uploaded to the server, so the only option is to Base64-encode it into the configuration file and turn the Base64 string back into a file after the application starts.
The configuration file looks like this:
spring:
  # kafka config start
  kafka:
    bootstrap-servers: XXX
    producer:
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    ssl:
      # the Base64 string of the JKS truststore (truncated here)
      trust-store-location: /u3+YQ=
      trust-store-password: XXX
    properties:
      sasl:
        jaas:
          config: org.apache.kafka.common.security.scram.ScramLoginModule required username="XX" password="0aQ";
        mechanism: SCRAM-SHA-512
      security:
        protocol: SASL_SSL
  # kafka config end
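The trust-store-location above holds the Base64 string of the JKS file, and the configuration class below turns it back into a file via FileToBase64.decoderBase64File. That helper is not shown in the original post, so here is a minimal sketch of what it could look like; only the class and method names match the calls below, everything else is an assumption:

import java.io.FileOutputStream;
import java.util.Base64;

public class FileToBase64 {

    /**
     * Decode a Base64 string and write the raw bytes to the given path,
     * e.g. to recreate the truststore JKS file at application startup.
     */
    public static void decoderBase64File(String base64Content, String targetPath) throws Exception {
        byte[] bytes = Base64.getDecoder().decode(base64Content);
        try (FileOutputStream out = new FileOutputStream(targetPath)) {
            out.write(bytes);
        }
    }
}

The Base64 string itself can be produced up front with something like "base64 client.truststore.jks" (the file name here is just an example).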
package config;

import com.FileToBase64;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * @author ke.zhang
 * @version 1.0
 * @description: the kafka producer config.
 * @date 2021/7/30 09:00
 */
@Slf4j
@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.ssl.trust-store-location}")
    private String trustStoreLocation;

    @Value("${spring.kafka.ssl.trust-store-password}")
    private String trustStorePassword;

    @Value("${spring.kafka.properties.sasl.jaas.config}")
    private String jaasConfig;

    @Value("${spring.kafka.properties.sasl.mechanism}")
    private String mechanism;

    @Value("${spring.kafka.properties.security.protocol}")
    private String protocol;

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * the producer factory config
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        // write the Base64-encoded truststore back to a file with a random name
        String truststorePath = "./" + UUID.randomUUID().toString().replace("-", "");
        try {
            FileToBase64.decoderBase64File(trustStoreLocation, truststorePath);
        } catch (Exception e) {
            log.error("failed to decode the truststore file from Base64: {}, {}", e.getCause(), e.getMessage());
        }
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, protocol);
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePassword);
        props.put(SaslConfigs.SASL_MECHANISM, mechanism);
        props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        log.info("custom build the KafkaProducerFactory, config is: {}", props);
        return new DefaultKafkaProducerFactory<>(props);
    }
}
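With the factory in place, sending a message only requires injecting the template. A minimal sketch (the service class and the topic name demo-topic are made up, and it assumes Spring Kafka 2.x, where send() returns a ListenableFuture):

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class DemoProducerService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public DemoProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(String message) {
        // "demo-topic" is a placeholder topic name
        kafkaTemplate.send("demo-topic", message).addCallback(
                result -> log.info("message sent: {}", message),
                ex -> log.error("message send failed", ex));
    }
}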
Connecting to Kafka with Kafka Tool:
(four screenshots of the connection steps)
With that, you can connect to Kafka. Personally, I don't find this tool very pleasant to use.
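If the GUI tool is not to your taste, the same SASL_SSL settings can also be checked programmatically; a minimal sketch using Kafka's AdminClient to list topics (all values are placeholders, not the real broker or credentials):

import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;

public class KafkaConnectivityCheck {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // placeholders; fill in the real broker address, credentials and truststore path
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "XXX");
        props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
        props.put(SaslConfigs.SASL_JAAS_CONFIG,
                "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"XX\" password=\"XX\";");
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/client.truststore.jks");
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "XX");
        try (AdminClient admin = AdminClient.create(props)) {
            // if the SASL credentials and truststore are correct, this prints the topic names
            System.out.println(admin.listTopics().names().get());
        }
    }
}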
Add the consumer configuration:
spring:
  kafka:
    bootstrap-servers: XX
    producer:
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    ssl:
      trust-store-location: XX
      trust-store-password: XX
    properties:
      sasl:
        jaas:
          config: org.apache.kafka.common.security.scram.ScramLoginModule required username="XX" password="XX";
        mechanism: SCRAM-SHA-512
      security:
        protocol: SASL_SSL
    consumer:
      group-id: XXX
      # commit offsets manually
      enable-auto-commit: false
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        session.timeout.ms: 60000
Then add the consumer-specific fields and beans to the Kafka configuration class (they reuse the bootstrapServers, truststore and SASL fields shown above; the consumer-side classes such as ConsumerConfig, StringDeserializer, DefaultKafkaConsumerFactory, ConcurrentKafkaListenerContainerFactory and its configurer need to be imported as well):

    /**
     * consumer config
     */
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private Boolean enableAutoCommit;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${spring.kafka.consumer.properties.session.timeout.ms}")
    private Integer timeOut;

    /**
     * the listener container factory config
     */
    @Bean
    @ConditionalOnBean(ConcurrentKafkaListenerContainerFactoryConfigurer.class)
    public ConcurrentKafkaListenerContainerFactory<Object, Object> concurrentKafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, consumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<Object, Object> consumerFactory() {
        // write the Base64-encoded truststore back to a file with a random name
        String truststorePath = "./" + UUID.randomUUID().toString().replace("-", "");
        try {
            FileToBase64.decoderBase64File(trustStoreLocation, truststorePath);
        } catch (Exception e) {
            log.error("failed to decode the truststore file from Base64: {}, {}", e.getCause(), e.getMessage());
        }
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, protocol);
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePassword);
        props.put(SaslConfigs.SASL_MECHANISM, mechanism);
        props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, timeOut);
        log.info("custom build the KafkaConsumerFactory, config is: {}", props);
        return new DefaultKafkaConsumerFactory<>(props);
    }
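Since enable-auto-commit is false, offsets have to be committed from the listener. A minimal sketch of a listener with manual acknowledgment (the topic name is a placeholder, and it assumes spring.kafka.listener.ack-mode is set to manual_immediate so the configurer injects an Acknowledgment):

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class DemoConsumerListener {

    // "demo-topic" is a placeholder topic name
    @KafkaListener(topics = "demo-topic")
    public void onMessage(String message, Acknowledgment ack) {
        log.info("message received: {}", message);
        // commit the offset manually once the message has been processed
        ack.acknowledge();
    }
}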