Springboot集成kafka 简单易懂
kafka集群环境搭建地址见之前博客:https://blog.csdn.net/weixin_43914685/article/details/113802894开始springboot项目整合kafka1.项目依赖<dependency><groupId>org.springframework.boot</groupId><artifactId>
·
kafka集群环境搭建地址见之前博客:
https://blog.csdn.net/weixin_43914685/article/details/113802894
开始springboot项目整合kafka
1.项目依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- yml解析器 -->
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
</dependency>
<!--kafka-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
2.yml文件配置
bootstrap-servers配置kafka的监听地址,集群的话配置多个地址以逗号隔开
spring:
kafka:
bootstrap-servers: 192.168.1.108:9092,192.168.1.109:9092,192.168.1.110:9092 #指定kafka server的地址,集群配多个,中间,逗号隔开
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: default_consumer_group #群组ID
#enable-auto-commit: true
#auto-commit-interval: 1000
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
server:
port: 8500
3.创建测试接口创建生产者消息
kafka0是kafka的主题,要和消费者demo文件中的topics一致
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author lizhe
* @date by 2021/02/13
*/
@RestController
public class ProducerController {
@Autowired
private KafkaTemplate<String,Object> kafkaTemplate;
@RequestMapping("message/send")
public String send(String msg){
kafkaTemplate.send("kafka0", msg); //使用kafka模板发送信息
return "success";
}
}
4.创建消费者接受消息demo
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
* @author lizhe
* 监听服务器上的kafka是否有相关的消息发过来
* @date by 2021/02/13
*/
@Component
public class ConsumerDemo {
/**
* 定义此消费者接收topics = "demo"的消息,与controller中的topic对应上即可
* @param record 变量代表消息本身,可以通过ConsumerRecord<?,?>类型的record变量来打印接收的消息的各种信息
*/
@KafkaListener(topics = "kafka0")
public void listen (ConsumerRecord<?, ?> record){
System.out.printf("topic is %s, offset is %d, value is %s \n", record.topic(), record.offset(), record.value());
}
}
5.调用api接口发送消息
params加上msg参数
6.查看控制台消息
‘hellp’ 便是收到的消息
topic is kafka0, offset is 3, value is hellp
7.进入linux的kafka接受消息查看
[root@master ~]# kafka-console-consumer.sh --bootstrap-server 192.168.1.108:9092 --topic kafka0 --from-beginning
再次调用接口,会接收到消息
hellp
**
完结
整合完成
**
更多推荐
已为社区贡献3条内容
所有评论(0)