一、什么是Spring Cloud Stream

Spring Cloud Stream对Spring Cloud体系中的Mq进⾏了很好的上层抽象,可以让我们与具体消息中间件解耦合,屏蔽掉了底层具体MQ消息中间件的细节差异,就像Hibernate屏蔽掉了具体数据库(Mysql/Oracle⼀样)。如此⼀来,我们学习、开发、维护MQ都会变得轻松。⽬前Spring Cloud Stream原生⽀持RabbitMQ和Kafka,阿里在这个基础上提供了RocketMQ的支持,简单使用Spring Cloud Stream 构建基于RocketMQ的生产者和消费者

二、 Spring Cloud Stream的一些概念

要理解和使用Spring Cloud Stream需要先明白Spring Cloud Stream提出的一些概念。

  • 辅助图

在这里插入图片描述

假设:模块1和模块2间数据可能使用的是kafka,模块1向kafka中push数据,模块2向kafka中poll数据。而模块2和模块3可能使用的是rabbitMQ。很明显,它们的功能都是一样的:提供数据的流向,让数据可以流入自己同时又可以从自己流出发给别人。但由于中间件的不同,需要使用不同的API。为了消除这种数据流入(输入)和数据流出(输出)实现上的差异性,因此便出现了Spring Cloud Stream。Spring Cloud Stream想让我们不关心如何获取数据,如何发送数据,而只专心处理自己的业务。还拿上面的例子来说,假设你现在负责的是系统里的模块3,它的功能是将模块2传来的字符串全部转成大写,然后再将这个转化后的字符串发给模块4。

  1. Binder

什么是Binder?一句话概括就是具体中间件的统一抽象。一个kafka中间件在Spring Cloud Stream里是一个Binder,一个rabbitMQ中间件也是一个Binder。官方文档中写道:当你引入spring-cloud-stream依赖的时候,Spring Cloud Stream就会为你的那个中间件生成一个Binder实例,你就可以通过这个Binder实例来和这个消息中间件通信(收发数据)。很容易得出结论,Spring Cloud Stream对底层中间件的差异屏蔽都是基于我们的Binder,Binder适配了不同的消息中间件(官方文档中写道:Spring Cloud Stream为kafka和rabbitMQ提供了Binder的实现了)。

  1. Binding

Binding是个比较抽象的概念,那下面的例子来说:

public String handle(String source){
    return source.toUpperCase();
}

这是你写的模块3中的业务代码,我们假设你与模块2交互使用的是中间件kafka和与模块4交互使用的是中间件rabbitMQ。也即你的模块的功能就变为了从kafka中获取数据,将获取的字符串数据全转为大写并写出给rabbitMQ。很明显,这里有两个Binder,一个kafka Binder一个rabbitMQ Binder。而你这个业务处理函数其实也有两个功能:接收中间件的输入和将返回数据输出。再结合Binder,我们可以理解为:

  1. 函数接收kafka Binder中的输入
  2. 函数将返回结果写出给rabbitMQ binder。

但是如何表示这种关系呢?也即你现在写了一个函数,怎么表示这个函数的参数是从kafka入的,函数的返回是向rabbitMQ输出的呢?这就需要BindingBinding其实就是一座桥,桥的一头是Binder,另一头是你的业务处理函数。Bindings将外部消息中间件与你的业务处理代码连接在了一起(官方原话是:外部消息系统和应用程序之间的桥梁,提供消息的生产者和消费者(由Binder创建))。

了解了这些其实也就了解Spring Cloud Stream的架构图,Spring Cloud Stream官网中有一张图讲了它的架构:

SCSt with binder
SCSt with binder

首先最底层的Middleware是中间件,我们的kafka,rabbitMQ都属于中间件。上一层的Binder已经讲了,是对中间件的一层抽象和封装。再上一层的inputs和outputs其实就是Bindings,我们与Binder的交互就是通过Binding,其中写出数据就是output,而获取数据就是input。再上层的Application Core就是我们自己的业务代码,可以看到我们的业务代码通过Binding(input、output)与Binder交互,而Binder又负责和具体中间件交互。

  1. 函数式接口

Spring Cloud Stream 2.x与Spring Cloud Stream 3.x最大的不同就是2.x是基于注解的,而3.x是基于函数式编程的。

还拿上面的例子来说:对于你开发的一个模块而言,它无非三种情况:

  1. 从上一个模块获取数据,将这个数据转发到下一个模块
  2. 从上一个模块获取数据,自己处理完后不再将这个数据发给别的模块
  3. 不需要从别处获取数据,自己就是数据源,将自己的数据发送到下一个模块。

这三种模式其实就对应Java 8函数式编程中的三个接口:Function、Consumer、Supplier(不了解这三个接口的可自行搜索相关资料,关键字:Java 8;函数式接口)。

