Springboot2.0集成阿里云RocketMQ
介绍RocketMQ是出自阿里巴巴的一款开源消息中间件,在设计上借鉴了Kafka,2017年成为Apache顶级项目,虽然目前社区无法和Kafka比肩,但其历经多次天猫双十一的考验,其性能和稳定是毋庸置疑的。当前市面上常用的四款:ActiveMQ、RabbitMQ、RocketMQ、Kafka,其优缺点各有千秋,但就分布式,功能方面,RocketMQ还是最优选择。想继续了解的话,可以自行百...
·
介绍
RocketMQ是出自阿里巴巴的一款开源消息中间件,在设计上借鉴了Kafka,2017年成为Apache顶级项目,虽然目前社区无法和Kafka比肩,但其历经多次天猫双十一的考验,其性能和稳定是毋庸置疑的。
当前市面上常用的四款:ActiveMQ、RabbitMQ、RocketMQ、Kafka,其优缺点各有千秋,但就分布式,功能方面,RocketMQ还是最优选择。想继续了解的话,可以自行百度,这里分享一篇比较通俗易懂的博文:
话不多说,开始搭建!
一、准备工作
首先在阿里云开通RocketMQ消息队列(如果已经配过,跳过此步骤吧)
点进test实例详情
创建topic
创建Group
记录AccessKey,和Access Key Secret
点击头像里的accesskeys,查看accesskey,并记录下来,后面要用到!
二、Springboot2.0集成RocketMQ
1. pom.xml
<!-- 阿里mq -->
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<version>1.8.4.Final</version>
</dependency>
<!--非必须,用于简化配置类-->
<dependency>
<groupId> org.springframework.boot </groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
2.application.yml
rocketmq 如何配置数据,步骤一图内有介绍哦!
server:
port: 8098
aliyun:
rocketmq:
onsAddr: ******** # TCP 接入域名
accessKey: ******** # AccessKey
secretKey: ******** # SecretKey
groupId: GID_groupId
topic: test
timeout: 3000 #超时时间
3.参数配置类 RocketMqProperties.java
package com.simpleleaf.leafrocketmq.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* @author FCX
* @date Create in 16:42 2019/11/15
* @description 阿里云RocketMq参数配置
*/
@Data
@Component
@ConfigurationProperties(prefix = "aliyun.rocketmq")
public class RocketMqProperties {
/**
* TCP 接入域名
*/
private String onsAddr;
/**
* 阿里云身份验证:AccessKey
*/
private String accessKey;
/**
* 阿里云身份验证:SecretKey
*/
private String secretKey;
/**
* Group管理配置group id
*/
private String groupId;
/**
* 配置的topic名
*/
private String topic;
/**
* 生产标签,可自定义,默认通配
*/
private String tag;
/**
* 超时时间
*/
private String timeout;
}
4.生产者配置 RocketMqProducerConfig.java
package com.simpleleaf.leafrocketmq.config;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Properties;
/**
* @author FCX
* @date Create in 16:51 2019/11/15
* @description 阿里云RocketMq生产者配置
*/
@Component
public class RocketMqProducerConfig {
@Autowired
private RocketMqProperties rocketMqProperties;
private static Producer producer;
@PostConstruct
public void init(){
System.out.println("初始化启动生产者!");
// producer 实例配置初始化
Properties properties = new Properties();
//您在控制台创建的Group ID
properties.setProperty(PropertyKeyConst.GROUP_ID,rocketMqProperties.getGroupId());
// AccessKey 阿里云身份验证
properties.setProperty(PropertyKeyConst.AccessKey, rocketMqProperties.getAccessKey());
// SecretKey 阿里云身份验证
properties.setProperty(PropertyKeyConst.SecretKey, rocketMqProperties.getSecretKey());
//设置发送超时时间(毫秒)
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, rocketMqProperties.getTimeout());
// 设置 TCP 接入域名
properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, rocketMqProperties.getOnsAddr());
producer = ONSFactory.createProducer(properties);
// 在发送消息前,初始化调用start方法来启动Producer,只需调用一次即可,当项目关闭时,自动shutdown
producer.start();
}
/**
* 初始化生产者
* @return
*/
public Producer getProducer() {
return producer;
}
}
5.消费者配置 RocketMqConsumerConfig.java
package com.simpleleaf.leafrocketmq.config;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.simpleleaf.leafrocketmq.listen.RocketMqTestListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Properties;
/**
* @author FCX
* @date Create in 17:39 2019/11/15
* @description 阿里云RocketMq消费者
*/
@Component
public class RocketMqConsumerConfig {
@Autowired
private RocketMqProperties rocketMqProperties;
private static Consumer consumer;
@PostConstruct
public void init(){
System.out.println("初始化启动消费者者!");
// listen 实例配置初始化
Properties properties = new Properties();
//您在控制台创建的Group ID
properties.setProperty(PropertyKeyConst.GROUP_ID, rocketMqProperties.getGroupId());
// AccessKey 阿里云身份验证
properties.setProperty(PropertyKeyConst.AccessKey, rocketMqProperties.getAccessKey());
// SecretKey 阿里云身份验证
properties.setProperty(PropertyKeyConst.SecretKey, rocketMqProperties.getSecretKey());
//设置发送超时时间(毫秒)
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, rocketMqProperties.getTimeout());
// 设置 TCP 接入域名
properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, rocketMqProperties.getOnsAddr());
consumer = ONSFactory.createConsumer(properties);
//监听topic,new对应的监听器
consumer.subscribe(rocketMqProperties.getTopic(),rocketMqProperties.getTag(), new RocketMqTestListener());
// 在发送消息前,必须调用start方法来启动consumer,只需调用一次即可,当项目关闭时,自动shutdown
consumer.start();
}
/**
* 初始化消费者
* @return
*/
public Consumer getConsumer(){
return consumer;
}
}
6. 生产者业务层 RocketMqProducerImpl.java
package com.simpleleaf.leafrocketmq.service;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.OnExceptionContext;
import com.aliyun.openservices.ons.api.SendCallback;
import com.aliyun.openservices.ons.api.SendResult;
import com.simpleleaf.leafrocketmq.config.RocketMqProducerConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Date;
/**
* @author FCX
* @date Create in 16:11 2019/11/16
* @description 生产者
*/
@Service
@Slf4j
public class RocketMqProducerImpl {
@Autowired
private RocketMqProducerConfig rocketMqProducerConfig;
/**
* 同步发送实体对象消息
* 可靠同步发送:同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方式;
* 特点:速度快;有结果反馈;数据可靠;
* 应用场景:应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等;
*/
public boolean sendMsg(String msg) {
Long startTime = System.currentTimeMillis();
Message message = new Message("test", "*", msg.getBytes());
SendResult sendResult = rocketMqProducerConfig.getProducer().send(message);
if (sendResult != null) {
System.out.println(new Date() + " Send mq message success. Topic is:" + message.getTopic() + " msgId is: " + sendResult.getMessageId());
} else {
log.warn(".sendResult is null.........");
}
Long endTime = System.currentTimeMillis();
System.out.println("单次生产耗时:"+(endTime-startTime)/1000);
return true;
}
/**
* 异步发送消息
* 可靠异步发送:发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式;
* 特点:速度快;有结果反馈;数据可靠;
* 应用场景:异步发送一般用于链路耗时较长,对 rt响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等;
* @param msg
* @return
*/
public boolean sendMsgAsy(String msg) {
Long startTime = System.currentTimeMillis();
Message message = new Message("test", "*", msg.getBytes());
rocketMqProducerConfig.getProducer().sendAsync(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
///消息发送成功
System.out.println("send message success. topic=" + sendResult.getMessageId());
}
@Override
public void onException(OnExceptionContext context) {
//消息发送失败
System.out.println("send message failed. execption=" + context.getException());
}
});
Long endTime = System.currentTimeMillis();
System.out.println("单次生产耗时:"+(endTime-startTime)/1000);
return true;
}
/**
* 单向发送
* 单向发送:只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答;此方式发送消息的过程耗时非常短,一般在微秒级别;
* 特点:速度最快,耗时非常短,毫秒级别;无结果反馈;数据不可靠,可能会丢失;
* 应用场景:适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集;
* @return
*/
public boolean sendMsgOneway(String msg) {
Long startTime = System.currentTimeMillis();
Message message = new Message("test", "*", msg.getBytes());
rocketMqProducerConfig.getProducer().sendOneway(message);
Long endTime = System.currentTimeMillis();
System.out.println("单次生产耗时:"+(endTime-startTime)/1000);
return true;
}
}
7 消费者监听层 RocketMqListener.java
package com.simpleleaf.leafrocketmq.listen;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
/**
* @author FCX
* @date Create in 16:12 2019/11/16
* @description 消费者
*/
@Service
@Slf4j
public class RocketMqListener implements MessageListener {
@Override
public Action consume(Message message, ConsumeContext consumeContext) {
try {
Long startTime = System.currentTimeMillis();
byte[] body = message.getBody();
//获取到接收的消息,由于接收到的是byte数组,所以需要转换成字符串
String msg = new String(body);
//TODO 业务逻辑,自行设计
System.out.println("MessageListener.Receive message:"+msg);
Long endTime = System.currentTimeMillis();
System.out.println("单次消费耗时:"+(endTime-startTime)/1000);
} catch (Exception e) {
log.error("MessageListener.consume error:" + e.getMessage(), e);
}
log.info("MessageListener.Receive message");
//如果想测试消息重投的功能,可以将Action.CommitMessage 替换成Action.ReconsumeLater
return Action.CommitMessage;
}
}
8 controller层
package com.simpleleaf.leafrocketmq.controller;
import com.simpleleaf.leafrocketmq.service.RocketMqProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author FCX
* @date Create in 18:22 2019/11/16
* @description 测试
*/
@RestController
public class TestController {
@Autowired
private RocketMqProducerService rocketMqProducerService;
@GetMapping("/test")
public Object test() {
rocketMqProducerService.sendMsg("我是一个测试");
return null;
}
}
启动服务,初始化配置成功!
请求服务: localhost:8098/test
可以看到消息发送成功,并接受成功!
本文参考: https://blog.csdn.net/alan_liuyue/article/details/86645887
更多推荐
已为社区贡献1条内容
所有评论(0)