整合营销服务商

电脑端+手机端+微信端=数据同步管理

免费咨询热线:

.NET6 环境下实现MQTT通信(附代码演示)

.NET6 环境下实现MQTT通信(附代码演示)

言: MQTT广泛应用于工业物联网、智能家居、各类智能制造或各类自动化场景等。MQTT是一个基于客户端-服务器的消息发布/订阅传输协议,在很多受限的环境下,比如说机器与机器通信、机器与物联网通信等。好了,科普的废话不多说,下面直接通过.NET环境来实现一套MQTT通信demo,实现服务端与客户端的双边消息发布与订阅的功能和演示。

开发环境:

VS2022 + .NET 6 + Webapi / 控制台

1、新建一个webapi项目,用来后面做测试使用

2、新建一个继承自IHostedService的服务,用于随着webapi程序的启动而自动执行。(最终代码在文末)

3、引入 MQTTNet 包,该项目提供了.net环境下的MQTT通信协议支持,这款框架很优秀,此处直接引用它来进行使用。

4、在上面的MqttHostService类里面,开始方法里面新增初始化MQTT服务端的一些功能,例如 IP、端口号、事件等等。

5、mqtt服务端支持的一系列功能很多,大佬们可以自行去尝试一些新发现,此处只使用若干个简单功能。

6、添加客户端连接事件、连接关闭事件

7、由于事件要用的可能有点多,此处就不一一例举了,可以直接看以下的代码,以及有关注释来理解。

8、事件触发时候,打印输出

9、输出之前,记录一个当前事件名称标记一下,用于可以更加清楚看出是哪个事件输出的。

10、对MqttHostService类进行注册,用于程序启动时候跟随启动。

11、上面貌似设计的不是很友好,所以把mqtt服务实例单独弄出来,写入到单独的类里面做成属性,供方便调用。

12、把先前的一些东西改一下,换成使用上面步骤的属性来直接调用使用。

13、运行一下,看看是否可以成功,显示服务已启动,说明服务启动时OK的了.

14、新增一个控制台程序 MqttClient,用于模拟客户端。

15、创建客户端启动以及有关配置信息和有关事件,如图。具体使用可以看代码注释,就不过多解释了。

16、在program类里面,调用客户端启动方法,用于测试使用。

17、上面客户端对应的三个事件的实现如图,同时进行有关信息的打印输出。

18、启动服务端,然后启动客户端,可以看到服务端有一个连接失败的消息,这个是因为上面配置的客户端用户名是admin,密码是1234567,而服务端配置的规则是,用户名是admin 密码是123456

19、密码改回正常匹配项以后,再重新运行试试看,可以看到客户端与服务端连接上了。

20、如果关闭客户端,也可以看到服务端会进入客户端关闭事件内。

21、把上面主题订阅的内容写到连接成功以后的事件里面,不然客户端连接期间,可能就执行了主题订阅,会存在订阅失败的情况。改为写入到连接成功以后的事件里面,可以保证主题订阅肯定是在客户端连接成功以后才执行的。

22、接下来测试服务端消息推送,在MqttService服务里面,新增一个方法,用来执行mqtt服务端发布主题消息使用。有关配置信息和消息格式,如图所示。

23、新增一个API控制器,用来测试使用。API参数直接拿来进行消息的推送使用。

24、运行服务端和客户端,并访问刚刚新增的api接口,手动随意输入一条消息,可以看到客户端订阅的主题消息已经被实时接收到了。

25、接下来对客户端新增一个消息推送的方法,用来测试客户端消息发布的功能。有关消息格式和调用,如图所示,以及注释部分的说明。

26、客户端program类里面,客户端连接以后,通过手动回车,来执行客户端发布消息。

27、再次启动服务端和客户端

28、然后客户端内按一下回车,执行消息发布功能。可以看到,服务端成功接收到了客户端发过来的主题消息。

29、接下来测试客户端与客户端之间的消息发布与订阅,为了模拟多客户端效果,把上面客户端已经编译好的文件拷贝一份出来。

30、然后本地的代码进行一些修改,用来当做第二个客户端程序。所以客户端id也进行变更为 testclient02

31、对客户端订阅的主题,也改成 topic_02

32、启动服务端,以及拷贝出来的客户端1,和上面修改了部分代码的客户端2,保证都已经连接上服务端。

33、调用服务端的api接口,由于服务端发布的消息是发布给topic_01的,所以只有客户端1可以接收到消息。

34、客户端1执行回车,用于发布一段消息给主题 topic_02,可以看到客户端01发布的消息,同时被服务端和客户端02接收到了。因为服务端是总指挥,所以客户端发布的消息都会经过服务端,从而服务端都可以接收到连接的客户端发布的所有消息。

35、测试数据保持,下面先对客户端1进行断开,然后再重新连接客户端1,可以看到客户端1直接接收到了它订阅的主题的上一次最新的消息内容,这个就是消息里面,Retain属性设为True的结果,用于让服务端记忆该主题消息使用的。如果设为false,就没有这个效果了,大佬们也可以自己尝试。

36、最终的服务端代码:

MqttHostService:

  public class MqttHostService : IHostedService, IDisposable
    {
        public void Dispose()
        {
            
        }
        const string ServerClientId="SERVER";
        public Task StartAsync(CancellationToken cancellationToken)
        {
            MqttServerOptionsBuilder optionsBuilder=new MqttServerOptionsBuilder();
            optionsBuilder.WithDefaultEndpoint();
            optionsBuilder.WithDefaultEndpointPort(10086); // 设置 服务端 端口号
            optionsBuilder.WithConnectionBacklog(1000); // 最大连接数
            MqttServerOptions options=optionsBuilder.Build();

            MqttService._mqttServer=new MqttFactory().CreateMqttServer(options);

            MqttService._mqttServer.ClientConnectedAsync +=_mqttServer_ClientConnectedAsync; //客户端连接事件
            MqttService._mqttServer.ClientDisconnectedAsync +=_mqttServer_ClientDisconnectedAsync; // 客户端关闭事件
            MqttService._mqttServer.ApplicationMessageNotConsumedAsync +=_mqttServer_ApplicationMessageNotConsumedAsync; // 消息接收事件

            MqttService._mqttServer.ClientSubscribedTopicAsync +=_mqttServer_ClientSubscribedTopicAsync; // 客户端订阅主题事件
            MqttService._mqttServer.ClientUnsubscribedTopicAsync +=_mqttServer_ClientUnsubscribedTopicAsync; // 客户端取消订阅事件
            MqttService._mqttServer.StartedAsync +=_mqttServer_StartedAsync; // 启动后事件
            MqttService._mqttServer.StoppedAsync +=_mqttServer_StoppedAsync; // 关闭后事件
            MqttService._mqttServer.InterceptingPublishAsync +=_mqttServer_InterceptingPublishAsync; // 消息接收事件
            MqttService._mqttServer.ValidatingConnectionAsync +=_mqttServer_ValidatingConnectionAsync; // 用户名和密码验证有关

            MqttService._mqttServer.StartAsync();
            return Task.CompletedTask;
        }

        /// <summary>
        /// 客户端订阅主题事件
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task _mqttServer_ClientSubscribedTopicAsync(ClientSubscribedTopicEventArgs arg)
        {
            Console.WriteLine($"ClientSubscribedTopicAsync:客户端ID=【{arg.ClientId}】订阅的主题=【{arg.TopicFilter}】 ");
            return Task.CompletedTask;
        }

        /// <summary>
        /// 关闭后事件
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task _mqttServer_StoppedAsync(EventArgs arg)
        {
            Console.WriteLine($"StoppedAsync:MQTT服务已关闭……");
            return Task.CompletedTask;
        }

        /// <summary>
        /// 用户名和密码验证有关
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task _mqttServer_ValidatingConnectionAsync(ValidatingConnectionEventArgs arg)
        {
            arg.ReasonCode=MqttConnectReasonCode.Success;
            if ((arg.Username ?? string.Empty)!="admin" || (arg.Password??String.Empty)!="123456")
            {
                arg.ReasonCode=MqttConnectReasonCode.Banned;
                Console.WriteLine($"ValidatingConnectionAsync:客户端ID=【{arg.ClientId}】用户名或密码验证错误 ");

            }
            return Task.CompletedTask;
        }

        /// <summary>
        /// 消息接收事件
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task _mqttServer_InterceptingPublishAsync(InterceptingPublishEventArgs arg)
        {
            if (string.Equals(arg.ClientId, ServerClientId))
            {
                return Task.CompletedTask;
            }

            Console.WriteLine($"InterceptingPublishAsync:客户端ID=【{arg.ClientId}】 Topic主题=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】");
            return Task.CompletedTask;

        }

        /// <summary>
        /// 启动后事件
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task _mqttServer_StartedAsync(EventArgs arg)
        {
            Console.WriteLine($"StartedAsync:MQTT服务已启动……");
           return Task.CompletedTask;
        }

        /// <summary>
        /// 客户端取消订阅事件
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task _mqttServer_ClientUnsubscribedTopicAsync(ClientUnsubscribedTopicEventArgs arg)
        {
            Console.WriteLine($"ClientUnsubscribedTopicAsync:客户端ID=【{arg.ClientId}】已取消订阅的主题=【{arg.TopicFilter}】  ");
            return Task.CompletedTask;
        }

        private Task _mqttServer_ApplicationMessageNotConsumedAsync(ApplicationMessageNotConsumedEventArgs arg)
        {
            Console.WriteLine($"ApplicationMessageNotConsumedAsync:发送端ID=【{arg.SenderId}】 Topic主题=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】");
            return Task.CompletedTask;

        }

        /// <summary>
        /// 客户端断开时候触发
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        /// <exception cref="NotImplementedException"></exception>
        private Task _mqttServer_ClientDisconnectedAsync(ClientDisconnectedEventArgs arg)
        {
            Console.WriteLine($"ClientDisconnectedAsync:客户端ID=【{arg.ClientId}】已断开, 地址=【{arg.Endpoint}】  ");
            return Task.CompletedTask;

        }

        /// <summary>
        /// 客户端连接时候触发
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task _mqttServer_ClientConnectedAsync(ClientConnectedEventArgs arg)
        {
            Console.WriteLine($"ClientConnectedAsync:客户端ID=【{arg.ClientId}】已连接, 用户名=【{arg.UserName}】地址=【{arg.Endpoint}】  ");
            return Task.CompletedTask;
        }

        public Task StopAsync(CancellationToken cancellationToken)
        {
           return Task.CompletedTask;
        }
    }

