【从零实现Json-Rpc框架】- 项目实现 - 客户端注册主题整合 及 rpc流程示意

发布于:2025-04-05 ⋅ 阅读:(18) ⋅ 点赞:(0)

📢博客主页:https://blog.csdn.net/2301_779549673
📢博客仓库:https://gitee.com/JohnKingW/linux_test/tree/master/lesson
📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
📢本文由 JohnKi 原创,首发于 CSDN🙉
📢未来很长,值得我们全力奔赴更美好的生活✨

在这里插入图片描述

在这里插入图片描述

文章目录


📢前言

到目前为止,我们已经将整个 rpc项目 的基本完成了,就只剩下客户端的部分功能了

这篇文章将带大家实现一些客户端的最后三个模块

  1. 客户端注册模块
  2. 客户端主题模块
  3. 客户端整合

本系列博客的代码都在上述库中的下图(demo代表笔者模仿的案例)
在这里插入图片描述


🏳️‍🌈一、rpc_registry 注册模块

1.1 rpc_registry 框架逻辑分析

该代码实现了一个 RPC 客户端核心模块,包含 ​服务注册服务发现与动态维护负载均衡 三大核心功能,通过以下三个类协作完成

graph TD
    A[Provider] -->|注册服务| B(注册中心)
    C[Discoverer] -->|发现服务| B
    C -->|维护本地缓存| D[MethodHost]

1.1.1 Provider类

  • 这个类的 核心功能向服务端注册服务方法,并处理注册响应
  • 构造函数接受一个 Requestor 的智能指针,用于发送请求。
  • registryMethod 方法负责 发送服务注册请求构造ServiceRequest消息并通过Requestor发送,然后处理响应
  • 如果注册失败,会记录错误日志。
// 核心功能:向服务端注册服务方法,并处理注册响应。
class Provider {
public:
    using ptr = std::shared_ptr<Provider>;
    Provider(const Requestor::ptr& requestor);

    // 注册服务方法
    bool registryMethod(const BaseConnection::ptr& conn,
                        const std::string& method, const Address& host);

private:
    Requestor::ptr _requestor;
};

1.1.2 MethodHost类

  • 管理某个服务方法的主机地址列表支持动态增删和轮询选择
  • 成员变量包括互斥锁轮询索引主机地址列表
  • 提供添加主机、移除主机、选择主机的方法,这些操作都是线程安全的。
  • chooseHost 方法使用简单的轮询算法实现负载均衡。
// 核心功能:管理某个服务方法的主机地址列表,支持动态增删和轮询选择。
// 设计目标:提供线程安全的主机列表管理,支持动态更新。
class MethodHost {
public:
    using ptr = std::shared_ptr<MethodHost>;

    MethodHost();
    MethodHost(const std::vector<Address>& hosts);

    // 添加主机地址
    void appendHost(const Address& host);

    // 移除主机地址
    void removerHost(const Address& host);

    // 轮询选择主机(简单负载均衡)
    Address chooseHost();

    bool empty();

private:
    std::mutex _mutex;
    size_t _idx;                 // 轮询选择下标
    std::vector<Address> _hosts; // 主机地址列表
};

1.1.3 Discoverer类

  • 发现服务提供者,处理上下线通知,维护本地服务缓存。
  • 包含一个离线回调函数,当服务下线时触发。
  • serviceDiscovery 方法处理服务发现请求,先检查本地缓存,若未命中则向服务端请求,并更新缓存。
  • onServiceRequest 处理服务端推送的上下线通知,更新本地的主机列表。
  • 使用 MethodHost 类来管理每个方法的主机列表,确保线程安全。
// 核心功能:发现服务提供者,处理上下线通知,维护本地服务缓存。
class Discoverer {
public:
    using ptr = std::shared_ptr<Discoverer>;
    using OfflineCallback = std::function<void(const Address&)>;

    Discoverer(const Requestor::ptr& requestor, const OfflineCallback& cb);

    // 主动发起服务发现请求。
    // 1. 本地缓存命中:直接返回已缓存的主机地址。
    // 2. ​缓存未命中:发送 SERVICE_DISCOVERY
    // 请求,获取服务地址并更新缓存。
    bool serviceDiscovery(const BaseConnection::ptr& conn,
                          const std::string& method, Address& host);

    // 提供给 Dispatcher 模块进行服务上线下线请求处理的回调函数
    // 处理服务端推送的上下线通知。
    // 设计目标:动态更新本地服务列表,确保客户端始终获取最新服务地址
    void onServiceRequest(const BaseConnection::ptr& conn,
                          const ServiceRequest::ptr& msg);

private:
    OfflineCallback _offline_callback;
    std::mutex _mutex;
    std::unordered_map<std::string, MethodHost::ptr> _method_hosts;
    Requestor::ptr _requestor;
};

1.2 Provider类

功能:向注册中心注册服务方法。
​关键成员

  • _requestor:发送请求和接收响应的工具类。

核心方法

  • registryMethod(conn, method, host)
    发送 SERVICE_REGISTRY 请求,将服务方法 method 和主机 host 注册到注册中心。
class Provider {
public:
    using ptr = std::shared_ptr<Provider>;
    Provider(const Requestor::ptr& requestor) : _requestor(requestor) {}

    // 注册服务方法
    bool registryMethod(const BaseConnection::ptr& conn,
                        const std::string& method, const Address& host) {
        // 1. 创建 SERVICE_REGISTRY 类型的请求消息。
        // 2. 通过 Requestor 发送请求并等待响应。
        // 3. 验证响应类型和状态码,返回注册结果。
        auto msg_req = MessageFactory::create<ServiceRequest>();
        msg_req->setId(UUID::uuid());
        msg_req->setMType(MType::REQ_SERVICE);
        msg_req->setMethod(method);
        msg_req->setHost(host);
        msg_req->setServiceOptype(ServiceOptype::SERVICE_REGISTRY);

        BaseMessage::ptr msg_rsp;
        bool ret = _requestor->send(conn, msg_req, msg_rsp);
        if (ret == false) {
            ELOG("%s 服务注册失败!", method.c_str());
            return false;
        }

        auto service_rsp = std::dynamic_pointer_cast<ServiceResponse>(msg_rsp);
        if (service_rsp.get() == nullptr) {
            ELOG("响应类型向下转换失败!");
            return false;
        }

        if (service_rsp->rcode() != RCode::RCODE_OK) {
            ELOG("服务注册失败,原因: %s",
                 errReason(service_rsp->rcode()).c_str());
            return false;
        }

        return true;
    }

private:
    Requestor::ptr _requestor;
};

1.3 MethodHost 类

功能:管理某个服务方法的主机列表,支持动态增删和轮询选择。
​关键成员

  • _hosts:服务提供者地址列表。
  • _idx:轮询索引,实现简单负载均衡。

