技术栈传送门
JAVA 基础手撸架构,Java基础面试100问_vincent-CSDN博客
JAVA 集合手撸架构,JAVA集合面试60问_vincent-CSDN博客
JVM 虚拟机手撸架构,JVM面试30问_vincent-CSDN博客
并发编程手撸架构,并发编程面试123问_vincent-CSDN博客
Spring 手撸架构,Spring面试63问_vincent-CSDN博客
Spring cloud 手撸架构,Spring cloud面试45问_vincent-CSDN博客
SpringBoot手撸面试,Spring Boot面试41问_vincent-CSDN博客
Netty 与 RPC手撸架构,Netty 与 RPC面试48问_vincent-CSDN博客
Doubo 手撸架构,Dubbo面试49问_vincent-CSDN博客
Redis手撸架构,Redis面试41问_vincent-CSDN博客
Zookeeper手撸架构,Zookeeper面试27问_vincent-CSDN博客
Mysql 手撸架构,Mysql 面试126问_vincent-CSDN博客
MyBatis手撸架构,MyBatis面试42问_vincent-CSDN博客
MongoDB 手撸架构,MongDB 面试50问_vincent-CSDN博客
Elasticsearch手撸架构,Elasticsearch 面试25问_vincent-CSDN博客
RabbitMQ 手撸架构,RabbitMQ 面试49问_vincent-CSDN博客
Kafka 手撸架构,Kafka 面试42问_vincent-CSDN博客
Docker手撸架构,Docker 面试25问_vincent-CSDN博客
Nginx手撸架构,Nginx 面试40问_vincent-CSDN博客
算法常用排序算法总结(1)-- 比较排序_vincent-CSDN博客_比较排序
常用排序算法总结(2)-- 非比较排序算法_vincent-CSDN博客_非比较排序的算法有
分布式事务分布式事务解决方案(总览)_vincent-CSDN博客
HTTP太厉害了,终于有人能把TCP/IP 协议讲的明明白白了_vincent-CSDN博客_tcp和ip

MQ如何选型?

特性ActiveMQRabbitMQRocketMQKafka
客户端支持语言JAVA、C、C++、Python、PHP、Pert、net等官方支持Erlang、Java/Ruby等,社区产出多种语言API,几乎支持所有常用语言JAVA、C++(不成熟)官方支持JAVA,开源社区有多语言版本,如PHP,Python,GO,C/C++,Ruby,NodeJS等编程语言
单机吞吐量万级,吞吐量RocketMQ和Kafka要低了一个数量级万级,吞吐量比RocketMQ和Kafka要低了一个数量级10万级,RocketMQ也是可以支撑高吞吐的一种MQ10万级别,这是kafka最大的优点,就是吞吐量高。 一般配合大数据类的系统来进行实时数据计算、日志采集等场景
topic数量对吞吐量的影响topic可以达到几百,几千个的级别,吞吐量会有较小幅度的下降 这是RocketMQ的一大优势,在同等机器下,可以支撑大量的topictopic从几十个到几百个的时候,吞吐量会大幅度下降 所以在同等机器下,kafka尽量保证topic数量不要过多。如果要支撑大规模topic,需要增加更多的机器资源
时效性ms级微秒级,这是rabbitmq的一大特点,延迟是最低的ms级延迟在ms级以内
可用性高,基于主从架构实现高可用性高,基于主从架构实现高可用性非常高,分布式架构非常高,kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
消息可靠性有较低的概率丢失数据经过参数优化配置,可以做到0丢失经过参数优化配置,消息可以做到0丢失
持久化内存、文件、数据库内存、文件,支持数据堆积,但数据堆积会影响生产速率磁盘文件磁盘文件,只要磁盘容量足够,可以做到无限消息堆积
功能支持MQ领域的功能极其完备基于erlang开发,所以并发能力很强,性能极其好,延时很低MQ功能较为完善,还是分布式的,扩展性好功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用,是事实上的标准
  1. 中小型公司首选 RabbitmQ:管理界面简单,高并发。
  2. 大型公司可以选择 RocketMQ:更高并发,可对 rocket进行定制化开发。
  3. 日志采集功能,首选 kafka,专为大数据准备。

RabbitMQ是什么?

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

RabbitMQ特点

