MQTT Client源码分析

发布于:2024-09-05 ⋅ 阅读:(64) ⋅ 点赞:(0)

MQTT Client源码分析

之前基于杰杰的mqttclient代码韦东山老师的教程把MQTTClient程序移植到STM32F103开发板,F103的开发板串口连接ESP8266模组实现终端连接到MQTT服务器的功能,仅仅是对着韦老师的教程移植和使用杰杰的mqttclient代码,简单的将mqttclient\platform\FreeRTOS\platform_net_socket.c文件中的接口绑定到ESP8266的TCP AT命令,使用ESP8266的Socket,对于杰杰的mqttclient代码并没有深入分析和理解。

1. mqttclient架构

如下图:

在这里插入图片描述

1.1 API

mqttclient的API接口:

int mqtt_init(mqtt_client_t* c, client_init_params_t* init);
int mqtt_release(mqtt_client_t* c);
int mqtt_connect(mqtt_client_t* c);
int mqtt_disconnect(mqtt_client_t* c);
int mqtt_subscribe(mqtt_client_t* c, const char* topic_filter, mqtt_qos_t qos, message_handler_t msg_handler);
int mqtt_unsubscribe(mqtt_client_t* c, const char* topic_filter);
int mqtt_publish(mqtt_client_t* c, const char* topic_filter, mqtt_message_t* msg);

int mqtt_keep_alive(mqtt_client_t* c);
int mqtt_yield(mqtt_client_t* c, int timeout_ms);

1.2 mqtt_client_t结构体

typedef struct mqtt_client {
    char                        *mqtt_client_id; //由用户传入,在连接MQTT时(mqtt_connect_with_results)序列化连接报文
    char                        *mqtt_user_name; //由用户传入,在连接MQTT时(mqtt_connect_with_results)序列化连接报文
    char                        *mqtt_password; //由用户传入,在连接MQTT时(mqtt_connect_with_results)序列化连接报文
    char                        *mqtt_host; //由用户传入,在连接MQTT时(mqtt_connect_with_results)序列化连接报文
    char                        *mqtt_port; //由用户传入,在连接MQTT时(mqtt_connect_with_results)序列化连接报文
    char                        *mqtt_ca; //TLS才会用到,暂时不分析
    void                        *mqtt_reconnect_data; //MQTT需要重连服务器时用到
    uint8_t                     *mqtt_read_buf; //读数据缓冲区
    uint8_t                     *mqtt_write_buf; //写数据缓冲区
    uint16_t                    mqtt_keep_alive_interval; //MQTT保活超时时间
    uint16_t                    mqtt_packet_id; //报文标识符
    uint32_t                    mqtt_will_flag          : 1; //遗嘱标志
    uint32_t                    mqtt_clean_session      : 1; //清理会话标志
    uint32_t                    mqtt_ping_outstanding   : 2; //PINGREQ后是否正在等待PINGRESP标志
    uint32_t                    mqtt_version            : 4; //MQTT协议版本
    uint32_t                    mqtt_ack_handler_number : 24; //用于QOS1和QOS2中ACK记录
    uint32_t                    mqtt_cmd_timeout; //命令超时时间(主要是读写阻塞时间、等待响应的时间、重连等待时间)
    uint32_t                    mqtt_read_buf_size; //读数据缓冲区大小
    uint32_t                    mqtt_write_buf_size; //写数据缓冲区大小
    uint32_t                    mqtt_reconnect_try_duration; //客户端在尝试重新连接到MQTT服务器时所允许的最大尝试时间
    size_t                      mqtt_client_id_len; //clientID最大长度
    size_t                      mqtt_user_name_len; //userName最大长度
    size_t                      mqtt_password_len; //password最大长度
    mqtt_will_options_t         *mqtt_will_options; //遗嘱消息配置
    client_state_t              mqtt_client_state; //客户端状态(INVALID、INITIALIZED、CONNECTED、DISCONNECTED、CLEAN_SESSION)
    platform_mutex_t            mqtt_write_lock; //写数据锁
    platform_mutex_t            mqtt_global_lock; //全局锁,比如转换客户端状态时需要先锁
    mqtt_list_t                 mqtt_msg_handler_list; //所有来自服务器的publish报文都会被处理(前提是订阅了对应的消息),mqtt协议必须实现的内容
    mqtt_list_t                 mqtt_ack_handler_list; //所有等待响应的报文都会被挂载到这个链表上,异步实现的核心
    network_t                   *mqtt_network; //网卡接口,保存网络相关信息(host、port、socket)
    platform_thread_t           *mqtt_thread; //内部线程mqtt_yield_thread,所有来自服务器的mqtt包都会在这里被处理
    platform_timer_t            mqtt_reconnect_timer; //掉线重连定时器
    platform_timer_t            mqtt_last_sent; //用于保活定时器
    platform_timer_t            mqtt_last_received; //保活定时器
    reconnect_handler_t         mqtt_reconnect_handler; //mqtt重连处理
    interceptor_handler_t       mqtt_interceptor_handler; //publish数据时调用,个人理解是要发送的数据与client绑定
} mqtt_client_t;

