yml 配置文件添加

 

  kafka:
    template:
   
    producer:
#kafka ip 端口
      bootstrap-servers: xx.xx.65.8:8123
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties: 
        sasl.mechanism: PLAIN
        security.protocol: SASL_PLAINTEXT      
    consumer: 
      bootstrap-servers: xx.xx.65.8:8123
      group-id: group-1
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties: 
        sasl.mechanism: PLAIN
        security.protocol: SASL_PLAINTEXT  
	 //初始化系统属性
    static {
    	System.setProperty("java.security.auth.login.config", "D:/kafka_client_jaas.conf");
    }

POM 文件配置

	


       <parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.0.3.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>


        <dependency>
		    <groupId>org.springframework.kafka</groupId>
		    <artifactId>spring-kafka</artifactId>
		    <version>2.1.2.RELEASE</version>
		</dependency>

	<dependencyManagement>
	    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Finchley.SR1</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
	</dependencyManagement>

发送消息

/**
  * 使用kafka模板方法.
 * */
    @Autowired
    private KafkaTemplate<String, Object> template;

D:/kafka_client_jaas.conf

KafkaClient {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="mooc"
        password="moocpswd";
};
  
参考地址

https://support.huaweicloud.com/devg-dms/Kafka-java-demo.html

或者在yml文件这种配置

spring:
  #允许重写方法,kafka必要注解
  kafka:
    bootstrap-servers: 10.xx.67.10:9092
    consumer:
      #判断是队列模式还是发布订阅模式
      group-id: force
      #producer将试图批处理消息记录,以减少请求次数。
      batch-size: 16384
      retries: 0
      buffer-memory: 33554432
      enable-auto-commit: true
      auto-commit-interval: 1000
      #latest:自动复位offset为最新
      auto-offset-reset: latest
      concurrency: 3
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
      #一个用于跟踪调查的ID ,最好同group.id相同
#      client-id: force
      #用于链接带密码的kafka  配置,如果kafka没有密码需要注释掉
      properties:
        sasl.mechanism: PLAIN
        security.protocol: SASL_PLAINTEXT
        sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="mooc" password="moocpswd";

 

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