【嵌入式】MQTT

发布于:2025-03-04 ⋅ 阅读:(14) ⋅ 点赞:(0)

MQTT

安装

安装Paho MQTT C库:

sudo apt-get install libpaho-mqtt3-dev

头文件包含:

#include "MQTTClient.h"

编译选项:

gcc -o $@ $^ -lpaho-mqtt3c

简介

MQTT协议全称是(Message Queuing Telemetry Transport),即消息队列遥测传输协议。

是一种基于发布/订阅(Publish/Subscribe)模式的轻量级通讯协议,并且该协议构建于TCP/IP协议之上,TCP协议本身就具有高可靠性的特点,因此基于其上的MQTT协议同样也是具有高可靠、低开销的特点,之所以低开销,是以为MQTT协议传输的最小的报文也只有两个字节。

在物联网开发中,MQTT不是唯一的选择,与MQTT互相竞争的协议有XMPP和CoAP协议等。

关于发布和订阅的概念我们拿抖音平台来举个例子,我们每一个用户就都是一个客户端,而抖音就是MQTT协议中的服务器,当我们(用户一)关注某一个视频发布者(用户二)时,这样一个关注的行为就可以理解为订阅;同时用户二也可以关注你,那么这就是相互订阅。当用户二发布作品的时候,这个作品是发布到了抖音平台,也就是我们现在的服务器,这个过程就是消息的发布。

在这里需要注意的是:用户二(客户端)发布的消息并不是直接发布给了用户一,而是发布到了抖音平台(服务器),由于用户一订阅了用户二的消息(相当于点了关注),所以抖音平台(服务器)就会向用户一推送这个消息(注意发布和推送的区别)。这就是MQTT协议订阅&发布的一个简单比喻。

img

img

实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。

MQTT传输的消息分为:主题(Topic)和负载(payload)两部分

  • Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload)
  • payload,可以理解为消息的内容,是指订阅者具体要使用的内容

MQTT客户端代码

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "MQTTClient.h"
 
#define ADDRESS "thirdparty.mqtt.yoplore.com"  //服务器的IP地址
#define CLIENTID "client01"   //发布者的姓名(唯一的,如果发布者和订阅者用同一个姓名,就会出现顶号的现象)
#define TOPIC1 "MQTT topic/topic1"              //订阅主题
#define TOPIC2 "MQTT topic/topic2"              //订阅主题
#define QOS 1                              //服务登记(0.最多一次,1.最少一次,2.确保一次)
#define TIMEOUT 10000L                     //响应时间
 
 
//定义一个传递令牌
volatile MQTTClient_deliveryToken deliveredtoken;

//令牌交付回调函数,当消息成功交付给 MQTT 服务器时调用
/**
    context:用户自定义的上下文指针,此处未使用
    dt:消息交付的令牌
*/
void delivered(void *context, MQTTClient_deliveryToken dt)
{
    printf("Message with token value %d delivery confirmed\n", dt);
    deliveredtoken = dt;
}

//接受订阅信息的回调函数,当接收到订阅主题的消息时调用
/**
    context:用户自定义的上下文指针,此处未使用
    topicName:接收到消息的主题名称
    topicLen:主题名称的长度
    message:接收到的 MQTT 消息结构体指针
*/
int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{
    int i;
    char *payloadptr;
    printf("Message arrived\n");
    printf("topic: %s\n", topicName);
    printf("message: ");
    payloadptr = message->payload;
    for (i = 0; i < message->payloadlen; i++)
    {
        putchar(*payloadptr);
        payloadptr++;
    }
    putchar('\n');
    MQTTClient_freeMessage(&message);
    MQTTClient_free(topicName);
    return 1;
}
//断开链接的回调函数
void connlost(void *context, char *cause)
{
    printf("\nConnection lost\n");
    printf("cause: %s\n", cause);
}
 
int main(int argc, char *argv[])
{
    printf("\nCreating MQTTClient\n");
    // 消息缓冲区
    char buf[1024];
    // 1、定义一个MQTT客户端结构体指针
    MQTTClient client;
    // 2、创建一个MQTT客户端
    MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
    MQTTClient_create(&client, ADDRESS, CLIENTID,
                      MQTTCLIENT_PERSISTENCE_NONE, NULL);
                      
    conn_opts.keepAliveInterval = 20;// 连接保活时间
    conn_opts.cleansession = 1; // 设置是否清除会话,1为清除
 
    // 定义一个 MQTT 消息结构体,用于存储要发布的消息
    MQTTClient_message publish_msg=MQTTClient_message_initializer;
    // 令牌token
    MQTTClient_deliveryToken token;
 
    // 设置回调
    MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered);
    
    // 链接
    int rc = MQTTClient_connect(client, &conn_opts);
    if (rc != MQTTCLIENT_SUCCESS) 
    {
        printf("Failed to connect, return code %d\n", rc);
        return EXIT_FAILURE;
    }
    // 订阅多个主题
    rc = MQTTClient_subscribe(client, TOPIC1, QOS);
    if (rc != MQTTCLIENT_SUCCESS) {
        printf("Failed to subscribe to %s, return code %d\n", TOPIC1, rc);
        return EXIT_FAILURE;
    }
    rc = MQTTClient_subscribe(client, TOPIC2, QOS);
    if (rc != MQTTCLIENT_SUCCESS) {
        printf("Failed to subscribe to %s, return code %d\n", TOPIC2, rc);
        return EXIT_FAILURE;
    }
    //用户退出
    char ch;
    while (1) 
    {
        // 发送信息
        printf("请输入要发布的内容(输入 'q' 或 'Q' 退出):\n");
        if (fgets(buf, sizeof(buf), stdin) == NULL) {
            printf("读取输入失败\n");
            continue;
        }
        // 去除换行符(fgets会将换行符一并读取)
        size_t len = strlen(buf);
        if (len > 0 && buf[len - 1] == '\n') {
            buf[len - 1] = '\0';
        }
        // 检查是否退出
        if (buf[0] == 'q' || buf[0] == 'Q') {
            break;
        }
        publish_msg.payload = (void *)buf;
        publish_msg.payloadlen = strlen(buf);
        rc = MQTTClient_publishMessage(client, TOPIC2, &publish_msg, &token);//用于将消息发布到指定的主题
        if (rc != MQTTCLIENT_SUCCESS) {
            printf("Failed to publish message, return code %d\n", rc);
            continue;
        }
        rc = MQTTClient_waitForCompletion(client, token, 1000); //用于等待指定的消息交付完成
        if (rc != MQTTCLIENT_SUCCESS) {
            printf("Failed to wait for message completion, return code %d\n", rc);
            continue;
        }
        printf("buf中的内容: %s\n", buf);
    }
    
    MQTTClient_disconnect(client,10000);
    MQTTClient_destroy(&client);
    printf("\nExiting\n");
    return 0;
}