Etcd 框架

发布于:2024-11-23 ⋅ 阅读:(11) ⋅ 点赞:(0)

基本了解

客户端、长连接与租约的关系

客户端对象

etcd的客户端对象是用户与etcd服务进行交互的主要接口,主要功能就是存储、通知和事务等功能访问

  • 键值存储:客户端通过put 和 get操作存储数据;数据存储在etcd的层级化键值数据库中
  • 监听器(Watcher):客户端可以监听指定键的变化事件,如果键值发生改变,etcd会通过回调通知客户端;借助监听器可以实现动态管理配置和服务发现
  • 事务:提供txn接口,允许客户端进行原子性操作

工作原理

  • 客户端通过gRpc与etcd集群进行交互
  • 连接时,客户端需要提供集群的地址和认证信息
  • 连接建立后,客户端可以调用功能不同API完成具体功能

长连接

长连接是指客户端与etcd服务之间的持续连接,主要用于保持会话状态以及实现实时性功能

  • 保持租约长期有效:租约需要客户端通过长连接向etcd定期发送心跳包,否则租约会到期
  • 监听:监听功能依赖于长连接,当监听的键发生变化的时候,etcd通过长连接向客户端推送事件
  • 减少开销:长连接避免了频繁的连接断开,节省了建立连接的时间和资源

工作原理

  • 客户端与 etcd 集群节点通过 gRPC 建立连接
  • 建立连接后,客户端发送 KeepAlive 请求以保持连接
  • 如果 etcd 未在一定时间内接收到心跳,会认为连接断开

租约

etcd提供的一种机制,主要用于一段时间内绑定键值数据,并确保键值在租约有效期内有效

  • 动态生存时间:键值可以绑定到租约,租约到期后,绑定的键值会自动删除
  • 分布式锁:租约结合键值和keeplive实现分布式锁,确保资源的唯一占用
  • Session管理:通过租约,可以熟实现分布式会话管理

工作原理

  • 客户端调用 LeaseGrant 接口创建租约,并设置租期
  • 将键值与租约绑定。例如,使用 Put 命令时指定租约ID
  • 客户端通过长连接定期向 etcd 发送心跳,续租以保持租约有效
  • 如果未续约,租约到期后,绑定的键值会自动被删除

应用:服务实例向 etcd 注册自己(通过租约绑定信息),当实例下线或不可用时,租约自动到期并移除信息

基本使用

安装

框架安装

GitHub - etcd-cpp-apiv3/etcd-cpp-apiv3: The etcd-cpp-apiv3 is a C++ library for etcd's v3 client APIs, i.e., ETCDCTL_API=3.

验证动态库和相应的文件是否存在

使用

上传和获取逻辑

// get.cc

#include <etcd/Client.hpp>
#include <etcd/KeepAlive.hpp>
#include <etcd/Response.hpp>
#include <etcd/Watcher.hpp>
#include <etcd/Value.hpp>
#include <thread>

// 回调函数:用于处理监控到的键值变化事件
void callback(const etcd::Response &resp) {
    // 如果收到的事件通知无效,打印错误信息并返回
    if (!resp.is_ok()) {
        std::cout << "收到一个错误的事件通知: " << resp.error_message() << std::endl;
        return;
    }

    // 遍历事件列表,逐个处理每个事件
    for (auto const& ev : resp.events()) {
        // 如果事件类型是 PUT(键值被创建或修改)
        if (ev.event_type() == etcd::Event::EventType::PUT) {
            std::cout << "服务信息发生了改变:\n";
            std::cout << "当前的值:" << ev.kv().key() << " - " << ev.kv().as_string() << std::endl;
            std::cout << "原来的值:" << ev.prev_kv().key() << " - " << ev.prev_kv().as_string() << std::endl;
        }
        // 如果事件类型是 DELETE(键值被删除)
        else if (ev.event_type() == etcd::Event::EventType::DELETE_) {
            std::cout << "服务信息下线被删除:\n";
            std::cout << "当前的值:" << ev.kv().key() << " - " << ev.kv().as_string() << std::endl;
            std::cout << "原来的值:" << ev.prev_kv().key() << " - " << ev.prev_kv().as_string() << std::endl;
        }
    }
}

