整合营销服务商

电脑端+手机端+微信端=数据同步管理

免费咨询热线:

MQTT 协议是什么

着 5G 时代的来临,万物互联的伟大构想正在成为现实。联网的 物联网设备 在 2018 年已经达到了 70 亿1,在未来两年,仅智能水电气表就将超过10亿2。

海量的设备接入和设备管理对网络带宽、通信协议以及平台服务架构都带来了很大挑战。对于 物联网协议 来说,必须针对性地解决物联网设备通信的几个关键问题:其网络环境复杂而不可靠、其内存和闪存容量小、其处理器能力有限。

MQTT 协议 是基于发布/订阅模式的物联网通信协议,凭借简单易实现、支持 QoS、报文小等特点,占据了物联网协议的半壁江山:

MQTT 协议的诞生

MQTT was created by Andy Stanford-Clark of IBM, and Arlen Nipper (then of Arcom Systems, later CTO of Eurotech).3

据 Arlen Nipper 在一 IBM Podcast 上的自述,MQTT 原名是 MQ TT, 注意 MQ 与 TT之间的空格,其全称为: MQ Telemetry Transport,是九十年代早期,他在参与 Conoco Phillips 公司的一个原油管道数据采集监控系统(pipeline SCADA system)时,开发的一个实时数据传输协议。它的目的在于让传感器通过带宽有限的 VSAT ,与 IBM 的 MQ Integrator 通信。由于 Nipper 是遥感和数据采集监控专业出身,所以按业内惯例给了个 MQ TT 的名字。

MQTT 协议设计原则

按照 Nipper 的介绍,MQTT 必须简单容易实现,必须支持 QoS(设备网络环境复杂),必须轻量且省带宽(因为那时候带宽很贵),必须数据无关(不关心 Payload 数据格式),必须有持续地会话感知能力(时刻知道设备是否在线)。下面将介绍 MQTT (3.1.1 版本) 的几个核心特色,分别对应了这几个设计原则的实现。

灵活的发布订阅和主题设计

发布订阅模式是传统 Client/Server 模式的一种解耦方案。发布者通过 Broker 与消费者之间通信,Broker 的作用是将收到的消息通过某种过滤规则,正确地发送给消费者。发布/订阅模式 相对于 客户端/服务器模式 的好处在于:

  • 发布者和消费者之间不必预先知道对方的存在,比如不需要预先沟通对方的 IP Address 和 Port
  • 发布者和消费者之间不必同时运行。因为 Broker 是一直运行的。

在 MQTT 协议里,上面提到的 过滤规则 是 Topic。比如:所有发布到 news 这个 Topic 的消息,都会被 Broker 转发给已经订阅了 news 的订阅者:

上图中订阅者预先订阅了 news,然后发布者向 Broker 发布了一条消息 "some msg" 并指定发布到 news 主题,Broker 通过 Topic 匹配,决定将这条消息转发给订阅者。

MQTT 的 Topic 有层级结构,并且支持通配符 + 和 #:

  • + 是匹配单层的通配符。比如 news/+ 可以匹配 news/sports,news/+/basketball 可匹配到 news/sports/basketball。
  • # 是一到多层的通配符。比如 news/# 可以匹配 news、 news/sports、news/sports/basketball 以及 news/sports/basketball/x 等等。

MQTT 的主题是不要预先创建的,发布者发送消息到某个主题、或者订阅者订阅某个主题的时候,Broker 就会自动创建这个主题。

带宽消耗最小化

MQTT 协议将协议本身占用的额外消耗最小化,消息头部最小只需要占用 2 个字节。

MQTT 的消息格式分三部分:

固定长度头部,2 个字节,所有消息类型里都有可变长度头部,只有某些消息类型里有Payload,只有某些消息类型里有

MQTT 的主要消息类型有:

  • CONNECT / CONNACK
  • PUBLISH / PUBACK
  • SUBSCRIBE / SUBACK
  • UNSUBSCRIBE / UNSUBACK
  • PINGREQ / PINGRESP
  • DISCONNECT

其中 PINGREQ / PINGRESP 和 DISCONNECT 报文是不需要可变头部的,也没有 Payload,也就是说它们的报文大小仅仅消耗 2 个字节。

在 CONNECT 报文的可变长度头部里,有个 Protocol Version 的字段。为了节省空间,只有一个字节。所以版本号不是按照字符串 "3.1.1" 存放的,而是使用数字 4 来表示 3.1.1 版本。