可靠性: RabbitMQ使用一些机制来保证可靠性, 如持久化、传输确认及发布确认等。

灵活的路由 : 在消息进入队列之前,通过交换器来路由消息。对于典型的路由功能, RabbitMQ 己经提供了一些内置的交换器来实现。针对更复杂的路由功能,可以将多个 交换器绑定在一起, 也可以通过插件机制来实现自己的交换器。

扩展性: 多个RabbitMQ节点可以组成一个集群,也可以根据实际业务情况动态地扩展 集群中节点。

高可用性 : 队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队 列仍然可用。

多种协议: RabbitMQ除了原生支持AMQP协议,还支持STOMP, MQTT等多种消息 中间件协议。

多语言客户端 :RabbitMQ 几乎支持所有常用语言,比如 Java、 Python、 Ruby、 PHP、 C#、 JavaScript 等。

管理界面 : RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息、集 群中的节点等。

令插件机制 : RabbitMQ 提供了许多插件 , 以实现从多方面进行扩展,当然也可以编写自 己的插件。

使用 RabbitmQ有什么好处?

  1. 应用解耦(系统拆分)
  2. 异步处理(预约挂号业务处理成功后,异步发送短信、推送消息、日志记录等,可以大大减小响应)
  3. 消息分发
  4. 流量削峰:将请求发送到队列中,短暂的高峰期积压是允许的。
  5. 消息缓冲

RocketMq物理部署结构

  • Name server是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
  • Broker部署相对复杂, Broker分为 Master与Save,一个 Master可以对应多个Save,但是一个Save只能对应一个Master, Master与 Slave的对应关系通过指定相同的 BrokerName,不同的 Brokered来定义Brokered为0表示 Master,非0表示 Slave。 Master也可以部署多个。每个 Broker与 Name server集群中的所有节点建立长连接,定时注册 Topic信息到所有 Name server
  • Producer与 Name server集群中的其中一个节点(随机选择)建立长连接,定期从 Name server取Topic路由信息,并向提供 Topic服务的 Maste建立长连接,且定时向 Maste发送心跳。 Producer完全无状态,可集群部署。
  • Consumer与 Name server集群中的其中一个节点(随机选择)建立长连接,定期从 Name server取 Topic路由信息,并向提供 Topic服务的 Master、 Slave建立长连接,且定时向 Master、 Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。

RocketMq逻辑结构

Producer Group

用来表示一个发送消息应用,一个 Producer Group下包含多个 Producer实例,可以是多台机器,也可以是一台机器的多个进程,或者一个进程的多个 Producer对象。一个 Producer Group可以发送多个Topi消息, Producer Group

作用如下:

  1. 标识一类 Producer
  2. 可以通过运维工具查询这个发送消息应用下有多个 Producer实例
  3. 发送分布式事务消息时,如果 Producer中途意外宕机, Broker会主动回调 Producer Group内的任意一台机器来确认事务状态。

 Consumer Group

用来表示一个消费消息应用,一个 Consumer Group下包含多个 Consumer实例,可以是多台机器,也可以是多个进程,或者是⼀个进程的多个Consumer对象。⼀个Consumer Group下的多个Consumer以均摊⽅式消费消息,如果设置为⼴播⽅式,那么这个Consumer Group下的每个实例都消费全量数据。

RocketMq数据存储结构

RocketMQ采取了一种数据与索引分离的存储方法。有效降低文件资源、IO资源,內存资源的损耗。即便是阿里这种海量数据,高并发场景也能够有效降低端到端延迟,并具备较强的横向扩展能力。

AMQP是什么

RabbitMQ就是 AMQP 协议的 Erlang 的实现(当然 RabbitMQ 还支持 STOMP2、 MQTT3 等协议 ) AMQP 的模型架构 和 RabbitMQ 的模型架构是一样的,生产者将消息发送给交换器,交换器和队列绑定 。

RabbitMQ 中的交换器、交换器类型、队列、绑定、路由键等都是遵循的 AMQP 协议中相 应的概念。目前 RabbitMQ 最新版本默认支持的是 AMQP 0-9-1。

AMQP协议3层

Module Layer:协议最高层,主要定义了一些客户端调用的命令,客户端可以用这些命令实现自己的业务逻辑。