核心方法

  • appendHost(host):线程安全地添加主机地址。
  • removeHost(host):线程安全地移除主机地址。
  • chooseHost():轮询选择下一个主机地址。
// 核心功能:管理某个服务方法的主机地址列表,支持动态增删和轮询选择。
// 设计目标:提供线程安全的主机列表管理,支持动态更新。
class MethodHost {
public:
    using ptr = std::shared_ptr<MethodHost>;

    MethodHost() : _idx(0) {}
    MethodHost(const std::vector<Address>& hosts)
        : _hosts(hosts.begin(), hosts.end()), _idx(0) {}

    // 添加主机地址
    void appendHost(const Address& host) {
        // 中途收到了服务上线请求后被调用
        std::unique_lock<std::mutex> lock(_mutex);
        _hosts.emplace_back(host);
    }

    // 移除主机地址
    void removerHost(const Address& host) {
        // 中途收到了服务下线请求后被调用
        std::unique_lock<std::mutex> lock(_mutex);
        for (auto it = _hosts.begin(); it != _hosts.end(); ++it) {
            if (*it == host) {
                _hosts.erase(it);
                break;
            }
        }
    }

    // 轮询选择主机(简单负载均衡)
    Address chooseHost() {
        std::unique_lock<std::mutex> lock(_mutex);
        size_t pos = _idx++ % _hosts.size();
        return _hosts[pos];
    }

    bool empty() { return _hosts.empty(); }

private:
    std::mutex _mutex;
    size_t _idx;                 // 轮询选择下标
    std::vector<Address> _hosts; // 主机地址列表
};

1.4 Discoverer 类

功能:动态发现服务地址,处理上下线通知,维护本地缓存。
​关键成员

  • _method_hosts:服务名 → 主机列表的映射(使用 MethodHost 管理)。
  • _offline_callback:服务下线时的回调函数。

​核心方法

  • serviceDiscovery(conn, method, host)
    若本地缓存有效,直接返回地址;否则向注册中心发送 SERVICE_DISCOVERY 请求。
  • onServiceRequest(conn, msg)
    处理服务端推送的上下线通知,更新本地缓存。
// 核心功能:发现服务提供者,处理上下线通知,维护本地服务缓存。
class Discoverer {
public:
    using ptr = std::shared_ptr<Discoverer>;
    using OfflineCallback = std::function<void(const Address&)>;
    Discoverer(const Requestor::ptr& requestor, const OfflineCallback& cb)
        : _requestor(requestor), _offline_callback(cb) {}

    // 主动发起服务发现请求。
    // 1. 本地缓存命中:直接返回已缓存的主机地址。
    // 2. ​缓存未命中:发送 SERVICE_DISCOVERY
    // 请求,获取服务地址并更新缓存。
    bool serviceDiscovery(const BaseConnection::ptr& conn,
                          const std::string& method, Address& host) {
        {
            // 当前所报关的提供者信息存在,则直接返回地址
            std::unique_lock<std::mutex> lock(_mutex);
            auto it = _method_hosts.find(method);
            if (it != _method_hosts.end()) {
                if (it->second->empty() == false) {
                    host = it->second->chooseHost();
                    return true;
                }
            }
        }

        // 缓存未命中,发起服务发现请求
        // 当前服务的提供者为空
        auto msg_req = MessageFactory::create<ServiceRequest>();
        msg_req->setId(UUID::uuid());
        msg_req->setMType(MType::REQ_SERVICE);
        msg_req->setMethod(method);
        msg_req->setServiceOptype(ServiceOptype::SERVICE_DISCOVERY);

        BaseMessage::ptr msg_rsp;
        bool ret = _requestor->send(conn, msg_req, msg_rsp);

        if (ret == false) {
            ELOG("服务发现失败!");
            return false;
        }

        auto service_rsp = std::dynamic_pointer_cast<ServiceResponse>(msg_rsp);
        if (service_rsp.get() == nullptr) {
            ELOG("服务发现时响应类型向下转换失败!");
            return false;
        }
        if (service_rsp->rcode() != RCode::RCODE_OK) {
            ELOG("服务发现失败!%s", errReason(service_rsp->rcode()).c_str());
            return false;
        }

        // 走到这里,代表当前服务没有对应主机提过服务的
        std::unique_lock<std::mutex> lock(_mutex);
        auto method_host = std::make_shared<MethodHost>(service_rsp->Hosts());
        if (method_host->empty()) {
            ELOG("%s 服务发现时,服务提供者列表为空!", method.c_str());
            return false;
        }

        DLOG("服务发现成功");
        host = method_host->chooseHost();
        _method_hosts[method] = method_host;
        return true;
    }

    // 提供给 Dispatcher 模块进行服务上线下线请求处理的回调函数
    // 处理服务端推送的上下线通知。
    // 设计目标:动态更新本地服务列表,确保客户端始终获取最新服务地址
    void onServiceRequest(const BaseConnection::ptr& conn,
                          const ServiceRequest::ptr& msg) {
        // 1. 判断是上线还是下线请求,如果都不是那就不用处理了
        ServiceOptype optype = msg->Serviceoptype();
        std::string method = msg->method();
        std::unique_lock<std::mutex> lock(_mutex);

        // 2. 上线请求:找到MethodHost,向其中新增一个主机地址
        if (optype == ServiceOptype::SERVICE_ONLINE) {
            auto it = _method_hosts.find(method);
            if (it == _method_hosts.end()) {
                MethodHost::ptr method_host = std::shared_ptr<MethodHost>();
                method_host->appendHost(msg->host());
                _method_hosts[method] = method_host;
            } else {
                it->second->appendHost(msg->host());
            }
        }

        // 3. 下线请求:找到MethodHost,从其中删除一个主机地址
        else if (optype == ServiceOptype::SERVICE_OFFLINE) {
            auto it = _method_hosts.find(method);
            if (it == _method_hosts.end())
                return;
            it->second->removerHost(msg->host());
            _offline_callback(msg->host());
        }
    }

private:
    OfflineCallback _offline_callback;
    std::mutex _mutex;
    std::unordered_map<std::string, MethodHost::ptr> _method_hosts;
    Requestor::ptr _requestor;
};

🏳️‍🌈二、rpc_topic 主题模块

先说说 客户端服务端 的主题模块的区别

  • ​客户端:负责发起主题操作请求,处理服务端推送的消息,​不维护全局状态。
  • ​服务端:维护全局主题和订阅者关系,处理客户端请求并广播消息,​不涉及业务逻辑。
  • 两者协同工作,客户端通过请求-响应模型与服务端交互,服务端确保消息的正确路由和广播。
    在这里插入图片描述

2.1 逻辑框架

#pragma once
#include "requestor.hpp"
#include <unordered_set>

