深入解析etcd服务器:概念、接口及管理类封装实现(C++)

发布于:2024-10-17 ⋅ 阅读:(7) ⋅ 点赞:(0)

前言

介绍

etcd 是一个开源的分布式键值存储系统,主要用于存储和管理配置信息、服务发现以及协调分布式系统中的数据。由 CoreOS 开发,基于 Raft 共识算法,确保了高可用性和一致性。

etcd && Redis && ZooKeeper 三者比较

以下是 etcdRedisZooKeeper 的比较表格:

特性 etcd Redis ZooKeeper
数据模型 键值存储 键值存储 层次化键值存储
一致性模型 强一致性(Raft算法) 最终一致性(主从复制) 强一致性(ZAB协议)
主要用途 配置管理、服务发现 缓存、消息队列、会话存储 配置管理、命名、同步服务
高可用性 支持通过分布式架构实现 支持主从复制和 集群模式 通过选举机制确保高可用性
监控/观察功能 支持键变化的实时通知 支持发布/订阅机制 提供事件监听
安全性 支持 TLS 和 RBAC 支持简单的认证和 ACL 支持身份验证和基于ACL的权限
性能 较低延迟,适合小规模操作 高性能,适合高频读写场景 较高延迟,适合协调性操作
数据持久化 支持持久化 可选择持久化模式 数据持久化到磁盘
编程接口 HTTP/gRPC Redis协议、客户端库支持多种语言 ZooKeeper API

总结

  • etcd 适合需要强一致性和配置管理的场景,常用于微服务架构。
  • Redis 更适合高性能缓存和实时数据处理,支持多种数据结构。
  • ZooKeeper 适合需要协调和同步的分布式系统,提供可靠的配置管理。

etcd 的相关类 与 接口

如果要使用etcd,必须先认识etcd 的相关类与功能接口:

// pplx::task 并行库异步结果对象
// 阻塞方式 get(): 阻塞直到任务执行完成,并获取任务结果
// 非阻塞方式 wait(): 等待任务到达终止状态,然后返回任务状态
namespace etcd
{
    class Value
    {
        bool is_dir()//判断是否是一个目录
            std::string const &key()       // 键值对的 key 值
            std::string const &as_string() // 键值对的 val 值
            int64_t lease()                // 用于创建租约的响应中,返回租约 ID
    }
    // etcd 会监控所管理的数据的变化,一旦数据产生变化会通知客户端
    // 在通知客户端的时候,会返回改变前的数据和改变后的数据
    class Event
    {
        enum class EventType
        {
            PUT,     // 键值对新增或数据发生改变
            DELETE_, // 键值对被删除
            INVALID,
        };
        enum EventType event_type()
            const Value &kv()
                const Value &prev_kv()
    } class Response
    {
        bool is_ok()
            std::string const &error_message()
                Value const &value()            // 当前的数值 或者 一个请求的处理结果
            Value const &prev_value()           // 之前的数值
            Value const &value(int index)       //
            std::vector<Event> const &events(); // 触发的事件
    } class KeepAlive
    {
        KeepAlive(Client const &client, int ttl, int64_t lease_id = 0);
        // 返回租约 ID
        int64_t Lease();
        // 停止保活动作
        void Cancel();
    } class Client
    {
        // etcd_url: "http://127.0.0.1:2379"
        Client(std::string const &etcd_url,
               std::string const &load_balancer = "round_robin");
        // Put a new key-value pair 新增一个键值对
        pplx::task<Response> put(std::string const &key,
                                 std::string const &value);
        // 新增带有租约的键值对 (一定时间后,如果没有续租,数据自动删除)
        pplx::task<Response> put(std::string const &key,
                                 std::string const &value,
                                 const int64_t leaseId);
        // 获取一个指定 key 目录下的数据列表
        pplx::task<Response> ls(std::string const &key);
        // 创建并获取一个存活 ttl 时间的租约
        pplx::task<Response> leasegrant(int ttl);
        // 获取一个租约保活对象,其参数 ttl 表示租约有效时间
        pplx::task<std::shared_ptr<KeepAlive>> leasekeepalive(int
                                                                  ttl);
        // 撤销一个指定的租约
        pplx::task<Response> leaserevoke(int64_t lease_id);
        // 数据锁
        pplx::task<Response> lock(std::string const &key);
    } class Watcher
    {
        Watcher(Client const &client,
                std::string const &key,                 // 要监控的键值对 key
                std::function<void(Response)> callback, // 发生改变后的回调
                bool recursive = false);                // 是否递归监控目录下的所有数据改变
        Watcher(std::string const &address,
                std::string const &key,
                std::function<void(Response)> callback,
                bool recursive = false);
        // 阻塞等待,直到监控任务被停止
        bool Wait();
        bool Cancel();
    }

封装 服务 注册 / 发现 功能

服务注册

服务注册功能 主要是在 etcd 服务器上存储一个租期 n秒 的保活键值对,表示能提供指定服务的节点主机,比如 /service/user/ins-1 的 key,对应的 val 为提供服务的主机节点地址

<key, val> – </service/user/instance-1, 127.0.0.1:8080>