三个可选的 QoS 等级

为适应设备不同的网络环境,MQTT 设计了 3 个 QoS 等级,0, 1, 2:

  • At most once (0)
  • At least once (1)
  • Exactly once (2)

QoS 0 是一种 "fire and forget" 的消息发送模式:Sender (可能是 Publisher 或者 Broker) 发送一条消息之后,就不再关心它有没有发送到对方,也不设置任何重发机制。

QoS 1 包含了简单的重发机制,Sender 发送消息之后等待接收者的 ACK,如果没收到 ACK 则重新发送消息。这种模式能保证消息至少能到达一次,但无法保证消息重复。

QoS 2 设计了略微复杂的重发和重复消息发现机制,保证消息到达对方并且严格值到达一次。

会话保持

MQTT 没有假设设备或 Broker 使用了 TCP 的保活机制4,而是设计了协议层的保活机制:在 CONNECT 报文里可设置 Keepalive 字段,来设置保活心跳包 PINGREQ/PINGRESP 的发送时间间隔。当长时间无法收到设备的 PINGREQ 的时候,Broker 就会认为设备已经下线。

总的来说,Keepalive 有两个作用:

  • 发现对端死亡或者网络中断
  • 在长时间无消息交互的情况下,保持连接不被网络设备断开

对于那些想要在重新上线后,重新收到离线期间错过的消息的设备,MQTT 设计了持久化连接:在 CONNECT 报文里可设置 CleanSession 字段为 False,则 Broker 会为终端存储:

  • 设备所有的订阅
  • 还未被设备确认的 QoS1 和 QoS 消息
  • 设备离线时错过的消息

在线状态感知

MQTT 设计了遗愿(Last Will) 消息,让 Broker 在发现设备异常下线的情况下,帮助设备发布一条遗愿消息到指定的主题。

实际上在某些 MQTT 服务器的实现里 (比如 EMQ X),设备上线或下线的时候 Broker 会通过某些系统主题发布设备状态更新,更符合实际应用场景。

开源 MQTT 服务器如何选择

到目前为止,比较流行的开源 MQTT 服务器有几个:

  1. Eclipse Mosquitto使用 C 语言实现的 MQTT 服务器。Eclipse 组织还还包含了大量的 MQTT 客户端项目:https://www.eclipse.org/paho/#
  2. EMQ X使用 Erlang 语言开发的 MQTT 服务器,内置强大的规则引擎,支持许多其他 IoT 协议比如 MQTT-SN、 CoAP、LwM2M 等。
  3. Mosca使用 Node.JS 开发的 MQTT 服务器,简单易用。
  4. VerneMQ同样使用 Erlang 开发的 MQTT 服务器.

从支持 MQTT 5.0、稳定性、扩展性、集群能力等方面考虑,EMQ X 的表现应该是最好的:

  • 使用 Erlang OTP 开发,容错能力好 (电信领域久经考验的语言,曾经做出过 99.9999999% 可用性的交换机设备5)
  • 官方有大量的扩展插件可供扩展。有很多认证插件,数据存储(backend)插件可供选择。可支持各种关系型数据库,NoSQL 数据库,以及常见消息队列如 Kafka,RabbitMQ,Pulsar 等
  • 支持集群,支持节点水平扩展
  • 单节点支持 2000K 并发连接
  • 支持规则引擎和编解码

MQTT 协议快速体验

MQTT 在线服务器

EMQ X MQTT 物联网云服务 提供了一个在线的公共 MQTT 5.0 服务器,不需要任何安装您就可以快速开始 MQTT 协议的学习、测试或原型制作。

该 MQTT 服务器的详细接入信息请见 EMQ 官网页面:免费的在线 MQTT 服务器。

MQTT 在线客户端

EMQ 也提供了支持浏览器访问的 MQTT 在线客户端工具,该工具支持通过普通或者加密的 WebSocket 端口连接至 MQTT 服务器,同时也支持缓存连接方便下次访问使用。


  1. The number of connected devices that are in use worldwide now exceeds 17 billion, with the number of IoT devices at 7 billion... https://iot-analytics.com/state-of-the-iot-update-q1-q2-2018-number-of-iot-devices-now-7b/ ↩
  2. The estimated installed base of smart meters (electricity, gas and water) is expected to surpass the 1 billion mark within the next 2 years. https://iot-analytics.com/smart-meter-market-2019-global-penetration-reached-14-percent/ ↩
  3. https://github.com/mqtt/mqtt.github.io/wiki/history ↩
  4. https://www.cnblogs.com/softidea/p/5764051.html ↩
  5. https://pragprog.com/articles/erlang ↩