namespace rpc
{
    namespace client
    {
        class TopicManager
        {
        public:
            using SubCallback = std::function<void(const std::string &key, const std::string &msg)>;
            using ptr = std::shared_ptr<TopicManager>;
            TopicManager(const Requestor::ptr &requestor) : _requestor(requestor) {}

            // 创建主题
            bool createTopic(const BaseConnection::ptr &_conn, const std::string &key);

            // 取消主题
            bool removeTopic(const BaseConnection::ptr &_conn, const std::string &key);

            // 订阅主题
            bool subscribeTopic(const BaseConnection::ptr &conn, const std::string &key, const SubCallback &cb);

            // 取消订阅主题的订阅者
            bool cancelTopic(const BaseConnection::ptr &conn, const std::string &key);

            // 发布消息
            bool publishTopic(const BaseConnection::ptr &conn, const std::string &key, const std::string &msg);

            // 处理服务器返回的主题消息
            void onPublishTopic(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg);

        private:
            void addSubscriber(const std::string &key, const SubCallback &cb);

            void delSubscriber(const std::string &key);

            const SubCallback getSubcallback(const std::string &key);

            bool commonRequest(const BaseConnection::ptr &conn, const std::string &key, TopicOptype type, const std::string &msg = " ");

        private:
            std::mutex _mutex;
            std::unordered_map<std::string, SubCallback> _topic_callbacks; // 主题名 -> 回调函数
            Requestor::ptr _requestor;
        };
    }
}

2.2 核心功能

client::TopicManager 是 RPC 客户端用于管理 ​主题(Topic)生命周期和消息订阅/发布 的核心类,主要功能包括:

  • 主题管理:创建、删除主题。
  • 订阅管理:订阅/取消订阅主题,绑定消息回调。
  • 消息发布:向指定主题发送消息。
  • 消息处理:接收服务端推送的主题消息,触发本地回调。
    在这里插入图片描述

2.3 接口方法详解

2.3.1 主题管理

在这里插入图片描述

// 创建主题
bool createTopic(const BaseConnection::ptr& _conn, const std::string& key) {
    return commonRequest(_conn, key, TopicOptype::TOPIC_CREATE);
}

// 删除主题
bool removeTopic(const BaseConnection::ptr& _conn, const std::string& key) {
    return commonRequest(_conn, key, TopicOptype::TOPIC_REMOVE);
}
// 创建主题 "news"
topic_mgr->createTopic(conn, "news");
// 删除主题 "news"
topic_mgr->removeTopic(conn, "news");

2.3.2 订阅管理

在这里插入图片描述

// 订阅主题
bool subscribeTopic(const BaseConnection::ptr& conn, const std::string& key,
                    const SubCallback& cb) {
    addSubscriber(key, cb);
    return commonRequest(conn, key, TopicOptype::TOPIC_SUBSCRIBE);
}

// 取消订阅主题的订阅者
bool cancelTopic(const BaseConnection::ptr& conn, const std::string& key) {
    delSubscriber(key);
    return commonRequest(conn, key, TopicOptype::TOPIC_CANCEL);
}
// 订阅主题 "news",定义回调函数
topic_mgr->subscribeTopic(conn, "news", [](const std::string& key, const std::string& msg) {
    std::cout << "收到消息: " << msg << std::endl;
});
// 取消订阅 "news"
topic_mgr->cancelTopic(conn, "news");

2.3.3 消息发布

在这里插入图片描述

// 发布消息
bool publishTopic(const BaseConnection::ptr& conn, const std::string& key,
                  const std::string& msg) {
    return commonRequest(conn, key, TopicOptype::TOPIC_PUBLISH, msg);
}
// 发布消息到主题 "news"
topic_mgr->publishTopic(conn, "news", "Breaking: RPC框架发布!");

2.3.4 消息处理

在这里插入图片描述
内部流程

  1. ​验证操作类型:确保消息类型为 TOPIC_PUBLISH
  2. 提取消息内容:获取主题名 topic_key 和消息内容 topic_msg
  3. 查找回调函数:通过 getSubcallback 查找本地注册的回调。
  4. 触发回调:执行回调函数处理消息。
// 处理服务器返回的主题消息
void onPublishTopic(const BaseConnection::ptr& conn,
                    const TopicRequest::ptr& msg) {
    // 1. 从消息中取出操作类型进行判断,是否时消息请求
    auto type = msg->optype(); // 获取主题操作类型
    if (type != TopicOptype::TOPIC_PUBLISH) {
        ELOG("错误的主题操作类型:%d", static_cast<int>(type));
        return;
    }
    // 2. 取出消息主题名称,以及消息内容
    std::string topic_key = msg->topicKey();
    std::string topic_msg = msg->topicMsg();

    // 3. 通过主题名称,查找队形主题的回调处理函数,有 再处理,没有 就报错
    auto callback = getSubcallback(topic_key);
    if (!callback) {
        ELOG("收到了 %s 主题消息,但是该消息主题无处理回调", topic_key.c_str())
        return;
    }
    return callback(topic_key, topic_msg);
}

2.4 核心逻辑:commonRequest 方法

所有主题操作请求通过 commonRequest 统一处理,流程如下

2.4.1 ​构造请求:

auto msg_rsp = MessageFactory::create<TopicRequest>();
msg_rsp->setTopicKey(key);
msg_rsp->setOptype(type); // 设置操作类型(如 TOPIC_CREATE)

2.4.2 发送请求

bool ret = _requestor->send(conn, msg_rsp, msg_rep);

2.4.3 处理响应:

  • 检查响应类型是否为 TopicResponse
  • 验证状态码 rcode() 是否成功。

2.4.4 实现代码

// 处理服务器返回的主题消息
void onPublishTopic(const BaseConnection::ptr& conn,
                    const TopicRequest::ptr& msg) {
    // 1. 从消息中取出操作类型进行判断,是否时消息请求
    auto type = msg->optype(); // 获取主题操作类型
    if (type != TopicOptype::TOPIC_PUBLISH) {
        ELOG("错误的主题操作类型:%d", static_cast<int>(type));
        return;
    }
    // 2. 取出消息主题名称,以及消息内容
    std::string topic_key = msg->topicKey();
    std::string topic_msg = msg->topicMsg();
    bool commonRequest(const BaseConnection::ptr& conn, const std::string& key,
                       TopicOptype type, const std::string& msg = " ") {
        // 1. 构造请求对象,并填充数据
        auto msg_rsp = MessageFactory::create<TopicRequest>();
        msg_rsp->setId(UUID::uuid());
        msg_rsp->setMType(MType::REQ_TOPIC);
        msg_rsp->setOptype(type);
        msg_rsp->setTopicKey(key);

        if (type == TopicOptype::TOPIC_PUBLISH)
            msg_rsp->setTopicMsg(msg);

        // 2. 向 服务端 发送 同步 请求,等待响应
        BaseMessage::ptr msg_rep;
        bool ret = _requestor->send(conn, msg_rsp, msg_rep);
        if (ret == false) {
            ELOG("主题操作请求失败!");
            return false;
        }

        // 3. 获取服务端响应信息,判断请求处理是否成功
        auto topic_rsp_msg = std::dynamic_pointer_cast<TopicResponse>(msg_rep);
        if (!topic_rsp_msg) {
            ELOG("主题操作响应,但向下转换时失败");
            return false;
        }
        if (topic_rsp_msg->rcode() != RCode::RCODE_OK) {
            ELOG("主题操作请求出错:%s",
                 errReason(topic_rsp_msg->rcode()).c_str());
            return false;
        }
        ELOG("主题操作请求成功");
        return true;
    }
    // 3. 通过主题名称,查找队形主题的回调处理函数,有 再处理,没有 就报错
    auto callback = getSubcallback(topic_key);
    if (!callback) {
        ELOG("收到了 %s 主题消息,但是该消息主题无处理回调", topic_key.c_str())
        return;
    }
    return callback(topic_key, topic_msg);
}