  • /service主目录,包含不同服务的键值对存储
  • /user服务名称,表示该键值对是一个用户服务的节点
  • /ins-1节点实例名称,提供用户服务可能会有很多节点,每个节点都应该有自己独立且唯一的实例名称

当这个键值对注册之后,服务发现方 可以基于目录进行键值对的发现。

一旦注册节点退出,保活失败,ns 后租约失效,键值对被删除, etcd 会通知发现方数据的失效,进而实现服务下线通知的功能


服务发现

服务发现分为两个过程:

  • 刚启动客户端时,进行 ls 目录浏览,进行/service 路径下所有键值对的获取
  • 对所关心的服务进行 watcher 观测,一旦发生数据变化(新增/删除),收到通知 并进行节点的管理

如果 ls 的路径为/service,则会获取到 /service/user, /service/firend …等其路径下的所有能够提供服务的实例节点数据。

如果 ls 的路径为 /service/user, 则会获取到 /service/user/instancd-1,
/service/user/instance-2,…等所有提供用户服务的实例节点数据。

客户端可以将发现的所有**<实例 - 地址>**管理起来,以便于进行节点的管理:

  • 收到新增数据通知,向本地管理添加新增的节点地址 – 服务上线
  • 收到删除数据通知,从本地管理删除对应的节点地址 – 服务下线

因为管理了所有的能够提供服务的节点主机的地址,因此当需要进行 rpc 调用的时候,则根据服务名称,获取一个能够提供服务的主机节点地址进行访问就可以了,获取策略采用 RR 轮转策略;

封装思想

将 etcd 的操作全部封装起来,不需要管理数据,只需要向外四个基础操作接口:

  • 进行服务注册,也就是向 etcd 添加 <服务-主机地址>的数据
  • 进行服务发现,获取当前所有能提供服务的信息
  • 设置服务上线的处理回调接口
  • 设置服务下线的处理回调接口

封装后,外部的 rpc 调用模块,可以先获取所有的当前服务信息,建立通信连接进行 rpc 调用,也能在有新服务上线的时候新增连接,以及下线的时候移除连接;


代码实例

封装 注册 与 发现 功能

etcd 常用于微服务架构中的配置管理和服务发现,我们可以通过etcd 的相关类与接口进行封装 服务发现/注册的功能:

主要基于 etcd 所提供的可以设置有效时间的键值对存储来实现;

封装服务注册类

服务注册需要以下成员变量:

  • etcd客户端:std::shared_ptr<etcd::Client> _client;
  • 租约:std::shared_ptr<etcd::KeepAlive> _keep_alive;
  • 租约id:uint64_t _lease_id;

以及一个注册函数registry,通过接收[key:value],将key、value、与lease_id加入到client中,完成注册;

// 服务注册客户端类
class Registry
{
private:
    std::shared_ptr<etcd::Client> _client;
    std::shared_ptr<etcd::KeepAlive> _keep_alive;
    uint64_t _lease_id;

public:
    using ptr = std::shared_ptr<Registry>;
    Registry(const std::string &host) : _client(std::make_shared<etcd::Client>(host)),
                                        _keep_alive(_client->leasekeepalive(3).get()),
                                        _lease_id(_keep_alive->Lease())
    {}

    ~Registry() { _keep_alive->Cancel(); }

    // 注册服务
    bool registry(const std::string &key, const std::string &value)
    {
        etcd::Response resp = _client->put(key, value, _lease_id).get();
        if (!resp.is_ok())
        {
            LOG_ERROR("注册数据失败: {}", resp.error_message());
            return false;
        }
    }
};

封装服务发现类

服务发现类包括以下成员变量:

  • 数据改变回调函数:NotifyCallback _put_cb;
  • 数据删除回调函数:NotifyCallback _del_cb;
  • etcd客户端:std::shared_ptr<etcd::Client> _client;
  • 事件监听器:std::shared_ptr<etcd::Watcher> _watcher;

成员函数包括:回调函数callback,负责:

当监听器检测到数据变化,回调函数接收响应后,根据响应的事件类型进行不同的处理

以及构造函数,有以下步骤:

  1. 开始服务发现,从指定路径下获取数据
  2. 根据获取的响应的键值对数量,分别交由回调函数进行处理
  3. 最后进行事件监听绑定
// 服务发现客户端类
class Discovery
{
public:
    using ptr = std::shared_ptr<Discovery>;
    using NotifyCallback = std::function<void(std::string, std::string)>;
private:
    NotifyCallback _put_cb;
    NotifyCallback _del_cb;
    std::shared_ptr<etcd::Client> _client;
    std::shared_ptr<etcd::Watcher> _watcher;
public:
    Discovery(const std::string* host, const std::string& basedir,
              const NotifyCallback& put_cb, const NotifyCallback& del_cb):
              _client(std::make_shared<etcd::Client>(host)), _put_cb(put_cb), _del_cb(del_cb)
    {
        // 1. 进行服务发现,获取指定路径数据
        auto resp = _client->ls(basedir).get();
        if(!resp.is_ok()){
            LOG_ERROR("获取服务信息数据失败: {}", resp.error_message());
            return ;
        }

        int sz = resp.keys().size();
        for(int i = 0; i < sz; ++i){
            if(_put_cb) _put_cb(resp.key(i), resp.value(i).as_string());
        }

        // 2. 事件监听:监控数据改变并调用回调
        _watcher = std::make_shared<etcd::Watcher>(*_client.get(), basedir, 
            std::bind(&Discovery::callback, this, std::placeholders::_1, true));
    }
private:
    // 服务发现回调
    void callback(const etcd::Response &resp){
        if(!resp.is_ok()) {
            LOG_ERROR("错误事件通知: {}", resp.error_message());
            return ;
        }

        for(auto const& ev : resp.events()){
            if(ev.event_type() == etcd::Event::EventType::PUT){
                if(_put_cb) _put_cb(ev.kv().key(), ev.kv().as_string());                
                LOG_DEBUG("新增服务: {}-{}", ev.kv().key(), ev.kv().as_string());
            } else if(ev.event_type() == etcd::Event::EventType::DELETE_){
                if(_del_cb) _del_cb(ev.kv().key(), ev.kv().as_string());
                LOG_DEBUG("删除服务: {}-{}", ev.kv().key(), ev.kv().as_string());
            }
        }
    }

};

搭建服务[注册-发现]中心

我们可以通过etcd作为服务注册/发现中心,首先需要定义服务的注册与发现逻辑,有以下步骤:

  1. 服务注册:服务启动时,向 Etcd 注册服务的地址和端口。
  2. 服务发现:客户端通过 Etcd 获取服务的地址和端口,用于远程调用。
  3. 健康检查:服务定期向 Etcd 发送心跳,以维持其注册信息的有效性。

封装服务注册类

#include <etcd/Client.hpp>
#include <etcd/Response.hpp>
#include <etcd/KeepAlive.hpp>
#include <thread>
#include <gflags/gflags.h>
#include "../../common/etcd.hpp"

DEFINE_bool(run_mode, false, "程序运行模式: false=调试, true=发布");
DEFINE_string(log_file, "", "发布模式下的日志文件名");
DEFINE_int32(log_level, 0, "发布模式下的日志文件等级");

DEFINE_string(etcd_server, "127.0.0.1:2379", "etcd服务器地址");
DEFINE_string(base_service, "/service", "服务注册的根路径 / 监控根路径");
DEFINE_string(instance_name, "/echo/instance", "当前实例名称");
DEFINE_string(access_host, "127.0.0.1:7070", "当前实例的外部访问地址");
DEFINE_int32(listen_port, 7070, "Rpc服务器监听端口");

using namespace mim;

int main(int argc, char *argv[]) {
    // 初始化gflags: 解析命令行标志
    google::ParseCommandLineFlags(&argc, &argv, true);
    init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);

    LOG_DEBUG("服务名称: {}", FLAGS_base_service + FLAGS_instance_name);
    // 注册服务
    Registry::ptr client = std::make_shared<Registry>(FLAGS_etcd_server);
    client->registry(FLAGS_base_service + FLAGS_instance_name, FLAGS_access_host);

    return 0;
}

封装服务发现类

#include "../../common/etcd.hpp"

#include <gflags/gflags.h>

DEFINE_bool(run_mode, false, "程序运行模式: false=调试, true=发布");
DEFINE_string(log_file, "", "发布模式下的日志文件名");
DEFINE_int32(log_level, 0, "发布模式下的日志文件等级");

