消息中间件RocketMQ
1 MQMQ全称为Message Queue,即消息队列;RabbitMQ(Messaging that just works — RabbitMQ)由erlang语言开发,基于AMQP协议实现的消息队列;常见的其它消息队列 :ActiveMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ、Redis(也可做消息队列)1.1 消息队列“消息队列(Message Queue)”是在消息的
1 MQ 消息队列
MQ全称为Message Queue,即消息队列;RabbitMQ (Messaging that just works — RabbitMQ)由erlang语言开发,基于AMQP协议实现的消息队列;
思考:为什么需要用到消息队列呢?它有什么好处呢?
传统模式
- 账号注册完成之后,发送短信,再发送邮件;整个流程都是主线程执行,一个花了150ms。
- 缺点:代码臃肿、执行时间相对长、如果后期新增发微信,则不好扩展。
多线程模式
- 执行完成主业务(账号注册),则通过创建子线程来处理次要业务。它只需要花50ms即可给用户响应。
- 优点:响应速度比较快
- 缺点:1)代码还是臃肿;2)需要扩展性不好,比如后期新增发送微信功能还得改源代码;3)需要实现容错比较复杂,比如子线程处理错误,则怎么实现补偿机制;4)子线程和主业务代码在同一台服务器,如果并发量很高的情况,其实性能还是受限。
消息队列模式
- 执行完成主业务(账号注册),则发送消息给消息队列,由消息队列的消费端监听并处理后续业务。它只需要花50ms即可给用户响应。
- 优点:1)响应速度比较快;2)业务彻底解耦,编码变的更加简单;3)由于服务都是独立部署在不同服务器,所以性能更加的高。4)需求扩展性好,比如后期新增发送微信功能,则只需要创建一个微信服务监听消息队列即可。5)可以灵活的处理容错性。
- 缺点:架构变的更加复杂了;服务数量比较多。
常见的其它消息队列 :ActiveMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ、Redis(也可做消息队列)
特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|---|
语言 | Java | Erlang | Java | Scala |
吞吐量 | 万级 | 万级 | 10万级 | 10万级 |
速度 | 毫秒 | 微秒 | 毫秒 | 毫秒 |
集群 | 主从架构 | 主从架构 | 分布式架构 | 分布式架构 |
场景 | - | - | - | 大数据实时处理 |
- 性能排序:Kafka>RocketMQ>RabbitMQ>ActiveMQ
- 部署难度:ActiveMQ<RabbitMQ<Kafka<RocketMQ
消息队列的优势:
1)解耦:系统A只需要把数据发送到MQ,其他系统如果需要数据,则从MQ中获取即可
2)异步:如果使用MQ,系统A发送数据到MQ,然后就可以返回响应给客户端,不需要再等待系统B、C、D的响应,可以大大地提高性能
3)削峰:如果使用MQ,系统A不再是直接发送SQL到数据库,而是把数据发送到MQ,MQ短时间积压数据是可以接受的,然后由消费者每次拉取2000条进行处理,防止在请求峰值时期大量的请求直接发送到MySQL导致系统崩溃。
2 RocketMQ
RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。
NameServer:主要负责对于源数据的管理,包括了对于Topic和路由信息的管理。每个 Broker 在启动的时候会到 NameServer 注册,Producer 在发送消息前会根据 Topic 到 NameServer 获取到 Broker 的路由信息,Consumer 也会定时获取 Topic 的路由信息。
Broker:负责存储消息,转发消息。单个Broker节点与所有的NameServer节点保持长连接及心跳,并会定时将Topic信息注册到NameServer。
2.1 环境安装
1)单集群启动
# 启动NameServer
nohup sh bin/mqnamesrv &
# 验证Name Server 是否启动成功
tail -f ~/logs/rocketmqlogs/namesrv.log
NameServer成功启动后,我们启动Broker和Proxy。
nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &
tail -f ~/logs/rocketmqlogs/proxy.log
关闭服务器
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv
2)管控台
https://github.com/apache/rocketmq-dashboard
java -jar rocketmq-dashboard-1.0.1-SNAPSHOT.jar
2.2 rocketmq
2.2.1 生产者与消费者
1)生产者
public class Send {
public static void main(String[] args) throws Exception{
DefaultMQProducer defaultMQProducer = new DefaultMQProducer("producer-demo");
//连接nameserver
defaultMQProducer.setNamesrvAddr("192.168.67.5:9876");
//启动生产者
defaultMQProducer.start();
//设置消息发送的目的地
String topic = "producer-demo";
//发送消息
for(int i=0 ; i<10 ; i++){
Message msg = new Message(topic,("rocketmq消息 " + i).getBytes(StandardCharsets.UTF_8));
//发送到broker
SendResult send = defaultMQProducer.send(msg);
System.out.println(i + "发送状态:" + send.getSendStatus());
}
//关闭资源
defaultMQProducer.shutdown();
}
2)消费者
public class Custom {
public static void main(String[] args) throws Exception{
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("consumer01");
defaultMQPushConsumer.setNamesrvAddr("192.168.67.5:9876");
//设置订阅主题
defaultMQPushConsumer.subscribe("producer-demo","*");
//设置消息监听器
defaultMQPushConsumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt messageExt : list) {
String s = new String(messageExt.getBody(), StandardCharsets.UTF_8);
System.out.println("thread: " + Thread.currentThread() + ",content: " + s );
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消费者
defaultMQPushConsumer.start();
}
}
2.2.2 消息消费模式
(1)集群模式
一个ConsumerGroup中的Consumer实例平均分摊消费消息。
(2)广播模式
一条消息被多个consumer消费,即使这些consumer属于同一个ConsumerGroup,消息也会被ConsumerGroup中的每个Consumer都消费一次。
2.3 SpringBoot配置
2.3.1 生产者
2.3.2 消费者
@Component
@RocketMQMessageListener(consumerGroup = "tConBootGroup",topic = "TestTopic")
public class TestTopicListener implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
String mess = new String(messageExt.getBody(), Charset.defaultCharset());
System.out.println("接受消息:" + mess);
}
}
2.3.3 异步消息发送
2.3.4 负载均衡测试
2.3.5 顺序消息发送
2.3.6 TAG过滤
2.3.7 SQL92过滤
@RocketMQMessageListener(consumerGroup = "tConBootGroup",topic = "TestTopic",selectorType = SelectorType.SQL92,selectorExpression = "age >= 96 and grade >95")
3 ActiveMQ
ActiveMQ主要有两种模式的消息,分别是Queue(队列)和Topic(订阅)。
- Queue,每个消费者消费的数据不一样,一个消息只能被一个消费者消费
- Topic,每个消费者消费的数据是一样的
3.1 安装
#1.解压
tar -zxvf apache-activemq-5.12.0-bin.tar.gz#2.启动
cd apache-activemq-5.12/bin
./activemq start#3.停止
./activemq stop#4.查看状态
./activemq status
ActiveMQ有两个核心端口分别是8161(后台系统的端口),61616(客户端通讯端口)
启动完ActiveMQ之后,可以在浏览器登录后台系统:
http://ip地址:8161,账号和密码都是admin
3.2 消息提供者
第一步:导入坐标
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
第二步:配置文件
spring.activemq.broker-url=tcp://192.168.1.8:61616
spring.activemq.password=admin
spring.activemq.user=admin
第三步:发送Queue消息
@Component
public class Producer {
@Autowired
private JmsMessagingTemplate jmt;
public void send(String content) {
//注意:项目启动的时候,会在后台自动创建queue名称
jmt.convertAndSend(new ActiveMQQueue("队列名称"),content);
}
}
第四步:发送Topic消息
@Component
public class Producer {
@Autowired
private JmsMessagingTemplate jmt;
public void send(String content) {
//注意:项目启动的时候,会在后台自动创建topic名称
jmt.convertAndSend(new ActiveMQTopic("sample.topic"),content);
}
}
提示:SpringBoot默认发送Queue消息
#如果为True,则是Topic;如果是false或者默认,则是queue
spring.jms.pub-sub-domain=true
3.3 消息消费者
第一步:导入坐标
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
第二步:配置文件
spring.activemq.broker-url=tcp://192.168.1.8:61616
spring.activemq.password=admin
spring.activemq.user=admin
第三步:监听消息
@Component
public class Consumer {
@JmsListener(destination = "队列名称")
public void receiveQueue(String text) {
System.out.println(text);
}
}
提示:SpringBoot可以同时监听Queue和Topic消息,不需要任何配置
注意:
消息队列中间件宕机了,重启之后会不会丢失数据呢?
-
首先,ActiveMQ默认有三种持久化方式,分别是内存持久化、日志持久化和JDBC持久化,内存持久化是不安全的;日志和JDBC则是安全的;ActiveMQ默认是日志持久化。
-
Queue重启之后不会丢失数据,直到消费被消费成功才被删除;Topic无法再消费之前的消息,所以重启会丢失。
消息队列会不会出现重复消息的问题?什么情况会导致重复消费?
-
正常情况下,多个消费端监听同一个queue,同一条消息不会被多个消费端同时消费,只有一个消费端能消费成功。
-
但是,如果由于网络原因,消息队列可能无法及时收到消费端的应答,会重发消息,这样的话消费端相当于消费了两次。
-
解决方案:消费端每次接受消息的时候,先根据msgId去Redis查询是否存在;如果存在则不处理;如果不存在,则把消息ID存储到Redis。
什么是死信队列?它有什么作用?
- 消息被消费失败的时候,则消息队列默认重发6次,如果还是失败则把该消息加入到死信队列里面;或者消息过期也会被加入死信队列。
- 默认的私信队列是ActiveMQ.DLQ,一般为了方便管理,针对每个队列都创建一个相应的死信队列。
- 为了数据的一致性,可以对死信队列进行监听,进行数据的补偿处理
更多推荐
所有评论(0)