RegistryClient接口
设计思路
这个是用来实现provider的register_client模块的,当Rpc_server模块启动enableRegistry,就会调用RegistryClient接口。然后实例化了一个register_client对象,就会向register——server注册服务。
这个RegistryClient实现的逻辑就是:向外提供一个服务注册的接口registryMethod,这个接口是对rpc_registry模块的registryMethod进行的封装,它隐藏了底层的网络通信细节和消息处理逻辑。
客户端通过RegistryClient只需提供方法名和主机地址信息就能完成服务注册,而不需要关心:
- 如何创建与注册中心的连接
- 如何构造注册请求消息
- 如何发送请求和接收响应
- 如何处理响应结果
其次就是对于构造函数里面就是绑定两个回调函数,一个是对于消息到来的回调message_cb,一个是对于响应的回调rsp_cb ,
这两个回调函数之间存在层级调用关系:
message_cb 是一级回调:
- 网络层收到任何消息都会触发这个回调
- 它将所有收到的消息转发给 Dispatcher::onMessage 方法
- 这是消息进入系统的统一入口点
rpc_cb 是二级回调:
- 它通过 registerHandler 注册到 Dispatcher 中
- 只处理特定类型的消息(REQ_RPC)
- 在 Dispatcher::onMessage 内部被调用
就是当服务注册发送给注册中心了之后,你肯定是要收到一个响应的,不然你怎么知道你成功注册了没有呢。所以就需要给register_client设置一个消息回调函数,当有消息到来了,然后调用message_cb,把这个消息传递给Dispatcher进行筛选的,Dispatcher发现这个是个RSP_SERVICE就会调用rsp_cb了。
网络层收到消息 -> 调用 message_cb -> Dispatcher::onMessage -> 根据消息类型查找处理函数 -> 找到并调用 rpc_cb (RpcRouter::onRpcRequest)
图解
代码
class RegistryClient {
public:
using ptr = std::shared_ptr<RegistryClient>;
//构造函数传入注册中心的地址信息,用于连接注册中心
RegistryClient(const std::string &ip, int port):
_requestor(std::make_shared<Requestor>()),
_provider(std::make_shared<client::Provider>(_requestor)),
_dispatcher(std::make_shared<Dispatcher>()) {
auto rsp_cb = std::bind(&client::Requestor::onResponse, _requestor.get(),
std::placeholders::_1, std::placeholders::_2);
_dispatcher->registerHandler<BaseMessage>(MType::RSP_SERVICE, rsp_cb);
auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(),
std::placeholders::_1, std::placeholders::_2);
_client = ClientFactory::create(ip, port);
_client->setMessageCallback(message_cb);
_client->connect();
}
//向外提供的服务注册接口
bool registryMethod(const std::string &method, const Address &host) {
return _provider->registryMethod(_client->connection(), method, host);
}
private:
Requestor::ptr _requestor;
client::Provider::ptr _provider;
Dispatcher::ptr _dispatcher;
BaseClient::ptr _client;
};
DiscoveryClient接口
设计思路
这个DiscoveryClient实现的逻辑就是:向外提供一个服务发现的接口serviceDiscovery,这个接口是对client::Discoverer模块的serviceDiscovery方法进行的封装,它隐藏了底层的网络通信细节和消息处理逻辑。
客户端通过DiscoveryClient只需提供方法名就能获取到提供该服务的主机地址,而不需要关心:
- 如何创建与注册中心的连接
- 如何构造服务发现请求消息
- 如何发送请求和接收响应
- 如何处理响应结果
具体就是对于构造函数而言就是绑定了两个回调函数,一个是对于消息到来的回调message_cb,一个是对于响应的回调rsp_cb,还有一个是对于服务请求的回调req_cb,为什么会多一个req_cb呢,req顾名思义就是请求,也就是别人主动发起请求到discovery_client,那么就是当服务提供者上线或下线时,注册中心会主动向所有DiscoveryClient发送REQ_SERVICE类型的消息,通知服务状态变化。
这三个回调函数之间存在层级调用关系:
message_cb 是一级回调:
- 网络层收到任何消息都会触发这个回调
- 它将所有收到的消息转发给 Dispatcher::onMessage 方法
- 这是消息进入系统的统一入口点
rsp_cb 和 req_cb 是二级回调:
- 它们通过 registerHandler 注册到 Dispatcher 中
- 只处理特定类型的消息(RSP_SERVICE和REQ_SERVICE)
- 在 Dispatcher::onMessage 内部被调用
就是当服务发现发送给注册中心了之后,你肯定是要收到一个响应的,不然你怎么知道你成功发现服务了没有呢。所以就需要给register_client设置一个消息回调函数,当有消息到来了,然后调用message_cb,把这个消息传递给Dispatcher进行筛选的,Dispatcher发现这个是个RSP_SERVICE就会调用rsp_cb了。
网络层收到消息 -> 调用 message_cb -> Dispatcher::onMessage -> 根据消息类型查找处理函数 -> 找到并调用对应的回调(rsp_cb或req_cb)
图解
代码
class DiscoveryClient {
public:
using ptr = std::shared_ptr<DiscoveryClient>;
//构造函数传入注册中心的地址信息,用于连接注册中心
DiscoveryClient(const std::string &ip, int port, const Discoverer::OfflineCallback &cb):
_requestor(std::make_shared<Requestor>()),
_discoverer(std::make_shared<client::Discoverer>(_requestor, cb)),
_dispatcher(std::make_shared<Dispatcher>()){
auto rsp_cb = std::bind(&client::Requestor::onResponse, _requestor.get(),
std::placeholders::_1, std::placeholders::_2);
_dispatcher->registerHandler<BaseMessage>(MType::RSP_SERVICE, rsp_cb);
auto req_cb = std::bind(&client::Discoverer::onServiceRequest, _discoverer.get(),
std::placeholders::_1, std::placeholders::_2);
_dispatcher->registerHandler<ServiceRequest>(MType::REQ_SERVICE, req_cb);
auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(),
std::placeholders::_1, std::placeholders::_2);
_client = ClientFactory::create(ip, port);
_client->setMessageCallback(message_cb);
_client->connect();
}
//向外提供的服务发现接口
bool serviceDiscovery(const std::string &method, Address &host) {
return _discoverer->serviceDiscovery(_client->connection(), method, host);
}
private:
Requestor::ptr _requestor;
client::Discoverer::ptr _discoverer;
Dispatcher::ptr _dispatcher;
BaseClient::ptr _client;
};
RpcClient接口
设计思路
构造函数而言就是绑定了两个回调函数,一个是对于消息到来的回调message_cb,一个是对于响应的回调rsp_cb,不过这个message_cb跟之前的不一样,它分两种情况,在直连模式下,直接设置message_cb。但是在服务发现模式下,是在DiscoveryClient内部设置自己的message_cb用于与注册中心通信。
需要一个enableDiscovery,进行判断是否启动了服务发现,如果启动了服务发现就创建一个服务下线回调函数,绑定到当前RpcClient的delClient方法,当服务下线时,这个回调会被触发。还需要创建服务发现客户端,将下线回调传递给它,这样DiscoveryClient在接收到服务下线通知时会自动调用RpcClient的delClient方法。
为什么需要这个下线回调函数呢?因为这种设计确保了当服务提供者下线时,RpcClient能够及时从连接池中移除对应连接,防止向不可用服务发送请求,提高系统的容错性和可靠性。
如果没有启用服务发现,就不需要下线通知机制。在直连模式下:如果直连的服务提供者出现问题,RPC调用会直接失败,客户端会立即感知到错误(如连接断开、超时等),而不需要通过通知机制来清理连接池。
向外提供三种call接口:同步call,异步call,回调call,这些接口是对RpcCaller模块的call方法进行的封装,它隐藏了底层的网络通信细节、服务发现过程和消息处理逻辑。
客户端通过RpcClient只需提供方法名、参数和接收结果的方式(同步、Future或回调)就能完成远程服务调用,而不需要关心:
- 如何查找并连接到服务提供者
- 如何构造和序列化RPC请求消息
- 如何发送请求和接收响应
- 如何处理服务提供者动态上下线
- 如何管理和复用客户端连接
其次还需要一个连接池,用于管理已经发现了的服务,在服务发现模式下:先通过服务发现获取提供服务的主机地址,查找连接池中是否已有该地址的连接,没有则创建新连接并加入连接池
对内设置了一些私有方法,包括newClient方法负责创建新连接,getClient(Address)方法用于从连接池查找已有连接,getClient(string)方法是调用链的核心,putClient方法安全地将连接添加到池中,delClient方法处理连接的移除,这些方法都是对连接池的连接进行管理的
图解
代码
class RpcClient {
public:
using ptr = std::shared_ptr<RpcClient>;
//enableDiscovery--是否启用服务发现功能,也决定了传入的地址信息是注册中心的地址,还是服务提供者的地址
RpcClient(bool enableDiscovery, const std::string &ip, int port):
_enableDiscovery(enableDiscovery),
_requestor(std::make_shared<Requestor>()),
_dispatcher(std::make_shared<Dispatcher>()),
_caller(std::make_shared<bitrpc::client::RpcCaller>(_requestor)) {
//针对rpc请求后的响应进行的回调处理
auto rsp_cb = std::bind(&client::Requestor::onResponse, _requestor.get(),
std::placeholders::_1, std::placeholders::_2);
_dispatcher->registerHandler<BaseMessage>(MType::RSP_RPC, rsp_cb);
//如果启用了服务发现,地址信息是注册中心的地址,是服务发现客户端需要连接的地址,则通过地址信息实例化discovery_client
//如果没有启用服务发现,则地址信息是服务提供者的地址,则直接实例化好rpc_client
if (_enableDiscovery) {
auto offline_cb = std::bind(&RpcClient::delClient, this, std::placeholders::_1);
_discovery_client = std::make_shared<DiscoveryClient>(ip, port, offline_cb);
}else {
auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(),
std::placeholders::_1, std::placeholders::_2);
_rpc_client = ClientFactory::create(ip, port);
_rpc_client->setMessageCallback(message_cb);
_rpc_client->connect();
}
}
bool call(const std::string &method, const Json::Value ¶ms, Json::Value &result) {
//获取服务提供者:1. 服务发现; 2. 固定服务提供者
BaseClient::ptr client = getClient(method);
if (client.get() == nullptr) {
return false;
}
//3. 通过客户端连接,发送rpc请求
return _caller->call(client->connection(), method, params, result);
}
bool call(const std::string &method, const Json::Value ¶ms, RpcCaller::JsonAsyncResponse &result) {
BaseClient::ptr client = getClient(method);
if (client.get() == nullptr) {
return false;
}
//3. 通过客户端连接,发送rpc请求
return _caller->call(client->connection(), method, params, result);
}
bool call(const std::string &method, const Json::Value ¶ms, const RpcCaller::JsonResponseCallback &cb) {
BaseClient::ptr client = getClient(method);
if (client.get() == nullptr) {
return false;
}
//3. 通过客户端连接,发送rpc请求
return _caller->call(client->connection(), method, params, cb);
}
private:
BaseClient::ptr newClient(const Address &host) {
auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(),
std::placeholders::_1, std::placeholders::_2);
auto client = ClientFactory::create(host.first, host.second);
client->setMessageCallback(message_cb);
client->connect();
putClient(host, client);
return client;
}
BaseClient::ptr getClient(const Address &host) {
std::unique_lock<std::mutex> lock(_mutex);
auto it = _rpc_clients.find(host);
if (it == _rpc_clients.end()) {
return BaseClient::ptr();
}
return it->second;
}
BaseClient::ptr getClient(const std::string method) {
BaseClient::ptr client;
if (_enableDiscovery) {
//1. 通过服务发现,获取服务提供者地址信息
Address host;
bool ret = _discovery_client->serviceDiscovery(method, host);
if (ret == false) {
ELOG("当前 %s 服务,没有找到服务提供者!", method.c_str());
return BaseClient::ptr();
}
//2. 查看服务提供者是否已有实例化客户端,有则直接使用,没有则创建
client = getClient(host);
if (client.get() == nullptr) {//没有找打已实例化的客户端,则创建
client = newClient(host);
}
}else {
client = _rpc_client;
}
return client;
}
void putClient(const Address &host, BaseClient::ptr &client) {
std::unique_lock<std::mutex> lock(_mutex);
_rpc_clients.insert(std::make_pair(host, client));
}
void delClient(const Address &host) {
std::unique_lock<std::mutex> lock(_mutex);
_rpc_clients.erase(host);
}
private:
struct AddressHash {
size_t operator()(const Address &host) const{
std::string addr = host.first + std::to_string(host.second);
return std::hash<std::string>{}(addr);
}
};
bool _enableDiscovery;
DiscoveryClient::ptr _discovery_client;
Requestor::ptr _requestor;
RpcCaller::ptr _caller;
Dispatcher::ptr _dispatcher;
BaseClient::ptr _rpc_client;//用于未启用服务发现
std::mutex _mutex;
//<"127.0.0.1:8080", client1>
std::unordered_map<Address, BaseClient::ptr, AddressHash> _rpc_clients;//用于服务发现的客户端连接池
};
TopicClient接口
设计思路
这个TopicClient实现的逻辑就是:向外提供一组主题管理接口(create/remove/subscribe/cancel/publish),这些接口是对TopicManager模块相应方法的封装,它隐藏了底层的网络通信细节和消息处理逻辑。
客户端通过TopicClient只需提供主题名称和必要参数就能完成主题的创建、删除、订阅和发布,而不需要关心:
- 如何创建与主题服务器的连接
- 如何构造主题请求消息
- 如何发送请求和接收响应
- 如何处理响应结果和主题消息推送
其次就是对于构造函数里面就是绑定三个回调函数:
- 响应回调rsp_cb:处理服务器对主题操作请求的响应
- 消息推送回调msg_cb:接收发布到订阅主题的消息
- 一级消息回调message_cb:处理所有网络消息
这三个回调函数之间存在层级调用关系:
message_cb是一级回调:
- 网络层收到任何消息都会触发这个回调
- 它将所有收到的消息转发给Dispatcher::onMessage方法
- 这是消息进入系统的统一入口点
rsp_cb和msg_cb是二级回调:
- 它们通过registerHandler注册到Dispatcher中
- 分别只处理特定类型的消息(RSP_TOPIC和REQ_TOPIC)
- 在Dispatcher::onMessage内部被调用
就是当执行主题操作后,你肯定要收到响应确认成功与否,同时订阅主题后需要接收发布的消息。所以TopicClient设置了这些回调函数,当有消息到来,先调用message_cb,把消息传递给Dispatcher进行筛选,Dispatcher根据类型调用对应的处理函数。
网络层收到消息 -> 调用message_cb -> Dispatcher::onMessage -> 根据消息类型查找处理函数 -> 找到并调用对应回调(rsp_cb或msg_cb)
图解
代码
class TopicClient {
public:
TopicClient(const std::string &ip, int port):
_requestor(std::make_shared<Requestor>()),
_dispatcher(std::make_shared<Dispatcher>()),
_topic_manager(std::make_shared<TopicManager>(_requestor)) {
auto rsp_cb = std::bind(&Requestor::onResponse, _requestor.get(),
std::placeholders::_1, std::placeholders::_2);
_dispatcher->registerHandler<BaseMessage>(MType::RSP_TOPIC, rsp_cb);
auto msg_cb = std::bind(&TopicManager::onPublish, _topic_manager.get(),
std::placeholders::_1, std::placeholders::_2);
_dispatcher->registerHandler<TopicRequest>(MType::REQ_TOPIC, msg_cb);
auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(),
std::placeholders::_1, std::placeholders::_2);
_rpc_client = ClientFactory::create(ip, port);
_rpc_client->setMessageCallback(message_cb);
_rpc_client->connect();
}
bool create(const std::string &key) {
return _topic_manager->create(_rpc_client->connection(), key);
}
bool remove(const std::string &key) {
return _topic_manager->remove(_rpc_client->connection(), key);
}
bool subscribe(const std::string &key, const TopicManager::SubCallback &cb) {
return _topic_manager->subscribe(_rpc_client->connection(), key, cb);
}
bool cancel(const std::string &key) {
return _topic_manager->cancel(_rpc_client->connection(), key);
}
bool publish(const std::string &key, const std::string &msg) {
return _topic_manager->publish(_rpc_client->connection(), key, msg);
}
void shutdown() {
_rpc_client->shutdown();
}
private:
Requestor::ptr _requestor;
TopicManager::ptr _topic_manager;
Dispatcher::ptr _dispatcher;
BaseClient::ptr _rpc_client;//用于未启用服务发现
};
整体代码
#include "../common/dispatcher.hpp"
#include "requestor.hpp"
#include "rpc_caller.hpp"
#include "rpc_registry.hpp"
#include "rpc_topic.hpp"
namespace bitrpc {
namespace client {
class RegistryClient {
public:
using ptr = std::shared_ptr<RegistryClient>;
//构造函数传入注册中心的地址信息,用于连接注册中心
RegistryClient(const std::string &ip, int port):
_requestor(std::make_shared<Requestor>()),
_provider(std::make_shared<client::Provider>(_requestor)),
_dispatcher(std::make_shared<Dispatcher>()) {
auto rsp_cb = std::bind(&client::Requestor::onResponse, _requestor.get(),
std::placeholders::_1, std::placeholders::_2);
_dispatcher->registerHandler<BaseMessage>(MType::RSP_SERVICE, rsp_cb);
auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(),
std::placeholders::_1, std::placeholders::_2);
_client = ClientFactory::create(ip, port);
_client->setMessageCallback(message_cb);
_client->connect();
}
//向外提供的服务注册接口
bool registryMethod(const std::string &method, const Address &host) {
return _provider->registryMethod(_client->connection(), method, host);
}
private:
Requestor::ptr _requestor;
client::Provider::ptr _provider;
Dispatcher::ptr _dispatcher;
BaseClient::ptr _client;
};
class DiscoveryClient {
public:
using ptr = std::shared_ptr<DiscoveryClient>;
//构造函数传入注册中心的地址信息,用于连接注册中心
DiscoveryClient(const std::string &ip, int port, const Discoverer::OfflineCallback &cb):
_requestor(std::make_shared<Requestor>()),
_discoverer(std::make_shared<client::Discoverer>(_requestor, cb)),
_dispatcher(std::make_shared<Dispatcher>()){
auto rsp_cb = std::bind(&client::Requestor::onResponse, _requestor.get(),
std::placeholders::_1, std::placeholders::_2);
_dispatcher->registerHandler<BaseMessage>(MType::RSP_SERVICE, rsp_cb);
auto req_cb = std::bind(&client::Discoverer::onServiceRequest, _discoverer.get(),
std::placeholders::_1, std::placeholders::_2);
_dispatcher->registerHandler<ServiceRequest>(MType::REQ_SERVICE, req_cb);
auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(),
std::placeholders::_1, std::placeholders::_2);
_client = ClientFactory::create(ip, port);
_client->setMessageCallback(message_cb);
_client->connect();
}
//向外提供的服务发现接口
bool serviceDiscovery(const std::string &method, Address &host) {
return _discoverer->serviceDiscovery(_client->connection(), method, host);
}
private:
Requestor::ptr _requestor;
client::Discoverer::ptr _discoverer;
Dispatcher::ptr _dispatcher;
BaseClient::ptr _client;
};
class RpcClient {
public:
using ptr = std::shared_ptr<RpcClient>;
//enableDiscovery--是否启用服务发现功能,也决定了传入的地址信息是注册中心的地址,还是服务提供者的地址
RpcClient(bool enableDiscovery, const std::string &ip, int port):
_enableDiscovery(enableDiscovery),
_requestor(std::make_shared<Requestor>()),
_dispatcher(std::make_shared<Dispatcher>()),
_caller(std::make_shared<bitrpc::client::RpcCaller>(_requestor)) {
//针对rpc请求后的响应进行的回调处理
auto rsp_cb = std::bind(&client::Requestor::onResponse, _requestor.get(),
std::placeholders::_1, std::placeholders::_2);
_dispatcher->registerHandler<BaseMessage>(MType::RSP_RPC, rsp_cb);
//如果启用了服务发现,地址信息是注册中心的地址,是服务发现客户端需要连接的地址,则通过地址信息实例化discovery_client
//如果没有启用服务发现,则地址信息是服务提供者的地址,则直接实例化好rpc_client
if (_enableDiscovery) {
auto offline_cb = std::bind(&RpcClient::delClient, this, std::placeholders::_1);
_discovery_client = std::make_shared<DiscoveryClient>(ip, port, offline_cb);
}else {
auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(),
std::placeholders::_1, std::placeholders::_2);
_rpc_client = ClientFactory::create(ip, port);
_rpc_client->setMessageCallback(message_cb);
_rpc_client->connect();
}
}
bool call(const std::string &method, const Json::Value ¶ms, Json::Value &result) {
//获取服务提供者:1. 服务发现; 2. 固定服务提供者
BaseClient::ptr client = getClient(method);
if (client.get() == nullptr) {
return false;
}
//3. 通过客户端连接,发送rpc请求
return _caller->call(client->connection(), method, params, result);
}
bool call(const std::string &method, const Json::Value ¶ms, RpcCaller::JsonAsyncResponse &result) {
BaseClient::ptr client = getClient(method);
if (client.get() == nullptr) {
return false;
}
//3. 通过客户端连接,发送rpc请求
return _caller->call(client->connection(), method, params, result);
}
bool call(const std::string &method, const Json::Value ¶ms, const RpcCaller::JsonResponseCallback &cb) {
BaseClient::ptr client = getClient(method);
if (client.get() == nullptr) {
return false;
}
//3. 通过客户端连接,发送rpc请求
return _caller->call(client->connection(), method, params, cb);
}
private:
BaseClient::ptr newClient(const Address &host) {
auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(),
std::placeholders::_1, std::placeholders::_2);
auto client = ClientFactory::create(host.first, host.second);
client->setMessageCallback(message_cb);
client->connect();
putClient(host, client);
return client;
}
BaseClient::ptr getClient(const Address &host) {
std::unique_lock<std::mutex> lock(_mutex);
auto it = _rpc_clients.find(host);
if (it == _rpc_clients.end()) {
return BaseClient::ptr();
}
return it->second;
}
BaseClient::ptr getClient(const std::string method) {
BaseClient::ptr client;
if (_enableDiscovery) {
//1. 通过服务发现,获取服务提供者地址信息
Address host;
bool ret = _discovery_client->serviceDiscovery(method, host);
if (ret == false) {
ELOG("当前 %s 服务,没有找到服务提供者!", method.c_str());
return BaseClient::ptr();
}
//2. 查看服务提供者是否已有实例化客户端,有则直接使用,没有则创建
client = getClient(host);
if (client.get() == nullptr) {//没有找打已实例化的客户端,则创建
client = newClient(host);
}
}else {
client = _rpc_client;
}
return client;
}
void putClient(const Address &host, BaseClient::ptr &client) {
std::unique_lock<std::mutex> lock(_mutex);
_rpc_clients.insert(std::make_pair(host, client));
}
void delClient(const Address &host) {
std::unique_lock<std::mutex> lock(_mutex);
_rpc_clients.erase(host);
}
private:
struct AddressHash {
size_t operator()(const Address &host) const{
std::string addr = host.first + std::to_string(host.second);
return std::hash<std::string>{}(addr);
}
};
bool _enableDiscovery;
DiscoveryClient::ptr _discovery_client;
Requestor::ptr _requestor;
RpcCaller::ptr _caller;
Dispatcher::ptr _dispatcher;
BaseClient::ptr _rpc_client;//用于未启用服务发现
std::mutex _mutex;
//<"127.0.0.1:8080", client1>
std::unordered_map<Address, BaseClient::ptr, AddressHash> _rpc_clients;//用于服务发现的客户端连接池
};
class TopicClient {
public:
TopicClient(const std::string &ip, int port):
_requestor(std::make_shared<Requestor>()),
_dispatcher(std::make_shared<Dispatcher>()),
_topic_manager(std::make_shared<TopicManager>(_requestor)) {
auto rsp_cb = std::bind(&Requestor::onResponse, _requestor.get(),
std::placeholders::_1, std::placeholders::_2);
_dispatcher->registerHandler<BaseMessage>(MType::RSP_TOPIC, rsp_cb);
auto msg_cb = std::bind(&TopicManager::onPublish, _topic_manager.get(),
std::placeholders::_1, std::placeholders::_2);
_dispatcher->registerHandler<TopicRequest>(MType::REQ_TOPIC, msg_cb);
auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(),
std::placeholders::_1, std::placeholders::_2);
_rpc_client = ClientFactory::create(ip, port);
_rpc_client->setMessageCallback(message_cb);
_rpc_client->connect();
}
bool create(const std::string &key) {
return _topic_manager->create(_rpc_client->connection(), key);
}
bool remove(const std::string &key) {
return _topic_manager->remove(_rpc_client->connection(), key);
}
bool subscribe(const std::string &key, const TopicManager::SubCallback &cb) {
return _topic_manager->subscribe(_rpc_client->connection(), key, cb);
}
bool cancel(const std::string &key) {
return _topic_manager->cancel(_rpc_client->connection(), key);
}
bool publish(const std::string &key, const std::string &msg) {
return _topic_manager->publish(_rpc_client->connection(), key, msg);
}
void shutdown() {
_rpc_client->shutdown();
}
private:
Requestor::ptr _requestor;
TopicManager::ptr _topic_manager;
Dispatcher::ptr _dispatcher;
BaseClient::ptr _rpc_client;//用于未启用服务发现
};
}
}