etcd-cpp-apiv3 二次封装

发布于:2025-07-09 ⋅ 阅读:(21) ⋅ 点赞:(0)

接口介绍

头文件

#include <etcd/Client.hpp>
#include <etcd/KeepAlive.hpp>
#include <etcd/Response.hpp>
#include <etcd/SyncClient.hpp>
#include <etcd/Value.hpp>
#include <etcd/Watcher.hpp>

下面从功能介绍几个类的概念

Value :保存键值对的 key 和 value

Event:记录键值对是否改变或者被删除的状态

Response :etcd 服务器向客户端的响应
KeepAlive : 客户端向 etcd 保活数据
Client  : 客户端类
Watcher : 检测 etcd 服务器上键值对是否发生改变
下面是接口
namespace etcd {
class Value {
 bool is_dir();//判断是否是一个目录
 std::string const& key() //键值对的 key 值
 std::string const& as_string()//键值对的 val 值
 
 int64_t lease() //用于创建租约的响应中,返回租约 ID
}
//etcd 会监控所管理的数据的变化,一旦数据产生变化会通知客户端
//在通知客户端的时候,会返回改变前的数据和改变后的数据
class Event {
 enum class EventType {
 PUT, //键值对新增或数据发生改变
 DELETE_,//键值对被删除
 INVALID,
 };
 enum EventType event_type() 
 const Value& kv()
 const Value& prev_kv()
} 
class Response {
 bool is_ok()
 std::string const& error_message()
 Value const& value()//当前的数值 或者 一个请求的处理结果
 Value const& prev_value()//之前的数值
 Value const& value(int index)//
 std::vector<Event> const& events();//触发的事件
} 
class KeepAlive {
 KeepAlive(Client const& client, int ttl, int64_t lease_id = 
0);
 //返回租约 ID
 int64_t Lease();
 //停止保活动作
 void Cancel();
} 
class Client {
 // etcd_url: "http://127.0.0.1:2379"
 Client(std::string const& etcd_url,
 std::string const& load_balancer = "round_robin");
 //Put a new key-value pair 新增一个键值对
 pplx::task<Response> put(std::string const& key, 
 std::string const& value);
 //新增带有租约的键值对 (一定时间后,如果没有续租,数据自动删除)
 pplx::task<Response> put(std::string const& key, 
 std::string const& value,
 const int64_t leaseId);
 //获取一个指定 key 目录下的数据列表
 pplx::task<Response> ls(std::string const& key);
//创建并获取一个存活 ttl 时间的租约
 pplx::task<Response> leasegrant(int ttl);
 //获取一个租约保活对象,其参数 ttl 表示租约有效时间
 pplx::task<std::shared_ptr<KeepAlive>> leasekeepalive(int 
ttl);
 //撤销一个指定的租约
 pplx::task<Response> leaserevoke(int64_t lease_id);
 //数据锁
 pplx::task<Response> lock(std::string const& key);
} 
class Watcher {
 Watcher(Client const& client, 
 std::string const& key, //要监控的键值对 key
 std::function<void(Response)> callback, //发生改变后的回调
 bool recursive = false); //是否递归监控目录下的所有数据改变
 Watcher(std::string const& address, 
 std::string const& key,
 std::function<void(Response)> callback, 
 bool recursive = false);
 //阻塞等待,直到监控任务被停止
 bool Wait();
 bool Cancel();
}

二次封装

        设计两个类 Register  Discover  ,Register  用来向 etcd 服务器注册键值对,它会向 etcd 服务器申请一个租约ID,负责数据保活。Discover 查看服务器键值对,并检测键值对是否被改变或删除。

        Register  类只需封装 Client KeepAlive 类即可,通过KeepAlive 得到租约ID 并且向服务器不断进行保活,Client ::put  向 etcd 服务器添加或更改键值对。代码如下

class Register
{
    public:
        //构造,传入 etcd 服务器地址
        Register(const std::string& url = "http://127.0.0.1:2379")
        :_client_ptr(std::make_shared<etcd::Client>(url)),
        _keep_alive_ptr(std::make_shared<etcd::KeepAlive>(*(_client_ptr.get()), 3)),
        _lease_id(_keep_alive_ptr->Lease())
        {}
        //析构,撤销指定租约
        ~Register()
        {
            _client_ptr->leaserevoke(_lease_id);
        }
        //向 etcd 添加服务
        void put(const std::string& key, const std::string& value)
        {
            if(false == _client_ptr->put(key, value, _lease_id).get().is_ok())
            {
                LOG_ERROR("etcd 添加服务失败 {} : {}", key, value);
            }
            else
            {
                LOG_INFO("etcd 添加服务成功 {} : {}", key, value);
            }
        }
    private:
        std::shared_ptr<etcd::Client> _client_ptr;          //客户端实体  
        std::shared_ptr<etcd::KeepAlive> _keep_alive_ptr;   //保活
        int64_t _lease_id;                                  //租约ID
};  

        Discover 封装 Client 和 Watcher ,检测 etcd 服务器键值对的变化。同时设置两个回调函数当键值对改变时,调用回调函数。

class Discover
{
    using notify_callback = std::function<void(std::string, std::string)>;
    public:
    //四个参数为: etcd 服务器地址, 根目录, 服务添加回调,服务删除回调
    Discover(std::string url, std::string base_dir, notify_callback put_cb, notify_callback del_cd)
    :_client_ptr(std::make_shared<etcd::Client>(url)),
     _watcher_ptr(std::make_shared<etcd::Watcher>(*(_client_ptr.get()), base_dir, std::bind(&Discover::callback, this, std::placeholders::_1), true)),
     _put_cb(put_cb),
     _del_cb(del_cd)
     {
        //ls查看根目录下所有服务
        etcd::Response response = _client_ptr->ls(base_dir).get();
        if(false == response.is_ok())
        {
            LOG_ERROR("获取服务失败 {}", base_dir);
        }
        else
        {
            for(int i = 0; i < response.keys().size(); i++)
            {
                if(_put_cb) _put_cb(response.value(i).key(), response.value(i).as_string());
                LOG_INFO("上线了 {} : {} 服务", response.value(i).key(), response.value(i).as_string());
            }
        }
        //监控根目录下服务
        _watcher_ptr->Wait();
     }
     ~Discover()
     {
        _watcher_ptr->Cancel();
     }
    private:
        void callback(const etcd::Response& resp)
        {
            if(false == resp.is_ok())
            {
                LOG_ERROR("监控服务失败" );
                return;
            }
            else
            {
                for(const etcd::Event& ev : resp.events())
                {
                    if(ev.event_type() == etcd::Event::EventType::PUT)
                    {
                        LOG_INFO("服务发生改变 {} : {} ---> {} : {}", ev.prev_kv().key(), ev.prev_kv().as_string(), ev.kv().key(), ev.kv().as_string());
                        if(_put_cb) _put_cb(ev.kv().key(), ev.kv().as_string());
                    }
                    else if(ev.event_type() == etcd::Event::EventType::DELETE_)
                    {
                        LOG_INFO("下线了服务 {} : {}", ev.prev_kv().key(), ev.prev_kv().as_string());
                        if(_del_cb) _del_cb(ev.prev_kv().key(), ev.prev_kv().as_string());
                    }
                }            
            }
        }

    private:
        std::shared_ptr<etcd::Client> _client_ptr;          //客户端实体  
        std::shared_ptr<etcd::Watcher> _watcher_ptr;        //监控服务
        notify_callback _put_cb;
        notify_callback _del_cb;
};