1.3 mqtt_yield_thread内部线程

mqtt_yield_thread线程中主要执行mqtt_yield(c, c->mqtt_cmd_timeout)函数

在这里插入图片描述

mqtt_yield中主要执行mqtt_packet_handle(c, &timer)处理MQTT接收到的消息

在这里插入图片描述

static int mqtt_packet_handle(mqtt_client_t* c, platform_timer_t* timer)
{
    int rc = MQTT_SUCCESS_ERROR;
    int packet_type = 0;
    
    rc = mqtt_read_packet(c, &packet_type, timer);

	//printf("read packet rc = %d, packet_type = %d, g_remain_len = %d\r\n", rc, packet_type, g_remain_len);
    switch (packet_type) {
        case 0: /* timed out reading packet */
            break;

        case CONNACK: /* has been processed */
            goto exit;

        case PUBACK:
        case PUBCOMP:
            rc = mqtt_puback_and_pubcomp_packet_handle(c, timer);
            break;

        case SUBACK:
            rc = mqtt_suback_packet_handle(c, timer);
            break;
            
        case UNSUBACK:
            rc = mqtt_unsuback_packet_handle(c, timer);
            break;

        case PUBLISH:
            rc = mqtt_publish_packet_handle(c, timer);
            break;

        case PUBREC:
        case PUBREL:
            rc = mqtt_pubrec_and_pubrel_packet_handle(c, timer);
            break;

        case PINGRESP:
            c->mqtt_ping_outstanding = 0;    /* keep alive ping success */
            break;

        default:
            goto exit;
    }

    rc = mqtt_keep_alive(c);	/* Keep the treatment alive */

exit:
    if (rc == MQTT_SUCCESS_ERROR)
        rc = packet_type;

    RETURN_ERROR(rc);
}

1.4 keepalive

当第一次发生超时时,会在mqtt_keep_alive中序列还一个心跳包发送给服务器,并将mqtt_ping_outstanding加1,当第二次超时时会设置client状态为CLIENT_STATE_DISCONNECTED尝试重连,若重连成功后需要重新订阅

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

1.5 ack链表

需要等待服务器应答消息时会加入ack链表(参考2.2中SUBACK),每次接收到服务器消息时,会对ack链表进行扫描,超时后会销毁链表节点,如果是PUBACK、PUBREC、PUBREL、PUBCOMP则需要重发

在这里插入图片描述

在这里插入图片描述

2. mqttclient流程

2.1 MQTT CONNECT

  • 用户初始化mqtt_client_t参数后,调用int mqtt_connect(mqtt_client_t* c);连接MQTT服务器

在这里插入图片描述

  • 调用到底层rc = network_connect(c->mqtt_network);连接MQTT服务器

在这里插入图片描述

  • 最终调用到平台层的n->socket = platform_net_socket_connect(n->host, n->port, PLATFORM_NET_PROTO_TCP);发送连接请求,其中实现了与ESP8266 TCP AT SOCKET绑定

