1、MQTT协议
一个 MQTT 数据包由: 固定头(Fixed header)、可变头(Variable header)、消息体(payload)三部分构成
(1) 固定头(Fixed header)。存在于所有 MQTT 数据包中,包含内容如MQTT报文类型(连接,断开连接,发布,订阅,心跳请求,心跳响应等)、Qos等级(0:最多分发一次,1:最少分发一次,2:只分发一次)等。
(2) 可变头(Variable header)。存在于部分 MQTT 数据包中,数据包类型决定了可变头是否存在及其具体内容。可变头部不是可选的意思,而是指这部分在有些协议类型中存在,在有些协议中不存在。
(3) 消息体(Payload)。存在于部分 MQTT 数据包中,表示客户端收到的具体内容。与可变头一样,在有些协议类型中有消息内容,有些协议类型中没有消息内容。
针对固定头和可变头的协议,一般进行上位机和单片机开发时都会移植第三方库,然后我们主要将焦点放在消息体上,传输数据时给消息体增加一个什么协议,方便数据的发送和解析
MQTT是一种轻量级的发布/订阅协议,适合物联网和低带宽环境。云平台可能需要处理大量设备连接,此时MQTT的高效性和低开销就很有优势。
2、C#上位机连接云平台
(1)、框架升级:升级为 .NET Framework 4.8
(2)、安装MQTT Net程序包:建议选择4.3.6.1152版本,高版本未必可以使用,低版本API不相同
(3)、添加控件:云平台连接的几个参数:ip,port,username,password,clientId。
注意:
ip:服务器的ip地址,阿里云物联网云平台MQTT客户端直连:设备使用TCP接入的安全风险非常高,新建的企业版实例默认关闭TCP(非TLS加密)接入方式。有的云平台可以用ip连接,但有的只能用mqttHostUrl连接
clientId:表示客户端ID,可自定义,长度不可超过64个字符。建议使用设备的MAC地址或SN码,方便您识别区分不同的客户端。不同的云平台对其定义不一致,有的云平台对其值不做任何限制,有的云平台clientID必须完全匹配
(4)、代码编写
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Internal;
IMqttClient mqttClient = new MqttFactory().CreateMqttClient();
//连接阿里云平台
private async void button_mqtt_connect_Click(object sender, EventArgs e)
{
try
{
if (button_mqtt_connect.Text == "连接")
{
string ip = textbox_mqtt_ip.Text;
int port = Convert.ToInt32(textbox_mqtt_port.Text);
string username = textbox_mqtt_username.Text;
string password = textbox_mqtt_password.Text;
string clientId = textbox_mqtt_clientId.Text;
var options = new MqttClientOptionsBuilder()
.WithTcpServer(ip, port)
.WithCredentials(username, password)
.WithClientId(clientId)
.Build();
mqttClient.ApplicationMessageReceivedAsync += MqttClient_ApplicationMessageReceivedAsync;//处理订阅主题报文
//设置成功连接回调,用于订阅消息
mqttClient.ConnectedAsync += async Connect =>
{
button_mqtt_connect.Invoke((Action)(() => button_mqtt_connect.Text = "断开"));
//button_mqtt_connect.Text = "断开";
string LogShow = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff") + " 连接服务器成功" + "\r\n";
Invoke(new Action(() => textBox_mqtt_recv.AppendText(LogShow)));
string topic = "0x04/" + textbox_mqtt_sn.Text;
await mqttClient.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic(topic).Build());
Invoke(new Action(() => textBox_mqtt_recv.AppendText("subscribe topic:" + topic + "\r\n")));
};
//设置断开连接回调
mqttClient.DisconnectedAsync += async Disconnect =>
{
//button_mqtt_connect.Text = "连接";
button_mqtt_connect.Invoke((Action)(() => button_mqtt_connect.Text = "连接"));
string LogShow = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff") + " 断开服务器" + "\r\n";
Invoke(new Action(() => textBox_mqtt_recv.AppendText(LogShow)));
};
//连接到MQTT服务器
await mqttClient.ConnectAsync(options);
}
else
{
//button_mqtt_connect.Text = "连接";
button_mqtt_connect.Invoke((Action)(() => button_mqtt_connect.Text = "连接"));
mqttClient.DisconnectAsync();
}
}
catch (Exception ex)
{
MessageBox.Show(ex.Message);
}
}
private Task MqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
{
var message = arg.ApplicationMessage;
var Messagehex = new StringBuilder(message.Payload.Length * 2);
foreach (byte b in message.Payload)
{
Messagehex.AppendFormat("{0:X2}", b); // 大写字母,如需小写改为 "x2"
}
string payload = Messagehex.ToString();
string TopicLogShow = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff") + "收到主题:" + message.Topic + " 消息:" + payload+ "\r\n";
Invoke(new Action(() => textBox_mqtt_recv.AppendText(TopicLogShow)));
throw new NotImplementedException();
}
private void button_mqtt_publish_Click(object sender, EventArgs e)
{
try
{
if (mqttClient.IsConnected)
{
byte[] payload = { 0x5A,0x5A};
string topic = "0x04/" + textbox_mqtt_sn.Text;//主题如果错误,则会断开连接
mqttClient.PublishBinaryAsync(topic, payload);
Invoke(new Action(() => textBox_mqtt_recv.AppendText("publish topic :" + topic + "\r\n")));
}
}
catch (Exception ex)
{
MessageBox.Show(ex.Message);
}
}
(5)、测试结果
3、单片机连接云平台
(1)、下载MQTT 源码
(2)工程中新建一个MQTT文件夹
将压缩包中paho.mqtt.embedded-c-master.zip\paho.mqtt.embedded-c-master\MQTTPacket\src 中的全部文件以及paho.mqtt.embedded-c-master.zip\paho.mqtt.embedded-c-master\MQTTPacket\samples 内的transport.c和transport.h拷入MQTT文件夹
(3)修改源码
第一步:
我们需要做的事情很简单, 只需要修改transport.c中的2个函数即可:
int transport_sendPacketBuffer(unsigned char* buf, int buflen);
int transport_getdata(unsigned char* buf, int count);
剩下的这三个可以不用去管,直接清空函数内容返回0即可
int transport_getdatanb(void *sck, unsigned char* buf, int count);
int transport_open(char* host, int port);
int transport_close(int sock);
//该函数把sock参数删除,原因是源代码使用sock发送数据,此处不需要
int transport_sendPacketBuffer( unsigned char* buf, int buflen)
{
//发送数据到网络函数,此函数需要自己调用才会生效
//mytcp_write(buf,buflen);
return 1;
}
int transport_getdata(unsigned char* buf, int count)
{
//从网络中接收的数据放到buf中,此函数需要自己调用才会生效,这个函数的作用是,获取count长度的数据,存放在buf数组中
//将接收数据拷贝到buf数组中,如果接收数据长度小于count,说明接收过程是失败的,返回的值表示你接受到的数据长度
if(recev_packet->length >= count)
{
memcpy(buf,recev_packet->bytes,count);
memset(recev_packet->bytes,0,sizeof(recev_packet->bytes));
recev_packet->length = 0;
return count;
}
else if(recev_packet->length < count)
{
memcpy(buf,recev_packet->bytes,recev_packet->length);
memset(recev_packet->bytes,0,sizeof(recev_packet->bytes));
recev_packet->length = 0;
return recev_packet->length;
}
return 0;
return count;
}
int transport_getdatanb(void *sck, unsigned char* buf, int count)
{
return 0;
}
int transport_open(char* addr, int port)
{
return 0;
}
int transport_close(int sock)
{
return 0;
}
第二步:
编译报错,发现 cannot open source input file "sys/types.h",将tranport.c除开函数的部分全部删除,加入自己所需的头文件
#include "main.h"
#include "transport.h"
#include "MQTTPacket.h"
第三步:
新增连接云平台函数,订阅和发布函数(注意:博主为验证功能,将新增函数添加到transport.c文件)
#include "main.h"
#include "transport.h"
#include "MQTTPacket.h"
char *pubtopic="Topic_Send";//设备发送的主题,服务器订阅的主题
char *subtopic = "Topic_Receive";//设备接收的主题,服务器发送的主题
#define MQTT_MAX_BUFF 200
uint8_t S_mqtt_buf[MQTT_MAX_BUFF] = {0};
//调用此函数前需要确保单片机已经连接了服务器
void mqtt_connect(void)
{
MQTTPacket_connectData data = MQTTPacket_connectData_initializer;//配置可变头部分
data.clientID.cstring = "ClientID";//设备唯一ID,请向服务器端工程师索要或者商量制定
data.keepAliveInterval = 60;//保活计时器,即连续两次心跳包发送的最大时间间隔(单位:秒)
data.cleansession = 1;//1:丢弃之前的连接信息
data.username.cstring = "userName" ;//用户名,向服务器端工程师索要即可
data.password.cstring = "password";//密码,向服务器端工程师索要即可
data.MQTTVersion = 4;
memset(S_mqtt_buf,0,MQTT_MAX_BUFF);
uint32_t len = MQTTSerialize_connect(S_mqtt_buf, MQTT_MAX_BUFF, &data); //构建连接报文并获取长度
transport_sendPacketBuffer(S_mqtt_buf,len);//通过硬件接口发送给服务器已经封装好的报文
//如果可以接受到报文信息,则说明连接成功,云平台连接成功报文是20 2 0 0
//连接成功后即可订阅报文
LOG("wait mqtt connect\r\n");
}
void MQTT_Disconnect(void)
{
memset(S_mqtt_buf,0,MQTT_MAX_BUFF);
uint16_t send_len = MQTTSerialize_disconnect(S_mqtt_buf, MQTT_MAX_BUFF);
transport_sendPacketBuffer(S_mqtt_buf,send_len);//Send disconnected data to the server
memset(S_mqtt_buf,0,MQTT_MAX_BUFF);
send_len = MQTTSerialize_disconnect(S_mqtt_buf, MQTT_MAX_BUFF);
transport_sendPacketBuffer(S_mqtt_buf,send_len);//Send disconnected data to the server
}
/*超过keepAliveInterval时间没有通信,连接会自动断开,所以需要定期发送心跳包(PINGREQ/PINGRESP)来维持连接
MQTT协议允许通过发送任何上行报文(如PUBLISH,subscribe)替代心跳包(PINGREQ/PINGRESP)来维持连接*/
void MQTTSendHeartbeat(void)
{
unsigned char h_buf[200] = {0};//长度自行设定测试
uint8_t len = MQTTSerialize_pingreq(h_buf, 200);//调用mqtt心跳包封装函数
transport_sendPacketBuffer((unsigned char*)h_buf, len);//发送数据
}
int mqtt_publish(char *pTopic,char *pMessage,uint8_t msglen)
{
MQTTString topicString = MQTTString_initializer;
topicString.cstring = pTopic;
memset(S_mqtt_buf,0,MQTT_MAX_BUFF);
int32_t len = MQTTSerialize_publish(S_mqtt_buf, MQTT_MAX_BUFF, 0, 0, 0, 0, topicString, (unsigned char*)pMessage, msglen); //
transport_sendPacketBuffer(S_mqtt_buf,len);//
return 0;
}
int mqtt_subscribe(char *pTopic)
{
int32_t len;
MQTTString topicString = MQTTString_initializer;
memset(S_mqtt_buf,0,MQTT_MAX_BUFF);
topicString.cstring = pTopic;
len = MQTTSerialize_subscribe(S_mqtt_buf, MQTT_MAX_BUFF, 0, 1, 1, &topicString, 0);
transport_sendPacketBuffer((unsigned char*)S_mqtt_buf, len);
return 0;
}
//0:success 1:Fault
uint8_t MQTT_Unsubscribe_Topic(char *unsub_topic,int msgid)
{
MQTTString topicString = MQTTString_initializer;
topicString.cstring = unsub_topic;
uint16_t send_len = MQTTSerialize_unsubscribe(S_mqtt_buf,MQTT_MAX_BUFF, 0, msgid,1, &topicString);//Serialized unsubscribe data message
transport_sendPacketBuffer(S_mqtt_buf,send_len);//Send unsubscribe data message to the server
memset(S_mqtt_buf,0,MQTT_MAX_BUFF);
return 0;
}
第四步:开始验证代码移植情况
如果没有联网条件,可以通过查看报文来判断是否移植成功,首先通过网络调试助手连接到服务器,然后发送报文数据到服务器,就可以连接成功,报文的详细解释可以看下面的博客
网络调试助手使用MQTT协议与Mosquitto通信(3)_mqtt调试助手-CSDN博客
第五步:采用以太网+路由器 连接云平台
(1)首先配置客户端连接云平台服务器
tcp_client_init函数,改变ip和端口号
(2)在连接成功回调函数中加入mqtt连接函数,下面代码只述说逻辑
static err_t tcp_client_connected(void *arg, struct tcp_pcb *pcb, err_t err)
{
tcp_arg(pcb, mem_calloc(sizeof(struct recev_packet), 1));
/* configure LwIP to use our call back functions */
tcp_recv(pcb, tcp_client_recv);
tcp_connect_ok = 1;
mqtt_subscribe(subtopic);//mqtt连接成功后,订阅主题,开始测试
.......
return ERR_OK;
}
if(tcp_connect_ok)
{
mqtt_connect();//服务器连接成功后,调用mqtt连接函数,注意修改用户名,密码,clientId
}
tcp_client_recv()
{
LOG("client recv : ");
for(int i=0;i<recev_packet->length;i++)
{
LOG("%x ",recev_packet->bytes[i]);
}
LOG("\r\n");
//tcp接收函数处理,如果发送mqtt连接报文后,接收到20 2 0 0报文,说明mqtt连接成功
mqtt_connect_ok = 1;
}
if(mqtt_connect_ok )
{
LOG("mqtt connect OK\r\n");
LOG("subscribe\r\n");
mqtt_subscribe(subtopic);//连接成功后订阅报文
}
if(时间超过2S)
{
char buf[10] = {0x5a,0x5a};
mqtt_publish(pubtopic,buf,2);//每2S钟发布一次,此处验证代码,所以保证pubtopic和subtopic主题相同
}
tcp_client_recv()//如果可以接收报文,并且数据正确,说明订阅和发布功能都成功
3、测试,移植成功