Spring Boot 接入带有 SASL/PLAIN 认证的 Kafka
摘要:在 yml 配置文件中添加 kafka.template.producer / consumer 等配置项(bootstrap-servers: xx.xx.65.8:8123、key-deserializer、value-deserializer 以及 SASL/PLAIN 相关属性),完整配置见下文。
·
yml 配置文件添加
# Kafka client settings for a broker secured with SASL/PLAIN over plaintext.
# NOTE(review): Spring Boot's auto-configuration binds `spring.kafka.*`; the
# `kafka.template` prefix is presumably read by custom configuration code —
# confirm against the class that consumes these keys.
kafka:
  template:
    producer:
      # Kafka broker address (host:port)
      bootstrap-servers: xx.xx.65.8:8123
      # A producer writes records, so it needs Serializer classes; the
      # Deserializer classes are only valid on the consumer side.
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        sasl.mechanism: PLAIN
        security.protocol: SASL_PLAINTEXT
    consumer:
      # Kafka broker address (host:port)
      bootstrap-servers: xx.xx.65.8:8123
      group-id: group-1
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        sasl.mechanism: PLAIN
        security.protocol: SASL_PLAINTEXT
//初始化系统属性
static {
System.setProperty("java.security.auth.login.config", "D:/kafka_client_jaas.conf");
}
POM 文件配置
<!-- Inherit build defaults and managed dependency versions from Spring Boot 2.0.3. -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.3.RELEASE</version>
    <!-- Empty relativePath: resolve the parent from the repository, not the file system. -->
    <relativePath/>
</parent>
<!-- Spring for Apache Kafka; the version is pinned explicitly here. -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.1.2.RELEASE</version>
</dependency>
<!-- Import the Spring Cloud Finchley.SR1 BOM so Spring Cloud artifacts get managed versions. -->
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Finchley.SR1</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
发送消息
/**
* Kafka template with String keys and Object values, injected by Spring
* through {@code @Autowired}; used here for sending messages.
* */
@Autowired
private KafkaTemplate<String, Object> template;
D:/kafka_client_jaas.conf 文件内容
// JAAS login entry named KafkaClient: PLAIN login module with a username and password.
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
        username="mooc"
        password="moocpswd";
};
参考地址
https://support.huaweicloud.com/devg-dms/Kafka-java-demo.html
或者直接在 yml 文件中这样配置(通过 sasl.jaas.config 属性提供用户名和密码):
spring:
  # NOTE(review): the original note at this position read "allow overriding;
  # required for Kafka", but the setting it described is not shown — confirm
  # whether something is missing here.
  kafka:
    bootstrap-servers: 10.xx.67.10:9092
    consumer:
      # The group id decides queue vs. publish/subscribe behaviour: consumers
      # sharing a group split the messages, separate groups each get them all.
      group-id: force
      enable-auto-commit: true
      # A plain number is interpreted as milliseconds.
      auto-commit-interval: 1000
      # latest: automatically reset the offset to the latest one
      auto-offset-reset: latest
      # An id used for tracing; ideally the same as group.id
      # client-id: force
    producer:
      # The producer tries to batch records together to reduce the number
      # of requests.
      batch-size: 16384
      retries: 0
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
    listener:
      concurrency: 3
    # Settings for connecting to a password-protected Kafka; comment this
    # section out if the broker has no password.
    # NOTE(review): credentials are hard-coded here — consider supplying them
    # from the environment instead of committing them.
    properties:
      sasl.mechanism: PLAIN
      security.protocol: SASL_PLAINTEXT
      sasl.jaas.config: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="mooc" password="moocpswd";'
更多推荐
已为社区贡献1条内容
所有评论(0)