2.5 整体代码

#pragma once
#include "requestor.hpp"
#include <unordered_set>


namespace rpc{
    namespace client{
        class TopicManager{
            public:
                using SubCallback = std::function<void(const std::string& key, const std::string& msg)>;
                using ptr = std::shared_ptr<TopicManager>;
                TopicManager(const Requestor::ptr& requestor) : _requestor(requestor) {}
                
                // 创建主题
                bool createTopic(const BaseConnection::ptr& _conn, const std::string& key){
                    return commonRequest(_conn, key, TopicOptype::TOPIC_CREATE);
                }

                // 删除主题
                bool removeTopic(const BaseConnection::ptr& _conn, const std::string& key){
                    return commonRequest(_conn, key, TopicOptype::TOPIC_REMOVE);
                }

                // 订阅主题
                bool subscribeTopic(const BaseConnection::ptr& conn, const std::string& key, const SubCallback& cb){
                    addSubscriber(key, cb);
                    return commonRequest(conn, key, TopicOptype::TOPIC_SUBSCRIBE);
                }
 
                // 取消订阅主题的订阅者
                bool cancelTopic(const BaseConnection::ptr& conn, const std::string& key){
                    delSubscriber(key);
                    return commonRequest(conn, key, TopicOptype::TOPIC_CANCEL);
                }

                // 发布消息
                bool publishTopic(const BaseConnection::ptr& conn, const std::string& key, const std::string& msg){
                    return commonRequest(conn, key, TopicOptype::TOPIC_PUBLISH, msg);
                }

                // 处理服务器返回的主题消息
                void onPublishTopic(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg){
                    // 1. 从消息中取出操作类型进行判断,是否时消息请求
                    auto type = msg->optype();  // 获取主题操作类型
                    if(type != TopicOptype::TOPIC_PUBLISH){
                        ELOG("错误的主题操作类型:%d", static_cast<int>(type));
                        return;
                    }
                    // 2. 取出消息主题名称,以及消息内容
                    std::string topic_key = msg->topicKey();
                    std::string topic_msg = msg->topicMsg();

                    // 3. 通过主题名称,查找队形主题的回调处理函数,有 再处理,没有 就报错
                    auto callback = getSubcallback(topic_key);
                    if(!callback){
                        ELOG("收到了 %s 主题消息,但是该消息主题无处理回调", topic_key.c_str())
                        return;
                    }
                    return callback(topic_key, topic_msg);
                }
            
            private:
                void addSubscriber(const std::string& key, const SubCallback& cb){
                    std::unique_lock<std::mutex> lock(_mutex);
                    _topic_callbacks.insert(std::make_pair(key, cb));
                }

                void delSubscriber(const std::string& key){
                    std::unique_lock<std::mutex> lock(_mutex);
                    _topic_callbacks.erase(key);
                }

                const SubCallback getSubcallback(const std::string& key){
                    std::unique_lock<std::mutex> _lock(_mutex);
                    auto it = _topic_callbacks.find(key);
                    if(it == _topic_callbacks.end()){
                        ELOG("主题 %s 没有订阅者", key.c_str());
                        return SubCallback();
                    }
                    return it->second;
                }

                bool commonRequest(const BaseConnection::ptr& conn, const std::string& key, TopicOptype type, const std::string& msg = " "){
                    // 1. 构造请求对象,并填充数据
                    auto msg_rsp = MessageFactory::create<TopicRequest>();
                    msg_rsp->setId(UUID::uuid());
                    msg_rsp->setMType(MType::REQ_TOPIC);
                    msg_rsp->setOptype(type);
                    msg_rsp->setTopicKey(key);
 
                    if(type == TopicOptype::TOPIC_PUBLISH)
                        msg_rsp->setTopicMsg(msg);
                    
                    // 2. 向 服务端 发送 同步 请求,等待响应
                    BaseMessage::ptr msg_rep;
                    bool ret = _requestor->send(conn, msg_rsp, msg_rep);
                    if(ret == false){
                        ELOG("主题操作请求失败!");
                        return false;
                    }

                    // 3. 获取服务端响应信息,判断请求处理是否成功
                    auto topic_rsp_msg = std::dynamic_pointer_cast<TopicResponse>(msg_rep);
                    if(!topic_rsp_msg){
                        ELOG("主题操作响应,但向下转换时失败");
                        return false;
                    }
                    if(topic_rsp_msg->rcode() != RCode::RCODE_OK){
                        ELOG("主题操作请求出错:%s", errReason(topic_rsp_msg->rcode()).c_str());
                        return false;
                    }
                    ELOG("主题操作请求成功");
                    return true;
                }
            
            private:
                std::mutex _mutex;
                std::unordered_map<std::string, SubCallback> _topic_callbacks;  // 主题名 -> 回调函数
                Requestor::ptr _requestor;
        };
    }
}

🏳️‍🌈三、rpc_client 客户端整合

3.1 整体架构

这段代码实现了一个 ​分布式RPC客户端框架,包含四个核心类:

  • RegistryClient(服务注册)
  • DiscoveryClient(服务发现)
  • RpcClient(RPC调用)
  • TopicClient(主题发布/订阅)

它们通过 Dispatcher(消息分发器)和 Requestor(请求处理器)协作,实现服务注册、发现、调用及消息推送功能

graph TD
    A[RegistryClient] -->|注册服务| B(注册中心)
    C[DiscoveryClient] -->|发现服务| B
    D[RpcClient] -->|调用服务| E(服务提供者)
    F[TopicClient] -->|发布/订阅| G(消息中心)

3.2 核心类详解

3.2.1 RegistryClient(服务注册客户端)​