现在我们还来模拟之前的系统,首先模块1是系统的入口模块,不需要其他模块提供数据源,换言之它是个生产者,那么模块1就可以使用接口Supplier(只有返回没有入参)。我们假设模块1的功能是生成字符串,那模块1的代码可以写为:

public Supplier<String> produceStr(){
    return () -> "hello spring cloud stream";
}

模块2会消费模块1的字符串,并将它全部转为大写,然后再将转化后的字符串写出。很明显模块2既是生产者也是消费者,那模块2就可以使用接口Function(既有返回也有入参)。模块2的代码为:

public Function<String,String> upperCase(){
    return String::toUpperCase;
}

模块3会消费模块2的字符串,并将它直接打印到控制台,且模块3不再将字符串写出,很明显模块3只是一个消费者,那模块3就可以使用接口Consumer(只有入参没有返回)。模块3的代码为:

public Consumer<String> log(){
    return System.out::println;
}

可以看到,我们将自己的业务处理都封装成了一个函数式接口,并作为一个函数的返回。在实际的开发中上面的那些函数都会被标上@Bean注解,注入到Spring容器,也即:

@Bean
public Supplier<String> produceStr(){
    return () -> "hello spring cloud stream";
}

@Bean
public Function<String,String> upperCase(){
    return String::toUpperCase;
}

@Bean
public Consumer<String> log(){
    return System.out::println;
}

我们知道这代表向Spring中注入一个Bean,其中Bean的名字就是函数名,而Bean本身就是函数的返回。也即我们将自己的业务处理逻辑包装成一个对象(函数式接口)注入到了Spring IOC中。现在假设Binder收到了一条数据,那它会寻找Binding,而Binding是一个桥梁,它会连接一个我们的处理函数,处理函数其实就是这里的Bean,Binder拿到Bean后,自然就会调用Bean的处理函数来处理(因为是函数式接口)。

如果用一张图来描述的话,大概就是这样:
在这里插入图片描述

这里主要讲的就是我们之前的业务处理被函数式接口包装成了对象,包装成对象后就可以注入到Spring IOC中,这样的一个Bean对象就可以对应一个Binding,通过Binding与Binder交互。

三、案例

说了那么多,还是没讲怎么使用。我们不妨还以文档一开始的那个例子来作为编码案例:

现在的需求如下:

  1. 模块1生产字符串,并将字符串写出到kafka
  2. 模块2消费模块1的字符串,并将字符串转为大写,输出到rabbitMQ
  3. 模块3消费模块2的字符串,并将字符串打印到控制台。

也即:

在这里插入图片描述
为了项目的简洁,我们将上述模块1、模块2和模块3写在一个项目中。

  1. docker-compose,kafka与rabbitMQ
  rabbitmq:
    image: rabbitmq:3.10.6
    container_name: rabbitmq
    build:
      context: ./rabbitmq
    environment:
      RABBITMQ_DEFAULT_USER: ruoyi
      RABBITMQ_DEFAULT_PASS: ruoyi123
    ports:
      - "15672:15672" # 管理界面端口
      - "5672:5672"   # api 端口
    volumes:
      - /docker/rabbitmq/log:/var/log/rabbitmq
      - /docker/rabbitmq/data:/var/lib/rabbitmq

  zookeeper:
    image: 'bitnami/zookeeper:3.8.0'
    container_name: zookeeper
    ports:
      - "2181:2181"
      - "28080:8080"
    environment:
      TZ: Asia/Shanghai
      ALLOW_ANONYMOUS_LOGIN: "yes"
      ZOO_SERVER_ID: 1
      ZOO_PORT_NUMBER: 2181
      # 自带的控制台 一般用不上可自行开启
      ZOO_ENABLE_ADMIN_SERVER: "no"
      # 自带控制台的端口
      ZOO_ADMIN_SERVER_PORT_NUMBER: 8080

  kafka:
    image: 'bitnami/kafka:3.2.0'
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      TZ: Asia/Shanghai
      # 更多变量 查看文档 https://github.com/bitnami/bitnami-docker-kafka/blob/master/README.md
      KAFKA_BROKER_ID: 1
      # 监听端口
      KAFKA_CFG_LISTENERS: PLAINTEXT://:9092
      # 实际访问ip 本地用 127 内网用 192 外网用 外网ip
      KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.8:9092
      KAFKA_CFG_ZOOKEEPER_CONNECT: 192.168.1.8:2181
      ALLOW_PLAINTEXT_LISTENER: "yes"
    volumes:
      - /docker/kafka/data:/bitnami/kafka/data
    depends_on:
      - zookeeper
  1. maven依赖

