原创作者:田超凡(程序员田宝宝)

版权所有,引用请注明原作者,严禁复制转载

Part 1 理论部分

1 什么是SpringCloud Stream消息驱动?

SpringCloud Stream消息驱动可以简化开发人员对消息中间件使用的复杂度和耦合度,让开发人员更专注于核心业务逻辑的开发,SpringCloud Stream基于SpringBoot实现,具有自动化配置的功能,类似一些ORM框架,可以平滑切换多种不同的数据库,目前SpringCloud Stream消息驱动仅支持整合RabbitMQ和Kafka消息中间件。

2 SpringCloud Stream消息驱动实现的原理?

通过定义Binder绑定器作为中间层,实现了应用程序和消息中间件之间实现细节的隔离,通过向应用程序暴露统一的Channel通道,可以让应用程序不再需要考虑各种不同的消息中间件实现的兼容性问题,当需要升级消息中间件,或者更换其他的消息中间件产品时,我们需要做的就只是更换对应的Binder绑定器即可,不需要再修改任何应用中对接消息中间件的实现逻辑。

Stream消息驱动中有以下几个核心概念:

1 Source:当需要发送消息时,就需要使用Source来实现,Source会把需要发送的消息(POJO对象)进行序列化(默认转换成JSON格式的字符串),然后将这些数据发送到Channel中。

2 Channel:消息通道是Stream消息驱动的抽象之一,通常我们向消息中间件中发送消息或者消费消息的时候需要指定主题(Topic)名称或消息队列名称,但这样一来,当我们需要变更主题名称的时候就需要修改大量的消息发起方和消息消费方的代码,但是通过使用Channel消息通道,消息发起方和消息消费方的业务代码只需要连接到Channel消息通道就可以了,具体这个Channel消息通道对应的是哪个主题,就可以在配置文件中指定,这样当主题变更的时候我们不需要对代码做任何修改,就实现了业务代码和具体消息中间件的解耦。

3 Binder:Stream消息驱动中的另外一个抽象层,通过不同的Binder可以实现和不同的消息中间件整合,比如针对Kafka的Binder等等,通过Binder提供统一的消息收发接口,我们可以根据实际需要部署不同的消息中间件,或者根据实际生产环境中部署的消息中间件来调整我们的配置。

4 Sink:当需要监听消息时,就需要使用Sink来实现,Sink负责从Channel消息通道中获取消息,并将消息反序列化成消息对象(POJO对象),然后交给具体的消息消费方处理相应的业务逻辑。

Part 2 实践部分

消息驱动环境搭建

生产者环境

Maven依赖信息

    <parent>

        <groupId>org.springframework.boot</groupId>

        <artifactId>spring-boot-starter-parent</artifactId>

        <version>2.0.1.RELEASE</version>

    </parent>

    <dependencies>

        <!-- SpringBoot整合Web组件 -->

        <dependency>

             <groupId>org.springframework.boot</groupId>

             <artifactId>spring-boot-starter-web</artifactId>

        </dependency>

        <dependency>

             <groupId>org.springframework.cloud</groupId>

             <artifactId>spring-cloud-starter-stream-rabbit</artifactId>

             <version>2.0.1.RELEASE</version>

        </dependency>

    </dependencies>

application.yml信息

server:

  port: 9000

spring:

  application:

    name: spingcloud-stream-producer

#  rabbitmq:

#    host: 192.168.112.111

#    port: 5672

#    username: guest

#    password: guest

创建管道

// 创建管道接口

public interface SendMessageInterface {

     // 创建一个输出管道,用于发送消息

     @Output("my_msg")

     SubscribableChannel sendMsg();

}

发送消息

@RestController

public class SendMsgController {

      @Autowired

      private SendMessageInterface sendMessageInterface;

      @RequestMapping("/sendMsg")

      public String sendMsg() {

            String msg = UUID.randomUUID().toString();

            System.out.println("生产者发送内容msg:" + msg);

            Message build = MessageBuilder.withPayload(msg.getBytes()).build();

            sendMessageInterface.sendMsg().send(build);

            return "success";

      }

}

启动服务

@SpringBootApplication