Session Layer:中间层,主要负责客户端命令发送给服务器,再将服务端应答返回客户端,提供可靠性同步机制和错误处理。

TransportLayer:最底层,主要传输二进制数据流,提供帧的处理、信道服用、错误检测和数据表示等。

AMQP模型的几大组件

  • 交换器 (Exchange):消息代理服务器中用于把消息路由到队列的组件。

  • 队列 (Queue):用来存储消息的数据结构,位于硬盘或内存中。

  • 绑定 (Binding):一套规则,告知交换器消息应该将消息投递给哪个队列。

RocketMq的消息是有序的吗?

一个topic下有多个queue,为了保证发送有序,rocketmq提供了MessageQueueSelector队列选择机制

  1. 可使用hash取模法,让同一个订单发送到同一个queue中,再使用同步发送,只有消息A发送成功,再发送消息B
  2. rocketmq的topic内的队列机制,可以保证存储满足FIFO,剩下的只需要消费者顺序消费即可
  3. rocketmq仅保证顺序发送,顺序消费由消费者业务保证

RocketMq事务消息的实现机制?

RocketMQ第一阶段发送Prepared消息时,会拿到消息的地址

RocketMQ第二阶段执行本地事物,第三阶段通过第一阶段拿到的地址去访问消息,并修改消息的状态。

RocketMQ会定期扫描消息集群中的事物消息,如果发现了Prepared消息,它会向消息发送端(生产者)确认,RocketMQ会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。

消息如何分发?

若该队列⾄少有⼀个消费者订阅,消息将以循环(round-robin)的⽅式发送给消费者。每条消息只会分发给⼀个订阅的消费者(前提是消费者能够正常处理消息并进⾏确认)。

RabbitMQ routing 路由模式

1、 消息生产者将消息发送给交换机按照路由判断,路由是字符串(info) 当前产生的消息携带路由字符(对象的方法),交换机根据路由的 key,只能匹配上路由 key 对应的消息队列,对应的消费者才能消费消息。

2、 根据业务功能定义路由字符串。

3、 从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中。

4、 业务场景:error 通知、EXCEPTION、错误通知的功能、传统意义的错误通知、客户通知、利用 key 路由,可以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误。

消息怎么路由?

消息提供方->路由->一至多个队列消息发布到交换器时,消息将拥有一个路由键(routing key),在消息创建时设定。通过队列路由键,可以把队列绑定到交换器上。消息到达交换器后,RabbitMQ 会将消息的路由键与队列的路由键进行匹配(针对不同的交换器有不同的路由规则)。

常用的交换器主要分为一下三种:

  • fanout:如果交换器收到消息,将会广播到所有绑定的队列上。
  • direct:如果路由键完全匹配,消息就被投递到相应的队列。
  • topic:可以使来自不同源头的消息能够到达同一个队列。 使用 topic 交换器时,可以使用通配符。

RabbitMQ publish/subscribe 发布订阅(共享资源)

每个消费者监听自己的队列。

生产者将消息发给 broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。

RabbitMQ 中的 broker 是指什么?cluster 又是指什么?

broker 是指一个或多个 erlang node 的逻辑分组,且 node 上运行着 RabbitMQ 应用程序。

cluster 是在 broker 的基础之上,增加了 node 之间共享元数据的约束。

什么是元数据?元数据分为哪些类型?包括哪些内容?与 cluster 相关的元数据有哪些?

在非 cluster 模式下,元数据主要分为:

Queue 元数据(queue 名字和属性等)

Exchange 元数据(exchange 名字、类型和属性等)

Binding 元数据(存放路由关系的查找表)

Vhost 元数据(vhost 范围内针对前三者的名字空间约束和安全属性设置)

在cluster 模式下,还包括: cluster 中 node 位置信息和 node 关系信息。

元数据是如何保存的?

元数据按照 erlangnode 的类型确定是仅保存于 RAM 中,还是同时保存在 RAM 和 disk 上。

元数据在 cluster 中是如何分布的?

元数据在cluster 中是全 node 分布的。

RAM node 和 disk node 的区别?