想一想都快两年没有编写代码,这公众号也快有两年没更新了,人闲久了总想找些活干,为了保持代码编写技能于是把之前写的MQTT协议扩展出一个网关服务,并实现对3.X和5.0协议版本的支持。作为一个服务网关在性能上也是有着一定的要求,其实现目标能支持数十万的消息订阅转发。

简介
项目是基于BeetleX通讯组件扩展的MQTT协议通讯服务,服务包括两大模块分别是:BeetleX.MQTT.Protocols和BeetleX.MQTT.Server。前者是对MQTT通讯协议的实现,分别实现了V3.X和V5.0两个版本;而后者则是网关服务的实现并集成了基础的管理界面,两个项目都是都提供了完整的实现代码并存放在Gitub上。

项目开源地址:github.com/beetlex-io/mqtt

运行项目
项目的运行非常简单,只需要创建一个控制台项目并在Main方法中添加以下代码即可以运行

  • class Program{ private static MQTTServer mServer;  static void Main(string[] args) { mServer = new MQTTServer(ProtocolType.V3); mServer.RegisterComponent<BeetleX.MQTT.Server.Controller>(); mServer.MQTTListen(o => { o.DefaultListen.Port = 8089; //o.DefaultListen.SSL = true; //o.DefaultListen.CertificateFile = ""; //o.DefaultListen.CertificatePassword = ""; }) .Setting(o => { o.LogToConsole = true; o.Port = 80; o.LogLevel = EventArgs.LogType.Info; }) .UseJWT() .UseEFCore<Storages.MQTTDB>() .UseElement(PageStyle.ElementDashboard) .Initialize((http, vue, resoure) => { resoure.AddAssemblies(typeof(BeetleX.MQTT.Server.MQTTUser).Assembly); resoure.AddCss("website.css"); resoure.AddScript("echarts.js"); vue.Debug(); }) .Run();
    }}

    以上代码在80端口上打开WEB管理服务,在8089端口上打开MQTT服务;服务启动后就可以通过浏览器进入到简单的管理界面。在协议版本选择上可以在MQTTServer创建时指定V3或V5(暂时不能同一端口服务同时支持V3和V5)。

    • 首页

    • 用户管理

    • 设备管理


    管理界面只推荐简单的帐号管理和转发统计,基础框架已经搭建完成,可以根据实际需求进行扩展开发。

    单独使用协议分析器
    如果使用其他网络服务组件又不想自己编写MQTT协议,那可以单独使用BeetleX.MQTT.Protocols对网络数据进行协议分析。组件是基于Stream数据流规范开发,只需要传对一个标准的Stream数据流即可以完成MQTT协议读取和写入。

    • //v5var mqttparse = new BeetleX.MQTT.Protocols.V5.MQTTParseV5();mqttparse.Read(stream, );mqttparse.Write(msg, stream, );//v3.xvar mqttparse = new BeetleX.MQTT.MQTTParseV3();mqttparse.Read(stream, );mqttparse.Write(msg, stream, );


      性能:由于MQTT是支持通配符订阅的,高并发时大量消息在订阅匹配上往往会比较损耗性能;组件在这方面做了特别的优化,对于通配符订阅上也能非常轻松地应对每秒上10万订阅转发(具体上限取决于硬件搭配)。

      提醒:由于项目仅个人兴趣编写,并没有在自有商业项目中使用,因此会存在一定的问题;如果碰到问题可以去Github对应的项目上提出相应的问题。


      BeetleX

      开源跨平台通讯框架(支持TLS)

      提供HTTP,Websocket,MQTT,Redis,RPC和服务网关开源组件

      http://beetlex-io.com

应用示例使用Coolpy7作为Mqtt服务器并启用Websocket代理完美支持高并发大流量即时通过能力,本示以即时通信聊天为为例。还可以应用到其他软件应用如:网页客服系统、网站信息通知、网页即时通信系统、网页游戏等等

技术应用架构简介

系统架构包括:

  1. MQTT服务端程序(Coolpy7)
  2. WebSocket代理服务端 (Coolpy7_ws)
  3. Html5聊天室前端

安装并运行

运行Coolpy7核心服务

Coolpy7核心服务是一个最原始最单纯功能完备的MQTT消息服务器端,包括功能有:QoS:0,QoS1,QoS2消息质量支持。Will消息支持等等。深入了解 https://mcxiaoke.gitbooks.io/mqtt-cn/content/

  1. 防止暴力连接攻击,对已连接客户端进行优先保护
  2. 防止空连接攻击,当用户连接建立后两秒钟内没有进行身份验证即主动关闭客户端连接

通过ssh进入服务器192.168.200.201,并确保你已经按照 https://coolpy7.gitbook.io/coolpy7book/kai-shi-shi-yong/start 配置服务器操作系统的网络优化配置。

Coolpy7核心服务运行后会自行构当前目录下的data文件夹,此文件夹存放MQTT运行期所需求持久化的数据信息,使用的是开源项目 https://github.com/jacoblai/yiyidb,支持10亿级秒op的高性能数据库,数据库内核使用的是Leveldb技术。

# 下载服务器端
git clone https://github.com/Coolpy7/Coolpy7.git && cd Coolpy7
# 解压文件
unzip go_build_Coolpy7_go_linux.zip
# 提权
chmod -R 777 go_build_Coolpy7_go_linux
# 启动Coolpy7 启动参数
# l 当前服务Host地址 (默认为:1883即本地1883端口,此参数一般默认即可,无需配置)
# a 连接接入调度器最大线程,此值可防止暴力连接攻击,对已连接客户端进行优先保护 (默认值128)
./go_build_Coolpy7_go_linux
# 启动成功后会打印如下信息,即说明服务端已正常启动,host于1883端口,请确保相关防火墙配置可用
2018/10/29 12:59:55 Coolpy7 tcp is listening on [::]:1883

一般需为程序提权才可以运行Linux服务,指令:chmod -R 777 go_build_Coolpy7_go_linux

运行Coolpy7 WS代理服务

此功能即为Coolpy7核心服务提供WebSocket接入功能。通过ssh进入服务器192.168.200.203,并确保你已经按照 https://coolpy7.gitbook.io/coolpy7book/kai-shi-shi-yong/start 配置服务器操作系统的网络优化配置。

  1. 千万级WebSocket代理服务器
  2. 支持防爆力攻击
# 下载服务器端
git clone https://github.com/Coolpy7/Coolpy7.git && cd Coolpy7
# 解压文件
unzip go_build_Coolpy7_ws_go_linux.zip
# 提权
chmod -R 777 go_build_Coolpy7_ws_go_linux
# 启动Coolpy7 WS Poxy
# r启动参数 CP7核心服务器所在ip或域名 (例:core.coolpy.net:1883 or 192.168.200.201:1883)
# l启动参数 当前服务Host地址 (默认为:8083即本地8083端口,此参数一般默认即可,无需配置)
./go_build_Coolpy7_ws_go_linux
# 启动成功后会打印如下信息,即说明服务端已正常启动,host于8083端口,请确保相关防火墙配置可用
2018/10/29 12:59:55 upstream 192.168.200.201:1883 ok
2018/10/29 12:59:55 Coolpy7 ws is listening on [::]:8083

运行Html5前端聊天室应用示例

  1. 下载开源项目:https://github.com/Coolpy7/Cp7Chat
  2. 修改连接地址为上一步服务器端ip和端口(具体ip和端口按阁下真实环境,全套程序运行于本机可统一使用127.0.0.1为连接地址)
  3. 通过Webstorm等web调试工具运行代码

以WebStorm为例

1.下载源代码

填写git地址下载源代码

2.修改连接信息本示例以本机运行整套系统为例输入127.0.0.1,端口号8083,假设把服务器端Coolpy7和Coolpy7-ws已经运行于阿里云之类的云服务器上改写为服务器的公网IP地址和端口即可,如果已绑定域名可直接填写域名如: test.coolpy.net

代码位于chat.html第55行

3.修改完毕后选中工程中的index.html点击

至此已完成运行部署。以下是测试运行演示

项目开源信息

服务器端开源地址: https://github.com/Coolpy7

聊天室前端开源地址:https://github.com/Coolpy7/Cp7Chat