int main(int argc, char *argv[])
{
    // 定义etcd服务的地址,默认使用本地运行的etcd服务
    std::string etcd_host = "http://127.0.0.1:2379";

    // 1. 实例化etcd客户端对象,用于和etcd服务交互
    etcd::Client client(etcd_host);

    // 2. 获取 "/service" 路径下的所有键值对
    auto resp = client.ls("/service").get(); // 同步调用获取响应
    if (!resp.is_ok()) {
        // 如果获取失败,打印错误信息并退出程序
        std::cout << "获取键值对数据失败: " << resp.error_message() << std::endl;
        return -1;
    }

    // 3. 遍历返回的键值对,打印每个键值对的信息
    int sz = resp.keys().size(); // 获取键值对的数量
    for (int i = 0; i < sz; ++i) {
        std::cout << resp.value(i).as_string() << " 可以提供 " << resp.key(i) << " 服务\n";
    }

    // 4. 实例化一个Watcher对象,用于监听 "/service" 路径下的键值变化
    // 参数说明:
    // - `client`: 使用前面实例化的客户端对象
    // - "/service": 要监听的路径
    // - `callback`: 监听到键值变化后调用的回调函数
    // - `true`: 是否递归监听路径下的子路径(true 表示递归)
    auto watcher = etcd::Watcher(client, "/service", callback, true);

    // 5. 启动Watcher,阻塞当前线程,持续监听事件,直到程序被中断
    watcher.Wait();

    // 6. 结束程序
    return 0;
}
//put.cc

#include <etcd/Client.hpp>
#include <etcd/KeepAlive.hpp>
#include <etcd/Response.hpp>
#include <thread>

int main(int argc, char *argv[])
{
    std::string etcd_host = "http://127.0.0.1:2379";
    //实例化客户端对象
    etcd::Client client(etcd_host);
    //获取租约保活对象--伴随着创建一个指定有效时长的租约
    auto keep_alive = client.leasekeepalive(3).get();
    //获取租约ID
    auto lease_id = keep_alive->Lease();
    //向etcd新增数据
    auto resp1 = client.put("/service/user", "127.0.0.1:8080", lease_id).get();
    if (resp1.is_ok() == false) {
        std::cout << "新增数据失败:" << resp1.error_message() << std::endl;
        return -1;
    }
    auto resp2 = client.put("/service/friend", "127.0.0.1:9090").get();
    if (resp2.is_ok() == false) {
        std::cout << "新增数据失败:" << resp2.error_message() << std::endl;
        return -1;
    }
    std::this_thread::sleep_for(std::chrono::seconds(10));
    return 0;
}

接口学习

基于上传和获取学习接口

etcd::Client

etcd::Client 是核心客户端类,用于与 etcd 服务进行交互,提供基本的键值存取和操作功能

