在这里插入图片描述

消费者都有哪些参数

1、消费超时时间(15分钟,不要将太大任务通过消费者执行,容易造成消息挤压);2、单个consumer实例最大消费次数;3、消费线程数(10);4、单个消费实例最大缓存消息输,如果一个消息同时监听多个topic,单个topic最大缓存消息数要做个平均;5;单个消费支持最大缓存消息大小。

1 为什么使用MQ?MQ的优点

消息队列的主要作用是肖峰、异步和解耦。缺点提高了系统整体复杂性,引入更多外部依赖系统,使系统的稳定性降低。一个系统同时通过消息队列依赖ABC三个系统,其中一个系统出现异常或者宕机,业务都无法完成执行,同时消息队列出现问题也会引起系统无法正常工作,MQ需要保证高可用。

2 如何保证高可用的?

主从复制+多集群+消息持久化。
(1)RocketMQ集群部署方式:多master-多slave异步复制;多master-多slave同步双写;Dledger模式。

dledger模式拓扑结构
在这里插入图片描述

优点是:创建多个集群,方便水平扩展,不易造成存储瓶颈,单集群采用主从复制,提高集群可靠性,采用注册发现,提高扩展性。其中master的broker id=0,slave 的broker id > 0。

NameServer主要用于Broker的注册和发现,维护topic和broker之间的关系,并通过心跳机制检测Broker的健康状态,每个NameServer独立,具有所有broker信息,每创建一个topic都需要同步所有NameServer。

Broker负责消息的存储、传递和查询,Broker主动向NameServer发送心跳信息时,会带上所有的负责的topic信息,可以达到几十M,网络差的情况,可能导致发送心跳失败。
(2)RabbitMQ集群:普通集群模式+镜像集群模式。
普通集群模式
在这里插入图片描述
缺点:队列只放在一个节点上,其他节点存储了队列的元数据,当consumer向非队列所在节点请求是 ,需要先从队列所在节点拉取数据。队列所在节点宕机后,整个集群瘫痪,无法对外提供服务。
镜像集群模式:
在这里插入图片描述

优点:每个节点上都有全量消息,即使一个节点宕机,依然能够对外提供服务。缺点:同步消息对网络带宽很冲,依然没有解决单个队列数据过多的压力,不能方便进行水平扩展。
(3)kafaka集群
在这里插入图片描述
优点:方便进行水平扩展,并且可以通过副本,增加每个partition的高可用。kafaka、RabbitMQ和RocketMQ三者集群都是通过,主从和多节点方式实现了高可用。主要区别在Kafafa的方便水平扩展。卡法卡将一个topic消息拆分为多个partition实现了负载均衡。RocketMQ通过NameServer控制路由规则,实现了复杂均衡。

如何实现定时消息

rockmq5.x版本及以后,使用的时间轮,理论可以支持任意时间的时间定时消息。时间粒度最小是秒,默认时间轮卡槽长度支持一周的内的定时消息发送7x24x3600. 这个版本之前,只支持8个粒度消息
定时任务使用场景:一般数据库写入之后,我门会增加2s的延迟,保证下游服务在消费时,数据已经ready(数据主从延迟)。还有下单30分种不完成支付,直接取消订单。

3 如何保证RocketMQ消息的可靠传输(不丢失)

消息可能在生产阶段、存储阶段和消费阶段发生丢失。

生产阶段:消费者根据从nameServer获得topic路由信息,同步方式、异步方式和单向发送信息的方式向生产者发行消息,同步方式和异步方式,都会收到broker的确认消息,没有收到确认信息的生产者会进行失败重试保证消息成功发送到broker,默认重试两次。

存储阶段:存储阶段可能发生broker非正常关闭,操作系统异常,断电,磁盘异常。对于前三中情况,通过消息持久化,重启恢复可以解决,对于自盘异常,RocketMQ提供Delger集群部署,一个实例出现问题,选择新的从节点作为新的实例。消息持久化过程需要进行大量I/O,需要根据系统可靠性和性能之间进行权衡,例如对下单消息扣减库存重要消息进行持久化操作,对秒杀信息即使丢失部分也无所谓,只要参与活动的商品能被部分用户全部获得就行。消息写入自盘过程为producer-》direct memory-》pagecache-》disk。
在这里插入图片描述

消费阶段:第一、消费阶段也是使用了消息确认机制保证消息至少被消息一次(At least once)。第二、即使超过重试次的消息会放入死信队列,需要恢复的时候,可以使用rocketMQ提供的接口从死信队列中读取失败的消息,保证了消息的消费的可靠性。第三、Broker在想消费端成功投递消息后,依然会将消息保留一个小时,当业务消费消息有问题修复后,只要消息还没有过期,RocketMQ broker提供了一种按照时间回退消费进度,重新消费一小时前的消息,该中机制叫做消息回溯机制。

可以再springBoot中引入rocketmq-spring-boot-starter。虽然在RocketMessageListener的OnMessage方法返回值是void,但是底层调用类的DefaultRocketMQlistenerContainer中实现了MessageListenerConcurrently和MesssageListenerOrderly中consumeMessage向broker进行消息消费成功或者失败进行回复。