RAM node 仅将 fabric(即 queue、exchange 和 binding 等 RabbitMQ 基础构件)相关元数据保存到内存中,但 disk node 会在内存和磁盘中均进行存储。RAM node 上唯一会存储到磁盘上的元数据是 cluster 中使用的 disk node 的地址。要求在 RabbitMQ cluster中至少存在一个 disk node 

RabbitMQ 上的一个 queue 中存放的 message 是否有数量限制?

可以认为是无限制,因为限制取决于机器的内存,但是消息过多会导致处理效率的下降。

RabbitMQ 概念里的 channel、exchange 和 queue 是逻辑概念,还是对应着进程实体?分别起什么作用?

queue 具有自己的 erlang 进程;

exchange 内部实现为保存 binding 关系的查找表;

channel 是实际进行路由工作的实体,即负责按照 routing_key 将 message 投递给queue 。

由 AMQP 协议描述可知,channel 是真实 TCP 连接之上的虚拟连接,所有AMQP 命令都是通过 channel 发送的,且每一个 channel 有唯一的 ID。一个 channel 只能被单独一个操作系统线程使用,故投递到特定 channel 上的 message 是有顺序的。但一个操作系统线程上允许使用多个 channel 。channel 号为 0 的 channel 用于处理所有对于当前 connection 全局有效的帧,而 1-65535 号 channel 用于处理和特定 channel 相关的帧。其中每一个 channel 运行在一个独立的线程上,多线程共享同一个 socket。

vhost 是什么?起什么作用?

vhost 可以理解为虚拟 broker ,即 mini-RabbitMQ server。其内部均含有独立的queue、exchange 和 binding 等,但最最重要的是,其拥有独立的权限系统,可以做到vhost 范围的用户控制。当然,从 RabbitMQ 的全局角度,vhost 可以作为不同权限隔离的手段(一个典型的例子就是不同的应用可以跑在不同的 vhost 中)。

若 cluster 中拥有某个 queue 的 owner node 失效了,且该 queue 被声明具有durable 属性,是否能够成功从其他 node 上重新声明该 queue ?

不能,在这种情况下,将得到 404 NOT_FOUND 错误。只能等 queue 所属的 node恢复后才能使用该 queue 。但若该 queue 本身不具有 durable 属性,则可在其他 node上重新声明。

cluster 中 node 的失效会对 consumer 产生什么影响?若是在 cluster 中创建了mirrored queue ,这时 node 失效会对 consumer 产生什么影响?

若是 consumer 所连接的那个 node 失效(无论该 node 是否为 consumer 所订阅queue 的 owner node),则 consumer 会在发现 TCP 连接断开时,按标准行为执行重连逻辑,并根据“Assume Nothing”原则重建相应的 fabric 即可。若是失效的 node 为consumer 订阅 queue 的 owner node,则 consumer 只能通过 Consumer CancellationNotification 机制来检测与该 queue 订阅关系的终止,否则会出现傻等却没有任何消息来到的问题。

能够在地理上分开的不同数据中心使用 RabbitMQ cluster 么?

不能。第一,你无法控制所创建的 queue 实际分布在 cluster 里的哪个 node 上(一般使用 HAProxy + cluster 模型时都是这样),这可能会导致各种跨地域访问时的常见问题;

第二,Erlang 的 OTP 通信框架对延迟的容忍度有限,这可能会触发各种超时,导致业务疲于处理;

第三,在广域网上的连接失效问题将导致经典的“脑裂”问题,而RabbitMQ 目前无法处理(该问题主要是说 Mnesia)。

为什么 heavy RPC 的使用场景下不建议采用 disk node ?

heavy RPC 是指在业务逻辑中高频调用 RabbitMQ 提供的 RPC 机制,导致不断创建、销毁 reply queue ,进而造成 disk node 的性能问题(因为会针对元数据不断写盘)。所以在使用 RPC 机制时需要考虑自身的业务场景。

向不存在的 exchange 发 publish 消息会发生什么?向不存在的 queue 执行consume 动作会发生什么?

都会收到 Channel.Close 信令告之不存在(内含原因 404 NOT_FOUND)。

routing_key 和 binding_key 的最大长度是多少?

255 字节。

RabbitMQ 允许发送的 message 最大可达多大?

根据 AMQP 协议规定,消息体的大小由 64-bit 的值来指定,所以你就可以知道到底能发多大的数据了。

