C#实现MQTT客户端及调试方法


MQTT简述

MQTT(Message Queuing Telemetry Transport)是一种轻量级、基于发布-订阅模式的消息传输协议。由服务器作为代理(Broker)来处理客户端消息的转发。客户端包括发布者(Publish)和订阅者(Subscribe),前者只能发布消息,后者既可以订阅消息,也能发布消息。传输的消息包括主题(Topic)和负载(Payload)即消息的内容。

消息发布有三种级别:Qos 0,为最多交付一次,代理收到发布者的消息后,转发给所有订阅了该主题的订阅者,不论有没有收到;Qos 1,为至少交付一次,代理会反复转发,直至收到订阅者的反馈;Qos 2,为只交付一次,确保订阅者收到且仅收到一次消息。Qos 2的性能(吞吐量)与延迟均只能达到其他两种方式约一半的表现。

订阅者在订阅主题时,可以直接订阅某个指定的主题,也可以通过单层通配符(+)和多层通配符(#)订阅多个主题。如传感器(Sensor)层级下有两种传感器:温度(Temp)和湿度(RH),两种传感器各有10个。可以通过主题"Sensor/Temp/1"来订阅1号温度传感器。通过"Sensor/+/1"订阅1号温度传感器及1号湿度传感器。通过"Sensor/#"订阅全部传感器。

MQTT客户端

利用WinForms实现MQTT客户端的功能。

测试环境

  • Windows 11 专业版 23H2
  • Microsoft Visual Studio Community 2022 (64 位) – 版本 17.11.2
  • .NET Framework 4.8
  • MQTTnet v4.3.7.1207
  • EMQX 5.3.2
  • MQTTX v1.11.0

需要注意的是本例可能用到了.NET 4.8以下版本没有的功能,而且MQTTnet 3.x和4.x版本有着不同的构建和订阅方式,不互相兼容。如使用本例方法,建议用采用相同的.NET和MQTTnet版本。

客户端界面

在WinForm窗体里放置一些必要的控件,用于MQTT的各种功能。为了便于后续代码阅读,这里列出本例各个控件的名字。代理服务器地址(MQTT_IP_input)、端口号(MQTT_Port_input)、连接按钮(MQTT_Con_btn)、断开按钮(MQTT_Cut_btn)、客户端ID(MQTT_ID_input)、用户名(MQTT_User_input)、密码(MQTT_Pass_input)、订阅主题(MQTT_getTopic_input)、订阅按钮(MQTT_Sub_btn)、订阅内容(MQTT_RMsg_output)、发布主题(MQTT_putTopic_input)、发布按钮(MQTT_Pub_btn)、发布内容(MQTT_WMsg_input)。

除以上控件,上图还加了一些次要功能,可以忽略。如匿名认证权限认证的下拉菜单,显示接收到订阅消息的主题、级别和时间。

引入MQTTnet

在VS中菜单栏找到工具-Nuget 包管理器-管理解决方案的 NuGet 程序包。在浏览页面搜索MQTT,安装下图所示的包。本例使用4.3.7.1207版本,大版本间存在兼容性问题。

在窗体代码顶部引入需要的命名空间。

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using MQTTnet.Server;
C#

创建MQTT对象和事件

创建一个MQTT客户端对象,并创建一些变量,用于存放接收的消息。创建一些事件,在对应事件触发时执行。如连接成功事件、连接断开事件、订阅事件、收到消息事件。

// 创建MQTT客户端对象
private IMqttClient mqttClient;

// 创建一些用于显示的变量
private bool mqttIsConnected = false; // 已连接
private string Subscribe_Topic = "Topic"; // 订阅的Topic
private string Subscribe_Payload = ""; // 订阅的内容
private string Subscribe_Time = ""; // 接收时间
private string Subscribe_Qos = "QoS: 0"; // 接收级别

/// <summary>
/// 连接MQTT代理成功事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task MqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg)
{
    MessageBox.Show("MQTT客户端连接成功", "MQTT提示");
    mqttIsConnected = true;
    mqttClient.SubscribeAsync(Subscribe_Topic, MqttQualityOfServiceLevel.ExactlyOnce); //Qos2
    return Task.CompletedTask;
}

/// <summary>
/// MQTT连接断开事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task MqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
{
    MessageBox.Show("MQTT客户端连接已断开", "MQTT提示");
    mqttIsConnected = false;
    return Task.CompletedTask;
}

/// <summary>
/// 订阅主题
/// </summary>
/// <param name="topic"></param>
private async void ClientSubscribeTopic(string topic)
{
    if(mqttIsConnected)
    {
        await mqttClient.SubscribeAsync(topic);
    }
}

/// <summary>
/// 收到订阅消息事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task MqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
{
    byte[] data = arg.ApplicationMessage.PayloadSegment.ToArray();
    Subscribe_Time = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.FFF");
    Subscribe_Topic = "Topic: " + arg.ApplicationMessage.Topic;
    switch (arg.ApplicationMessage.QualityOfServiceLevel)
    {
        case MqttQualityOfServiceLevel.AtMostOnce:
        default:
            Subscribe_Qos = "QoS: 0";
            break;
        case MqttQualityOfServiceLevel.AtLeastOnce:
            Subscribe_Qos = "QoS: 1";
            break;
        case MqttQualityOfServiceLevel.ExactlyOnce:
            Subscribe_Qos = "QoS: 2";
            break;
    }
    Subscribe_Payload = Encoding.UTF8.GetString(data);
    return Task.CompletedTask;
}
C#

连接与断开

点击连接按钮后构建并建立连接。点击断开按钮关闭连接。断开前判断一下是否已建立连接,防止报错。

// 连接按钮
private void MQTT_Con_btn_Click(object sender, EventArgs e)
{
    try
    {
        mqttIsConnected = false;
        var mqttFactory = new MqttFactory();
        var mqttClientOptions = new MqttClientOptionsBuilder()
            .WithTcpServer(MQTT_IP_input.Text, int.Parse(MQTT_Port_input.Text))
            .WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V311)
            .WithClientId(MQTT_ID_input.Text)
            .WithCleanSession(false)
            .WithKeepAlivePeriod(TimeSpan.FromSeconds(30))
            .WithCredentials(MQTT_User_input.Text, MQTT_Pass_input.Text)
            .Build();
        mqttClient = mqttFactory.CreateMqttClient();
        mqttClient.DisconnectedAsync += MqttClient_DisconnectedAsync;
        mqttClient.ConnectedAsync += MqttClient_ConnectedAsync;
        mqttClient.ApplicationMessageReceivedAsync += MqttClient_ApplicationMessageReceivedAsync;
        Task task = mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None);
        task.Wait();
    }
    catch (Exception ex)
    {
        MessageBox.Show(this, ex.Message, "MQTT连接出错", MessageBoxButtons.OK, MessageBoxIcon.Information);
    }
}