​功能:将本地服务方法注册到远程注册中心。
​成员变量

  • _requestor:管理请求生命周期(超时、重试)。
  • _provider:封装服务注册协议,构造 SERVICE_REGISTRY 请求。
  • _dispatcher:分发注册响应消息。
  • _client:连接注册中心的网络客户端。

关键接口

  • registryMethod(method, host):注册方法 method 到地址 host。
// 示例:注册服务方法
RegistryClient client("127.0.0.1", 8080);
client.registryMethod("add", Address("192.168.1.100", 9090));
// 服务注册客户端,用于将本地服务方法注册到远程注册中心
class RegistryClient {
public:
    using ptr = std::shared_ptr<RegistryClient>;

    // 构造函数传入注册中心的地址信息,用于连接注册中心
    RegistryClient(const std::string& ip, int port)
        : _requestor(std::make_shared<Requestor>()),
          _provider(std::make_shared<client::Provider>(_requestor)),
          _dispatcher(std::make_shared<Dispatcher>()) {
        // 设置响应的回调函数
        auto rsp_cb =
            std::bind(&client::Requestor::onResponse, _requestor.get(),
                      std::placeholders::_1, std::placeholders::_2);

        // 注册 rpc 响应处理函数
        _dispatcher->registerHandler<BaseMessage>(fields::MType::RSP_SERVICE,
                                                  rsp_cb);
        // std::function<void(const BaseConnection::ptr&, BaseMessage::ptr&)>
        // message_cb = std::bind(&rpc::Dispatcher::onMessage,
        // _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);
        auto message_cb =
            std::bind(&Dispatcher::onMessage, _dispatcher.get(),
                      std::placeholders::_1, std::placeholders::_2);

        // 设置客户端连接注册中心的地址
        _client = rpc::ClientFactory::create(ip, port);
        _client->setMessageCallback(message_cb);
        _client->connect();
    }

    // 向外提供的服务注册接口
    bool registryMethod(const std::string& method, const Address& host) {
        return _provider->registryMethod(_client->connection(), method, host);
    }

private:
    Requestor::ptr
        _requestor; // 管理请求-响应生命周期,处理超时、重试,维护请求与响应的映射关系
    client::Provider::ptr _provider; // 封装服务注册协议,构造 SERVICE_REGISTRY
                                     // 请求,验证响应状态码
    Dispatcher::ptr
        _dispatcher; // 根据消息类型(MType)分发消息到对应处理器(如处理注册响应)
    BaseClient::ptr
        _client; // 网络通信客户端,负责与注册中心建立连接、发送请求、接收响应
};

3.2.2DiscoveryClient(服务发现客户端)​

​功能:动态发现服务地址,处理上下线通知。
​成员变量

  • _discoverer:维护本地服务地址缓存。
  • _dispatcher:处理服务发现响应和上下线通知。

关键接口

  • serviceDiscovery(method, host):查询 method 的可用地址。
// 示例:发现服务地址
DiscoveryClient client("127.0.0.1", 8080, [](const Address& host) {
    std::cout << "服务下线: " << host.first << std::endl;
});
Address addr;
client.serviceDiscovery("add", addr); // 获取 "add" 方法的地址
// 服务注册客户端,用于将本地服务方法注册到远程注册中心
class RegistryClient {
public:
    using ptr = std::shared_ptr<RegistryClient>;

    // 构造函数传入注册中心的地址信息,用于连接注册中心
    RegistryClient(const std::string& ip, int port)
        : _requestor(std::make_shared<Requestor>()),
          _provider(std::make_shared<client::Provider>(_requestor)),
          _dispatcher(std::make_shared<Dispatcher>()) {
        // 设置响应的回调函数
        auto rsp_cb =
            std::bind(&client::Requestor::onResponse, _requestor.get(),
                      std::placeholders::_1, std::placeholders::_2);

        // 注册 rpc 响应处理函数
        _dispatcher->registerHandler<BaseMessage>(fields::MType::RSP_SERVICE,
                                                  rsp_cb);
        // std::function<void(const BaseConnection::ptr&, BaseMessage::ptr&)>
        // message_cb = std::bind(&rpc::Dispatcher::onMessage,
        // _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);
        auto message_cb =
            std::bind(&Dispatcher::onMessage, _dispatcher.get(),
                      std::placeholders::_1, std::placeholders::_2);

        // 设置客户端连接注册中心的地址
        _client = rpc::ClientFactory::create(ip, port);
        _client->setMessageCallback(message_cb);
        _client->connect();
    }

    // 向外提供的服务注册接口
    bool registryMethod(const std::string& method, const Address& host) {
        return _provider->registryMethod(
            _client->class DiscoveryClient {
            public:
                using ptr = std::shared_ptr<DiscoveryClient>;

                // 构造函数传入注册中心的地址信息,用于连接注册中心
                // DiscoveryClient 是
                // ​服务发现客户端,用于从注册中心动态发现服务提供者的地址,并处理服务上下线通知
                // 连接注册中心:建立与注册中心的网络连接。
                // ​服务发现请求:主动查询指定服务方法的可用地址。
                // ​动态通知处理:监听服务上下线通知,更新本地缓存。
                // ​离线回调机制:当服务下线时,触发用户自定义逻辑。
                DiscoveryClient(const std::string& ip, int port,
                                const Discoverer::OfflineCallback& cb)
                    : _requestor(std::make_shared<Requestor>()),
                      _discoverer(
                          std::make_shared<client::Discoverer>(_requestor, cb)),
                      _dispatcher(std::make_shared<Dispatcher>()) {
                    // 1. 注册响应处理回调(处理服务发现响应)
                    auto rsp_cb =
                        std::bind(&Requestor::onResponse, _requestor.get(),
                                  std::placeholders::_1, std::placeholders::_2);
                    _dispatcher->registerHandler<BaseMessage>(
                        MType::RSP_SERVICE, rsp_cb);

                    // 2. 注册服务请求处理回调(处理服务上下线通知)
                    auto req_cb =
                        std::bind(&client::Discoverer::onServiceRequest,
                                  _discoverer.get(), std::placeholders::_1,
                                  std::placeholders::_2);
                    _dispatcher->registerHandler<ServiceRequest>(
                        MType::REQ_SERVICE, req_cb);

                    // 3. 设置消息总回调入口
                    auto message_cb =
                        std::bind(&Dispatcher::onMessage, _dispatcher.get(),
                                  std::placeholders::_1, std::placeholders::_2);
                    _client = rpc::ClientFactory::create(ip, port);
                    _client->setMessageCallback(message_cb);

                    // 4. 连接注册中心
                    _client->connect();
                }

                // 向外提供的服务发现接口
                bool serviceDiscovery(const std::string& method,
                                      Address& host) {
                    return _discoverer->serviceDiscovery(_client->connection(),
                                                         method, host);
                }

            private:
                Requestor::ptr
                    _requestor; // 管理请求-响应生命周期,处理超时、重试,维护请求与响应的映射关系。
                client::Discoverer::ptr
                    _discoverer; // 封装服务发现逻辑,维护本地服务地址缓存,处理上下线通知。
                Dispatcher::ptr
                    _dispatcher; // 根据消息类型(MType)分发消息到对应处理器(如服务上下线通知)
                BaseClient::ptr
                    _client; // 网络通信客户端,负责与注册中心建立连接、发送请求、接收响应
            };
            connection(), method, host);
    }

private:
    Requestor::ptr
        _requestor; // 管理请求-响应生命周期,处理超时、重试,维护请求与响应的映射关系
    client::Provider::ptr _provider; // 封装服务注册协议,构造 SERVICE_REGISTRY
                                     // 请求,验证响应状态码
    Dispatcher::ptr
        _dispatcher; // 根据消息类型(MType)分发消息到对应处理器(如处理注册响应)
    BaseClient::ptr
        _client; // 网络通信客户端,负责与注册中心建立连接、发送请求、接收响应
};

