文章目录
MQTT简述
MQTT(Message Queuing Telemetry Transport)是一种轻量级、基于发布-订阅模式的消息传输协议。由服务器作为代理(Broker)来处理客户端消息的转发。客户端包括发布者(Publish)和订阅者(Subscribe),前者只能发布消息,后者既可以订阅消息,也能发布消息。传输的消息包括主题(Topic)和负载(Payload)即消息的内容。
消息发布有三种级别:Qos 0
,为最多交付一次,代理收到发布者的消息后,转发给所有订阅了该主题的订阅者,不论有没有收到;Qos 1
,为至少交付一次,代理会反复转发,直至收到订阅者的反馈;Qos 2
,为只交付一次,确保订阅者收到且仅收到一次消息。Qos 2的性能(吞吐量)与延迟均只能达到其他两种方式约一半的表现。
订阅者在订阅主题时,可以直接订阅某个指定的主题,也可以通过单层通配符(+)和多层通配符(#)订阅多个主题。如传感器(Sensor)层级下有两种传感器:温度(Temp)和湿度(RH),两种传感器各有10个。可以通过主题"Sensor/Temp/1"
来订阅1号温度传感器。通过"Sensor/+/1"
订阅1号温度传感器及1号湿度传感器。通过"Sensor/#"
订阅全部传感器。
MQTT客户端
利用WinForms实现MQTT客户端的功能。
测试环境
- Windows 11 专业版 23H2
- Microsoft Visual Studio Community 2022 (64 位) – 版本 17.11.2
- .NET Framework 4.8
- MQTTnet v4.3.7.1207
- EMQX 5.3.2
- MQTTX v1.11.0
需要注意的是本例可能用到了.NET 4.8以下版本没有的功能,而且MQTTnet 3.x和4.x版本有着不同的构建和订阅方式,不互相兼容。如使用本例方法,建议用采用相同的.NET和MQTTnet版本。
客户端界面
在WinForm窗体里放置一些必要的控件,用于MQTT的各种功能。为了便于后续代码阅读,这里列出本例各个控件的名字。代理服务器地址(MQTT_IP_input)、端口号(MQTT_Port_input)、连接按钮(MQTT_Con_btn)、断开按钮(MQTT_Cut_btn)、客户端ID(MQTT_ID_input)、用户名(MQTT_User_input)、密码(MQTT_Pass_input)、订阅主题(MQTT_getTopic_input)、订阅按钮(MQTT_Sub_btn)、订阅内容(MQTT_RMsg_output)、发布主题(MQTT_putTopic_input)、发布按钮(MQTT_Pub_btn)、发布内容(MQTT_WMsg_input)。
除以上控件,上图还加了一些次要功能,可以忽略。如匿名认证权限认证的下拉菜单,显示接收到订阅消息的主题、级别和时间。
引入MQTTnet
在VS中菜单栏找到工具-Nuget 包管理器-管理解决方案的 NuGet 程序包。在浏览页面搜索MQTT,安装下图所示的包。本例使用4.3.7.1207版本,大版本间存在兼容性问题。
在窗体代码顶部引入需要的命名空间。
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using MQTTnet.Server;
C#创建MQTT对象和事件
创建一个MQTT客户端对象,并创建一些变量,用于存放接收的消息。创建一些事件,在对应事件触发时执行。如连接成功事件、连接断开事件、订阅事件、收到消息事件。
// 创建MQTT客户端对象
private IMqttClient mqttClient;
// 创建一些用于显示的变量
private bool mqttIsConnected = false; // 已连接
private string Subscribe_Topic = "Topic"; // 订阅的Topic
private string Subscribe_Payload = ""; // 订阅的内容
private string Subscribe_Time = ""; // 接收时间
private string Subscribe_Qos = "QoS: 0"; // 接收级别
/// <summary>
/// 连接MQTT代理成功事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task MqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg)
{
MessageBox.Show("MQTT客户端连接成功", "MQTT提示");
mqttIsConnected = true;
mqttClient.SubscribeAsync(Subscribe_Topic, MqttQualityOfServiceLevel.ExactlyOnce); //Qos2
return Task.CompletedTask;
}
/// <summary>
/// MQTT连接断开事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task MqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
{
MessageBox.Show("MQTT客户端连接已断开", "MQTT提示");
mqttIsConnected = false;
return Task.CompletedTask;
}
/// <summary>
/// 订阅主题
/// </summary>
/// <param name="topic"></param>
private async void ClientSubscribeTopic(string topic)
{
if(mqttIsConnected)
{
await mqttClient.SubscribeAsync(topic);
}
}
/// <summary>
/// 收到订阅消息事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task MqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
{
byte[] data = arg.ApplicationMessage.PayloadSegment.ToArray();
Subscribe_Time = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.FFF");
Subscribe_Topic = "Topic: " + arg.ApplicationMessage.Topic;
switch (arg.ApplicationMessage.QualityOfServiceLevel)
{
case MqttQualityOfServiceLevel.AtMostOnce:
default:
Subscribe_Qos = "QoS: 0";
break;
case MqttQualityOfServiceLevel.AtLeastOnce:
Subscribe_Qos = "QoS: 1";
break;
case MqttQualityOfServiceLevel.ExactlyOnce:
Subscribe_Qos = "QoS: 2";
break;
}
Subscribe_Payload = Encoding.UTF8.GetString(data);
return Task.CompletedTask;
}
C#连接与断开
点击连接按钮后构建并建立连接。点击断开按钮关闭连接。断开前判断一下是否意见连接,防止报错。
// 连接按钮
private void MQTT_Con_btn_Click(object sender, EventArgs e)
{
try
{
mqttIsConnected = false;
var mqttFactory = new MqttFactory();
var mqttClientOptions = new MqttClientOptionsBuilder()
.WithTcpServer(MQTT_IP_input.Text, int.Parse(MQTT_Port_input.Text))
.WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V311)
.WithClientId(MQTT_ID_input.Text)
.WithCleanSession(false)
.WithKeepAlivePeriod(TimeSpan.FromSeconds(30))
.WithCredentials(MQTT_User_input.Text, MQTT_Pass_input.Text)
.Build();
mqttClient = mqttFactory.CreateMqttClient();
mqttClient.DisconnectedAsync += MqttClient_DisconnectedAsync;
mqttClient.ConnectedAsync += MqttClient_ConnectedAsync;
mqttClient.ApplicationMessageReceivedAsync += MqttClient_ApplicationMessageReceivedAsync;
Task task = mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None);
task.Wait();
}
catch (Exception ex)
{
MessageBox.Show(this, ex.Message, "MQTT连接出错", MessageBoxButtons.OK, MessageBoxIcon.Information);
}
}
// 断开按钮
private async void MQTT_Cut_btn_Click(object sender, EventArgs e)
{
if (mqttIsConnected)
{
try
{
await mqttClient.DisconnectAsync();
}
catch (Exception ex)
{
MessageBox.Show(this, ex.Message, "MQTT连接出错", MessageBoxButtons.OK, MessageBoxIcon.Information);
}
}
}
C#订阅与发布
点击订阅按钮开始订阅某个主题。点击发布按钮发布主题和内容。
// 订阅
private void MQTT_Sub_btn_Click(object sender, EventArgs e)
{
string topic = MQTT_getTopic_input.Text.ToString();
if (!string.IsNullOrWhiteSpace(topic))
{
ClientSubscribeTopic(topic);
}
else
{
MQTT_getTopic_input.Text = "请输入Topic";
}
}
// 发布
private void MQTT_Pub_btn_Click(object sender, EventArgs e)
{
string payload = MQTT_WMsg_input.Text.ToString();
string mqttTopic = MQTT_putTopic_input.Text.ToString();
if (!string.IsNullOrWhiteSpace(payload))
{
var applicationMessage = new MqttApplicationMessageBuilder()
.WithTopic(mqttTopic)
.WithPayload(payload)
.Build();
Task<MqttClientPublishResult> task = mqttClient.PublishAsync(applicationMessage);
task.Wait();
}
}
C#更新内容
加入一个500ms的Timer,用于更新显示连接状态和接收的消息内容。代码中PictureBox1是一个连接成功的绿色标志。
private void MQTT_Status_Timer_Tick(object sender, EventArgs e)
{
if(mqttIsConnected)
{
pictureBox1.Visible = true;
MQTT_Topic_lbl.Text = Subscribe_Topic;
MQTT_Qos_lbl.Text = Subscribe_Qos;
MQTT_Time_lbl.Text = Subscribe_Time;
MQTT_RMsg_output.Text = Subscribe_Payload;
}
else
{
pictureBox1.Visible = false;
}
}
C#MQTT服务端
使用EMQX作为服务端,对客户端程序进行测试。
安装EMQX
在EMQ网站下载EMQX Broker,软件支持多种平台,并非每个版本号都有适用于Windows的版本。本例使用的是emqx-5.3.2-windows-amd64.zip。
下载后解压到D盘根目录,打开后在bin文件夹中有emqx.cmd文件,说明没下载错。用系统搜索工具搜索cmd,用管理员身份运行命令提示符。使用cd命令进入bin目录:cd /d D:\emqx\bin
,执行命令:.\emqx.cmd install
,安装EMQX,卸载命令把install换成uninstall。安装完成后执行命令:.\emqx.cmd console
启动服务。
如图所示启动成功。如果显示某某端口被占用了,找到占用的进程杀掉就行。重启一下电脑也行。
启动EMQX
在浏览器访问http://127.0.0.1:18083/,打开EMQX登录界面,初始用户名为admin,密码为public,登陆后要求更改密码。修改后进入。
在概览中可以看到连接了几台客户端,共有多少主题和订阅。点击对应文字可以查看详细信息。
安装MQTTX
为了测试客户端程序,还需要另一个工具来扮演其他的客户端。在EMQX网站找到MQTTX,下载Windows版本并安装。安装完成后运行程序,点击左侧“+”新建连接,名称任意,服务器地址和IP为127.0.0.1:1883
,点击右上角连接。
连接成功后EMQX连接处可以看到新建立的连接。在MQTTX连接界面点击添加订阅,订阅几个主题(本例为testtopic/A、testtopic/B、Topic),在EMQX查看主题和订阅数。
功能测试
运行MQTT客户端窗体,输入服务器地址端口号,客户端ID在MQTT通讯中保持唯一即可,不要与MQTTX中的ID重复,点击连接按钮,查看EMQX在线连接中是否连接成功。
点击EMQX概览界面点击左侧目录监控 – 客户端,显示如上图界面。图中的ClientID_zysd
是窗体程序建立的连接,mqttx_adef3529
是MQTTX建立的连接。
在窗体界面尝试订阅主题testtopic/#
,在MQTTX尝试发送该主题的消息,观察窗体程序是否能正常接收订阅。
然后在窗体程序输入发布的主题和内容,点击发布,观察MQTTX是否收到订阅消息。
上面两个例子因为发布者同时订阅了自己发布的主题,所以在发布时自己也收到了自己发布的消息。
在窗体程序点击断开按钮,EMQX客户端页面对应客户端下线,说明断开功能正常。