开发指南:使用 MQTTNet 库构建 .Net 物联网 MQTT 应用程序

发布于:2025-09-15 ⋅ 阅读:(21) ⋅ 点赞:(0)

一、背景介绍

随着物联网的兴起,.Net 框架在构建物联网应用程序方面变得越来越流行。微软的 .Net Core 和 .Net 框架为开发人员提供了一组工具和库,以构建可以在 Raspberry Pi、HummingBoard、BeagleBoard、Pine A64 等平台上运行的物联网应用程序。

MQTTnet 是一个实现 MQTT 协议的高性能 .Net 库,在 GitHub 上开源,具有丰富的功能,支持 MQTT 5.0 协议和 TLS/SSL。

本文介绍使用 MQTTnet 库连接到 EMQX Serverless MQTT 消息服务器。

二、准备 MQTT Broker

1、使用 APT 安装 EMQX

# 配置 EMQX APT 源
curl -s https://assets.emqx.com/scripts/install-emqx-deb.sh | sudo bash

在这里插入图片描述

# 安装 EMQX 最新版
sudo apt-get install emqx

在这里插入图片描述

# 启动 EMQX
sudo emqx start

在这里插入图片描述

2、EMQX 运行情况检查

netstat -tunlp

在这里插入图片描述

端口 说明
1883 MQTT/TCP 协议端口
8883 MQTT/SSL 协议端口
8083 MQTT/WS 协议端口
8084 MQTT/WSS 协议端口
18083 EMQX Dashboard 端口
4370 Erlang 分布式传输端口
5370 集群 RPC 端口,默认情况下,每个 EMQX 节点有一个 RPC 监听端口。

3、访问 Dashboard

EMQX 提供了 Dashboard,以方便用户通过 Web 页面管理、监控 EMQX 并配置所需的功能。EMQX 成功启动之后可以通过浏览器打开 http://localhost:18083/

在这里插入图片描述

Dashboard 的默认用户名为 admin,密码为 public,第一次登录成功后会提示修改密码。密码修改完成后,我们也可以在 Settings 页面将 Dahshboard 的语言改为 简体中文

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

4、配置认证

EMQX 从 5.0 开始支持在 Dashbaord 配置认证,以方便用户能更加方便、快速的创建安全的 MQTT 服务。我们点击 访问控制 菜单下的 认证 进入认证配置页面,然后点击最右侧的 创建 按钮:

在这里插入图片描述

选择 Password-Based 选项:

在这里插入图片描述

数据库选择 内置数据库

在这里插入图片描述

接下来选择账户类型、加密方式、加盐方式,并点击 创建

在这里插入图片描述

认证创建成功后如下图,接下来我们点击 用户管理 添加用户:

在这里插入图片描述

进入用户管理页面后,我们点击最右侧的 添加 按钮,并在弹出框里设置用户名与密码,之后点击 保存

在这里插入图片描述

在这里插入图片描述

如下图表示创建成功:

在这里插入图片描述

使用 Dashboard 提供的 Websocket 工具来测试认证是否已配置成功,在连接配置里输入刚才创建的用户名与密码,然后点击连接,将会看到右侧弹窗提示已连接:

在这里插入图片描述

使用一个未创建的用户名 test1,点击连接将会看到如下连接失败信息:

在这里插入图片描述

至此,我们已完成了 EMQX 的认证配置,搭建了一台可用于生产环境的单节点 MQTT 服务器。

三、SSL/TLS 证书准备

通常来说,我们会需要数字证书来保证 TLS 通讯的强认证,数字证书的使用本身是一个三方协议,除了通讯双方,还有一个颁发证书的受信第三方,有时候这个受信第三方就是一个 CA。

# 安装 OpenSSL
sudo apt install openssl

首先,我们需要一个自签名的 CA 证书,生成这个证书需要有一个私钥为它签名,可以执行以下命令来生成私钥:

openssl genrsa -out ca.key 2048

这个命令将生成一个密钥长度为 2048 的密钥并保存在 ca.key 中,有了这个密钥,就可以用它来生成 EMQX 的根证书了:

openssl req -x509 -new -nodes -key ca.key -sha256 -days 3650 -out ca.pem

