言: MQTT广泛应用于工业物联网、智能家居、各类智能制造或各类自动化场景等。MQTT是一个基于客户端-服务器的消息发布/订阅传输协议,在很多受限的环境下,比如说机器与机器通信、机器与物联网通信等。好了,科普的废话不多说,下面直接通过.NET环境来实现一套MQTT通信demo,实现服务端与客户端的双边消息发布与订阅的功能和演示。
开发环境:
VS2022 + .NET 6 + Webapi / 控制台
1、新建一个webapi项目,用来后面做测试使用
2、新建一个继承自IHostedService的服务,用于随着webapi程序的启动而自动执行。(最终代码在文末)
3、引入 MQTTNet 包,该项目提供了.net环境下的MQTT通信协议支持,这款框架很优秀,此处直接引用它来进行使用。
4、在上面的MqttHostService类里面,开始方法里面新增初始化MQTT服务端的一些功能,例如 IP、端口号、事件等等。
5、mqtt服务端支持的一系列功能很多,大佬们可以自行去尝试一些新发现,此处只使用若干个简单功能。
6、添加客户端连接事件、连接关闭事件
7、由于事件要用的可能有点多,此处就不一一例举了,可以直接看以下的代码,以及有关注释来理解。
8、事件触发时候,打印输出
9、输出之前,记录一个当前事件名称标记一下,用于可以更加清楚看出是哪个事件输出的。
10、对MqttHostService类进行注册,用于程序启动时候跟随启动。
11、上面貌似设计的不是很友好,所以把mqtt服务实例单独弄出来,写入到单独的类里面做成属性,供方便调用。
12、把先前的一些东西改一下,换成使用上面步骤的属性来直接调用使用。
13、运行一下,看看是否可以成功,显示服务已启动,说明服务启动时OK的了.
14、新增一个控制台程序 MqttClient,用于模拟客户端。
15、创建客户端启动以及有关配置信息和有关事件,如图。具体使用可以看代码注释,就不过多解释了。
16、在program类里面,调用客户端启动方法,用于测试使用。
17、上面客户端对应的三个事件的实现如图,同时进行有关信息的打印输出。
18、启动服务端,然后启动客户端,可以看到服务端有一个连接失败的消息,这个是因为上面配置的客户端用户名是admin,密码是1234567,而服务端配置的规则是,用户名是admin 密码是123456
19、密码改回正常匹配项以后,再重新运行试试看,可以看到客户端与服务端连接上了。
20、如果关闭客户端,也可以看到服务端会进入客户端关闭事件内。
21、把上面主题订阅的内容写到连接成功以后的事件里面,不然客户端连接期间,可能就执行了主题订阅,会存在订阅失败的情况。改为写入到连接成功以后的事件里面,可以保证主题订阅肯定是在客户端连接成功以后才执行的。
22、接下来测试服务端消息推送,在MqttService服务里面,新增一个方法,用来执行mqtt服务端发布主题消息使用。有关配置信息和消息格式,如图所示。
23、新增一个API控制器,用来测试使用。API参数直接拿来进行消息的推送使用。
24、运行服务端和客户端,并访问刚刚新增的api接口,手动随意输入一条消息,可以看到客户端订阅的主题消息已经被实时接收到了。
25、接下来对客户端新增一个消息推送的方法,用来测试客户端消息发布的功能。有关消息格式和调用,如图所示,以及注释部分的说明。
26、客户端program类里面,客户端连接以后,通过手动回车,来执行客户端发布消息。
27、再次启动服务端和客户端
28、然后客户端内按一下回车,执行消息发布功能。可以看到,服务端成功接收到了客户端发过来的主题消息。
29、接下来测试客户端与客户端之间的消息发布与订阅,为了模拟多客户端效果,把上面客户端已经编译好的文件拷贝一份出来。
30、然后本地的代码进行一些修改,用来当做第二个客户端程序。所以客户端id也进行变更为 testclient02
31、对客户端订阅的主题,也改成 topic_02
32、启动服务端,以及拷贝出来的客户端1,和上面修改了部分代码的客户端2,保证都已经连接上服务端。
33、调用服务端的api接口,由于服务端发布的消息是发布给topic_01的,所以只有客户端1可以接收到消息。
34、客户端1执行回车,用于发布一段消息给主题 topic_02,可以看到客户端01发布的消息,同时被服务端和客户端02接收到了。因为服务端是总指挥,所以客户端发布的消息都会经过服务端,从而服务端都可以接收到连接的客户端发布的所有消息。
35、测试数据保持,下面先对客户端1进行断开,然后再重新连接客户端1,可以看到客户端1直接接收到了它订阅的主题的上一次最新的消息内容,这个就是消息里面,Retain属性设为True的结果,用于让服务端记忆该主题消息使用的。如果设为false,就没有这个效果了,大佬们也可以自己尝试。
36、最终的服务端代码:
MqttHostService:
public class MqttHostService : IHostedService, IDisposable
{
public void Dispose()
{
}
const string ServerClientId="SERVER";
public Task StartAsync(CancellationToken cancellationToken)
{
MqttServerOptionsBuilder optionsBuilder=new MqttServerOptionsBuilder();
optionsBuilder.WithDefaultEndpoint();
optionsBuilder.WithDefaultEndpointPort(10086); // 设置 服务端 端口号
optionsBuilder.WithConnectionBacklog(1000); // 最大连接数
MqttServerOptions options=optionsBuilder.Build();
MqttService._mqttServer=new MqttFactory().CreateMqttServer(options);
MqttService._mqttServer.ClientConnectedAsync +=_mqttServer_ClientConnectedAsync; //客户端连接事件
MqttService._mqttServer.ClientDisconnectedAsync +=_mqttServer_ClientDisconnectedAsync; // 客户端关闭事件
MqttService._mqttServer.ApplicationMessageNotConsumedAsync +=_mqttServer_ApplicationMessageNotConsumedAsync; // 消息接收事件
MqttService._mqttServer.ClientSubscribedTopicAsync +=_mqttServer_ClientSubscribedTopicAsync; // 客户端订阅主题事件
MqttService._mqttServer.ClientUnsubscribedTopicAsync +=_mqttServer_ClientUnsubscribedTopicAsync; // 客户端取消订阅事件
MqttService._mqttServer.StartedAsync +=_mqttServer_StartedAsync; // 启动后事件
MqttService._mqttServer.StoppedAsync +=_mqttServer_StoppedAsync; // 关闭后事件
MqttService._mqttServer.InterceptingPublishAsync +=_mqttServer_InterceptingPublishAsync; // 消息接收事件
MqttService._mqttServer.ValidatingConnectionAsync +=_mqttServer_ValidatingConnectionAsync; // 用户名和密码验证有关
MqttService._mqttServer.StartAsync();
return Task.CompletedTask;
}
/// <summary>
/// 客户端订阅主题事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task _mqttServer_ClientSubscribedTopicAsync(ClientSubscribedTopicEventArgs arg)
{
Console.WriteLine($"ClientSubscribedTopicAsync:客户端ID=【{arg.ClientId}】订阅的主题=【{arg.TopicFilter}】 ");
return Task.CompletedTask;
}
/// <summary>
/// 关闭后事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task _mqttServer_StoppedAsync(EventArgs arg)
{
Console.WriteLine($"StoppedAsync:MQTT服务已关闭……");
return Task.CompletedTask;
}
/// <summary>
/// 用户名和密码验证有关
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task _mqttServer_ValidatingConnectionAsync(ValidatingConnectionEventArgs arg)
{
arg.ReasonCode=MqttConnectReasonCode.Success;
if ((arg.Username ?? string.Empty)!="admin" || (arg.Password??String.Empty)!="123456")
{
arg.ReasonCode=MqttConnectReasonCode.Banned;
Console.WriteLine($"ValidatingConnectionAsync:客户端ID=【{arg.ClientId}】用户名或密码验证错误 ");
}
return Task.CompletedTask;
}
/// <summary>
/// 消息接收事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task _mqttServer_InterceptingPublishAsync(InterceptingPublishEventArgs arg)
{
if (string.Equals(arg.ClientId, ServerClientId))
{
return Task.CompletedTask;
}
Console.WriteLine($"InterceptingPublishAsync:客户端ID=【{arg.ClientId}】 Topic主题=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】");
return Task.CompletedTask;
}
/// <summary>
/// 启动后事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task _mqttServer_StartedAsync(EventArgs arg)
{
Console.WriteLine($"StartedAsync:MQTT服务已启动……");
return Task.CompletedTask;
}
/// <summary>
/// 客户端取消订阅事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task _mqttServer_ClientUnsubscribedTopicAsync(ClientUnsubscribedTopicEventArgs arg)
{
Console.WriteLine($"ClientUnsubscribedTopicAsync:客户端ID=【{arg.ClientId}】已取消订阅的主题=【{arg.TopicFilter}】 ");
return Task.CompletedTask;
}
private Task _mqttServer_ApplicationMessageNotConsumedAsync(ApplicationMessageNotConsumedEventArgs arg)
{
Console.WriteLine($"ApplicationMessageNotConsumedAsync:发送端ID=【{arg.SenderId}】 Topic主题=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】");
return Task.CompletedTask;
}
/// <summary>
/// 客户端断开时候触发
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
/// <exception cref="NotImplementedException"></exception>
private Task _mqttServer_ClientDisconnectedAsync(ClientDisconnectedEventArgs arg)
{
Console.WriteLine($"ClientDisconnectedAsync:客户端ID=【{arg.ClientId}】已断开, 地址=【{arg.Endpoint}】 ");
return Task.CompletedTask;
}
/// <summary>
/// 客户端连接时候触发
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task _mqttServer_ClientConnectedAsync(ClientConnectedEventArgs arg)
{
Console.WriteLine($"ClientConnectedAsync:客户端ID=【{arg.ClientId}】已连接, 用户名=【{arg.UserName}】地址=【{arg.Endpoint}】 ");
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
MqttService:
public class MqttService
{
public static MqttServer _mqttServer { get; set; }
public static void PublishData(string data)
{
var message=new MqttApplicationMessage
{
Topic="topic_01",
Payload=Encoding.Default.GetBytes(data),
QualityOfServiceLevel=MqttQualityOfServiceLevel.AtLeastOnce,
Retain=true // 服务端是否保留消息。true为保留,如果有新的订阅者连接,就会立马收到该消息。
};
_mqttServer.InjectApplicationMessage(new InjectedMqttApplicationMessage(message) // 发送消息给有订阅 topic_01的客户端
{
SenderClientId="Server_01"
}).GetAwaiter().GetResult();
}
}
37、最终的客户端代码:
MqttClientService:
public class MqttClientService
{
public static IMqttClient _mqttClient;
public void MqttClientStart()
{
var optionsBuilder=new MqttClientOptionsBuilder()
.WithTcpServer("127.0.0.1", 10086) // 要访问的mqtt服务端的 ip 和 端口号
.WithCredentials("admin", "123456") // 要访问的mqtt服务端的用户名和密码
.WithClientId("testclient02") // 设置客户端id
.WithCleanSession()
.WithTls(new MqttClientOptionsBuilderTlsParameters
{
UseTls=false // 是否使用 tls加密
});
var clientOptions=optionsBuilder.Build();
_mqttClient=new MqttFactory().CreateMqttClient();
_mqttClient.ConnectedAsync +=_mqttClient_ConnectedAsync; // 客户端连接成功事件
_mqttClient.DisconnectedAsync +=_mqttClient_DisconnectedAsync; // 客户端连接关闭事件
_mqttClient.ApplicationMessageReceivedAsync +=_mqttClient_ApplicationMessageReceivedAsync; // 收到消息事件
_mqttClient.ConnectAsync(clientOptions);
}
/// <summary>
/// 客户端连接关闭事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task _mqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
{
Console.WriteLine($"客户端已断开与服务端的连接……");
return Task.CompletedTask;
}
/// <summary>
/// 客户端连接成功事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task _mqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg)
{
Console.WriteLine($"客户端已连接服务端……");
// 订阅消息主题
// MqttQualityOfServiceLevel: (QoS): 0 最多一次,接收者不确认收到消息,并且消息不被发送者存储和重新发送提供与底层 TCP 协议相同的保证。
// 1: 保证一条消息至少有一次会传递给接收方。发送方存储消息,直到它从接收方收到确认收到消息的数据包。一条消息可以多次发送或传递。
// 2: 保证每条消息仅由预期的收件人接收一次。级别2是最安全和最慢的服务质量级别,保证由发送方和接收方之间的至少两个请求/响应(四次握手)。
_mqttClient.SubscribeAsync("topic_02", MqttQualityOfServiceLevel.AtLeastOnce);
return Task.CompletedTask;
}
/// <summary>
/// 收到消息事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task _mqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
{
Console.WriteLine($"ApplicationMessageReceivedAsync:客户端ID=【{arg.ClientId}】接收到消息。 Topic主题=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】");
return Task.CompletedTask;
}
public void Publish(string data)
{
var message=new MqttApplicationMessage
{
Topic="topic_02",
Payload=Encoding.Default.GetBytes(data),
QualityOfServiceLevel=MqttQualityOfServiceLevel.AtLeastOnce,
Retain=true // 服务端是否保留消息。true为保留,如果有新的订阅者连接,就会立马收到该消息。
};
_mqttClient.PublishAsync(message);
}
}
38、后记:MQTT以上演示已经完毕,可以看到它的一些特性,跟websocket很接近,但是又比websocket通信更加灵活。其实,实际上MQTT的客户端在现实生产环境场景下,并不需要咱们开发者进行开发,很多硬件设备都支持提供MQTT协议的通信客户端,所以只需要自己搭建一个服务端,就可以实现实时监控各种设备推送过来的各种信号数据。同时客户端支持发布消息给其他客户端,所以就实现了设备与设备之间的一对一信号通信的效果了。如果需要下发信号给硬件设备,MQTT服务端也可以直接下发给某个指定设备来进行实现即可。上面案例只提供入门方案,如果有感兴趣的大佬,可以自己去拓展一下,来达到更好的效果。
原文:https://www.cnblogs.com/weskynet/p/16441219.html
发部署在云端的设备接入网关服务就不得不提到MQTT,使用MQTT不论是从设备到设备,还是设备到云端服务的双向通讯,都可以获得较好的支持。
MQTT的起源和我的理解
这里基于mosquitto,以一组实际的订阅、发布,使用tcpdump来观察MQTT的通讯。
# 使用tcpdump打印指定MQTT服务器所在端口的报文
tcpdump -n -XX -i eth0 port 1883
# 订阅指定主题的消息
mosquitto_sub -h 172.16.0.12 -t "topic/#" -u user -P password -i "client1"
# 发布消息到指定主题
mosquitto_pub -h 172.16.0.12 -t "topic/" -u user -P password -i "client3" -m "1234567890123456789012345678901234567890"
tcpdump抓取的报文如下:
启动mosquitto_sub命令获得如下报文:==================================16:33:28.668926 IP SUB.41050 > MQTT.ibm-mqisdp: Flags [S], seq 1177792969, win 29200, options [mss 1424,sackOK,TS val 530570732 ecr 0,nop,wscale 7], length 0
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 003c 5872 4000 3806 556b 5147 9713 ac10 .<Xr@.8.UkQG....
0x0020: 000c a05a 075b 4633 b1c9 0000 0000 a002 ...Z.[F3........
0x0030: 7210 a45e 0000 0204 0590 0402 080a 1f9f r..^............
0x0040: ddec 0000 0000 0103 0307 ..........
16:33:28.669001 IP MQTT.ibm-mqisdp >SUB.41050: Flags [S.], seq 1053629863, ack 1177792970, win 28960, options [mss 1460,sackOK,TS val 3885354501 ecr 530570732,nop,wscale 7], length 0
0x0000: feee 809f 3247 5254 00e4 a55c 0800 4500 ....2GRT...\..E.
0x0010: 003c 0000 4000 4006 a645 ac10 000c 5147 .<..@.@..E....QG
0x0020: 9713 075b a05a 3ecd 1da7 4633 b1ca a012 ...[.Z>...F3....
0x0030: 7120 9309 0000 0204 05b4 0402 080a e795 q...............
0x0040: ce05 1f9f ddec 0103 0307 ..........
16:33:28.672229 IP SUB.41050 > MQTT.ibm-mqisdp: Flags [.], ack 1, win 229, options [nop,nop,TS val 530570735 ecr 3885354501], length 0
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 0034 5873 4000 3806 5572 5147 9713 ac10 .4Xs@.8.UrQG....
0x0020: 000c a05a 075b 4633 b1ca 3ecd 1da8 8010 ...Z.[F3..>.....
0x0030: 00e5 320e 0000 0101 080a 1f9f ddef e795 ..2.............
0x0040: ce05 ..
16:33:28.672280 IP SUB.41050 > MQTT.ibm-mqisdp: Flags [P.], seq 1:42, ack 1, win 229, options [nop,nop,TS val 530570735 ecr 3885354501], length 41
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 005d 5874 4000 3806 5548 5147 9713 ac10 .]Xt@.8.UHQG....
0x0020: 000c a05a 075b 4633 b1ca 3ecd 1da8 8018 ...Z.[F3..>.....
0x0030: 00e5 37b7 0000 0101 080a 1f9f ddef e795 ..7.............
0x0040: ce05 1027 0006 4d51 4973 6470 03c2 003c ...'..MQIsdp...<
0x0050: 0007 636c 6965 6e74 3100 0563 636e 6574 ..client1..ccnet
0x0060: 0009 6363 6e65 7430 3930 31 ..ccnet0901
16:33:28.672313 IP MQTT.ibm-mqisdp > SUB.41050: Flags [.], ack 42, win 227, options [nop,nop,TS val 3885354505 ecr 530570735], length 0
0x0000: feee 809f 3247 5254 00e4 a55c 0800 4500 ....2GRT...\..E.
0x0010: 0034 d3a1 4000 4006 d2ab ac10 000c 5147 .4..@.@.......QG
0x0020: 9713 075b a05a 3ecd 1da8 4633 b1f3 8010 ...[.Z>...F3....
0x0030: 00e3 31e3 0000 0101 080a e795 ce09 1f9f ..1.............
0x0040: ddef ..
16:33:28.672562 IP MQTT.ibm-mqisdp > SUB.41050: Flags [P.], seq 1:5, ack 42, win 227, options [nop,nop,TS val 3885354505 ecr 530570735], length 4
0x0000: feee 809f 3247 5254 00e4 a55c 0800 4500 ....2GRT...\..E.
0x0010: 0038 d3a2 4000 4006 d2a6 ac10 000c 5147 .8..@.@.......QG
0x0020: 9713 075b a05a 3ecd 1da8 4633 b1f3 8018 ...[.Z>...F3....
0x0030: 00e3 11d5 0000 0101 080a e795 ce09 1f9f ................
0x0040: ddef 2002 0000 ......
16:33:28.675753 IP SUB.41050 > MQTT.ibm-mqisdp: Flags [.], ack 5, win 229, options [nop,nop,TS val 530570739 ecr 3885354505], length 0
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 0034 5875 4000 3806 5570 5147 9713 ac10 .4Xu@.8.UpQG....
0x0020: 000c a05a 075b 4633 b1f3 3ecd 1dac 8010 ...Z.[F3..>.....
0x0030: 00e5 31d9 0000 0101 080a 1f9f ddf3 e795 ..1.............
0x0040: ce09 ..
16:33:28.675772 IP SUB.41050 > MQTT.ibm-mqisdp: Flags [P.], seq 42:56, ack 5, win 229, options [nop,nop,TS val 530570739 ecr 3885354505], length 14
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 0042 5876 4000 3806 5561 5147 9713 ac10 .BXv@.8.UaQG....
0x0020: 000c a05a 075b 4633 b1f3 3ecd 1dac 8018 ...Z.[F3..>.....
0x0030: 00e5 46b6 0000 0101 080a 1f9f ddf3 e795 ..F.............
0x0040: ce09 820c 0001 0007 6363 6e65 742f 2300 ........ccnet/#.
16:33:28.675823 IP MQTT.ibm-mqisdp > SUB.41050: Flags [P.], seq 5:10, ack 56, win 227, options [nop,nop,TS val 3885354508 ecr 530570739], length 5
0x0000: feee 809f 3247 5254 00e4 a55c 0800 4500 ....2GRT...\..E.
0x0010: 0039 d3a3 4000 4006 d2a4 ac10 000c 5147 .9..@.@.......QG
0x0020: 9713 075b a05a 3ecd 1dac 4633 b201 8018 ...[.Z>...F3....
0x0030: 00e3 a1b8 0000 0101 080a e795 ce0c 1f9f ................
0x0040: ddf3 9003 0001 00 .......
16:33:28.719501 IP SUB.41050 > MQTT.ibm-mqisdp: Flags [.], ack 10, win 229, options [nop,nop,TS val 530570783 ecr 3885354508], length 0
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 0034 5877 4000 3806 556e 5147 9713 ac10 .4Xw@.8.UnQG....
0x0020: 000c a05a 075b 4633 b201 3ecd 1db1 8010 ...Z.[F3..>.....
0x0030: 00e5 3197 0000 0101 080a 1f9f de1f e795 ..1.............
0x0040: ce0c ..
终止mosquitto_sub命令获得如下报文:=================================16:29:54.745664 IP 81.71.151.19.40510 > 172.16.0.12.ibm-mqisdp: Flags [F.], seq 58, ack 12, win 229, options [nop,nop,TS val 530356809 ecr 3885119146], length 0
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 0034 e1fc 4000 3806 cbe8 5147 9713 ac10 .4..@.8...QG....
0x0020: 000c 9e3e 075b 0e6c ea6f 989b b7f8 8011 ...>.[.l.o......
0x0030: 00e5 1a34 0000 0101 080a 1f9c 9a49 e792 ...4.........I..
0x0040: 36aa 6.
16:29:54.745761 IP 172.16.0.12.ibm-mqisdp > 81.71.151.19.40510: Flags [F.], seq 12, ack 59, win 227, options [nop,nop,TS val 3885140578 ecr 530356809], length 0
0x0000: feee 809f 3247 5254 00e4 a55c 0800 4500 ....2GRT...\..E.
0x0010: 0034 fbe4 4000 4006 aa68 ac10 000c 5147 .4..@.@..h....QG
0x0020: 9713 075b 9e3e 989b b7f8 0e6c ea70 8011 ...[.>.....l.p..
0x0030: 00e3 c67c 0000 0101 080a e792 8a62 1f9c ...|.........b..
0x0040: 9a49 .I
16:29:54.751445 IP 81.71.151.19.40510 > 172.16.0.12.ibm-mqisdp: Flags [.], ack 13, win 229, options [nop,nop,TS val 530356812 ecr 3885140578], length 0
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 0034 e1fd 4000 3806 cbe7 5147 9713 ac10 .4..@.8...QG....
0x0020: 000c 9e3e 075b 0e6c ea70 989b b7f9 8010 ...>.[.l.p......
0x0030: 00e5 c677 0000 0101 080a 1f9c 9a4c e792 ...w.........L..
0x0040: 8a62 .b
# 执行mosquitto_pub后得到的报文如下:
16:36:40.721513 IP 81.71.151.19.41390 > 172.16.0.12.ibm-mqisdp: Flags [S], seq 2807988728, win 29200, options [mss 1424,sackOK,TS val 530762785 ecr 0,nop,wscale 7], length 0
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 003c 5dd0 4000 3806 500d 5147 9713 ac10 .<].@.8.P.QG....
0x0020: 000c a1ae 075b a75e 81f8 0000 0000 a002 .....[.^........
0x0030: 7210 8378 0000 0204 0590 0402 080a 1fa2 r..x............
0x0040: cc21 0000 0000 0103 0307 .!........
16:36:40.721585 IP 172.16.0.12.ibm-mqisdp > 81.71.151.19.41390: Flags [S.], seq 4236199326, ack 2807988729, win 28960, options [mss 1460,sackOK,TS val 3885546554 ecr 530762785,nop,wscale 7], length 0
0x0000: feee 809f 3247 5254 00e4 a55c 0800 4500 ....2GRT...\..E.
0x0010: 003c 0000 4000 4006 a645 ac10 000c 5147 .<..@.@..E....QG
0x0020: 9713 075b a1ae fc7f 459e a75e 81f9 a012 ...[....E..^....
0x0030: 7120 9e41 0000 0204 05b4 0402 080a e798 q..A............
0x0040: bc3a 1fa2 cc21 0103 0307 .:...!....
16:36:40.724732 IP 81.71.151.19.41390 > 172.16.0.12.ibm-mqisdp: Flags [.], ack 1, win 229, options [nop,nop,TS val 530762788 ecr 3885546554], length 0
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 0034 5dd1 4000 3806 5014 5147 9713 ac10 .4].@.8.P.QG....
0x0020: 000c a1ae 075b a75e 81f9 fc7f 459f 8010 .....[.^....E...
0x0030: 00e5 3d46 0000 0101 080a 1fa2 cc24 e798 ..=F.........$..
0x0040: bc3a .:
16:36:40.724750 IP PUB.41390 > MQTT.ibm-mqisdp: Flags [P.], seq 1:42, ack 1, win 229, options [nop,nop,TS val 530762788 ecr 3885546554], length 41
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 005d 5dd2 4000 3806 4fea 5147 9713 ac10 .]].@.8.O.QG....
0x0020: 000c a1ae 075b a75e 81f9 fc7f 459f 8018 .....[.^....E...
0x0030: 00e5 40ef 0000 0101 080a 1fa2 cc24 e798 ..@..........$..
0x0040: bc3a 1027 0006 4d51 4973 6470 03c2 003c .:.'..MQIsdp...<
0x0050: 0007 636c 6965 6e74 3300 0563 636e 6574 ..client3..ccnet
0x0060: 0009 6363 6e65 7430 3930 31 ..ccnet0901
16:36:40.724787 IP 172.16.0.12.ibm-mqisdp > 81.71.151.19.41390: Flags [.], ack 42, win 227, options [nop,nop,TS val 3885546557 ecr 530762788], length 0
0x0000: feee 809f 3247 5254 00e4 a55c 0800 4500 ....2GRT...\..E.
0x0010: 0034 f9a2 4000 4006 acaa ac10 000c 5147 .4..@.@.......QG
0x0020: 9713 075b a1ae fc7f 459f a75e 8222 8010 ...[....E..^."..
0x0030: 00e3 3d1c 0000 0101 080a e798 bc3d 1fa2 ..=..........=..
0x0040: cc24 .$
16:36:40.724978 IP 172.16.0.12.ibm-mqisdp > 81.71.151.19.41390: Flags [P.], seq 1:5, ack 42, win 227, options [nop,nop,TS val 3885546557 ecr 530762788], length 4
0x0000: feee 809f 3247 5254 00e4 a55c 0800 4500 ....2GRT...\..E.
0x0010: 0038 f9a3 4000 4006 aca5 ac10 000c 5147 .8..@.@.......QG
0x0020: 9713 075b a1ae fc7f 459f a75e 8222 8018 ...[....E..^."..
0x0030: 00e3 1d0e 0000 0101 080a e798 bc3d 1fa2 .............=..
0x0040: cc24 2002 0000 .$....
16:36:40.728114 IP 81.71.151.19.41390 > 172.16.0.12.ibm-mqisdp: Flags [.], ack 5, win 229, options [nop,nop,TS val 530762791 ecr 3885546557], length 0
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 0034 5dd3 4000 3806 5012 5147 9713 ac10 .4].@.8.P.QG....
0x0020: 000c a1ae 075b a75e 8222 fc7f 45a3 8010 .....[.^."..E...
0x0030: 00e5 3d13 0000 0101 080a 1fa2 cc27 e798 ..=..........'..
0x0040: bc3d .=16:36:40.728144 IP PUB.41390 > MQTT.ibm-mqisdp: Flags [P.], seq 42:82, ack 5, win 229, options [nop,nop,TS val 530762791 ecr 3885546557], length 40
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 005c 5dd4 4000 3806 4fe9 5147 9713 ac10 .\].@.8.O.QG....
0x0020: 000c a1ae 075b a75e 8222 fc7f 45a3 8018 .....[.^."..E...
0x0030: 00e5 a8af 0000 0101 080a 1fa2 cc27 e798 .............'..
0x0040: bc3d 3026 0006 6363 6e65 742f 3132 3334 .=0&..ccnet/1234
0x0050: 3536 3738 3930 3132 3334 3536 3738 3930 5678901234567890
0x0060: 3132 3334 3536 3738 3930 1234567890
16:36:40.728146 IP 81.71.151.19.41390 > 172.16.0.12.ibm-mqisdp: Flags [FP.], seq 82:84, ack 5, win 229, options [nop,nop,TS val 530762791 ecr 3885546557], length 2
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 0036 5dd5 4000 3806 500e 5147 9713 ac10 .6].@.8.P.QG....
0x0020: 000c a1ae 075b a75e 824a fc7f 45a3 8019 .....[.^.J..E...
0x0030: 00e5 5cdf 0000 0101 080a 1fa2 cc27 e798 ..\..........'..
0x0040: bc3d e000 .=..
16:36:40.728230 IP MQTT.ibm-mqisdp > SUB.41050: Flags [P.], seq 96:136, ack 62, win 227, options [nop,nop,TS val 3885546560 ecr 530753903], length 40
0x0000: feee 809f 3247 5254 00e4 a55c 0800 4500 ....2GRT...\..E.
0x0010: 005c d3a9 4000 4006 d27b ac10 000c 5147 .\..@.@..{....QG
0x0020: 9713 075b a05a 3ecd 1e07 4633 b207 8018 ...[.Z>...F3....
0x0030: 00e3 e34f 0000 0101 080a e798 bc40 1fa2 ...O.........@..
0x0040: a96f 3026 0006 6363 6e65 742f 3132 3334 .o0&..ccnet/1234
0x0050: 3536 3738 3930 3132 3334 3536 3738 3930 5678901234567890
0x0060: 3132 3334 3536 3738 3930 1234567890
16:36:40.728288 IP 172.16.0.12.ibm-mqisdp > 81.71.151.19.41390: Flags [.], ack 85, win 227, options [nop,nop,TS val 3885546561 ecr 530762791], length 0
0x0000: feee 809f 3247 5254 00e4 a55c 0800 4500 ....2GRT...\..E.
0x0010: 0034 f9a4 4000 4006 aca8 ac10 000c 5147 .4..@.@.......QG
0x0020: 9713 075b a1ae fc7f 45a3 a75e 824d 8010 ...[....E..^.M..
0x0030: 00e3 3ce6 0000 0101 080a e798 bc41 1fa2 ..<..........A..
0x0040: cc27 .'
16:36:40.728367 IP 172.16.0.12.ibm-mqisdp > 81.71.151.19.41390: Flags [F.], seq 5, ack 85, win 227, options [nop,nop,TS val 3885546561 ecr 530762791], length 0
0x0000: feee 809f 3247 5254 00e4 a55c 0800 4500 ....2GRT...\..E.
0x0010: 0034 f9a5 4000 4006 aca7 ac10 000c 5147 .4..@.@.......QG
0x0020: 9713 075b a1ae fc7f 45a3 a75e 824d 8011 ...[....E..^.M..
0x0030: 00e3 3ce5 0000 0101 080a e798 bc41 1fa2 ..<..........A..
0x0040: cc27 .'
16:36:40.731431 IP 81.71.151.19.41050 > 172.16.0.12.ibm-mqisdp: Flags [.], ack 136, win 229, options [nop,nop,TS val 530762794 ecr 3885546560], length 0
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 0034 5880 4000 3806 5565 5147 9713 ac10 .4X.@.8.UeQG....
0x0020: 000c a05a 075b 4633 b207 3ecd 1e2f 8010 ...Z.[F3..>../..
0x0030: 00e5 54ce 0000 0101 080a 1fa2 cc2a e798 ..T..........*..
0x0040: bc40 .@
16:36:40.731575 IP 81.71.151.19.41390 > 172.16.0.12.ibm-mqisdp: Flags [.], ack 6, win 229, options [nop,nop,TS val 530762795 ecr 3885546561], length 0
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 0034 0000 4000 3806 ade5 5147 9713 ac10 .4..@.8...QG....
0x0020: 000c a1ae 075b a75e 824d fc7f 45a4 8010 .....[.^.M..E...
0x0030: 00e5 3cdf 0000 0101 080a 1fa2 cc2b e798 ..<..........+..
0x0040: bc41 .A
去掉TCP握手和收报报文的常规交互,可知道MQTT订阅发布的完整时序如下:
MQTT发布订阅的实际交互时序图
可以看到AUHT、SUB、PUB、SEND_MESSAGE的报文非常精简,基本只是对必须传送的字段增加了两个字节的长度,因此其对带宽是非常节省的。
# 启动对9001端口的监视
tcpdump -n -XX -i eth0 port 9001
# 订阅指定主题的消息
mosquitto_sub -h 172.16.0.12 -t "topic/#" -u user -P password -i "client1"
#编写一个基于websockets访问mqtt的页面,如下亲测通过; 点击init按钮
<html>
<head>
<script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/paho-mqtt/1.0.2/mqttws31.js"></script>
</head>
<body>
<script>
var mqtt;
var host='81.71.87.37';
var port=9001;
// onConnect 事件
function onConnect() {
console.log('connected.');
/*
var raw_message='Hello World!';
message=new Paho.MQTT.Message(raw_message);
message.destinationName='ccnet/up/abc';
console.log('sending message: ' + raw_message );
mqtt.send(message);
*/
// 订阅 download topic
var subOptions={
qos: 1,
onSuccess: onSubscribe
};
mqtt.subscribe('ccnet/#', subOptions);
}
function onSend() {
console.log('onSend.');
var raw_message='Hello World!';
message=new Paho.MQTT.Message(raw_message);
message.destinationName='ccnet/up/abc';
console.log('sending message: ' + raw_message );
mqtt.send(message);
}
// 订阅主题成功事件
function onSubscribe(context) {
console.log('subscribe success');
console.log(context);
}
// 连接失败事件
function onFailure(message) {
console.log('connect failed.');
}
// onMessageArrived 事件
function onMessageArrived(message) {
console.log('new message arrived...');
console.log(message.payloadString);
}
// 建立 MQTT websocket 连接
function MQTTconnect() {
console.log('connecting to ' + host + ':' + port);
mqtt=new Paho.MQTT.Client(host, port, 'clientid');
var options={
timeout: 3,
onSuccess: onConnect,
onFailure: onFailure,
userName: 'ccnet',
password: 'ccnet0901',
mqttVersion: 4
};
mqtt.onMessageArrived=onMessageArrived;
mqtt.connect(options);
}
</script>
<input type="button" value="Init" onclick="javascript:MQTTconnect();"/>
<input type="button" value="Send" onclick="javascript:onSend();"/>
</body>
</html>
# 发布消息到指定主题
mosquitto_pub -h 172.16.0.12 -t "topic/" -u user -P password -i "client3" -m "1234567890123456789012345678901234567890"
TCPDUMP抓取的报文如下:
# 点击WEB页面上的Init按钮之后
18:11:49.183098 IP 14.150.108.187.50999 > 172.16.0.12.etlservicemgr: Flags [S], seq 3659468570, win 64240, options [mss 1360,nop,wscale 8,nop,nop,sackOK], length 0
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 0034 7b4e 4000 7206 65a0 0e96 6cbb ac10 .4{N@.r.e...l...
0x0020: 000c c737 2329 da1f 0f1a 0000 0000 8002 ...7#)..........
0x0030: faf0 797b 0000 0204 0550 0103 0308 0101 ..y{.....P......
0x0040: 0402 ..
18:11:49.183188 IP 172.16.0.12.etlservicemgr > 14.150.108.187.50999: Flags [S.], seq 3730879023, ack 3659468571, win 29200, options [mss 1460,nop,nop,sackOK,nop,wscale 7], length 0
0x0000: feee 809f 3247 5254 00e4 a55c 0800 4500 ....2GRT...\..E.
0x0010: 0034 0000 4000 4006 1357 ac10 000c 0e96 .4..@.@..W......
0x0020: 6cbb 2329 c737 de60 b22f da1f 0f1b 8012 l.#).7.`./......
0x0030: 7210 7157 0000 0204 05b4 0101 0402 0103 r.qW............
0x0040: 0307 ..
18:11:49.223085 IP 14.150.108.187.50999 > 172.16.0.12.etlservicemgr: Flags [.], ack 1, win 515, length 0
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 0028 7b4f 4000 7206 65ab 0e96 6cbb ac10 .({O@.r.e...l...
0x0020: 000c c737 2329 da1f 0f1b de60 b230 5010 ...7#).....`.0P.
0x0030: 0203 2237 0000 .."7..
18:11:49.247331 IP Browser.50999 > MQTT.etlservicemgr: Flags [P.], seq 1:511, ack 1, win 515, length 510
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 0226 7b50 4000 7206 63ac 0e96 6cbb ac10 .&{P@.r.c...l...
0x0020: 000c c737 2329 da1f 0f1b de60 b230 5018 ...7#).....`.0P.
0x0030: 0203 d122 0000 4745 5420 2f6d 7174 7420 ..."..GET./mqtt.
0x0040: 4854 5450 2f31 2e31 0d0a 486f 7374 3a20 HTTP/1.1..Host:.
0x0050: 3831 2e37 312e 3837 2e33 373a 3930 3031 81.71.87.37:9001
0x0060: 0d0a 436f 6e6e 6563 7469 6f6e 3a20 5570 ..Connection:.Up
0x0070: 6772 6164 650d 0a50 7261 676d 613a 206e grade..Pragma:.n
0x0080: 6f2d 6361 6368 650d 0a43 6163 6865 2d43 o-cache..Cache-C
0x0090: 6f6e 7472 6f6c 3a20 6e6f 2d63 6163 6865 ontrol:.no-cache
0x00a0: 0d0a 5573 6572 2d41 6765 6e74 3a20 4d6f ..User-Agent:.Mo
0x00b0: 7a69 6c6c 612f 352e 3020 2857 696e 646f zilla/5.0.(Windo
0x00c0: 7773 204e 5420 3130 2e30 3b20 5769 6e36 ws.NT.10.0;.Win6
0x00d0: 343b 2078 3634 2920 4170 706c 6557 6562 4;.x64).AppleWeb
0x00e0: 4b69 742f 3533 372e 3336 2028 4b48 544d Kit/537.36.(KHTM
0x00f0: 4c2c 206c 696b 6520 4765 636b 6f29 2043 L,.like.Gecko).C
0x0100: 6872 6f6d 652f 3130 312e 302e 3439 3531 hrome/101.0.4951
0x0110: 2e36 3720 5361 6661 7269 2f35 3337 2e33 .67.Safari/537.3
0x0120: 360d 0a55 7067 7261 6465 3a20 7765 6273 6..Upgrade:.webs
0x0130: 6f63 6b65 740d 0a4f 7269 6769 6e3a 206e ocket..Origin:.n
0x0140: 756c 6c0d 0a53 6563 2d57 6562 536f 636b ull..Sec-WebSock
0x0150: 6574 2d56 6572 7369 6f6e 3a20 3133 0d0a et-Version:.13..
0x0160: 4163 6365 7074 2d45 6e63 6f64 696e 673a Accept-Encoding:
0x0170: 2067 7a69 702c 2064 6566 6c61 7465 0d0a .gzip,.deflate..
0x0180: 4163 6365 7074 2d4c 616e 6775 6167 653a Accept-Language:
0x0190: 207a 682d 434e 2c7a 683b 713d 302e 390d .zh-CN,zh;q=0.9.
0x01a0: 0a53 6563 2d57 6562 536f 636b 6574 2d4b .Sec-WebSocket-K
0x01b0: 6579 3a20 7938 796a 3756 7858 624b 576f ey:.y8yj7VxXbKWo
0x01c0: 4f45 6962 3248 7176 4b51 3d3d 0d0a 5365 OEib2HqvKQ==..Se
0x01d0: 632d 5765 6253 6f63 6b65 742d 4578 7465 c-WebSocket-Exte
0x01e0: 6e73 696f 6e73 3a20 7065 726d 6573 7361 nsions:.permessa
0x01f0: 6765 2d64 6566 6c61 7465 3b20 636c 6965 ge-deflate;.clie
0x0200: 6e74 5f6d 6178 5f77 696e 646f 775f 6269 nt_max_window_bi
0x0210: 7473 0d0a 5365 632d 5765 6253 6f63 6b65 ts..Sec-WebSocke
0x0220: 742d 5072 6f74 6f63 6f6c 3a20 6d71 7474 t-Protocol:.mqtt
0x0230: 0d0a 0d0a ....
18:11:49.247360 IP 172.16.0.12.etlservicemgr > 14.150.108.187.50999: Flags [.], ack 511, win 237, length 0
0x0000: feee 809f 3247 5254 00e4 a55c 0800 4500 ....2GRT...\..E.
0x0010: 0028 653f 4000 4006 ae23 ac10 000c 0e96 .(e?@.@..#......
0x0020: 6cbb 2329 c737 de60 b230 da1f 1119 5010 l.#).7.`.0....P.
0x0030: 00ed 214f 0000 ..!O..
18:11:49.323395 IP 172.16.0.12.etlservicemgr > 14.150.108.187.50999: Flags [P.], seq 1:160, ack 511, win 237, length 159
0x0000: feee 809f 3247 5254 00e4 a55c 0800 4500 ....2GRT...\..E.
0x0010: 00c7 6540 4000 4006 ad83 ac10 000c 0e96 ..e@@.@.........
0x0020: 6cbb 2329 c737 de60 b230 da1f 1119 5018 l.#).7.`.0....P.
0x0030: 00ed df2c 0000 4854 5450 2f31 2e31 2031 ...,..HTTP/1.1.1
0x0040: 3031 2053 7769 7463 6869 6e67 2050 726f 01.Switching.Pro
0x0050: 746f 636f 6c73 0d0a 5570 6772 6164 653a tocols..Upgrade:
0x0060: 2057 6562 536f 636b 6574 0d0a 436f 6e6e .WebSocket..Conn
0x0070: 6563 7469 6f6e 3a20 5570 6772 6164 650d ection:.Upgrade.
0x0080: 0a53 6563 2d57 6562 536f 636b 6574 2d41 .Sec-WebSocket-A
0x0090: 6363 6570 743a 204a 6845 6544 4534 6468 ccept:.JhEeDE4dh
0x00a0: 6462 2b4b 6a47 4f52 452b 595a 7176 424b db+KjGORE+YZqvBK
0x00b0: 2f34 3d0d 0a53 6563 2d57 6562 536f 636b /4=..Sec-WebSock
0x00c0: 6574 2d50 726f 746f 636f 6c3a 206d 7174 et-Protocol:.mqt
0x00d0: 740d 0a0d 0a t....
18:11:49.392070 IP 14.150.108.187.50999 > 172.16.0.12.etlservicemgr: Flags [P.], seq 511:557, ack 160, win 514, length 46
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 0056 7b54 4000 7206 6578 0e96 6cbb ac10 .V{T@.r.ex..l...
0x0020: 000c c737 2329 da1f 1119 de60 b2cf 5018 ...7#).....`..P.
0x0030: 0202 7640 0000 82a8 a399 adbd b3bf adb9 ..v@............
0x0040: eec8 f9e9 a75b ad81 a391 ced1 cafc c3c9 .....[..........
0x0050: cafd adb8 c0fa c3d8 d799 a4de c0f7 c8c9 ................
0x0060: 93a0 9d8c ....
18:11:49.392100 IP 172.16.0.12.etlservicemgr > 14.150.108.187.50999: Flags [.], ack 557, win 237, length 0
0x0000: feee 809f 3247 5254 00e4 a55c 0800 4500 ....2GRT...\..E.
0x0010: 0028 6541 4000 4006 ae21 ac10 000c 0e96 .(eA@.@..!......
0x0020: 6cbb 2329 c737 de60 b2cf da1f 1147 5010 l.#).7.`.....GP.
0x0030: 00ed 2082 0000 ......
18:11:49.392352 IP 172.16.0.12.etlservicemgr > 14.150.108.187.50999: Flags [P.], seq 160:166, ack 557, win 237, length 6
0x0000: feee 809f 3247 5254 00e4 a55c 0800 4500 ....2GRT...\..E.
0x0010: 002e 6542 4000 4006 ae1a ac10 000c 0e96 ..eB@.@.........
0x0020: 6cbb 2329 c737 de60 b2cf da1f 1147 5018 l.#).7.`.....GP.
0x0030: 00ed 7e6d 0000 8204 2002 0000 ..~m........
18:11:49.443110 IP 14.150.108.187.50999 > 172.16.0.12.etlservicemgr: Flags [.], ack 166, win 514, length 0
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 0028 7b56 4000 7206 65a4 0e96 6cbb ac10 .({V@.r.e...l...
0x0020: 000c c737 2329 da1f 1147 de60 b2d5 5010 ...7#)...G.`..P.
0x0030: 0202 1f67 0000 ...g..
18:11:49.451571 IP 14.150.108.187.50999 > 172.16.0.12.etlservicemgr: Flags [P.], seq 557:577, ack 166, win 514, length 20
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 003c 7b57 4000 7206 658f 0e96 6cbb ac10 .<{W@.r.e...l...
0x0020: 000c c737 2329 da1f 1147 de60 b2d5 5018 ...7#)...G.`..P.
0x0030: 0202 5568 0000 828e 2670 37fd a47c 37fc ..Uh....&p7..|7.
0x0040: 2677 549e 4815 43d2 0571 &wT.H.C..q
18:11:49.451667 IP 172.16.0.12.etlservicemgr > 14.150.108.187.50999: Flags [P.], seq 166:173, ack 577, win 237, length 7
0x0000: feee 809f 3247 5254 00e4 a55c 0800 4500 ....2GRT...\..E.
0x0010: 002f 6543 4000 4006 ae18 ac10 000c 0e96 ./eC@.@.........
0x0020: 6cbb 2329 c737 de60 b2d5 da1f 115b 5018 l.#).7.`.....[P.
0x0030: 00ed 0d4f 0000 8205 9003 0001 01 ...O.........
18:11:49.543376 IP 14.150.108.187.50999 > 172.16.0.12.etlservicemgr: Flags [.], ack 173, win 514, length 0
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 0028 7b59 4000 7206 65a1 0e96 6cbb ac10 .({Y@.r.e...l...
0x0020: 000c c737 2329 da1f 115b de60 b2dc 5010 ...7#)...[.`..P.
0x0030: 0202 1f4c 0000 ...L..
# 点击Send按钮之后
18:16:59.642995 IP Browser.50999 > MQTT.etlservicemgr: Flags [P.], seq 677:711, ack 249, win 514, length 34
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 004a 7c2e 4000 7206 64aa 0e96 6cbb ac10 .J|.@.r.d...l...
0x0020: 000c c737 2329 da1f 11bf de60 b328 5018 ...7#).....`.(P.
0x0030: 0202 736b 0000 829c 69c7 926a 59dd 9266 ..sk....i..jY..f
0x0040: 0aa4 fc0f 1de8 e71a 46a6 f009 21a2 fe06 ........F...!...
0x0050: 06e7 c505 1bab f64b .......K
18:16:59.643148 IP MQTT.etlservicemgr > Browser.50999: Flags [P.], seq 249:279, ack 711, win 237, length 30
0x0000: feee 809f 3247 5254 00e4 a55c 0800 4500 ....2GRT...\..E.
0x0010: 0046 654b 4000 4006 adf9 ac10 000c 0e96 .FeK@.@.........
0x0020: 6cbb 2329 c737 de60 b328 da1f 11e1 5018 l.#).7.`.(....P.
0x0030: 00ed ce09 0000 821c 301a 000c 6363 6e65 ........0...ccne
0x0040: 742f 7570 2f61 6263 4865 6c6c 6f20 576f t/up/abcHello.Wo
0x0050: 726c 6421 rld!
18:16:59.743220 IP 14.150.108.187.50999 > 172.16.0.12.etlservicemgr: Flags [.], ack 279, win 514, length 0
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 0028 7c30 4000 7206 64ca 0e96 6cbb ac10 .(|0@.r.d...l...
0x0020: 000c c737 2329 da1f 11e1 de60 b346 5010 ...7#).....`.FP.
0x0030: 0202 1e5c 0000 ...\..
# 执行发布消息命令之后
17:56:50.992944 IP MQTT.etlservicemgr > Browser.50939: Flags [P.], seq 205:247, ack 675, win 237, length 42
0x0000: feee 809f 3247 5254 00e4 a55c 0800 4500 ....2GRT...\..E.
0x0010: 0052 e2f9 4000 4006 303f ac10 000c 0e96 .R..@.@.0?......
0x0020: 6cbb 2329 c6fb c3b7 103a 2e40 d9e6 5018 l.#).....:.@..P.
0x0030: 00ed aaae 0000 8228 3026 0006 6363 6e65 .......(0&..ccne
0x0040: 742f 3132 3334 3536 3738 3930 3132 3334 t/12345678901234
0x0050: 3536 3738 3930 3132 3334 3536 3738 3930 5678901234567890
17:56:51.100524 IP 14.150.108.187.50939 > 172.16.0.12.etlservicemgr: Flags [.], ack 247, win 514, length 0
0x0000: 5254 00e4 a55c feee 809f 3247 0800 4568 RT...\....2G..Eh
0x0010: 0028 77db 4000 7206 691f 0e96 6cbb ac10 .(w.@.r.i...l...
0x0020: 000c c6fb 2329 2e40 d9e6 c3b7 1064 5010 ....#).@.....dP.
0x0030: 0202 bffd 0000 ......
可看到基于websockets的通讯整体协作图如下:
MQTT在websockets下的工作模
可以看到除了在建立websockets时,有较大的http头信息,其余仍然很精简。而且websocket已经对Browser上送的AUTH、SUB、PUB进行了加密处理,但是对于下发到Browser的信息并没有进行加密处理
这里使用了mosquitto-2.0.11,下载源码后,修改下config.mk文件,将WITH_WEBSOCKETS打开,如下
# Build with websockets support on the broker.
WITH_WEBSOCKETS:=yes
这里是在centos下工作的,如果没有websockets库,需要安装
yum install libwebsockets* -y
然后就是一路的make,make install,之后配置下LD_LIBRARY_PATH的环境变量,就可以开始工作了。
LD_LIBRARY_PATH=:/root/setup/mosquitto-2.0.11/lib:/root/setup/cJSON-1.7.14
配置mosquitto环境,可建立一个专门的目录,将mosquitto.conf文件拷贝过来修改,目录结构如下:
<root@VM-0-12-centos:/mqtt>tree
.
|-- aclfile # 访问控制文件
|-- log # 日志文件目录
| `-- mosquitto.log
|-- mosquitto # mqtt运行目录
| |-- mosquitto.db
| `-- pid.pid
|-- mosquitto.conf #配置文件!!!IMPORTANT
`-- pwfile # 用户的密码文件
#cat aclfile===========================================================# This affects access control for clients with no username.
topic read $SYS/#
# This only affects clients with username "roger".
user roger
topic foo/bar
# This affects all clients.
pattern write $SYS/broker/connection/%c/state
# Add 20210722
user ccnet
topic read ccnet/#
topic write ccnet/#=========================================================# 创建用户密码文件pwfile
touch pwfile
mosquitto_passwd pwfile user
#cat mosquitto.conf |grep -v "#"=========================================================pid_file /home/release/soft/mqtt/mosquitto/pid.pid
user release
port 1883
listener 9001
protocol websockets
bind_interface eth0
persistence true
persistence_location /home/release/soft/mqtt/mosquitto
allow_anonymous false
password_file /home/release/soft/mqtt/pwfile
acl_file /home/release/soft/mqtt/aclfile
运行Mosquitto
mosquitto -c mosquitto.conf -d
MQTT有很多实现,Mosquitto的魅力在于小而美。有很多开源的实现,基于nodejs、java等,也有商业版本的,更有一些云服务商直接将之PaaS化了。
例如,阿里云将MQTT定义为这样子。一些公司直接基于mqtt搭建其上的物模型,超赞!
阿里云的MQTT服务
要提示:第二节内容中所有操作使用时需要把对应的产品、设备证书等替换为个人对应的数据。为了方便读者更方便直观的了解协议报文形成过程,数据没有隐藏和加密处理,但请勿直接使用文中数据,造成损失将进行法律追究。
MQTT最初是由IBM的Andy Stanford-Clark以及Arcom公司ArlenNipper 为解决石油和天然气管道传感器数据传输问题而创造,他们通过卫星网络传输输油管道传感器数据,当时卫星通信存在费用昂贵、网络中断、低带宽、高延迟等特点,而且传感器数量很多,MQTT就由此而诞生。近年来,物联网发展迅猛,传感器数量爆发式增长,mqtt也被重新拾起。
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(Publish/Subscribe)模式的轻量级通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布,目前最新版本为v3.1.1。MQTT最大的优点在于可以以极少的代码和有限的带宽,为远程设备提供实时可靠的消息服务。做为一种低开销、低带宽占用的即时通讯协议,MQTT在物联网、小型设备、移动应用等方面有广泛的应用。
MQTT是一种基于订阅和发布模式的通信方式,通过以主题为核心的方式进行数据或信息的传输。MQTT协议几个重要的关键词:
mqtt报文结构
mqtt有发布、订阅、连接、心跳、请求等报文,但基本结构都类似,报文结构如下:
固定报头 +可变报头+有效载荷
报文类型值有以下几种:
本节中使用阿里云的物联网平台,结合一下调试工具来实现通过MQTT的通信等操作。需要具备以下环境:
1、申请注册阿里云物联网平台,并创建产品、设备,参考阿里云操作手册即可:https://help.aliyun.com/document_detail/30528.html
2、(可选)linux系统环境
3、工具软件:mqtt.fx、网络调试助手
阿里云提供全面的物联网连接服务,可以在其物联网平台做一些简单的入门测试,官方有详细的操作文档,不再赘述,参考操作指导:https://help.aliyun.com/document_detail/73708.html
以下为部分测试效果图:
阿里云物联网平台在线调试
客户端SDK demo程序运行
结合MQTT.fx测试
结合MQTT.fx测试
为接近实际应用,本小节通过网络调试助手和阿里云平台来更加详细的分析MQTT的几个重要报文。报文内容使用16进制方式展示,需要特别注意一下报文内容有些采用UTF-8编码,以及剩余长度的编码格式。
UTF-8 编码字符对 ASCII 字符的编码做了优化。每一个字符串都有两字节的长度字段作为前缀,它标识了这个字符串 UTF-8 编码的字节数。
剩余长度(Remaining Length)表示当前报文剩余部分的字节数,包括可变报头和负载的数据。剩余长度 不包括用于编码剩余长度字段本身的字节数。剩余长度字段使用一个变长度编码方案,对小于 128 的值它使用单字节编码。更大的值按下面的方式处理。低 7 位有效位用于编码数据,最高有效位用于指示是否有更多的字节。因此每个字节可以编码 128 个数值和一个 延续位( continuation bit ) 。剩余长度字段最大 4 个字节。
得到连接报文的 有效载荷 过程如下: ? 1、代替 ? 客户端ID:IOT_F1-01|securemode=3,signmethod=hmacsha1| ? 用户名:IOT_F1-01&gnuyAXcWtth ? 密码:clientIdIOT_F1-01deviceNameIOT_F1-01productKeygnuyAXcWtth
2、转16进制
客户端ID:49 4F 54 5F 46 31 2D 30 31 7C 73 65 63 75 72 65 6D 6F 64 65 3D 33 2C 73 69 67 6E 6D 65 74 68 6F 64 3D 68 6D 61 63 73 68 61 31 7C
用户名:49 4F 54 5F 46 31 2D 30 31 26 67 6E 75 79 41 58 63 57 74 74 68
密码(哈希加密):a73628d4a216e570bdd3997767e827ef4ae836e7
密码:61 37 33 36 32 38 64 34 61 32 31 36 65 35 37 30 62 64 64 33 39 39 37 37 36 37 65 38 32 37 65 66 34 61 65 38 33 36 65 37
3、utf-8编码(加两个字节的前缀表示长度) ? 客户端ID:00 2B 49 4F 54 5F 46 31 2D 30 31 7C 73 65 63 75 72 65 6D 6F 64 65 3D 33 2C 73 69 67 6E 6D 65 74 68 6F 64 3D 68 6D 61 63 73 68 61 31 7C ? 用户名:00 15 49 4F 54 5F 46 31 2D 30 31 26 67 6E 75 79 41 58 63 57 74 74 68 ? 密码:00 28 61 37 33 36 32 38 64 34 61 32 31 36 65 35 37 30 62 64 64 33 39 39 37 37 36 37 65 38 32 37 65 66 34 61 65 38 33 36 65 37 ? 最终有效载荷: 00 2B 49 4F 54 5F 46 31 2D 30 31 7C 73 65 63 75 72 65 6D 6F 64 65 3D 33 2C 73 69 67 6E 6D 65 74 68 6F 64 3D 68 6D 61 63 73 68 61 31 7C 00 15 49 4F 54 5F 46 31 2D 30 31 26 67 6E 75 79 41 58 63 57 74 74 68 00 28 61 37 33 36 32 38 64 34 61 32 31 36 65 35 37 30 62 64 64 33 39 39 37 37 36 37 65 38 32 37 65 66 34 61 65 38 33 36 65 37
计算得到可变报头加有效载荷的长度为120(0x78),把固定报头的剩余长度补充得到最终 连接报文: 10 78 00 04 4D 51 54 54 04 C2 00 64 00 2B 49 4F 54 5F 46 31 2D 30 31 7C 73 65 63 75 72 65 6D 6F 64 65 3D 33 2C 73 69 67 6E 6D 65 74 68 6F 64 3D 68 6D 61 63 73 68 61 31 7C 00 15 49 4F 54 5F 46 31 2D 30 31 26 67 6E 75 79 41 58 63 57 74 74 68 00 28 61 37 33 36 32 38 64 34 61 32 31 36 65 35 37 30 62 64 64 33 39 39 37 37 36 37 65 38 32 37 65 66 34 61 65 38 33 36 65 37
打开软件调试助手,以TCP CLIENT的模式连接阿里云平台,其目的IP、端口为阿里云的物联网平台连接参数,可以在平台设备的mqtt连接参数中找到地址和端口参数。把这个地址填到网络调试助手连接即可。 ? 连接后以hex方式发送上面得到的连接报文内容,可以看到已经收到了服务器返回的信息。
连接确认报文格式:固定报头+可变报头 上述测试中,服务器返回20 02 00 00,查询MQTT协议的连接确认报文可知道服务器接受了连接。 报文格式如下:
1、发布消息 发布消息的报文格式为:固定报头+可变报头+有效载荷+响应+动作
主题在这里使用阿里云产品中的物模型topic,也可以使用自定义主题等。
(1)把deviceName替换为设备名称,得到可变报头内容为: /sys/gnuyAXcWtth/IOT_F1-01/thing/event/property/post 转为16进制并左utf-8编码得到可变报头为: 00 34 2F 73 79 73 2F 67 6E 75 79 41 58 63 57 74 74 68 2F 49 4F 54 5F 46 31 2D 30 31 2F 74 68 69 6E 67 2F 65 76 65 6E 74 2F 70 72 6F 70 65 72 74 79 2F 70 6F 73 74
转16进制: 7B 22 6D 65 74 68 6F 64 22 3A 22 74 68 69 6E 67 2E 73 65 72 76 69 63 65 2E 70 72 6F 70 65 72 74 79 2E 70 6F 73 74 22 2C 22 69 64 22 3A 22 30 30 30 30 30 30 30 31 22 2C 22 70 61 72 61 6D 73 22 3A 7B 22 52 75 6E 6E 69 6E 67 53 74 61 74 65 22 3A 31 2C 22 48 75 6D 69 64 69 74 79 22 3A 38 35 2C 22 74 65 6D 70 65 72 61 74 75 72 65 22 3A 32 36 2E 36 2C 7D 2C 22 76 65 72 73 69 6F 6E 22 3A 22 31 2E 30 2E 30 22 7D
(2)这里使用QoS为0,所以就不包含响应和动作两个字段,得到 固定报头+可变报头+有效载荷 内容: 31 ?? 00 34 2F 73 79 73 2F 67 6E 75 79 41 58 63 57 74 74 68 2F 49 4F 54 5F 46 31 2D 30 31 2F 74 68 69 6E 67 2F 65 76 65 6E 74 2F 70 72 6F 70 65 72 74 79 2F 70 6F 73 74 7B 22 6D 65 74 68 6F 64 22 3A 22 74 68 69 6E 67 2E 73 65 72 76 69 63 65 2E 70 72 6F 70 65 72 74 79 2E 70 6F 73 74 22 2C 22 69 64 22 3A 22 30 30 30 30 30 30 30 31 22 2C 22 70 61 72 61 6D 73 22 3A 7B 22 52 75 6E 6E 69 6E 67 53 74 61 74 65 22 3A 31 2C 22 48 75 6D 69 64 69 74 79 22 3A 38 35 2C 22 74 65 6D 70 65 72 61 74 75 72 65 22 3A 32 36 2E 36 2C 7D 2C 22 76 65 72 73 69 6F 6E 22 3A 22 31 2E 30 2E 30 22 7D 求得剩余长度为:190(0xBE) 注意剩余长度的表示规则,按规则处理后表示为:0xBE 0x01。
(3)最终得到的发布报文为: 31 BE 01 00 34 2F 73 79 73 2F 67 6E 75 79 41 58 63 57 74 74 68 2F 49 4F 54 5F 46 31 2D 30 31 2F 74 68 69 6E 67 2F 65 76 65 6E 74 2F 70 72 6F 70 65 72 74 79 2F 70 6F 73 74 7B 22 6D 65 74 68 6F 64 22 3A 22 74 68 69 6E 67 2E 73 65 72 76 69 63 65 2E 70 72 6F 70 65 72 74 79 2E 70 6F 73 74 22 2C 22 69 64 22 3A 22 30 30 30 30 30 30 30 31 22 2C 22 70 61 72 61 6D 73 22 3A 7B 22 52 75 6E 6E 69 6E 67 53 74 61 74 65 22 3A 31 2C 22 48 75 6D 69 64 69 74 79 22 3A 38 35 2C 22 74 65 6D 70 65 72 61 74 75 72 65 22 3A 32 36 2E 36 2C 7D 2C 22 76 65 72 73 69 6F 6E 22 3A 22 31 2E 30 2E 30 22 7D
(4)用网络助手向服务器发送报文后,可以看到服务器有回复,在阿里云平台看到了刷新的设置数据。
2、发布确认 报文格式:固定报头+可变报头 只有QoS等级为1时才会有发布确认报文,其构成如下,本次测试中暂时不用。
订阅报文格式:固定报头+可变报头+有效载荷+响应
转为16进制:2F 73 79 73 2F 67 6E 75 79 41 58 63 57 74 74 68 2F 49 4F 54 5F 46 31 2D 30 31 2F 74 68 69 6E 67 2F 73 65 72 76 69 63 65 2F 70 72 6F 70 65 72 74 79 2F 73 65 74 Mqtt协议要求主题过滤器要采用 UTF-8编码,所以得到最终内容为: 00 35 2F 73 79 73 2F 67 6E 75 79 41 58 63 57 74 74 68 2F 49 4F 54 5F 46 31 2D 30 31 2F 74 68 69 6E 67 2F 73 65 72 76 69 63 65 2F 70 72 6F 70 65 72 74 79 2F 73 65 74 服务质量要求设置为:QoS0 ,因此取值为 00 得到最终的有效载荷为: 00 35 2F 73 79 73 2F 67 6E 75 79 41 58 63 57 74 74 68 2F 49 4F 54 5F 46 31 2D 30 31 2F 74 68 69 6E 67 2F 73 65 72 76 69 63 65 2F 70 72 6F 70 65 72 74 79 2F 73 65 74 00
最后把得到的报文通过网络调试助手发送给服务器,得到服务器的返回信息如下:
订阅确认报文格式:固定报头+可变报头+有效载荷 上述测试中,发送订阅报文后服务器返回 90 03 00 0A 01 根据确认报文格式可知,已经订阅成功。
提前在该产品下创建好第两个设备。
1、自定义设备的可订阅和发布主题
首先看到设备1发布主题,但设备2是接收不到设备1的消息的。
2、设备2订阅自定义的主题:/gnuyAXcWtth/IOT_F1-02/user/post/info,订阅成功后可在设备2的界面看到已订阅主题列表中多了该topic。
3、设备之间需要通信,则需要通过云平台对消息进行转发,所以需要在云平台做流转配置,根据文档建立流转解析器,然后启动运行解析器即可。参考文档: https://help.aliyun.com/document_detail/68677.html
4、设备1用MQTT.fx连接平台,而设备2则用网络调试助手与平台建立好连接。用MQTT.fx发布设备1的主题”/gnuyAXcWtth/IOT_F1-02/user/post/info”消息,可以看到设备2已经收到了设备1发布的”/gnuyAXcWtth/IOT_F1-02/user/post/info”消息。
*请认真填写需求信息,我们会在24小时内与您取得联系。