介绍

RocketMQ是出自阿里巴巴的一款开源消息中间件,在设计上借鉴了Kafka,2017年成为Apache顶级项目,虽然目前社区无法和Kafka比肩,但其历经多次天猫双十一的考验,其性能和稳定是毋庸置疑的。

当前市面上常用的四款:ActiveMQ、RabbitMQ、RocketMQ、Kafka,其优缺点各有千秋,但就分布式,功能方面,RocketMQ还是最优选择。想继续了解的话,可以自行百度,这里分享一篇比较通俗易懂的博文:

https://www.cnblogs.com/williamjie/p/9481780.html

话不多说,开始搭建! 

一、准备工作

首先在阿里云开通RocketMQ消息队列(如果已经配过,跳过此步骤吧)

点进test实例详情

创建topic

 创建Group

记录AccessKey,和Access Key Secret

点击头像里的accesskeys,查看accesskey,并记录下来,后面要用到!

 二、Springboot2.0集成RocketMQ

1. pom.xml

<!-- 阿里mq -->
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>ons-client</artifactId>
    <version>1.8.4.Final</version>
</dependency>
 <!--非必须,用于简化配置类-->
<dependency>
    <groupId> org.springframework.boot </groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
    <optional>true</optional>
</dependency>

 2.application.yml

rocketmq 如何配置数据,步骤一图内有介绍哦!

server:
  port: 8098
aliyun:
  rocketmq:
    onsAddr: ********     # TCP 接入域名
    accessKey: ********   # AccessKey
    secretKey: ********   # SecretKey
    groupId: GID_groupId 
    topic: test
    timeout: 3000         #超时时间

 3.参数配置类 RocketMqProperties.java

package com.simpleleaf.leafrocketmq.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;


/**
 * @author FCX
 * @date Create in 16:42 2019/11/15
 * @description 阿里云RocketMq参数配置
 */
@Data
@Component
@ConfigurationProperties(prefix = "aliyun.rocketmq")
public class RocketMqProperties {
    /**
     * TCP 接入域名
     */
    private String onsAddr;
    /**
     * 阿里云身份验证:AccessKey
     */
    private String accessKey;
    /**
     * 阿里云身份验证:SecretKey
     */
    private String secretKey;
    /**
     * Group管理配置group id
     */
    private String groupId;
    /**
     * 配置的topic名
     */
    private String topic;
    /**
     * 生产标签,可自定义,默认通配
     */
    private String tag;
    /**
     * 超时时间
     */
    private String timeout;
}

4.生产者配置 RocketMqProducerConfig.java

package com.simpleleaf.leafrocketmq.config;

import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Properties;

/**
 * @author FCX
 * @date Create in 16:51 2019/11/15
 * @description 阿里云RocketMq生产者配置
 */
@Component
public class RocketMqProducerConfig {
    @Autowired
    private RocketMqProperties rocketMqProperties;

    private static Producer producer;

    @PostConstruct
    public void init(){
        System.out.println("初始化启动生产者!");
        // producer 实例配置初始化
        Properties properties = new Properties();
        //您在控制台创建的Group ID
        properties.setProperty(PropertyKeyConst.GROUP_ID,rocketMqProperties.getGroupId());
        // AccessKey 阿里云身份验证
        properties.setProperty(PropertyKeyConst.AccessKey, rocketMqProperties.getAccessKey());
        // SecretKey 阿里云身份验证
        properties.setProperty(PropertyKeyConst.SecretKey, rocketMqProperties.getSecretKey());
        //设置发送超时时间(毫秒)
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, rocketMqProperties.getTimeout());
        // 设置 TCP 接入域名
        properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, rocketMqProperties.getOnsAddr());
        producer = ONSFactory.createProducer(properties);
        // 在发送消息前,初始化调用start方法来启动Producer,只需调用一次即可,当项目关闭时,自动shutdown
        producer.start();
    }

    /**
     * 初始化生产者
     * @return
     */
    public Producer getProducer() {
        return producer;
    }
}

5.消费者配置 RocketMqConsumerConfig.java

package com.simpleleaf.leafrocketmq.config;

import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.simpleleaf.leafrocketmq.listen.RocketMqTestListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Properties;

/**
 * @author FCX
 * @date Create in 17:39 2019/11/15
 * @description 阿里云RocketMq消费者
 */
@Component
public class RocketMqConsumerConfig {
    @Autowired
    private RocketMqProperties rocketMqProperties;
    private static Consumer consumer;

    @PostConstruct
    public void init(){
        System.out.println("初始化启动消费者者!");
        // listen 实例配置初始化
        Properties properties = new Properties();
        //您在控制台创建的Group ID
        properties.setProperty(PropertyKeyConst.GROUP_ID, rocketMqProperties.getGroupId());
        // AccessKey 阿里云身份验证
        properties.setProperty(PropertyKeyConst.AccessKey, rocketMqProperties.getAccessKey());
        // SecretKey 阿里云身份验证
        properties.setProperty(PropertyKeyConst.SecretKey, rocketMqProperties.getSecretKey());
        //设置发送超时时间(毫秒)
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, rocketMqProperties.getTimeout());
        // 设置 TCP 接入域名
        properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, rocketMqProperties.getOnsAddr());
        consumer = ONSFactory.createConsumer(properties);
        //监听topic,new对应的监听器
        consumer.subscribe(rocketMqProperties.getTopic(),rocketMqProperties.getTag(), new RocketMqTestListener());
        // 在发送消息前,必须调用start方法来启动consumer,只需调用一次即可,当项目关闭时,自动shutdown
        consumer.start();
    }

    /**
     * 初始化消费者
     * @return
     */
    public Consumer getConsumer(){
        return consumer;
    }
}