什么情况下 producer 不主动创建 queue 是安全的?

1.message 是允许丢失的;

2.实现了针对未处理消息的 republish 功能(例如采用Publisher Confirm 机制)。

如何确保消息不丢失?

消息持久化的前提是:将交换器/队列的durable属性设置为true,表示交换器/队列是持久交换器/队列,在服务器崩溃或重启之后不需要重新创建交换器/队列(交换器/队列会⾃动创建)。

如果消息想要从Rabbit崩溃中恢复,那么消息必须:

1、在消息发布前,通过把它的 “投递模式” 选项设置为2(持久)来把消息标记成持久化

2、将消息发送到持久交换器

3、消息到达持久队列

如何避免消息重复投递或重复消费?

在消息生产时,MQ内部针对每条生产者发送的消息生成一个inner-msg-id,作为去重和幂等的依据(消息投递失败并重传),避免重复的消息进入队列;在消息消费时,要求消息体中必须要有一个bizId(对于同一业务全局唯一,如支付ID、订单ID、帖子ID等)作为去重和幂等的依据,避免同一条消息被重复消费。

如何保证消息队列高可用?

1.集群:

 集群可以扩展消息通信的吞吐量,但是不会备份消息,备份消息要通过镜像队列的方式解决。

队列存储在单个节点、交换器存储在所有节点

2.镜像队列:

将需要消费的队列变为镜像队列,存在于多个节点,这样就可以实现 RabbitmQ的HA高可用性。作用就是消息实体会主动在镜像节点之间实现同步,而不是像普通模式那样,在εoηsuwεκ消费数据时临时读取。缺点就是,集群内部的同步通讯会占用大量的网络带宽。

使⽤RabbitMQ增加rest服务吞吐量

“dead letter”queue 的用途?

当消息被 RabbitMQ server 投递到 consumer 后,但 consumer 却通过 Basic.Reject进行了拒绝时(同时设置 requeue=false),那么该消息会被放入“dead letter”queue 中。该 queue 可用于排查 message 被 reject 或 undeliver 的原因。

为什么说保证 message 被可靠持久化的条件是 queue 和 exchange 具有durable 属性,同时 message 具有 persistent 属性才行?

binding 关系可以表示为 exchange – binding – queue 。从文档中我们知道,若要求投递的 message 能够不丢失,要求 message 本身设置 persistent 属性,要求 exchange和 queue 都设置 durable 属性。其实这问题可以这么想,若 exchange 或 queue 未设置durable 属性,则在其 crash 之后就会无法恢复,那么即使 message 设置了 persistent 属性,仍然存在 message 虽然能恢复但却无处容身的问题;同理,若 message 本身未设置persistent 属性,则 message 的持久化更无从谈起。

Consumer Cancellation Notification 机制用于什么场景?

用于保证当镜像 queue 中 master 挂掉时,连接到 slave 上的 consumer 可以收到自身 consume 被取消的通知,进而可以重新执行 consume 动作从新选出的 master 出获得消息。若不采用该机制,连接到 slave 上的 consumer 将不会感知 master 挂掉这个事情,导致后续无法再收到新 master 广播出来的 message 。另外,因为在镜像 queue 模式下,存在将 message 进行 requeue 的可能,所以实现 consumer 的逻辑时需要能够正确处理出现重复 message 的情况。

为什么不应该对所有的 message 都使用持久化机制?

首先,必然导致性能的下降,因为写磁盘比写 RAM 慢的多,message 的吞吐量可能有 10 倍的差距。

其次,message 的持久化机制用在 RabbitMQ 的内置 cluster 方案时会出现“坑爹”问题。矛盾点在于,若 message 设置了 persistent 属性,但 queue 未设置durable 属性,那么当该 queue 的 owner node 出现异常后,在未重建该 queue 前,发往该 queue 的 message 将被 blackholed ;若 message 设置了 persistent 属性,同时queue 也设置了 durable 属性,那么当 queue 的 owner node 异常且无法重启的情况下,则该 queue 无法在其他 node 上重建,只能等待其 owner node 重启后,才能恢复该 queue 的使用,而在这段时间内发送给该 queue 的 message 将被 blackholed 。所以,是否要对 message 进行持久化,需要综合考虑性能需要,以及可能遇到的问题。若想达到 100,000 条/秒以上的消息吞吐量(单 RabbitMQ 服务器),则要么使用其他的方式来确保 message 的可靠 delivery ,要么使用非常快速的存储系统以支持全持久化(例如使

用 SSD)。