@EnableBinding(SendMessageInterface.class) // 开启绑定

public class AppProducer {

      public static void main(String[] args) {

            SpringApplication.run(AppProducer.class, args);

      }

}

消费者环境

Maven

<parent>

            <groupId>org.springframework.boot</groupId>

            <artifactId>spring-boot-starter-parent</artifactId>

            <version>2.0.1.RELEASE</version>

      </parent>

      <dependencies>

            <!-- SpringBoot整合Web组件 -->

            <dependency>

                  <groupId>org.springframework.boot</groupId>

                  <artifactId>spring-boot-starter-web</artifactId>

            </dependency>

            <dependency>

                  <groupId>org.springframework.cloud</groupId>

                  <artifactId>spring-cloud-starter-stream-rabbit</artifactId>

                  <version>2.0.1.RELEASE</version>

            </dependency>

      </dependencies>

application.yml

server:

  port: 9000

spring:

  application:

    name: spingcloud-stream-consumer

#  rabbitmq:

#    host: 192.168.112.111

#    port: 5672

#    username: guest

#    password: guest

管道中绑定消息

public interface RedMsgInterface {

      // 从管道中获取消息

      @Input("my_msg")

      SubscribableChannel redMsg();

}

消费者获取消息

@Component

public class Consumer {

      @StreamListener("my_msg")

      public void listener(String msg) {

            System.out.println("消费者获取生产消息:" + msg);

      }

}

启动消费者

@SpringBootApplication

@EnableBinding(RedMsgInterface.class)

public class AppConsumer {

      public static void main(String[] args) {

            SpringApplication.run(AppConsumer.class, args);

      }

}

消费组

在现实的业务场景中,每一个微服务应用为了实现高可用和负载均衡,都会集群部署,按照上面我们启动了两个应用的实例,消息被重复消费了两次。为解决这个问题,Spring Cloud Stream 中提供了消费组,通过配置 spring.cloud.stream.bindings.myInput.group 属性为应用指定一个组名,下面修改下配置文件

server:

  port: 8001

spring:

  application:

    name: spring-cloud-stream

#  rabbitmq:

#    host: 192.168.112.111

#    port: 5672

#    username: guest

#    password: guest

  cloud:

    stream:

      bindings:

        mymsg: ###指定 管道名称

          #指定该应用实例属于 stream 消费组

          group: stream

修改消费者

@Component

public class Consumer {

      @Value("${server.port}")

      private String serverPort;

      @StreamListener("my_msg")

      public void listener(String msg) {

            System.out.println("消费者获取生产消息" + msg + "端口:" + serverPort);

      }

}

更改环境为kafka

Maven依赖

            <dependency>

                  <groupId>org.springframework.cloud</groupId>

                  <artifactId>spring-cloud-starter-stream-kafka</artifactId>

                  <version>2.0.1.RELEASE</version>

            </dependency>

生产者配置

server:

  port: 9000

spring:

  cloud:

    stream:

      # 设置成使用kafka

      kafka:

        binder:

          # Kafka的服务端列表,默认localhost

          brokers: 192.168.212.111:9092,192.168.212.112:9092,192.168.212.113:9092

          # Kafka服务端连接的ZooKeeper节点列表,默认localhost

          zkNodes: 192.168.212.111:2181,192.168.212.112:2181,192.168.212.113:2181

          minPartitionCount: 1

          autoCreateTopics: true

          autoAddPartitions: true

消费者配置

server:

  port: 8000

spring:

  application:

    name: springcloud_kafka_consumer

  cloud:

     instance-count: 1

     instance-index: 0

     stream:

        kafka:

          binder:

            brokers: 192.168.212.111:9092,192.168.212.112:9092,192.168.212.113:9092

            zk-nodes: 192.168.212.111:2181,192.168.212.112:2181,192.168.212.113:2181

            auto-add-partitions: true

            auto-create-topics: true

            min-partition-count: 1

        bindings:

          input:

            destination: my_msg

            group: s1

            consumer:

              autoCommitOffset: false

              concurrency: 1

              partitioned: false

本文部分素材转载自蚂蚁课堂

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