4 如何保证消息不重复消费(幂等性)

消息的可靠性传输,可能会引起生产者重复发送两条相同消息或者投递消息重复,这两种都会引消息的重复消费,前者是由于网络抖动原因造成生产者没有接收到消费者确认消息,后者是消费者没有回复消息给Broker。MQ中允许存在重复的消息,消息重试非常常见,可以幂等性来保证消息不被重复消费,而幂等性保证需要业务方实现。

业务上常见的实现接口接口幂等性的操作有两种:

第一种:状态判断,将消费数据存储在redis中,下次消费消息时,先查询redis,不存在,消费;存在,直接丢弃。

第二种:业务判断法,利用数据库记录的主键唯一性,每次要插入新的数据时,先判断记录是否存在,不存在再进行插入。

5 如何保证RocketMQ消息的顺序性?

MQ消息乱序对最终结果影响较大的操作常见的两种场景:

第一种,一个队列+多个消费者,消费者消费消息顺序有向后要求。

第二种,一个队列+一个消费者,当消费信息量很大是,消费者可能会使用多线程技术。

首先需要使用MQ中保证消息有序的场景比较少,MQ中的消息一般都是乱序,乱序对程序最后执行业务没有影响,或者影响可以忽略不计。第二,商品下单,付款、发货、签收这种顺序操作一般也是使用业务系统保证有序性。遇到需要使用有序队列的场景,首先考虑能不能再业务上进行保证有序,不能的话,考虑创建多个消息队列,一个消息队列对应一个消费者。

6 有几百万消息持续积压几小时,说说怎么解决?如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?

(1)几百万消息积压问题解决办法:
一般做法是,对消息队列和消费者进行临时扩容,扩容消息队列是防止消息溢出丢失,扩容消费者是快速将积压的消息消耗掉。具体步骤分为以下五步:

1 先修复消费者,并将线程有问题的消费者都消耗掉。

2 临时建立多个queue,具体数量根据积压消息数/每个消息队列的容量。

3 写一个新的消费者,功能是将积压的消息从单个队列中写到临时创建的队列中。

4 将修复好的consumer部署在多个实例,每个consumer对应一个队列,直到临时队列中的消息消费完。

5 恢复原来的部署
(2)消息过期之后,消息丢失解决办法:
从私信队列中获得超时过期消息,手动重新写回MQ。
(3) 队列快满了解决办法:
1 写一个临时消费者,不做任何业务处理,快速消耗队列中消息。
2 等队列空闲的时候,手动将消息写回队列。

7 消息传输过程

以RabbitMQ为例,介绍五种消息模型。RabbitMQ有四个重要概念:

virtual hosts:虚拟主机是rabbitMQ的逻辑隔离技术,每个虚拟主机有自己的exchanger、queue和用户权限设置,生产者和消费者,不同虚拟主机实现了消息的绝对隔离。

exchanger:交换机,主要用于消息订阅模式中,实现了生产者和消费者之间的解耦。

queue:队列主要充当消息缓冲区的作用,实现消息的存储。
在这里插入图片描述

7.1 基本消息模型

在这里插入图片描述
生产者发送消息/消费者消费消息过程:

1 创建对虚拟机Virtual hosts的连接。

2 根据连接获取channel.

3 使用channel创建消息队列

4 发送消息/接受消息。

缺点:生产者创建的消息,只被一个消费者消费,无法做到并发处理,消息消费速度慢,尤其处理耗时任务时,容易造成消息积压。

7.2 Worker消费模型

在这里插入图片描述
优点:多个消费者同时消耗队列中的消息,队列中的消息只能被消费者消费一次,可以实现队列中消息分流处理。

默认队列中消息采用round-bin 轮询调度算法进行消息的分配,队列中的已有的消息会一次性投递给消费者,投递之后,会立刻受到消费成功的确认。这种情况有两个不好的点,第一,当消费者消费速度不一致时,消息时平均分配,可能出现一个消费者空闲,另一个消费者压力过大;第二,服务段一次将所有消息投递给消费者,消费者收到消息后自动确认成功,如果消费者的消息没有处理完出现故障,可能造成消息的丢失。

解决办法,设置服务端每次只向消费者投递一个消息,消费者任务执行完成,手动发送消费成功。

模型缺点:基本消息模型和worker模型不能实现消息的广播,当有多个消费者都需要接受到消费者所有消息时,消费者需要将消息同时发送到多个队列中。

7.3 publish/subscribe模式

在这里插入图片描述
消费者直接将消息发送到交换机,不关心具体消息被分发到哪个队列。凡是绑定了交换机的队列,都能够获得一份交换机中路由的消息,实现了消息的广播。缺点:不能实现对某几种信息的订阅。

7.4 routing消息模型

在这里插入图片描述
订阅同一家报社不同专栏新闻。每条消息都有一个路由标志,消息直接发送到交换器,每个队列和交换机绑定是,同时绑定路由标志。缺点:只能根据一个条件进行消息的静态路由。

7.5 topic消息模型

