实战案例:使用C#实现高效MQTT消息发布系统

发布于:2025-07-01 ⋅ 阅读:(24) ⋅ 点赞:(0)

1. 引言

MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息协议,广泛应用于物联网(IoT)和实时数据传输场景。在工业自动化、智能家居等领域,我们经常需要将大量设备数据上传至云端或本地服务器。

本文将分享一个基于C#的MQTT消息发布系统,该系统具备以下特点:

  • 异步编程:使用async/await提高吞吐量

  • 多线程优化:并行发送消息,提高效率

  • 并发控制:使用SemaphoreSlim防止服务器过载

  • 错误处理:自动记录日志并邮件通知异常

2. 开发环境与依赖

  • 开发语言:C# (.NET 4.8)

  • MQTT库MQTTnet(通过NuGet安装)

  • 数据库:Oracle 11g(存储待发送数据)

  • 日志系统:自定义日志记录

3. 核心代码实现

3.1 发布和订阅

在MQTT消息系统中,仅依赖PublishAsync的返回结果并不能100%保证消息已被Broker正确处理。我们曾遇到以下问题场景:

  • Broker返回Success,MQTTX订阅相关的主题,但是没有收到我们发布的消息。(发布消息和订阅消息都使用了Qos 1)

可能的原因:

 1)Qos 1的确认范围

  • PUBACK仅表示Broker接收成功,不保证:

    • 消息已持久化到磁盘(若Broker崩溃)

    • 消息已传递给订阅者

    • 订阅者已成功处理消息

2)网络连接不稳定等原因

解决方案:在发布消息后,主动订阅自身发布的消息,通过双重确认机制确保消息可靠投递。

private async Task<string> SendMQData(string testData, string testId, string topic_Data, IMqttClient mqttClient)
        {
            //防御性编程:发布消息后,自己订阅topic,确保broker有收到消息再更新数据库。
            //原因:就算用Qos1去发布消息后,即使publishResult.ReasonCode返回Success
            //      MQTTX有时候仍然无法收到消息(尤其并发量高的时候),如果此时更新数据库,将无法得知是哪笔数据没传输成功。

            #region 异步方式执行
            if (!mqttClient.IsConnected)
            {
                await mqttClient.ReconnectAsync();
            }
            // 1. 创建一个“任务完成源”(用于等待异步事件)
            var receivedSignal = new TaskCompletionSource<bool>();
            // 2. 订阅主题
            await mqttClient.SubscribeAsync(topic_Data, MqttQualityOfServiceLevel.AtLeastOnce);
            mqttClient.ApplicationMessageReceivedAsync += e =>
            {
                if (e.ApplicationMessage.Topic == topic_Data)
                {
                    
                    if (Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment.ToArray()) == testData)
                    {
                        receivedSignal.TrySetResult(true); // 通知“已收到正确消息”
                    }
                }
                return Task.CompletedTask;
            };
            // 3. 发布消息
            var message = new MqttApplicationMessageBuilder()
                .WithTopic(topic_Data)
                .WithPayload(testData)
                .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
                .Build();

            var publishResult = await mqttClient.PublishAsync(message);

            if (publishResult.ReasonCode != MqttClientPublishReasonCode.Success)
            {
                UpdateLog("发布失败(Broker拒绝), Id:" + testId + "\r\n");
                log.SaveMsg("SendDataLog", $"发布失败(Broker拒绝), Id: {testId}", DateTime.Now, false);
                return "NG";
            }
            // 4. 等待最多 2 秒,看是否能收到消息
            var timeoutTask = Task.Delay(2000);
            var completedTask = await Task.WhenAny(receivedSignal.Task, timeoutTask);

            // 5. 判断结果
            if (completedTask == receivedSignal.Task && await receivedSignal.Task)
            {
                updateData(testId);
                UpdateLog("上传成功, Id:" + testId + "\r\n");
                log.SaveMsg("SendDataLog", $"上传成功, Id: {testId}", DateTime.Now, true);
                return "OK";
            }
            else
            {
                UpdateLog("上传失败(未收到确认), Id:" + testId + "\r\n");
                log.SaveMsg("SendDataLog", $"上传失败(未收到确认), Id: {testId}", DateTime.Now, true);
                return "NG";
            }
            #endregion

        }

3.2 MQTT连接配置和异步并发

特点:

1)一次连接,多次发布消息(减少连接开销)

2)使用Task并行发送消息,并通过SemaphoreSlim控制最大并发数。多次测试表明,当并发数大于18时,会显著出现3.1描述的情况(即:PublishAsync的返回结果是Success,但是MQTTX作为订阅者并不能收到消息,发布方和接收方都是用Qos 1 的情况下)

