一。环境准备

windows下安装kafka可以参考这一篇博客:https://blog.csdn.net/w546097639/article/details/88578635

二。项目搭建

1.在pom.xml文件中引入以下依赖:

    <!--kafka依赖 -->
     <dependency>
         <groupId>org.springframework.kafka</groupId>
         <artifactId>spring-kafka</artifactId>
     </dependency>
     <dependency>
         <groupId>org.springframework.kafka</groupId>
         <artifactId>spring-kafka-test</artifactId>
         <scope>test</scope>
     </dependency>

项目完整pom.xml文件内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.4</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>kafka_demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka_demo</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>RELEASE</version>
            <scope>compile</scope>
        </dependency>

        <!--kafka依赖 -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!--lombok依赖 -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>RELEASE</version>
            <scope>compile</scope>
        </dependency>

        <!--json转换工具包依赖 -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

2.配置文件application.yml文件内容如下:

server:
  port: 8001 #端口号
  servlet:
    context-path: /${spring.application.name} #访问前缀

spring:
  application:
    name: kafka-demo #服务名称
  kafka:
    bootstrap-servers: localhost:9092 #kafka地址
    consumer:
      group-id: order-consumer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #kafkaTemplate.send()方法参数类型
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      security:
        protocol: PLAINTEXT
    producer:
      batch-size: 16384
      buffer-memory: 33554432
      retries: 0
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        linger.ms: 1
      security:
        protocol: PLAINTEXT


# kafka自定义消息发送配置
kafka:
  topic:
    kafka-demo-topic: order-event
    kafka-demo-key: order-key
    autoCreate: true


3.kafka配置类内容如下:

package com.example.kafka_demo.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.TopicBuilder;

/**
 * @author Keson
 * @version 1.0
 * @description: TODO kafka配置类
 * @date 2021/9/6 15:33
 */
@Configuration
@EnableKafka //开启Kafka监听器标注的端点
public class KafkaConfig {

    @Value("${kafka.topic.kafka-demo-topic}")
    private String orderTopic;

    @Bean
    //条件匹配,如果配置kafka.topic.autoCreate=true的话,该配置类生效,反之则无效
    @ConditionalOnProperty(name = "kafka.topic.autoCreate", havingValue = "true")
    public NewTopic orderTopic(){
        return TopicBuilder.name(orderTopic).build();
    }

}

4.创建kafka接受消息实体类:

package com.example.kafka_demo.dto;

import lombok.*;

/**
 * @author Keson
 * @version 1.0
 * @description: TODO 消息实体
 * @date 2021/9/6 14:56
 */

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class ParamsDetail {

    private String orderNumber;

    private String orderSubNumber;

    private String customerNumber;

    private String subAccountNumber;

    private String tradeType;
}

5.创建KafkaService接口,使用KafkaTemplate来发送消息至kafka中:

package com.example.kafka_demo.service;

import com.example.kafka_demo.dto.ParamsDetail;

public interface KafkaService {

    //发送消息给kafka
    public void sendMessage(ParamsDetail paramsDetail);
}

package com.example.kafka_demo.service.impl;

import com.example.kafka_demo.dto.ParamsDetail;
import com.example.kafka_demo.service.KafkaService;
import com.example.kafka_demo.util.JsonUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

/**
 * @author Keson
 * @version 1.0
 * @description: TODO KafkaService实现类
 * @date 2021/9/6 15:01
 */
@Service
@Slf4j
public class KafkaServiceImpl implements KafkaService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${kafka.topic.kafka-demo-topic}")
    private String topic;

    @Value("${kafka.topic.kafka-demo-key}")
    private String key;

    @Override
    public void sendMessage(ParamsDetail detail) {
        try {
            kafkaTemplate.send(topic, key, JsonUtil.toJson(detail));
        } catch (JsonProcessingException e){
            log.error("对象转换为Json出错!");
        }
    }
}

6.Json与对象相互转换工具类代码如下:

package com.example.kafka_demo.util;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * @author Keson
 * @version 1.0
 * @description: TODO Object与Json转换工具类
 * @date 2021/9/6 15:08
 */
public class JsonUtil {

    public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    //将json转换成对象
    public static <T> T fromJson(String json, Class<T> clz) throws JsonProcessingException {
        return OBJECT_MAPPER.readValue(json, clz);
    }

    //将对象转换成json格式
    public static String toJson(Object object) throws JsonProcessingException {
        return OBJECT_MAPPER.writeValueAsString(object);
    }
}

7.创建消息的消费者,使用@KafkaListener实现:

package com.example.kafka_demo.listener;

import com.example.kafka_demo.dto.ParamsDetail;
import com.example.kafka_demo.util.JsonUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * @author Keson
 * @version 1.0
 * @description: TODO kafka消费者
 * @date 2021/9/6 15:23
 */
@Component
public class OrderListener {

    //用于监听kafka,符合条件的消息能够即时消费
    @KafkaListener(topics = {"${kafka.topic.kafka-demo-topic}"}, autoStartup = "${kafka.topic.autoCreate:true}")
    public void processing (String message) throws JsonProcessingException {
        ParamsDetail detail = JsonUtil.fromJson(message, ParamsDetail.class);
        System.err.println(detail);
    }

}

8.最后创建一个Api用于向kafka发送消息:

package com.example.kafka_demo.api;

import com.example.kafka_demo.dto.ParamsDetail;
import com.example.kafka_demo.service.KafkaService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author Keson
 * @version 1.0
 * @description: TODO kafka测试发送Api
 * @date 2021/9/6 15:46
 */
@RestController
@RequestMapping("/order")
public class OrderApi {

    @Autowired
    private KafkaService kafkaService;

    @GetMapping("/sendMessage")
    public String sendMessage(@RequestBody ParamsDetail paramsDetail){
        kafkaService.sendMessage(paramsDetail);
        return "消息发送成功!";
    }
}

全部创建完成后项目结构如下:
在这里插入图片描述

三。运行结果

1.启动项目,使用postman测试发送接口:
在这里插入图片描述
2.消息成功发送,打开程序查看消息是否成功被消费。控制台输出刚刚发送的消息,说明order-event这个topic下的消息被成功消费:
在这里插入图片描述
3.再次连续发几个不同的消息,发现消息即时存入即时消费:
在这里插入图片描述

四。项目代码下载

本博客项目示例代码已上传至Gitee,有需要的小伙伴可自行clone:
https://gitee.com/hair_gel_king/kafka_demo

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