在这里插入图片描述

查看 CA 证书信息(可选):

openssl x509 -in ca.pem -noout -text

在这里插入图片描述

根证书是整个信任链的起点,如果一个证书的每一级签发者向上一直到根证书都是可信的,那个我们就可以认为这个证书也是可信的,有了这个根证书,我们就可以用它来给其他实体签发实体证书了。

除了 CA 证书,实体(在这里指的是 EMQX)也需要一个自己的 Key 对来保证它对自己证书的控制权,生成这个密钥的过程和上面类似:

openssl genrsa -out emqx.key 2048

新建 openssl.cnf 文件:

[req]
default_bits  = 2048
distinguished_name = req_distinguished_name
req_extensions = req_ext
x509_extensions = v3_req
prompt = no
[req_distinguished_name]
countryName = CN
stateOrProvinceName = Guangdong
localityName = Dongguan
organizationName = EMQX
commonName = Server certificate
[req_ext]
subjectAltName = @alt_names
[v3_req]
subjectAltName = @alt_names
[alt_names]
IP.1 = BROKER_ADDRESS
DNS.1 = BROKER_ADDRESS
  • req_distinguished_name :根据情况进行修改,
  • alt_names: BROKER_ADDRESS 修改为 EMQX 服务器实际的 IP 或 DNS 地址,例如:IP.1 = 127.0.0.1,或 DNS.1 = broker.xxx.com

在这里插入图片描述

然后以这个密钥和配置签发一个证书请求:

openssl req -new -key ./emqx.key -config openssl.cnf -out emqx.csr

在这里插入图片描述

以根证书来签发 EMQX 的实体证书:

openssl x509 -req -in ./emqx.csr -CA ca.pem -CAkey ca.key -CAcreateserial -out emqx.pem -days 3650 -sha256 -extensions v3_req -extfile openssl.cnf

查看 EMQX 实体证书(可选):

openssl x509 -in emqx.pem -noout -text

在这里插入图片描述

验证 EMQX 实体证书,确定证书是否正确:

openssl verify -CAfile ca.pem emqx.pem
emqx.pem: OK

在这里插入图片描述

准备好证书后,我们就可以启用 EMQX 的 TLS/SSL 功能了。

四、SSL/TLS 启用及验证

在 EMQX 中 mqtt:ssl 的默认监听端口为 8883。

将前文中通过 OpenSSL 工具生成的 emqx.pem、emqx.key 及 ca.pem 文件拷贝到 EMQX 的 certs 目录下,并参考如下配置修改 emqx.conf

sudo cp emqx.key emqx.pem ca.pem /etc/emqx/certs/

在这里插入图片描述

## listener.ssl.$name is the IP address and port that the MQTT/SSL
## Value: IP:Port | Port
listener.ssl.external = 8883

## Path to the file containing the user's private PEM-encoded key.
## Value: File
listener.ssl.external.keyfile = etc/certs/emqx.key

## 注意:如果 emqx.pem 是证书链,请确保第一个证书是服务器的证书,而不是 CA 证书。
## Path to a file containing the user certificate.
## Value: File
listener.ssl.external.certfile = etc/certs/emqx.pem

## 注意:ca.pem 用于保存服务器的中间 CA 证书和根 CA 证书。可以附加其他受信任的 CA,用来进行客户端证书验证。
## Path to the file containing PEM-encoded CA certificates. The CA certificates
## Value: File
listener.ssl.external.cacertfile = etc/certs/ca.pem

在这里插入图片描述

# emqx.conf
listener.ssl.external = 8883
listener.ssl.external.keyfile = "./certs/emqx.key"
listener.ssl.external.certfile = "./certs/emqx.pem"
listener.ssl.external.cacertfile = "./certs/ca.pem"

注意,配置完成需要重启 EMQX !打开 EMQX 的 Dashboard 在 Listeners 页面可以看到在 8883 端口上有一个 ssl 连接:

在这里插入图片描述

MQTT 连接测试:

https://mqttx.app/zh/downloads

当配置完成并重启 EMQX 后,我们使用 MQTT 客户端工具 - MQTTX(该工具跨平台且支持 MQTT 5.0),来验证 TLS 服务是否正常运行。