另外一种处理原则是:仅对关键消息作持久化处理(根据业务重要程度),且应该保证关键消息的量不会导致性能瓶颈。

能够在地理上分开的不同数据中心使用 RabbitMQ cluster 么?

不能。

第一,你无法控制所创建的 queue 实际分布在 cluster 里的哪个 node 上(一般使用

HAProxy + cluster 模型时都是这样),这可能会导致各种跨地域访问时的常见问题。

第二,Erlang 的 OTP 通信框架对延迟的容忍度有限,这可能会触发各种超时,导致业务

疲于处理。

第三,在广域网上的连接失效问题将导致经典的“脑裂”问题,而 RabbitMQ 目前无法

处理(该问题主要是说 Mnesia)。

RabbitMQ 有那些基本概念?

  • Broker:简单来说就是消息队列服务器实体。
  • Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
  • Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
  • Binding:绑定,它的作用就是把 exchange 和 queue 按照路由规则绑定起来。
  • Routing Key:路由关键字,exchange 根据这个关键字进行消息投递。
  • VHost:vhost 可以理解为虚拟 broker ,即 mini-RabbitMQ server。其内部均含有独立的 queue、exchange 和 binding 等,但最最重要的是,其拥有独立的权限系统,可以做到 vhost 范围的用户控制。当然,从 RabbitMQ 的全局角度,vhost 可以作为不同权限隔离的手段(一个典型的例子就是不同的应用可以跑在不同的vhost 中)。
  • Producer:消息生产者,就是投递消息的程序。
  • Consumer:消息消费者,就是接受消息的程序。
  • Channel:消息通道,在客户端的每个连接里,可建立多个 channel,每个 channel代表一个会话任务。由 Exchange、Queue、RoutingKey 三个才能决定一个从 Exchange 到 Queue 的唯
  • 一的线路。

RocketMq会有重复消费的问题吗?如何解决?

在网络中断的情况下可能出现,需要保证消费端处理消息的业务逻辑保持幂等性

RocketMq延迟消息?如何实现的?

RocketMQ 支持定时消息,但是不支持任意时间精度,仅支持特定的 level,例如定时 5s, 10s, 1m 等。其中,level=0 级表示不延时,level=1 表示 1 级延时,level=2 表示 2 级延时。默认的配置是messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。

Message msg = new Message(topic, tags, keys, body);

msg.setDelayTimeLevel(3);

RocketMq是推模型还是拉模型?

rocketmq不管是推模式还是拉模式底层都是拉模式,推模式也是在拉模式上做了一层封装.

消息存储在broker中,通过topic和tags区分消息队列。producer在发送消息时不关心consumer对应的topic和tags,只将消息发送到对应broker的对应topic和tags中。

推模式中broker则需要知道哪些consumer拥有哪些topic和tags,但在consumer重启或更换topic时,broker无法及时获取信息,可能将消息推送到旧的consumer中。对应consumer主动获取topic,这样确保每次主动获取时他对应的topic信息都是最新的。

RocketMQ消息消费本质上是基于的拉(pull)模式,consumer主动向消息服务器broker拉取消息。

consumer被分为2类:MQPullConsumer和MQPushConsumer,其实本质都是拉模式(pull),即consumer轮询从broker拉取消息。 区别:

  • MQPushConsumer方式,consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送(push)过来的。主要用的也是这种方式。
  • MQPullConsumer方式,取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,一次取完后,记录该队列下一次要取的开始offset,直到取完了,再换另一个MessageQueue。

什么情况下会出现 blackholed 问题?

blackholed 问题是指,向 exchange 投递了 message ,而由于各种原因导致该 message 丢失,但发送者却不知道。可导致 blackholed 的情况:

1.向未绑定 queue 的 exchange 发送 message;

2.exchange 以 binding_key key_A 绑定了 queue queue_A,但向该 exchange 发送 message 使用的 routing_key 却是 key_B。

如何防止出现 blackholed 问题?