public async Task<string> AutoTaskUploadAsync(
            DataTable allData,
            (string Address, string Account, string Pwd, string Port) Config)
        {
            StringBuilder resId = new StringBuilder();
            List<Task> tasks = new List<Task>();
            object lockObj = new object();

            string broker = Config.Address;
            int port = int.Parse(Config.Port);
            string clientId = Guid.NewGuid().ToString();
            //string topic = topic_Data;
            string username = Config.Account;
            string password = Config.Pwd;

            //多次测试证明12个并发数比较安全(不同服务器性能不一致,这里大家可以按需修改)
            SemaphoreSlim _semaphore = new SemaphoreSlim(12);

            var factory = new MqttFactory();
            var mqttClient = factory.CreateMqttClient();
            var options = new MqttClientOptionsBuilder()
                .WithTcpServer(broker, port)
                .WithCredentials(username, password)
                .WithClientId(clientId)
                .WithCleanSession()
                .Build();
            
            var connectResult = await mqttClient.ConnectAsync(options);
            if (connectResult.ResultCode == MqttClientConnectResultCode.Success)
            {
                
                for (int i = 0; i < allData.Rows.Count; i++)
                {
                    int currentRow = i;


                    tasks.Add(Task.Run(async () =>
                    {
                        //等待信号量许可
                        await _semaphore.WaitAsync().ConfigureAwait(false);
                        try
                        {
                            string res = await SendMQDataTo(
                                Config.Address,
                                Config.Account,
                                Config.Pwd,
                                Config.Port,
                                allData.Rows[currentRow]["test_data"].ToString(),
                                allData.Rows[currentRow]["tpi_id"].ToString(),
                                allData.Rows[currentRow]["topic"].ToString(),
                                mqttClient
                            ).ConfigureAwait(false);

                            //lock (lockObj)
                            //{
                            //    resId.Append(allData.Rows[currentRow]["tpi_id"].ToString() + ",");
                            //}
                        }
                        catch (Exception ex)
                        {
                            string errorMsg = $"发送数据异常--{ex}";
                            log.SaveMsg("SendDataLog", errorMsg, DateTime.Now, false);
                            common.SendEmail("exampleMES@example.COM",
                                $"MQTT上传失败,ID:{allData.Rows[currentRow][0]}",
                                ex.ToString());
                        }
                        finally
                        {
                            // 释放信号量
                            _semaphore.Release();
                        }
                    }
                    ));
                }
            }
            else
            {
                string strres = common.SendEmail("exampleMES@example.COM", string.Format(@"连接MQ Broker失败,原因:" + connectResult.ResultCode.ToString()), connectResult.ResultCode.ToString());
            }
            // Unsubscribe and disconnect



            await Task.WhenAll(tasks);
            //mqttClient.ApplicationMessageReceivedAsync -= OnMessageReceived;
            await mqttClient.DisconnectAsync();

            // 移除末尾逗号
            if (resId.Length > 0)
            {
                resId.Length--;
                resId.Append(")");
            }
            return resId.ToString();
        }

3.3 记录日志、定时执行和异步显示

日志部分:

特点:操作前显示做到哪一步,方便后续维护时排查问题和分析性能瓶颈。

如本次项目中,本人一直以为多线程的速度有瓶颈,加了日志后,发现是读取数据库表时存在瓶颈。

private async Task SendData(string getDate)
        {
            string Addres = string.Empty; // MQ服务器地址    
            string Account = string.Empty; // MQ连接账号  
            string Pwd = string.Empty;   // MQ连接密码
            string Port = string.Empty; // 端口


            StringBuilder result = new StringBuilder();
            StringBuilder resId = new StringBuilder();


            string ngMsg = string.Empty;

            Addres = txtAddress.Text;
            Account = txtAccount.Text;
            Pwd = txtPassWord.Text;
            Port = txtPort.Text;
            var Config = (Addres, Account, Pwd, Port);

            if (string.IsNullOrEmpty(Addres))
            {
                MessageBox.Show("请输入MQ服务器地址");
                return;
            }

            if (string.IsNullOrEmpty(Account))
            {
                MessageBox.Show("请输入MQ账号");
                return;
            }

            if (string.IsNullOrEmpty(Pwd))
            {
                MessageBox.Show("请输入MQ密码");
                return;
            }

            if (string.IsNullOrEmpty(Port))
            {
                MessageBox.Show("请输入MQ端口号");
                return;
            }

            string strDate = getDate;

            //获取全部没上传的记录
            UpdateLog("开始读取T_TABLE信息"+"\r\n");
            DataTable allData = GetDataToSend();
            UpdateLog("读取完成,准备上传" + "\r\n");

            ///如果有输入日期,则只获取日期内没上传的记录
            if (!string.IsNullOrEmpty(strDate))
            {
                allData = GetDataToSend(strDate);
            }

            if (allData.Rows.Count == 0 || allData is null)
            {
                UpdateLog(string.Format(@"----------暂无数据,日期【{0}】,上传时间【{1}】--------------", !string.IsNullOrEmpty(strDate) ? strDate : DateTime.Now.ToString(), DateTime.Now.ToString()) + "\r\n");
                log.SaveMsg("SendDataLog", string.Format(@"----------暂无数据,时间【{0}】--------------", !string.IsNullOrEmpty(strDate) ? strDate : DateTime.Now.ToString()), DateTime.Now, true);  //插入日志
                return;
            }

            result.Append(string.Format(@"本次上传共{0}条数据,ID(", allData.Rows.Count));

            UpdateLog(string.Format(@"----------开始上传数据,日期【{0}】,上传时间【{1}】--------------", !string.IsNullOrEmpty(strDate) ? strDate : DateTime.Now.ToString(), DateTime.Now.ToString()) + "\r\n");
            log.SaveMsg("SendDataLog", string.Format(@"----------开始上传数据时间【{0}】--------------", !string.IsNullOrEmpty(strDate) ? strDate : DateTime.Now.ToString()), DateTime.Now, true);  //插入日志

            #region 异步调用(更稳定)
            string asyncResult = await AutoTaskUploadAsync(allData, Config);
            result.Append(asyncResult);
            #endregion


            UpdateLog(result.ToString() + "\r\n");
            log.SaveMsg("SendDataLog", result.ToString(), DateTime.Now, true);
            UpdateLog(string.Format(@"----------完成本次上传数据,日期【{0}】,上传时间【{1}】--------------", !string.IsNullOrEmpty(strDate) ? strDate : DateTime.Now.ToString(), DateTime.Now.ToString()) + "\r\n");
            log.SaveMsg("SendDataLog", string.Format(@"----------完成本次上传数据时间【{0}】--------------", !string.IsNullOrEmpty(strDate) ? strDate : DateTime.Now.ToString()), DateTime.Now, true);

        }