在这里插入图片描述

  • 之后使用MQTTSerialize_connect(c->mqtt_write_buf, c->mqtt_write_buf_size, &connect_data)序列化mqtt的CONNECT报文,并使用mqtt_send_packet(c, len, &connect_timer)发送出去

在这里插入图片描述

  • mqtt_send_packet也是调用网络接口network_write(c->mqtt_network, &c->mqtt_write_buf[sent], length, platform_timer_remain(timer)最终调用平台接口platform_net_socket_write_timeout(n->socket, write_buf, len, timeout)

在这里插入图片描述

在这里插入图片描述

  • 发送完毕后在mqtt_wait_packet(c, CONNACK, &connect_timer) == CONNACK)等待服务器回复的CONNACK报文

在这里插入图片描述

  • 连接服务器成功后创建一个MQTT内部线程platform_thread_init("mqtt_yield_thread", mqtt_yield_thread, c, MQTT_THREAD_STACK_SIZE, MQTT_THREAD_PRIO, MQTT_THREAD_TICK)并启动,所有来自服务器的mqtt包都会在这里被处理

在这里插入图片描述

  • 当需要进行重连时,不需要重新创建MQTT内部线程,只需要改变MQTT Client的状态即可

在这里插入图片描述

在这里插入图片描述

2.2 MQTT SUBSCRIBE

  • 连接MQTT服务器后,用户可以直接调用mqtt_subscribe(client, "home", QOS0, smarthome_msg_handler);订阅主题

在这里插入图片描述

  • mqtt_subscribe会调用MQTTSerialize_subscribe(c->mqtt_write_buf, c->mqtt_write_buf_size, 0, packet_id, 1, &topic, (int*)&qos)序列化订阅报文,并调用mqtt_send_packet(c, len, &timer)发送订阅消息

在这里插入图片描述

  • 然后使用mqtt_msg_handler_create(topic_filter, qos, handler)创建对应的消息处理节点,这个消息节点在收到服务器的SUBACK订阅应答报文后会挂在到消息处理列表msg_handler上

在这里插入图片描述

  • mqtt_ack_list_record(c, SUBACK, packet_id, len, msg_handler)中记录等待服务器响应的SUBACK

在这里插入图片描述

  • 收到服务器响应的SUBACK回复后,在mqtt_suback_packet_handle(c, timer)中会取消ack_list里的SUBACK记录,并安装到msg_handler_list

在这里插入图片描述

在这里插入图片描述

MQTT UNSUBSCRIBE的流程与MQTT SUBSCRIBE的流程差不多。

2.3 MQTT PUBLISH

  • 连接服务器后,用户可以直接调用mqtt_publish(client, "home", &msg)向某主题发布消息

在这里插入图片描述

  • 与订阅流程类似,先使用MQTTSerialize_publish(c->mqtt_write_buf, c->mqtt_write_buf_size, 0, msg->qos, msg->retained, msg->id, topic, (uint8_t*)msg->payload, msg->payloadlen)序列化发布报文,在使用mqtt_send_packet(c, len, &timer)并发布到服务器

在这里插入图片描述

  • 对于QOS1或QOS2需要将PUBACK或PUBREC加入到ack_list中,与SUBACK类似,并提前设置了DUP重发标志位

在这里插入图片描述

在这里插入图片描述

2.4 接收服务器PUBLISH消息

  • 服务器发送的PUBLISH消息会在client的内部线程mqtt_yield_thread中的mqtt_packet_handle中处理

在这里插入图片描述

  • 先对收到的消息进行反序列化,QOS1和QOS2类型需要回复ACK,让后处理收到的消息,注意QOS还需要先等待服务器的PUBREL ACK后再处理接收的消息

在这里插入图片描述

  • mqtt_get_msg_handler(c, topic_name)中获取当前主题的处理函数,并将接收的数据在数据处理函数msg_handler中处理,该处理函数在订阅主题时定义

在这里插入图片描述

在这里插入图片描述

以上是杰杰mqttclient代码分析,后期如果能够有更深的认识再继续补充,欢迎各位大佬补充和指正!