6. 生产者业务层  RocketMqProducerImpl.java

package com.simpleleaf.leafrocketmq.service;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.OnExceptionContext;
import com.aliyun.openservices.ons.api.SendCallback;
import com.aliyun.openservices.ons.api.SendResult;
import com.simpleleaf.leafrocketmq.config.RocketMqProducerConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Date;

/**
 * @author FCX
 * @date Create in 16:11 2019/11/16
 * @description 生产者
 */
@Service
@Slf4j
public class RocketMqProducerImpl  {
    @Autowired
    private RocketMqProducerConfig rocketMqProducerConfig;

    /**
     * 同步发送实体对象消息
     * 可靠同步发送:同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方式;
     * 特点:速度快;有结果反馈;数据可靠;
     * 应用场景:应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等;
     */
    public boolean sendMsg(String msg) {
        Long startTime = System.currentTimeMillis();
        Message message = new Message("test", "*", msg.getBytes());
        SendResult sendResult = rocketMqProducerConfig.getProducer().send(message);
        if (sendResult != null) {
            System.out.println(new Date() + " Send mq message success. Topic is:" + message.getTopic() + " msgId is: " + sendResult.getMessageId());
        } else {
            log.warn(".sendResult is null.........");
        }
        Long endTime = System.currentTimeMillis();
        System.out.println("单次生产耗时:"+(endTime-startTime)/1000);
        return true;
    }

    /**
     * 异步发送消息
     * 可靠异步发送:发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式;
     * 特点:速度快;有结果反馈;数据可靠;
     * 应用场景:异步发送一般用于链路耗时较长,对 rt响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等;
     * @param msg
     * @return
     */
    public boolean sendMsgAsy(String msg) {
        Long startTime = System.currentTimeMillis();
        Message message = new Message("test", "*", msg.getBytes());
        rocketMqProducerConfig.getProducer().sendAsync(message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                ///消息发送成功
                System.out.println("send message success. topic=" + sendResult.getMessageId());
            }

            @Override
            public void onException(OnExceptionContext context) {
                //消息发送失败
                System.out.println("send message failed. execption=" + context.getException());
            }
        });
        Long endTime = System.currentTimeMillis();
        System.out.println("单次生产耗时:"+(endTime-startTime)/1000);
        return true;
    }

    /**
     * 单向发送
     * 单向发送:只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答;此方式发送消息的过程耗时非常短,一般在微秒级别;
     * 特点:速度最快,耗时非常短,毫秒级别;无结果反馈;数据不可靠,可能会丢失;
     * 应用场景:适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集;
     * @return
     */
    public boolean sendMsgOneway(String msg) {
        Long startTime = System.currentTimeMillis();
        Message message = new Message("test", "*", msg.getBytes());
        rocketMqProducerConfig.getProducer().sendOneway(message);
        Long endTime = System.currentTimeMillis();
        System.out.println("单次生产耗时:"+(endTime-startTime)/1000);
        return true;
    }
}

7 消费者监听层 RocketMqListener.java

package com.simpleleaf.leafrocketmq.listen;

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

/**
 * @author FCX
 * @date Create in 16:12 2019/11/16
 * @description 消费者
 */
@Service
@Slf4j
public class RocketMqListener implements MessageListener {
    @Override
    public Action consume(Message message, ConsumeContext consumeContext) {
        try {
            Long startTime = System.currentTimeMillis();
            byte[] body = message.getBody();
            //获取到接收的消息,由于接收到的是byte数组,所以需要转换成字符串
            String msg = new String(body);
            //TODO 业务逻辑,自行设计
            System.out.println("MessageListener.Receive message:"+msg);
            Long endTime = System.currentTimeMillis();
            System.out.println("单次消费耗时:"+(endTime-startTime)/1000);
        } catch (Exception e) {
            log.error("MessageListener.consume error:" + e.getMessage(), e);
        }
        log.info("MessageListener.Receive message");
        //如果想测试消息重投的功能,可以将Action.CommitMessage 替换成Action.ReconsumeLater
        return Action.CommitMessage;
    }
}

8 controller层

package com.simpleleaf.leafrocketmq.controller;

import com.simpleleaf.leafrocketmq.service.RocketMqProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author FCX
 * @date Create in 18:22 2019/11/16
 * @description 测试
 */
@RestController
public class TestController {
    @Autowired
    private RocketMqProducerService rocketMqProducerService;

    @GetMapping("/test")
    public Object test() {
        rocketMqProducerService.sendMsg("我是一个测试");
        return null;
    }
}

启动服务,初始化配置成功!

请求服务: localhost:8098/test

可以看到消息发送成功,并接受成功!

本文参考: https://blog.csdn.net/alan_liuyue/article/details/86645887

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