云平台+MQTT+C#上位机+单片机通信

发布于:2025-05-01 ⋅ 阅读:(38) ⋅ 点赞:(0)

1、MQTT协议

MQTT Version 3.1.1

一个 MQTT 数据包由: 固定头(Fixed header)、可变头(Variable header)、消息体(payload)三部分构成

(1) 固定头(Fixed header)。存在于所有 MQTT 数据包中,包含内容如MQTT报文类型(连接,断开连接,发布,订阅,心跳请求,心跳响应等)、Qos等级(0:最多分发一次,1:最少分发一次,2:只分发一次)等。
(2) 可变头(Variable header)。存在于部分 MQTT 数据包中,数据包类型决定了可变头是否存在及其具体内容。可变头部不是可选的意思,而是指这部分在有些协议类型中存在,在有些协议中不存在。
(3) 消息体(Payload)。存在于部分 MQTT 数据包中,表示客户端收到的具体内容。与可变头一样,在有些协议类型中有消息内容,有些协议类型中没有消息内容。

针对固定头和可变头的协议,一般进行上位机和单片机开发时都会移植第三方库,然后我们主要将焦点放在消息体上,传输数据时给消息体增加一个什么协议,方便数据的发送和解析

MQTT是一种轻量级的发布/订阅协议,适合物联网和低带宽环境。云平台可能需要处理大量设备连接,此时MQTT的高效性和低开销就很有优势。 

2、C#上位机连接云平台

(1)、框架升级:升级为 .NET Framework 4.8

(2)、安装MQTT Net程序包:建议选择4.3.6.1152版本,高版本未必可以使用,低版本API不相同

(3)、添加控件:云平台连接的几个参数:ip,port,username,password,clientId。

注意:

ip:服务器的ip地址,阿里云物联网云平台MQTT客户端直连:设备使用TCP接入的安全风险非常高,新建的企业版实例默认关闭TCP(非TLS加密)接入方式。有的云平台可以用ip连接,但有的只能用mqttHostUrl连接

clientId:表示客户端ID,可自定义,长度不可超过64个字符。建议使用设备的MAC地址或SN码,方便您识别区分不同的客户端。不同的云平台对其定义不一致,有的云平台对其值不做任何限制,有的云平台clientID必须完全匹配

(4)、代码编写

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Internal;  

 IMqttClient mqttClient = new MqttFactory().CreateMqttClient();