3.2.3RpcClient(RPC调用客户端)​

​功能:根据配置(直连或服务发现)调用远程方法。
​成员变量

  • _enableDiscovery:模式开关(直连/服务发现)。
  • _caller:执行 RPC 调用,封装请求和响应解析。
  • _rpc_clients:连接池(服务发现模式下动态管理)。

​关键接口

  • call(method, params, result):调用远程方法 method。
// 示例:调用远程方法
RpcClient client(true, "127.0.0.1", 8080); // 启用服务发现
Json::Value params, result;
params.append(1); params.append(1);
client.call("add", params, result); // 调用 "add(1,1)"
class RpcClient {
public:
    using ptr = std::shared_ptr<RpcClient>;

    // enableDiscovery --
    // 是否启用服务发现功能,也决定了传入的地址信息是注册中心的地址,还是服务提供者的地址
    RpcClient(bool enableDiscovery, const std::string& ip, int port)
        : _enableDiscovery(enableDiscovery),
          _requestor(std::make_shared<Requestor>()),
          _dispatcher(std::make_shared<Dispatcher>()),
          _caller(std::make_shared<client::RpcCaller>(_requestor)) {
        // 1. 注册响应处理回调(处理服务发现响应)
        auto rsp_cb = std::bind(&Requestor::onResponse, _requestor.get(),
                                std::placeholders::_1, std::placeholders::_2);
        _dispatcher->registerHandler<BaseMessage>(MType::RSP_RPC, rsp_cb);

        // 如果启用了服务发现,地址信息是注册中心的地址,是服务发现客户端需要连接的地址,则通过地址信息实例化discovery_client
        // 如果没有启用服务发现,则地址信息是服务提供者的地址,则直接实例化好
        // rpc_client
        if (_enableDiscovery) {
            auto offline_cb =
                std::bind(&RpcClient::delclient, this, std::placeholders::_1);
            _discovery_client =
                std::make_shared<DiscoveryClient>(ip, port, offline_cb);
        } else {
            auto message_cb =
                std::bind(&Dispatcher::onMessage, _dispatcher.get(),
                          std::placeholders::_1, std::placeholders::_2);
            _rpc_client = ClientFactory::create(ip, port);
            _rpc_client->setMessageCallback(message_cb);
            _rpc_client->connect();
        }
    }

    bool call(const std::string& method, const Json::Value& params,
              Json::Value& result) {
        // 获取服务提供者: 1. 服务发现    2. 固定服务提供者
        BaseClient::ptr client = getClient(method);
        if (client.get() == nullptr) {
            ELOG("没有找到相应的客户端信息");
            return false;
        }
        return _caller->call(client->connection(), method, params, result);
    }

    bool call(const std::string& method, const Json::Value& params,
              RpcCaller::JsonAsyncResponse& result) {
        BaseClient::ptr client = getClient(method);
        if (client.get() == nullptr) {
            return false;
        }
        // 3. 通过客户端连接,发送rpc请求
        return _caller->call(client->connection(), method, params, result);
    }

    bool call(const std::string& method, const Json::Value& params,
              const RpcCaller::JsonResponseCallback& cb) {
        BaseClient::ptr client = getClient(method);
        if (client.get() == nullptr) {
            return false;
        }
        // 3. 通过客户端连接,发送rpc请求
        return _caller->call(client->connection(), method, params, cb);
    }

private:
    BaseClient::ptr newClient(const Address& host) {
        auto message_cb =
            std::bind(&Dispatcher::onMessage, _dispatcher.get(),
                      std::placeholders::_1, std::placeholders::_2);
        auto client = ClientFactory::create(host.first, host.second);
        client->setMessageCallback(message_cb);
        client->connect();
        putClient(host, client);
        return client;
    }

    // 获取客户端信息 - 通过地址信息
    BaseClient::ptr getClient(const Address& host) {
        std::unique_lock<std::mutex> lock(_mutex);
        auto it = _rpc_clients.find(host);
        if (it == _rpc_clients.end()) {
            ELOG("没有找到相应的客户端信息");
            return BaseClient::ptr();
        }
        return it->second;
    }

    // 获取客户端信息 - 通过服务方法
    BaseClient::ptr getClient(const std::string& method) {
        BaseClient::ptr client;
        if (_enableDiscovery) {
            // 1. 通过服务器发现,获取服务提供者地址信息
            Address host;
            bool ret = _discovery_client->serviceDiscovery(method, host);
            if (ret == false) {
                ELOG("当前 %s 服务,没有找到服务提供者!", method.c_str());
                return BaseClient::ptr();
            }
            // 2. 通过地址信息,获取客户端信息
            client = getClient(host);
            if (client.get() == nullptr) {
                ELOG("没有找到一实例化的客户端信息,则创建");
                client = newClient(host);
            }
        } else {
            client = _rpc_client;
        }
        return client;
    }

    // 添加客户端信息
    void putClient(const Address& host, BaseClient::ptr& client) {
        std::unique_lock<std::mutex> lock(_mutex);
        _rpc_clients.insert(std::make_pair(host, client));
    }

    // 删除客户端信息
    void delclient(const Address& host) {
        std::unique_lock<std::mutex> lock(_mutex);
        _rpc_clients.erase(host);
    }

private:
    struct AddressHash {
        size_t operator()(const Address& host) const {
            std::string addr = host.first + std::to_string(host.second);
            return std::hash<std::string>{}(addr);
        }
    };

private:
    bool _enableDiscovery;     // 模式开关:true 启用服务发现,false
                               // 直连固定服务提供者
    Requestor::ptr _requestor; // 管理请求-响应生命周期,处理超时和重试。
    Dispatcher::ptr _dispatcher; // 分发消息到对应处理器(如 RPC 响应)
    RpcCaller::ptr _caller;      // 执行 RPC 调用,封装请求发送和响应解析逻辑

