引入依赖:

 <dependency>
 <groupId>org.springframework.kafka</groupId>
 <artifactId>spring-kafka</artifactId>
 </dependency>

只需要增加这一个依赖

这里有大坑,版本不能随便用。kafka的client包必须和安装的kafka版本对应

下面是spring官方给的对应关系https://spring.io/projects/spring-kafka

简单明了的SpringBoot使用Kafka收发消息的例子

 

我装的kafka是2.1.0版本,所以应该使用的kafka-client是1.0.x或1.1.x或2.0.0

但是spring-kafka已经依赖了kafka-client,我们需要确认依赖进的是哪个版本

项目的spring-boot-starter-parent版本:2.0.5.RELEASE

点spring-kafka进去看到

 <groupId>org.apache.kafka</groupId>
 <artifactId>kafka-clients</artifactId>
 <version>1.0.2</version>

kafka-client的版本是1.0.2,满足要求

继续开发代码:

目录:

就是简单的三个文件

简单明了的SpringBoot使用Kafka收发消息的例子

 

先看配置文件

server:
 port: 8081
spring:
 kafka:
 # 生产者
 producer:
 bootstrap-servers: x.x.x.x.:9092
 # 消费者
 consumer:
 enable-auto-commit: true # 指定消息被消费之后自动提交偏移量
 group-id: mytest # 消费者组
 auto-offset-reset: latest # 从最近的地方开始消费
 bootstrap-servers: x.x.x.x:9092

生产者代码

/**
 * 生产者
 * 使用@EnableScheduling注解开启定时任务
 */
@Component
@EnableScheduling
public class KafkaProducer {
 @Autowired
 private KafkaTemplate kafkaTemplate;
 /**
 * 定时任务
 */
 @Scheduled(cron = "0/5 * * * * ?")
 public void send(){
 String message = UUID.randomUUID().toString();
 ListenableFuture future = kafkaTemplate.send("my_topic", message);
 future.addCallback(o -> System.out.println("send-消息发送成功:" + message), throwable -> System.out.println("消息发送失败:" + message));
 }
}

使用一个定时任务,每隔5秒发送一条消息

使用的topic名是my_topic

消费者代码

/**
 * 消费者
 * 使用@KafkaListener注解,可以指定:主题,分区,消费组
 */
@Component
public class KafkaConsumer {
 @KafkaListener(topics = {"my_topic"})
 public void receive(String message){
 System.out.println("消费消息:" + message);
 }
}

代码也很简单,就是一个@KafkaListener的注解使用。这个注解可以配置监听topic,groupid等等,细节可以继续研究

这里就是监听我的my_topic

测试:

简单明了的SpringBoot使用Kafka收发消息的例子

 

well done

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