DEFINE_string(etcd_server, "127.0.0.1:2379", "etcd服务器地址");
DEFINE_string(base_service, "/service", "服务注册的根路径 / 监控根路径");
DEFINE_string(instance_name, "/echo/instance", "当前实例名称");
DEFINE_string(access_host, "127.0.0.1:7070", "当前实例的外部访问地址");
DEFINE_int32(listen_port, 7070, "Rpc服务器监听端口");

using namespace mim;

void online(const std::string& service_name, const std::string& access_host) {
    LOG_INFO("服务上线: {}-{}", service_name, access_host);
}

void offline(const std::string& service_name, const std::string& access_host) {
    LOG_INFO("服务下线: {}-{}", service_name, access_host);
}

int main(int argc, char* argv[]) {
    google::ParseCommandLineFlags(&argc, &argv, true);
    init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);

    Discovery::ptr client = std::make_shared<Discovery>(FLAGS_etcd_server, FLAGS_base_service, online, offline);

    std::this_thread::sleep_for(std::chrono::seconds(500));

    return 0;
}

结果演示

在这里插入图片描述

在这里插入图片描述


etcd 的安装与配置

etcd

  1. 安装 Etcd:
sudo apt-get install etcd
  1. 启动 Etcd 服务:
sudo systemctl start etcd
  1. 设置 Etcd 开机自启:
sudo systemctl enable etcd
  1. 查看Etcd状态
sudo systemctl status etcd

节点配置

如果是单节点集群,可以不进行配置,默认 etcd 的集群节点通信端口为 2380,客户端访问端口为 2379;

#节点名称,默认为 "default"
ETCD_NAME="etcd1"
#数据目录,默认为 "${name}.etcd"
ETCD_DATA_DIR="/var/lib/etcd/default.etcd"
#用于客户端连接的 URL。
ETCD_LISTEN_CLIENT_URLS="http://192.168.65.132:2379,http://127.0.0
.1:2379"
#用于客户端访问的公开,也就是提供服务的 URL
ETCD_ADVERTISE_CLIENT_URLS="http://192.168.65.132:2379,http://127.
0.0.1:2379"
#用于集群节点间通信的 URL。
ETCD_LISTEN_PEER_URLS="http://192.168.65.132:2380"
ETCD_INITIAL_ADVERTISE_PEER_URLS="http://192.168.65.132:2380"
#心跳间隔时间-毫秒
ETCD_HEARTBEAT_INTERVAL=100
#选举超时时间-毫秒
ETCD_ELECTION_TIMEOUT=1000
#以下为集群配置,若无集群则需要注销
#初始集群状态和配置--集群中所有节点
#ETCD_INITIAL_CLUSTER="etcd1=http://192.168.65.132:2380,etcd2=http
://192.168.65.132:2381,etcd3=http://192.168.65.132:2382"
#初始集群令牌-集群的 ID
#ETCD_INITIAL_CLUSTER_TOKEN="etcd-cluster"
#ETCD_INITIAL_CLUSTER_STATE="new"
#以下为安全配置,如果要求 SSL 连接 etcd 的话,把下面的配置启用,并修改文件
路径
#ETCD_CERT_FILE="/etc/ssl/client.pem"
#ETCD_KEY_FILE="/etc/ssl/client-key.pem"
#ETCD_CLIENT_CERT_AUTH="true"
#ETCD_TRUSTED_CA_FILE="/etc/ssl/ca.pem"
#ETCD_AUTO_TLS="true"
#ETCD_PEER_CERT_FILE="/etc/ssl/member.pem"
#ETCD_PEER_KEY_FILE="/etc/ssl/member-key.pem"
#ETCD_PEER_CLIENT_CERT_AUTH="false"
#ETCD_PEER_TRUSTED_CA_FILE="/etc/ssl/ca.pem"
#ETCD_PEER_AUTO_TLS="true"

etcd-cpp-apiv3

etcd-cpp-apiv3 是 etcd 的 C++版本客户端 API。
其 依赖于 mipsasm, boost,protobuf, gRPC, cpprestsdk 等库

依赖库安装:

sudo apt-get install libboost-all-dev libssl-dev
sudo apt-get install libprotobuf-dev protobuf-compiler-grpc
sudo apt-get install libgrpc-dev libgrpc++-dev
sudo apt-get install libcpprest-dev

api 框架安装:

git clone https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3.git
cd etcd-cpp-apiv3
mkdir build && cd build
cmake .. -DCMAKE_INSTALL_PREFIX=/usr
make -j$(nproc) && sudo make install