我们需要的依赖并不多,其实只需要rabbit和kafka的依赖,整个项目的maven配置如下:

        <!-- 主要依赖,版本由导入的springcloud帮我们控制-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
  1. 配置文件之Binder

之前已经讲了一个kafka实例或者rabbitMQ实例其实就是一个binder,那你现在有了一个kafka,要如何告诉Spring Cloud呢?最简单的就是通过配置文件,配置文件配置Binder的思想很简单,就是告诉Spring Cloud Stream,我要创建一个Binder,这个Binder的类型是kafka或者rabbitMQ,然后它的IP,端口都是啥以及用户名密码等都是啥就好了

我们先以kafka为例,配置kafka为Binder有两种方式:

  • 方式1:
spring:
  cloud:
    stream:
      kafka:
        binder:
          # kafka的Ip和端口,可以是集群
          brokers: ip:port

方式2:

spring:
  cloud:
    stream:
      binders:
        # 你的binder名字,自己随意取,我取的名字叫myKafka
        myKafka:
          # 你的binder类型,我们这里类型是kafka
          type: kafka
          # 下面的环境配置与上面的一模一样
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      # kafka的Ip和端口,可以是集群
                      brokers: ip:port

很明显,第二种比第一种更复杂,你如果只有一个kafka实例,那直接用第一种就可以了,但如果你的项目中有多个kafka实例,比如项目2和项目1之间用的是kafka,项目2和项目3间也用的kafka,这两个kafka又是不是同一套kafka。所以,第二种配置可以配备多个kafka实例,如:

spring:
  cloud:
    stream:
      binders:
        myKafka1:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: ip1:port1
        myKafka2:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: ip2:port2

另外,如果你有多个kafka实例,使用第一种方式下配备的属性信息会被这多个kafka实例共享,如:

spring:
  cloud:
    stream:
      kafka:
        binder:
          configuration:
            security.protocol: SASL_PLAINTEXT
            sasl.mechanism: PLAIN
      binders:
        myKafka1:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: ip1:port1
                      configuration.sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";"
        myKafka2:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: ip2:port2
                      configuration.sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user1\" password=\"user1-secret\";"

我们现在有两个kafka实例myKafka1和myKafka2,但我们在一开头配置了security.protocol = SASL_PLAINTEXT和 sasl.mechanism = PLAIN,这是kafka的安全配置,这个配置信息会被myKafka1和myKafka2都具备。也即在一开始的这些配置会被每个kafka实例都具有,因此一些公共的配置可以放在一开始。

同理rabbitMQ的配置也有两种:

spring:
  rabbitmq:
    host: 你的rabbitMQ的IP
    port: 你的rabbitMQ的端口
    username: 用户名
    password: 密码
YAML 复制
spring:
  cloud:
    stream:
      binders:
        myRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 你的rabbitMQ的IP
                port: 你的rabbitMQ的端口
                username: 用户名
                password: 密码

是的,如果你只有一个rabbitMQ实例可以使用第一种,但如果有多个,就得使用第二种,它与kafka配置的思路一模一样,这里不再赘述。虽然我们只有一个kafka实例和一个rabitMQ实例,但笔者依然采取了第二种配置文件,一则是考虑到以后实例增多改动比较小的可能,二则是第二种配置笔者认为更清晰。项目对于Binder的配置全部信息为:

spring:
  cloud:
    stream:
      binders:
        myRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 你的rabbitMQ的IP
                port: 你的rabbitMQ的端口
                username: 用户名
                password: 密码
        myKafka:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: ip:port

关于rabbitMQ与kafka更详细的配置,如自动提交,ACK等信息可以参考Spring官网,本文不再列出。

  1. 编写自己的业务代码

配置完Binder就代表你已经具备和外部消息中间件通信的能力了,现在你可以写自己的业务代码了:

package com.coderzoe.loggingconsumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier

@SpringBootApplication
public class LoggingConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(LoggingConsumerApplication.class, args);
    }
    /**
     * 模块1 生产字符串
     */
    @Bean
    public Supplier<String> produceStr(){
        return () -> "hello spring cloud stream";
    }
    /**
     * 模块2,将生产的字符串转为大写
     */
    @Bean
    public Function<String,String> upperCase(){
        return String::toUpperCase;
    }
    /**
     * 模块3 将字符串打印
     */
    @Bean
    public Consumer<String> log(){
        return System.out::println;
    }
}

写完这些Bean后,我们还需要将它写到配置文件,告诉Spring Cloud,这些都是用于函数处理的Bean:

spring:
  cloud:
    function:
      definition: produceStr;upperCase;log
  1. 配置文件之Bindings

现在我们有了Binder,也有了处理业务函数,肯定还差一个Binding,将Binder与业务处理联系起来。联系起来的方法很简单,就是通过配置文件来配置。在讲配置文件前,我们先讲Binding的名称规范。Binding的命名是:<functionName>-in/out-<index>。