定时执行部分

特点:

利用定时器,根据前端设定的时间间隔,定时执行

private void xcButton3_Click(object sender, EventArgs e)
        {
            int seconds = int.Parse(this.txt_Seconds.Text.ToString());
            this.timer1.Interval = seconds * 1000;
            this.btnStart.Enabled = false;
            this.btnStop.Enabled = true;
            点击启动时,立刻运行一次
            this.BeginInvoke(new Action(() => timer1_Tick(null, EventArgs.Empty)));
            this.timer1.Start();
        }

        private void btnStop_Click(object sender, EventArgs e)
        {
            this.btnStart.Enabled = true;
            this.btnStop.Enabled = false;
            this.timer1.Stop();
        }

        private async void timer1_Tick(object sender, EventArgs e)
        {
            LogClear();
            UpdateLog("\r\n" + "本次上传时间:" + DateTime.Now.ToString() + "\r\n");
            int seconds = int.Parse(this.txt_Seconds.Text.ToString());
            await SendData("");
            UpdateLog("下次上传时间:" + DateTime.Now.AddSeconds(seconds).ToString() + "\r\n");
        }

异步显示部分

特点:

1)如果当前线程不在主线程,利用委托,将日志显示在控件上

private void UpdateLog(string message)
        {
            if (txt_SendData_Log.InvokeRequired)
            {
                txt_SendData_Log.Invoke((MethodInvoker)delegate
                {
                    txt_SendData_Log.AppendText(message + "\r\n");
                });
            }
            else
            {
                txt_SendData_Log.AppendText(message + "\r\n");
            }
        }

2)当txtbox超过100行日志时,清楚控件上的记录,释放内存,防止内存溢出。

private void LogClear()
        {
            int loglines = this.txt_SendData_Log.Lines.Length;
            if (loglines >= 100)
            {
                this.txt_SendData_Log.Clear();
            }
        }

3)将日志写入到txt文档中,做完整的日志记录。

Loger类

public class Loger
    {
        private object LockObj = new object();
        private string ExePath;

        public Loger()
        {
            ExePath = Path.GetDirectoryName(System.Diagnostics.Process.GetCurrentProcess().MainModule.FileName);
        }

        private StreamWriter Create(string fullPath)
        {
            StreamWriter sr;
            if (File.Exists(fullPath))
            {
                sr = File.AppendText(fullPath);
            }
            else
            {
                sr = File.CreateText(fullPath);
            }
            return sr;
        }

        public void SaveMsg(string cataLog, string message, DateTime dt, bool isLoger)
        {
            string fileName = string.Format("{0}.log", dt.ToString("yyyyMMdd"));
            Save(cataLog, fileName, message, isLoger);

        }

        public void Save(string cataLog, string fileName, string msg, bool isLoger)
        {
            lock (LockObj)
            {
                if (isLoger)
                {
                    try
                    {
                        string fullPath = Path.Combine(ExePath + "\\" + cataLog, fileName);
                        EnsureDirectory(fullPath);
                        using (StreamWriter sw = Create(fullPath))
                        {
                            sw.WriteLine(string.Format("{0}-->:{1}\r\n", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msg));
                            sw.Close();
                        }
                    }
                    catch { }
                }
            }
        }

        private void EnsureDirectory(string fullPath)
        {
            string path = Path.GetDirectoryName(fullPath);
            if (!Directory.Exists(path))
                Directory.CreateDirectory(path);
        }

        public void Remove(string fileName)
        {
            lock (LockObj)
            {
                if (File.Exists(fileName))
                {
                    File.Delete(fileName);
                }
            }
        }

调用方式


private Loger log = new Loger();

log.SaveMsg("SendDataLog", $"上传成功, Id: {testId}", DateTime.Now, true);


网站公告

今日签到

点亮在社区的每一天
去签到