【从零开始一步步学习VSOA开发】发布订阅重连时同步

发布于:2024-08-09 ⋅ 阅读:(142) ⋅ 点赞:(0)

发布订阅重连时同步

概念

数据同步是指在数据发布与订阅场景中,当客户端因故障断开重连后,需要立即获取当前最新数据的需求。
如前面开发示例中的 axis_server 陀螺仪服务,若产生故障断开重连,则需要客户端上线后立即获取 /axis 的最新状态,以保证数据的一致性。通常情况下,客户端会在断开重连后,主动发起一次 RPC 请求以获取数据的最新状态,但如果需要获取的数据量较大,则会给代码编程带来更多的复杂性。此时,可以使用客户端机器人带有的自动数据同步接口进行处理。

程序源码

发布订阅重连时同步需要服务端和客户端都进行支持。服务端需要增加一个 RPC 服务,用于返回最后一次发布数据。客户端需要调用自动数据同步接口 vsoa_client_auto_consistent以便重连后能及时请求最新数据。

修改后的服务端源码如下,注意 RPC 服务的添加,同时为了测试效果明显,将发布周期由 1 秒改为了 10 秒。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "vsoa_platform.h"
#include "vsoa_server.h"

#define MY_SERVER_ADDR                      "0.0.0.0"
#define MY_SERVER_PORT                      (4002)
#define MY_SERVER_NAME                      "{\"name\":\"axis_server\"}"
#define MY_SERVER_PASSWD                    "123456"

#define AXIS_SER_BUF_LEN                    100

static int roll = 1, pitch = 1, yaw = 1;

static void *publish_axis_thread (void *arg)
{
    vsoa_url_t url;
    vsoa_payload_t payload;
    vsoa_server_t *server = arg;
    char param[AXIS_SER_BUF_LEN + 1];

    url.url     = "/axis";
    url.url_len = strlen(url.url);

    payload.data = NULL;
    payload.data_len  = 0;
    payload.param = param;

    roll  = 1;
    pitch = 1;
    yaw   = 1;
    while (TRUE) {
        sleep(10);

        if (!vsoa_server_is_subscribed(server, &url)) {
            continue;
        }

        payload.param_len = snprintf(param, AXIS_SER_BUF_LEN,
                                    "{\"roll\": %d, \"pitch\": %d, \"yaw\": %d}",
                                    roll++, pitch++, yaw++);
        printf("publish:%s\n", param);
        vsoa_server_publish(server, &url, &payload);
    }

    return (NULL);
}

static void command_axis (void *arg, vsoa_server_t *server, vsoa_cli_id_t cid,
                          vsoa_header_t *vsoa_hdr, vsoa_url_t *url,
                          vsoa_payload_t *payload)
{
    vsoa_payload_t send;
    char param[100];
    uint32_t  seqno = vsoa_parser_get_seqno(vsoa_hdr);

    sprintf(param, "{\"roll\": %d, \"pitch\": %d, \"yaw\": %d}",
            roll, pitch, yaw);

    send.data = NULL;
    send.data_len = 0;
    send.param = param;
    send.param_len = strlen(send.param);

    vsoa_server_cli_reply(server, cid, 0, seqno, 0, &send);
}

int main (int argc, char **argv)
{
    vsoa_server_t *server;

    /*
    * 创建服务端
    */
    server = vsoa_server_create(MY_SERVER_NAME);
    if (!server) {
        fprintf(stderr, "Can not create VSOA server!\n");
        return  (-1);
    }

    /*
    * 设置密码,设置为NULL,表示密码为空,客户端可以不输入密码
    */
    vsoa_server_passwd(server, MY_SERVER_PASSWD);

    vsoa_url_t url;
    url.url     = "/axis";
    url.url_len = strlen(url.url);
    vsoa_server_add_listener(server, &url, command_axis, NULL);

    /*
    * 启动微服务
    */
    struct sockaddr_in addr;
    bzero(&addr, sizeof(struct sockaddr_in));
    addr.sin_family      = AF_INET;
    addr.sin_port        = htons(MY_SERVER_PORT);
    addr.sin_addr.s_addr = inet_addr(MY_SERVER_ADDR);
    addr.sin_len         = sizeof(struct sockaddr_in);

    if (!vsoa_server_start(server, (struct sockaddr *)&addr, sizeof(struct sockaddr_in))) {
        vsoa_server_close(server);
        fprintf(stderr, "Can not start VSOA server!\n");
        return  (-1);
    }

   /*
    * Create publish thread
    */
    pthread_t pub_threadid;
    pthread_create(&pub_threadid, NULL, publish_axis_thread, server);

    /*
    * 进入监听事件循环
    */
    while (1) {
        int     cnt;
        int     max_fd;
        fd_set  fds;
        struct timespec timeout = {1, 0 };

        FD_ZERO(&fds);
        max_fd = vsoa_server_fds(server, &fds);

        cnt = pselect(max_fd + 1, &fds, NULL, NULL, &timeout, NULL);
        if (cnt > 0) {
            vsoa_server_input_fds(server, &fds);
        }
    }

    return (0);
}

修改后的客户端源码如下,注意第 31 行。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/select.h>
#include "vsoa_client.h"
#include "vsoa_cliauto.h"

#define MY_SERVER_PASSWD                    "123456"

static void onmessage (void *arg, struct vsoa_client *client, vsoa_url_t *url, vsoa_payload_t *payload, bool quick)
{
    printf("subscribe message, url:%.*s, quick:%s\n",
            (int)url->url_len, url->url,
            quick ? "ture":"false");

    printf("subscribe message, param:%.*s, data:%.*s\n",
               (int)payload->param_len, payload->param,
               (int)payload->data_len, (char *)payload->data);
}

int main (int argc, char **argv)
{
    vsoa_client_auto_t *cliauto;
    static char *sub_urls[] = { "/axis" };

    /*
     * 创建客户端机器人
     */
    cliauto = vsoa_client_auto_create(onmessage, NULL);

    vsoa_client_auto_consistent(cliauto, sub_urls, 1, 1000);
    /*
     * 启动客户端机器人127.0.0.1:4001  vsoa://axis_server
     */
    vsoa_client_auto_start(cliauto, "vsoa://axis_server", MY_SERVER_PASSWD, sub_urls, 1, 1000, 1000, 1000);

    while (true) {
        sleep(1);
    }
}

执行效果

先启动服务端再启动客户端,在客户端刚收到某次发布消息后立即“ctrl+c”退出客户端并重新执行,这个过程不会超过 1 秒,案例约 10 秒后才能收到下一次发布消息,但因为同步功能的原因会立即得到一次数据。
服务端执行效果:
image.png
客户端执行效果:
image.png