1、4种消息中间件比较

特性ActiveMQRabbitMQRocketMQKafka
开发语⾔javaerlangjavascala
单机吞吐量万级万级10万级10万级
时效性ms级us级ms级ms级以内
可⽤性⾼(主从架构)⾼(主从架构)⾮常⾼(分布式架构)⾮常⾼(分布式架构)
功能特性

成熟的产品,

在很多公司得到应⽤;

有较多的⽂档;

各种协议⽀持较好

基于erlang开发,

所以并发能⼒很强,

性能极其好,

延时很低;

管理界⾯较丰富

MQ功能⽐较完备,

扩展性佳

只⽀持主要的MQ功能,
像⼀些消息查询,消息回
溯等功能没有提供,毕竟
是为⼤数据准备的,在⼤
数据领域应⽤⼴。

1.1、消息中间件的使用场景

解耦、异步、削峰

  • 解耦

如果我方系统A要与三方B系统进行数据对接,推送系统人员信息,通常我们会使用接口开发来进行。但是如果运维期间B系统进行了调整,或者推送过程中B系统网络进行了调整,又或者后续过程中我们需要推送信息到三方C系统中,这样的话就需要我们进行频繁的接口开发调整,还需要考虑接口推送消息失败的场景。如果我们使用消息中间件进行消息推送,我们只需要按照一种约定的数据结构进行数据推送,其他三方系统从消息中间件取值消费就可以,即便是三方系统出现宕机或者其他调整,我们都可以正常进行数据推送。

  • 异步

如果我们现在需要同时推送消息到BCD三个系统中,而BCD系统接收到消息后需要进行复杂的逻辑处理,以及读库写库处理。如果一个三方系统进行消息处理需要1s,那我们遍历推送完一次消息,就需要三秒。而如果我们使用消息中间件,我们只需要推送到消息中间件,然后进行接口返回,可能只需要20ms,大大提升了用户体验。消息推送后BCD系统各自进行消息消费即可,不需要我们等待。

  • 削峰

如果某一时间段内,每秒都有一条消息推送,如果我们使用接口进行推送,BCD三个系统处理完就需要三秒。这样会导致用户前端体验较差,而且系统后台一直处于阻塞状态,后续的消息推送接口一直在等待。如果我们使用消息中间件,我们只需要将消息推送至消息中间件中,BCD系统对积压的消息进行相应的处理。

1.2、消息中间件的缺点

  • 系统可用性降低

系统关联的中间件越多,越容易引发宕机问题。原本进行消息推送我们只需要开发接口进行推送即可,引入消息中间件后就需要考虑消息中间件的高可用问题,如果消息中间件出现宕机问题,我们所有的消息推送都会失败。

  • 系统复杂度提高

如果我们使用接口进行消息推送,我们只需要考虑接口超时以及接口推送消息失败的问题。但我们引入消息中间件后,就需要考虑消息中间件的维护,以及消息重复消费,消息丢失的问题。

  • 一致性问题

如果我们使用接口进行消息推送,推送消息我们可以放在事务中处理,如果推送过程中出现异常,我们可以进行数据回滚,但我们引入消息中间件后,就需要考虑消息推送后,消费失败的问题,以及如果我们同时推送消息到BCD系统中,如何保证他们的事务一致性。

2、ActiveMQ

2.1、Activemq 是什么

Activemq 是一种开源的,实现了JMS1.1规范的,面向消息(MOM)的中间件,为应用程序提供高效的、可扩展的、稳定的和安全的企业级消息通信。

2.2、Activemq 的作用及原理

Activemq 的作用就是系统之间进行通信,原理就是生产者生产消息, 把消息发送给activemq, Activemq 接收到消息, 然后查看有多少个消费者,然后把消息转发给消费者, 此过程中生产者无需参与。 消费者接收到消息后做相应的处理和生产者没有任何关系。

2.3、Activemq 的通信方式

  • publish(发布)-subscribe(订阅)(发布-订阅方式)

发布/订阅方式用于多接收客户端的方式,作为发布订阅的方式,可能存在多个接收客户端,并且接收端客户端与发送客户端存在时间上的依赖。一个接收端只能接收他创建以后发送客户端发送的信息。

  • p2p(point-to-point)(点对点)

2p的过程则理解起来比较简单。它好比是两个人打电话,这两个人是独享这一条通信链路的。一方发送消息,另外一方接收,就这么简单。在实际应用中因为有多个用户对使用p2p的链路,相互通信的双方是通过一个类似于队列的方式来进行交流。和前面pub-sub的区别在于一个topic有一个发送者和多个接收者,而在p2p里一个queue只有一个发送者和一个接收者。

2.4、Activemq 的消息持久化机制

JDBC: 持久化到数据库
AMQ :日志文件(已基本不用)
KahaDB : AMQ基础上改进,默认选择
LevelDB :谷歌K/V数据库
在activemq.xml中查看默认的broker持久化机制。

