SpringCloud08_消息驱动(Stream)
1、消息驱动Stream1.1 基本概念(1)引入目的构建消息驱动的微服务框架,为多种消息中间件提供统一的消息编程模型,目前仅支持RabbitMQ与KafKa;(2)标准MQ生产者与消费者之间靠消息媒介传递消息内容;消息必须走特定通道;(3)实现原理应用程序通过inputs与outputs来与SpringCloudStream的Binder对象交互。Inputs对应于消费者,Ouputs对应于生产
·
1、消息驱动Stream
1.1 基本概念
(1)引入目的
- 构建消息驱动的微服务框架,为多种消息中间件提供统一的消息编程模型,目前仅支持RabbitMQ与KafKa;
(2)标准MQ
- 生产者与消费者之间靠消息媒介传递消息内容;
- 消息必须走特定通道;
(3)实现原理
- 应用程序通过inputs与outputs来与SpringCloudStream的Binder对象交互。Inputs对应于消费者,Ouputs对应于生产者;
- 通过定义绑定器Binder作为中间对象负责与消息中间件的交互,实现了应用程序与消息中间件细节之间的解耦;
- Stream的消息通信方式遵循发布-订阅模式,Topic主题进行广播,在RabbitMQ为Exchange,在Kafka为Topic;
1.2 Stream编码常用注解
(1)基本流程
- Binder:连接中间件,屏蔽差异;
- Channel:通道,队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过频道(Channel)对队列进行配置;
- Source与Sink:消息的输入与输出;
(2)注解
1.3 Stream使用
(1)搭建消息驱动生产发送者
- 引入RabbitMQ中间件,也可以使用其他的,如Kafaka等
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cloud2022</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>cloud-stream</groupId>
<artifactId>cloud-stream</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-netflix-eureka-client</artifactId>
</dependency>
</dependencies>
</project>
- 配置文件
server:
port: 8801
eureka:
client:
register-with-eureka: true #是否要注册
fetchRegistry: true #是否抓取注册信息
service-url:
defaultZone: http://eureka7001:7001/eureka #,http://eureka7002:7002/eureka
instance: #修改主机名
instance-id: stream-out
prefer-ip-address: true #访问路径显示IP
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
output: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
datasource:
url: jdbc:mysql://localhost:3306/springboot-mybatisplus?serverTimezone=Asia/Shanghai&useSSL=false&useUnicode=true&characterEncoding=utf8&characterSetResults=utf8
username: root
password: 123456
driver-class-name=com: mysql.cj.jdbc.Driver
binder: defaultRabbit 报红,但是不影响。
- 主启动
package com.stream;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication(scanBasePackages = "com.stream")
public class StreamProviderApplication {
public static void main(String[] args) {
SpringApplication.run(StreamProviderApplication.class, args);
}
}
- 消息发送MessageService
package com.stream.Service;
public interface MessageService {
public String send();
}
- 消息发送实现MessageServiceImpl
package com.stream.Service.Iml;
import com.stream.Service.MessageService;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import java.util.UUID;
@EnableBinding(Source.class) //定义消息推送管道
public class MessageServiceImpl implements MessageService {
@Resource
private MessageChannel output; //消息发送管道
@Override
public String send() {
String msg = UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload(msg).build());
return null;
}
}
- 消息发送Controller
package com.stream.Controller;
import com.stream.Service.MessageService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
public class SendMessageController {
@Resource
private MessageService messageService;
@GetMapping("/send")
public String send(){
return messageService.send();
}
}
- 启动并登录RabbitMQ,访问接口
(2)搭建消息驱动消费接收者
- pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cloud2022</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>stream-consumer</groupId>
<artifactId>stream-consumer</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-netflix-eureka-client</artifactId>
</dependency>
</dependencies>
</project>
- application.yml:input变为output
server:
port: 8802
spring:
application:
name: cloud-stream-comsumer
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
#**************区别,消息发送者output,接受者input
input: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
datasource:
url: jdbc:mysql://localhost:3306/springboot-mybatisplus?serverTimezone=Asia/Shanghai&useSSL=false&useUnicode=true&characterEncoding=utf8&characterSetResults=utf8
username: root
password: 123456
driver-class-name=com: mysql.cj.jdbc.Driver
eureka:
client:
register-with-eureka: true #是否要注册
fetchRegistry: true #是否抓取注册信息
service-url:
defaultZone: http://eureka7001:7001/eureka #,http://eureka7002:7002/eureka
instance: #修改主机名
instance-id: stream-in
prefer-ip-address: true #访问路径显示IP
- 主启动
package com.consumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication(scanBasePackages = "com.consumer")
public class StreamConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(StreamConsumerApplication.class, args);
}
}
- 消息接收Controller
package com.consumer.Controller;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
@Component
@EnableBinding(Sink.class)
public class StreamConsumerController {
@StreamListener(Sink.INPUT)
public void input(Message<String> message){
System.out.println("接收到消息为:"+message.getPayload()+ "-8802");
}
}
- 访问:浏览器调用消息发送接口发送消息
1.4 消息重复消费Group
- 同一个消息被两个服务接收,就会出现重复消费问题,比如以上例子,再建一个相同的消息接收方,Stream就会默认为是两个组,就会出现重复消费;
- 同一个group中的多个消费者是竞争关系,只要保证消息被其中一个应用消费,同组其他应用就无法消费;
- 不同组之间可以实现重复消费。
分组前:
添加配置分组
分组后:
1.5 消息持久化
- 给其中一个消息接受方去掉group分组属性;
- 然后停掉两个消息接受服务;
- 发送4条消息;
- 启动消息接受服务;
- 去掉group的消息接受方不会接受消息,另一方会接受到4条消息;
更多推荐
已为社区贡献2条内容
所有评论(0)