着 5G 时代的来临,万物互联的伟大构想正在成为现实。联网的 物联网设备 在 2018 年已经达到了 70 亿1,在未来两年,仅智能水电气表就将超过10亿2。
海量的设备接入和设备管理对网络带宽、通信协议以及平台服务架构都带来了很大挑战。对于 物联网协议 来说,必须针对性地解决物联网设备通信的几个关键问题:其网络环境复杂而不可靠、其内存和闪存容量小、其处理器能力有限。
MQTT 协议 是基于发布/订阅模式的物联网通信协议,凭借简单易实现、支持 QoS、报文小等特点,占据了物联网协议的半壁江山:
MQTT was created by Andy Stanford-Clark of IBM, and Arlen Nipper (then of Arcom Systems, later CTO of Eurotech).3
据 Arlen Nipper 在一 IBM Podcast 上的自述,MQTT 原名是 MQ TT, 注意 MQ 与 TT之间的空格,其全称为: MQ Telemetry Transport,是九十年代早期,他在参与 Conoco Phillips 公司的一个原油管道数据采集监控系统(pipeline SCADA system)时,开发的一个实时数据传输协议。它的目的在于让传感器通过带宽有限的 VSAT ,与 IBM 的 MQ Integrator 通信。由于 Nipper 是遥感和数据采集监控专业出身,所以按业内惯例给了个 MQ TT 的名字。
按照 Nipper 的介绍,MQTT 必须简单容易实现,必须支持 QoS(设备网络环境复杂),必须轻量且省带宽(因为那时候带宽很贵),必须数据无关(不关心 Payload 数据格式),必须有持续地会话感知能力(时刻知道设备是否在线)。下面将介绍 MQTT (3.1.1 版本) 的几个核心特色,分别对应了这几个设计原则的实现。
发布订阅模式是传统 Client/Server 模式的一种解耦方案。发布者通过 Broker 与消费者之间通信,Broker 的作用是将收到的消息通过某种过滤规则,正确地发送给消费者。发布/订阅模式 相对于 客户端/服务器模式 的好处在于:
在 MQTT 协议里,上面提到的 过滤规则 是 Topic。比如:所有发布到 news 这个 Topic 的消息,都会被 Broker 转发给已经订阅了 news 的订阅者:
上图中订阅者预先订阅了 news,然后发布者向 Broker 发布了一条消息 "some msg" 并指定发布到 news 主题,Broker 通过 Topic 匹配,决定将这条消息转发给订阅者。
MQTT 的 Topic 有层级结构,并且支持通配符 + 和 #:
MQTT 的主题是不要预先创建的,发布者发送消息到某个主题、或者订阅者订阅某个主题的时候,Broker 就会自动创建这个主题。
MQTT 协议将协议本身占用的额外消耗最小化,消息头部最小只需要占用 2 个字节。
MQTT 的消息格式分三部分:
固定长度头部,2 个字节,所有消息类型里都有可变长度头部,只有某些消息类型里有Payload,只有某些消息类型里有
MQTT 的主要消息类型有:
其中 PINGREQ / PINGRESP 和 DISCONNECT 报文是不需要可变头部的,也没有 Payload,也就是说它们的报文大小仅仅消耗 2 个字节。
在 CONNECT 报文的可变长度头部里,有个 Protocol Version 的字段。为了节省空间,只有一个字节。所以版本号不是按照字符串 "3.1.1" 存放的,而是使用数字 4 来表示 3.1.1 版本。
为适应设备不同的网络环境,MQTT 设计了 3 个 QoS 等级,0, 1, 2:
QoS 0 是一种 "fire and forget" 的消息发送模式:Sender (可能是 Publisher 或者 Broker) 发送一条消息之后,就不再关心它有没有发送到对方,也不设置任何重发机制。
QoS 1 包含了简单的重发机制,Sender 发送消息之后等待接收者的 ACK,如果没收到 ACK 则重新发送消息。这种模式能保证消息至少能到达一次,但无法保证消息重复。
QoS 2 设计了略微复杂的重发和重复消息发现机制,保证消息到达对方并且严格值到达一次。
MQTT 没有假设设备或 Broker 使用了 TCP 的保活机制4,而是设计了协议层的保活机制:在 CONNECT 报文里可设置 Keepalive 字段,来设置保活心跳包 PINGREQ/PINGRESP 的发送时间间隔。当长时间无法收到设备的 PINGREQ 的时候,Broker 就会认为设备已经下线。
总的来说,Keepalive 有两个作用:
对于那些想要在重新上线后,重新收到离线期间错过的消息的设备,MQTT 设计了持久化连接:在 CONNECT 报文里可设置 CleanSession 字段为 False,则 Broker 会为终端存储:
MQTT 设计了遗愿(Last Will) 消息,让 Broker 在发现设备异常下线的情况下,帮助设备发布一条遗愿消息到指定的主题。
实际上在某些 MQTT 服务器的实现里 (比如 EMQ X),设备上线或下线的时候 Broker 会通过某些系统主题发布设备状态更新,更符合实际应用场景。
到目前为止,比较流行的开源 MQTT 服务器有几个:
从支持 MQTT 5.0、稳定性、扩展性、集群能力等方面考虑,EMQ X 的表现应该是最好的:
EMQ X MQTT 物联网云服务 提供了一个在线的公共 MQTT 5.0 服务器,不需要任何安装您就可以快速开始 MQTT 协议的学习、测试或原型制作。
该 MQTT 服务器的详细接入信息请见 EMQ 官网页面:免费的在线 MQTT 服务器。
EMQ 也提供了支持浏览器访问的 MQTT 在线客户端工具,该工具支持通过普通或者加密的 WebSocket 端口连接至 MQTT 服务器,同时也支持缓存连接方便下次访问使用。
想一想都快两年没有编写代码,这公众号也快有两年没更新了,人闲久了总想找些活干,为了保持代码编写技能于是把之前写的MQTT协议扩展出一个网关服务,并实现对3.X和5.0协议版本的支持。作为一个服务网关在性能上也是有着一定的要求,其实现目标能支持数十万的消息订阅转发。
简介
项目是基于BeetleX通讯组件扩展的MQTT协议通讯服务,服务包括两大模块分别是:BeetleX.MQTT.Protocols和BeetleX.MQTT.Server。前者是对MQTT通讯协议的实现,分别实现了V3.X和V5.0两个版本;而后者则是网关服务的实现并集成了基础的管理界面,两个项目都是都提供了完整的实现代码并存放在Gitub上。
项目开源地址:github.com/beetlex-io/mqtt
运行项目
项目的运行非常简单,只需要创建一个控制台项目并在Main方法中添加以下代码即可以运行
class Program
{
private static MQTTServer mServer;
static void Main(string[] args)
{
mServer = new MQTTServer(ProtocolType.V3);
mServer.RegisterComponent<BeetleX.MQTT.Server.Controller>();
mServer.MQTTListen(o =>
{
o.DefaultListen.Port = 8089;
//o.DefaultListen.SSL = true;
//o.DefaultListen.CertificateFile = "";
//o.DefaultListen.CertificatePassword = "";
})
.Setting(o =>
{
o.LogToConsole = true;
o.Port = 80;
o.LogLevel = EventArgs.LogType.Info;
})
.UseJWT()
.UseEFCore<Storages.MQTTDB>()
.UseElement(PageStyle.ElementDashboard)
.Initialize((http, vue, resoure) =>
{
resoure.AddAssemblies(typeof(BeetleX.MQTT.Server.MQTTUser).Assembly);
resoure.AddCss("website.css");
resoure.AddScript("echarts.js");
vue.Debug();
})
.Run();
}
}
以上代码在80端口上打开WEB管理服务,在8089端口上打开MQTT服务;服务启动后就可以通过浏览器进入到简单的管理界面。在协议版本选择上可以在MQTTServer创建时指定V3或V5(暂时不能同一端口服务同时支持V3和V5)。
首页
用户管理
设备管理
管理界面只推荐简单的帐号管理和转发统计,基础框架已经搭建完成,可以根据实际需求进行扩展开发。
单独使用协议分析器
如果使用其他网络服务组件又不想自己编写MQTT协议,那可以单独使用BeetleX.MQTT.Protocols对网络数据进行协议分析。组件是基于Stream数据流规范开发,只需要传对一个标准的Stream数据流即可以完成MQTT协议读取和写入。
//v5
var mqttparse = new BeetleX.MQTT.Protocols.V5.MQTTParseV5();
mqttparse.Read(stream, );
mqttparse.Write(msg, stream, );
//v3.x
var mqttparse = new BeetleX.MQTT.MQTTParseV3();
mqttparse.Read(stream, );
mqttparse.Write(msg, stream, );
性能:由于MQTT是支持通配符订阅的,高并发时大量消息在订阅匹配上往往会比较损耗性能;组件在这方面做了特别的优化,对于通配符订阅上也能非常轻松地应对每秒上10万订阅转发(具体上限取决于硬件搭配)。
提醒:由于项目仅个人兴趣编写,并没有在自有商业项目中使用,因此会存在一定的问题;如果碰到问题可以去Github对应的项目上提出相应的问题。
BeetleX
开源跨平台通讯框架(支持TLS)
提供HTTP,Websocket,MQTT,Redis,RPC和服务网关开源组件
http://beetlex-io.com
应用示例使用Coolpy7作为Mqtt服务器并启用Websocket代理完美支持高并发大流量即时通过能力,本示以即时通信聊天为为例。还可以应用到其他软件应用如:网页客服系统、网站信息通知、网页即时通信系统、网页游戏等等
技术应用架构简介
系统架构包括:
安装并运行
运行Coolpy7核心服务
Coolpy7核心服务是一个最原始最单纯功能完备的MQTT消息服务器端,包括功能有:QoS:0,QoS1,QoS2消息质量支持。Will消息支持等等。深入了解 https://mcxiaoke.gitbooks.io/mqtt-cn/content/
通过ssh进入服务器192.168.200.201,并确保你已经按照 https://coolpy7.gitbook.io/coolpy7book/kai-shi-shi-yong/start 配置服务器操作系统的网络优化配置。
Coolpy7核心服务运行后会自行构当前目录下的data文件夹,此文件夹存放MQTT运行期所需求持久化的数据信息,使用的是开源项目 https://github.com/jacoblai/yiyidb,支持10亿级秒op的高性能数据库,数据库内核使用的是Leveldb技术。
# 下载服务器端 git clone https://github.com/Coolpy7/Coolpy7.git && cd Coolpy7 # 解压文件 unzip go_build_Coolpy7_go_linux.zip # 提权 chmod -R 777 go_build_Coolpy7_go_linux # 启动Coolpy7 启动参数 # l 当前服务Host地址 (默认为:1883即本地1883端口,此参数一般默认即可,无需配置) # a 连接接入调度器最大线程,此值可防止暴力连接攻击,对已连接客户端进行优先保护 (默认值128) ./go_build_Coolpy7_go_linux # 启动成功后会打印如下信息,即说明服务端已正常启动,host于1883端口,请确保相关防火墙配置可用 2018/10/29 12:59:55 Coolpy7 tcp is listening on [::]:1883
一般需为程序提权才可以运行Linux服务,指令:chmod -R 777 go_build_Coolpy7_go_linux
运行Coolpy7 WS代理服务
此功能即为Coolpy7核心服务提供WebSocket接入功能。通过ssh进入服务器192.168.200.203,并确保你已经按照 https://coolpy7.gitbook.io/coolpy7book/kai-shi-shi-yong/start 配置服务器操作系统的网络优化配置。
# 下载服务器端 git clone https://github.com/Coolpy7/Coolpy7.git && cd Coolpy7 # 解压文件 unzip go_build_Coolpy7_ws_go_linux.zip # 提权 chmod -R 777 go_build_Coolpy7_ws_go_linux # 启动Coolpy7 WS Poxy # r启动参数 CP7核心服务器所在ip或域名 (例:core.coolpy.net:1883 or 192.168.200.201:1883) # l启动参数 当前服务Host地址 (默认为:8083即本地8083端口,此参数一般默认即可,无需配置) ./go_build_Coolpy7_ws_go_linux # 启动成功后会打印如下信息,即说明服务端已正常启动,host于8083端口,请确保相关防火墙配置可用 2018/10/29 12:59:55 upstream 192.168.200.201:1883 ok 2018/10/29 12:59:55 Coolpy7 ws is listening on [::]:8083
运行Html5前端聊天室应用示例
以WebStorm为例
1.下载源代码
填写git地址下载源代码
2.修改连接信息本示例以本机运行整套系统为例输入127.0.0.1,端口号8083,假设把服务器端Coolpy7和Coolpy7-ws已经运行于阿里云之类的云服务器上改写为服务器的公网IP地址和端口即可,如果已绑定域名可直接填写域名如: test.coolpy.net
代码位于chat.html第55行
3.修改完毕后选中工程中的index.html点击
至此已完成运行部署。以下是测试运行演示
项目开源信息
服务器端开源地址: https://github.com/Coolpy7
聊天室前端开源地址:https://github.com/Coolpy7/Cp7Chat
*请认真填写需求信息,我们会在24小时内与您取得联系。