📢博客主页:https://blog.csdn.net/2301_779549673
📢博客仓库:https://gitee.com/JohnKingW/linux_test/tree/master/lesson
📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
📢本文由 JohnKi 原创,首发于 CSDN🙉
📢未来很长,值得我们全力奔赴更美好的生活✨
文章目录
📢前言
到目前为止,我们已经将整个 rpc项目 的基本完成了,就只剩下客户端的部分功能了
这篇文章将带大家实现一些客户端的最后三个模块
- 客户端注册模块
- 客户端主题模块
- 客户端整合
本系列博客的代码都在上述库中的下图(demo代表笔者模仿的案例)
🏳️🌈一、rpc_registry 注册模块
1.1 rpc_registry 框架逻辑分析
该代码实现了一个 RPC
客户端核心模块,包含 服务注册、服务发现与动态维护、负载均衡 三大核心功能,通过以下三个类协作完成
graph TD
A[Provider] -->|注册服务| B(注册中心)
C[Discoverer] -->|发现服务| B
C -->|维护本地缓存| D[MethodHost]
1.1.1 Provider类
- 这个类的 核心功能 是 向服务端注册服务方法,并处理注册响应。
- 构造函数接受一个
Requestor
的智能指针,用于发送请求。 registryMethod
方法负责 发送服务注册请求,构造ServiceRequest消息,并通过Requestor发送,然后处理响应。- 如果注册失败,会记录错误日志。
// 核心功能:向服务端注册服务方法,并处理注册响应。
class Provider {
public:
using ptr = std::shared_ptr<Provider>;
Provider(const Requestor::ptr& requestor);
// 注册服务方法
bool registryMethod(const BaseConnection::ptr& conn,
const std::string& method, const Address& host);
private:
Requestor::ptr _requestor;
};
1.1.2 MethodHost类
- 管理某个服务方法的主机地址列表,支持动态增删和轮询选择。
- 成员变量包括互斥锁、轮询索引和主机地址列表。
- 提供添加主机、移除主机、选择主机的方法,这些操作都是线程安全的。
chooseHost
方法使用简单的轮询算法实现负载均衡。
// 核心功能:管理某个服务方法的主机地址列表,支持动态增删和轮询选择。
// 设计目标:提供线程安全的主机列表管理,支持动态更新。
class MethodHost {
public:
using ptr = std::shared_ptr<MethodHost>;
MethodHost();
MethodHost(const std::vector<Address>& hosts);
// 添加主机地址
void appendHost(const Address& host);
// 移除主机地址
void removerHost(const Address& host);
// 轮询选择主机(简单负载均衡)
Address chooseHost();
bool empty();
private:
std::mutex _mutex;
size_t _idx; // 轮询选择下标
std::vector<Address> _hosts; // 主机地址列表
};
1.1.3 Discoverer类
- 发现服务提供者,处理上下线通知,维护本地服务缓存。
- 包含一个离线回调函数,当服务下线时触发。
serviceDiscovery
方法处理服务发现请求,先检查本地缓存,若未命中则向服务端请求,并更新缓存。onServiceRequest
处理服务端推送的上下线通知,更新本地的主机列表。- 使用
MethodHost
类来管理每个方法的主机列表,确保线程安全。
// 核心功能:发现服务提供者,处理上下线通知,维护本地服务缓存。
class Discoverer {
public:
using ptr = std::shared_ptr<Discoverer>;
using OfflineCallback = std::function<void(const Address&)>;
Discoverer(const Requestor::ptr& requestor, const OfflineCallback& cb);
// 主动发起服务发现请求。
// 1. 本地缓存命中:直接返回已缓存的主机地址。
// 2. 缓存未命中:发送 SERVICE_DISCOVERY
// 请求,获取服务地址并更新缓存。
bool serviceDiscovery(const BaseConnection::ptr& conn,
const std::string& method, Address& host);
// 提供给 Dispatcher 模块进行服务上线下线请求处理的回调函数
// 处理服务端推送的上下线通知。
// 设计目标:动态更新本地服务列表,确保客户端始终获取最新服务地址
void onServiceRequest(const BaseConnection::ptr& conn,
const ServiceRequest::ptr& msg);
private:
OfflineCallback _offline_callback;
std::mutex _mutex;
std::unordered_map<std::string, MethodHost::ptr> _method_hosts;
Requestor::ptr _requestor;
};
1.2 Provider类
功能:向注册中心注册服务方法。
关键成员:
_requestor
:发送请求和接收响应的工具类。
核心方法:
- registryMethod(conn, method, host)
发送SERVICE_REGISTRY
请求,将服务方法method
和主机host
注册到注册中心。
class Provider {
public:
using ptr = std::shared_ptr<Provider>;
Provider(const Requestor::ptr& requestor) : _requestor(requestor) {}
// 注册服务方法
bool registryMethod(const BaseConnection::ptr& conn,
const std::string& method, const Address& host) {
// 1. 创建 SERVICE_REGISTRY 类型的请求消息。
// 2. 通过 Requestor 发送请求并等待响应。
// 3. 验证响应类型和状态码,返回注册结果。
auto msg_req = MessageFactory::create<ServiceRequest>();
msg_req->setId(UUID::uuid());
msg_req->setMType(MType::REQ_SERVICE);
msg_req->setMethod(method);
msg_req->setHost(host);
msg_req->setServiceOptype(ServiceOptype::SERVICE_REGISTRY);
BaseMessage::ptr msg_rsp;
bool ret = _requestor->send(conn, msg_req, msg_rsp);
if (ret == false) {
ELOG("%s 服务注册失败!", method.c_str());
return false;
}
auto service_rsp = std::dynamic_pointer_cast<ServiceResponse>(msg_rsp);
if (service_rsp.get() == nullptr) {
ELOG("响应类型向下转换失败!");
return false;
}
if (service_rsp->rcode() != RCode::RCODE_OK) {
ELOG("服务注册失败,原因: %s",
errReason(service_rsp->rcode()).c_str());
return false;
}
return true;
}
private:
Requestor::ptr _requestor;
};
1.3 MethodHost 类
功能:管理某个服务方法的主机列表,支持动态增删和轮询选择。
关键成员:
_hosts
:服务提供者地址列表。_idx
:轮询索引,实现简单负载均衡。
核心方法:
- appendHost(host):线程安全地添加主机地址。
- removeHost(host):线程安全地移除主机地址。
- chooseHost():轮询选择下一个主机地址。
// 核心功能:管理某个服务方法的主机地址列表,支持动态增删和轮询选择。
// 设计目标:提供线程安全的主机列表管理,支持动态更新。
class MethodHost {
public:
using ptr = std::shared_ptr<MethodHost>;
MethodHost() : _idx(0) {}
MethodHost(const std::vector<Address>& hosts)
: _hosts(hosts.begin(), hosts.end()), _idx(0) {}
// 添加主机地址
void appendHost(const Address& host) {
// 中途收到了服务上线请求后被调用
std::unique_lock<std::mutex> lock(_mutex);
_hosts.emplace_back(host);
}
// 移除主机地址
void removerHost(const Address& host) {
// 中途收到了服务下线请求后被调用
std::unique_lock<std::mutex> lock(_mutex);
for (auto it = _hosts.begin(); it != _hosts.end(); ++it) {
if (*it == host) {
_hosts.erase(it);
break;
}
}
}
// 轮询选择主机(简单负载均衡)
Address chooseHost() {
std::unique_lock<std::mutex> lock(_mutex);
size_t pos = _idx++ % _hosts.size();
return _hosts[pos];
}
bool empty() { return _hosts.empty(); }
private:
std::mutex _mutex;
size_t _idx; // 轮询选择下标
std::vector<Address> _hosts; // 主机地址列表
};
1.4 Discoverer 类
功能:动态发现服务地址,处理上下线通知,维护本地缓存。
关键成员:
_method_hosts
:服务名 → 主机列表的映射(使用MethodHost
管理)。_offline_callback
:服务下线时的回调函数。
核心方法:
- serviceDiscovery(conn, method, host)
若本地缓存有效,直接返回地址;否则向注册中心发送SERVICE_DISCOVERY
请求。 - onServiceRequest(conn, msg)
处理服务端推送的上下线通知,更新本地缓存。
// 核心功能:发现服务提供者,处理上下线通知,维护本地服务缓存。
class Discoverer {
public:
using ptr = std::shared_ptr<Discoverer>;
using OfflineCallback = std::function<void(const Address&)>;
Discoverer(const Requestor::ptr& requestor, const OfflineCallback& cb)
: _requestor(requestor), _offline_callback(cb) {}
// 主动发起服务发现请求。
// 1. 本地缓存命中:直接返回已缓存的主机地址。
// 2. 缓存未命中:发送 SERVICE_DISCOVERY
// 请求,获取服务地址并更新缓存。
bool serviceDiscovery(const BaseConnection::ptr& conn,
const std::string& method, Address& host) {
{
// 当前所报关的提供者信息存在,则直接返回地址
std::unique_lock<std::mutex> lock(_mutex);
auto it = _method_hosts.find(method);
if (it != _method_hosts.end()) {
if (it->second->empty() == false) {
host = it->second->chooseHost();
return true;
}
}
}
// 缓存未命中,发起服务发现请求
// 当前服务的提供者为空
auto msg_req = MessageFactory::create<ServiceRequest>();
msg_req->setId(UUID::uuid());
msg_req->setMType(MType::REQ_SERVICE);
msg_req->setMethod(method);
msg_req->setServiceOptype(ServiceOptype::SERVICE_DISCOVERY);
BaseMessage::ptr msg_rsp;
bool ret = _requestor->send(conn, msg_req, msg_rsp);
if (ret == false) {
ELOG("服务发现失败!");
return false;
}
auto service_rsp = std::dynamic_pointer_cast<ServiceResponse>(msg_rsp);
if (service_rsp.get() == nullptr) {
ELOG("服务发现时响应类型向下转换失败!");
return false;
}
if (service_rsp->rcode() != RCode::RCODE_OK) {
ELOG("服务发现失败!%s", errReason(service_rsp->rcode()).c_str());
return false;
}
// 走到这里,代表当前服务没有对应主机提过服务的
std::unique_lock<std::mutex> lock(_mutex);
auto method_host = std::make_shared<MethodHost>(service_rsp->Hosts());
if (method_host->empty()) {
ELOG("%s 服务发现时,服务提供者列表为空!", method.c_str());
return false;
}
DLOG("服务发现成功");
host = method_host->chooseHost();
_method_hosts[method] = method_host;
return true;
}
// 提供给 Dispatcher 模块进行服务上线下线请求处理的回调函数
// 处理服务端推送的上下线通知。
// 设计目标:动态更新本地服务列表,确保客户端始终获取最新服务地址
void onServiceRequest(const BaseConnection::ptr& conn,
const ServiceRequest::ptr& msg) {
// 1. 判断是上线还是下线请求,如果都不是那就不用处理了
ServiceOptype optype = msg->Serviceoptype();
std::string method = msg->method();
std::unique_lock<std::mutex> lock(_mutex);
// 2. 上线请求:找到MethodHost,向其中新增一个主机地址
if (optype == ServiceOptype::SERVICE_ONLINE) {
auto it = _method_hosts.find(method);
if (it == _method_hosts.end()) {
MethodHost::ptr method_host = std::shared_ptr<MethodHost>();
method_host->appendHost(msg->host());
_method_hosts[method] = method_host;
} else {
it->second->appendHost(msg->host());
}
}
// 3. 下线请求:找到MethodHost,从其中删除一个主机地址
else if (optype == ServiceOptype::SERVICE_OFFLINE) {
auto it = _method_hosts.find(method);
if (it == _method_hosts.end())
return;
it->second->removerHost(msg->host());
_offline_callback(msg->host());
}
}
private:
OfflineCallback _offline_callback;
std::mutex _mutex;
std::unordered_map<std::string, MethodHost::ptr> _method_hosts;
Requestor::ptr _requestor;
};
🏳️🌈二、rpc_topic 主题模块
先说说 客户端 和 服务端 的主题模块的区别
- 客户端:负责发起主题操作请求,处理服务端推送的消息,不维护全局状态。
- 服务端:维护全局主题和订阅者关系,处理客户端请求并广播消息,不涉及业务逻辑。
- 两者协同工作,客户端通过请求-响应模型与服务端交互,服务端确保消息的正确路由和广播。
2.1 逻辑框架
#pragma once
#include "requestor.hpp"
#include <unordered_set>
namespace rpc
{
namespace client
{
class TopicManager
{
public:
using SubCallback = std::function<void(const std::string &key, const std::string &msg)>;
using ptr = std::shared_ptr<TopicManager>;
TopicManager(const Requestor::ptr &requestor) : _requestor(requestor) {}
// 创建主题
bool createTopic(const BaseConnection::ptr &_conn, const std::string &key);
// 取消主题
bool removeTopic(const BaseConnection::ptr &_conn, const std::string &key);
// 订阅主题
bool subscribeTopic(const BaseConnection::ptr &conn, const std::string &key, const SubCallback &cb);
// 取消订阅主题的订阅者
bool cancelTopic(const BaseConnection::ptr &conn, const std::string &key);
// 发布消息
bool publishTopic(const BaseConnection::ptr &conn, const std::string &key, const std::string &msg);
// 处理服务器返回的主题消息
void onPublishTopic(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg);
private:
void addSubscriber(const std::string &key, const SubCallback &cb);
void delSubscriber(const std::string &key);
const SubCallback getSubcallback(const std::string &key);
bool commonRequest(const BaseConnection::ptr &conn, const std::string &key, TopicOptype type, const std::string &msg = " ");
private:
std::mutex _mutex;
std::unordered_map<std::string, SubCallback> _topic_callbacks; // 主题名 -> 回调函数
Requestor::ptr _requestor;
};
}
}
2.2 核心功能
client::TopicManager
是 RPC 客户端用于管理 主题(Topic)生命周期和消息订阅/发布 的核心类,主要功能包括:
- 主题管理:创建、删除主题。
- 订阅管理:订阅/取消订阅主题,绑定消息回调。
- 消息发布:向指定主题发送消息。
- 消息处理:接收服务端推送的主题消息,触发本地回调。
2.3 接口方法详解
2.3.1 主题管理
// 创建主题
bool createTopic(const BaseConnection::ptr& _conn, const std::string& key) {
return commonRequest(_conn, key, TopicOptype::TOPIC_CREATE);
}
// 删除主题
bool removeTopic(const BaseConnection::ptr& _conn, const std::string& key) {
return commonRequest(_conn, key, TopicOptype::TOPIC_REMOVE);
}
// 创建主题 "news"
topic_mgr->createTopic(conn, "news");
// 删除主题 "news"
topic_mgr->removeTopic(conn, "news");
2.3.2 订阅管理
// 订阅主题
bool subscribeTopic(const BaseConnection::ptr& conn, const std::string& key,
const SubCallback& cb) {
addSubscriber(key, cb);
return commonRequest(conn, key, TopicOptype::TOPIC_SUBSCRIBE);
}
// 取消订阅主题的订阅者
bool cancelTopic(const BaseConnection::ptr& conn, const std::string& key) {
delSubscriber(key);
return commonRequest(conn, key, TopicOptype::TOPIC_CANCEL);
}
// 订阅主题 "news",定义回调函数
topic_mgr->subscribeTopic(conn, "news", [](const std::string& key, const std::string& msg) {
std::cout << "收到消息: " << msg << std::endl;
});
// 取消订阅 "news"
topic_mgr->cancelTopic(conn, "news");
2.3.3 消息发布
// 发布消息
bool publishTopic(const BaseConnection::ptr& conn, const std::string& key,
const std::string& msg) {
return commonRequest(conn, key, TopicOptype::TOPIC_PUBLISH, msg);
}
// 发布消息到主题 "news"
topic_mgr->publishTopic(conn, "news", "Breaking: RPC框架发布!");
2.3.4 消息处理
内部流程:
- 验证操作类型:确保消息类型为
TOPIC_PUBLISH
。 - 提取消息内容:获取主题名
topic_key
和消息内容topic_msg
。 - 查找回调函数:通过
getSubcallback
查找本地注册的回调。 - 触发回调:执行回调函数处理消息。
// 处理服务器返回的主题消息
void onPublishTopic(const BaseConnection::ptr& conn,
const TopicRequest::ptr& msg) {
// 1. 从消息中取出操作类型进行判断,是否时消息请求
auto type = msg->optype(); // 获取主题操作类型
if (type != TopicOptype::TOPIC_PUBLISH) {
ELOG("错误的主题操作类型:%d", static_cast<int>(type));
return;
}
// 2. 取出消息主题名称,以及消息内容
std::string topic_key = msg->topicKey();
std::string topic_msg = msg->topicMsg();
// 3. 通过主题名称,查找队形主题的回调处理函数,有 再处理,没有 就报错
auto callback = getSubcallback(topic_key);
if (!callback) {
ELOG("收到了 %s 主题消息,但是该消息主题无处理回调", topic_key.c_str())
return;
}
return callback(topic_key, topic_msg);
}
2.4 核心逻辑:commonRequest 方法
所有主题操作请求通过 commonRequest 统一处理,流程如下
2.4.1 构造请求:
auto msg_rsp = MessageFactory::create<TopicRequest>();
msg_rsp->setTopicKey(key);
msg_rsp->setOptype(type); // 设置操作类型(如 TOPIC_CREATE)
2.4.2 发送请求
bool ret = _requestor->send(conn, msg_rsp, msg_rep);
2.4.3 处理响应:
- 检查响应类型是否为
TopicResponse
。 - 验证状态码
rcode()
是否成功。
2.4.4 实现代码
// 处理服务器返回的主题消息
void onPublishTopic(const BaseConnection::ptr& conn,
const TopicRequest::ptr& msg) {
// 1. 从消息中取出操作类型进行判断,是否时消息请求
auto type = msg->optype(); // 获取主题操作类型
if (type != TopicOptype::TOPIC_PUBLISH) {
ELOG("错误的主题操作类型:%d", static_cast<int>(type));
return;
}
// 2. 取出消息主题名称,以及消息内容
std::string topic_key = msg->topicKey();
std::string topic_msg = msg->topicMsg();
bool commonRequest(const BaseConnection::ptr& conn, const std::string& key,
TopicOptype type, const std::string& msg = " ") {
// 1. 构造请求对象,并填充数据
auto msg_rsp = MessageFactory::create<TopicRequest>();
msg_rsp->setId(UUID::uuid());
msg_rsp->setMType(MType::REQ_TOPIC);
msg_rsp->setOptype(type);
msg_rsp->setTopicKey(key);
if (type == TopicOptype::TOPIC_PUBLISH)
msg_rsp->setTopicMsg(msg);
// 2. 向 服务端 发送 同步 请求,等待响应
BaseMessage::ptr msg_rep;
bool ret = _requestor->send(conn, msg_rsp, msg_rep);
if (ret == false) {
ELOG("主题操作请求失败!");
return false;
}
// 3. 获取服务端响应信息,判断请求处理是否成功
auto topic_rsp_msg = std::dynamic_pointer_cast<TopicResponse>(msg_rep);
if (!topic_rsp_msg) {
ELOG("主题操作响应,但向下转换时失败");
return false;
}
if (topic_rsp_msg->rcode() != RCode::RCODE_OK) {
ELOG("主题操作请求出错:%s",
errReason(topic_rsp_msg->rcode()).c_str());
return false;
}
ELOG("主题操作请求成功");
return true;
}
// 3. 通过主题名称,查找队形主题的回调处理函数,有 再处理,没有 就报错
auto callback = getSubcallback(topic_key);
if (!callback) {
ELOG("收到了 %s 主题消息,但是该消息主题无处理回调", topic_key.c_str())
return;
}
return callback(topic_key, topic_msg);
}
2.5 整体代码
#pragma once
#include "requestor.hpp"
#include <unordered_set>
namespace rpc{
namespace client{
class TopicManager{
public:
using SubCallback = std::function<void(const std::string& key, const std::string& msg)>;
using ptr = std::shared_ptr<TopicManager>;
TopicManager(const Requestor::ptr& requestor) : _requestor(requestor) {}
// 创建主题
bool createTopic(const BaseConnection::ptr& _conn, const std::string& key){
return commonRequest(_conn, key, TopicOptype::TOPIC_CREATE);
}
// 删除主题
bool removeTopic(const BaseConnection::ptr& _conn, const std::string& key){
return commonRequest(_conn, key, TopicOptype::TOPIC_REMOVE);
}
// 订阅主题
bool subscribeTopic(const BaseConnection::ptr& conn, const std::string& key, const SubCallback& cb){
addSubscriber(key, cb);
return commonRequest(conn, key, TopicOptype::TOPIC_SUBSCRIBE);
}
// 取消订阅主题的订阅者
bool cancelTopic(const BaseConnection::ptr& conn, const std::string& key){
delSubscriber(key);
return commonRequest(conn, key, TopicOptype::TOPIC_CANCEL);
}
// 发布消息
bool publishTopic(const BaseConnection::ptr& conn, const std::string& key, const std::string& msg){
return commonRequest(conn, key, TopicOptype::TOPIC_PUBLISH, msg);
}
// 处理服务器返回的主题消息
void onPublishTopic(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg){
// 1. 从消息中取出操作类型进行判断,是否时消息请求
auto type = msg->optype(); // 获取主题操作类型
if(type != TopicOptype::TOPIC_PUBLISH){
ELOG("错误的主题操作类型:%d", static_cast<int>(type));
return;
}
// 2. 取出消息主题名称,以及消息内容
std::string topic_key = msg->topicKey();
std::string topic_msg = msg->topicMsg();
// 3. 通过主题名称,查找队形主题的回调处理函数,有 再处理,没有 就报错
auto callback = getSubcallback(topic_key);
if(!callback){
ELOG("收到了 %s 主题消息,但是该消息主题无处理回调", topic_key.c_str())
return;
}
return callback(topic_key, topic_msg);
}
private:
void addSubscriber(const std::string& key, const SubCallback& cb){
std::unique_lock<std::mutex> lock(_mutex);
_topic_callbacks.insert(std::make_pair(key, cb));
}
void delSubscriber(const std::string& key){
std::unique_lock<std::mutex> lock(_mutex);
_topic_callbacks.erase(key);
}
const SubCallback getSubcallback(const std::string& key){
std::unique_lock<std::mutex> _lock(_mutex);
auto it = _topic_callbacks.find(key);
if(it == _topic_callbacks.end()){
ELOG("主题 %s 没有订阅者", key.c_str());
return SubCallback();
}
return it->second;
}
bool commonRequest(const BaseConnection::ptr& conn, const std::string& key, TopicOptype type, const std::string& msg = " "){
// 1. 构造请求对象,并填充数据
auto msg_rsp = MessageFactory::create<TopicRequest>();
msg_rsp->setId(UUID::uuid());
msg_rsp->setMType(MType::REQ_TOPIC);
msg_rsp->setOptype(type);
msg_rsp->setTopicKey(key);
if(type == TopicOptype::TOPIC_PUBLISH)
msg_rsp->setTopicMsg(msg);
// 2. 向 服务端 发送 同步 请求,等待响应
BaseMessage::ptr msg_rep;
bool ret = _requestor->send(conn, msg_rsp, msg_rep);
if(ret == false){
ELOG("主题操作请求失败!");
return false;
}
// 3. 获取服务端响应信息,判断请求处理是否成功
auto topic_rsp_msg = std::dynamic_pointer_cast<TopicResponse>(msg_rep);
if(!topic_rsp_msg){
ELOG("主题操作响应,但向下转换时失败");
return false;
}
if(topic_rsp_msg->rcode() != RCode::RCODE_OK){
ELOG("主题操作请求出错:%s", errReason(topic_rsp_msg->rcode()).c_str());
return false;
}
ELOG("主题操作请求成功");
return true;
}
private:
std::mutex _mutex;
std::unordered_map<std::string, SubCallback> _topic_callbacks; // 主题名 -> 回调函数
Requestor::ptr _requestor;
};
}
}
🏳️🌈三、rpc_client 客户端整合
3.1 整体架构
这段代码实现了一个 分布式RPC客户端框架,包含四个核心类:
RegistryClient
(服务注册)DiscoveryClient
(服务发现)RpcClient
(RPC调用)TopicClient
(主题发布/订阅)
它们通过 Dispatcher
(消息分发器)和 Requestor
(请求处理器)协作,实现服务注册、发现、调用及消息推送功能。
graph TD
A[RegistryClient] -->|注册服务| B(注册中心)
C[DiscoveryClient] -->|发现服务| B
D[RpcClient] -->|调用服务| E(服务提供者)
F[TopicClient] -->|发布/订阅| G(消息中心)
3.2 核心类详解
3.2.1 RegistryClient(服务注册客户端)
功能:将本地服务方法注册到远程注册中心。
成员变量:
_requestor
:管理请求生命周期(超时、重试)。_provider
:封装服务注册协议,构造SERVICE_REGISTRY
请求。_dispatcher
:分发注册响应消息。_client
:连接注册中心的网络客户端。
关键接口:
registryMethod(method, host)
:注册方法method
到地址 host。
// 示例:注册服务方法
RegistryClient client("127.0.0.1", 8080);
client.registryMethod("add", Address("192.168.1.100", 9090));
// 服务注册客户端,用于将本地服务方法注册到远程注册中心
class RegistryClient {
public:
using ptr = std::shared_ptr<RegistryClient>;
// 构造函数传入注册中心的地址信息,用于连接注册中心
RegistryClient(const std::string& ip, int port)
: _requestor(std::make_shared<Requestor>()),
_provider(std::make_shared<client::Provider>(_requestor)),
_dispatcher(std::make_shared<Dispatcher>()) {
// 设置响应的回调函数
auto rsp_cb =
std::bind(&client::Requestor::onResponse, _requestor.get(),
std::placeholders::_1, std::placeholders::_2);
// 注册 rpc 响应处理函数
_dispatcher->registerHandler<BaseMessage>(fields::MType::RSP_SERVICE,
rsp_cb);
// std::function<void(const BaseConnection::ptr&, BaseMessage::ptr&)>
// message_cb = std::bind(&rpc::Dispatcher::onMessage,
// _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);
auto message_cb =
std::bind(&Dispatcher::onMessage, _dispatcher.get(),
std::placeholders::_1, std::placeholders::_2);
// 设置客户端连接注册中心的地址
_client = rpc::ClientFactory::create(ip, port);
_client->setMessageCallback(message_cb);
_client->connect();
}
// 向外提供的服务注册接口
bool registryMethod(const std::string& method, const Address& host) {
return _provider->registryMethod(_client->connection(), method, host);
}
private:
Requestor::ptr
_requestor; // 管理请求-响应生命周期,处理超时、重试,维护请求与响应的映射关系
client::Provider::ptr _provider; // 封装服务注册协议,构造 SERVICE_REGISTRY
// 请求,验证响应状态码
Dispatcher::ptr
_dispatcher; // 根据消息类型(MType)分发消息到对应处理器(如处理注册响应)
BaseClient::ptr
_client; // 网络通信客户端,负责与注册中心建立连接、发送请求、接收响应
};
3.2.2DiscoveryClient(服务发现客户端)
功能:动态发现服务地址,处理上下线通知。
成员变量:
_discoverer
:维护本地服务地址缓存。_dispatcher
:处理服务发现响应和上下线通知。
关键接口:
serviceDiscovery(method, host
):查询method
的可用地址。
// 示例:发现服务地址
DiscoveryClient client("127.0.0.1", 8080, [](const Address& host) {
std::cout << "服务下线: " << host.first << std::endl;
});
Address addr;
client.serviceDiscovery("add", addr); // 获取 "add" 方法的地址
// 服务注册客户端,用于将本地服务方法注册到远程注册中心
class RegistryClient {
public:
using ptr = std::shared_ptr<RegistryClient>;
// 构造函数传入注册中心的地址信息,用于连接注册中心
RegistryClient(const std::string& ip, int port)
: _requestor(std::make_shared<Requestor>()),
_provider(std::make_shared<client::Provider>(_requestor)),
_dispatcher(std::make_shared<Dispatcher>()) {
// 设置响应的回调函数
auto rsp_cb =
std::bind(&client::Requestor::onResponse, _requestor.get(),
std::placeholders::_1, std::placeholders::_2);
// 注册 rpc 响应处理函数
_dispatcher->registerHandler<BaseMessage>(fields::MType::RSP_SERVICE,
rsp_cb);
// std::function<void(const BaseConnection::ptr&, BaseMessage::ptr&)>
// message_cb = std::bind(&rpc::Dispatcher::onMessage,
// _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);
auto message_cb =
std::bind(&Dispatcher::onMessage, _dispatcher.get(),
std::placeholders::_1, std::placeholders::_2);
// 设置客户端连接注册中心的地址
_client = rpc::ClientFactory::create(ip, port);
_client->setMessageCallback(message_cb);
_client->connect();
}
// 向外提供的服务注册接口
bool registryMethod(const std::string& method, const Address& host) {
return _provider->registryMethod(
_client->class DiscoveryClient {
public:
using ptr = std::shared_ptr<DiscoveryClient>;
// 构造函数传入注册中心的地址信息,用于连接注册中心
// DiscoveryClient 是
// 服务发现客户端,用于从注册中心动态发现服务提供者的地址,并处理服务上下线通知
// 连接注册中心:建立与注册中心的网络连接。
// 服务发现请求:主动查询指定服务方法的可用地址。
// 动态通知处理:监听服务上下线通知,更新本地缓存。
// 离线回调机制:当服务下线时,触发用户自定义逻辑。
DiscoveryClient(const std::string& ip, int port,
const Discoverer::OfflineCallback& cb)
: _requestor(std::make_shared<Requestor>()),
_discoverer(
std::make_shared<client::Discoverer>(_requestor, cb)),
_dispatcher(std::make_shared<Dispatcher>()) {
// 1. 注册响应处理回调(处理服务发现响应)
auto rsp_cb =
std::bind(&Requestor::onResponse, _requestor.get(),
std::placeholders::_1, std::placeholders::_2);
_dispatcher->registerHandler<BaseMessage>(
MType::RSP_SERVICE, rsp_cb);
// 2. 注册服务请求处理回调(处理服务上下线通知)
auto req_cb =
std::bind(&client::Discoverer::onServiceRequest,
_discoverer.get(), std::placeholders::_1,
std::placeholders::_2);
_dispatcher->registerHandler<ServiceRequest>(
MType::REQ_SERVICE, req_cb);
// 3. 设置消息总回调入口
auto message_cb =
std::bind(&Dispatcher::onMessage, _dispatcher.get(),
std::placeholders::_1, std::placeholders::_2);
_client = rpc::ClientFactory::create(ip, port);
_client->setMessageCallback(message_cb);
// 4. 连接注册中心
_client->connect();
}
// 向外提供的服务发现接口
bool serviceDiscovery(const std::string& method,
Address& host) {
return _discoverer->serviceDiscovery(_client->connection(),
method, host);
}
private:
Requestor::ptr
_requestor; // 管理请求-响应生命周期,处理超时、重试,维护请求与响应的映射关系。
client::Discoverer::ptr
_discoverer; // 封装服务发现逻辑,维护本地服务地址缓存,处理上下线通知。
Dispatcher::ptr
_dispatcher; // 根据消息类型(MType)分发消息到对应处理器(如服务上下线通知)
BaseClient::ptr
_client; // 网络通信客户端,负责与注册中心建立连接、发送请求、接收响应
};
connection(), method, host);
}
private:
Requestor::ptr
_requestor; // 管理请求-响应生命周期,处理超时、重试,维护请求与响应的映射关系
client::Provider::ptr _provider; // 封装服务注册协议,构造 SERVICE_REGISTRY
// 请求,验证响应状态码
Dispatcher::ptr
_dispatcher; // 根据消息类型(MType)分发消息到对应处理器(如处理注册响应)
BaseClient::ptr
_client; // 网络通信客户端,负责与注册中心建立连接、发送请求、接收响应
};
3.2.3RpcClient(RPC调用客户端)
功能:根据配置(直连或服务发现)调用远程方法。
成员变量:
_enableDiscovery
:模式开关(直连/服务发现)。_caller
:执行 RPC 调用,封装请求和响应解析。_rpc_clients
:连接池(服务发现模式下动态管理)。
关键接口:
call(method, params, result)
:调用远程方法 method。
// 示例:调用远程方法
RpcClient client(true, "127.0.0.1", 8080); // 启用服务发现
Json::Value params, result;
params.append(1); params.append(1);
client.call("add", params, result); // 调用 "add(1,1)"
class RpcClient {
public:
using ptr = std::shared_ptr<RpcClient>;
// enableDiscovery --
// 是否启用服务发现功能,也决定了传入的地址信息是注册中心的地址,还是服务提供者的地址
RpcClient(bool enableDiscovery, const std::string& ip, int port)
: _enableDiscovery(enableDiscovery),
_requestor(std::make_shared<Requestor>()),
_dispatcher(std::make_shared<Dispatcher>()),
_caller(std::make_shared<client::RpcCaller>(_requestor)) {
// 1. 注册响应处理回调(处理服务发现响应)
auto rsp_cb = std::bind(&Requestor::onResponse, _requestor.get(),
std::placeholders::_1, std::placeholders::_2);
_dispatcher->registerHandler<BaseMessage>(MType::RSP_RPC, rsp_cb);
// 如果启用了服务发现,地址信息是注册中心的地址,是服务发现客户端需要连接的地址,则通过地址信息实例化discovery_client
// 如果没有启用服务发现,则地址信息是服务提供者的地址,则直接实例化好
// rpc_client
if (_enableDiscovery) {
auto offline_cb =
std::bind(&RpcClient::delclient, this, std::placeholders::_1);
_discovery_client =
std::make_shared<DiscoveryClient>(ip, port, offline_cb);
} else {
auto message_cb =
std::bind(&Dispatcher::onMessage, _dispatcher.get(),
std::placeholders::_1, std::placeholders::_2);
_rpc_client = ClientFactory::create(ip, port);
_rpc_client->setMessageCallback(message_cb);
_rpc_client->connect();
}
}
bool call(const std::string& method, const Json::Value& params,
Json::Value& result) {
// 获取服务提供者: 1. 服务发现 2. 固定服务提供者
BaseClient::ptr client = getClient(method);
if (client.get() == nullptr) {
ELOG("没有找到相应的客户端信息");
return false;
}
return _caller->call(client->connection(), method, params, result);
}
bool call(const std::string& method, const Json::Value& params,
RpcCaller::JsonAsyncResponse& result) {
BaseClient::ptr client = getClient(method);
if (client.get() == nullptr) {
return false;
}
// 3. 通过客户端连接,发送rpc请求
return _caller->call(client->connection(), method, params, result);
}
bool call(const std::string& method, const Json::Value& params,
const RpcCaller::JsonResponseCallback& cb) {
BaseClient::ptr client = getClient(method);
if (client.get() == nullptr) {
return false;
}
// 3. 通过客户端连接,发送rpc请求
return _caller->call(client->connection(), method, params, cb);
}
private:
BaseClient::ptr newClient(const Address& host) {
auto message_cb =
std::bind(&Dispatcher::onMessage, _dispatcher.get(),
std::placeholders::_1, std::placeholders::_2);
auto client = ClientFactory::create(host.first, host.second);
client->setMessageCallback(message_cb);
client->connect();
putClient(host, client);
return client;
}
// 获取客户端信息 - 通过地址信息
BaseClient::ptr getClient(const Address& host) {
std::unique_lock<std::mutex> lock(_mutex);
auto it = _rpc_clients.find(host);
if (it == _rpc_clients.end()) {
ELOG("没有找到相应的客户端信息");
return BaseClient::ptr();
}
return it->second;
}
// 获取客户端信息 - 通过服务方法
BaseClient::ptr getClient(const std::string& method) {
BaseClient::ptr client;
if (_enableDiscovery) {
// 1. 通过服务器发现,获取服务提供者地址信息
Address host;
bool ret = _discovery_client->serviceDiscovery(method, host);
if (ret == false) {
ELOG("当前 %s 服务,没有找到服务提供者!", method.c_str());
return BaseClient::ptr();
}
// 2. 通过地址信息,获取客户端信息
client = getClient(host);
if (client.get() == nullptr) {
ELOG("没有找到一实例化的客户端信息,则创建");
client = newClient(host);
}
} else {
client = _rpc_client;
}
return client;
}
// 添加客户端信息
void putClient(const Address& host, BaseClient::ptr& client) {
std::unique_lock<std::mutex> lock(_mutex);
_rpc_clients.insert(std::make_pair(host, client));
}
// 删除客户端信息
void delclient(const Address& host) {
std::unique_lock<std::mutex> lock(_mutex);
_rpc_clients.erase(host);
}
private:
struct AddressHash {
size_t operator()(const Address& host) const {
std::string addr = host.first + std::to_string(host.second);
return std::hash<std::string>{}(addr);
}
};
private:
bool _enableDiscovery; // 模式开关:true 启用服务发现,false
// 直连固定服务提供者
Requestor::ptr _requestor; // 管理请求-响应生命周期,处理超时和重试。
Dispatcher::ptr _dispatcher; // 分发消息到对应处理器(如 RPC 响应)
RpcCaller::ptr _caller; // 执行 RPC 调用,封装请求发送和响应解析逻辑
DiscoveryClient::ptr
_discovery_client; // 服务发现客户端(仅在服务发现模式下有效)
BaseClient::ptr _rpc_client; // 直连模式下的固定服务提供者连接。
std::mutex _mutex;
std::unordered_map<Address, BaseClient::ptr, AddressHash>
_rpc_clients; // 用于服务发现的客户端连接池
};
3.2.4TopicClient(主题客户端)
功能:管理主题的创建、订阅、发布。
成员变量:
_topic_manager
:处理主题操作(如订阅回调)。_dispatcher
:分发主题相关消息。
关键接口:
subscribeTopic(key, callback)
:订阅主题 key 并绑定回调。publishTopic(key, msg)
:向主题 key 发布消息 msg。
// 示例:发布/订阅主题
TopicClient client("127.0.0.1", 9090);
client.subscribeTopic("news", [](const std::string& key, const std::string& msg) {
std::cout << "收到消息: " << msg << std::endl;
});
client.publishTopic("news", "RPC框架发布!");
class TopicClient {
public:
using ptr = std::shared_ptr<TopicClient>;
TopicClient(const std::string& ip, int port)
: _requestor(std::make_shared<Requestor>()),
_dispatcher(std::make_shared<Dispatcher>()),
_topic_manager(std::make_shared<TopicManager>(_requestor)) {
auto rsp_cb = std::bind(&Requestor::onResponse, _requestor.get(),
std::placeholders::_1, std::placeholders::_2);
_dispatcher->registerHandler<BaseMessage>(MType::RSP_TOPIC, rsp_cb);
auto msg_cb =
std::bind(&TopicManager::onPublishTopic, _topic_manager.get(),
std::placeholders::_1, std::placeholders::_2);
_dispatcher->registerHandler<TopicRequest>(MType::REQ_TOPIC, msg_cb);
auto message_cb =
std::bind(&Dispatcher::onMessage, _dispatcher.get(),
std::placeholders::_1, std::placeholders::_2);
_rpc_client = ClientFactory::create(ip, port);
_rpc_client->setMessageCallback(message_cb);
_rpc_client->connect();
}
bool createTopic(const std::string& key) {
return _topic_manager->createTopic(_rpc_client->connection(), key);
}
bool removeTopic(const std::string& key) {
return _topic_manager->removeTopic(_rpc_client->connection(), key);
}
bool subscribeTopic(const std::string& key,
const TopicManager::SubCallback& cb) {
return _topic_manager->subscribeTopic(_rpc_client->connection(), key,
cb);
}
bool cancelTopic(const std::string& key) {
return _topic_manager->cancelTopic(_rpc_client->connection(), key);
}
bool publishTopic(const std::string& key, const std::string& msg) {
return _topic_manager->publishTopic(_rpc_client->connection(), key,
msg);
}
void shutdown() { _rpc_client->shutdown(); }
private:
Requestor::ptr _requestor;
TopicManager::ptr _topic_manager;
Dispatcher::ptr _dispatcher;
BaseClient::ptr _rpc_client;
};
3.3 协作逻辑与设计思路
3.3.1 消息分发(Dispatcher)
作用:根据消息类型(MType)路由到对应处理器。
注册逻辑:
// 注册处理函数(以 RpcClient 为例)
_dispatcher->registerHandler<BaseMessage>(MType::RSP_RPC, [](auto conn, auto msg) {
// 处理 RPC 响应
});
3.3.2 请求处理(Requestor)
作用:统一管理请求发送、超时重试、响应映射。
流程:
- 发送请求时生成唯一ID,记录到等待队列。
- 接收响应时根据ID匹配请求,触发回调。
3.3.3 服务发现与连接池(RpcClient)
动态连接管理:
- 启用服务发现时,通过
DiscoveryClient
获取地址,并维护连接池_rpc_clients
。 - 直连模式时固定连接
_rpc_client
。
线程安全:使用 std::mutex 保护连接池操作。
3.3.4 主题管理(TopicManager)
回调机制:订阅时绑定回调函数到 _topic_callbacks
。
消息推送:服务端广播消息后,客户端通过 onPublishTopic
触发本地回调。
🏳️🌈四、主题发布订阅流程示意
截至目前,我们已经封装好了 rpc 项目中所有的内容,现在我们来模拟一下 客户端A发布一个主题消息,客户端B接收到这个主题消息 的全部流程
4.1 核心文件与类说明
4.2 步骤 1:客户端A创建主题
// 1. 创建 TopicManager
auto requestor = std::make_shared<Requestor>();
auto topic_mgr = std::make_shared<client::TopicManager>(requestor);
// 2. 创建主题 "news"
BaseConnection::ptr conn = connect_to_server("127.0.0.1:8080");
topic_mgr->createTopic(conn, "news");
逻辑:
TopicManager::createTopic
发送TopicOptype::TOPIC_CREATE
请求到服务端。- 服务端
server::TopicManager::onTopicRequest
处理请求,创建全局主题"news"
。
4.3 步骤 2:客户端B订阅主题
// 1. 订阅主题 "news",定义回调函数
auto requestor = std::make_shared<Requestor>();
auto topic_mgr = std::make_shared<client::TopicManager>(requestor);
BaseConnection::ptr conn = connect_to_server("127.0.0.1:8080");
topic_mgr->subscribeTopic(conn, "news", [](const std::string& key, const std::string& msg) {
std::cout << "收到主题消息: " << msg << std::endl;
});
逻辑:
TopicManager::subscribeTopic
发送TopicOptype::TOPIC_SUBSCRIBE
请求到服务端。- 服务端记录客户端B为
"news"
的订阅者(存储到server::Topic::_subscribers
)。
4.4 步骤 3:客户端A发布消息
// 客户端A继续发布消息
topic_mgr->publishTopic(conn, "news", "Breaking: RPC框架发布!");
逻辑:
TopicManager::publishTopic
发送TopicOptype::TOPIC_PUBLISH
请求,携带消息内容。- 服务端
server::TopicManager::onTopicRequest
处理请求,广播消息给所有订阅者。
4.5 步骤 4:服务端广播消息
// 服务端代码(server/rpc_topic.hpp)
void server::TopicManager::topicPublish(...) {
// 1. 查找主题 "news"
auto topic = _topics["news"];
// 2. 遍历所有订阅者(客户端B)
for (auto& subscriber : topic->_subscribers) {
// 3. 通过订阅者连接发送消息
subscriber->_conn->send(msg);
}
}
逻辑:
- 服务端通过
Topic::pushMessage
将消息推送到所有订阅者的连接(客户端B)。
4.6 步骤 5:客户端B接收消息
// 客户端B的 TopicManager 处理消息
void client::TopicManager::onPublishTopic(...) {
// 1. 提取消息内容
std::string topic_key = msg->topicKey(); // "news"
std::string topic_msg = msg->topicMsg(); // "Breaking: RPC框架发布!"
// 2. 触发订阅时定义的回调函数
callback(topic_key, topic_msg);
}
输出
收到主题消息: Breaking: RPC框架发布!
4.7 关键类交互图
sequenceDiagram
participant ClientA as 客户端A
participant Server as 服务端
participant ClientB as 客户端B
ClientA->>Server: createTopic("news")
Server-->>ClientA: 创建成功
ClientB->>Server: subscribeTopic("news")
Server-->>ClientB: 订阅成功
ClientA->>Server: publishTopic("news", "Breaking: RPC框架发布!")
Server->>ClientB: 推送消息
ClientB->>ClientB: 触发回调函数
4.8 补充说明
- 线程安全:
客户端和服务端的TopicManager
使用std::mutex
保护共享数据(如订阅列表)。 - 错误处理:
若主题不存在,服务端返回RCode::RCODE_NOT_FOUND_TOPIC
,客户端记录错误日志。 - 网络通信:
所有请求通过Requestor::send
发送,底层使用BaseConnection
的 TCP 连接。
🏳️🌈五、运程调用add方法流程示意
为了方便各位更好地理解 rpc 框架运程调用的功能,这里模拟一下 客户端调用服务端注册好的add方法,来得到1 + 1的结果的整个流程
5.1 核心文件与类说明
5.2 步骤 1:服务端启动并注册 add 方法
// 文件: test_server.cpp
#include "rpc_server.hpp"
#include "rpc_registry.hpp"
#include "message.hpp"
// 1. 定义服务端加法方法
int add(int a, int b) { return a + b; }
int main() {
// 2. 创建 RpcServer 并绑定端口
RpcServer server("0.0.0.0:8080");
// 3. 注册方法到 RpcRegistry
server.getRegistry()->registerMethod("add", &add);
// 4. 启动服务端监听
server.start();
}
逻辑:
- 服务端通过
RpcRegistry::registerMethod
将add
方法注册到全局注册表(rpc_registry.hpp
)。 - 注册时生成映射:“add” → 函数指针 &add。
5.3 步骤 2:客户端构造调用请求
// 文件: test_client.cpp
#include "rpc_caller.hpp"
#include "requestor.hpp"
int main() {
// 1. 创建 RpcCaller 工具
RpcCaller caller;
// 2. 生成调用 add(1,1) 的请求
auto req = caller.prepareCall("add", 1, 1);
// 3. 创建 Requestor 发送请求
Requestor requestor;
BaseConnection::ptr conn = connect_to_server("127.0.0.1:8080");
auto rsp = requestor.send(conn, req);
// 4. 解析响应结果
if (rsp->rcode() == RCode::RCODE_OK) {
int result = rsp->getResult<int>();
std::cout << "1+1=" << result << std::endl; // 输出 1+1=2
}
}
逻辑:
RpcCaller::prepareCall
(rpc_caller.hpp)生成MethodRequest
消息,包含方法名 “add” 和参数 [1,1]。Requestor::send
(requestor.hpp)将请求序列化并通过BaseConnection
发送到服务端。
5.4 步骤3:服务端接收请求并路由到 add 方法
// 文件: dispatcher.hpp
// Dispatcher 注册处理函数(服务端初始化时执行)
Dispatcher dispatcher;
dispatcher.registerHandler<MethodRequest>(
MType::REQ_METHOD,
[registry](const BaseConnection::ptr& conn, std::shared_ptr<MethodRequest>& req) {
// 1. 从 RpcRegistry 查找方法 "add"
auto method = registry->getMethod(req->method());
// 2. 执行方法并获取结果
int result = method->invoke<int>(req->params());
// 3. 构造响应消息
auto rsp = MessageFactory::create<MethodResponse>();
rsp->setResult(result);
rsp->setRcode(RCode::RCODE_OK);
// 4. 发送响应
conn->send(rsp);
}
);
逻辑:
Dispatcher
(dispatcher.hpp)根据消息类型MType::REQ_METHOD
路由到注册的Lambda
处理函数。- 从
RpcRegistry
中查找方法 “add”,调用并返回结果。
5.5 步骤4:网络层数据传输
// 文件: net.hpp
// 服务端接收请求(BaseConnection::onMessage)
void BaseConnection::onMessage(const Buffer& buf) {
// 1. 反序列化为 MethodRequest
auto msg = deserialize<MethodRequest>(buf);
// 2. 通过 Dispatcher 分发消息
Dispatcher::instance().onMessage(shared_from_this(), msg);
}
逻辑:
- 服务端
BaseConnection
接收字节流,反序列化为MethodRequest
对象。 - 调用
Dispatcher::onMessage
触发多路转接。
5.6 关键类交互图
sequenceDiagram
participant Client as 客户端
participant Requestor as Requestor (requestor.hpp)
participant Dispatcher as Dispatcher (dispatcher.hpp)
participant Registry as RpcRegistry (rpc_registry.hpp)
participant Server as RpcServer (rpc_server.hpp)
Client->>Requestor: 调用 add(1,1)
Requestor->>Client: 生成 MethodRequest
Client->>Server: 发送 MethodRequest (TCP)
Server->>Dispatcher: 接收消息并路由
Dispatcher->>Registry: 查找方法 "add"
Registry->>Dispatcher: 返回函数指针
Dispatcher->>Server: 调用 add(1,1)
Server->>Client: 返回 MethodResponse (TCP)
Client->>Client: 解析结果为 2
5.7 补充说明
- 客户端通过
RpcCaller
生成MethodRequest
,通过Requestor
发送请求。 - 服务端通过
Dispatcher
多路转接,将请求路由到RpcRegistry
中注册的 add 方法。 - 网络层通过
BaseConnection
实现 TCP 通信,Dispatcher
确保消息正确路由。 - 结果返回通过
MethodResponse
序列化回客户端,完成一次完整的 RPC 调用。
👥总结
本篇博文对 【从零实现Json-Rpc框架】- 项目实现 - 客户端注册主题及整合 做了一个较为详细的介绍,不知道对你有没有帮助呢
觉得博主写得还不错的三连支持下吧!会继续努力的~