    DiscoveryClient::ptr
        _discovery_client;       // 服务发现客户端(仅在服务发现模式下有效)
    BaseClient::ptr _rpc_client; // 直连模式下的固定服务提供者连接。

    std::mutex _mutex;
    std::unordered_map<Address, BaseClient::ptr, AddressHash>
        _rpc_clients; // 用于服务发现的客户端连接池
};

3.2.4TopicClient(主题客户端)​

​功能:管理主题的创建、订阅、发布。
​成员变量

  • _topic_manager:处理主题操作(如订阅回调)。
  • _dispatcher:分发主题相关消息。

关键接口

  • subscribeTopic(key, callback):订阅主题 key 并绑定回调。
  • publishTopic(key, msg):向主题 key 发布消息 msg。
// 示例:发布/订阅主题
TopicClient client("127.0.0.1", 9090);
client.subscribeTopic("news", [](const std::string& key, const std::string& msg) {
    std::cout << "收到消息: " << msg << std::endl;
});
client.publishTopic("news", "RPC框架发布!");
class TopicClient {
public:
    using ptr = std::shared_ptr<TopicClient>;
    TopicClient(const std::string& ip, int port)
        : _requestor(std::make_shared<Requestor>()),
          _dispatcher(std::make_shared<Dispatcher>()),
          _topic_manager(std::make_shared<TopicManager>(_requestor)) {
        auto rsp_cb = std::bind(&Requestor::onResponse, _requestor.get(),
                                std::placeholders::_1, std::placeholders::_2);
        _dispatcher->registerHandler<BaseMessage>(MType::RSP_TOPIC, rsp_cb);

        auto msg_cb =
            std::bind(&TopicManager::onPublishTopic, _topic_manager.get(),
                      std::placeholders::_1, std::placeholders::_2);
        _dispatcher->registerHandler<TopicRequest>(MType::REQ_TOPIC, msg_cb);

        auto message_cb =
            std::bind(&Dispatcher::onMessage, _dispatcher.get(),
                      std::placeholders::_1, std::placeholders::_2);
        _rpc_client = ClientFactory::create(ip, port);
        _rpc_client->setMessageCallback(message_cb);
        _rpc_client->connect();
    }

    bool createTopic(const std::string& key) {
        return _topic_manager->createTopic(_rpc_client->connection(), key);
    }

    bool removeTopic(const std::string& key) {
        return _topic_manager->removeTopic(_rpc_client->connection(), key);
    }

    bool subscribeTopic(const std::string& key,
                        const TopicManager::SubCallback& cb) {
        return _topic_manager->subscribeTopic(_rpc_client->connection(), key,
                                              cb);
    }

    bool cancelTopic(const std::string& key) {
        return _topic_manager->cancelTopic(_rpc_client->connection(), key);
    }

    bool publishTopic(const std::string& key, const std::string& msg) {
        return _topic_manager->publishTopic(_rpc_client->connection(), key,
                                            msg);
    }

    void shutdown() { _rpc_client->shutdown(); }

private:
    Requestor::ptr _requestor;
    TopicManager::ptr _topic_manager;
    Dispatcher::ptr _dispatcher;
    BaseClient::ptr _rpc_client;
};

3.3 协作逻辑与设计思路

3.3.1 消息分发(Dispatcher)​

​作用:根据消息类型(MType)路由到对应处理器。
​注册逻辑

// 注册处理函数(以 RpcClient 为例)
_dispatcher->registerHandler<BaseMessage>(MType::RSP_RPC, [](auto conn, auto msg) {
    // 处理 RPC 响应
});

3.3.2 请求处理(Requestor)​

​作用:统一管理请求发送、超时重试、响应映射。
​流程

  • 发送请求时生成唯一ID,记录到等待队列。
  • 接收响应时根据ID匹配请求,触发回调。

3.3.3 服务发现与连接池(RpcClient)​

​动态连接管理

  • 启用服务发现时,通过 DiscoveryClient 获取地址,并维护连接池 _rpc_clients
  • 直连模式时固定连接 _rpc_client

​线程安全:使用 std::mutex 保护连接池操作。

3.3.4 主题管理(TopicManager)​

​回调机制:订阅时绑定回调函数到 _topic_callbacks
​消息推送:服务端广播消息后,客户端通过 onPublishTopic 触发本地回调。

🏳️‍🌈四、主题发布订阅流程示意

截至目前,我们已经封装好了 rpc 项目中所有的内容,现在我们来模拟一下 客户端A发布一个主题消息客户端B接收到这个主题消息 的全部流程

4.1 核心文件与类说明

在这里插入图片描述

4.2 步骤 1:客户端A创建主题

// 1. 创建 TopicManager
auto requestor = std::make_shared<Requestor>();
auto topic_mgr = std::make_shared<client::TopicManager>(requestor);

// 2. 创建主题 "news"
BaseConnection::ptr conn = connect_to_server("127.0.0.1:8080");
topic_mgr->createTopic(conn, "news");

逻辑

  1. TopicManager::createTopic 发送 TopicOptype::TOPIC_CREATE 请求到服务端。
  2. 服务端 server::TopicManager::onTopicRequest 处理请求,创建全局主题 "news"

4.3 步骤 2:客户端B订阅主题

// 1. 订阅主题 "news",定义回调函数
auto requestor = std::make_shared<Requestor>();
auto topic_mgr = std::make_shared<client::TopicManager>(requestor);

BaseConnection::ptr conn = connect_to_server("127.0.0.1:8080");
topic_mgr->subscribeTopic(conn, "news", [](const std::string& key, const std::string& msg) {
    std::cout << "收到主题消息: " << msg << std::endl;
});

逻辑

  1. TopicManager::subscribeTopic 发送 TopicOptype::TOPIC_SUBSCRIBE 请求到服务端。
  2. 服务端记录客户端B为 "news" 的订阅者(存储到 server::Topic::_subscribers)。

4.4 步骤 3:客户端A发布消息

// 客户端A继续发布消息
topic_mgr->publishTopic(conn, "news", "Breaking: RPC框架发布!");

逻辑

  1. TopicManager::publishTopic 发送 TopicOptype::TOPIC_PUBLISH 请求,携带消息内容。
  2. 服务端 server::TopicManager::onTopicRequest 处理请求,广播消息给所有订阅者。

4.5 步骤 4:服务端广播消息

// 服务端代码(server/rpc_topic.hpp)
void server::TopicManager::topicPublish(...) {
    // 1. 查找主题 "news"
    auto topic = _topics["news"];
    // 2. 遍历所有订阅者(客户端B)
    for (auto& subscriber : topic->_subscribers) {
        // 3. 通过订阅者连接发送消息
        subscriber->_conn->send(msg);
    }
}

逻辑

  • 服务端通过 Topic::pushMessage 将消息推送到所有订阅者的连接(客户端B)。