没有特别好的办法,只能在具体实践中通过各种方式保证相关 fabric 的存在。另外,如果在执行 Basic.Publish 时设置 mandatory=true ,则在遇到可能出现 blackholed 情况时,服务器会通过返回 Basic.Return 告之当前 message 无法被正确投递(内含原因 312NO_ROUTE)。

Basic.Reject 的用法是什么?

该信令可用于 consumer 对收到的 message 进行 reject 。

若在该信令中设置requeue=true,则当 RabbitMQ server 收到该拒绝信令后,会将该 message 重新发送到下一个处于 consume 状态的 consumer 处(理论上仍可能将该消息发送给当前consumer)。

若设置 requeue=false ,则 RabbitMQ server 在收到拒绝信令后,将直接将该 message 从 queue 中移除。

另外一种移除 queue 中 message 的小技巧是,consumer 回复 Basic.Ack 但不对获取到的 message 做任何处理。而 Basic.Nack 是对 Basic.Reject 的扩展,以支持一次拒绝多条 message 的能力。

什么是 Binding 绑定?

通过绑定将交换器和队列关联起来,一般会指定一个 BindingKey,这样 RabbitMq 就知道 如何正确路由消息到队列了。

RocketMq的负载均衡

生产者负载均衡

从MessageQueue列表中随机选择一个(默认策略),通过自增随机数对列表大小取余获取位置信息,但获得的MessageQueue所在的集群不能是上次的失败集群。

集群超时容忍策略,先随机选择一个MessageQueue,如果因为超时等异常发送失败,会优先选择该broker集群下其他的messeagequeue进行发送。如果没有找到则从之前发送失败broker集群中选择一个MessageQueue进行发送,如果还没有找到则使用默认策略。

消费者负载均衡

  • 平均分配策略(默认)(AllocateMessageQueueAveragely)
  • 环形分配策略(AllocateMessageQueueAveragelyByCircle)
  • 手动配置分配策略(AllocateMessageQueueByConfig)
  • 机房分配策略(AllocateMessageQueueByMachineRoom)
  • 一致性哈希分配策略(AllocateMessageQueueConsistentHash)
  • 靠近机房策略(AllocateMachineRoomNearby)

RocketMq消息积压

提高消费并行读 同一个Consumer Group下,通过增加Consumer实例的数量来提高并行度,超过订阅队列数的Consumer实例无效。

提高单个Consumer的消费并行线程,通过修改Consumer的consumerThreadMin和consumerThreadMax来设置线程数

  1. 批量方式消费 通过设置Consumer的consumerMessageBathMaxSize这个参数,默认是1,一次只消费一条消息,例如设置N,那么每次消费的消息条数小于等于N

  2. 丢弃非重要消息 当消息发生堆积时,如果消费速度跟不上消费速度,可以选择丢弃一些不重要的消息

  3. 优化消息消费的过程 对于消费消息的过程一般包括业务处理以及跟数据库的交互,可以试着通过一些其他的方法优化消费的逻辑。

临时解决方案:

新建一个topic,写一个临时的分发数据的consumer程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的queue中。临时用一部分机器来部署consumer,每一批consumer消费一个临时queue的数据。等快速消费完积压数据之后,得恢复原先部署架构,重新用原先的consumer机器来消费消息

死信队列和延迟队列的使用

死信消息:

  1. 消息被拒绝(Basic.Reject或Basic.Nack)并且设置 requeue 参数的值为 false
  2. 消息过期了
  3. 队列达到最大的长度

过期消息:

    在 rabbitmq 中存在2种方可设置消息的过期时间,第一种通过对队列进行设置,这种设置后,该队列中所有的消息都存在相同的过期时间,第二种通过对消息本身进行设置,那么每条消息的过期时间都不一样。如果同时使用这2种方法,那么以过期时间小的那个数值为准。当消息达到过期时间还没有被消费,那么那个消息就成为了一个 死信 消息。

    队列设置:在队列申明的时候使用 x-message-ttl 参数,单位为 毫秒

    单个消息设置:是设置消息属性的 expiration 参数的值,单位为 毫秒

延时队列:在rabbitmq中不存在延时队列,但是我们可以通过设置消息的过期时间和死信队列来模拟出延时队列。消费者监听死信交换器绑定的队列,而不要监听消息发送的队列。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