// 断开按钮
private async void MQTT_Cut_btn_Click(object sender, EventArgs e)
{
    if (mqttIsConnected)
    {
        try
        {
            await mqttClient.DisconnectAsync();
        }
        catch (Exception ex)
        {
            MessageBox.Show(this, ex.Message, "MQTT连接出错", MessageBoxButtons.OK, MessageBoxIcon.Information);
        }
    }
}
C#

订阅与发布

点击订阅按钮开始订阅某个主题。点击发布按钮发布主题和内容。

// 订阅
private void MQTT_Sub_btn_Click(object sender, EventArgs e)
{
    string topic = MQTT_getTopic_input.Text.ToString();
    if (!string.IsNullOrWhiteSpace(topic))
    {
        ClientSubscribeTopic(topic);
    }
    else
    {
        MQTT_getTopic_input.Text = "请输入Topic";
    }
}

// 发布
private void MQTT_Pub_btn_Click(object sender, EventArgs e)
{
    string payload = MQTT_WMsg_input.Text.ToString();
    string mqttTopic = MQTT_putTopic_input.Text.ToString();
    if (!string.IsNullOrWhiteSpace(payload))
    {
        var applicationMessage = new MqttApplicationMessageBuilder()
            .WithTopic(mqttTopic)
            .WithPayload(payload)
            .Build();
        Task<MqttClientPublishResult> task = mqttClient.PublishAsync(applicationMessage);
        task.Wait();
    }
}
C#

更新内容

加入一个500ms的Timer,用于更新显示连接状态和接收的消息内容。代码中PictureBox1是一个连接成功的绿色标志。

private void MQTT_Status_Timer_Tick(object sender, EventArgs e)
{
    if(mqttIsConnected)
    {
        pictureBox1.Visible = true;
        MQTT_Topic_lbl.Text = Subscribe_Topic;
        MQTT_Qos_lbl.Text = Subscribe_Qos;
        MQTT_Time_lbl.Text = Subscribe_Time;
        MQTT_RMsg_output.Text = Subscribe_Payload;
    }
    else
    {
        pictureBox1.Visible = false;
    }
}
C#

MQTT服务端

使用EMQX作为服务端,对客户端程序进行测试。

安装EMQX

EMQ网站下载EMQX Broker,软件支持多种平台,并非每个版本号都有适用于Windows的版本。本例使用的是emqx-5.3.2-windows-amd64.zip。

下载后解压到D盘根目录,打开后在bin文件夹中有emqx.cmd文件,说明没下载错。用系统搜索工具搜索cmd,用管理员身份运行命令提示符。使用cd命令进入bin目录:cd /d D:\emqx\bin,执行命令:.\emqx.cmd install,安装EMQX,卸载命令把install换成uninstall。安装完成后执行命令:.\emqx.cmd console启动服务。

如图所示启动成功。如果显示某某端口被占用了,找到占用的进程杀掉就行。重启一下电脑也行。

启动EMQX

在浏览器访问http://127.0.0.1:18083/,打开EMQX登录界面,初始用户名为admin,密码为public,登陆后要求更改密码。修改后进入。

在概览中可以看到连接了几台客户端,共有多少主题和订阅。点击对应文字可以查看详细信息。

安装MQTTX

为了测试客户端程序,还需要另一个工具来扮演其他的客户端。在EMQX网站找到MQTTX,下载Windows版本并安装。安装完成后运行程序,点击左侧“+”新建连接,名称任意,服务器地址和IP为127.0.0.1:1883,点击右上角连接。

连接成功后EMQX连接处可以看到新建立的连接。在MQTTX连接界面点击添加订阅,订阅几个主题(本例为testtopic/A、testtopic/B、Topic),在EMQX查看主题和订阅数。

功能测试

运行MQTT客户端窗体,输入服务器地址端口号,客户端ID在MQTT通讯中保持唯一即可,不要与MQTTX中的ID重复,点击连接按钮,查看EMQX在线连接中是否连接成功。

点击EMQX概览界面点击左侧目录监控 – 客户端,显示如上图界面。图中的ClientID_zysd是窗体程序建立的连接,mqttx_adef3529是MQTTX建立的连接。

在窗体界面尝试订阅主题testtopic/#,在MQTTX尝试发送该主题的消息,观察窗体程序是否能正常接收订阅。

然后在窗体程序输入发布的主题和内容,点击发布,观察MQTTX是否收到订阅消息。

上面两个例子因为发布者同时订阅了自己发布的主题,所以在发布时自己也收到了自己发布的消息。

在窗体程序点击断开按钮,EMQX客户端页面对应客户端下线,说明断开功能正常。

Powered by WordPress. Theme by Alx.