一、背景介绍
随着物联网的兴起,.Net 框架在构建物联网应用程序方面变得越来越流行。微软的 .Net Core 和 .Net 框架为开发人员提供了一组工具和库,以构建可以在 Raspberry Pi、HummingBoard、BeagleBoard、Pine A64 等平台上运行的物联网应用程序。
MQTTnet 是一个实现 MQTT 协议的高性能 .Net 库,在 GitHub 上开源,具有丰富的功能,支持 MQTT 5.0 协议和 TLS/SSL。
本文介绍使用 MQTTnet 库连接到 EMQX Serverless MQTT 消息服务器。
二、准备 MQTT Broker
1、使用 APT 安装 EMQX
# 配置 EMQX APT 源
curl -s https://assets.emqx.com/scripts/install-emqx-deb.sh | sudo bash
# 安装 EMQX 最新版
sudo apt-get install emqx
# 启动 EMQX
sudo emqx start
2、EMQX 运行情况检查
netstat -tunlp
端口 | 说明 |
---|---|
1883 | MQTT/TCP 协议端口 |
8883 | MQTT/SSL 协议端口 |
8083 | MQTT/WS 协议端口 |
8084 | MQTT/WSS 协议端口 |
18083 | EMQX Dashboard 端口 |
4370 | Erlang 分布式传输端口 |
5370 | 集群 RPC 端口,默认情况下,每个 EMQX 节点有一个 RPC 监听端口。 |
3、访问 Dashboard
EMQX 提供了 Dashboard,以方便用户通过 Web 页面管理、监控 EMQX 并配置所需的功能。EMQX 成功启动之后可以通过浏览器打开 http://localhost:18083/
。
Dashboard 的默认用户名为 admin
,密码为 public
,第一次登录成功后会提示修改密码。密码修改完成后,我们也可以在 Settings 页面将 Dahshboard 的语言改为 简体中文
。
4、配置认证
EMQX 从 5.0 开始支持在 Dashbaord 配置认证,以方便用户能更加方便、快速的创建安全的 MQTT 服务。我们点击 访问控制
菜单下的 认证
进入认证配置页面,然后点击最右侧的 创建
按钮:
选择 Password-Based
选项:
数据库选择 内置数据库
:
接下来选择账户类型、加密方式、加盐方式,并点击 创建
:
认证创建成功后如下图,接下来我们点击 用户管理
添加用户:
进入用户管理页面后,我们点击最右侧的 添加
按钮,并在弹出框里设置用户名与密码,之后点击 保存
:
如下图表示创建成功:
使用 Dashboard 提供的 Websocket 工具来测试认证是否已配置成功,在连接配置里输入刚才创建的用户名与密码,然后点击连接,将会看到右侧弹窗提示已连接:
使用一个未创建的用户名 test1
,点击连接将会看到如下连接失败信息:
至此,我们已完成了 EMQX 的认证配置,搭建了一台可用于生产环境的单节点 MQTT 服务器。
三、SSL/TLS 证书准备
通常来说,我们会需要数字证书来保证 TLS 通讯的强认证,数字证书的使用本身是一个三方协议,除了通讯双方,还有一个颁发证书的受信第三方,有时候这个受信第三方就是一个 CA。
# 安装 OpenSSL
sudo apt install openssl
首先,我们需要一个自签名的 CA 证书,生成这个证书需要有一个私钥为它签名,可以执行以下命令来生成私钥:
openssl genrsa -out ca.key 2048
这个命令将生成一个密钥长度为 2048 的密钥并保存在 ca.key 中,有了这个密钥,就可以用它来生成 EMQX 的根证书了:
openssl req -x509 -new -nodes -key ca.key -sha256 -days 3650 -out ca.pem
查看 CA 证书信息(可选):
openssl x509 -in ca.pem -noout -text
根证书是整个信任链的起点,如果一个证书的每一级签发者向上一直到根证书都是可信的,那个我们就可以认为这个证书也是可信的,有了这个根证书,我们就可以用它来给其他实体签发实体证书了。
除了 CA 证书,实体(在这里指的是 EMQX)也需要一个自己的 Key 对来保证它对自己证书的控制权,生成这个密钥的过程和上面类似:
openssl genrsa -out emqx.key 2048
新建 openssl.cnf
文件:
[req]
default_bits = 2048
distinguished_name = req_distinguished_name
req_extensions = req_ext
x509_extensions = v3_req
prompt = no
[req_distinguished_name]
countryName = CN
stateOrProvinceName = Guangdong
localityName = Dongguan
organizationName = EMQX
commonName = Server certificate
[req_ext]
subjectAltName = @alt_names
[v3_req]
subjectAltName = @alt_names
[alt_names]
IP.1 = BROKER_ADDRESS
DNS.1 = BROKER_ADDRESS
- req_distinguished_name :根据情况进行修改,
- alt_names: BROKER_ADDRESS 修改为 EMQX 服务器实际的 IP 或 DNS 地址,例如:IP.1 = 127.0.0.1,或 DNS.1 = broker.xxx.com
然后以这个密钥和配置签发一个证书请求:
openssl req -new -key ./emqx.key -config openssl.cnf -out emqx.csr
以根证书来签发 EMQX 的实体证书:
openssl x509 -req -in ./emqx.csr -CA ca.pem -CAkey ca.key -CAcreateserial -out emqx.pem -days 3650 -sha256 -extensions v3_req -extfile openssl.cnf
查看 EMQX 实体证书(可选):
openssl x509 -in emqx.pem -noout -text
验证 EMQX 实体证书,确定证书是否正确:
openssl verify -CAfile ca.pem emqx.pem
emqx.pem: OK
准备好证书后,我们就可以启用 EMQX 的 TLS/SSL 功能了。
四、SSL/TLS 启用及验证
在 EMQX 中 mqtt:ssl 的默认监听端口为 8883。
将前文中通过 OpenSSL 工具生成的 emqx.pem、emqx.key 及 ca.pem 文件拷贝到 EMQX 的 certs 目录下,并参考如下配置修改 emqx.conf
:
sudo cp emqx.key emqx.pem ca.pem /etc/emqx/certs/
## listener.ssl.$name is the IP address and port that the MQTT/SSL
## Value: IP:Port | Port
listener.ssl.external = 8883
## Path to the file containing the user's private PEM-encoded key.
## Value: File
listener.ssl.external.keyfile = etc/certs/emqx.key
## 注意:如果 emqx.pem 是证书链,请确保第一个证书是服务器的证书,而不是 CA 证书。
## Path to a file containing the user certificate.
## Value: File
listener.ssl.external.certfile = etc/certs/emqx.pem
## 注意:ca.pem 用于保存服务器的中间 CA 证书和根 CA 证书。可以附加其他受信任的 CA,用来进行客户端证书验证。
## Path to the file containing PEM-encoded CA certificates. The CA certificates
## Value: File
listener.ssl.external.cacertfile = etc/certs/ca.pem
# emqx.conf
listener.ssl.external = 8883
listener.ssl.external.keyfile = "./certs/emqx.key"
listener.ssl.external.certfile = "./certs/emqx.pem"
listener.ssl.external.cacertfile = "./certs/ca.pem"
注意,配置完成需要重启 EMQX !打开 EMQX 的 Dashboard 在 Listeners 页面可以看到在 8883 端口上有一个 ssl
连接:
MQTT 连接测试:
https://mqttx.app/zh/downloads
当配置完成并重启 EMQX 后,我们使用 MQTT 客户端工具 - MQTTX(该工具跨平台且支持 MQTT 5.0),来验证 TLS 服务是否正常运行。
sudo dpkg -i MQTTX_1.12.0_amd64.deb
在前文中我们配置了认证,所以这里需要注意输入帐号密码,同时开启 SSL/TLS,证书类型选择 CA or Self signed certificates
:
连接成功,订阅主题 testtop
五、创建项目
dotnet new console -f net8.0
六、安装 MQTTnet 包
注意,这里使用的 MQTTnet 版本为 4.2.0.706
,并没有使用最新版。
七、设置连接
要连接到 EMQX Cloud Serverless 服务,需要创建 MqttClientOptionsBuilder
类的实例,并设置必要的选项,如代理地址、端口、用户名和密码。
string broker = "******.emqxsl.com";
int port = 8883;
string clientId = Guid.NewGuid().ToString();
string topic = "Csharp/mqtt";
string username = "emqxtest";
string password = "******";
// Create a MQTT client factory
var factory = new MqttFactory();
// Create a MQTT client instance
var mqttClient = factory.CreateMqttClient();
// Create MQTT client options
var options = new MqttClientOptionsBuilder()
.WithTcpServer(broker, port) // MQTT broker address and port
.WithCredentials(username, password) // Set username and password
.WithClientId(clientId)
.WithCleanSession()
.Build();
八、使用 TLS/SSL
连接到 EMQX Serverless 时,需要注意的是,它依赖于多租户架构,该架构使多个用户能够共享单个 EMQX 集群,为了保证这种多租户环境内数据传输的安全性和可靠性,需要 TLS,并且如果服务器使用的是自签名证书,则必须从部署概览面板下载相应的 CA 文件,并在连接建立过程中提供。
要添加 TLS 并将证书文件设置为 MqttClientOptionsBuilder
实例,可以使用 WithTls()
。
string broker = "******.emqxsl.com";
int port = 8883;
string clientId = Guid.NewGuid().ToString();
string topic = "Csharp/mqtt";
string username = "emqxtest";
string password = "******";
// Create a MQTT client factory
var factory = new MqttFactory();
// Create a MQTT client instance
var mqttClient = factory.CreateMqttClient();
// Create MQTT client options
var options = new MqttClientOptionsBuilder()
.WithTcpServer(broker, port) // MQTT broker address and port
.WithCredentials(username, password) // Set username and password
.WithClientId(clientId)
.WithCleanSession()
.WithTls(
o =>
{
// The used public broker sometimes has invalid certificates. This sample accepts all
// certificates. This should not be used in live environments.
o.CertificateValidationHandler = _ => true;
// The default value is determined by the OS. Set manually to force version.
o.SslProtocol = SslProtocols.Tls12; ;
// Please provide the file path of your certificate file. The current directory is /bin.
var certificate = new X509Certificate("/opt/emqxsl-ca.crt", "");
o.Certificates = new List<X509Certificate> { certificate };
}
)
.Build();
九、连接到 MQTT 消息服务器
只需使用 MQTT 客户端的 PublishAsync
方法建立连接并开始发送和接收消息。
var connectResult = await mqttClient.ConnectAsync(options);
这里我们使用异步编程,在订阅的同时允许消息发布,防止阻塞。
十、订阅主题
连接到代理后,可以通过检查 ResultCode 的值来验证连接是否成功。如果连接成功,可以订阅主题来接收消息。
if (connectResult.ResultCode == MqttClientConnectResultCode.Success)
{
Console.WriteLine("Connected to MQTT broker successfully.");
// Subscribe to a topic
await mqttClient.SubscribeAsync(topic);
// Callback function when a message is received
mqttClient.ApplicationMessageReceivedAsync += e =>
{
Console.WriteLine($"Received message: {Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment)}");
return Task.CompletedTask;
};
}
十一、发布消息
要向 EMQX Cloud Serverless 消息服务发送消息,请使用 MQTT 客户端的 PublishAsync
方法。以下是循环向消息服务发送消息的示例,每秒发送一条消息:
for (int i = 0; i < 10; i++)
{
var message = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload($"Hello, MQTT! Message number {i}")
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
.WithRetainFlag()
.Build();
await mqttClient.PublishAsync(message);
await Task.Delay(1000); // Wait for 1 second
}
十二、取消订阅
要取消对消息主题的订阅,请调用:
await mqttClient.UnsubscribeAsync(topic);
十三、断开连接
要断开连接,请调用:
await mqttClient.DisconnectAsync();
十四、完整代码
下面的代码展示了如何连接到服务器、订阅主题以及发布和接收消息。
using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
// string broker = "******.emqxsl.com";
string broker = "127.0.0.1";
int port = 8883;
string clientId = Guid.NewGuid().ToString();
string topic = "Csharp/mqtt";
// string username = "emqxtest";
// string password = "******";
string username = "sam";
string password = "0123456789";
// Create a MQTT client factory
var factory = new MqttFactory();
// Create a MQTT client instance
var mqttClient = factory.CreateMqttClient();
// Create MQTT client options
var options = new MqttClientOptionsBuilder()
.WithTcpServer(broker, port) // MQTT broker address and port
.WithCredentials(username, password) // Set username and password
.WithClientId(clientId)
.WithCleanSession()
.WithTls(
o =>
{
// The used public broker sometimes has invalid certificates. This sample accepts all
// certificates. This should not be used in live environments.
o.CertificateValidationHandler = _ => true;
// The default value is determined by the OS. Set manually to force version.
o.SslProtocol = SslProtocols.Tls12;
// Please provide the file path of your certificate file. The current directory is /bin.
// var certificate = new X509Certificate("/opt/emqxsl-ca.crt", "");
var certificate = new X509Certificate("ca.pem", "");
o.Certificates = new List<X509Certificate> { certificate };
}
)
.Build();
// Connect to MQTT broker
var connectResult = await mqttClient.ConnectAsync(options);
if (connectResult.ResultCode == MqttClientConnectResultCode.Success)
{
Console.WriteLine("Connected to MQTT broker successfully.");
// Subscribe to a topic
await mqttClient.SubscribeAsync(topic);
// Callback function when a message is received
mqttClient.ApplicationMessageReceivedAsync += e =>
{
Console.WriteLine($"Received message: {Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment)}");
return Task.CompletedTask;
};
// Publish a message 10 times
for (int i = 0; i < 10; i++)
{
var message = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload($"Hello, MQTT! Message number {i}")
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
.WithRetainFlag()
.Build();
await mqttClient.PublishAsync(message);
await Task.Delay(1000); // Wait for 1 second
}
// Unsubscribe and disconnect
await mqttClient.UnsubscribeAsync(topic);
await mqttClient.DisconnectAsync();
}
else
{
Console.WriteLine($"Failed to connect to MQTT broker: {connectResult.ResultCode}");
}
运行效果:
Connected to MQTT broker successfully.
Received message: Hello, MQTT! Message number 0
Received message: Hello, MQTT! Message number 1
Received message: Hello, MQTT! Message number 2
Received message: Hello, MQTT! Message number 3
Received message: Hello, MQTT! Message number 4
Received message: Hello, MQTT! Message number 5
Received message: Hello, MQTT! Message number 6
Received message: Hello, MQTT! Message number 7
Received message: Hello, MQTT! Message number 8
Received message: Hello, MQTT! Message number 9
参考文档
- https://www.emqx.com/zh/blog/connecting-to-serverless-mqtt-broker-with-mqttnet-in-csharp
- https://www.emqx.com/zh/blog/how-to-install-emqx-mqtt-broker-on-ubuntu
- https://www.emqx.com/zh/blog/emqx-server-ssl-tls-secure-connection-configuration-guide
- https://www.emqx.com/zh/blog/enable-two-way-ssl-for-emqx