2.5、Activemq 的消息确认机制

  1. AUTO_ACKNOWLEDGE = 1 自动确认
  2. CLIENT_ACKNOWLEDGE = 2 客户端手动确认
  3. DUPS_OK_ACKNOWLEDGE = 3 自动批量确认
  4. SESSION_TRANSACTED = 0 事务提交并确认
  5. INDIVIDUAL_ACKNOWLEDGE = 4 单条消息确认

3、RabbitMQ

3.1、RabbitMQ是什么

RabbitMQ是一个由erlang语言编写的、开源的、在AMQP基础上完整的、可复用的企业消息系统。

3.2、RabbitMQ的作用及原理

3.3、RabbitMQ的通信方式

  • 简单队列

最简单的工作队列,其中一个消息生产者,一个消息消费者,一个队列。也称为点对点模式

  • 工作队列模式

一个消息生产者,一个交换器,一个消息队列,多个消费者。同样也称为点对点模式

  • 发布订阅模式

Pulish/Subscribe,无选择接收消息,一个消息生产者,一个交换机(交换机类型为fanout),多个消息队列,多个消费者,生产者只需把消息发送到交换机,绑定这个交换机的队列都会获得一份一样的数据。

  • 路由模式

在发布/订阅模式的基础上,有选择的接收消息,也就是通过 routing 路由进行匹配条件是否满足接收消息。

  • 主体模式

topics(主题)模式跟routing路由模式类似,只不过路由模式是指定固定的路由键 routingKey,而主题模式是可以模糊匹配路由键 routingKey,类似于SQL中 = 和 like 的关系。

  • RPC模式

RPC是指远程过程调用,也就是说两台服务器A,B,一个应用部署在A服务器上,想要调用B服务器上应用提供的处理业务,处理完后然后在A服务器继续执行下去,把异步的消息以同步的方式执行。

3.4、RabbitMQ的消息持久化机制

  • Queue(消息队列)的持久化是通过durable=true来实现的。
  • Message(消息)的持久化 ,通过设置消息是持久化的标识。
  • Exchange(交换机)的持久化 。

3.5、RabbitMQ的消息确认机制

  • confirm机制:确认消息是否成功发送到Exchange
  • ack机制:确认消息是否被消费者成功消费
  • AcknowledgeMode.NONE:自动确认
  • AcknowledgeMode.AUTO:根据情况确认
  • AcknowledgeMode.MANUAL:手动确认

4、RocketMQ

4.1、RocketMQ是什么

RocketMQ是阿里开发的一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。

4.2、RocketMQ的作用及原理

 4.3、RocketMQ的通信方式

RocketMQ消息订阅有两种模式

一种是Push模式(MQPushConsumer),即MQServer主动向消费端推送

另外一种是Pull模式(MQPullConsumer),即消费端在需要时,主动到MQ Server拉取

但在具体实现时,Push和Pull模式本质都是采用消费端主动拉取的方式,即consumer轮询从broker拉取消息

集群模式和广播模式

集群模式:默认情况下我们都是使用的集群模式,也就是说消费者组收到消息后,只有其中的一台机器会接收到消息。

广播模式:消费者组内的每台机器都会收到这条消息。

4.4、RocketMQ的消息持久化机制

exchange持久化、queue持久化、message持久化

CommitLog:日志数据文件,存储消息内容,所有 queue 共享,不区分 topic ,顺序读写 ,1G 一个文件

ConsumeQueue:逻辑 Queue,基于 topic 的 CommitLog 的索引文件,消息先到达 commitLog,然后异步转发到 consumeQueue,包含 queue 在 commitLog 中的物理偏移量 offset,消息实体内容大小和 Message Tag 的 hash 值,大于 600W 个字节,写满之后重新生成,顺序写

IndexFile:基于 Key 或 时间区间的 CommitLog 的索引文件,文件名以创建的时间戳命名,固定的单个 indexFile 大小为 400M,可以保存 2000W 个索引

4.5、RocketMQ的消息确认机制

  • confirm机制:确认消息是否成功发送到Exchange
  • ack机制:确认消息是否被消费者成功消费

5、Kafka

5.1、Kafka是什么

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统,可作为消息中间件

5.2、Kafka的作用及原理

 5.3、Kafka的通信方式

  • 生产者发送模式
  1. 发后即忘(fire-and-forget):只管往Kafka中发送消息而并不关心消息是否正确到达
  2. 同步(sync):一般是在send()方法里指定一个Callback的回调函数,Kafka在返回响应时调用该函数来实现异步的发送确认。
  3. 异步(async):send()方法会返回Futrue对象,通过调用Futrue对象的get()方法,等待直到结果返回
  • 消费者消费模式
  1. At-most-once(最多一次):在每一条消息commit成功之后,再进行消费处理;设置自动提交为false,接收到消息之后,首先commit,然后再进行消费。
  2. At-least-once(最少一次):在每一条消息处理成功之后,再进行commit;设置自动提交为false;消息处理成功之后,手动进行commit。
  3. Exactly-once(正好一次):将offset作为唯一id与消息同时处理,并且保证处理的原子性;设置自动提交为false;消息处理成功之后再提交。