//连接阿里云平台
        private async void button_mqtt_connect_Click(object sender, EventArgs e)
        {
          
            try
            { 
                if (button_mqtt_connect.Text == "连接")
                {
                    string ip = textbox_mqtt_ip.Text;
                    int port = Convert.ToInt32(textbox_mqtt_port.Text);
                    string username = textbox_mqtt_username.Text;
                    string password = textbox_mqtt_password.Text;
                    string clientId = textbox_mqtt_clientId.Text;
   var options = new MqttClientOptionsBuilder()
                    .WithTcpServer(ip, port)
                    .WithCredentials(username, password)
                    .WithClientId(clientId)
                    .Build();

                    mqttClient.ApplicationMessageReceivedAsync += MqttClient_ApplicationMessageReceivedAsync;//处理订阅主题报文

                    //设置成功连接回调,用于订阅消息
                    mqttClient.ConnectedAsync += async Connect =>
                    {
                        button_mqtt_connect.Invoke((Action)(() => button_mqtt_connect.Text = "断开"));
                        //button_mqtt_connect.Text = "断开";

                        string LogShow = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff") + "  连接服务器成功" + "\r\n";
                        Invoke(new Action(() => textBox_mqtt_recv.AppendText(LogShow)));
   string topic = "0x04/" + textbox_mqtt_sn.Text;
                        await mqttClient.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic(topic).Build());
                        Invoke(new Action(() => textBox_mqtt_recv.AppendText("subscribe topic:" + topic + "\r\n")));
                    };

                    //设置断开连接回调
                    mqttClient.DisconnectedAsync += async Disconnect =>
                    {
                        //button_mqtt_connect.Text = "连接";
                        button_mqtt_connect.Invoke((Action)(() => button_mqtt_connect.Text = "连接"));
                        string LogShow = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff") + "  断开服务器" + "\r\n";
                        Invoke(new Action(() => textBox_mqtt_recv.AppendText(LogShow)));
                    };
                    //连接到MQTT服务器
                    await mqttClient.ConnectAsync(options);
                }
                else
                {
                    //button_mqtt_connect.Text = "连接";
                    button_mqtt_connect.Invoke((Action)(() => button_mqtt_connect.Text = "连接"));
                    mqttClient.DisconnectAsync();
                }
            }
            catch (Exception ex)
            {
                MessageBox.Show(ex.Message);
            }
        }



 private Task MqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
        {
            var message = arg.ApplicationMessage;
            var Messagehex = new StringBuilder(message.Payload.Length * 2);
            foreach (byte b in message.Payload)
            {
                Messagehex.AppendFormat("{0:X2}", b); // 大写字母,如需小写改为 "x2"
            }
            string payload = Messagehex.ToString();
            string TopicLogShow = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff") + "收到主题:" + message.Topic + "  消息:" + payload+ "\r\n";
            Invoke(new Action(() => textBox_mqtt_recv.AppendText(TopicLogShow)));    
            throw new NotImplementedException();
        }

 private void button_mqtt_publish_Click(object sender, EventArgs e)
        {
            try
            {
                if (mqttClient.IsConnected)
                {
                    byte[] payload = { 0x5A,0x5A};
                    string topic = "0x04/" + textbox_mqtt_sn.Text;//主题如果错误,则会断开连接
                    mqttClient.PublishBinaryAsync(topic, payload);
                    Invoke(new Action(() => textBox_mqtt_recv.AppendText("publish topic :" + topic + "\r\n")));
                }
            }
            catch (Exception ex)
            {
                MessageBox.Show(ex.Message);
            }
        }

(5)、测试结果

3、单片机连接云平台

(1)、下载MQTT 源码