在这里插入图片描述

sudo dpkg -i MQTTX_1.12.0_amd64.deb

在这里插入图片描述

在这里插入图片描述

在前文中我们配置了认证,所以这里需要注意输入帐号密码,同时开启 SSL/TLS,证书类型选择 CA or Self signed certificates

在这里插入图片描述

连接成功,订阅主题 testtop

在这里插入图片描述

五、创建项目

dotnet new console -f net8.0

在这里插入图片描述

六、安装 MQTTnet 包

注意,这里使用的 MQTTnet 版本为 4.2.0.706 ,并没有使用最新版。

在这里插入图片描述

七、设置连接

要连接到 EMQX Cloud Serverless 服务,需要创建 MqttClientOptionsBuilder 类的实例,并设置必要的选项,如代理地址、端口、用户名和密码。

string broker = "******.emqxsl.com";
int port = 8883;
string clientId = Guid.NewGuid().ToString();
string topic = "Csharp/mqtt";
string username = "emqxtest";
string password = "******";

// Create a MQTT client factory
var factory = new MqttFactory();

// Create a MQTT client instance
var mqttClient = factory.CreateMqttClient();

// Create MQTT client options
var options = new MqttClientOptionsBuilder()
    .WithTcpServer(broker, port) // MQTT broker address and port
    .WithCredentials(username, password) // Set username and password
    .WithClientId(clientId)
    .WithCleanSession()
    .Build();

八、使用 TLS/SSL

连接到 EMQX Serverless 时,需要注意的是,它依赖于多租户架构,该架构使多个用户能够共享单个 EMQX 集群,为了保证这种多租户环境内数据传输的安全性和可靠性,需要 TLS,并且如果服务器使用的是自签名证书,则必须从部署概览面板下载相应的 CA 文件,并在连接建立过程中提供。

要添加 TLS 并将证书文件设置为 MqttClientOptionsBuilder 实例,可以使用 WithTls()

string broker = "******.emqxsl.com";
int port = 8883;
string clientId = Guid.NewGuid().ToString();
string topic = "Csharp/mqtt";
string username = "emqxtest";
string password = "******";

// Create a MQTT client factory
var factory = new MqttFactory();

// Create a MQTT client instance
var mqttClient = factory.CreateMqttClient();

// Create MQTT client options
var options = new MqttClientOptionsBuilder()
    .WithTcpServer(broker, port) // MQTT broker address and port
    .WithCredentials(username, password) // Set username and password
    .WithClientId(clientId)
    .WithCleanSession()
    .WithTls(
        o =>
        {
            // The used public broker sometimes has invalid certificates. This sample accepts all
            // certificates. This should not be used in live environments.
            o.CertificateValidationHandler = _ => true;

            // The default value is determined by the OS. Set manually to force version.
            o.SslProtocol = SslProtocols.Tls12; ;

            // Please provide the file path of your certificate file. The current directory is /bin.
            var certificate = new X509Certificate("/opt/emqxsl-ca.crt", "");
            o.Certificates = new List<X509Certificate> { certificate };
        }
    )
    .Build();

九、连接到 MQTT 消息服务器

只需使用 MQTT 客户端的 PublishAsync 方法建立连接并开始发送和接收消息。

var connectResult = await mqttClient.ConnectAsync(options);

这里我们使用异步编程,在订阅的同时允许消息发布,防止阻塞。

十、订阅主题

连接到代理后,可以通过检查 ResultCode 的值来验证连接是否成功。如果连接成功,可以订阅主题来接收消息。

if (connectResult.ResultCode == MqttClientConnectResultCode.Success)
{
    Console.WriteLine("Connected to MQTT broker successfully.");

    // Subscribe to a topic
    await mqttClient.SubscribeAsync(topic);

    // Callback function when a message is received
    mqttClient.ApplicationMessageReceivedAsync += e =>
    {
        Console.WriteLine($"Received message: {Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment)}");
        return Task.CompletedTask;
    };
}

十一、发布消息

要向 EMQX Cloud Serverless 消息服务发送消息,请使用 MQTT 客户端的 PublishAsync 方法。以下是循环向消息服务发送消息的示例,每秒发送一条消息:

for (int i = 0; i < 10; i++)
{
    var message = new MqttApplicationMessageBuilder()
        .WithTopic(topic)
        .WithPayload($"Hello, MQTT! Message number {i}")
        .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
        .WithRetainFlag()
        .Build();

    await mqttClient.PublishAsync(message);
    await Task.Delay(1000); // Wait for 1 second
}

十二、取消订阅

要取消对消息主题的订阅,请调用:

await mqttClient.UnsubscribeAsync(topic);

十三、断开连接

要断开连接,请调用:

await mqttClient.DisconnectAsync();

十四、完整代码

下面的代码展示了如何连接到服务器、订阅主题以及发布和接收消息。

using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;

// string broker = "******.emqxsl.com";
string broker = "127.0.0.1";
int port = 8883;
string clientId = Guid.NewGuid().ToString();
string topic = "Csharp/mqtt";
// string username = "emqxtest";
// string password = "******";

string username = "sam";
string password = "0123456789";


// Create a MQTT client factory
var factory = new MqttFactory();

// Create a MQTT client instance
var mqttClient = factory.CreateMqttClient();

// Create MQTT client options
var options = new MqttClientOptionsBuilder()
    .WithTcpServer(broker, port) // MQTT broker address and port
    .WithCredentials(username, password) // Set username and password
    .WithClientId(clientId)
    .WithCleanSession()
    .WithTls(
        o =>
        {
            // The used public broker sometimes has invalid certificates. This sample accepts all
            // certificates. This should not be used in live environments.
            o.CertificateValidationHandler = _ => true;

            // The default value is determined by the OS. Set manually to force version.
            o.SslProtocol = SslProtocols.Tls12;

            // Please provide the file path of your certificate file. The current directory is /bin.
            // var certificate = new X509Certificate("/opt/emqxsl-ca.crt", "");
            var certificate = new X509Certificate("ca.pem", "");
            o.Certificates = new List<X509Certificate> { certificate };
        }
    )
    .Build();

// Connect to MQTT broker
var connectResult = await mqttClient.ConnectAsync(options);

if (connectResult.ResultCode == MqttClientConnectResultCode.Success)
{
    Console.WriteLine("Connected to MQTT broker successfully.");

    // Subscribe to a topic
    await mqttClient.SubscribeAsync(topic);

    // Callback function when a message is received
    mqttClient.ApplicationMessageReceivedAsync += e =>
    {
        Console.WriteLine($"Received message: {Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment)}");
        return Task.CompletedTask;
    };

    // Publish a message 10 times
    for (int i = 0; i < 10; i++)
    {
        var message = new MqttApplicationMessageBuilder()
            .WithTopic(topic)
            .WithPayload($"Hello, MQTT! Message number {i}")
            .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
            .WithRetainFlag()
            .Build();

        await mqttClient.PublishAsync(message);
        await Task.Delay(1000); // Wait for 1 second
    }

    // Unsubscribe and disconnect
    await mqttClient.UnsubscribeAsync(topic);
    await mqttClient.DisconnectAsync();
}
else
{
    Console.WriteLine($"Failed to connect to MQTT broker: {connectResult.ResultCode}");
}

运行效果:

Connected to MQTT broker successfully.
Received message: Hello, MQTT! Message number 0
Received message: Hello, MQTT! Message number 1
Received message: Hello, MQTT! Message number 2
Received message: Hello, MQTT! Message number 3
Received message: Hello, MQTT! Message number 4
Received message: Hello, MQTT! Message number 5
Received message: Hello, MQTT! Message number 6
Received message: Hello, MQTT! Message number 7
Received message: Hello, MQTT! Message number 8
Received message: Hello, MQTT! Message number 9

在这里插入图片描述

参考文档

  • https://www.emqx.com/zh/blog/connecting-to-serverless-mqtt-broker-with-mqttnet-in-csharp
  • https://www.emqx.com/zh/blog/how-to-install-emqx-mqtt-broker-on-ubuntu
  • https://www.emqx.com/zh/blog/emqx-server-ssl-tls-secure-connection-configuration-guide
  • https://www.emqx.com/zh/blog/enable-two-way-ssl-for-emqx