消息队列MQ
消息队列概述消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQmq的使用场景1.应用解耦2.异步处理3.流量削锋4.消息通讯(实时消息)5.日志处理(待应用验证)参考资料:1.MQ的使用场...
一、基础概念
1.什么是消息系统?
消息系统允许软件、应用相互连接和扩展.这些应用可以相互链接起来组成一个更大的应用,或者将用户设备和数据进行连接.消息系统通过将消息的发送和接收分离来实现应用程序的异步和解偶.
或许你正在考虑进行数据投递,非阻塞操作或推送通知。或许你想要实现发布/订阅,异步处理,或者工作队列。所有这些都可以通过消息系统实现。
2. 消息队列概述
消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ。
3. 事件驱动设计
事件驱动架构(EDA – Event Driven Architecture)。使用 Broker 方式,服务间通过交换消息来完成交流和整个流程的驱动。
如下图所示,这是一个订单处理流程。下单服务通知订单服务有订单要处理,而订单服务生成订单后发出通知,库存服务和支付服务得到通知后,一边是占住库存,另一边是让用户支付,等待用户支付完成后通知配送服务进行商品配送。
每个服务都是“自包含”的。所谓“自包含”也就是没有和别人产生依赖。而要把整个流程给串联起来,我们需要一系列的“消息通道(Channel)”。各个服务做完自己的事后,发出相应的事件,而又有一些服务在订阅着某些事件来联动。
事件驱动方式的好处至少有五个。
- 服务间的依赖没有了,服务间是平等的,每个服务都是高度可重用并可被替换的。
- 服务的开发、测试、运维,以及故障处理都是高度隔离的。
- 服务间通过事件关联,所以服务间是不会相互 block 的。
- 在服务间增加一些 Adapter(如日志、认证、版本、限流、降级、熔断等)相当容易。
- 服务间的吞吐也被解开了,各个服务可以按照自己的处理速度处理。
我们知道任何设计都有好有不好的方式。事件驱动的架构也会有一些不好的地方。
- 业务流程不再那么明显和好管理。整个架构变得比较复杂。解决这个问题需要有一些可视化的工具来呈现整体业务流程。
- 事件可能会乱序。这会带来非常 Bug 的事。解决这个问题需要很好地管理一个状态机的控制。
- 事务处理变得复杂。需要使用两阶段提交来做强一致性,或是退缩到最终一致性。
4.消息队列架构设计要点
mq使发送方的服务和接收方的服务最大程度地解耦。但是所有人都依赖于一个总线,所以这个总线就需要有如下的特性:
- 必须是高可用的,因为它成了整个系统的关键;
- 必须是高性能而且是可以水平扩展的;
- 必须是可以持久化不丢数据的。
二、消息队列
1 消息队列出现背景
互联网业务的一个特点是“快”,这就要求很多业务处理采用异步的方式。例如,大 V 发布一条微博后,系统需要发消息给关注的用户,我们不可能等到所有消息都发送给关注用户后再告诉大 V 说微博发布成功了,只能先让大 V 发布微博,然后再发消息给关注用户。
传统的异步通知方式是由消息生产者直接调用消息消费者提供的接口进行通知的,但当业务变得庞大,子系统数量增多时,这样做会导致系统间交互非常复杂和难以管理,因为系统间互相依赖和调用,整个系统的结构就像一张蜘蛛网,如下图所示。
消息队列就是为了实现这种跨系统异步通知的中间件系统。消息队列既可以“一对一”通知,也可以“一对多”广播。以微博为例,可以清晰地看到异步通知的实现和作用,如下图所示。
对比前面的蜘蛛网架构,可以清晰地看出引入消息队列系统后的效果:
- 整体结构从网状结构变为线性结构,结构清晰。
- 消息生产和消息消费解耦,实现简单。
- 增加新的消息消费者,消息生产者完全不需要任何改动,扩展方便。
- 消息队列系统可以做高可用、高性能,避免各业务子系统各自独立做一套,减轻工作量。
- 业务子系统只需要聚焦业务即可,实现简单。
2. 消息队列的两种模式
JMS规范目前支持两种消息模型:点对点(point to point, queue)和发布/订阅(publish/subscribe,topic)
1 点对点模式
点对点模式下包括三个角色:
- 发送者 (生产者)
- 消息载体。即消息队列
- 接收者(消费者)
消息发送者生产消息发送到queue中,然后消息接收者从queue中取出并且消费消息。
消息被消费以后,queue中不再有存储(只针对于此种模型),所以消息接收者不可能消费到已经被消费的消息。
生产者发送一条消息到queue,一个queue可以有很多消费者,但是一个消息只能被一个消费者接受,当没有消费者可用时,这个消息会被保存直到有一个可用的消费者,所以Queue实现了一个可靠的负载均衡。
2.发布/订阅模式
发布/订阅模式下包括三个角色:
- 角色主题(Topic)
- 发布者(Publisher)
- 订阅者(Subscriber)
发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。
发布/订阅模式特点:
1.每个消息可以有多个订阅者;消息以广播的方式发送,即每个订阅者都会收到一样的消息。
2.发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息。
3.为了消费消息,订阅者需要提前订阅该角色主题,并保持在线运行;
3. 消息队列使用的利弊分析
1.使用消息队列的优点
异步、解耦、消峰这是消息队列最大的优点
1.应用解耦
如从上面的蜘蛛网架构模型,转换成了线型架构
2.异步处理
提升了核心业务的吞吐量和性能。
如下图不使用mq,系统的性能将急即下降。
3.流量削锋
防止消费者单体流量过大,导致系统宕机
如下面架构:
由于突然出现的请求峰值,导致系统不稳定的问题。使用mq后,能够起到消峰的作用。
订单系统接收到用户请求之后,将请求直接发送到mq,然后订单消费者从mq中消费消息,做写库操作。如果出现请求峰值的情况,由于消费者的消费能力有限,会按照自己的节奏来消费消息,多的请求不处理,保留在mq的队列中,不会对系统的稳定性造成影响。
4.实时分析
多在Kafka中使用到,用于实时计算
5.日志处理
多在Kafka中使用到,如一些日志平台ELK,业务消费binlog等
2.使用消息队列的缺点
缺点主要在于系统的可用性、复杂性、一致性问题
1.高可用问题
中间件固有的可用性问题,引入了一套中间件,就会对其进行技术维护,如果mq挂了或出现其他故障问题,就影响整个系统。
2.一致性问题
一条消息由多个消费者消费,万一有一个消费者消费失败了,就会导致数据不一致。如A系统处理完了直接返回成功了,人都以为你这个请求就成功了;但是问题是,要是BCD三个系统那里,BD两个系统写库成功了,结果C系统写库失败了,咋整?你这数据就不一致了,所以可能带来使用分布式事务的解决方案。
常见模型如下图所示
消费端任何一个服务消费失败,对于整个链路来说数据是不一致的。如下订单成功了,但积分没有加上等。
解决方案:
1.一般使用重试等补偿策略,达到最终一致性。
2.关于重试的方案,一般使用异步重试,因为如果出现网络异常,同步重试可能会导致大量的消息不断重试,影响消息读取速度,造成消息堆积。
3.采用异步重试,在消费者处理失败之后,立刻写入重试表,有个job专门定时重试。还有一种做法是,如果消费失败,自己给同一个topic发一条消息,在后面的某个时间点,自己又会消费到那条消息,起到了重试的效果。如果对消息顺序要求不高的场景,可以使用这种方式。
3.复杂性
这里说的系统复杂度和系统耦合性是不一样的,比如以前只有:系统A、系统B和系统C 这三个系统,现在引入mq之后,你除了需要关注前面三个系统之外,还需要关注mq服务,需要关注的点越多,系统的复杂度越高。
需要考虑一些消息队列的常见问题和解决方案,以及它的实现原理。有一定的学习成本,需要额外部署mq服务器,而且有些mq比如:rocketmq,功能非常强大,用法有点复杂,如果使用不好,会出现很多问题。有些问题,不像接口调用那么容易排查,从而导致系统的复杂度提升了。
4.硬件成本问题
为了实现高可用,将需要部署多台服务器。并且会占用不少硬件资源。
3. 消息队列消息删除机制
1.rocketmq删除机制
参考我的另一篇博文
https://blog.csdn.net/sinat_34814635/article/details/112366502
2.Kafka删除机制
参考我的另一篇博文
https://blog.csdn.net/sinat_34814635/article/details/111460317
思考
2.4 主流消息队列中间间对比
名词解释
什么是消息延迟?
消息队列在消费过程中大量堆积就是消息延迟,也就是消费的频率跟不上生产。比方说,生产者向队列中一共生产了1000条消息,某一个消费者消费进度是900条,那么这个消费者的消费延迟就是100条消息。
总结:
1.吞吐量维度:RocketMQ和Kafka都支持10万级级别以上的,比较适合大数据类的系统来进行实时数据计算、日志采集等场景。其RocketMQ的topic可以达到几百,几千个的级别,吞吐量会有较小幅度的下降,但下降幅度要明显小于Kafka。
2.消息延迟维度:rabbitmq为微秒级,这是rabbitmq的一大特点,延迟是最低的
3.消息重复:Kafka的最大劣势就是可能造成消息重复。但对应日志采集应用来说,这一点影响可以忽略不计。
三、消息队列应用
1. mq典型应用架构
以电商交易下单的场景来说,正向交易的过程可能涉及到创建订单、扣减库存、扣减活动预算、扣减积分等等。每个接口的耗时如果是100ms,那么理论上整个下单的链路就需要耗费400ms,这个时间显然是太长了。
如果这些操作全部同步处理的话,首先调用链路太长影响接口性能,其次分布式事务的问题很难处理,这时候像扣减预算和积分这种对实时一致性要求没有那么高的请求,完全就可以通过mq异步的方式去处理了。
同时,考虑到异步带来的不一致的问题,我们可以通过job去重试保证接口调用成功(也可以利用mq自带的消息重试功能进行重复消费),而且一般公司都会有核对的平台,比如下单成功但是未扣减积分的这种问题可以通过核对作为兜底的处理方案。
使用mq之后我们的链路变简单了,同时异步发送消息我们的整个系统的抗压能力也上升了。
2. mq可视化运维平台
mq都应该有一个可视化的运维平台,能查看到消息的生产和消费情况,并且统计消息的堆积情况。对应消息的堆积上线还应该要有对应的报警机制,这样才敢引入一套mq消息中间件。
3 消息队列常见问题
1.消息丢失
2.消息堆积
3.顺序消费
4.重复消费(幂等性)
1 消息丢失
同样消息丢失问题,也是mq中普遍存在的问题,不管你用哪种mq都无法避免。
有哪些场景会出现消息丢失问题呢?
1.消息生产者发生消息时,由于网络原因,发生到mq失败了。
2.mq服务器持久化时,磁盘出现异常
3.kafka和rocketmq的offset被回调时,略过了很多消息。
4.消息消费者刚读取消息,已经ack确认了,但业务还没处理完,服务就被重启了。
导致消息丢失问题的原因挺多的,生产者、mq服务器、消费者 都有可能产生问题,我在这里就不一一列举了。最终的结果会导致消费者无法正确的处理消息,而导致数据不一致的情况。
解决方案
不管你是否承认有时候消息真的会丢,即使这种概率非常小,也会对业务有影响。生产者、mq服务器、消费者都有可能会导致消息丢失的问题。
1.增加数据表解决消息丢失问题
为了解决这个问题,我们可以增加一张消息发送表,当生产者发完消息之后,会往该表中写入一条数据,状态status标记为待确认。如果消费者读取消息之后,调用生产者的api更新该消息的status为已确认。有个job,每隔一段时间检查一次消息发送表,如果5分钟(这个时间可以根据实际情况来定)后还有状态是待确认的消息,则认为该消息已经丢失了,重新发条消息。
这样不管是由于生产者、mq服务器、还是消费者导致的消息丢失问题,job都会重新发消息。
2.只记录发送失败回调日志,然后在定期重试,这种方案只会解决生产者消息丢失问题无法解决消费端消息丢失问题,但系统吞吐量要大一些
2.5.1.1消息丢失-RabbitMQ
1.生产者端:丢失了消息。
产生原因:生产者将数据发送到RabbitMQ的时候,可能数据就在半路给搞丢了,因为网络啥的问题如超时等都有可能。
解决方案:
1.使用RabbitMQ提供的事务功能,但缺点是吞吐量会下降,性能低
2.confirm机制(推荐使用)
两者比较:事务机制和cnofirm机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是confirm机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息RabbitMQ接收了之后会异步回调你一个接口通知你这个消息接收到了。如果没有通知到最后在重新发送即可。
2.RabbitMQ服务:丢失了消息。
产生原因:消息发送到mq后,mq自己挂了
解决方案:配置持久化,这样mq重启后,就可以重新加载到内存里面。但区间未持久化的消息还是无能为力
3.消费端丢失了消息
产生原因:消费端处理过程中,进程挂了
解决方案:使用RabbitMQ提供的ack机制
2.5.1.2消息丢失-Kafka
1.消息发送端:
产生原因:kafka某个broker宕机,然后重新选举partiton的leader时。大家想想,要是此时其他的follower刚好还有些数据没有同步,结果此时leader挂了,然后选举某个follower成leader之后,他不就少了一些数据?这就丢了一些数据啊。并且此时设置ack为0或者1时。
解决方案:设置了ack=all,你的leader接收到消息,所有的follower都同步到了消息之后,才认为本次写成功了。如果没满足这个条件,生产者会自动不断的重试,重试无限次。
2.消息队列kafka自己弄丢了数据:
产生原因:同发送端产生原因一样,也是在broker同步阶段,leader挂了,follower未同步完。
解决方案:
此时一般是要求起码设置如下4个参数:
- 给这个topic设置replication.factor参数:这个值必须大于1,要求每个partition必须有至少2个副本。
- 在kafka服务端设置min.insync.replicas参数:这个值必须大于1,这个是要求一个leader至少感知到有至少一个follower还跟自己保持联系,没掉队,这样才能确保leader挂了还有一个follower吧。
- 在producer端设置acks=all:这个是要求每条数据,必须是写入所有replica之后,才能认为是写成功了。
- 在producer端设置retries=MAX(很大很大很大的一个值,无限次重试的意思):这个是要求一旦写入失败,就无限重试,卡在这里了。当然实际情况不太可能让它无限重试,推荐的方法是使用本地自己的补偿机制。
3.消费端弄丢了数据
产生原因:
如果消费这边配置的是自动提交,万一消费到数据还没处理完,就自动提交offset了,但是此时你consumer直接宕机了,未处理完的数据 丢失了,下次也消费不到了。
解决方案:
将自动提交offset,改为手动提交offset,就可以保证数据不会丢。但是会引来新的问题那就是可能出现重复消费。比如你刚处理完,还没提交offset,结果自己挂了,此时肯定会重复消费一次,自己保证幂等性就好了。
2.5.1.3消息丢失-rocketmq
1.消息发送端:
产生原因:
生产者丢失消息的可能点在于程序发送失败抛异常了没有重试处理,或者发送的过程成功但是过程中网络闪断MQ没收到,消息就丢失了。
解决方案:
由于同步发送的一般不会出现消息丢失,但性能是最差的,所以我们就不考虑同步发送的问题。
我们基于异步发送的场景来说。
异步发送分为两个方式:异步有回调和异步无回调。无回调的方式,生产者发送完后不管结果可能就会造成消息丢失,而通过异步发送+回调通知+本地消息表的形式我们就可以做出一个解决方案。
以下单的场景举例。
1.下单后先保存本地数据和MQ消息表,如果本地事务失败,那么下单失败,事务回滚。
2.下单成功,直接返回客户端成功,异步发送MQ消息,这时候消息的状态是发送中。
3.MQ回调通知发送端消息发送结果,对应更新数据库MQ发送状态。
4.JOB轮询超过一定时间(时间根据业务配置)还未发送成功的消息去重试。
5.在监控平台配置或者JOB程序处理超过一定次数一直发送不成功的消息,告警,人工介入。
一般而言,对于大部分场景来说异步回调的形式就可以了,只有那种需要完全保证不能丢失消息的场景我们做一套完整的解决方案。
2.消息队列自己弄丢了数据
产生原因:
如果生产者保证消息发送到MQ,而MQ收到消息后还在内存中,这时候宕机了又没来得及同步给从节点,就有可能导致消息丢失。
另一种情况是在主从同步时,主节点挂了还未同步到从节点。
解决方案:
1.刷盘机制
RocketMQ分为同步刷盘和异步刷盘两种方式,默认的是异步刷盘,就有可能导致消息还未刷到硬盘上就丢失了。可以通过设置为同步刷盘的方式来保证消息可靠性,这样即使MQ挂了,恢复的时候也可以从磁盘中去恢复消息。
虽然我们可以通过配置的方式来达到MQ本身数据不丢失的目的,但是都对性能有损耗,怎样配置需要根据业务做出权衡。
2.主从备份
broker要等到主从同步完成之后,在向produces返回ack消息
3.消费端弄丢了数据
产生原因:
在消费端自动提交ack确认模式下,消费者刚收到消息,此时服务器宕机,MQ认为消费者已经消费,不会重复发送消息,消息丢失。
解决方案:
RocketMQ默认是需要消费者回复ack确认,而kafka需要手动开启配置关闭自动offset。,但有有可能会带来另一个问题消费端重复消费。
消费方不返回ack确认,重发的机制根据MQ类型的不同发送时间间隔、次数都不尽相同,如果重试超过次数之后会进入死信队列,需要手工来处理等方式保证消息的不丢失。
思考
1.如果mq中间件宕机了,如何保证消息的不丢失?
可以将发送失败的消息放进缓存,然后启动定时任务检查未发送成功的消息,将其发送到mq中,如果发送成功则删除掉缓存中对应的消息。
如果要实现降级方案,请参考我的另一篇文章
分布式事务 https://mp.csdn.net/mp_blog/creation/editor/114491784
2.5.2 重复消费(幂等性)
任何mq产品,再处理实际业务中都应该考虑幂等,因为重复消费问题,原则上无法100%保障。
有哪些场景会出现重复的消息呢?
1.消息生产者产生了重复的消息
2.kafka和rocketmq的offset被重置到过去的消费点了。
3.消息消费者确认失败。
如消费业务异常,很有可能需要研发手动重试或者指定消费点offset重试。消息消费者确认时超时了,又重新消费了之前的消息
4.业务系统主动发起重试
ack确认机制下,重复消费的原因主要有以下几种
1.开启ack,消费者消费完成后,提交ack时服务宕机了,重启后还是从原来的offset重新消费
2.开启ack,网络原因迟迟未收到ack确认,时间上已经收到消息,然后客户端又重发了消息,导致消息重复
解决思路
不管是由于生产者产生的重复消息,还是由于消费者导致的重复消息,我们都可以在消费者中解决这个问题。这就要求消费者在做业务处理时,要做幂等设计
2.5.2.1重复消费-RabbitMQ
产生原因:消息重复消费的主要原因在于回馈机制(RabbitMQ是ack,Kafka是offset),在某些场景中我们采用的回馈机制不同,原因也不同,
例如消费者消费完消息后回复ack, 但是刚消费完还没来得及提交系统就重启了或者挂了,这时候上来就pull消息的时候由于没有提交ack或者offset,消费的还是上条消息。
解决思路:实际上我们只要保证多条相同的数据过来的时候只处理一条或者说多条处理和处理一条造成的结果相同即可。
解决方案:
1.数据库做唯一约束,主要为新增或者更新的消息
2.使用分布式锁
2.5.2.2 重复消费-Kafka
1.消息发送端
产生原因:
发送消息如果配置了重试机制,比如网络抖动时间过长导致发送端发送超时,实际broker可能已经接收到消息,但发送方会重新发送消息
解决方案:
1.取消重试机制,但这样可能造成消息丢失,故不可取
2.消费端做幂等控制
2.消息消费端
产生原因:
如果消费这边配置的是手动提交,刚拉取了一批数据处理了一部分,但还没来得及提交,服务挂了,下次重启又会拉取相同的一批数据重 复处理
解决方案:
一般消费端都是要做消费幂等处理的。
2.5.2.3 重复消费-rocketmq
和Kafka一致。
2.5.3 消息堆积
产生原因:
1.消费者的速度大大慢于生产者的速度,速度不匹配引起的堆积
如下图所示
- 如果分区数小于同一个消费组下的实例,说明消费者有空闲的,则增加分区数,让空闲的消费者去消费
- 适当控制生产者的发送速度(看具体发送端的业务排查,如生产端每天发送全量的数据导致每天堆积几十万数据肯定是不行的)
2.消费者实例IO严重阻塞或者消费者所在服务器宕机(消费端扩容解决)
3.消费者业务处理异常不断重试导致的消息堆积
- 设置重试次数,超过阀值,则转发到其他队列或丢入死信队列
消费者消费异常的解决思路:
我们可以从以下几个角度来考虑:
1.消费者出错,肯定是程序或者其他问题导致的,如果容易修复,先把问题修复,让consumer恢复正常消费
2.如果时间来不及处理很麻烦,做转发处理。
借鉴rocketmq重试队列的思想
写一个临时的consumer消费方案,先把正常消息消费,然后消费异常的消息再转发到一个新的topic和MQ资源,这个新的topic的机器资源单独申请,要能承载住当前积压的消息。
等处理完积压数据后,修复consumer,去订阅消费新的MQ和现有的MQ数据,新MQ消费完成后恢复原状,即停止将消息转发到新的topic。
如果线上环境需要立刻马上处理,则直接进行转发。下面的流程没怎么看懂待定?????
- 先修复consumer的问题,确保其恢复消费速度,然后将现有有异常cnosumer都替换停掉。如果是consumer的程序出现异常问题,直接丢入死信队列即可,直接跳过后面的步骤。后续在做异常分析,
- 新建一个topic,partition是原来的10倍
- 然后写一个临时的处理分发数据的consumer程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的10倍数量的queue。
- 接着临时征用10倍的机器来部署consumer,每一批consumer消费一个临时queue的数据。 这种做法相当于是临时将queue资源和consumer资源扩大10倍,以正常的10倍速度来消费数据。
- 等快速消费完积压数据之后,得恢复原先部署架构,重新用原先的consumer机器来消费消息。
消息堆积kafka
1.线上有时因为发送方发送消息速度过快,或者消费方处理消息过慢,可能会导致broker积压大量未消费消息?
使用分区扩容思路
此种情况如果积压了上百万未消费消息需要紧急处理,可以修改消费端程序,让其将收到的消息快速转发到其他topic(可以设置很多分区),然后再启动多个消费者同时消费新主题的不同分区。
2.由于消息数据格式变动或消费者程序有bug,导致消费者一直消费不成功,也可能导致broker积压大量未消费消息?
此种情况可以将这些消费不成功的消息转发到其它队列里去(类似死信队列),后面再慢慢分析死信队列里的消息处理问题。
思考
1.mq消费异常为何会引起消息堆积?
因为mq的特性如果消费者返回的消费结果为失败,会进行失败重试,消息的消费位置将保持不变,后面的消息将无法被消费到,造成消息积压。
2.如果消息积压达到磁盘上限,消息被删除了怎么办??
首先这种概率很小。
我们可以将发送的消息记录落库保存,然后就和消费方的记录去做对比,只是过程会更艰难一点。
3.顺序消费如何处理积压消息???
4.如何提升单机的消费能力
1.网上的一些博客会推荐使用线程池的方式异步处理,如下图所示
这样就能增加业务逻辑处理速度,解决消息堆积问题。此时线程池的核心线程数和最大线程数需要合理配置,不然可能会浪费系统资源。
但是,在项目实践中,不推荐使用这种方式,如消息产生的过快,会很快消费掉系统的线程资源,使系统中的其他业务可能受到影响。所以最好的方式就是增加分区,多个消费者去消费消息,这是最好的办法但会有一定的硬件成本。
2.5.4 顺序消费
有些业务数据是有状态的,比如订单有:下单、支付、完成、退货等状态,如果订单数据作为消息体,就会涉及顺序问题了。如果消费者收到同一个订单的两条消息,第一条消息的状态是下单,第二条消息的状态是支付,这是没问题的。但如果第一条消息的状态是支付,第二条消息的状态是下单就会有问题了,没有下单就先支付了?
问题是一个非常棘手的问题,比如:
- kafka同一个partition中能保证顺序,但是不同的partition无法保证顺序。
- rabbitmq的同一个queue能够保证顺序,但是如果多个消费者同一个queue也会有顺序问题。
- 如果消费者使用多线程消费消息,也无法保证顺序。
- 如果消费消息时同一个订单的多条消息中,中间的一条消息出现异常情况,顺序将会被打乱。
解决思路
消息顺序问题是我们非常常见的问题,我们以kafka消费订单消息为例。订单有:下单、支付、完成、退货等状态,这些状态是有先后顺序的,如果顺序错了会导致业务异常。
解决这类问题之前,我们先确认一下,消费者是否真的需要知道中间状态,只知道最终状态行不行?
其实很多时候,我真的需要知道的是最终状态,这时可以把流程优化一下:
这种方式可以解决大部分的消息顺序问题。
2.5.4.1顺序消费-RabbitMQ(待续)
产生原因:
一个queue,多个consumer,这不明显乱了。
2.5.4.2 顺序消费-kafka
1.重试造成的原因
产生原因:
如果发送端配置了重试机制,kafka不会等之前那条消息完全发送成功才去发送下一条消息,这样可能会出现,发送了1,2,3条消息,第 一条超时了,后面两条发送成功,再重试发送第1条消息,这时消息在broker端的顺序就是2,3,1了。
解决方案:
是否一定要配置重试要根据业务情况而定。
1.可以用同步发送的模式去发消息,当然acks不能设置为0,这样也能保证消息从发送 端到消费端全链路有序。但吞吐量将大大减少,一般不推荐做。
2.根据具体的业务场景,如订单更改状态要求有序,在发送的订单消息body中增加版本号字段,依次按版本号递增,由消费端根据版本自己控制顺序。
2.mq机制造成的原因
参考我的另一篇文章kafak之顺序消费机制。
https://blog.csdn.net/sinat_34814635/article/details/111460317
2.6 常用模型
2.6.1 延时队列
使用场景
延时队列存储的对象是延时消息。所谓的“延时消息”是指消息被发送以后,并不想让消费者立刻获取,而是等待特定的时间后,消费者 才能获取这个消息进行消费。
延时队列的使用场景有很多, 比如 :
1)在订单系统中, 一个用户下单之后通常有 30 分钟的时间进行支付,如果 30 分钟之内没有支付成功,那么这个订单将进行异常处理,一般是取消订单释放库存。 这时就可以使用延时队列来实现这个功能了。
即支付订单的消息要在30分钟后,消费端才可以查询订单的状态,并进行后续流程处理。
2)订单完成1小时后通知用户进行评价。
3)某项业务处理的时间较长,但后面一项的业务又依赖于前一个业务的处理结果,此时可以使用延迟队列。
2.6.2 死信队列
死信队列用于处理无法被正常消费的消息,此时消息队列不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。
他的主要作用是将异常消息单独拆分出去,从而不影响正常队列的消费,可以有效避免消息堆积等问题。
参考资料:
1.MQ的使用场景 https://www.cnblogs.com/HigginCui/p/6478613.html
2.RocketMQ 实战之快速入门 https://www.jianshu.com/p/824066d70da8
3.消息队列常见问题和解决方案 https://blog.csdn.net/qq_36236890/article/details/81174504
4.RocketMQ面试题 https://blog.csdn.net/QGhurt/article/details/114630705
更多推荐
所有评论(0)