spring-boot 2.3.x 整合rocketmq


本地项目的基础环境

环境版本
jdk1.8.0_201
maven3.6.0
Spring-boot2.3.3.RELEASE

1、rocketMq的安装(docker形式)

这里使用docker,做一个快速的单机版本安装,需要更详细的其他形式的安装,可以查看其他的相关资料或者官网地址

https://github.com/apache/rocketmq-docker

《docker环境下安装rockermq以及rockermq-console》

1.1、docker-compose.yml

version: '3'
services:
  namesrv:
    image: apacherocketmq/rocketmq
    container_name: namesrv
    ports:
    - 9876:9876
    volumes:
    - ./data/namesrv/logs:/home/rocketmq/logs
    command: sh mqnamesrv
  broker:
    image: apacherocketmq/rocketmq
    container_name: rmqbroker
    ports:
    - 10909:10909
    - 10911:10911
    - 10912:10912
    volumes:
    - ./data/broker/logs:/home/rocketmq/logs
    - ./data/broker/store:/home/rocketmq/store
    - ./data/broker/broker.conf:/home/rocketmq/rocketmq-4.6.0/conf/broker.conf
    command: sh mqbroker -n namesrv:9876 -c ../conf/broker.conf
    depends_on:
    - namesrv
  rmqconsole:
    image: styletang/rocketmq-console-ng
    container_name: rmqconsole
    ports:
    - 8080:8080
    environment:
      JAVA_OPTS: -Drocketmq.namesrv.addr=namesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false
    depends_on:
    - namesrv

2、构建一个rocketMq的项目

主要是导入rocketmq-spring-boot-starter的包,以及spring-boot-starter-web的包;

导入web的包,是再消费的时候,没有守护线程,程序启动后,就会自动退出,导入web包后,tomcat容器启动,消费的线程就不退出了;

<?xml version="1.0"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
    xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.3.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.badger</groupId>
    <artifactId>badger-spring-boot-rocketmq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>badger-spring-boot-rocketmq</name>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
        <maven-jar-plugin.version>3.1.1</maven-jar-plugin.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

3.1、定义yml配置文件

rocketmq:
  name-server: localhost:9876
  producer:
    group: test-producer-group

org.apache.rocketmq.spring.autoconfigure.RocketMQProperties.class具体更可以参看配置类

部分详细配置如下:

# rocketmq 配置项,对应 RocketMQProperties 配置类
rocketmq:
  name-server: 101.133.227.13:9876 # RocketMQ Namesrv
  # Producer 配置项
  producer:
    group: test-producer-group # 生产者分组
    send-message-timeout: 3000 # 发送消息超时时间,单位:毫秒。默认为 3000 。
    compress-message-body-threshold: 4096 # 消息压缩阀值,当消息体的大小超过该阀值后,进行消息压缩。默认为 4 * 1024B
    max-message-size: 4194304 # 消息体的最大允许大小。。默认为 4 * 1024 * 1024B
    retry-times-when-send-failed: 2 # 同步发送消息时,失败重试次数。默认为 2 次。
    retry-times-when-send-async-failed: 2 # 异步发送消息时,失败重试次数。默认为 2 次。
    retry-next-server: false # 发送消息给 Broker 时,如果发送失败,是否重试另外一台 Broker 。默认为 false
    access-key: # Access Key ,可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/acl/user_guide.md 文档
    secret-key: # Secret Key
    enable-msg-trace: true # 是否开启消息轨迹功能。默认为 true 开启。可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/msg_trace/user_guide.md 文档
    customized-trace-topic: RMQ_SYS_TRACE_TOPIC # 自定义消息轨迹的 Topic 。默认为 RMQ_SYS_TRACE_TOPIC 。
  # Consumer 配置项
  consumer:
    listeners: # 配置某个消费分组,是否监听指定 Topic 。结构为 Map<消费者分组, <Topic, Boolean>> 。默认情况下,不配置表示监听。
      test-consumer-group:
        topic1: false # 关闭 test-consumer-group 对 topic1 的监听消费

3.2、生产者测试代码

@RunWith(SpringRunner.class)
@SpringBootTest(classes = { RocketmqApplicaltion.class })
public class TestApp {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Test
    public void syncSend() {
        for (int i = 0; i < 100; i++) {
            // 同步消息
            Message<String> bashMessage = new GenericMessage<String>("test_producer" + i);
            SendResult syncSend = rocketMQTemplate.syncSend("test_producer", bashMessage);
            System.out.println(syncSend);
        }
    }

