day37 MQTT协议 多客户端服务器模型
服务器/多客户端模型
在服务器开发中,处理多个客户端连接是常见需求。根据不同的应用场景和资源限制,有多种实现方式:
多客户端处理模型
- 多路IO多客户端模型:使用
select
或epoll
等IO多路复用技术,单线程处理多个客户端连接 - 并发多客户端模型:使用
fork
创建子进程或pthread
创建子线程处理每个客户端 - 循环服务器模型:简单的
while(1)
循环中依次处理accept()
、recv()
、send()
,但只能处理单个客户端
线程模型实现 (tcp_thread)
服务器端代码 (ser.c)
#include <netinet/in.h> // 提供Internet地址族相关定义,如sockaddr_in结构体
#include <netinet/ip.h> // 提供IP协议相关定义
#include <pthread.h> // 提供线程操作相关函数和数据类型
#include <stdio.h> // 提供输入输出函数
#include <stdlib.h> // 提供标准库函数,如内存分配、程序退出等
#include <string.h> // 提供字符串操作函数
#include <sys/socket.h> // 提供套接字相关函数和数据类型
#include <sys/types.h> // 提供基本系统数据类型
#include <time.h> // 提供时间相关函数
#include <unistd.h> // 提供Unix标准函数,如close、read、write等
#include <semaphore.h> // 提供信号量相关函数和数据类型
// 定义一个信号量,用于同步主线程和子线程对连接套接字的处理
sem_t sem_conn;
// 线程处理函数:用于处理客户端连接的子线程
void *th(void *arg)
{
// 将传递过来的连接套接字描述符转换为int类型
int conn = *(int *)arg;
// 发送信号量,通知主线程可以继续接受新的连接
sem_post(&sem_conn);
// 分离当前线程,使其在结束时自动释放资源,无需主线程调用pthread_join
pthread_detach(pthread_self());
time_t tm; // 用于存储时间的变量
// 循环处理客户端请求
while (1)
{
char buf[1024] = {0}; // 缓冲区,用于接收和发送数据
// 从客户端接收数据,存入buf缓冲区
int ret = recv(conn, buf, sizeof(buf), 0);
// 如果接收失败或客户端关闭连接(ret <= 0)
if (ret <= 0)
{
close(conn); // 关闭连接套接字
break; // 退出循环,结束线程
}
// 获取当前系统时间
time(&tm);
// 将客户端发送的内容与当前时间拼接在一起
sprintf(buf, "%s %s", buf, ctime(&tm));
// 将拼接后的内容发送回客户端
send(conn, buf, strlen(buf), 0);
}
return NULL;
}
// 定义一个类型别名SA,代表struct sockaddr*,简化代码书写
typedef struct sockaddr *(SA);
int main(int argc, char **argv)
{
// 创建监听套接字
// AF_INET:使用IPv4地址族
// SOCK_STREAM:使用面向连接的TCP协议
// 0:自动选择合适的协议(此处为TCP)
int listfd = socket(AF_INET, SOCK_STREAM, 0);
if (-1 == listfd) // 如果套接字创建失败
{
perror("scoket error\n"); // 打印错误信息
return 1; // 异常退出程序
}
// 定义服务器和客户端的地址结构体
struct sockaddr_in ser, cli;
// 初始化地址结构体为0
bzero(&ser, sizeof(ser));
bzero(&cli, sizeof(cli));
// 设置服务器地址信息
ser.sin_family = AF_INET; // 使用IPv4地址族
ser.sin_port = htons(50000); // 设置端口号为50000,htons用于主机字节序转网络字节序
ser.sin_addr.s_addr = INADDR_ANY; // 绑定到所有可用的网络接口(所有IP地址)
// 将监听套接字绑定到指定的地址和端口
int ret = bind(listfd, (SA)&ser, sizeof(ser));
if (-1 == ret) // 如果绑定失败
{
perror("bind"); // 打印错误信息
return 1; // 异常退出程序
}
// 开始监听套接字,等待客户端连接
// 第二个参数3表示等待连接队列的最大长度(三次握手未完成的连接)
listen(listfd, 3);
socklen_t len = sizeof(cli); // 用于存储客户端地址结构体的长度
// 初始化信号量,第二个参数0表示线程间共享,第三个参数0表示初始值
sem_init(&sem_conn, 0, 0);
// 循环接受客户端连接
while (1)
{
// 接受客户端连接,返回一个新的连接套接字用于与该客户端通信
// listfd:监听套接字
// (SA)&cli:用于存储客户端地址信息
// &len:用于存储客户端地址结构体的长度
int conn = accept(listfd, (SA)&cli, &len);
if (-1 == conn) // 如果接受连接失败
{
perror("accept"); // 打印错误信息
close(conn); // 关闭连接套接字
continue; // 继续接受下一个连接
}
// 创建子线程,用于处理当前客户端的请求
pthread_t tid;
// 第一个参数:线程ID
// 第二个参数:线程属性,NULL表示使用默认属性
// 第三个参数:线程处理函数
// 第四个参数:传递给线程处理函数的参数(连接套接字描述符)
pthread_create(&tid, NULL, th, &conn);
// 等待信号量,确保子线程已经获取了连接套接字描述符
sem_wait(&sem_conn);
}
// 关闭监听套接字(实际中由于上面是无限循环,这里的代码不会执行)
close(listfd);
// 销毁信号量
sem_destroy(&sem_conn);
return 0;
}
客户端代码 (cli.c)
#include <netinet/in.h> // 提供Internet地址族相关定义
#include <netinet/ip.h> // 提供IP协议相关定义
#include <stdio.h> // 提供输入输出函数
#include <stdlib.h> // 提供标准库函数
#include <string.h> // 提供字符串操作函数
#include <sys/socket.h> // 提供套接字相关函数
#include <sys/types.h> // 提供基本系统数据类型
#include <time.h> // 提供时间相关函数
#include <unistd.h> // 提供Unix标准函数(如sleep)
// 定义结构体指针别名,简化代码书写
typedef struct sockaddr *(SA);
int main(int argc, char **argv)
{
// 创建TCP套接字
// AF_INET: 使用IPv4地址族
// SOCK_STREAM: 使用面向连接的TCP协议
// 0: 自动选择合适的协议(此处为TCP)
int conn = socket(AF_INET, SOCK_STREAM, 0);
if (-1 == conn) // 检查套接字创建是否失败
{
perror("socket"); // 输出错误信息
return 1; // 异常退出
}
// 定义服务器地址结构体
struct sockaddr_in ser;
// 初始化地址结构体为0
bzero(&ser, sizeof(ser));
// 设置服务器地址信息
ser.sin_family = AF_INET; // 使用IPv4地址族
ser.sin_port = htons(50000); // 服务器端口号(50000),转换为网络字节序
ser.sin_addr.s_addr = INADDR_ANY; // 连接到本地所有可用接口(实际应指定服务器IP)
// 连接到服务器
int ret = connect(conn, (SA)&ser, sizeof(ser));
if (-1 == ret) // 检查连接是否失败
{
perror("connect error\n"); // 输出错误信息
return 1; // 异常退出
}
// 循环发送10次数据
int i = 10;
while (i)
{
char buf[1024] = "hello,this is tcp test"; // 要发送的消息
// 发送数据到服务器
send(conn, buf, strlen(buf), 0);
// 清空缓冲区,准备接收数据
bzero(buf, sizeof(buf));
// 接收服务器返回的数据
recv(conn, buf, sizeof(buf), 0);
// 打印服务器返回的内容
printf("from ser:%s\n", buf);
// 休眠1秒
sleep(1);
i--; // 减少循环计数
}
// 关闭连接套接字
close(conn);
return 0;
}
理想运行结果:
- 服务器启动后监听50000端口
- 客户端连接成功后,每秒发送一条"hello,this is tcp test"消息
- 服务器接收消息后添加当前时间戳返回给客户端
- 客户端输出类似:
from ser:hello,this is tcp test Thu Jun 20 14:30:45 2023
进程模型实现 (tcp_fork)
#include <netinet/in.h> // 提供Internet地址族相关定义
#include <netinet/ip.h> // 提供IP协议相关定义
#include <signal.h> // 提供信号处理相关函数
#include <stdio.h> // 提供输入输出函数
#include <stdlib.h> // 提供标准库函数
#include <string.h> // 提供字符串操作函数
#include <sys/socket.h> // 提供套接字相关函数
#include <sys/types.h> // 提供基本系统数据类型
#include <sys/wait.h> // 提供进程等待相关函数
#include <time.h> // 提供时间相关函数
#include <unistd.h> // 提供Unix标准函数
// 定义结构体指针别名,简化代码书写
typedef struct sockaddr *(SA);
// 信号处理函数:用于处理子进程退出信号,回收僵尸进程
void myhandle(int num)
{
// 等待子进程结束,回收其资源,防止僵尸进程
wait(NULL);
}
int main(int argc, char **argv)
{
// 注册SIGCHLD信号的处理函数为myhandle
// 当子进程退出时会产生SIGCHLD信号,触发该函数回收资源
signal(SIGCHLD, myhandle);
// 创建监听套接字
// AF_INET: 使用IPv4地址族
// SOCK_STREAM: 使用面向连接的TCP协议
// 0: 自动选择合适的协议(此处为TCP)
int listfd = socket(AF_INET, SOCK_STREAM, 0);
if (-1 == listfd) // 检查套接字创建是否失败
{
perror("scoket error\n"); // 输出错误信息
return 1; // 异常退出
}
// 定义服务器和客户端的地址结构体
struct sockaddr_in ser, cli;
// 初始化地址结构体为0
bzero(&ser, sizeof(ser));
bzero(&cli, sizeof(cli));
// 设置服务器地址信息
ser.sin_family = AF_INET; // 使用IPv4地址族
ser.sin_port = htons(50000); // 服务器端口号(50000),转换为网络字节序
ser.sin_addr.s_addr = INADDR_ANY; // 绑定到所有可用的网络接口(所有IP地址)
// 将监听套接字绑定到指定的地址和端口
int ret = bind(listfd, (SA)&ser, sizeof(ser));
if (-1 == ret) // 检查绑定是否失败
{
perror("bind"); // 输出错误信息
return 1; // 异常退出
}
// 开始监听套接字,等待客户端连接
// 第二个参数3表示等待连接队列的最大长度(三次握手未完成的连接)
listen(listfd, 3);
socklen_t len = sizeof(cli); // 用于存储客户端地址结构体的长度
time_t tm; // 用于存储时间的变量
// 循环接受客户端连接
while (1)
{
// 接受客户端连接,返回一个新的连接套接字用于与该客户端通信
// listfd:监听套接字
// (SA)&cli:用于存储客户端地址信息
// &len:用于存储客户端地址结构体的长度
int conn = accept(listfd, (SA)&cli, &len);
if (-1 == conn) // 检查接受连接是否失败
{
perror("accept"); // 输出错误信息
close(conn); // 关闭连接套接字
continue; // 继续接受下一个连接
}
// 创建子进程,用于处理当前客户端的请求
pid_t pid = fork();
if (pid > 0) // 父进程分支
{
// 父进程不需要连接套接字,关闭它
close(conn);
// wait(); // 注释掉的等待方式,改用信号处理
}
else if (0 == pid) // 子进程分支
{
// 子进程不需要监听套接字,关闭它
close(listfd);
// 循环处理客户端请求
while (1)
{
char buf[1024] = {0}; // 缓冲区,用于接收和发送数据
// 从客户端接收数据
int ret = recv(conn, buf, sizeof(buf), 0);
// 如果接收失败或客户端关闭连接(ret <= 0)
if (ret <= 0)
{
break; // 退出循环
}
// 获取当前系统时间
time(&tm);
// 将客户端发送的内容与当前时间拼接
sprintf(buf, "%s %s", buf, ctime(&tm));
// 将拼接后的内容发送回客户端
send(conn, buf, strlen(buf), 0);
}
// 处理完毕,退出子进程
exit(1);
}
else // fork失败分支
{
perror("fork"); // 输出错误信息
continue; // 继续接受下一个连接
}
}
// 关闭监听套接字(实际中由于上面是无限循环,这里的代码不会执行)
close(listfd);
return 0;
}
理想运行结果:
- 服务器启动后监听50000端口
- 每个客户端连接都会创建一个子进程处理
- 信号处理函数自动回收僵尸进程
- 客户端连接后,服务器会回复带时间戳的消息
- 多个客户端可以同时连接并获得服务
MQTT协议详解
MQTT概述
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议)是一种基于发布/订阅(publish/subscribe)模式的轻量级通讯协议,构建于TCP/IP协议之上。该协议由IBM在1999年发布,具有轻量、简单、开放和易于实现的特点,在物联网、小型设备、移动应用等领域应用广泛。
核心概念
发布/订阅模式:
- 客户端分为发布者和订阅者
- 发布者将消息发布到特定主题(Topic)
- 订阅者通过订阅感兴趣的主题来接收消息
- 这种模式实现了应用程序之间的解耦
- 多个订阅者可以同时接收来自同一主题的消息
服务质量(QoS):
- QoS 0:最多分发一次,消息可能丢失或重复,适用于对消息可靠性要求不高的场景,如环境传感器数据采集
- QoS 1:至少分发一次,确保消息到达,但可能出现重复,常用于设备控制指令传输
- QoS 2:仅分发一次,保证消息只到达一次,适用于对消息可靠性要求极高的场景,如金融交易数据传输
主题(Topic)与主题过滤器(Topic Filter):
- 主题是消息的分类标识,是UTF-8编码字符串
- 不能超过65535字节,层级数量无限制
- 区分大小写
- 主题过滤器用于订阅时筛选感兴趣的主题,可包含通配符
+
:单层通配符,只能用于单个主题层级匹配#
:多层通配符,用于匹配主题中任意层级,但必须位于主题过滤器的最后
会话(Session):
- 客户端与服务器建立连接后形成一个会话
- 会话存在于网络连接期间,也可能跨越多个连续的网络连接
- 会话用于存储客户端和服务器之间的交互状态,如客户端的订阅信息、未确认的消息等
遗嘱(Last Will and Testament):
- 客户端可设置遗嘱消息,当客户端异常断开连接时,服务器会发布该遗嘱消息
- 需在Connect时由客户端指定相关设置项
- 包括Will Flag(开启或关闭遗嘱功能)、Will QoS(遗嘱消息的服务质量等级)、Will Retain(遗嘱是否保留)、Will Topic(遗嘱话题)和Will Payload(遗嘱消息内容)
协议格式
MQTT数据包由三部分构成:
1. 固定头(Fixed Header)
- 存在于所有MQTT数据包中
- 包含数据包类型(如CONNECT、PUBLISH等)
- 包含标识位(如DUP、QoS、RETAIN)
- 剩余长度(表示可变头和消息体的总大小)
2. 可变头(Variable Header)
- 部分MQTT数据包包含可变头
- 内容因数据包类型而异
- 一些数据包(如PUBLISH (QoS > 0)、PUBACK等)的可变头中包含2字节的数据包标识字段
3. 消息体(Payload)
- 部分MQTT数据包包含消息体
- 不同类型的消息体内容不同
- CONNECT消息体:包含客户端的ClientID、订阅的Topic、Message以及用户名和密码
- SUBSCRIBE消息体:是一系列要订阅的主题以及QoS
- SUBACK消息体:是服务器对SUBSCRIBE申请的主题及QoS的确认和回复
- UNSUBSCRIBE消息体:是要取消订阅的主题
服务器与客户端工作流程
连接建立阶段
客户端发起连接:
- 客户端通过TCP/IP建立底层连接
- 发送
CONNECT
报文(首个报文) - 报文中含协议名、协议级别、连接标志、保持连接时长等
- 有效载荷包含客户端标识符(ClientId)、用户名/密码等
服务器确认连接:
- 服务器验证
CONNECT
报文合法性 - 发送
CONNACK
报文响应 - 报文中含"当前会话标志"(
Session Present
)和"连接返回码" - 连接返回码0表示连接成功,非0表示拒绝原因
- 服务器验证
消息交互阶段(发布-订阅核心流程)
客户端订阅主题:
- 客户端发送
SUBSCRIBE
报文 - 有效载荷含"主题过滤器+请求QoS"
- 服务器发送
SUBACK
报文确认,有效载荷含对应主题的"授权QoS"
- 客户端发送
客户端发布消息:
- 发布端客户端发送
PUBLISH
报文 - 可变报头含"主题名"和"报文标识符"(仅QoS>0时需含)
- 服务器接收后,根据主题名匹配所有订阅该主题的客户端
- 按订阅的授权QoS,向每个匹配客户端转发
PUBLISH
报文
- 发布端客户端发送
客户端取消订阅:
- 客户端发送
UNSUBSCRIBE
报文 - 有效载荷含待取消的主题过滤器
- 服务器发送
UNSUBACK
报文确认
- 客户端发送
连接维护与断开
心跳保活:
- 客户端需确保控制报文发送间隔不超过"保持连接时长"
- 无报文可发时需发送
PINGREQ
报文 - 服务器收到后必须回复
PINGRESP
报文 - 若1.5倍保持连接时长内无客户端报文,服务器需断开连接
正常断开:
- 客户端需发送
DISCONNECT
报文,之后关闭TCP连接 - 服务器收到
DISCONNECT
后,需丢弃该客户端的遗嘱消息(若存在)
- 客户端需发送
发布-订阅机制核心规则
主题与主题过滤器:
- 主题名是消息标识(如
/sensor/temp
,UTF-8编码,无通配符) - 主题过滤器是订阅时的匹配规则(支持
+
单层通配符,如/sensor/+
;#
多层通配符,如/sensor/#
) - 服务器按"逐层级匹配"规则,将
PUBLISH
报文转发给所有主题过滤器匹配的订阅客户端
- 主题名是消息标识(如
保留消息(Retain):
- 客户端发布
PUBLISH
时若设RETAIN=1
,服务器需存储该消息 - 新订阅匹配主题的客户端会立即收到该保留消息
- 若发布
RETAIN=1
且有效载荷为空的消息,服务器会删除对应主题的保留消息
- 客户端发布
遗嘱消息(Will):
- 客户端
CONNECT
时设Will Flag=1
,需指定"遗嘱主题"、“遗嘱消息”、“遗嘱QoS”、“遗嘱Retain” - 若客户端异常断开(如网络故障、超时),服务器会自动将遗嘱消息发布到遗嘱主题
- 若客户端正常发送
DISCONNECT
,服务器需丢弃遗嘱消息
- 客户端
QoS(服务质量)等级与流程
QoS 0(最多一次):
- 流程:发布端发送
PUBLISH
(QoS=0,无报文标识符,DUP=0
),接收端无需回复 - 消息可能丢失(如网络中断),不重发
- 适用场景:环境传感器数据(如实时温度,丢失一次可容忍)
- 流程:发布端发送
QoS 1(至少一次):
- 流程:
- 发布端发送
PUBLISH
(QoS=1,含报文标识符,DUP=0
),并存储消息 - 接收端接收后,发送
PUBACK
报文(含相同标识符),并将消息交给应用 - 发布端收到
PUBACK
后,删除存储的消息 - 若未收到
PUBACK
,发布端需重发PUBLISH
(DUP=1
),接收端可能收到重复消息
- 发布端发送
- 适用场景:设备控制指令(如开灯,需确保到达,重复可通过应用层去重)
- 流程:
QoS 2(仅一次):
- 流程(四次握手):
- 发布端发送
PUBLISH
(QoS=2,含标识符,DUP=0
),存储消息 - 接收端接收后,发送
PUBREC
报文(含相同标识符),存储标识符,不交给应用 - 发布端收到
PUBREC
后,删除PUBLISH
消息,发送PUBREL
报文(含相同标识符) - 接收端收到
PUBREL
后,将消息交给应用,发送PUBCOMP
报文(含相同标识符) - 发布端收到
PUBCOMP
后,删除PUBREL
相关状态 - 各环节未收到确认均需重发,确保消息仅到达一次
- 发布端发送
- 适用场景:计费数据、交易指令(不允许丢失或重复)
- 流程(四次握手):
应用场景
物联网场景:
- 智能家居(智能家电、传感器通信)
- 工业物联网(工厂设备数据交互)
- 农业监测(土壤湿度等传感器数据传输)
- 设备通过MQTT实现数据上报与远程控制
传感器数据传输:
- 环境监测(气象站、水质传感器)
- 农业监测等场景
- 传感器借MQTT将采集数据发送至服务器,供相关人员订阅获取实时数据
设备监控管理:
- 服务器集群(CPU、内存等状态监控)
- 移动设备(基站、智能电表状态上报)
- 运维/维护人员订阅主题掌握设备状态
消息推送:
- 轻量级即时通讯软件
- 移动应用(新闻、社交类)用MQTT推送消息
- 用户订阅主题接收实时信息
Wireshark抓包分析
环境准备
安装Wireshark:
- 从Wireshark官网(https://www.wireshark.org/download.html)下载并安装
- 安装过程中选择合适的网络适配器相关选项
配置MQTT相关参数:
- 打开Wireshark,进入"编辑"菜单选择"首选项"
- 在"协议"中找到"mqtt",设置对应参数(一般选择3.1.1版本和1883端口)
抓包操作
选择网络接口:
- 选择合适的网络接口(本地运行选回环地址,局域网环境选对应接口)
设置捕获过滤器(可选):
- 如只想捕获MQTT数据包,可设置"tcp.port == 1883"
开始捕获:
- 点击"开始捕获"按钮或选择"捕获">“开始捕获”
抓包后的操作
停止捕获:
- 点击"停止捕获"按钮
保存与导出数据包:
- 选择"文件">“保存”,通常保存为.pcapng格式
利用显示过滤器筛选分析:
- 过滤所有MQTT连接:输入"mqtt"
- 过滤特定QoS等级的消息:“mqtt.qos == 1”
- 过滤特定主题的消息:“mqtt.topic == ‘test/topic’”
MQTT实战应用
库的移植
MQTT库移植步骤:
OpenSSL编译安装:
tar -xvf openssl-1.0.0s.tar.gz cd openssl-1.0.0s ./config enable-shared -fPIC # 必须加入-fPIC选项 make sudo make install
Paho MQTT C库编译:
unzip paho.mqtt.c-master.zip cd paho.mqtt.c-master
- 修改Makefile:
- 122行:
CC ?= gcc
- 133行:添加
CFLAGS += -I /usr/local/ssl/include LDFLAGS += -L /usr/local/ssl/lib
- 192行:确保路径正确
CCFLAGS_SO += -Wno-deprecated-declarations -DOSX -I /usr/local/ssl/include LDFLAGS_CS += -Wl,-install_name,lib$(MQTTLIB_CS).so.${MAJOR_VERSION} -L /usr/local/ssl/lib
- 122行:
- 编译安装:
make sudo make install
- 修改Makefile:
云平台设备配置
OneNET云平台配置
注册账号并登录控制台
产品开发 -> 产品品类 -> 设备接入 -> MQTT协议
创建产品:
- 产品ID:Qon3io17BJ
- 设备名称:test1
- 设备密钥:c2Q2OVJKcW5KNDBQdmFLcm1OZEFmZU56cUJhSkhjd2o=
生成连接参数:
products/{产品id}/devices/{设备名字}
- 示例:
products/Qon3io17BJ/devices/test1
- 签名参数:
version=2018-10-31&res=products%2FQon3io17BJ%2Fdevices%2Ftest1&et=1837255523&method=md5&sign=vTKE9XEYychiMZcr34TjuQ%3D%3D
添加物模型:
- 产品开发 -> 详情 -> 设置物模型
- 添加自定义功能点(如温度)
设备管理:
- 设备管理 -> 详情 -> 属性 -> 实时刷新
MQTT Demo程序详解
头文件 (head.h)
#ifndef HEAD_H
#define HEAD_H
#include <MQTTAsync.h>
#include <MQTTClient.h>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <string.h>
// OneNET MQTT服务器地址
#define NEW_ADDRESS "tcp://183.230.40.96:1883"
// 设备名称
#define DEV_NAME "test1"
// 客户端ID(与设备名称相同)
#define CLIENTID DEV_NAME
// 产品ID
#define PRODUCT_ID "Qon3io17BJ"
// 连接密码(包含签名信息)
#define PASSWD "version=2018-10-31&res=products%2FQon3io17BJ%2Fdevices%2Ftest1&et=1837255523&method=md5&sign=vTKE9XEYychiMZcr34TjuQ%3D%3D"
// 服务质量等级
#define QOS 0
// 等待消息完成的超时时间(毫秒)
#define TIMEOUT 10000L
#endif // HEAD_H
主程序 (main.c)
//https://eclipse.dev/paho/files/mqttdoc/MQTTClient/html/_m_q_t_t_client_8h.html#a9a0518d9ca924d12c1329dbe3de5f2b6
#include <stdio.h>
#include "head.h"
// 存储订阅和发布的主题
static char topic[2][200] = {0};
// MQTT客户端实例
static MQTTClient client;
// 消息ID计数器
static int id = 10000;
// 用于存储已确认送达的消息令牌
volatile static MQTTClient_deliveryToken deliveredtoken;
// 构建主题名称
void pack_topic(char * dev_name, char * pro_id)
{
// 订阅主题格式:$sys/{产品ID}/{设备名称}/thing/property/post/reply
sprintf(topic[0], "$sys/%s/%s/thing/property/post/reply", pro_id, dev_name);
// 发布主题格式:$sys/{产品ID}/{设备名称}/thing/property/post
sprintf(topic[1], "$sys/%s/%s/thing/property/post", pro_id, dev_name);
}
// 发送成功后的回调函数
void delivered(void *context, MQTTClient_deliveryToken dt)
{
printf("Message with token value %d delivery confirmed\n", dt);
deliveredtoken = dt;
}
// 接收到消息的回调函数
int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{
int i;
char* payloadptr;
printf("Message arrived\n");
printf(" topic: %s\n", topicName);
printf(" message: ");
// 打印消息内容
payloadptr = (char*)message->payload;
for(i=0; i<message->payloadlen; i++)
{
putchar(*payloadptr++);
}
putchar('\n');
// 释放消息和主题内存
MQTTClient_freeMessage(&message);
MQTTClient_free(topicName);
return 1;
}
// 掉线后的回调函数
void connlost(void *context, char *cause)
{
printf("\nConnection lost\n");
printf(" cause: %s\n", cause);
}
// MQTT客户端初始化
int mqtt_init()
{
// 构建主题
pack_topic(DEV_NAME, PRODUCT_ID);
// 创建MQTT客户端
int rc = MQTTClient_create(&client, NEW_ADDRESS, CLIENTID,
MQTTCLIENT_PERSISTENCE_NONE, NULL);
if(MQTTCLIENT_SUCCESS != rc)
{
printf("create mqtt client failure...\n");
exit(1);
}
// 初始化连接选项
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
conn_opts.keepAliveInterval = 20; // 保持连接间隔(秒)
conn_opts.cleansession = 1; // 清理会话
conn_opts.username = PRODUCT_ID; // 用户名(产品ID)
conn_opts.password = PASSWD; // 密码(包含签名信息)
// 设置回调函数
rc = MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered);
if(MQTTCLIENT_SUCCESS != rc)
{
printf("Failed to set callbacks, return code %d\n", rc);
exit(EXIT_FAILURE);
}
// 连接到MQTT服务器
if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to connect, return code %d\n", rc);
exit(EXIT_FAILURE);
}
// 订阅主题(当前被注释,可根据需要启用)
#if 0
MQTTClient_subscribe(client, topic[0], QOS);
printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
, topic[0], CLIENTID, QOS);
#endif
return rc;
}
// 发送MQTT消息
int mqtt_send(char * key, int value)
{
MQTTClient_deliveryToken deliveryToken;
MQTTClient_message test2_pubmsg = MQTTClient_message_initializer;
// 需要发送的正文
char message[1024] = {0};
// 设置消息属性
test2_pubmsg.qos = QOS;
test2_pubmsg.retained = 0;
test2_pubmsg.payload = message;
// 构建JSON格式的消息
sprintf(message,"{\"id\":\"%d\",\"version\":\"1.0\",\"params\":{\"%s\":{\"value\":%d}}}",id++, key, value);
test2_pubmsg.payloadlen = strlen(message);
printf("%s\n",message);
// 发布消息
int rc = MQTTClient_publishMessage(client, topic[1], &test2_pubmsg, &deliveryToken);
if(MQTTCLIENT_SUCCESS != rc)
{
printf("client to publish failure.. %lu\n", pthread_self());
exit(1);
}
// 等待消息确认
printf("Waiting for up to %d seconds for publication on topic %s for client with ClientID: %s\n"
,(int)(TIMEOUT/1000), topic[0], CLIENTID);
MQTTClient_waitForCompletion(client, deliveryToken, TIMEOUT);
sleep(1);
return rc;
}
// 释放MQTT资源
void mqtt_deinit()
{
// 断开连接
MQTTClient_disconnect(client, 10000);
// 销毁客户端
MQTTClient_destroy(&client);
}
// 主函数
int main(void)
{
// 初始化MQTT客户端
mqtt_init();
// 持续发送随机温度数据
while(1)
{
int value = rand() % 100 + 1;
mqtt_send("tmp", value);
}
// 释放资源(实际不会执行到此处)
mqtt_deinit();
return 0;
}
理想运行结果:
{"id":"10000","version":"1.0","params":{"tmp":{"value":42}}}
Waiting for up to 10 seconds for publication on topic $sys/Qon3io17BJ/test1/thing/property/post/reply for client with ClientID: test1
Message arrived
topic: $sys/Qon3io17BJ/test1/thing/property/post/reply
message: {"id":"10000","code":200,"msg":"success"}
{"id":"10001","version":"1.0","params":{"tmp":{"value":75}}}
Waiting for up to 10 seconds for publication on topic $sys/Qon3io17BJ/test1/thing/property/post/reply for client with ClientID: test1
Message arrived
topic: $sys/Qon3io17BJ/test1/thing/property/post/reply
message: {"id":"10001","code":200,"msg":"success"}
...
程序将不断生成随机温度值(1-100之间),以JSON格式发送到OneNET平台,平台会返回确认消息。在OneNET控制台的设备属性中,可以看到实时更新的温度数据。
wireshark抓包
MQTTtest1
Qon3io17BJyversion=2018-10-31&res=products%2FQon3io17BJ%2Fdevices%2Ftest1&et=1837255523&method=md5&sign=vTKE9XEYychiMZcr34TjuQ%3D%3D