springboot 和 kafka搭建要注意 springboot 的版本和kafka的版本要一致,不然容易导致出错。

我这的版本是springboot 2.2.2  kafka的版本是 Kafka version: 2.3.1

首先安装 zookeeper 和kafka 这里不详细介绍,下面主要写代码:

entity 层的代码非常简单就一个User 类,如下:

package com.kafka.demoka.entity;

public class User {
    private  int id;
    private  String name;
    private  int age;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

}

 

生产者的代码如下:

 

package com.kafka.demoka.controller;


import com.alibaba.fastjson.JSON;
import com.kafka.demoka.entity.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class kafkaController {

    @Autowired
    private KafkaTemplate kafkaTemplate;


    @RequestMapping("/send")
    public void send(){
        User user= new User();
        user.setName("good");
        user.setAge(14);
        kafkaTemplate.send("user", JSON.toJSONString(user));
    }
}

消费者的代码如下

 

package com.kafka.demoka.common;

import com.alibaba.fastjson.JSONObject;
import com.kafka.demoka.entity.User;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

@Component
public class kafkaConsumer {
    @KafkaListener(topics = "user")
    public void consumer(ConsumerRecord consumerRecord) {
        Optional<Object> kafkaMassage = Optional.ofNullable(consumerRecord.value());
        if (kafkaMassage.isPresent()) {
            Object o = kafkaMassage.get();
            User person = JSONObject.parseObject(o.toString(), User.class);
            // 相关的业务代码操作
            System.out.println(person.getName());
        }
    }
}

 

配置文件如下:

server:
  port: 6081
spring:
  kafka:
    listener:
      missing-topics-fatal: false  # listener的属性missingTopicsFatal设置为true 会报错
    producer:
      retries: 0
      batch-size: 16384
      buffer-memory:  33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id:  user-log-group
      auto-commit-interval: 100
      enable-auto-commit: true
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    bootstrap-servers: localhost:9092

pom文件如下面所示:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <parent>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-parent</artifactId>
      <version>2.2.2.RELEASE</version>
      <relativePath/> <!-- lookup parent from repository -->
   </parent>
   <groupId>com.kafka</groupId>
   <artifactId>demoka</artifactId>
   <version>0.0.1-SNAPSHOT</version>
   <name>demoka</name>
   <description>Demo project for Spring Boot</description>

   <properties>
      <java.version>11</java.version>
   </properties>

   <dependencies>
      <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-web</artifactId>
      </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.4.3.RELEASE</version>
        </dependency>

        <!--<dependency>-->
            <!--<groupId>org.apache.kafka</groupId>-->
            <!--<artifactId>kafka-clients</artifactId>-->
            <!--<version>2.4.1</version>-->
        <!--</dependency>-->

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.44</version>
        </dependency>

      <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-test</artifactId>
         <scope>test</scope>
         <exclusions>
            <exclusion>
               <groupId>org.junit.vintage</groupId>
               <artifactId>junit-vintage-engine</artifactId>
            </exclusion>
         </exclusions>
      </dependency>
   </dependencies>

   <build>
      <plugins>
         <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
         </plugin>
      </plugins>
   </build>

</project>

 

特别注意:missing-topics-fatal: false # listener的属性missingTopicsFatal设置为true 会报错

还有就是 springboot的版本和 kafka的版本也要对应,否则也会报错。

 

 

 

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