比如:

@Bean
public Supplier<String> produceStr(){
    return () -> "hello spring cloud stream";
}

它的Binding名字就是produceStr-out-0。其中produceStr是函数名(也是Bean名),out代表这个Binding是向外写出的,而index是输入或输出绑定的索引。对于典型的单个输入/输出函数,它始终为 0,因此它仅与具有多个输入和输出参数的函数相关(一个函数被多次作为输出/输出,比如这个函数被kafka和rabbitMQ都作为输出,那就是一个index0一个index1)。

再比如:

@Bean
public Function<String,String> upperCase(){
    return String::toUpperCase;
}

它对应两个Binding,因为它既是输入又是输出(从kafka入数据,向rabbitMQ出数据),它们的名字是:upperCase-in-0、upperCase-out-0。可以看到,我们通过名字就将Binding和处理函数做了关联。关联了Binding与处理函数,还需要关联Binding与Binder,它的配置写法如下:

spring:
  application:
  cloud:
    stream:
      bindings:
        produceStr-out-0:
          binder: myKafka

通过在你的Binding中指明使用的是哪个binder就可以了。

这样我们配置好了Binding,项目的Bindings完整配置如下:

spring:
  cloud:
    stream:
      bindings:
        produceStr-out-0:
          binder: myKafka
          destination: topic1
        upperCase-in-0:
          binder: myKafka
          group: group1
          destination: topic1
        upperCase-out-0:
          destination: topic2
          binder: myRabbit
        log-in-0:
          binder: myRabbit
          group: group1
          destination: topic2

这里在配置Binding的时候比上面多了group和destination两个属性,其中group是消费组的意思,而destination是主题(topic)。如果你不了解这两个概念,我建议你查阅一下kafka的相关资料。

  1. 主动发送消息

这样其实我们就完成了整个项目,启动项目你会发现:控制台会不断的打印HELLO SPRING CLOUD STREAM。

image-20221004212533640
image-20221004212533640

但这个消息我们是被动发送的,因为Binder调用我们的produceStr-out-0这个Binding来不断的发送消息。很多时候我们是希望主动的发送消息的,比如处理完一条用户请求后,将处理结果发送出去。Spring Cloud Stream主动发送消息借助于StreamBridge,它的用法如下:

@Service
public class SendService {
    @Autowired
    private StreamBridge streamBridge;
    public void send(String message){
        streamBridge.send("upperCase-in-0",message);
    }
}

可以看到,就是使用StreamBridge给一个in的Binding发送消息。

  1. 一些补充
  • Message

我们刚才的文档一直在以字符串作为消息传递的数据,实际上消息传递的准确对象是org.springframework.messaging.Message,这是一个接口:

public interface Message<T> {
   /**
    * Return the message payload.
    */
   T getPayload();

   /**
    * Return message headers for the message (never {@code null} but may be empty).
    */
   MessageHeaders getHeaders();
}

也即生产者和消费者交互的对象其实是Message,我们之前写的String只是在生产时被Spring Cloud Stream封装为了Message,而在消费时又从Message转为了String,因此我们其实完全可以这样写:

@Bean
public Supplier<Message<String>> produceStr(){
    return () -> MessageBuilder.withPayload("hello spring cloud stream").build();
}

但大部分场景下没有必要,还是那句话,因为Spring Cloud Stream会为我们自动“装箱”和“拆箱”。另外,消息是支持发送Java对象的,比如:

public static class User{
    String name;
    int age;
    //省略 getter setter和toString
}
@Bean
public Supplier<User> produceUser(){
    return () -> new User("tom",18);
}

消费者可以写为:

@Bean
public Consumer<User> logUser(){
    return s-> System.out.println(s.name+"_"+s.age);
}

很明显,对象想要被发送需要被序列化,且想要被消费也需要被反序列化,在Spring Cloud Stream中默认的序列化是json。也即对象会被以application/json的形式发送出去

这可以在配置文件中进行修改:

produceUser-out-0:
  binder: myKafka
  destination: topic1
  content-type: application/json

是不是发现和Spring MVC有点眼熟,是的,其实就是Spring MVC那一套。

  1. 消费组

我们在配置文件之Bindings中为消费的Binding配置了一个group,Spring Cloud Stream建议大家为每个消费者都显示声明一个消费组,因为这样可以保证“断点续传”的功能。比如你消费者挂了,如果指明了消费组,重启后可以从之前挂掉的地方继续消费,但如果没有指明消费组,Spring Cloud Stream会分配一个匿名的消费组,但每次启动这个名字可能都会变,这样可能会导致重启后重复消费。

文章转自

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