在这里插入图片描述
相比于广播和routing消息模型,topic消息模型实现了消息的动态多条件路由。消息发布和订阅更加灵活。

8 Kafka、ActiveMQ、RabbitMQ、RocketMQ 对比

TYPE activeMQ RabbitMQ RocketMQ kafaka
单机吞吐量 万级 万级 十万级 十万级
topic数对吞吐量影响 千级 百万级别 几千个topic,对机器吞吐量影响较小 在几百,几千个topic,对机器吞吐量影响较大
时效性 ms ums ms ms
消息重复 至少一次 至少一次 至少一次 最多一次 至少一次 最多一次
消息顺序性 有序 有序 有序 分区有序
消息重放 支持 支持 基于消息消费时间计算便宜量 基于消息消费生产时间计算便宜量
一致性保证 支持事务 `支持事务 支持事务,首先发送一个预消息到对应topic 的broker上,整个业务事物成功之后,才会给对broker发送消息,修改消息为正式消息,此时消息才可以为消费者消费,如果业务执行有问题,会给消息队列发送回滚消息,消息队列直接删除这个消息。为了防止业务服务挂掉或通信中断,消息服务会启动一个定时任务,定时查询业务服务事物状态 支持事务
消息过滤 支持 支持 基于tag进行消息过滤,发送者可以在发送消息时为每个消息置指定一个tag;broket直接根据消费者注册指定的tag,对消息在投递之前尽心过滤 需要在业务层,使用topic堆消息进行拆分,或者消费者根据消息实际内容进行过滤,增加了消费者负载和网络传输负载
支持延迟消息 支持 通过插件或者私信队列队列实现 基于时间轮天然实现 不能直接支持,1、应用层,发小消息待上消费时间戳,消费者消费的时间判断,如果还没达到消息设定的时间,重新丢到队列里面,或者暂存;2、先将延时/定时消息暂时存储到topic里面,通过定时任务(Quartz)发送消息到对应topic;3、Kafa steams支持延迟消息
支持主题数 千级别 百万级别 千级 千级别
可靠性 高,基于主从架构实现高可用;消息偶尔会丢失 非常高,分布式;经过参数优化,可以做到0丢失。 高,基于主从架构实现高可用;消息偶尔会丢失 非常高,基于分布式,一个数据多个副本,单个集群宕机,依然可以提供服务;经过参数优化配置,可以做到消息零丢失
社区活跃度 较高 较高
可二次扩展性 功能齐全。Java开发 功能全面,丰富的管理界面;基于erlang开发,做二次开发难度较大 基于Java开发,方便阅读源码,进行定制化开发。支持的语言不多,java,c++(不成熟)。 功能单一,但是吞吐量达,非常适合大数据日志系统。使用Scala开发,源码阅读

吞吐量指的消息队列本身的写入和查询速度,指的是秒级别可以处理消息速度。
时效性指的是消息从消费者到处理者的时间,RabbitMQ最高。
一致性指的是业务册数据更新和发小消息一致,通过事物实现。
消费模式最少一次,通过消费者失败重试机制实现;最多一次模式,rocketmq是通过禁用消费者失败重试& 消费者消费后及时ACK实现的,比如日志场景,如果超过了最大重试次数,会掉到私信队列里面,私信队列是按照groupId进行聚合的,会将一个消费者所有超过最大重试次数的消息放到死信同一个队列里面,私信队列里面的消息超过一定时间没有处理也会被清理掉。
消息有序行,kafka只支持分区有序,其余三个支持全局有序。

ActiveMQ比较老,社区也不再活跃,版本更细周期长,在某些情况下有可能丢失消息,所以一般使用的公司较少。
RabbitMQ社区活跃,版本更新也较快,功能齐全,具有丰富的管理界面,适合数据了量万级的场景。
RocketMQ国内使用较多,Java开发,便于进行功能扩展和问题排查,适合对数据量大可靠性要求高的场景。
Kafaka,吞吐量达是自己的主要特点,为大数据处理开发而设计,适合大数据处理和日志处理系统。

rocket mq和kafka如何选型?
roket mq使用java技术实现,使用java方便扩展,更低运维成本,kafka使用scala语言编写,功能丰富,天然支持延迟消息、消息过滤、千级别topic情况对消息吞吐量营销比较小;kafka消息低延迟、高吞吐。

todo:kafka是如何支持这三个优点的?
1、kafka使用使用了0拷贝技术,可以使用sendFile文件直接从内核拷贝到网络,只经过2次数据拷贝。0拷贝指的是CPU只需发起/结束调用,复制过程不需要CPU参与,大大降低了CPU负载。3、rokcetmq 使用使用mmap技术,消息从磁盘到消费者需要经过3次数据拷贝,阿里内部压测,在相同硬件和内容配置下,kafka比rocket mq快了50%。没有将mmap和sendfile结合使用,会增加复杂书,比如mmap修改数据,如何保证内核数据和网络缓冲区数据一致性。4、rocket mq和kafka都采用批量拉取消息,缓存到本地,较少了消费者和消息队列频繁的通信。

在这里插入图片描述

reference
Kafka为啥比RocketMQ快

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