MQTT
MQTT Broker 职责与需求消息队列与消息中间件适用场景不一样。MQTT 与消息队列有一定的区别,队列是一种先进先出的数据结构,消息队列常用于应用服务层面,实现参考如 RabbitMQ Kafka RocketMQ;MQTT 是传输协议,绝大部分 MQTT Broker 不保证消息顺序(Queue),常用语物联网、消息传输等。物联网主流接入协议分为MQTT,CoaP,Http,XMPP等几种
MQTT Broker 职责与需求
消息队列与消息中间件适用场景不一样。
MQTT 与消息队列有一定的区别,队列是一种先进先出的数据结构,消息队列常用于应用服务层面,实现参考如 RabbitMQ Kafka RocketMQ;
MQTT 是传输协议,绝大部分 MQTT Broker 不保证消息顺序(Queue),常用语物联网、消息传输等。
物联网主流接入协议分为MQTT,CoaP,Http,XMPP等几种
MQTT协议主要特点
MQTT协议是为大量计算能力有限,且工作在低带宽、不可靠的网络的远程传感器和控制设备通讯而设计的协议,它具有以下主要的几项特性:
1、使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合;
2、对负载内容屏蔽的消息传输;
3、使用 TCP/IP 提供网络连接;
4、有三种消息发布服务质量:
“至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次记录无所谓,因为不久后还会有第二次发送。
“至少一次”,确保消息到达,但消息重复可能会发生。
“只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。
5、小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量;
6、使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制
MQTT主要应用场景 (多客户端,少量消息)
1.车联网
2.工业物联网
3.智能家居
4.视频直播弹幕
5.IM实时聊天 (一对一聊天,群组聊天)
6.推送服务,比如推送实时新闻
7.金融交易数据订阅推送
整体架构
单机版本的MQTT存在并发连接数上限以及处理能力的限制,主流的单机版本的MQTT服务包括ActiveMQ, RabbitMQ,Apollo,Mosquitto,分布式的MQTT服务包括知名的EMQ, VerneMQ都是采用Erlang实现的。
分布式版本的MQTT相对于单机版本最大的难点在于Session的管理,特别是持久化session,MQTT协议定义了两种Session,其中一种是transient Session,另外一种是=Persistent Session,用户可以通过在发送连接协议包的时候设置clean session这个状态位来决定采用哪种session。另外一个难点就是集群的管理,这里设计的框架是每个broker都是对等,他们之间不存在什么主从关系,所以我们直接AKKA Cluster这个框架作为集群管理,每个broker都需要注册监听的时间包括MemberUp,MemberDown,MemberUnreachable,ClusterMemberState等事件,这样每个broker就很可以很好的感知其他节点的状态,对内部的session做相应的处理,broker和broker之间的消息通知采用Akka actor来实现。
Broker内部服务框架
为了管理,以及设计方便,我们将内部服务抽象成为很多独立服务,这些服务包括:
1.Authentication and authorization service
a)该服务负责用户名,密码等认证方式的鉴权,以及每个client对于那些主题有权限进行读和写,后台数据全部保存在Mysql,通过redis做cache加速,当然也做in memory的cache加速,cache回收机制采用LRU策略
2.Session Manager
a)持久化session管理,包括session订阅什么主题,以及对应的persistent queue,该session需要在每个broker都同步一份,这样可以有效解决高可用性的问题,比如crash之后,不会受到影响
b)非持久化session管理,包括session订阅什么主题,以及对应的transient queue,该session只需要在连接机器上保持,不需要同步到其他的broker上,如果对应的client和broker失去连接之后,对应session信息就会被清除掉
3.Event Service
a)负责将连接,订阅等事件发送给每个broker,对于每个连接事件,我们都需要将该消息推送给event service,还有就是每个client的订阅主题,取消订阅主题的事件,目前event service的后端实现采用Kafka做的,当然也可以通过Akka本身提供的功能来做,考虑到需要持久化,所以采用了Kafka,后期我们减少对Kafka的依赖
4.Session State metadata service
a)负责持久化session metadata数据存储,该服务从Event Service订阅数据,然后决定哪些数据需要持久化到后端存储(采用Hbase做持久化存储),目前主要是存储持久化session相关的信息
5.Queue Service
a)管理以及分配queue,这里的queue分为两种,一种是transient queue,一种是persistent queue,transient queue是采用in memory的方式实现,persistent queue是采用Hbase实现。Transient queue是为transient session创建的,persistent queue是为persistent session创建。Persistent session的特点就是即使该session对应的连接断开了,我们也需维护该session,以及该session订阅的数据,以便下次这个client重新连接上来之后,自动恢复session的状态,还有下发没有处理完的订阅数据
6.Quota Service
a)==管理包括并发连接数,上行带宽,下行带宽的限制 ==
7.Metric Service
a)监控服务的并发连接数,并发消息数,当前流量,服务运行指标,包括CPU,memory,网络等相关指标
MQTT Codec Stack结构
连接层采用Netty NIO框架,关于Netty NIO的详细设计,这里我们就不做介绍了。支持 4 种形式的接入方式,TCP,TLS,websocket over TLS,以及websocket,各个接入方式的codec层级关系可以参考下图。
持久化Session
对于持久化session,需要将该session信息同步到每台机器,每台机器都有所有持久化session信息的全集,这样做的好处就是当某台broker无法工作了,连接在这个异常broker上的client不会丢失消息,每条publish的消息都是直接写入hbase的,当broker恢复,或者client连接到其他broker之后,可以继续从hbase获取数据,然后发送给订阅的client。
订阅消息处理流程
订阅消息会发往event service,每个broker都会订阅来自event service的数据,对于持久化session,每个broker都会创建对应session的订阅信息以及virtual queue,这个virtual queue分为client和server两部分,client端的virtual queue负责保证写入顺序,以及批量写入(提升效率),server这边的queue保证来自不同broker的消息的有序性.
发送消息流程
C2 往C1 订阅的主题发送一条数据,router会直接将数据写入C1 对应的hbase queue,然后通知C1,告诉他有新的数据可以消费了,这个时候broker直接从hbase读取数据,然后发往C1
如果Mqtt Broker2 出现crash了,比如这个进程挂了,或者Mqtt Broker2 所在的机器断电了,或者网络出现故障了,C1 本来应该收到的数据并不会减少,由于Mqtt Broker1 会继续往Hbase写入数据,等C1 重新连接之后,可以继续从Hbase消费数据
Event service数据的Compaction
考虑持久化session相关的数据都是写入到kafka的,如果一个新的broker加入集群,首先就需要将持久化session的信息全部加载,如果加载都是从kafka主题的头部开始消费数据的话,可能会花费很久,为此我们需要将kafka的数据做compaction,这些compaction的数据写入到hbase,如何加载全量信息了,全量信息就是hbase数据的集合和备份checkpoint之后kafka数据集合merge结果就是最终的全量信息。
非持久化Session
当非持久化session的client连接上来的时候,如果订阅主题,我们会直接在改client所在机器创建session以及session对应的queue
订阅消息流程:
发送消息流程:
当C2 发送一条消息的时候,broker1 会把消息转发给broker 2, broker2 会先把消息写入到C1 对应的in memory queue,然后发送一个有数据的event给C1,这个时候broker2 会从queue读取数据,然后发往给C1
基于HBase的分布式消息队列
Hbase本身不提供queue这个功能,但是我们可以利用hbase特性来实现virtual queue的概念,通过设计好rowkey来保证消息的有序性,然后将数据的读取转化为scan操作,下图有 4 个client,我们为每个client分配一个unique的queue ID,然后每个queue的数据通过queue ID和单调递增的ID来组合成为一个unique的rowkey,为了保证写入的均匀性,我们需要合理设计unique ID的prefix来保证将这些rowkey均匀的分布到不同的region。
为了实现queue的功能,我们在Hbase上定义了一个新的coprocessor,这个coprocessor用来创建queue,管理queue的数据,以及删除queue,同时还可以修改queue的配额等等。下图是我们的一个事例,我们有 4 个client,每个client都有自己的queue,通过算法把这些queue均匀的分布到不同的region上使用定制region split算法。
定义queue name为reverse{clientId}_tenantId,这里的clientID是系统生成的,是64bit的long,我们为每个client生成一个ID,这个ID是单调递增的,加入我们预期region的数目为 128 个的话,那么我们就取reverse{clientId}的头8bit作为region分割的条件,这样我们就可以把不同的queue均匀分布到不同的region上了,然后对region做balance。
保证写入消息的有序性
对于持久化消息队列,需要在每个broker上都建立一个virtual queue,该virtual queue对应hbase的真实queue,每次virtual queue的数据都是batch写入hbase,假设这个queue的名字为Q的话,我们会为每个写到hbase的消息分配一个unique的ID,该ID是Q_(ID),ID是一个单调递增的数字,采用64bit的long表示,每个batch写入到hbase的coprocessor之后,需要先获取该queue的lock,然后分配ID,然后将数据写入hbase,最后释放lock,这样下一个request就可以继续写入,这里lock的粒度是queue级别,就是每个queue都会有自己的一个lock,这样可以保证并发性。
读取queue的数据
我们会为每个queue保存该queue在Hbase的最小ID,以及最大ID,如果该queue的最小ID和最大ID由于cache失效,导致内存不存在的话,我们就通过hbase的scan操作,来获取最小的ID,以及最大ID,然后将数据保留到cache里面,这样可以加速下次查找,每次读取特定长度的数据,下次计算便于继续读取,读数据的时候并不需要获取锁,由于读数据只会来自一台机器的一个client,就是任何时刻只有一个client在读数据
删除queue的数据
这里的删除已经读取的数据,由于我们的数据都是有序的,所以删除的时候,只需要告诉queue需要删除多长的数据即可,然后我们根据最小ID,以及offset可以算出需要删除rowkey的ID,然后执行一个batch delete操作,这样就可以将数据删除了,删除数据也不会需要获取锁,由于删除请求只会来自一台机器的一个client,就是任何时刻只有一个client在删除数据
Notes:
同时由于Hbase目前并不存在官方的async的library来往hbase写入数据,或者读取数据,目前只有opentsdb提供一个版本,考虑我们是利用coprocessor增加了一个新的endpoint,但是opentsdb的async library并不支持coprocessor,为了我们需要扩展async的library,这样就可以async library的coprocessor库来处理数据。
优化:
如何利用in memory compaction来优化hbase queue的性能指标,由于mqtt的消息写入hbase之后,基本马上就会被读取出来,然后发送给client,所以说mqtt的消息都是属于short lived的数据,如果这些数据都在in memory做compaction的话,那就意味我们不需要将这些数据写入HFile,只需要写WAL日志,这样可以极大的降低HDFS文件系统的IO,对于我们这种场景的话,Hbase的瓶颈就出在HDFS文件系统的读写上,目前in memory compaction已经在hbase 2. 0 上实现,不过没有正式release。
更多 in memory compaction 的资料可以参考:
Accordion: HBase Breathes with In-Memory Compaction
https://blogs.apache.org/hbase/entry/accordion-hbase-breathes-with-in
Internal design:
https://blogs.apache.org/hbase/entry/accordion-developer-view-of-in
更多 queue 插件
每种 queue 都有自己的优缺点,为此我们提供了多种 queue 可以供用户选择,额外提供 redis 以及 kafka 的 queue,kafka 的 queue 是一种很 popular 的方式,主要是用在大规模扇入场景,比如说 100w 个 client 都往同一个主题发送消息,如果采用 in memory 的 queue 或者 hbase 的 queue,那么瓶颈就会出在订阅端(只有一个 TCP 连接来处理数据),如果采用 kafka queue,可以将数据发往 kafka 的主题,然后调用 kafka 的 client 来消费数据,这样就可以完美解决大扇入的场景。
多租户架构
目前 MQTT 服务是一个分布式多租户的服务,一个 IotHub 上面会有很多租户的 MQTT Broker,每个 MQTT broker 对应一个 tenant,每个 broker 有自己的 authentication service, session manager, Queue service,以及很多其他服务,包括 unique Id generator,backend storage service,以及 router 服务,当一个 client 的通过 TCP 和我们的服务建立连接之后,首先我们会为该 client 创建一个 session,这个 session 会检查该 client 是否合法,包括 tenant 名字,用户名,密码,如果所有的都合法的话,我们会把这个 client 的 session 添加到 session manager,如果不是合法的,我们会直接把这个 client 的连接给断开。
MQTT 采用 TCP 的方式和云端建立连接,我们通过用户名来区分这个 client 对应的是那个 tenant,所以我们对用户名有严格的规定,用户名必须是{tenant Name}/{clientName},拿到用户名和密码之后,我们先算出该 client 对应的 tenant name,然后获取该 tenant 对应 broker 实例,后去该 broker 的 auth 服务来认证用户名和密码组合。
测试数据
Baidu IoT Hub vs EMQTT
MPS: message per seconds
消息 payload 大小: 1024 bytes
场景:一半 pub 和一半 sub,每一个 pub 对应一个 sub,也就是说通过唯一主题关联起来,这种场景是对 MQTT 协议最严格的考验,其他场景相对来说 CPU 消耗会少一些
测试 Queue 类型:In memory queue
Notes
由于 Pub 和 Sub 是一一对应的,所这里的 MPS 是指 PUB 的 QPS,所以实际 QPS 是这个数字的两倍。
可用 MPS(无丢包,latency 小于 0.5s):
结论:同等连接数下,IoT Hub 的可用最大吞吐量在 EMQTT 的 1~2 倍之间。
部署 broker 机器配置信息:
vendor_id: GenuineIntel
cpu family: 6
model: 45
model name: Intel® Xeon® CPU E5-2620 0 @ 2.00GHz
core: 12
Memory:
MemTotal: 132137288 kB
更多关于百度 IoT Hub 使用信息可以访问官网。
更多推荐
所有评论(0)