4.6 步骤 5:客户端B接收消息

// 客户端B的 TopicManager 处理消息
void client::TopicManager::onPublishTopic(...) {
    // 1. 提取消息内容
    std::string topic_key = msg->topicKey();  // "news"
    std::string topic_msg = msg->topicMsg();  // "Breaking: RPC框架发布!"
    // 2. 触发订阅时定义的回调函数
    callback(topic_key, topic_msg);
}

输出

收到主题消息: Breaking: RPC框架发布!

4.7 关键类交互图

sequenceDiagram
    participant ClientA as 客户端A
    participant Server as 服务端
    participant ClientB as 客户端B

    ClientA->>Server: createTopic("news")
    Server-->>ClientA: 创建成功
    ClientB->>Server: subscribeTopic("news")
    Server-->>ClientB: 订阅成功
    ClientA->>Server: publishTopic("news", "Breaking: RPC框架发布!")
    Server->>ClientB: 推送消息
    ClientB->>ClientB: 触发回调函数

4.8 补充说明

  • ​线程安全
    客户端和服务端的 TopicManager 使用 std::mutex 保护共享数据(如订阅列表)。
  • ​错误处理
    若主题不存在,服务端返回 RCode::RCODE_NOT_FOUND_TOPIC,客户端记录错误日志。
  • ​网络通信
    所有请求通过 Requestor::send 发送,底层使用 BaseConnection 的 TCP 连接。

🏳️‍🌈五、运程调用add方法流程示意

为了方便各位更好地理解 rpc 框架运程调用的功能,这里模拟一下 客户端调用服务端注册好的add方法,来得到1 + 1的结果的整个流程

5.1 核心文件与类说明

在这里插入图片描述

5.2 步骤 1:服务端启动并注册 add 方法

// 文件: test_server.cpp
#include "rpc_server.hpp"
#include "rpc_registry.hpp"
#include "message.hpp"

// 1. 定义服务端加法方法
int add(int a, int b) { return a + b; }

int main() {
    // 2. 创建 RpcServer 并绑定端口
    RpcServer server("0.0.0.0:8080");
    
    // 3. 注册方法到 RpcRegistry
    server.getRegistry()->registerMethod("add", &add);
    
    // 4. 启动服务端监听
    server.start();
}

​逻辑

  • 服务端通过 RpcRegistry::registerMethodadd 方法注册到全局注册表(rpc_registry.hpp)。
  • 注册时生成映射:“add” → 函数指针 &add。

5.3 步骤 2:客户端构造调用请求

// 文件: test_client.cpp
#include "rpc_caller.hpp"
#include "requestor.hpp"

int main() {
    // 1. 创建 RpcCaller 工具
    RpcCaller caller;
    
    // 2. 生成调用 add(1,1) 的请求
    auto req = caller.prepareCall("add", 1, 1);
    
    // 3. 创建 Requestor 发送请求
    Requestor requestor;
    BaseConnection::ptr conn = connect_to_server("127.0.0.1:8080");
    auto rsp = requestor.send(conn, req);
    
    // 4. 解析响应结果
    if (rsp->rcode() == RCode::RCODE_OK) {
        int result = rsp->getResult<int>();
        std::cout << "1+1=" << result << std::endl; // 输出 1+1=2
    }
}

逻辑

  • RpcCaller::prepareCall(rpc_caller.hpp)生成 MethodRequest 消息,包含方法名 “add” 和参数 [1,1]。
  • Requestor::send(requestor.hpp)将请求序列化并通过 BaseConnection 发送到服务端。

5.4 步骤3:服务端接收请求并路由到 add 方法

// 文件: dispatcher.hpp
// Dispatcher 注册处理函数(服务端初始化时执行)
Dispatcher dispatcher;
dispatcher.registerHandler<MethodRequest>(
    MType::REQ_METHOD, 
    [registry](const BaseConnection::ptr& conn, std::shared_ptr<MethodRequest>& req) {
        // 1. 从 RpcRegistry 查找方法 "add"
        auto method = registry->getMethod(req->method());
        
        // 2. 执行方法并获取结果
        int result = method->invoke<int>(req->params());
        
        // 3. 构造响应消息
        auto rsp = MessageFactory::create<MethodResponse>();
        rsp->setResult(result);
        rsp->setRcode(RCode::RCODE_OK);
        
        // 4. 发送响应
        conn->send(rsp);
    }
);

逻辑

  • Dispatcher(dispatcher.hpp)根据消息类型 MType::REQ_METHOD 路由到注册的 Lambda 处理函数。
  • RpcRegistry 中查找方法 “add”,调用并返回结果。

5.5 步骤4:网络层数据传输

// 文件: net.hpp
// 服务端接收请求(BaseConnection::onMessage)
void BaseConnection::onMessage(const Buffer& buf) {
    // 1. 反序列化为 MethodRequest
    auto msg = deserialize<MethodRequest>(buf);
    
    // 2. 通过 Dispatcher 分发消息
    Dispatcher::instance().onMessage(shared_from_this(), msg);
}

​逻辑

  • 服务端 BaseConnection 接收字节流,反序列化为 MethodRequest 对象。
  • 调用 Dispatcher::onMessage 触发多路转接。

5.6 关键类交互图

sequenceDiagram
    participant Client as 客户端
    participant Requestor as Requestor (requestor.hpp)
    participant Dispatcher as Dispatcher (dispatcher.hpp)
    participant Registry as RpcRegistry (rpc_registry.hpp)
    participant Server as RpcServer (rpc_server.hpp)

    Client->>Requestor: 调用 add(1,1)
    Requestor->>Client: 生成 MethodRequest
    Client->>Server: 发送 MethodRequest (TCP)
    Server->>Dispatcher: 接收消息并路由
    Dispatcher->>Registry: 查找方法 "add"
    Registry->>Dispatcher: 返回函数指针
    Dispatcher->>Server: 调用 add(1,1)
    Server->>Client: 返回 MethodResponse (TCP)
    Client->>Client: 解析结果为 2

5.7 补充说明

  1. 客户端通过 RpcCaller 生成 MethodRequest,通过 Requestor 发送请求。
  2. 服务端通过 Dispatcher 多路转接,将请求路由到 RpcRegistry 中注册的 add 方法。
  3. 网络层通过 BaseConnection 实现 TCP 通信,Dispatcher 确保消息正确路由。
  4. 结果返回通过 MethodResponse 序列化回客户端,完成一次完整的 RPC 调用。

👥总结

本篇博文对 【从零实现Json-Rpc框架】- 项目实现 - 客户端注册主题及整合 做了一个较为详细的介绍,不知道对你有没有帮助呢

觉得博主写得还不错的三连支持下吧!会继续努力的~

请添加图片描述


网站公告

今日签到

点亮在社区的每一天
去签到