1.简介

springcloud stream消息驱动,我们可以理解为一个中间件,MQ,数据交互,以前是生产者,发送数据到MQ,MQ再到消费者。而现在中间多了一个桥梁,所有的交互,都得通过springcloud stream进行交互,他可以简化我们的一个操作,使我们开发人员,更关注于业务逻辑,简化代码,暂时,他只支持rabbitmq和kafka,可以实现多个消息产品的互相切换,只需要配置信息,而不用我们开发人员,了解对应消息中间件底层的一些理念,实际上,只兼容一些通用的场景,所以,使用的场景不是很多。

2. 代码

    2.1 生产者

          2.1.1  pom.xml

 <!--整合web-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!--spring cloud stream-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
    </dependencies>

                  2.1.2 application.yml

server:
  port: 2000


spring:
  application:
    name: stream_producer
  rabbitmq:
    host: 对应ip
    username: XXX
    password: XXX
    virtual-host: /XXX

改成自己对应的rabbitmq对应配置

                    2.1.3  发送

package com.fqyd.dao;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.SubscribableChannel;

/**
 * Description:
 * Author: wude
 * Date:  2019/12/3 11:57
 * Modified By:
 */
public interface SendMsg {
    @Output("streamTest")
    SubscribableChannel sendMsg();
}

注意:默认是topic模式,streamTest是主题

2.1.4  controller类

package com.fqyd.controller;

import com.fqyd.dao.SendMsg;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

/**
 * Description:
 * Author: wude
 * Date:  2019/12/3 12:02
 * Modified By:
 */
@RestController
public class SendMsgController {
    @Autowired
    private SendMsg sendMsg;

    @GetMapping("/send")
    public String sendMsg() {
        String msg = UUID.randomUUID().toString();
        Message build = MessageBuilder.withPayload(msg).build();
        sendMsg.sendMsg().send(build);
        return msg;
    }
}

2.1.5 启动类 

package com.fqyd.producer;

import com.fqyd.dao.SendMsg;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.context.annotation.ComponentScan;

@SpringBootApplication
@ComponentScan(basePackages = {"com.fqyd"})
@EnableBinding(SendMsg.class)
public class ProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }

}

注意:@EnableBinding,是实现springcloudstream和MQ的绑定 

 2.2  消费者

      2.2.1 pom.xml

  <!--整合web-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!--spring cloud stream-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

       2.2.2 application.yml

server:
  port: 8086


spring:
  cloud:
    stream:
      binders:
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                  host: XXX
                  username: XXX
                  password: XXX
                  virtual-host: /XXX

 2.2.3 监听类

package com.fqyd.dao.impl;

import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;

/**
 * Description:
 * Author: wude
 * Date:  2019/12/3 15:32
 * Modified By:
 */
@Component
public class Consumer {
    @StreamListener("streamTest")
    public  void readMsg(String msg){
        System.out.println("消费者获取生产者发送的消息:"+msg);
    }
}

2.2.4 接收类

package com.fqyd.dao;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.Message;
import org.springframework.messaging.SubscribableChannel;

public interface ReceiveMsg {
    @Input("streamTest")
    SubscribableChannel receive();
}

2.2.5 启动类

package com.fqyd.comsumer;

import com.fqyd.dao.ReceiveMsg;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.context.annotation.ComponentScan;

@SpringBootApplication
@ComponentScan(basePackages = {"com.fqyd"})
@EnableBinding(ReceiveMsg.class)
public class ComsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ComsumerApplication.class, args);
    }

}

3.测试

http://localhost:2000/send

注意:1.队列名称是自动生成的,前缀是路由名称,后面是自动生成的。

            2.消费者服务关了后,队列直接消失,而不是持久化,会丢失一部分数据

4.问题

启动两个消费者,端口为8085,8086,可以借用idea一个项目启动多个实例

https://blog.csdn.net/qq_16855077/article/details/99292037

访问接口,调用生产者http://localhost:2000/send

可以发现两个消费者,接收的数据重复勒。

解决方案:

增加消费者分组

server:
  port: 8086


spring:
  cloud:
    stream:
      binders:
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                  host: XXXX
                  username: XXX
                  password: XXXX
                  virtual-host: /XXX
      bindings:
        streamTest:   ####路由器名称
          group: stream  ###消费组

 增加如下配置

连续访问http://localhost:2000/send两次,发现输出不一样,说明已经实现了轮询操作

 

如果你热衷技术,喜欢交流,欢迎加入我们! 

 

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