简单明了的SpringBoot使用Kafka收发消息的例子
引入依赖:<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>只需要增加
·
引入依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
只需要增加这一个依赖
这里有大坑,版本不能随便用。kafka的client包必须和安装的kafka版本对应
下面是spring官方给的对应关系https://spring.io/projects/spring-kafka
我装的kafka是2.1.0版本,所以应该使用的kafka-client是1.0.x或1.1.x或2.0.0
但是spring-kafka已经依赖了kafka-client,我们需要确认依赖进的是哪个版本
项目的spring-boot-starter-parent版本:2.0.5.RELEASE
点spring-kafka进去看到
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.2</version>
kafka-client的版本是1.0.2,满足要求
继续开发代码:
目录:
就是简单的三个文件
先看配置文件:
server:
port: 8081
spring:
kafka:
# 生产者
producer:
bootstrap-servers: x.x.x.x.:9092
# 消费者
consumer:
enable-auto-commit: true # 指定消息被消费之后自动提交偏移量
group-id: mytest # 消费者组
auto-offset-reset: latest # 从最近的地方开始消费
bootstrap-servers: x.x.x.x:9092
生产者代码:
/**
* 生产者
* 使用@EnableScheduling注解开启定时任务
*/
@Component
@EnableScheduling
public class KafkaProducer {
@Autowired
private KafkaTemplate kafkaTemplate;
/**
* 定时任务
*/
@Scheduled(cron = "0/5 * * * * ?")
public void send(){
String message = UUID.randomUUID().toString();
ListenableFuture future = kafkaTemplate.send("my_topic", message);
future.addCallback(o -> System.out.println("send-消息发送成功:" + message), throwable -> System.out.println("消息发送失败:" + message));
}
}
使用一个定时任务,每隔5秒发送一条消息
使用的topic名是my_topic
消费者代码:
/**
* 消费者
* 使用@KafkaListener注解,可以指定:主题,分区,消费组
*/
@Component
public class KafkaConsumer {
@KafkaListener(topics = {"my_topic"})
public void receive(String message){
System.out.println("消费消息:" + message);
}
}
代码也很简单,就是一个@KafkaListener的注解使用。这个注解可以配置监听topic,groupid等等,细节可以继续研究
这里就是监听我的my_topic
测试:
well done
更多推荐
已为社区贡献2条内容
所有评论(0)