构造函数

  • etcd_url:etcd 服务地址(例如 http://127.0.0.1:2379
  • 创建一个与 etcd 服务通信的客户端对象
etcd::Client client(const std::string &etcd_url);

ls() 方法

  • 列出路径下所有的键值
  • key:路径键,表示需要获取其子键的根路径(例如 /service
  • 返回一个 std::future<etcd::Response>,可以通过 .get() 同步获取结果
std::future<etcd::Response> ls(const std::string &key);
auto resp = client.ls("/service").get();
if (resp.is_ok()) {
    // 获取成功,遍历键值
    for (const auto &key : resp.keys()) {
        std::cout << key << ": " << resp.value(key).as_string() << std::endl;
    }
} else {
    std::cerr << "获取失败: " << resp.error_message() << std::endl;
}

etcd::Watcher

 用于监听指定路径下的键值变化事件,并触发回调函数处理这些事件

构造函数

  • clientetcd::Client 对象,用于连接 etcd 服务。
  • key:要监听的路径键(例如 /service)。
  • callback:回调函数,当监听到键值变化时触发,参数是 etcd::Response 对象。
  • recursive:是否递归监听子路径下的键值变化,默认为 false
  • 功能:创建一个监听器对象,监听制定路径下的键值变化
etcd::Watcher watcher(etcd::Client &client, const std::string &key,
                      std::function<void(const etcd::Response &)> callback,
                      bool recursive = false);

Wait() 方法

  • 阻塞当前线程,持续监听键值变化事件,直到被中断
auto watcher = etcd::Watcher(client, "/service", callback, true);
watcher.Wait(); // 阻塞线程等待事件

 etcd::Response

 etcd 请求或事件的响应对象,包含操作结果及相关数据

 is_ok() 方法

  • 判断请求是否成功:成功则返回true,失败则返回false
bool is_ok() const;

error_message() 方法

  • 获取请求或者事件失败的错误信息
std::string error_message() const;

 keys() 方法

  • 获取路径下所有键的列表
std::vector<std::string> keys() const;

value(key) 方法

  •  返回指向键的值对象
etcd::Value value(const std::string &key) const;

 events() 方法

  • 获取监听器捕获到的事件列表
std::vector<etcd::Event> events() const;

etcd::Value

etcd::Value 表示 etcd 中的键值对,包含键、值及其元数据

key() 方法

  • 获取键的名称
std::string key() const;

 as_string() 方法

  • 获取键对应的值(字符串形式)
std::string as_string() const;

etcd::Value val = resp.value(key);
std::cout << "Key: " << val.key() << ", Value: " << val.as_string() << std::endl;

二次封装 

基本思想

服务注册客户端类(Registry)

主要目标

  • 向etcd注册服务信息(通过键值对方式)并确保其活跃状态
  • 借用租约机制保证注册信息的有效性,如果租约过期,服务注册信息回自动清除

实现逻辑

  • 创建一个etcd客户端,用于与etcd进行通信
  • 创建租约(Lease),为服务注册信息绑定一个动态生存空间
  • 定期发送心跳保活(KeepLive),续约租约,确保服务信息不会被自动删除
  • 提供一个registry方法,用于注册服务信息(键值对)

服务发现客户端类

主要目标

  • 从etcd中获取指定路径下的当前服务信息
  • 通过监听机制实时监控服务信息的变化(例如服务新增、服务下线)
  • 使用回调函数处理服务变更事件

实现逻辑

  • 创建etcd客户端用于与etcd通信
  • 使用ls方法获取当前路径下所有的键值对,处理当前已有数据
  • 创建Watcher对象,监听指定路径下的数据变更事件
  • 在事件回调中,根据事件的类型调用对应的回调函数

具体实现

细节补充:Watcher中使用Bind进行回调的逻辑总结

  • Watcher 检测到事件etcd::Watcher 检测到指定路径的键值发生了变化(例如新增或删除)
  • Watcher 调用回调函数Watcher 将变化事件封装为 etcd::Response 对象,调用绑定后的函数对象,并将 etcd::Response 作为参数传递
  • 绑定的函数对象调用 callback:其中,thisDiscovery 类的实例,respetcd::Watcher 提供的事件响应
  • callback 函数执行callback 使用 resp 提供的事件信息,执行具体的业务逻辑(例如调用 _put_cb_del_cb
this->callback(resp);

etcd二次封装实现 

#pragma once
#include <etcd/Client.hpp>
#include <etcd/KeepAlive.hpp>
#include <etcd/Response.hpp>
#include <etcd/Watcher.hpp>
#include <etcd/Value.hpp>
#include <functional>
#include "logger.hpp"

namespace mag {
    // 服务注册客户端类
    class Registry 
    {
    public:
        using ptr = std::shared_ptr<Registry>;

        // 构造函数:初始化etcd客户端和租约(Lease)机制
        Registry(const std::string &host):
            _client(std::make_shared<etcd::Client>(host)), // 创建etcd客户端
            _keep_alive(_client->leasekeepalive(3).get()), // 创建一个3秒TTL的租约并保持续约
            _lease_id(_keep_alive->Lease()){} // 获取租约ID

        // 析构函数:取消租约续约,释放资源
        ~Registry() { _keep_alive->Cancel(); }

        // 注册服务信息:将键值对存入etcd并绑定租约
        bool registry(const std::string &key, const std::string &val) 
        {
            auto resp = _client->put(key, val, _lease_id).get(); // 绑定租约,写入键值对
            if (resp.is_ok() == false) 
            {
                LOG_ERROR("注册数据失败:{}", resp.error_message()); // 打印错误日志
                return false;
            }
            return true; // 返回注册结果
        }
    private:
        std::shared_ptr<etcd::Client> _client; // etcd客户端对象
        std::shared_ptr<etcd::KeepAlive> _keep_alive; // 租约续约对象
        uint64_t _lease_id; // 租约ID
    };

    // 服务发现客户端类
    class Discovery 
    {
    public:
        using ptr = std::shared_ptr<Discovery>;
        using NotifyCallback = std::function<void(std::string, std::string)>;

        // 构造函数:初始化etcd客户端,执行服务发现和事件监听
        Discovery(const std::string &host, 
            const std::string &basedir,
            const NotifyCallback &put_cb,
            const NotifyCallback &del_cb)
            :_client(std::make_shared<etcd::Client>(host)), // 创建etcd客户端
             _put_cb(put_cb), _del_cb(del_cb) // 设置回调函数
            {
            // 服务发现:获取当前路径下的已有服务数据
            auto resp = _client->ls(basedir).get(); // 列出路径下的所有键值
            if (resp.is_ok() == false) 
            {
                LOG_ERROR("获取服务信息数据失败:{}", resp.error_message()); // 打印错误日志
            }
            int sz = resp.keys().size();
            for (int i = 0; i < sz; ++i) 
            {
                if (_put_cb) _put_cb(resp.key(i), resp.value(i).as_string()); // 调用服务新增回调
            }

            // 事件监控:监听路径数据变更并调用回调处理
            _watcher = std::make_shared<etcd::Watcher>(*_client.get(), basedir,
                std::bind(&Discovery::callback, this, std::placeholders::_1), true);
        }

        // 析构函数:取消事件监听,释放资源
        ~Discovery() {
            _watcher->Cancel();
        }
    private:
        // 事件回调函数:处理服务新增和删除事件
        void callback(const etcd::Response &resp) 
        {
            if (resp.is_ok() == false) {
                LOG_ERROR("收到一个错误的事件通知: {}", resp.error_message()); // 打印错误日志
                return;
            }
            for (auto const& ev : resp.events()) 
            {
                // 新增服务事件
                if (ev.event_type() == etcd::Event::EventType::PUT) {
                    if (_put_cb) _put_cb(ev.kv().key(), ev.kv().as_string()); // 调用服务新增回调
                    LOG_DEBUG("新增服务:{}-{}", ev.kv().key(), ev.kv().as_string()); // 打印调试信息
                }
                // 服务下线事件
                else if (ev.event_type() == etcd::Event::EventType::DELETE_) {
                    if (_del_cb) _del_cb(ev.prev_kv().key(), ev.prev_kv().as_string()); // 调用服务删除回调
                    LOG_DEBUG("下线服务:{}-{}", ev.prev_kv().key(), ev.prev_kv().as_string()); // 打印调试信息
                }
            }
        }
    private:
        NotifyCallback _put_cb; // 服务新增事件回调
        NotifyCallback _del_cb; // 服务删除事件回调
        std::shared_ptr<etcd::Client> _client; // etcd客户端对象
        std::shared_ptr<etcd::Watcher> _watcher; // 事件监听器
    };
}

 测试注册和发现逻辑

#include "etcd.hpp"
#include <iostream>
#include <chrono>
#include <thread>

// 服务新增回调
void onServiceAdded(const std::string &key, const std::string &value) {
    std::cout << "发现新增服务:" << key << " -> " << value << std::endl;
}

// 服务下线回调
void onServiceRemoved(const std::string &key, const std::string &value) {
    std::cout << "服务下线:" << key << " -> " << value << std::endl;
}

int main() {
    std::string etcd_host = "http://127.0.0.1:2379";
    std::string watch_dir = "/service";

    // 创建Discovery对象
    mag::Discovery::ptr discovery = std::make_shared<mag::Discovery>(
        etcd_host, watch_dir, onServiceAdded, onServiceRemoved
    );

    // 保持监听状态
    std::cout << "开始监听服务变更,按 Ctrl+C 退出..." << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(600));

    return 0;
}
#include "etcd.hpp"
#include <iostream>
#include <chrono>
#include <thread>

int main() {
    std::string etcd_host = "http://127.0.0.1:2379";
    std::string key = "/service/example";
    std::string value = "127.0.0.1:8080";

    // 创建Registry对象
    mag::Registry::ptr registry = std::make_shared<mag::Registry>(etcd_host);

    // 注册服务
    if (registry->registry(key, value)) {
        std::cout << "服务注册成功:" << key << " -> " << value << std::endl;
    } else {
        std::cerr << "服务注册失败!" << std::endl;
        return -1;
    }

    // 保持注册信息的活跃状态
    std::cout << "服务正在运行,按 Ctrl+C 退出..." << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(600));

    return 0;
}

问题

编译问题慢

问题描述

 多次对该文件进行编译,编译总是卡在80%左右停止,服务器环境是2核2G的云服务器

分析

 经htop进行排查,发现部分服务占用了大量内存,从而导致编译写入和读取文件速度变慢

解决思路

停止占用内存较高的MySQL服务

sudo systemctl stop mysql

清理缓存和无用数据

sync; echo 3 | sudo tee /proc/sys/vm/drop_caches

删除临时文件进一步释放空间

sudo apt autoremove -y
sudo apt clean

使用两个线程对程序进行编译

make -j2