    @Test
    public void asyncSend() {
        for (int i = 0; i < 100; i++) {
            // 异步消息
            Message<String> message = new GenericMessage<String>("test_producer" + i);
            rocketMQTemplate.asyncSend("test_producer", message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println(sendResult);
                }

                @Override
                public void onException(Throwable e) {
                    System.out.println("发送失败");
                }
            });
        }
    }

    @Test
    public void sendOneWay() {
        for (int i = 0; i < 100; i++) {
            // 单向发送消息
            Message<String> message = new GenericMessage<String>("test_producer" + i);
            rocketMQTemplate.sendOneWay("test_producer", message);
            System.out.println("只发送一次");
        }
    }

    @Test
    public void syncSendOrder() {
        // 发送有序消息
        String[] tags = new String[] { "TagA", "TagC", "TagD" };
        for (int i = 0; i < 10; i++) {
            // 加个时间前缀
            Message<String> message = new GenericMessage<String>("我是顺序消息" + i);
            SendResult sendResult = rocketMQTemplate.syncSendOrderly("test_producer:" + tags[i % tags.length], message,
                    i + "");
            System.out.println(sendResult);
        }
    }
}

注意:

1、跟原生rocketmq Api对比,rocketMQ start 的做了二次的封装,把同步异步发送消息,用方法名称做了区别;相同的是,无论是原生的api还是二次封装的api,异步调用的时候,回调是在参数体里的,毕竟异步发送需要等待回调,而同步发送可以只有回调。

2、顺序消息:严格顺序消息模式下,消费者收到的所有消息均是有顺序的

​ 发送消息的时候,消息被存储在MessageQueue队列里的,默认的时候,是4个队列;为了保证消息的顺序,是需要把相同业务的数据按照顺序写入对应的队列中,单个队列下,数据是严格有序的;

rocketMQ start 对原生api做了二次封装,提供了默认的MessageQueue选择器,用的字符串的hash算法实现的,如果不满足实际需求,需要重写选择器。

3、继承RocketMQListener接口,泛型类型写消息生产者Message<T>泛型一致;会有默认转换器,把二进制数据转成对应的泛型实体;

4、注解@RocketMQMessageListener标注的RocketMQListener接口实现中,默认使用的Push模式的consumer,

如果需要使用pull模式的,则需要单独使用模板类RocketMQTemplate,并且需要再yaml配置文件中,写上相应配置;详细可以参考源码 org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration

@Bean(CONSUMER_BEAN_NAME)
    @ConditionalOnMissingBean(DefaultLitePullConsumer.class)
    @ConditionalOnProperty(prefix = "rocketmq", value = {"name-server", "consumer.group", "consumer.topic"})
    public DefaultLitePullConsumer defaultLitePullConsumer(RocketMQProperties rocketMQProperties)
            throws MQClientException {
        RocketMQProperties.Consumer consumerConfig = rocketMQProperties.getConsumer();
        String nameServer = rocketMQProperties.getNameServer();
        String groupName = consumerConfig.getGroup();
        String topicName = consumerConfig.getTopic();
        Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
        Assert.hasText(groupName, "[rocketmq.consumer.group] must not be null");
        Assert.hasText(topicName, "[rocketmq.consumer.topic] must not be null");

        String accessChannel = rocketMQProperties.getAccessChannel();
        MessageModel messageModel = MessageModel.valueOf(consumerConfig.getMessageModel());
        SelectorType selectorType = SelectorType.valueOf(consumerConfig.getSelectorType());
        String selectorExpression = consumerConfig.getSelectorExpression();
        String ak = consumerConfig.getAccessKey();
        String sk = consumerConfig.getSecretKey();
        int pullBatchSize = consumerConfig.getPullBatchSize();

        DefaultLitePullConsumer litePullConsumer = RocketMQUtil.createDefaultLitePullConsumer(nameServer, accessChannel,
                groupName, topicName, messageModel, selectorType, selectorExpression, ak, sk, pullBatchSize);
        return litePullConsumer;
    }

3.3、定义消费端代码

@Component
@RocketMQMessageListener(topic = "test_producer", consumerGroup = "test_consumer-group")
public class DemoConsumer implements RocketMQListener<String> {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void onMessage(String message) {
        logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
    }
}

注意:

​ 1、消费端的注解@RocketMQMessageListener属性中consumerGroup,多个消费端(消费集群)消费同一个topic的时候,需要定义成一致;

2、消费端消费的时候,是会多线程的形式消费topic里的4个MessageQueue的,如果要消费顺序消息,需要指定属性consumeModeConsumeMode.ORDERLY,表示同步消费;

3.4、主启动类

@SpringBootApplication
public class RocketmqApplicaltion {

    public static void main(String[] args) throws Exception {
        SpringApplication.run(RocketmqApplicaltion.class, args);
    }
}

《官方文档github》

《官方文档》

详细代码也可以参看《码云》

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