Spring Cloud Stream使用
Spring Cloud Stream对Spring Cloud体系中的Mq进⾏了很好的上层抽象,可以让我们与具体消息中间件解耦合,屏蔽掉了底层具体MQ消息中间件的细节差异,就像Hibernate屏蔽掉了具体数据库(Mysql/Oracle⼀样)。如此⼀来,我们学习、开发、维护MQ都会变得轻松。⽬前Spring Cloud Stream原生⽀持RabbitMQ和Kafka,阿里在这个基础上提供了R
一、什么是Spring Cloud Stream
Spring Cloud Stream对Spring Cloud体系中的Mq进⾏了很好的上层抽象,可以让我们与具体消息中间件解耦合,屏蔽掉了底层具体MQ消息中间件的细节差异,就像Hibernate屏蔽掉了具体数据库(Mysql/Oracle⼀样)
。如此⼀来,我们学习、开发、维护MQ都会变得轻松。⽬前Spring Cloud Stream原生⽀持RabbitMQ和Kafka,阿里在这个基础上提供了RocketMQ的支持,简单使用Spring Cloud Stream 构建基于RocketMQ的生产者和消费者
二、 Spring Cloud Stream的一些概念
要理解和使用Spring Cloud Stream需要先明白Spring Cloud Stream提出的一些概念。
- 辅助图
假设:模块1和模块2间数据可能使用的是kafka,模块1向kafka中push数据,模块2向kafka中poll数据。而模块2和模块3可能使用的是rabbitMQ。很明显,它们的功能都是一样的:提供数据的流向,让数据可以流入自己同时又可以从自己流出发给别人。但由于中间件的不同,需要使用不同的API。为了消除这种数据流入(输入)和数据流出(输出)实现上的差异性,因此便出现了Spring Cloud Stream
。Spring Cloud Stream想让我们不关心如何获取数据,如何发送数据,而只专心处理自己的业务。还拿上面的例子来说,假设你现在负责的是系统里的模块3,它的功能是将模块2传来的字符串全部转成大写,然后再将这个转化后的字符串发给模块4。
- Binder
什么是Binder?一句话概括就是具体中间件的统一抽象。一个kafka中间件在Spring Cloud Stream里是一个Binder,一个rabbitMQ中间件也是一个Binder
。官方文档中写道:当你引入spring-cloud-stream依赖的时候,Spring Cloud Stream就会为你的那个中间件生成一个Binder实例,你就可以通过这个Binder实例来和这个消息中间件通信(收发数据)。很容易得出结论,Spring Cloud Stream对底层中间件的差异屏蔽都是基于我们的Binder,Binder适配了不同的消息中间件
(官方文档中写道:Spring Cloud Stream为kafka和rabbitMQ提供了Binder的实现了)。
- Binding
Binding是个比较抽象的概念,那下面的例子来说:
public String handle(String source){
return source.toUpperCase();
}
这是你写的模块3中的业务代码,我们假设你与模块2交互使用的是中间件kafka和与模块4交互使用的是中间件rabbitMQ。也即你的模块的功能就变为了从kafka中获取数据,将获取的字符串数据全转为大写并写出给rabbitMQ。很明显,这里有两个Binder,一个kafka Binder一个rabbitMQ Binder
。而你这个业务处理函数其实也有两个功能:接收中间件的输入和将返回数据输出。再结合Binder,我们可以理解为:
- 函数接收kafka Binder中的输入
- 函数将返回结果写出给rabbitMQ binder。
但是如何表示这种关系呢?也即你现在写了一个函数,怎么表示这个函数的参数是从kafka入的,函数的返回是向rabbitMQ输出的呢?这就需要Binding
。Binding其实就是一座桥,桥的一头是Binder,另一头是你的业务处理函数
。Bindings将外部消息中间件与你的业务处理代码连接在了一起(官方原话是:外部消息系统和应用程序之间的桥梁,提供消息的生产者和消费者(由Binder创建))。
了解了这些其实也就了解Spring Cloud Stream的架构图,Spring Cloud Stream官网中有一张图讲了它的架构:
首先最底层的Middleware是中间件,我们的kafka,rabbitMQ都属于中间件。上一层的Binder已经讲了,是对中间件的一层抽象和封装。再上一层的inputs和outputs其实就是Bindings,我们与Binder的交互就是通过Binding
,其中写出数据就是output,而获取数据就是input。再上层的Application Core就是我们自己的业务代码,可以看到我们的业务代码通过Binding(input、output)与Binder交互,而Binder又负责和具体中间件交互。
- 函数式接口
Spring Cloud Stream 2.x与Spring Cloud Stream 3.x最大的不同就是2.x是基于注解的,而3.x是基于函数式编程的。
还拿上面的例子来说:对于你开发的一个模块而言,它无非三种情况:
- 从上一个模块获取数据,将这个数据转发到下一个模块
- 从上一个模块获取数据,自己处理完后不再将这个数据发给别的模块
- 不需要从别处获取数据,自己就是数据源,将自己的数据发送到下一个模块。
这三种模式其实就对应Java 8函数式编程中的三个接口:Function、Consumer、Supplier
(不了解这三个接口的可自行搜索相关资料,关键字:Java 8;函数式接口)。
现在我们还来模拟之前的系统,首先模块1是系统的入口模块,不需要其他模块提供数据源,换言之它是个生产者,那么模块1就可以使用接口Supplier(只有返回没有入参)。我们假设模块1的功能是生成字符串,那模块1的代码可以写为:
public Supplier<String> produceStr(){
return () -> "hello spring cloud stream";
}
模块2会消费模块1的字符串,并将它全部转为大写,然后再将转化后的字符串写出。很明显模块2既是生产者也是消费者,那模块2就可以使用接口Function(既有返回也有入参)。模块2的代码为:
public Function<String,String> upperCase(){
return String::toUpperCase;
}
模块3会消费模块2的字符串,并将它直接打印到控制台,且模块3不再将字符串写出,很明显模块3只是一个消费者,那模块3就可以使用接口Consumer(只有入参没有返回)。模块3的代码为:
public Consumer<String> log(){
return System.out::println;
}
可以看到,我们将自己的业务处理都封装成了一个函数式接口,并作为一个函数的返回。在实际的开发中上面的那些函数都会被标上@Bean注解,注入到Spring容器,也即:
@Bean
public Supplier<String> produceStr(){
return () -> "hello spring cloud stream";
}
@Bean
public Function<String,String> upperCase(){
return String::toUpperCase;
}
@Bean
public Consumer<String> log(){
return System.out::println;
}
我们知道这代表向Spring中注入一个Bean,其中Bean的名字就是函数名,而Bean本身就是函数的返回。也即我们将自己的业务处理逻辑包装成一个对象(函数式接口)注入到了Spring IOC中。
现在假设Binder收到了一条数据,那它会寻找Binding,而Binding是一个桥梁,它会连接一个我们的处理函数,处理函数其实就是这里的Bean,Binder拿到Bean后,自然就会调用Bean的处理函数来处理(因为是函数式接口)。
如果用一张图来描述的话,大概就是这样:
这里主要讲的就是我们之前的业务处理被函数式接口包装成了对象,包装成对象后就可以注入到Spring IOC中,这样的一个Bean对象就可以对应一个Binding,通过Binding与Binder交互。
三、案例
说了那么多,还是没讲怎么使用。我们不妨还以文档一开始的那个例子来作为编码案例:
现在的需求如下:
- 模块1生产字符串,并将字符串写出到kafka
- 模块2消费模块1的字符串,并将字符串转为大写,输出到rabbitMQ
- 模块3消费模块2的字符串,并将字符串打印到控制台。
也即:
为了项目的简洁,我们将上述模块1、模块2和模块3写在一个项目中。
- docker-compose,kafka与rabbitMQ
rabbitmq:
image: rabbitmq:3.10.6
container_name: rabbitmq
build:
context: ./rabbitmq
environment:
RABBITMQ_DEFAULT_USER: ruoyi
RABBITMQ_DEFAULT_PASS: ruoyi123
ports:
- "15672:15672" # 管理界面端口
- "5672:5672" # api 端口
volumes:
- /docker/rabbitmq/log:/var/log/rabbitmq
- /docker/rabbitmq/data:/var/lib/rabbitmq
zookeeper:
image: 'bitnami/zookeeper:3.8.0'
container_name: zookeeper
ports:
- "2181:2181"
- "28080:8080"
environment:
TZ: Asia/Shanghai
ALLOW_ANONYMOUS_LOGIN: "yes"
ZOO_SERVER_ID: 1
ZOO_PORT_NUMBER: 2181
# 自带的控制台 一般用不上可自行开启
ZOO_ENABLE_ADMIN_SERVER: "no"
# 自带控制台的端口
ZOO_ADMIN_SERVER_PORT_NUMBER: 8080
kafka:
image: 'bitnami/kafka:3.2.0'
container_name: kafka
ports:
- "9092:9092"
environment:
TZ: Asia/Shanghai
# 更多变量 查看文档 https://github.com/bitnami/bitnami-docker-kafka/blob/master/README.md
KAFKA_BROKER_ID: 1
# 监听端口
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092
# 实际访问ip 本地用 127 内网用 192 外网用 外网ip
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.8:9092
KAFKA_CFG_ZOOKEEPER_CONNECT: 192.168.1.8:2181
ALLOW_PLAINTEXT_LISTENER: "yes"
volumes:
- /docker/kafka/data:/bitnami/kafka/data
depends_on:
- zookeeper
- maven依赖
我们需要的依赖并不多,其实只需要rabbit和kafka的依赖,整个项目的maven配置如下:
<!-- 主要依赖,版本由导入的springcloud帮我们控制-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
- 配置文件之Binder
之前已经讲了一个kafka实例或者rabbitMQ实例其实就是一个binder,那你现在有了一个kafka,要如何告诉Spring Cloud呢?最简单的就是通过配置文件,配置文件配置Binder的思想很简单,就是告诉Spring Cloud Stream,我要创建一个Binder,这个Binder的类型是kafka或者rabbitMQ,然后它的IP,端口都是啥以及用户名密码等都是啥就好了
。
我们先以kafka为例,配置kafka为Binder有两种方式:
- 方式1:
spring:
cloud:
stream:
kafka:
binder:
# kafka的Ip和端口,可以是集群
brokers: ip:port
方式2:
spring:
cloud:
stream:
binders:
# 你的binder名字,自己随意取,我取的名字叫myKafka
myKafka:
# 你的binder类型,我们这里类型是kafka
type: kafka
# 下面的环境配置与上面的一模一样
environment:
spring:
cloud:
stream:
kafka:
binder:
# kafka的Ip和端口,可以是集群
brokers: ip:port
很明显,第二种比第一种更复杂,你如果只有一个kafka实例,那直接用第一种就可以了,但如果你的项目中有多个kafka实例,比如项目2和项目1之间用的是kafka,项目2和项目3间也用的kafka,这两个kafka又是不是同一套kafka。所以,第二种配置可以配备多个kafka实例,如:
spring:
cloud:
stream:
binders:
myKafka1:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: ip1:port1
myKafka2:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: ip2:port2
另外,如果你有多个kafka实例,使用第一种方式下配备的属性信息会被这多个kafka实例共享
,如:
spring:
cloud:
stream:
kafka:
binder:
configuration:
security.protocol: SASL_PLAINTEXT
sasl.mechanism: PLAIN
binders:
myKafka1:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: ip1:port1
configuration.sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";"
myKafka2:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: ip2:port2
configuration.sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user1\" password=\"user1-secret\";"
我们现在有两个kafka实例myKafka1和myKafka2,但我们在一开头配置了security.protocol = SASL_PLAINTEXT和 sasl.mechanism = PLAIN,这是kafka的安全配置,这个配置信息会被myKafka1和myKafka2都具备。也即在一开始的这些配置会被每个kafka实例都具有,因此一些公共的配置可以放在一开始。
同理rabbitMQ的配置也有两种:
spring:
rabbitmq:
host: 你的rabbitMQ的IP
port: 你的rabbitMQ的端口
username: 用户名
password: 密码
YAML 复制
spring:
cloud:
stream:
binders:
myRabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: 你的rabbitMQ的IP
port: 你的rabbitMQ的端口
username: 用户名
password: 密码
是的,如果你只有一个rabbitMQ实例可以使用第一种,但如果有多个,就得使用第二种,它与kafka配置的思路一模一样,这里不再赘述。虽然我们只有一个kafka实例和一个rabitMQ实例,但笔者依然采取了第二种配置文件,一则是考虑到以后实例增多改动比较小的可能,二则是第二种配置笔者认为更清晰。项目对于Binder的配置全部信息为:
spring:
cloud:
stream:
binders:
myRabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: 你的rabbitMQ的IP
port: 你的rabbitMQ的端口
username: 用户名
password: 密码
myKafka:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: ip:port
关于rabbitMQ与kafka更详细的配置,如自动提交,ACK等信息可以参考Spring官网,本文不再列出。
- 编写自己的业务代码
配置完Binder就代表你已经具备和外部消息中间件通信的能力了,现在你可以写自己的业务代码了:
package com.coderzoe.loggingconsumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier
@SpringBootApplication
public class LoggingConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(LoggingConsumerApplication.class, args);
}
/**
* 模块1 生产字符串
*/
@Bean
public Supplier<String> produceStr(){
return () -> "hello spring cloud stream";
}
/**
* 模块2,将生产的字符串转为大写
*/
@Bean
public Function<String,String> upperCase(){
return String::toUpperCase;
}
/**
* 模块3 将字符串打印
*/
@Bean
public Consumer<String> log(){
return System.out::println;
}
}
写完这些Bean后,我们还需要将它写到配置文件,告诉Spring Cloud,这些都是用于函数处理的Bean:
spring:
cloud:
function:
definition: produceStr;upperCase;log
- 配置文件之Bindings
现在我们有了Binder,也有了处理业务函数,肯定还差一个Binding,将Binder与业务处理联系起来
。联系起来的方法很简单,就是通过配置文件来配置。在讲配置文件前,我们先讲Binding的名称规范。Binding的命名是:<functionName>-in/out-<index>。
比如:
@Bean
public Supplier<String> produceStr(){
return () -> "hello spring cloud stream";
}
它的Binding名字就是produceStr-out-0。其中produceStr是函数名(也是Bean名),out代表这个Binding是向外写出的,而index是输入或输出绑定的索引
。对于典型的单个输入/输出函数,它始终为 0,因此它仅与具有多个输入和输出参数的函数相关(一个函数被多次作为输出/输出,比如这个函数被kafka和rabbitMQ都作为输出,那就是一个index0一个index1)。
再比如:
@Bean
public Function<String,String> upperCase(){
return String::toUpperCase;
}
它对应两个Binding,因为它既是输入又是输出(从kafka入数据,向rabbitMQ出数据),它们的名字是:upperCase-in-0、upperCase-out-0
。可以看到,我们通过名字就将Binding和处理函数做了关联。关联了Binding与处理函数,还需要关联Binding与Binder,它的配置写法如下:
spring:
application:
cloud:
stream:
bindings:
produceStr-out-0:
binder: myKafka
通过在你的Binding中指明使用的是哪个binder就可以了。
这样我们配置好了Binding,项目的Bindings完整配置如下:
spring:
cloud:
stream:
bindings:
produceStr-out-0:
binder: myKafka
destination: topic1
upperCase-in-0:
binder: myKafka
group: group1
destination: topic1
upperCase-out-0:
destination: topic2
binder: myRabbit
log-in-0:
binder: myRabbit
group: group1
destination: topic2
这里在配置Binding的时候比上面多了group和destination两个属性,其中group是消费组的意思,而destination是主题(topic)。
如果你不了解这两个概念,我建议你查阅一下kafka的相关资料。
- 主动发送消息
这样其实我们就完成了整个项目,启动项目你会发现:控制台会不断的打印HELLO SPRING CLOUD STREAM。
但这个消息我们是被动发送的,因为Binder调用我们的produceStr-out-0这个Binding来不断的发送消息。很多时候我们是希望主动的发送消息的,比如处理完一条用户请求后,将处理结果发送出去。
Spring Cloud Stream主动发送消息借助于StreamBridge,它的用法如下:
@Service
public class SendService {
@Autowired
private StreamBridge streamBridge;
public void send(String message){
streamBridge.send("upperCase-in-0",message);
}
}
可以看到,就是使用StreamBridge给一个in的Binding发送消息。
- 一些补充
- Message
我们刚才的文档一直在以字符串作为消息传递的数据,实际上消息传递的准确对象是org.springframework.messaging.Message,这是一个接口:
public interface Message<T> {
/**
* Return the message payload.
*/
T getPayload();
/**
* Return message headers for the message (never {@code null} but may be empty).
*/
MessageHeaders getHeaders();
}
也即生产者和消费者交互的对象其实是Message,我们之前写的String只是在生产时被Spring Cloud Stream封装为了Message,而在消费时又从Message转为了String
,因此我们其实完全可以这样写:
@Bean
public Supplier<Message<String>> produceStr(){
return () -> MessageBuilder.withPayload("hello spring cloud stream").build();
}
但大部分场景下没有必要,还是那句话,因为Spring Cloud Stream会为我们自动“装箱”和“拆箱”。另外,消息是支持发送Java对象的,比如:
public static class User{
String name;
int age;
//省略 getter setter和toString
}
@Bean
public Supplier<User> produceUser(){
return () -> new User("tom",18);
}
消费者可以写为:
@Bean
public Consumer<User> logUser(){
return s-> System.out.println(s.name+"_"+s.age);
}
很明显,对象想要被发送需要被序列化,且想要被消费也需要被反序列化,在Spring Cloud Stream中默认的序列化是json。也即对象会被以application/json的形式发送出去
。
这可以在配置文件中进行修改:
produceUser-out-0:
binder: myKafka
destination: topic1
content-type: application/json
是不是发现和Spring MVC有点眼熟,是的,其实就是Spring MVC那一套。
- 消费组
我们在配置文件之Bindings中为消费的Binding配置了一个group,Spring Cloud Stream建议大家为每个消费者都显示声明一个消费组,因为这样可以保证“断点续传”的功能。比如你消费者挂了,如果指明了消费组,重启后可以从之前挂掉的地方继续消费
,但如果没有指明消费组,Spring Cloud Stream会分配一个匿名的消费组,但每次启动这个名字可能都会变,这样可能会导致重启后重复消费。
更多推荐
所有评论(0)