MqttService:

 public class MqttService
    {
        public static MqttServer _mqttServer { get; set; }

        public static void PublishData(string data)
        {
            var message=new MqttApplicationMessage
            {
                Topic="topic_01",
                Payload=Encoding.Default.GetBytes(data),
                QualityOfServiceLevel=MqttQualityOfServiceLevel.AtLeastOnce,
                Retain=true  // 服务端是否保留消息。true为保留,如果有新的订阅者连接,就会立马收到该消息。
            };

            _mqttServer.InjectApplicationMessage(new InjectedMqttApplicationMessage(message) // 发送消息给有订阅 topic_01的客户端
            {
                SenderClientId="Server_01"
            }).GetAwaiter().GetResult();
        }

    }

37、最终的客户端代码:

MqttClientService:

public class MqttClientService
    {
        public static IMqttClient _mqttClient;
        public void MqttClientStart()
        {
            var optionsBuilder=new MqttClientOptionsBuilder()
                .WithTcpServer("127.0.0.1", 10086) // 要访问的mqtt服务端的 ip 和 端口号
                .WithCredentials("admin", "123456") // 要访问的mqtt服务端的用户名和密码
                .WithClientId("testclient02") // 设置客户端id
                .WithCleanSession()
                .WithTls(new MqttClientOptionsBuilderTlsParameters
                {
                    UseTls=false  // 是否使用 tls加密
                });

            var clientOptions=optionsBuilder.Build();
            _mqttClient=new MqttFactory().CreateMqttClient();

            _mqttClient.ConnectedAsync +=_mqttClient_ConnectedAsync; // 客户端连接成功事件
            _mqttClient.DisconnectedAsync +=_mqttClient_DisconnectedAsync; // 客户端连接关闭事件
            _mqttClient.ApplicationMessageReceivedAsync +=_mqttClient_ApplicationMessageReceivedAsync; // 收到消息事件

            _mqttClient.ConnectAsync(clientOptions);


        }

        /// <summary>
        /// 客户端连接关闭事件
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task _mqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
        {
            Console.WriteLine($"客户端已断开与服务端的连接……");
            return Task.CompletedTask;
        }

        /// <summary>
        /// 客户端连接成功事件
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task _mqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg)
        {
            Console.WriteLine($"客户端已连接服务端……");

            // 订阅消息主题
            // MqttQualityOfServiceLevel: (QoS):  0 最多一次,接收者不确认收到消息,并且消息不被发送者存储和重新发送提供与底层 TCP 协议相同的保证。
            // 1: 保证一条消息至少有一次会传递给接收方。发送方存储消息,直到它从接收方收到确认收到消息的数据包。一条消息可以多次发送或传递。
            // 2: 保证每条消息仅由预期的收件人接收一次。级别2是最安全和最慢的服务质量级别,保证由发送方和接收方之间的至少两个请求/响应(四次握手)。
            _mqttClient.SubscribeAsync("topic_02", MqttQualityOfServiceLevel.AtLeastOnce);

            return Task.CompletedTask;
        }

        /// <summary>
        /// 收到消息事件
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task _mqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
        {
            Console.WriteLine($"ApplicationMessageReceivedAsync:客户端ID=【{arg.ClientId}】接收到消息。 Topic主题=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】");
            return Task.CompletedTask;
        }

        public void Publish(string data)
        {
            var message=new MqttApplicationMessage
            {
                Topic="topic_02",
                Payload=Encoding.Default.GetBytes(data),
                QualityOfServiceLevel=MqttQualityOfServiceLevel.AtLeastOnce,
                Retain=true  // 服务端是否保留消息。true为保留,如果有新的订阅者连接,就会立马收到该消息。
            };
            _mqttClient.PublishAsync(message);
        }
    }

38、后记:MQTT以上演示已经完毕,可以看到它的一些特性,跟websocket很接近,但是又比websocket通信更加灵活。其实,实际上MQTT的客户端在现实生产环境场景下,并不需要咱们开发者进行开发,很多硬件设备都支持提供MQTT协议的通信客户端,所以只需要自己搭建一个服务端,就可以实现实时监控各种设备推送过来的各种信号数据。同时客户端支持发布消息给其他客户端,所以就实现了设备与设备之间的一对一信号通信的效果了。如果需要下发信号给硬件设备,MQTT服务端也可以直接下发给某个指定设备来进行实现即可。上面案例只提供入门方案,如果有感兴趣的大佬,可以自己去拓展一下,来达到更好的效果。

原文:https://www.cnblogs.com/weskynet/p/16441219.html

发部署在云端的设备接入网关服务就不得不提到MQTT,使用MQTT不论是从设备到设备,还是设备到云端服务的双向通讯,都可以获得较好的支持。

MQTT的起源和我的理解


用tcpdump分析下MQTT的通讯时序

这里基于mosquitto,以一组实际的订阅、发布,使用tcpdump来观察MQTT的通讯。

# 使用tcpdump打印指定MQTT服务器所在端口的报文
tcpdump -n -XX -i eth0 port 1883
# 订阅指定主题的消息
mosquitto_sub -h 172.16.0.12 -t "topic/#" -u user -P password -i "client1"
# 发布消息到指定主题
mosquitto_pub -h 172.16.0.12 -t "topic/" -u user -P password  -i "client3" -m "1234567890123456789012345678901234567890"

tcpdump抓取的报文如下:

启动mosquitto_sub命令获得如下报文:==================================16:33:28.668926 IP SUB.41050 > MQTT.ibm-mqisdp: Flags [S], seq 1177792969, win 29200, options [mss 1424,sackOK,TS val 530570732 ecr 0,nop,wscale 7], length 0
        0x0000:  5254 00e4 a55c feee 809f 3247 0800 4568  RT...\....2G..Eh
        0x0010:  003c 5872 4000 3806 556b 5147 9713 ac10  .<Xr@.8.UkQG....
        0x0020:  000c a05a 075b 4633 b1c9 0000 0000 a002  ...Z.[F3........
        0x0030:  7210 a45e 0000 0204 0590 0402 080a 1f9f  r..^............
        0x0040:  ddec 0000 0000 0103 0307                 ..........
16:33:28.669001 IP MQTT.ibm-mqisdp >SUB.41050: Flags [S.], seq 1053629863, ack 1177792970, win 28960, options [mss 1460,sackOK,TS val 3885354501 ecr 530570732,nop,wscale 7], length 0
        0x0000:  feee 809f 3247 5254 00e4 a55c 0800 4500  ....2GRT...\..E.
        0x0010:  003c 0000 4000 4006 a645 ac10 000c 5147  .<..@.@..E....QG
        0x0020:  9713 075b a05a 3ecd 1da7 4633 b1ca a012  ...[.Z>...F3....
        0x0030:  7120 9309 0000 0204 05b4 0402 080a e795  q...............
        0x0040:  ce05 1f9f ddec 0103 0307                 ..........
16:33:28.672229 IP SUB.41050 > MQTT.ibm-mqisdp: Flags [.], ack 1, win 229, options [nop,nop,TS val 530570735 ecr 3885354501], length 0
        0x0000:  5254 00e4 a55c feee 809f 3247 0800 4568  RT...\....2G..Eh
        0x0010:  0034 5873 4000 3806 5572 5147 9713 ac10  .4Xs@.8.UrQG....
        0x0020:  000c a05a 075b 4633 b1ca 3ecd 1da8 8010  ...Z.[F3..>.....
        0x0030:  00e5 320e 0000 0101 080a 1f9f ddef e795  ..2.............
        0x0040:  ce05                                     ..
16:33:28.672280 IP SUB.41050 > MQTT.ibm-mqisdp: Flags [P.], seq 1:42, ack 1, win 229, options [nop,nop,TS val 530570735 ecr 3885354501], length 41
        0x0000:  5254 00e4 a55c feee 809f 3247 0800 4568  RT...\....2G..Eh
        0x0010:  005d 5874 4000 3806 5548 5147 9713 ac10  .]Xt@.8.UHQG....
        0x0020:  000c a05a 075b 4633 b1ca 3ecd 1da8 8018  ...Z.[F3..>.....
        0x0030:  00e5 37b7 0000 0101 080a 1f9f ddef e795  ..7.............
        0x0040:  ce05 1027 0006 4d51 4973 6470 03c2 003c  ...'..MQIsdp...<
        0x0050:  0007 636c 6965 6e74 3100 0563 636e 6574  ..client1..ccnet
        0x0060:  0009 6363 6e65 7430 3930 31              ..ccnet0901
16:33:28.672313 IP MQTT.ibm-mqisdp > SUB.41050: Flags [.], ack 42, win 227, options [nop,nop,TS val 3885354505 ecr 530570735], length 0
        0x0000:  feee 809f 3247 5254 00e4 a55c 0800 4500  ....2GRT...\..E.
        0x0010:  0034 d3a1 4000 4006 d2ab ac10 000c 5147  .4..@.@.......QG
        0x0020:  9713 075b a05a 3ecd 1da8 4633 b1f3 8010  ...[.Z>...F3....
        0x0030:  00e3 31e3 0000 0101 080a e795 ce09 1f9f  ..1.............
        0x0040:  ddef                                     ..
16:33:28.672562 IP MQTT.ibm-mqisdp > SUB.41050: Flags [P.], seq 1:5, ack 42, win 227, options [nop,nop,TS val 3885354505 ecr 530570735], length 4
        0x0000:  feee 809f 3247 5254 00e4 a55c 0800 4500  ....2GRT...\..E.
        0x0010:  0038 d3a2 4000 4006 d2a6 ac10 000c 5147  .8..@.@.......QG
        0x0020:  9713 075b a05a 3ecd 1da8 4633 b1f3 8018  ...[.Z>...F3....
        0x0030:  00e3 11d5 0000 0101 080a e795 ce09 1f9f  ................
        0x0040:  ddef 2002 0000                           ......
16:33:28.675753 IP SUB.41050 > MQTT.ibm-mqisdp: Flags [.], ack 5, win 229, options [nop,nop,TS val 530570739 ecr 3885354505], length 0
        0x0000:  5254 00e4 a55c feee 809f 3247 0800 4568  RT...\....2G..Eh
        0x0010:  0034 5875 4000 3806 5570 5147 9713 ac10  .4Xu@.8.UpQG....
        0x0020:  000c a05a 075b 4633 b1f3 3ecd 1dac 8010  ...Z.[F3..>.....
        0x0030:  00e5 31d9 0000 0101 080a 1f9f ddf3 e795  ..1.............
        0x0040:  ce09                                     ..
16:33:28.675772 IP SUB.41050 > MQTT.ibm-mqisdp: Flags [P.], seq 42:56, ack 5, win 229, options [nop,nop,TS val 530570739 ecr 3885354505], length 14
        0x0000:  5254 00e4 a55c feee 809f 3247 0800 4568  RT...\....2G..Eh
        0x0010:  0042 5876 4000 3806 5561 5147 9713 ac10  .BXv@.8.UaQG....
        0x0020:  000c a05a 075b 4633 b1f3 3ecd 1dac 8018  ...Z.[F3..>.....
        0x0030:  00e5 46b6 0000 0101 080a 1f9f ddf3 e795  ..F.............
        0x0040:  ce09 820c 0001 0007 6363 6e65 742f 2300  ........ccnet/#.
16:33:28.675823 IP MQTT.ibm-mqisdp > SUB.41050: Flags [P.], seq 5:10, ack 56, win 227, options [nop,nop,TS val 3885354508 ecr 530570739], length 5
        0x0000:  feee 809f 3247 5254 00e4 a55c 0800 4500  ....2GRT...\..E.
        0x0010:  0039 d3a3 4000 4006 d2a4 ac10 000c 5147  .9..@.@.......QG
        0x0020:  9713 075b a05a 3ecd 1dac 4633 b201 8018  ...[.Z>...F3....
        0x0030:  00e3 a1b8 0000 0101 080a e795 ce0c 1f9f  ................
        0x0040:  ddf3 9003 0001 00                        .......
16:33:28.719501 IP SUB.41050 > MQTT.ibm-mqisdp: Flags [.], ack 10, win 229, options [nop,nop,TS val 530570783 ecr 3885354508], length 0
        0x0000:  5254 00e4 a55c feee 809f 3247 0800 4568  RT...\....2G..Eh
        0x0010:  0034 5877 4000 3806 556e 5147 9713 ac10  .4Xw@.8.UnQG....
        0x0020:  000c a05a 075b 4633 b201 3ecd 1db1 8010  ...Z.[F3..>.....
        0x0030:  00e5 3197 0000 0101 080a 1f9f de1f e795  ..1.............
        0x0040:  ce0c                                     ..
终止mosquitto_sub命令获得如下报文:=================================16:29:54.745664 IP 81.71.151.19.40510 > 172.16.0.12.ibm-mqisdp: Flags [F.], seq 58, ack 12, win 229, options [nop,nop,TS val 530356809 ecr 3885119146], length 0
        0x0000:  5254 00e4 a55c feee 809f 3247 0800 4568  RT...\....2G..Eh
        0x0010:  0034 e1fc 4000 3806 cbe8 5147 9713 ac10  .4..@.8...QG....
        0x0020:  000c 9e3e 075b 0e6c ea6f 989b b7f8 8011  ...>.[.l.o......
        0x0030:  00e5 1a34 0000 0101 080a 1f9c 9a49 e792  ...4.........I..
        0x0040:  36aa                                     6.
16:29:54.745761 IP 172.16.0.12.ibm-mqisdp > 81.71.151.19.40510: Flags [F.], seq 12, ack 59, win 227, options [nop,nop,TS val 3885140578 ecr 530356809], length 0
        0x0000:  feee 809f 3247 5254 00e4 a55c 0800 4500  ....2GRT...\..E.
        0x0010:  0034 fbe4 4000 4006 aa68 ac10 000c 5147  .4..@.@..h....QG
        0x0020:  9713 075b 9e3e 989b b7f8 0e6c ea70 8011  ...[.>.....l.p..
        0x0030:  00e3 c67c 0000 0101 080a e792 8a62 1f9c  ...|.........b..
        0x0040:  9a49                                     .I
16:29:54.751445 IP 81.71.151.19.40510 > 172.16.0.12.ibm-mqisdp: Flags [.], ack 13, win 229, options [nop,nop,TS val 530356812 ecr 3885140578], length 0
        0x0000:  5254 00e4 a55c feee 809f 3247 0800 4568  RT...\....2G..Eh
        0x0010:  0034 e1fd 4000 3806 cbe7 5147 9713 ac10  .4..@.8...QG....
        0x0020:  000c 9e3e 075b 0e6c ea70 989b b7f9 8010  ...>.[.l.p......
        0x0030:  00e5 c677 0000 0101 080a 1f9c 9a4c e792  ...w.........L..
        0x0040:  8a62                                     .b
# 执行mosquitto_pub后得到的报文如下:
16:36:40.721513 IP 81.71.151.19.41390 > 172.16.0.12.ibm-mqisdp: Flags [S], seq 2807988728, win 29200, options [mss 1424,sackOK,TS val 530762785 ecr 0,nop,wscale 7], length 0
        0x0000:  5254 00e4 a55c feee 809f 3247 0800 4568  RT...\....2G..Eh
        0x0010:  003c 5dd0 4000 3806 500d 5147 9713 ac10  .<].@.8.P.QG....
        0x0020:  000c a1ae 075b a75e 81f8 0000 0000 a002  .....[.^........
        0x0030:  7210 8378 0000 0204 0590 0402 080a 1fa2  r..x............
        0x0040:  cc21 0000 0000 0103 0307                 .!........
16:36:40.721585 IP 172.16.0.12.ibm-mqisdp > 81.71.151.19.41390: Flags [S.], seq 4236199326, ack 2807988729, win 28960, options [mss 1460,sackOK,TS val 3885546554 ecr 530762785,nop,wscale 7], length 0
        0x0000:  feee 809f 3247 5254 00e4 a55c 0800 4500  ....2GRT...\..E.
        0x0010:  003c 0000 4000 4006 a645 ac10 000c 5147  .<..@.@..E....QG
        0x0020:  9713 075b a1ae fc7f 459e a75e 81f9 a012  ...[....E..^....
        0x0030:  7120 9e41 0000 0204 05b4 0402 080a e798  q..A............
        0x0040:  bc3a 1fa2 cc21 0103 0307                 .:...!....
16:36:40.724732 IP 81.71.151.19.41390 > 172.16.0.12.ibm-mqisdp: Flags [.], ack 1, win 229, options [nop,nop,TS val 530762788 ecr 3885546554], length 0
        0x0000:  5254 00e4 a55c feee 809f 3247 0800 4568  RT...\....2G..Eh
        0x0010:  0034 5dd1 4000 3806 5014 5147 9713 ac10  .4].@.8.P.QG....
        0x0020:  000c a1ae 075b a75e 81f9 fc7f 459f 8010  .....[.^....E...
        0x0030:  00e5 3d46 0000 0101 080a 1fa2 cc24 e798  ..=F.........$..
        0x0040:  bc3a                                     .:
16:36:40.724750 IP PUB.41390 > MQTT.ibm-mqisdp: Flags [P.], seq 1:42, ack 1, win 229, options [nop,nop,TS val 530762788 ecr 3885546554], length 41
        0x0000:  5254 00e4 a55c feee 809f 3247 0800 4568  RT...\....2G..Eh
        0x0010:  005d 5dd2 4000 3806 4fea 5147 9713 ac10  .]].@.8.O.QG....
        0x0020:  000c a1ae 075b a75e 81f9 fc7f 459f 8018  .....[.^....E...
        0x0030:  00e5 40ef 0000 0101 080a 1fa2 cc24 e798  ..@..........$..
        0x0040:  bc3a 1027 0006 4d51 4973 6470 03c2 003c  .:.'..MQIsdp...<
        0x0050:  0007 636c 6965 6e74 3300 0563 636e 6574  ..client3..ccnet
        0x0060:  0009 6363 6e65 7430 3930 31              ..ccnet0901
16:36:40.724787 IP 172.16.0.12.ibm-mqisdp > 81.71.151.19.41390: Flags [.], ack 42, win 227, options [nop,nop,TS val 3885546557 ecr 530762788], length 0
        0x0000:  feee 809f 3247 5254 00e4 a55c 0800 4500  ....2GRT...\..E.
        0x0010:  0034 f9a2 4000 4006 acaa ac10 000c 5147  .4..@.@.......QG
        0x0020:  9713 075b a1ae fc7f 459f a75e 8222 8010  ...[....E..^."..
        0x0030:  00e3 3d1c 0000 0101 080a e798 bc3d 1fa2  ..=..........=..
        0x0040:  cc24                                     .$
16:36:40.724978 IP 172.16.0.12.ibm-mqisdp > 81.71.151.19.41390: Flags [P.], seq 1:5, ack 42, win 227, options [nop,nop,TS val 3885546557 ecr 530762788], length 4
        0x0000:  feee 809f 3247 5254 00e4 a55c 0800 4500  ....2GRT...\..E.
        0x0010:  0038 f9a3 4000 4006 aca5 ac10 000c 5147  .8..@.@.......QG
        0x0020:  9713 075b a1ae fc7f 459f a75e 8222 8018  ...[....E..^."..
        0x0030:  00e3 1d0e 0000 0101 080a e798 bc3d 1fa2  .............=..
        0x0040:  cc24 2002 0000                           .$....
16:36:40.728114 IP 81.71.151.19.41390 > 172.16.0.12.ibm-mqisdp: Flags [.], ack 5, win 229, options [nop,nop,TS val 530762791 ecr 3885546557], length 0
        0x0000:  5254 00e4 a55c feee 809f 3247 0800 4568  RT...\....2G..Eh
        0x0010:  0034 5dd3 4000 3806 5012 5147 9713 ac10  .4].@.8.P.QG....
        0x0020:  000c a1ae 075b a75e 8222 fc7f 45a3 8010  .....[.^."..E...
        0x0030:  00e5 3d13 0000 0101 080a 1fa2 cc27 e798  ..=..........'..
        0x0040:  bc3d                                     .=16:36:40.728144 IP PUB.41390 > MQTT.ibm-mqisdp: Flags [P.], seq 42:82, ack 5, win 229, options [nop,nop,TS val 530762791 ecr 3885546557], length 40
        0x0000:  5254 00e4 a55c feee 809f 3247 0800 4568  RT...\....2G..Eh
        0x0010:  005c 5dd4 4000 3806 4fe9 5147 9713 ac10  .\].@.8.O.QG....
        0x0020:  000c a1ae 075b a75e 8222 fc7f 45a3 8018  .....[.^."..E...
        0x0030:  00e5 a8af 0000 0101 080a 1fa2 cc27 e798  .............'..
        0x0040:  bc3d 3026 0006 6363 6e65 742f 3132 3334  .=0&..ccnet/1234
        0x0050:  3536 3738 3930 3132 3334 3536 3738 3930  5678901234567890
        0x0060:  3132 3334 3536 3738 3930                 1234567890
16:36:40.728146 IP 81.71.151.19.41390 > 172.16.0.12.ibm-mqisdp: Flags [FP.], seq 82:84, ack 5, win 229, options [nop,nop,TS val 530762791 ecr 3885546557], length 2
        0x0000:  5254 00e4 a55c feee 809f 3247 0800 4568  RT...\....2G..Eh
        0x0010:  0036 5dd5 4000 3806 500e 5147 9713 ac10  .6].@.8.P.QG....
        0x0020:  000c a1ae 075b a75e 824a fc7f 45a3 8019  .....[.^.J..E...
        0x0030:  00e5 5cdf 0000 0101 080a 1fa2 cc27 e798  ..\..........'..
        0x0040:  bc3d e000                                .=..
16:36:40.728230 IP MQTT.ibm-mqisdp > SUB.41050: Flags [P.], seq 96:136, ack 62, win 227, options [nop,nop,TS val 3885546560 ecr 530753903], length 40
        0x0000:  feee 809f 3247 5254 00e4 a55c 0800 4500  ....2GRT...\..E.
        0x0010:  005c d3a9 4000 4006 d27b ac10 000c 5147  .\..@.@..{....QG
        0x0020:  9713 075b a05a 3ecd 1e07 4633 b207 8018  ...[.Z>...F3....
        0x0030:  00e3 e34f 0000 0101 080a e798 bc40 1fa2  ...O.........@..
        0x0040:  a96f 3026 0006 6363 6e65 742f 3132 3334  .o0&..ccnet/1234
        0x0050:  3536 3738 3930 3132 3334 3536 3738 3930  5678901234567890
        0x0060:  3132 3334 3536 3738 3930                 1234567890
16:36:40.728288 IP 172.16.0.12.ibm-mqisdp > 81.71.151.19.41390: Flags [.], ack 85, win 227, options [nop,nop,TS val 3885546561 ecr 530762791], length 0
        0x0000:  feee 809f 3247 5254 00e4 a55c 0800 4500  ....2GRT...\..E.
        0x0010:  0034 f9a4 4000 4006 aca8 ac10 000c 5147  .4..@.@.......QG
        0x0020:  9713 075b a1ae fc7f 45a3 a75e 824d 8010  ...[....E..^.M..
        0x0030:  00e3 3ce6 0000 0101 080a e798 bc41 1fa2  ..<..........A..
        0x0040:  cc27                                     .'
16:36:40.728367 IP 172.16.0.12.ibm-mqisdp > 81.71.151.19.41390: Flags [F.], seq 5, ack 85, win 227, options [nop,nop,TS val 3885546561 ecr 530762791], length 0
        0x0000:  feee 809f 3247 5254 00e4 a55c 0800 4500  ....2GRT...\..E.
        0x0010:  0034 f9a5 4000 4006 aca7 ac10 000c 5147  .4..@.@.......QG
        0x0020:  9713 075b a1ae fc7f 45a3 a75e 824d 8011  ...[....E..^.M..
        0x0030:  00e3 3ce5 0000 0101 080a e798 bc41 1fa2  ..<..........A..
        0x0040:  cc27                                     .'
16:36:40.731431 IP 81.71.151.19.41050 > 172.16.0.12.ibm-mqisdp: Flags [.], ack 136, win 229, options [nop,nop,TS val 530762794 ecr 3885546560], length 0
        0x0000:  5254 00e4 a55c feee 809f 3247 0800 4568  RT...\....2G..Eh
        0x0010:  0034 5880 4000 3806 5565 5147 9713 ac10  .4X.@.8.UeQG....
        0x0020:  000c a05a 075b 4633 b207 3ecd 1e2f 8010  ...Z.[F3..>../..
        0x0030:  00e5 54ce 0000 0101 080a 1fa2 cc2a e798  ..T..........*..
        0x0040:  bc40                                     .@
16:36:40.731575 IP 81.71.151.19.41390 > 172.16.0.12.ibm-mqisdp: Flags [.], ack 6, win 229, options [nop,nop,TS val 530762795 ecr 3885546561], length 0
        0x0000:  5254 00e4 a55c feee 809f 3247 0800 4568  RT...\....2G..Eh
        0x0010:  0034 0000 4000 3806 ade5 5147 9713 ac10  .4..@.8...QG....
        0x0020:  000c a1ae 075b a75e 824d fc7f 45a4 8010  .....[.^.M..E...
        0x0030:  00e5 3cdf 0000 0101 080a 1fa2 cc2b e798  ..<..........+..
        0x0040:  bc41                                     .A

去掉TCP握手和收报报文的常规交互,可知道MQTT订阅发布的完整时序如下:

MQTT发布订阅的实际交互时序图


可以看到AUHT、SUB、PUB、SEND_MESSAGE的报文非常精简,基本只是对必须传送的字段增加了两个字节的长度,因此其对带宽是非常节省的。

再分析下支持websockets时的通讯时序

# 启动对9001端口的监视
tcpdump -n -XX -i eth0 port 9001
# 订阅指定主题的消息
mosquitto_sub -h 172.16.0.12 -t "topic/#" -u user -P password -i "client1"
#编写一个基于websockets访问mqtt的页面,如下亲测通过;  点击init按钮
<html>
	<head>
		<script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/paho-mqtt/1.0.2/mqttws31.js"></script>
	</head>
	<body>
		<script>
			var mqtt;
			var host='81.71.87.37';
			var port=9001;
			 
			// onConnect 事件
			function onConnect() {
				console.log('connected.');
				
				/*
				var raw_message='Hello World!';
				message=new Paho.MQTT.Message(raw_message);
				message.destinationName='ccnet/up/abc';
				console.log('sending message: ' + raw_message );
				mqtt.send(message);
				
				*/
			 
				// 订阅 download topic
				var subOptions={
					qos: 1,
					onSuccess: onSubscribe
				};
				mqtt.subscribe('ccnet/#', subOptions);
			}
			function onSend() {
				console.log('onSend.');
				var raw_message='Hello World!';
				message=new Paho.MQTT.Message(raw_message);
				message.destinationName='ccnet/up/abc';
				console.log('sending message: ' + raw_message );
				mqtt.send(message);
			}
			 
			// 订阅主题成功事件
			function onSubscribe(context) {
				console.log('subscribe success');
				console.log(context);
			}
			 
			// 连接失败事件
			function onFailure(message) {
				console.log('connect failed.');
			}
			 
			// onMessageArrived 事件
			function onMessageArrived(message) {
				console.log('new message arrived...');
				console.log(message.payloadString);
			}
			 
			 
			// 建立 MQTT websocket 连接
			function MQTTconnect() {
				console.log('connecting to ' + host + ':' + port);
				mqtt=new Paho.MQTT.Client(host, port, 'clientid');
				var options={
					timeout: 3,
					onSuccess: onConnect,
					onFailure: onFailure,
					userName: 'ccnet',
					password: 'ccnet0901',
					mqttVersion: 4
				};
				mqtt.onMessageArrived=onMessageArrived;
				mqtt.connect(options);
			}
		</script>
		<input  type="button" value="Init" onclick="javascript:MQTTconnect();"/> 
		<input  type="button" value="Send" onclick="javascript:onSend();"/>
	</body>
</html>
# 发布消息到指定主题
mosquitto_pub -h 172.16.0.12 -t "topic/" -u user -P password  -i "client3" -m "1234567890123456789012345678901234567890"

TCPDUMP抓取的报文如下:

# 点击WEB页面上的Init按钮之后
18:11:49.183098 IP 14.150.108.187.50999 > 172.16.0.12.etlservicemgr: Flags [S], seq 3659468570, win 64240, options [mss 1360,nop,wscale 8,nop,nop,sackOK], length 0
        0x0000:  5254 00e4 a55c feee 809f 3247 0800 4568  RT...\....2G..Eh
        0x0010:  0034 7b4e 4000 7206 65a0 0e96 6cbb ac10  .4{N@.r.e...l...
        0x0020:  000c c737 2329 da1f 0f1a 0000 0000 8002  ...7#)..........
        0x0030:  faf0 797b 0000 0204 0550 0103 0308 0101  ..y{.....P......
        0x0040:  0402                                     ..
18:11:49.183188 IP 172.16.0.12.etlservicemgr > 14.150.108.187.50999: Flags [S.], seq 3730879023, ack 3659468571, win 29200, options [mss 1460,nop,nop,sackOK,nop,wscale 7], length 0
        0x0000:  feee 809f 3247 5254 00e4 a55c 0800 4500  ....2GRT...\..E.
        0x0010:  0034 0000 4000 4006 1357 ac10 000c 0e96  .4..@.@..W......
        0x0020:  6cbb 2329 c737 de60 b22f da1f 0f1b 8012  l.#).7.`./......
        0x0030:  7210 7157 0000 0204 05b4 0101 0402 0103  r.qW............
        0x0040:  0307                                     ..
18:11:49.223085 IP 14.150.108.187.50999 > 172.16.0.12.etlservicemgr: Flags [.], ack 1, win 515, length 0
        0x0000:  5254 00e4 a55c feee 809f 3247 0800 4568  RT...\....2G..Eh
        0x0010:  0028 7b4f 4000 7206 65ab 0e96 6cbb ac10  .({O@.r.e...l...
        0x0020:  000c c737 2329 da1f 0f1b de60 b230 5010  ...7#).....`.0P.
        0x0030:  0203 2237 0000                           .."7..
18:11:49.247331 IP Browser.50999 > MQTT.etlservicemgr: Flags [P.], seq 1:511, ack 1, win 515, length 510
        0x0000:  5254 00e4 a55c feee 809f 3247 0800 4568  RT...\....2G..Eh
        0x0010:  0226 7b50 4000 7206 63ac 0e96 6cbb ac10  .&{P@.r.c...l...
        0x0020:  000c c737 2329 da1f 0f1b de60 b230 5018  ...7#).....`.0P.
        0x0030:  0203 d122 0000 4745 5420 2f6d 7174 7420  ..."..GET./mqtt.
        0x0040:  4854 5450 2f31 2e31 0d0a 486f 7374 3a20  HTTP/1.1..Host:.
        0x0050:  3831 2e37 312e 3837 2e33 373a 3930 3031  81.71.87.37:9001
        0x0060:  0d0a 436f 6e6e 6563 7469 6f6e 3a20 5570  ..Connection:.Up
        0x0070:  6772 6164 650d 0a50 7261 676d 613a 206e  grade..Pragma:.n
        0x0080:  6f2d 6361 6368 650d 0a43 6163 6865 2d43  o-cache..Cache-C
        0x0090:  6f6e 7472 6f6c 3a20 6e6f 2d63 6163 6865  ontrol:.no-cache
        0x00a0:  0d0a 5573 6572 2d41 6765 6e74 3a20 4d6f  ..User-Agent:.Mo
        0x00b0:  7a69 6c6c 612f 352e 3020 2857 696e 646f  zilla/5.0.(Windo
        0x00c0:  7773 204e 5420 3130 2e30 3b20 5769 6e36  ws.NT.10.0;.Win6
        0x00d0:  343b 2078 3634 2920 4170 706c 6557 6562  4;.x64).AppleWeb
        0x00e0:  4b69 742f 3533 372e 3336 2028 4b48 544d  Kit/537.36.(KHTM
        0x00f0:  4c2c 206c 696b 6520 4765 636b 6f29 2043  L,.like.Gecko).C
        0x0100:  6872 6f6d 652f 3130 312e 302e 3439 3531  hrome/101.0.4951
        0x0110:  2e36 3720 5361 6661 7269 2f35 3337 2e33  .67.Safari/537.3
        0x0120:  360d 0a55 7067 7261 6465 3a20 7765 6273  6..Upgrade:.webs
        0x0130:  6f63 6b65 740d 0a4f 7269 6769 6e3a 206e  ocket..Origin:.n
        0x0140:  756c 6c0d 0a53 6563 2d57 6562 536f 636b  ull..Sec-WebSock
        0x0150:  6574 2d56 6572 7369 6f6e 3a20 3133 0d0a  et-Version:.13..
        0x0160:  4163 6365 7074 2d45 6e63 6f64 696e 673a  Accept-Encoding:
        0x0170:  2067 7a69 702c 2064 6566 6c61 7465 0d0a  .gzip,.deflate..
        0x0180:  4163 6365 7074 2d4c 616e 6775 6167 653a  Accept-Language:
        0x0190:  207a 682d 434e 2c7a 683b 713d 302e 390d  .zh-CN,zh;q=0.9.
        0x01a0:  0a53 6563 2d57 6562 536f 636b 6574 2d4b  .Sec-WebSocket-K
        0x01b0:  6579 3a20 7938 796a 3756 7858 624b 576f  ey:.y8yj7VxXbKWo
        0x01c0:  4f45 6962 3248 7176 4b51 3d3d 0d0a 5365  OEib2HqvKQ==..Se
        0x01d0:  632d 5765 6253 6f63 6b65 742d 4578 7465  c-WebSocket-Exte
        0x01e0:  6e73 696f 6e73 3a20 7065 726d 6573 7361  nsions:.permessa
        0x01f0:  6765 2d64 6566 6c61 7465 3b20 636c 6965  ge-deflate;.clie
        0x0200:  6e74 5f6d 6178 5f77 696e 646f 775f 6269  nt_max_window_bi
        0x0210:  7473 0d0a 5365 632d 5765 6253 6f63 6b65  ts..Sec-WebSocke
        0x0220:  742d 5072 6f74 6f63 6f6c 3a20 6d71 7474  t-Protocol:.mqtt
        0x0230:  0d0a 0d0a                                ....
18:11:49.247360 IP 172.16.0.12.etlservicemgr > 14.150.108.187.50999: Flags [.], ack 511, win 237, length 0
        0x0000:  feee 809f 3247 5254 00e4 a55c 0800 4500  ....2GRT...\..E.
        0x0010:  0028 653f 4000 4006 ae23 ac10 000c 0e96  .(e?@.@..#......
        0x0020:  6cbb 2329 c737 de60 b230 da1f 1119 5010  l.#).7.`.0....P.
        0x0030:  00ed 214f 0000                           ..!O..
18:11:49.323395 IP 172.16.0.12.etlservicemgr > 14.150.108.187.50999: Flags [P.], seq 1:160, ack 511, win 237, length 159
        0x0000:  feee 809f 3247 5254 00e4 a55c 0800 4500  ....2GRT...\..E.
        0x0010:  00c7 6540 4000 4006 ad83 ac10 000c 0e96  ..e@@.@.........
        0x0020:  6cbb 2329 c737 de60 b230 da1f 1119 5018  l.#).7.`.0....P.
        0x0030:  00ed df2c 0000 4854 5450 2f31 2e31 2031  ...,..HTTP/1.1.1
        0x0040:  3031 2053 7769 7463 6869 6e67 2050 726f  01.Switching.Pro
        0x0050:  746f 636f 6c73 0d0a 5570 6772 6164 653a  tocols..Upgrade:
        0x0060:  2057 6562 536f 636b 6574 0d0a 436f 6e6e  .WebSocket..Conn
        0x0070:  6563 7469 6f6e 3a20 5570 6772 6164 650d  ection:.Upgrade.
        0x0080:  0a53 6563 2d57 6562 536f 636b 6574 2d41  .Sec-WebSocket-A
        0x0090:  6363 6570 743a 204a 6845 6544 4534 6468  ccept:.JhEeDE4dh
        0x00a0:  6462 2b4b 6a47 4f52 452b 595a 7176 424b  db+KjGORE+YZqvBK
        0x00b0:  2f34 3d0d 0a53 6563 2d57 6562 536f 636b  /4=..Sec-WebSock
        0x00c0:  6574 2d50 726f 746f 636f 6c3a 206d 7174  et-Protocol:.mqt
        0x00d0:  740d 0a0d 0a                             t....
18:11:49.392070 IP 14.150.108.187.50999 > 172.16.0.12.etlservicemgr: Flags [P.], seq 511:557, ack 160, win 514, length 46
        0x0000:  5254 00e4 a55c feee 809f 3247 0800 4568  RT...\....2G..Eh
        0x0010:  0056 7b54 4000 7206 6578 0e96 6cbb ac10  .V{T@.r.ex..l...
        0x0020:  000c c737 2329 da1f 1119 de60 b2cf 5018  ...7#).....`..P.
        0x0030:  0202 7640 0000 82a8 a399 adbd b3bf adb9  ..v@............
        0x0040:  eec8 f9e9 a75b ad81 a391 ced1 cafc c3c9  .....[..........
        0x0050:  cafd adb8 c0fa c3d8 d799 a4de c0f7 c8c9  ................
        0x0060:  93a0 9d8c                                ....
18:11:49.392100 IP 172.16.0.12.etlservicemgr > 14.150.108.187.50999: Flags [.], ack 557, win 237, length 0
        0x0000:  feee 809f 3247 5254 00e4 a55c 0800 4500  ....2GRT...\..E.
        0x0010:  0028 6541 4000 4006 ae21 ac10 000c 0e96  .(eA@.@..!......
        0x0020:  6cbb 2329 c737 de60 b2cf da1f 1147 5010  l.#).7.`.....GP.
        0x0030:  00ed 2082 0000                           ......
18:11:49.392352 IP 172.16.0.12.etlservicemgr > 14.150.108.187.50999: Flags [P.], seq 160:166, ack 557, win 237, length 6
        0x0000:  feee 809f 3247 5254 00e4 a55c 0800 4500  ....2GRT...\..E.
        0x0010:  002e 6542 4000 4006 ae1a ac10 000c 0e96  ..eB@.@.........
        0x0020:  6cbb 2329 c737 de60 b2cf da1f 1147 5018  l.#).7.`.....GP.
        0x0030:  00ed 7e6d 0000 8204 2002 0000            ..~m........
18:11:49.443110 IP 14.150.108.187.50999 > 172.16.0.12.etlservicemgr: Flags [.], ack 166, win 514, length 0
        0x0000:  5254 00e4 a55c feee 809f 3247 0800 4568  RT...\....2G..Eh
        0x0010:  0028 7b56 4000 7206 65a4 0e96 6cbb ac10  .({V@.r.e...l...
        0x0020:  000c c737 2329 da1f 1147 de60 b2d5 5010  ...7#)...G.`..P.
        0x0030:  0202 1f67 0000                           ...g..
18:11:49.451571 IP 14.150.108.187.50999 > 172.16.0.12.etlservicemgr: Flags [P.], seq 557:577, ack 166, win 514, length 20
        0x0000:  5254 00e4 a55c feee 809f 3247 0800 4568  RT...\....2G..Eh
        0x0010:  003c 7b57 4000 7206 658f 0e96 6cbb ac10  .<{W@.r.e...l...
        0x0020:  000c c737 2329 da1f 1147 de60 b2d5 5018  ...7#)...G.`..P.
        0x0030:  0202 5568 0000 828e 2670 37fd a47c 37fc  ..Uh....&p7..|7.
        0x0040:  2677 549e 4815 43d2 0571                 &wT.H.C..q
18:11:49.451667 IP 172.16.0.12.etlservicemgr > 14.150.108.187.50999: Flags [P.], seq 166:173, ack 577, win 237, length 7
        0x0000:  feee 809f 3247 5254 00e4 a55c 0800 4500  ....2GRT...\..E.
        0x0010:  002f 6543 4000 4006 ae18 ac10 000c 0e96  ./eC@.@.........
        0x0020:  6cbb 2329 c737 de60 b2d5 da1f 115b 5018  l.#).7.`.....[P.
        0x0030:  00ed 0d4f 0000 8205 9003 0001 01         ...O.........
18:11:49.543376 IP 14.150.108.187.50999 > 172.16.0.12.etlservicemgr: Flags [.], ack 173, win 514, length 0
        0x0000:  5254 00e4 a55c feee 809f 3247 0800 4568  RT...\....2G..Eh
        0x0010:  0028 7b59 4000 7206 65a1 0e96 6cbb ac10  .({Y@.r.e...l...
        0x0020:  000c c737 2329 da1f 115b de60 b2dc 5010  ...7#)...[.`..P.
        0x0030:  0202 1f4c 0000                           ...L..

# 点击Send按钮之后
18:16:59.642995 IP Browser.50999 > MQTT.etlservicemgr: Flags [P.], seq 677:711, ack 249, win 514, length 34
        0x0000:  5254 00e4 a55c feee 809f 3247 0800 4568  RT...\....2G..Eh
        0x0010:  004a 7c2e 4000 7206 64aa 0e96 6cbb ac10  .J|.@.r.d...l...
        0x0020:  000c c737 2329 da1f 11bf de60 b328 5018  ...7#).....`.(P.
        0x0030:  0202 736b 0000 829c 69c7 926a 59dd 9266  ..sk....i..jY..f
        0x0040:  0aa4 fc0f 1de8 e71a 46a6 f009 21a2 fe06  ........F...!...
        0x0050:  06e7 c505 1bab f64b                      .......K
18:16:59.643148 IP MQTT.etlservicemgr > Browser.50999: Flags [P.], seq 249:279, ack 711, win 237, length 30
        0x0000:  feee 809f 3247 5254 00e4 a55c 0800 4500  ....2GRT...\..E.
        0x0010:  0046 654b 4000 4006 adf9 ac10 000c 0e96  .FeK@.@.........
        0x0020:  6cbb 2329 c737 de60 b328 da1f 11e1 5018  l.#).7.`.(....P.
        0x0030:  00ed ce09 0000 821c 301a 000c 6363 6e65  ........0...ccne
        0x0040:  742f 7570 2f61 6263 4865 6c6c 6f20 576f  t/up/abcHello.Wo
        0x0050:  726c 6421                                rld!
18:16:59.743220 IP 14.150.108.187.50999 > 172.16.0.12.etlservicemgr: Flags [.], ack 279, win 514, length 0
        0x0000:  5254 00e4 a55c feee 809f 3247 0800 4568  RT...\....2G..Eh
        0x0010:  0028 7c30 4000 7206 64ca 0e96 6cbb ac10  .(|0@.r.d...l...
        0x0020:  000c c737 2329 da1f 11e1 de60 b346 5010  ...7#).....`.FP.
        0x0030:  0202 1e5c 0000                           ...\..
# 执行发布消息命令之后
17:56:50.992944 IP MQTT.etlservicemgr > Browser.50939: Flags [P.], seq 205:247, ack 675, win 237, length 42
        0x0000:  feee 809f 3247 5254 00e4 a55c 0800 4500  ....2GRT...\..E.
        0x0010:  0052 e2f9 4000 4006 303f ac10 000c 0e96  .R..@.@.0?......
        0x0020:  6cbb 2329 c6fb c3b7 103a 2e40 d9e6 5018  l.#).....:.@..P.
        0x0030:  00ed aaae 0000 8228 3026 0006 6363 6e65  .......(0&..ccne
        0x0040:  742f 3132 3334 3536 3738 3930 3132 3334  t/12345678901234
        0x0050:  3536 3738 3930 3132 3334 3536 3738 3930  5678901234567890
17:56:51.100524 IP 14.150.108.187.50939 > 172.16.0.12.etlservicemgr: Flags [.], ack 247, win 514, length 0
        0x0000:  5254 00e4 a55c feee 809f 3247 0800 4568  RT...\....2G..Eh
        0x0010:  0028 77db 4000 7206 691f 0e96 6cbb ac10  .(w.@.r.i...l...
        0x0020:  000c c6fb 2329 2e40 d9e6 c3b7 1064 5010  ....#).@.....dP.
        0x0030:  0202 bffd 0000                           ......

可看到基于websockets的通讯整体协作图如下:

MQTT在websockets下的工作模


可以看到除了在建立websockets时,有较大的http头信息,其余仍然很精简。而且websocket已经对Browser上送的AUTH、SUB、PUB进行了加密处理,但是对于下发到Browser的信息并没有进行加密处理

使用Mosquitto搭建支持Websockets的MQTT服务

这里使用了mosquitto-2.0.11,下载源码后,修改下config.mk文件,将WITH_WEBSOCKETS打开,如下

# Build with websockets support on the broker.
WITH_WEBSOCKETS:=yes

这里是在centos下工作的,如果没有websockets库,需要安装

yum install libwebsockets* -y

然后就是一路的make,make install,之后配置下LD_LIBRARY_PATH的环境变量,就可以开始工作了。

LD_LIBRARY_PATH=:/root/setup/mosquitto-2.0.11/lib:/root/setup/cJSON-1.7.14

配置mosquitto环境,可建立一个专门的目录,将mosquitto.conf文件拷贝过来修改,目录结构如下:

<root@VM-0-12-centos:/mqtt>tree
.
|-- aclfile    # 访问控制文件
|-- log        # 日志文件目录
|   `-- mosquitto.log
|-- mosquitto  # mqtt运行目录
|   |-- mosquitto.db
|   `-- pid.pid
|-- mosquitto.conf #配置文件!!!IMPORTANT
`-- pwfile # 用户的密码文件
#cat aclfile===========================================================# This affects access control for clients with no username.
topic read $SYS/#

# This only affects clients with username "roger".
user roger
topic foo/bar

# This affects all clients.
pattern write $SYS/broker/connection/%c/state

# Add 20210722
user ccnet
topic read ccnet/#
topic write ccnet/#=========================================================# 创建用户密码文件pwfile
touch pwfile
mosquitto_passwd pwfile user

#cat mosquitto.conf |grep -v "#"=========================================================pid_file /home/release/soft/mqtt/mosquitto/pid.pid
user release
port 1883
listener 9001
protocol websockets
bind_interface eth0
persistence true 
persistence_location /home/release/soft/mqtt/mosquitto
allow_anonymous false
password_file /home/release/soft/mqtt/pwfile 
acl_file  /home/release/soft/mqtt/aclfile 

运行Mosquitto

mosquitto -c mosquitto.conf -d


MQTT有很多实现,Mosquitto的魅力在于小而美。有很多开源的实现,基于nodejs、java等,也有商业版本的,更有一些云服务商直接将之PaaS化了。

例如,阿里云将MQTT定义为这样子。一些公司直接基于mqtt搭建其上的物模型,超赞!


阿里云的MQTT服务

要提示:第二节内容中所有操作使用时需要把对应的产品、设备证书等替换为个人对应的数据。为了方便读者更方便直观的了解协议报文形成过程,数据没有隐藏和加密处理,但请勿直接使用文中数据,造成损失将进行法律追究。

1 MQTT协议概述

1.1 背景概述

MQTT最初是由IBM的Andy Stanford-Clark以及Arcom公司ArlenNipper 为解决石油和天然气管道传感器数据传输问题而创造,他们通过卫星网络传输输油管道传感器数据,当时卫星通信存在费用昂贵、网络中断、低带宽、高延迟等特点,而且传感器数量很多,MQTT就由此而诞生。近年来,物联网发展迅猛,传感器数量爆发式增长,mqtt也被重新拾起。

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(Publish/Subscribe)模式的轻量级通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布,目前最新版本为v3.1.1。MQTT最大的优点在于可以以极少的代码和有限的带宽,为远程设备提供实时可靠的消息服务。做为一种低开销、低带宽占用的即时通讯协议,MQTT在物联网、小型设备、移动应用等方面有广泛的应用。

1.2 协议概述

MQTT是一种基于订阅和发布模式的通信方式,通过以主题为核心的方式进行数据或信息的传输。MQTT协议几个重要的关键词:

  • 主题(Topic):附加在应用消息上的一个标签,服务器broker就是根据topic名称与订阅该topic的client进行关联,然后进行消息转发。MQTT的Topic是一种层级结构,用'/'来标识不同的层级。
  • 订阅(Subscription):客户端通过“订阅”某个topic,当这个topic有新的消息时,服务器会自动将消息转发给这个client。
  • 发布(Topic):发布既可以是客户端也可以是服务端,但是最终都是需要服务端broker进行转发。
  • 会话(Session):从客户端向服务端发起MQTT连接请求开始,到连接中断,直到会话过期未知的消息手法序列称之为会话。
  • 负载(Payload):消息订阅者所具体接收的内容。
  • 消息服务质量(QoS):MQTT有三种服务质量:QoS0、QoS1、QoS2,用以标记消息发送的次数。
  • 保留消息 Retained Messages和最后遗嘱 Last Will & Testament

mqtt报文结构

mqtt有发布、订阅、连接、心跳、请求等报文,但基本结构都类似,报文结构如下:

固定报头 +可变报头+有效载荷

  • 固定报头:由消息类型、标志位、剩余长度组成。主要作用是用以标识报文消息类型、服务质量、重发标志、RETAIN、剩余长度等。每个 MQTT 控制报文都必须包含一个固定报头。组成结构如下:

报文类型值有以下几种:

  • 可变报头:主要由报文的一些标识符类型组成,可变报头的内容根据报文类型的不同而不同,不是所有报文都必须有这项。结构如下:不同报文需要可变报头的情况如下:
  • 有效载荷:有效载荷包含将被传输的应用消息。数据的内容和格式是应用特定的,不同的报文类型格式也是不一样的。该字段并非所有报文都需要,常见的报文包含有效载荷情况如下:报文的具体格式不再详细展开,下一节内容中选几种报文并结合实际来了解,官方协议文档:中文版:https://note.youdao.com/s/K5dOMTUC英文版:https://note.youdao.com/s/1QLq6sfU

2 MQTT 协议详解与IOT简单应用

2.1 准备工作

本节中使用阿里云的物联网平台,结合一下调试工具来实现通过MQTT的通信等操作。需要具备以下环境:

1、申请注册阿里云物联网平台,并创建产品、设备,参考阿里云操作手册即可:https://help.aliyun.com/document_detail/30528.html

2、(可选)linux系统环境

3、工具软件:mqtt.fx、网络调试助手

2.2 基于阿里云平台的IOT应用体验

阿里云提供全面的物联网连接服务,可以在其物联网平台做一些简单的入门测试,官方有详细的操作文档,不再赘述,参考操作指导:https://help.aliyun.com/document_detail/73708.html

以下为部分测试效果图:

阿里云物联网平台在线调试

客户端SDK demo程序运行

结合MQTT.fx测试

结合MQTT.fx测试

2.3 MQTT部分报文协议详解与测试

为接近实际应用,本小节通过网络调试助手和阿里云平台来更加详细的分析MQTT的几个重要报文。报文内容使用16进制方式展示,需要特别注意一下报文内容有些采用UTF-8编码,以及剩余长度的编码格式。

UTF-8 编码字符对 ASCII 字符的编码做了优化。每一个字符串都有两字节的长度字段作为前缀,它标识了这个字符串 UTF-8 编码的字节数。

剩余长度(Remaining Length)表示当前报文剩余部分的字节数,包括可变报头和负载的数据。剩余长度 不包括用于编码剩余长度字段本身的字节数。剩余长度字段使用一个变长度编码方案,对小于 128 的值它使用单字节编码。更大的值按下面的方式处理。低 7 位有效位用于编码数据,最高有效位用于指示是否有更多的字节。因此每个字节可以编码 128 个数值和一个 延续位( continuation bit ) 。剩余长度字段最大 4 个字节。

2.3.1 CONNECT-连接报文

  • MQTT连接报文格式:固定报头+可变报头+有效载荷 固定报头:报文类型+保留位置+剩余长度 固定报头共两个字节,如下图: 根据协议说明,得到连接报文固定报头为: 10 ?? ??表示剩余长度,留到最后再来填写。
  • 可变报头:协议名称+协议级别+连接标志+保存连接 可变报头共10个字节,如下图: 根据协议说明,得到连接报文可变报头为: 00 04 4D 51 54 54 04 C2 00 64
  • 有效载荷:客户端标识+用户名称+密码 各个部分格式如下: ? 客户端ID:|securemode=3,signmethod=hmacsha1| ? 另外一种客户端格式#.|securemode=3,signmethod=hmacsha1|,timestamp=2524608000000| ? 用户名:&# ? 密码(需要经过哈希加密):clientIddeviceName*productKeygnuyAXcWtth ? *:代码设备名称 #:代表设备证书 ProductKey

得到连接报文的 有效载荷 过程如下: ? 1、代替 ? 客户端ID:IOT_F1-01|securemode=3,signmethod=hmacsha1| ? 用户名:IOT_F1-01&gnuyAXcWtth ? 密码:clientIdIOT_F1-01deviceNameIOT_F1-01productKeygnuyAXcWtth

        2、转16进制
        客户端ID:49 4F 54 5F 46 31 2D 30 31 7C 73 65 63 75 72 65 6D 6F 64 65 3D 33 2C 73 69 67 6E 6D 65 74 68 6F 64 3D 68 6D 61 63 73 68 61 31 7C
        用户名:49 4F 54 5F 46 31 2D 30 31 26 67 6E 75 79 41 58 63 57 74 74 68
        密码(哈希加密):a73628d4a216e570bdd3997767e827ef4ae836e7
        密码:61 37 33 36 32 38 64 34 61 32 31 36 65 35 37 30 62 64 64 33 39 39 37 37 36 37 65 38 32 37 65 66 34 61 65 38 33 36 65 37

3、utf-8编码(加两个字节的前缀表示长度) ? 客户端ID:00 2B 49 4F 54 5F 46 31 2D 30 31 7C 73 65 63 75 72 65 6D 6F 64 65 3D 33 2C 73 69 67 6E 6D 65 74 68 6F 64 3D 68 6D 61 63 73 68 61 31 7C ? 用户名:00 15 49 4F 54 5F 46 31 2D 30 31 26 67 6E 75 79 41 58 63 57 74 74 68 ? 密码:00 28 61 37 33 36 32 38 64 34 61 32 31 36 65 35 37 30 62 64 64 33 39 39 37 37 36 37 65 38 32 37 65 66 34 61 65 38 33 36 65 37 ? 最终有效载荷: 00 2B 49 4F 54 5F 46 31 2D 30 31 7C 73 65 63 75 72 65 6D 6F 64 65 3D 33 2C 73 69 67 6E 6D 65 74 68 6F 64 3D 68 6D 61 63 73 68 61 31 7C 00 15 49 4F 54 5F 46 31 2D 30 31 26 67 6E 75 79 41 58 63 57 74 74 68 00 28 61 37 33 36 32 38 64 34 61 32 31 36 65 35 37 30 62 64 64 33 39 39 37 37 36 37 65 38 32 37 65 66 34 61 65 38 33 36 65 37

计算得到可变报头加有效载荷的长度为120(0x78),把固定报头的剩余长度补充得到最终 连接报文: 10 78 00 04 4D 51 54 54 04 C2 00 64 00 2B 49 4F 54 5F 46 31 2D 30 31 7C 73 65 63 75 72 65 6D 6F 64 65 3D 33 2C 73 69 67 6E 6D 65 74 68 6F 64 3D 68 6D 61 63 73 68 61 31 7C 00 15 49 4F 54 5F 46 31 2D 30 31 26 67 6E 75 79 41 58 63 57 74 74 68 00 28 61 37 33 36 32 38 64 34 61 32 31 36 65 35 37 30 62 64 64 33 39 39 37 37 36 37 65 38 32 37 65 66 34 61 65 38 33 36 65 37

打开软件调试助手,以TCP CLIENT的模式连接阿里云平台,其目的IP、端口为阿里云的物联网平台连接参数,可以在平台设备的mqtt连接参数中找到地址和端口参数。把这个地址填到网络调试助手连接即可。 ? 连接后以hex方式发送上面得到的连接报文内容,可以看到已经收到了服务器返回的信息。


2.3.2 CONNACK-连接确认报文(服务端下发)

连接确认报文格式:固定报头+可变报头 上述测试中,服务器返回20 02 00 00,查询MQTT协议的连接确认报文可知道服务器接受了连接。 报文格式如下:



2.3.3 发布消息(PUBLISH)和发布确认(PUBACK)

1、发布消息 发布消息的报文格式为:固定报头+可变报头+有效载荷+响应+动作

  • 固定报头有两个字节,定义如下: 其中QoS等级取值如下: 根据协议描述,把DUP置为0、RETAIN置为1,分别表示这是第一次请求发送publish报文,而不是重发,RETAIN被设置为1,表示服务端必须存储这个应用消息和它的服务质量等级(QoS),以便它可以被分发给未来的主题名匹配的订阅者;另外取QoS为0。 由此得到需要发送的固定报头为:31 ?? (??表示剩余长度)
  • 可变报头: 可变报头格式如下:主题名(topic)+报文标识 需要注意的是报文标识只有在服务质量等级大于0才使用。

主题在这里使用阿里云产品中的物模型topic,也可以使用自定义主题等。

(1)把deviceName替换为设备名称,得到可变报头内容为: /sys/gnuyAXcWtth/IOT_F1-01/thing/event/property/post 转为16进制并左utf-8编码得到可变报头为: 00 34 2F 73 79 73 2F 67 6E 75 79 41 58 63 57 74 74 68 2F 49 4F 54 5F 46 31 2D 30 31 2F 74 68 69 6E 67 2F 65 76 65 6E 74 2F 70 72 6F 70 65 72 74 79 2F 70 6F 73 74

  • 有效载荷:有效载荷包含将被发布的应用消息。数据的内容和格式是应用特定的。 在阿里云中设备的的物理模型数据有运行状态、温度、湿度,所以测试的有效载荷就发送这几个数据。 1、有效载荷内容: {"method":"thing.service.property.post","id":"00000001","params":{"RunningState":1,"Humidity":85,"temperature":26.6,},"version":"1.0.0"}

转16进制: 7B 22 6D 65 74 68 6F 64 22 3A 22 74 68 69 6E 67 2E 73 65 72 76 69 63 65 2E 70 72 6F 70 65 72 74 79 2E 70 6F 73 74 22 2C 22 69 64 22 3A 22 30 30 30 30 30 30 30 31 22 2C 22 70 61 72 61 6D 73 22 3A 7B 22 52 75 6E 6E 69 6E 67 53 74 61 74 65 22 3A 31 2C 22 48 75 6D 69 64 69 74 79 22 3A 38 35 2C 22 74 65 6D 70 65 72 61 74 75 72 65 22 3A 32 36 2E 36 2C 7D 2C 22 76 65 72 73 69 6F 6E 22 3A 22 31 2E 30 2E 30 22 7D

(2)这里使用QoS为0,所以就不包含响应和动作两个字段,得到 固定报头+可变报头+有效载荷 内容: 31 ?? 00 34 2F 73 79 73 2F 67 6E 75 79 41 58 63 57 74 74 68 2F 49 4F 54 5F 46 31 2D 30 31 2F 74 68 69 6E 67 2F 65 76 65 6E 74 2F 70 72 6F 70 65 72 74 79 2F 70 6F 73 74 7B 22 6D 65 74 68 6F 64 22 3A 22 74 68 69 6E 67 2E 73 65 72 76 69 63 65 2E 70 72 6F 70 65 72 74 79 2E 70 6F 73 74 22 2C 22 69 64 22 3A 22 30 30 30 30 30 30 30 31 22 2C 22 70 61 72 61 6D 73 22 3A 7B 22 52 75 6E 6E 69 6E 67 53 74 61 74 65 22 3A 31 2C 22 48 75 6D 69 64 69 74 79 22 3A 38 35 2C 22 74 65 6D 70 65 72 61 74 75 72 65 22 3A 32 36 2E 36 2C 7D 2C 22 76 65 72 73 69 6F 6E 22 3A 22 31 2E 30 2E 30 22 7D 求得剩余长度为:190(0xBE) 注意剩余长度的表示规则,按规则处理后表示为:0xBE 0x01。

(3)最终得到的发布报文为: 31 BE 01 00 34 2F 73 79 73 2F 67 6E 75 79 41 58 63 57 74 74 68 2F 49 4F 54 5F 46 31 2D 30 31 2F 74 68 69 6E 67 2F 65 76 65 6E 74 2F 70 72 6F 70 65 72 74 79 2F 70 6F 73 74 7B 22 6D 65 74 68 6F 64 22 3A 22 74 68 69 6E 67 2E 73 65 72 76 69 63 65 2E 70 72 6F 70 65 72 74 79 2E 70 6F 73 74 22 2C 22 69 64 22 3A 22 30 30 30 30 30 30 30 31 22 2C 22 70 61 72 61 6D 73 22 3A 7B 22 52 75 6E 6E 69 6E 67 53 74 61 74 65 22 3A 31 2C 22 48 75 6D 69 64 69 74 79 22 3A 38 35 2C 22 74 65 6D 70 65 72 61 74 75 72 65 22 3A 32 36 2E 36 2C 7D 2C 22 76 65 72 73 69 6F 6E 22 3A 22 31 2E 30 2E 30 22 7D

(4)用网络助手向服务器发送报文后,可以看到服务器有回复,在阿里云平台看到了刷新的设置数据。


2、发布确认 报文格式:固定报头+可变报头 只有QoS等级为1时才会有发布确认报文,其构成如下,本次测试中暂时不用。


2.3.4SUBSCRIBE - 订阅主题

订阅报文格式:固定报头+可变报头+有效载荷+响应

  • 固定报头:报文类型+保留位+剩余长度 容易得到订阅报文的固定报头为:82 ??
  • 可变报头:为客户端标识符,标识符为10的时如图: 订阅报文的可变报头为:00 0A
  • 有效载荷: 主题过滤器+服务质量+保留位 主题过滤器主要用于向服务器说明客户端要订阅的主题,这里使用阿里云已创建设备的主题。 ${deviceName}替换为设备名称,即主题过滤器内容为: /sys/gnuyAXcWtth/IOT_F1-01/thing/service/property/set

转为16进制:2F 73 79 73 2F 67 6E 75 79 41 58 63 57 74 74 68 2F 49 4F 54 5F 46 31 2D 30 31 2F 74 68 69 6E 67 2F 73 65 72 76 69 63 65 2F 70 72 6F 70 65 72 74 79 2F 73 65 74 Mqtt协议要求主题过滤器要采用 UTF-8编码,所以得到最终内容为: 00 35 2F 73 79 73 2F 67 6E 75 79 41 58 63 57 74 74 68 2F 49 4F 54 5F 46 31 2D 30 31 2F 74 68 69 6E 67 2F 73 65 72 76 69 63 65 2F 70 72 6F 70 65 72 74 79 2F 73 65 74 服务质量要求设置为:QoS0 ,因此取值为 00 得到最终的有效载荷为: 00 35 2F 73 79 73 2F 67 6E 75 79 41 58 63 57 74 74 68 2F 49 4F 54 5F 46 31 2D 30 31 2F 74 68 69 6E 67 2F 73 65 72 76 69 63 65 2F 70 72 6F 70 65 72 74 79 2F 73 65 74 00

  • 整合订阅报文:固定报头+可变报头+有效载荷 82 ?? 00 0A 00 35 2F 73 79 73 2F 67 6E 75 79 41 58 63 57 74 74 68 2F 49 4F 54 5F 46 31 2D 30 31 2F 74 68 69 6E 67 2F 73 65 72 76 69 63 65 2F 70 72 6F 70 65 72 74 79 2F 73 65 74 00 计算得剩余长度:58(0x3A),最终得到的订阅报文为: 82 3A 00 0A 00 35 2F 73 79 73 2F 67 6E 75 79 41 58 63 57 74 74 68 2F 49 4F 54 5F 46 31 2D 30 31 2F 74 68 69 6E 67 2F 73 65 72 76 69 63 65 2F 70 72 6F 70 65 72 74 79 2F 73 65 74 00

最后把得到的报文通过网络调试助手发送给服务器,得到服务器的返回信息如下:


2.3.5 SUBACK-订阅确认(服务器下发)

订阅确认报文格式:固定报头+可变报头+有效载荷 上述测试中,发送订阅报文后服务器返回 90 03 00 0A 01 根据确认报文格式可知,已经订阅成功。


  • 确认报文格式如下

2.4 两个IOT设备之间的通信测试

提前在该产品下创建好第两个设备。

1、自定义设备的可订阅和发布主题

首先看到设备1发布主题,但设备2是接收不到设备1的消息的。


2、设备2订阅自定义的主题:/gnuyAXcWtth/IOT_F1-02/user/post/info,订阅成功后可在设备2的界面看到已订阅主题列表中多了该topic。


3、设备之间需要通信,则需要通过云平台对消息进行转发,所以需要在云平台做流转配置,根据文档建立流转解析器,然后启动运行解析器即可。参考文档: https://help.aliyun.com/document_detail/68677.html



4、设备1用MQTT.fx连接平台,而设备2则用网络调试助手与平台建立好连接。用MQTT.fx发布设备1的主题”/gnuyAXcWtth/IOT_F1-02/user/post/info”消息,可以看到设备2已经收到了设备1发布的”/gnuyAXcWtth/IOT_F1-02/user/post/info”消息。