MQTT
安装
安装Paho MQTT C库:
sudo apt-get install libpaho-mqtt3-dev
头文件包含:
#include "MQTTClient.h"
编译选项:
gcc -o $@ $^ -lpaho-mqtt3c
简介
MQTT协议全称是(Message Queuing Telemetry Transport),即消息队列遥测传输协议。
是一种基于发布/订阅(Publish/Subscribe)模式的轻量级通讯协议,并且该协议构建于TCP/IP协议之上,TCP协议本身就具有高可靠性的特点,因此基于其上的MQTT协议同样也是具有高可靠、低开销的特点,之所以低开销,是以为MQTT协议传输的最小的报文也只有两个字节。
在物联网开发中,MQTT不是唯一的选择,与MQTT互相竞争的协议有XMPP和CoAP协议等。
关于发布和订阅的概念我们拿抖音平台来举个例子,我们每一个用户就都是一个客户端,而抖音就是MQTT协议中的服务器,当我们(用户一)关注某一个视频发布者(用户二)时,这样一个关注的行为就可以理解为订阅;同时用户二也可以关注你,那么这就是相互订阅。当用户二发布作品的时候,这个作品是发布到了抖音平台,也就是我们现在的服务器,这个过程就是消息的发布。
在这里需要注意的是:用户二(客户端)发布的消息并不是直接发布给了用户一,而是发布到了抖音平台(服务器),由于用户一订阅了用户二的消息(相当于点了关注),所以抖音平台(服务器)就会向用户一推送这个消息(注意发布和推送的区别)。这就是MQTT协议订阅&发布的一个简单比喻。
实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。
MQTT传输的消息分为:主题(Topic)和负载(payload)两部分
- Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload)
- payload,可以理解为消息的内容,是指订阅者具体要使用的内容
MQTT客户端代码
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "MQTTClient.h"
#define ADDRESS "thirdparty.mqtt.yoplore.com" //服务器的IP地址
#define CLIENTID "client01" //发布者的姓名(唯一的,如果发布者和订阅者用同一个姓名,就会出现顶号的现象)
#define TOPIC1 "MQTT topic/topic1" //订阅主题
#define TOPIC2 "MQTT topic/topic2" //订阅主题
#define QOS 1 //服务登记(0.最多一次,1.最少一次,2.确保一次)
#define TIMEOUT 10000L //响应时间
//定义一个传递令牌
volatile MQTTClient_deliveryToken deliveredtoken;
//令牌交付回调函数,当消息成功交付给 MQTT 服务器时调用
/**
context:用户自定义的上下文指针,此处未使用
dt:消息交付的令牌
*/
void delivered(void *context, MQTTClient_deliveryToken dt)
{
printf("Message with token value %d delivery confirmed\n", dt);
deliveredtoken = dt;
}
//接受订阅信息的回调函数,当接收到订阅主题的消息时调用
/**
context:用户自定义的上下文指针,此处未使用
topicName:接收到消息的主题名称
topicLen:主题名称的长度
message:接收到的 MQTT 消息结构体指针
*/
int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{
int i;
char *payloadptr;
printf("Message arrived\n");
printf("topic: %s\n", topicName);
printf("message: ");
payloadptr = message->payload;
for (i = 0; i < message->payloadlen; i++)
{
putchar(*payloadptr);
payloadptr++;
}
putchar('\n');
MQTTClient_freeMessage(&message);
MQTTClient_free(topicName);
return 1;
}
//断开链接的回调函数
void connlost(void *context, char *cause)
{
printf("\nConnection lost\n");
printf("cause: %s\n", cause);
}
int main(int argc, char *argv[])
{
printf("\nCreating MQTTClient\n");
// 消息缓冲区
char buf[1024];
// 1、定义一个MQTT客户端结构体指针
MQTTClient client;
// 2、创建一个MQTT客户端
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
MQTTClient_create(&client, ADDRESS, CLIENTID,
MQTTCLIENT_PERSISTENCE_NONE, NULL);
conn_opts.keepAliveInterval = 20;// 连接保活时间
conn_opts.cleansession = 1; // 设置是否清除会话,1为清除
// 定义一个 MQTT 消息结构体,用于存储要发布的消息
MQTTClient_message publish_msg=MQTTClient_message_initializer;
// 令牌token
MQTTClient_deliveryToken token;
// 设置回调
MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered);
// 链接
int rc = MQTTClient_connect(client, &conn_opts);
if (rc != MQTTCLIENT_SUCCESS)
{
printf("Failed to connect, return code %d\n", rc);
return EXIT_FAILURE;
}
// 订阅多个主题
rc = MQTTClient_subscribe(client, TOPIC1, QOS);
if (rc != MQTTCLIENT_SUCCESS) {
printf("Failed to subscribe to %s, return code %d\n", TOPIC1, rc);
return EXIT_FAILURE;
}
rc = MQTTClient_subscribe(client, TOPIC2, QOS);
if (rc != MQTTCLIENT_SUCCESS) {
printf("Failed to subscribe to %s, return code %d\n", TOPIC2, rc);
return EXIT_FAILURE;
}
//用户退出
char ch;
while (1)
{
// 发送信息
printf("请输入要发布的内容(输入 'q' 或 'Q' 退出):\n");
if (fgets(buf, sizeof(buf), stdin) == NULL) {
printf("读取输入失败\n");
continue;
}
// 去除换行符(fgets会将换行符一并读取)
size_t len = strlen(buf);
if (len > 0 && buf[len - 1] == '\n') {
buf[len - 1] = '\0';
}
// 检查是否退出
if (buf[0] == 'q' || buf[0] == 'Q') {
break;
}
publish_msg.payload = (void *)buf;
publish_msg.payloadlen = strlen(buf);
rc = MQTTClient_publishMessage(client, TOPIC2, &publish_msg, &token);//用于将消息发布到指定的主题
if (rc != MQTTCLIENT_SUCCESS) {
printf("Failed to publish message, return code %d\n", rc);
continue;
}
rc = MQTTClient_waitForCompletion(client, token, 1000); //用于等待指定的消息交付完成
if (rc != MQTTCLIENT_SUCCESS) {
printf("Failed to wait for message completion, return code %d\n", rc);
continue;
}
printf("buf中的内容: %s\n", buf);
}
MQTTClient_disconnect(client,10000);
MQTTClient_destroy(&client);
printf("\nExiting\n");
return 0;
}