接口介绍
头文件
#include <etcd/Client.hpp>
#include <etcd/KeepAlive.hpp>
#include <etcd/Response.hpp>
#include <etcd/SyncClient.hpp>
#include <etcd/Value.hpp>
#include <etcd/Watcher.hpp>
下面从功能介绍几个类的概念
Value :保存键值对的 key 和 value
Event:记录键值对是否改变或者被删除的状态
Response :etcd 服务器向客户端的响应
KeepAlive : 客户端向 etcd 保活数据
Client : 客户端类
Watcher : 检测 etcd 服务器上键值对是否发生改变
下面是接口
namespace etcd {
class Value {
bool is_dir();//判断是否是一个目录
std::string const& key() //键值对的 key 值
std::string const& as_string()//键值对的 val 值
int64_t lease() //用于创建租约的响应中,返回租约 ID
}
//etcd 会监控所管理的数据的变化,一旦数据产生变化会通知客户端
//在通知客户端的时候,会返回改变前的数据和改变后的数据
class Event {
enum class EventType {
PUT, //键值对新增或数据发生改变
DELETE_,//键值对被删除
INVALID,
};
enum EventType event_type()
const Value& kv()
const Value& prev_kv()
}
class Response {
bool is_ok()
std::string const& error_message()
Value const& value()//当前的数值 或者 一个请求的处理结果
Value const& prev_value()//之前的数值
Value const& value(int index)//
std::vector<Event> const& events();//触发的事件
}
class KeepAlive {
KeepAlive(Client const& client, int ttl, int64_t lease_id =
0);
//返回租约 ID
int64_t Lease();
//停止保活动作
void Cancel();
}
class Client {
// etcd_url: "http://127.0.0.1:2379"
Client(std::string const& etcd_url,
std::string const& load_balancer = "round_robin");
//Put a new key-value pair 新增一个键值对
pplx::task<Response> put(std::string const& key,
std::string const& value);
//新增带有租约的键值对 (一定时间后,如果没有续租,数据自动删除)
pplx::task<Response> put(std::string const& key,
std::string const& value,
const int64_t leaseId);
//获取一个指定 key 目录下的数据列表
pplx::task<Response> ls(std::string const& key);
//创建并获取一个存活 ttl 时间的租约
pplx::task<Response> leasegrant(int ttl);
//获取一个租约保活对象,其参数 ttl 表示租约有效时间
pplx::task<std::shared_ptr<KeepAlive>> leasekeepalive(int
ttl);
//撤销一个指定的租约
pplx::task<Response> leaserevoke(int64_t lease_id);
//数据锁
pplx::task<Response> lock(std::string const& key);
}
class Watcher {
Watcher(Client const& client,
std::string const& key, //要监控的键值对 key
std::function<void(Response)> callback, //发生改变后的回调
bool recursive = false); //是否递归监控目录下的所有数据改变
Watcher(std::string const& address,
std::string const& key,
std::function<void(Response)> callback,
bool recursive = false);
//阻塞等待,直到监控任务被停止
bool Wait();
bool Cancel();
}
二次封装
设计两个类 Register Discover ,Register 用来向 etcd 服务器注册键值对,它会向 etcd 服务器申请一个租约ID,负责数据保活。Discover 查看服务器键值对,并检测键值对是否被改变或删除。
Register 类只需封装 Client KeepAlive 类即可,通过KeepAlive 得到租约ID 并且向服务器不断进行保活,Client ::put 向 etcd 服务器添加或更改键值对。代码如下
class Register
{
public:
//构造,传入 etcd 服务器地址
Register(const std::string& url = "http://127.0.0.1:2379")
:_client_ptr(std::make_shared<etcd::Client>(url)),
_keep_alive_ptr(std::make_shared<etcd::KeepAlive>(*(_client_ptr.get()), 3)),
_lease_id(_keep_alive_ptr->Lease())
{}
//析构,撤销指定租约
~Register()
{
_client_ptr->leaserevoke(_lease_id);
}
//向 etcd 添加服务
void put(const std::string& key, const std::string& value)
{
if(false == _client_ptr->put(key, value, _lease_id).get().is_ok())
{
LOG_ERROR("etcd 添加服务失败 {} : {}", key, value);
}
else
{
LOG_INFO("etcd 添加服务成功 {} : {}", key, value);
}
}
private:
std::shared_ptr<etcd::Client> _client_ptr; //客户端实体
std::shared_ptr<etcd::KeepAlive> _keep_alive_ptr; //保活
int64_t _lease_id; //租约ID
};
Discover 封装 Client 和 Watcher ,检测 etcd 服务器键值对的变化。同时设置两个回调函数当键值对改变时,调用回调函数。
class Discover
{
using notify_callback = std::function<void(std::string, std::string)>;
public:
//四个参数为: etcd 服务器地址, 根目录, 服务添加回调,服务删除回调
Discover(std::string url, std::string base_dir, notify_callback put_cb, notify_callback del_cd)
:_client_ptr(std::make_shared<etcd::Client>(url)),
_watcher_ptr(std::make_shared<etcd::Watcher>(*(_client_ptr.get()), base_dir, std::bind(&Discover::callback, this, std::placeholders::_1), true)),
_put_cb(put_cb),
_del_cb(del_cd)
{
//ls查看根目录下所有服务
etcd::Response response = _client_ptr->ls(base_dir).get();
if(false == response.is_ok())
{
LOG_ERROR("获取服务失败 {}", base_dir);
}
else
{
for(int i = 0; i < response.keys().size(); i++)
{
if(_put_cb) _put_cb(response.value(i).key(), response.value(i).as_string());
LOG_INFO("上线了 {} : {} 服务", response.value(i).key(), response.value(i).as_string());
}
}
//监控根目录下服务
_watcher_ptr->Wait();
}
~Discover()
{
_watcher_ptr->Cancel();
}
private:
void callback(const etcd::Response& resp)
{
if(false == resp.is_ok())
{
LOG_ERROR("监控服务失败" );
return;
}
else
{
for(const etcd::Event& ev : resp.events())
{
if(ev.event_type() == etcd::Event::EventType::PUT)
{
LOG_INFO("服务发生改变 {} : {} ---> {} : {}", ev.prev_kv().key(), ev.prev_kv().as_string(), ev.kv().key(), ev.kv().as_string());
if(_put_cb) _put_cb(ev.kv().key(), ev.kv().as_string());
}
else if(ev.event_type() == etcd::Event::EventType::DELETE_)
{
LOG_INFO("下线了服务 {} : {}", ev.prev_kv().key(), ev.prev_kv().as_string());
if(_del_cb) _del_cb(ev.prev_kv().key(), ev.prev_kv().as_string());
}
}
}
}
private:
std::shared_ptr<etcd::Client> _client_ptr; //客户端实体
std::shared_ptr<etcd::Watcher> _watcher_ptr; //监控服务
notify_callback _put_cb;
notify_callback _del_cb;
};