GitHub - eclipse-paho/paho.mqtt.embedded-c: Paho MQTT C client library for embedded systems. Paho is an Eclipse IoT project (https://iot.eclipse.org/)

(2)工程中新建一个MQTT文件夹

        将压缩包中paho.mqtt.embedded-c-master.zip\paho.mqtt.embedded-c-master\MQTTPacket\src 中的全部文件以及paho.mqtt.embedded-c-master.zip\paho.mqtt.embedded-c-master\MQTTPacket\samples 内的transport.c和transport.h拷入MQTT文件夹

 

(3)修改源码

第一步:

我们需要做的事情很简单, 只需要修改transport.c中的2个函数即可:
int transport_sendPacketBuffer(unsigned char* buf, int buflen);
int transport_getdata(unsigned char* buf, int count);
剩下的这三个可以不用去管,直接清空函数内容返回0即可
int transport_getdatanb(void *sck, unsigned char* buf, int count);
int transport_open(char* host, int port);
int transport_close(int sock);

//该函数把sock参数删除,原因是源代码使用sock发送数据,此处不需要
int transport_sendPacketBuffer( unsigned char* buf, int buflen)
{
	//发送数据到网络函数,此函数需要自己调用才会生效
    //mytcp_write(buf,buflen);
	return 1;
}

int transport_getdata(unsigned char* buf, int count)
{
    //从网络中接收的数据放到buf中,此函数需要自己调用才会生效,这个函数的作用是,获取count长度的数据,存放在buf数组中
	//将接收数据拷贝到buf数组中,如果接收数据长度小于count,说明接收过程是失败的,返回的值表示你接受到的数据长度
    if(recev_packet->length >= count)
	{
		memcpy(buf,recev_packet->bytes,count);
		memset(recev_packet->bytes,0,sizeof(recev_packet->bytes));
		recev_packet->length = 0;
		return count;
	}
	else if(recev_packet->length < count)
	{
		memcpy(buf,recev_packet->bytes,recev_packet->length);
		memset(recev_packet->bytes,0,sizeof(recev_packet->bytes));
		recev_packet->length = 0;
		return recev_packet->length;
	}
	return 0;
	return count;
}

int transport_getdatanb(void *sck, unsigned char* buf, int count)
{
	return 0;
}

int transport_open(char* addr, int port)
{
	return 0;
}

int transport_close(int sock)
{
	return 0;
}

第二步:

编译报错,发现 cannot open source input file "sys/types.h",将tranport.c除开函数的部分全部删除,加入自己所需的头文件

#include "main.h"
#include "transport.h"
#include "MQTTPacket.h"

第三步:

新增连接云平台函数,订阅和发布函数(注意:博主为验证功能,将新增函数添加到transport.c文件)

#include "main.h"
#include "transport.h"
#include "MQTTPacket.h"

char *pubtopic="Topic_Send";//设备发送的主题,服务器订阅的主题
char *subtopic = "Topic_Receive";//设备接收的主题,服务器发送的主题

#define MQTT_MAX_BUFF 200
uint8_t S_mqtt_buf[MQTT_MAX_BUFF] = {0};

//调用此函数前需要确保单片机已经连接了服务器
void mqtt_connect(void)
{
    MQTTPacket_connectData data = MQTTPacket_connectData_initializer;//配置可变头部分
	data.clientID.cstring = "ClientID";//设备唯一ID,请向服务器端工程师索要或者商量制定
	data.keepAliveInterval = 60;//保活计时器,即连续两次心跳包发送的最大时间间隔(单位:秒)
	data.cleansession = 1;//1:丢弃之前的连接信息
	data.username.cstring = "userName" ;//用户名,向服务器端工程师索要即可
	data.password.cstring = "password";//密码,向服务器端工程师索要即可
	data.MQTTVersion = 4;
	memset(S_mqtt_buf,0,MQTT_MAX_BUFF);
	uint32_t len = MQTTSerialize_connect(S_mqtt_buf, MQTT_MAX_BUFF, &data); //构建连接报文并获取长度
	transport_sendPacketBuffer(S_mqtt_buf,len);//通过硬件接口发送给服务器已经封装好的报文
	
	//如果可以接受到报文信息,则说明连接成功,云平台连接成功报文是20  2  0  0
    //连接成功后即可订阅报文
    LOG("wait mqtt connect\r\n");
}

void MQTT_Disconnect(void)
{
	memset(S_mqtt_buf,0,MQTT_MAX_BUFF);
	uint16_t send_len = MQTTSerialize_disconnect(S_mqtt_buf, MQTT_MAX_BUFF);
	transport_sendPacketBuffer(S_mqtt_buf,send_len);//Send disconnected data to the server
	
	memset(S_mqtt_buf,0,MQTT_MAX_BUFF);
	send_len = MQTTSerialize_disconnect(S_mqtt_buf, MQTT_MAX_BUFF);
	transport_sendPacketBuffer(S_mqtt_buf,send_len);//Send disconnected data to the server
}

/*超过keepAliveInterval时间没有通信,连接会自动断开,所以需要定期发送心跳包(PINGREQ/PINGRESP)来维持连接
MQTT协议允许通过发送任何上行报文(如PUBLISH,subscribe)替代心跳包(PINGREQ/PINGRESP)来维持连接*/
void MQTTSendHeartbeat(void)
{
	unsigned char h_buf[200] = {0};//长度自行设定测试
	uint8_t len = MQTTSerialize_pingreq(h_buf, 200);//调用mqtt心跳包封装函数
	transport_sendPacketBuffer((unsigned char*)h_buf, len);//发送数据
}


int mqtt_publish(char *pTopic,char *pMessage,uint8_t msglen)
{
	MQTTString topicString = MQTTString_initializer;
	topicString.cstring = pTopic;
	memset(S_mqtt_buf,0,MQTT_MAX_BUFF);
	int32_t  len = MQTTSerialize_publish(S_mqtt_buf, MQTT_MAX_BUFF, 0, 0, 0, 0, topicString, (unsigned char*)pMessage, msglen); //
	transport_sendPacketBuffer(S_mqtt_buf,len);//
	return 0;
}


int mqtt_subscribe(char *pTopic)
{
    int32_t len;
    MQTTString topicString = MQTTString_initializer;
    memset(S_mqtt_buf,0,MQTT_MAX_BUFF);
    topicString.cstring = pTopic;
    len = MQTTSerialize_subscribe(S_mqtt_buf, MQTT_MAX_BUFF, 0, 1, 1, &topicString, 0);
	transport_sendPacketBuffer((unsigned char*)S_mqtt_buf, len);
    return 0;
}


//0:success 1:Fault
uint8_t MQTT_Unsubscribe_Topic(char *unsub_topic,int msgid)
{
	MQTTString topicString = MQTTString_initializer;
	topicString.cstring = unsub_topic;
	uint16_t send_len = MQTTSerialize_unsubscribe(S_mqtt_buf,MQTT_MAX_BUFF, 0, msgid,1, &topicString);//Serialized unsubscribe data message
	transport_sendPacketBuffer(S_mqtt_buf,send_len);//Send unsubscribe data message to the server
	memset(S_mqtt_buf,0,MQTT_MAX_BUFF);
	
	return 0;
}

第四步:开始验证代码移植情况

如果没有联网条件,可以通过查看报文来判断是否移植成功,首先通过网络调试助手连接到服务器,然后发送报文数据到服务器,就可以连接成功,报文的详细解释可以看下面的博客

网络调试助手使用MQTT协议与Mosquitto通信(3)_mqtt调试助手-CSDN博客

第五步:采用以太网+路由器 连接云平台

(1)首先配置客户端连接云平台服务器

tcp_client_init函数,改变ip和端口号

(2)在连接成功回调函数中加入mqtt连接函数,下面代码只述说逻辑

static err_t tcp_client_connected(void *arg, struct tcp_pcb *pcb, err_t err)
{
    tcp_arg(pcb, mem_calloc(sizeof(struct recev_packet), 1));
    /* configure LwIP to use our call back functions */
    tcp_recv(pcb, tcp_client_recv);
    
    tcp_connect_ok = 1;
    

    mqtt_subscribe(subtopic);//mqtt连接成功后,订阅主题,开始测试
    .......
    return ERR_OK;
}

if(tcp_connect_ok)
{
    mqtt_connect();//服务器连接成功后,调用mqtt连接函数,注意修改用户名,密码,clientId
}


tcp_client_recv()
{
    LOG("client recv : ");
		for(int i=0;i<recev_packet->length;i++)
		{
			LOG("%x  ",recev_packet->bytes[i]);
		}
		LOG("\r\n");

    //tcp接收函数处理,如果发送mqtt连接报文后,接收到20 2 0 0报文,说明mqtt连接成功
    mqtt_connect_ok = 1;
}


if(mqtt_connect_ok )
{
    LOG("mqtt connect OK\r\n");
    LOG("subscribe\r\n");
	mqtt_subscribe(subtopic);//连接成功后订阅报文
}

if(时间超过2S)
{
    char buf[10] = {0x5a,0x5a};
    mqtt_publish(pubtopic,buf,2);//每2S钟发布一次,此处验证代码,所以保证pubtopic和subtopic主题相同
}

tcp_client_recv()//如果可以接收报文,并且数据正确,说明订阅和发布功能都成功

3、测试,移植成功


网站公告

今日签到

点亮在社区的每一天
去签到