《springcloud学习》二十四 springcloud stream集成 rabbitmq
1.简介springcloud stream消息驱动,我们可以理解为一个中间件,MQ,数据交互,以前是生产者,发送数据到MQ,MQ再到消费者。而现在中间多了一个桥梁,所有的交互,都得通过springcloud stream进行交互,他可以简化我们的一个操作,使我们开发人员,更关注于业务逻辑,简化代码,暂时,他只支持rabbitmq和kafka,可以实现多个消息产品的互相切换,只需要配置信息,而..
1.简介
springcloud stream消息驱动,我们可以理解为一个中间件,MQ,数据交互,以前是生产者,发送数据到MQ,MQ再到消费者。而现在中间多了一个桥梁,所有的交互,都得通过springcloud stream进行交互,他可以简化我们的一个操作,使我们开发人员,更关注于业务逻辑,简化代码,暂时,他只支持rabbitmq和kafka,可以实现多个消息产品的互相切换,只需要配置信息,而不用我们开发人员,了解对应消息中间件底层的一些理念,实际上,只兼容一些通用的场景,所以,使用的场景不是很多。
2. 代码
2.1 生产者
2.1.1 pom.xml
<!--整合web-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--spring cloud stream-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
</dependencies>
2.1.2 application.yml
server:
port: 2000
spring:
application:
name: stream_producer
rabbitmq:
host: 对应ip
username: XXX
password: XXX
virtual-host: /XXX
改成自己对应的rabbitmq对应配置
2.1.3 发送
package com.fqyd.dao;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.SubscribableChannel;
/**
* Description:
* Author: wude
* Date: 2019/12/3 11:57
* Modified By:
*/
public interface SendMsg {
@Output("streamTest")
SubscribableChannel sendMsg();
}
注意:默认是topic模式,streamTest是主题
2.1.4 controller类
package com.fqyd.controller;
import com.fqyd.dao.SendMsg;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;
/**
* Description:
* Author: wude
* Date: 2019/12/3 12:02
* Modified By:
*/
@RestController
public class SendMsgController {
@Autowired
private SendMsg sendMsg;
@GetMapping("/send")
public String sendMsg() {
String msg = UUID.randomUUID().toString();
Message build = MessageBuilder.withPayload(msg).build();
sendMsg.sendMsg().send(build);
return msg;
}
}
2.1.5 启动类
package com.fqyd.producer;
import com.fqyd.dao.SendMsg;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.context.annotation.ComponentScan;
@SpringBootApplication
@ComponentScan(basePackages = {"com.fqyd"})
@EnableBinding(SendMsg.class)
public class ProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}
}
注意:@EnableBinding,是实现springcloudstream和MQ的绑定
2.2 消费者
2.2.1 pom.xml
<!--整合web-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--spring cloud stream-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
2.2.2 application.yml
server:
port: 8086
spring:
cloud:
stream:
binders:
defaultRabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: XXX
username: XXX
password: XXX
virtual-host: /XXX
2.2.3 监听类
package com.fqyd.dao.impl;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;
/**
* Description:
* Author: wude
* Date: 2019/12/3 15:32
* Modified By:
*/
@Component
public class Consumer {
@StreamListener("streamTest")
public void readMsg(String msg){
System.out.println("消费者获取生产者发送的消息:"+msg);
}
}
2.2.4 接收类
package com.fqyd.dao;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.Message;
import org.springframework.messaging.SubscribableChannel;
public interface ReceiveMsg {
@Input("streamTest")
SubscribableChannel receive();
}
2.2.5 启动类
package com.fqyd.comsumer;
import com.fqyd.dao.ReceiveMsg;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.context.annotation.ComponentScan;
@SpringBootApplication
@ComponentScan(basePackages = {"com.fqyd"})
@EnableBinding(ReceiveMsg.class)
public class ComsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ComsumerApplication.class, args);
}
}
3.测试
注意:1.队列名称是自动生成的,前缀是路由名称,后面是自动生成的。
2.消费者服务关了后,队列直接消失,而不是持久化,会丢失一部分数据
4.问题
启动两个消费者,端口为8085,8086,可以借用idea一个项目启动多个实例
https://blog.csdn.net/qq_16855077/article/details/99292037
访问接口,调用生产者http://localhost:2000/send
可以发现两个消费者,接收的数据重复勒。
解决方案:
增加消费者分组
server:
port: 8086
spring:
cloud:
stream:
binders:
defaultRabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: XXXX
username: XXX
password: XXXX
virtual-host: /XXX
bindings:
streamTest: ####路由器名称
group: stream ###消费组
增加如下配置
连续访问http://localhost:2000/send两次,发现输出不一样,说明已经实现了轮询操作
如果你热衷技术,喜欢交流,欢迎加入我们!
更多推荐
所有评论(0)