5.4、Kafka的消息持久化机制

Kafka直接将数据写入到日志文件中,以追加的形式写入

5.5、Kafka的消息确认机制

  • confirm机制:确认消息是否成功发送
  • ack机制:确认消息是否被消费者成功消费

6、消息重复消费问题

为什么要考虑重复消费的问题?

比如我们消费后通过消费中间件来调用,扣费10元,但是消费者消费消息后还没来得及进行确认,消息中间件进行了重启,那么消息者就会进行再次扣费处理,这样就会出问题!

kafka重复消费例子:

Kafka 实际上有个 offset 的概念,就是每个消息写进去,都有一个 offset,代表消息的序号,然后 consumer 消费了数据之后,每隔一段时间(定时定期),会把自己消费过的消息的 offset 提交一下,表示我已经消费过了,下次我要是重启啥的,你就让我继续从上次消费到的 offset 来继续消费吧。但是,如果在这期间重启系统或者直接 kill 进程了,再重启。这会导致 consumer 有些消息处理了,但是没来得及提交 offset。重启之后,少数消息会再次消费一次。如果消费者干的事儿是拿一条数据就往数据库里写一条,会导致说,你可能就把数据在数据库里插入了 2 次,那么数据就错啦。

重复消费问题引发后,我们就需要考虑怎么保证幂等性

如果是数据插入操作,没有数据新增,已有数据那我们只进行更新就行。

如果是基于数据库的唯一键来保证重复数据不会重复插入多条。因为有唯一键约束了,重复数据插入只会报错,不会导致数据库中出现脏数据。

insert into(a,b) values('1','2') on duplicate key update a='3'
merge into student a
using (select id from dual) s
on (a.id = s.id)
when matched then
  update set a.student_name = '沈畅'
when not matched then
  insert (id, student_name, fk_class) values ('1', '小沈', '3')

7、消息丢失问题

7.1、ActiveMQ

  • 生产者丢失消息

生产者丢失消息的问题可以通过消息重投、重试机制来解决

  • ActiveMQ丢失消息

ActiveMQ丢失消息的问题需要通过ActiveMQ消息持久化机制+高可用(见ActiveMQ章节)来解决,ActiveMQ的消息持久化机制有以下几种

  1. JDBC: 持久化到数据库
  2. AMQ :日志文件(已基本不用)
  3. KahaDB : AMQ基础上改进,默认选择
  4. LevelDB :谷歌K/V数据库
  5. 在activemq.xml中查看默认的broker持久化机制。
  • 消息者丢失消息

消费者丢失消息通过ack机制来解决,消息者进行业务处理后,再进行ack确认,避免消息丢失。

7.2、RabbitMQ

  • 生产者丢失消息

生产者消息丢失,通过confirm机制来确认消息发送,然后进行相应的消息重投、重试机制

  • RabbitMQ丢失消息

  1. RabbitMQ丢失消息的问题需要通过RabbitMQ消息持久化机制+高可用(见RabbitMQ章节)来解决,
  2. RabbitMQ持久化包含:
  3. Queue(消息队列)的持久化是通过durable=true来实现的。
  4. Message(消息)的持久化 ,通过设置消息是持久化的标识。
  5. Exchange(交换机)的持久化 。
  • 消息者丢失消息

消费者丢失消息通过ack机制来解决,消息者进行业务处理后,再进行ack确认,避免消息丢失。

7.3、RocketMQ

  • 生产者丢失消息

生产者消息丢失,通过confirm机制来确认消息发送,然后进行相应的消息重投、重试机制

  • RocketMQ丢失消息

RocketMQ丢失消息的问题需要通过RocketMQ消息持久化机制+高可用(见RocketMQ章节)来解决

RocketMQ持久化包含:exchange持久化、queue持久化、message持久化

  • 消息者丢失消息

消费者丢失消息通过ack机制来解决,消息者进行业务处理后,再进行ack确认,避免消息丢失。

7.4、Kafka

  • 生产者丢失消息

生产者消息丢失,通过confirm机制来确认消息发送,然后进行相应的消息重投、重试机制

  • Kafka丢失消息

Kafka直接将数据写入到日志文件中,以追加的形式写入

  • 消息者丢失消息

消费者丢失消息通过ack机制来解决,消息者进行业务处理后,再进行ack确认,避免消息丢失。

8、消息积压问题 

如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?有几百万消息持续积压几小时,怎么处理?

其实消息积压的问题,一般都是由消费端出了问题导致的,在实际业务场景中一般不会出现,但是出现问题一般都是大问题。

  • 模拟场景

一个消费者一秒是 1000 条,一秒 3 个消费者是 3000 条,一分钟就是 18 万条。由于消费者宕机导致现在MQ中积压几百万数据

  • 解决思路
  1. 先修复 consumer 的问题,确保其恢复消费速度,然后将现有 consumer 都停掉(避免重复消费)。
  2. 新建一个 topic,partition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量。
  3. 然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue。
  4. 接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。这种做法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。
  5. 等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的 consumer 机器来消费消息。
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