etcd 在项目中的应用

发布于:2025-07-06 ⋅ 阅读:(12) ⋅ 点赞:(0)

深入理解 etcd:从理论到实践的服务发现与配置管理

前言

在微服务架构中,服务发现和配置管理是两个核心问题。etcd 作为一个高可用的分布式键值存储系统,被广泛应用于服务注册与发现、配置管理、分布式锁等场景。本文将深入探讨 etcd 的核心概念、工作原理,并结合实际项目代码展示其在 Go 微服务架构中的应用。

一、etcd 简介

1.1 什么是 etcd

etcd 是一个分布式、可靠的键值存储系统,用于存储分布式系统中的关键数据。它由 CoreOS 公司开发,使用 Go 语言编写,具有以下特点:

  • 高可用性:支持集群部署,数据自动复制
  • 一致性:基于 Raft 算法保证强一致性
  • 简单易用:提供 HTTP+JSON API 和 gRPC API
  • 安全性:支持 TLS 加密和认证

1.2 etcd 的核心特性

  • 键值存储:支持字符串、数字、JSON 等数据类型
  • 租约机制:支持 TTL(Time To Live)自动过期
  • 监听机制:支持监听键值变化
  • 事务支持:支持原子性操作
  • 版本控制:支持历史版本查询

二、etcd 在微服务架构中的应用

2.1 服务注册与发现

在微服务架构中,服务需要能够动态发现其他服务的位置。etcd 通过以下机制实现服务发现:

  1. 服务注册:服务启动时向 etcd 注册自己的信息
  2. 服务发现:客户端从 etcd 获取服务列表
  3. 健康检查:通过租约机制实现服务健康状态监控

2.2 配置管理

etcd 可以作为配置中心,存储应用程序的配置信息:

  • 数据库连接信息
  • 服务端口配置
  • 功能开关
  • 环境变量

三、实际项目中的 etcd 应用

下面我将结合一个实际的 Go 微服务项目,展示 etcd 的具体应用。

3.1 项目架构概览

该项目采用了典型的微服务架构,包含以下组件:

  • 网关服务(Gate Server):负责客户端连接和消息转发
  • 逻辑服务(Logic Server):处理业务逻辑
  • 登录服务(Login Server):处理用户认证
  • 支付服务(Pay Server):处理支付相关业务
  • 游戏 服务(Game Server):处理游戏逻辑

3.2 服务注册实现

3.2.1 服务注册结构体
// ServiceRegister 创建租约注册服务
type ServiceRegister struct {
    cli           *clientv3.Client                        //etcd client
    leaseID       clientv3.LeaseID                        //租约ID
    keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse //租约keepAlive相应chan
    key           string                                  //key
    value         string                                  //value
}
3.2.2 服务注册核心代码
// NewServiceRegister 新建注册服务
func NewServiceRegister(endpoints []string, key, val string, lease int64) (*ServiceRegister, error) {
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   endpoints,
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        return nil, err
    }

    ser := &ServiceRegister{
        cli:   cli,
        key:   key,
        value: val,
    }

    //申请租约设置时间keepalive
    if err := ser.putKeyWithLease(lease); err != nil {
        return nil, err
    }

    return ser, nil
}

// 设置租约
func (s *ServiceRegister) putKeyWithLease(lease int64) error {
    //设置租约时间
    resp, err := s.cli.Grant(context.Background(), lease)
    if err != nil {
        return err
    }
    //注册服务并绑定租约
    _, err = s.cli.Put(context.Background(), s.key, s.value, clientv3.WithLease(resp.ID))
    if err != nil {
        return err
    }
    //设置续租 定期发送需求请求
    leaseRespChan, err := s.cli.KeepAlive(context.Background(), resp.ID)

    if err != nil {
        return err
    }
    s.leaseID = resp.ID
    s.keepAliveChan = leaseRespChan
    log.Printf("Put key:%s  val:%s  success!", s.key, s.value)
    return nil
}
3.2.3 服务节点信息结构
// ServerNode 服务器节点
type ServerNode struct {
    //服务器id
    Id int32 `json:"id" bson:"id"`
    //服务器名称
    Name string `json:"name" bson:"name"`
    //服务器连接地址
    Address string `json:"address" bson:"address"`
    //权重
    Weight int32 `json:"weight" bson:"weight"`
    //backup>=1 大于等于1代表备用服务器
    Backup int32 `json:"backup" bson:"backup"`
}

// Marshal 加密成字符串
func (sn *ServerNode) Marshal() string {
    bytes, err := json.Marshal(sn)
    if err != nil {
        return ""
    }
    return string(bytes)
}

// Unmarshal 解密成对象
func (sn *ServerNode) Unmarshal(val string) bool {
    err := json.Unmarshal([]byte(val), sn)
    if err != nil {
        return false
    }
    return true
}

3.3 服务发现实现

3.3.1 服务发现结构体
// ServiceDiscovery 服务发现
type ServiceDiscovery struct {
    cli        *clientv3.Client //etcd client
    serverList sync.Map
    prefix     string //监视的前缀
}
3.3.2 服务发现核心代码
// WatchService 初始化服务列表和监视
func (s *ServiceDiscovery) WatchService(prefix string) error {
    s.prefix = prefix
    //根据前缀获取现有的key
    resp, err := s.cli.Get(context.Background(), prefix, clientv3.WithPrefix())
    if err != nil {
        return err
    }

    for _, ev := range resp.Kvs {
        s.SetServiceList(string(ev.Key), string(ev.Value))
    }

    //监视前缀,修改变更的server
    go s.watcher()
    return nil
}

// watcher 监听前缀
func (s *ServiceDiscovery) watcher() {
    rch := s.cli.Watch(context.Background(), s.prefix, clientv3.WithPrefix())
    log.Printf("watching prefix:%s now...", s.prefix)
    for wresp := range rch {
        for _, ev := range wresp.Events {
            switch ev.Type {
            case mvccpb.PUT: //修改或者新增
                s.SetServiceList(string(ev.Kv.Key), string(ev.Kv.Value))
            case mvccpb.DELETE: //删除
                s.DelServiceList(string(ev.Kv.Key))
            }
        }
    }
}

3.4 实际应用场景

3.4.1 逻辑服务注册

在逻辑服务启动时,会向 etcd 注册自己的信息:

// doRegisterServiceToEtcd 注册grpc到etcd
func doRegisterServiceToEtcd(etcdConfig *cfg.EtcdConfig, registerConfig *cfg.RegisterConfig) error {
    node := types.ServerNode{}
    node.Id = registerConfig.Id
    node.Name = registerConfig.Name
    node.Address = registerConfig.Address
    node.Weight = registerConfig.Weight
    node.Backup = registerConfig.Backup
    nodeStr := node.Marshal()
    endPoints := strings.Split(etcdConfig.EndPoints, ",")
    prefix := registerConfig.Prefix
    if prefix == "" {
        prefix = ServerGrpcPrefix
    }
    register, err := etcdv3.NewServiceRegister(endPoints, prefix+node.Address, nodeStr, 5)
    if err != nil {
        fmt.Println("register logic service err: ", err)
        logger.ERROR("register logic service err: ", err)
        os.Exit(-1)
        return nil
    }
    logicRegister = register
    return nil
}
3.4.2 网关服务发现

网关服务通过 etcd 发现可用的逻辑服务:

// DiscoveryGrpcFromEtcd 从etcd发现grpc
func DiscoveryGrpcFromEtcd(etcdConfig *cfg.EtcdConfig, discoveryConfig *cfg.DiscoveryConfig) {
    endPoints := strings.Split(etcdConfig.EndPoints, ",")
    prefix := discoveryConfig.Prefix
    if prefix == "" {
        prefix = ServerGrpcPrefix
    }
    mGrpcService.EndPoints = endPoints
    mGrpcService.Prefix = prefix
}

// 连接所有逻辑服务
func (s *MLBConnectorService) doStartUp(callback func(serverId int32, api *pb.ProxyResp)) {
    ser := etcdv3.NewServiceDiscovery(s.EndPoints)
    ser.WatchService(s.Prefix)
    //服务发现定时器
    checkLogicServerTicker := time.NewTicker(time.Second * 2)
    for {
        select {
        case <-checkLogicServerTicker.C:
            s.dialServers(ser.GetServices(), callback)
        }
    }
}

四、etcd 配置管理

4.1 配置文件结构

项目使用 YAML 格式的配置文件,通过模板生成:

default:
  etcd:
    endPoints: 192.168.1.197:2379
  discovery:
    prefix: /grpc/zzb/logic_grpc/

4.2 配置模板生成

type ConfigTemplate struct {
    LogAddUrl     string
    LogQueryUrl   string
    MongoDb       string
    RedisHost     string
    RedisPort     int
    RedisPassword string
    EtcdEndPoints string
    GrpcPrefix    string
    MongodbPasswd string
    User          string
    StartStatus   int
}

func Deploy(jsonPath string, templatePath string, configPath string) {
    configTemplate := ReadJsonFile(jsonPath)
    teml := ReadFile(templatePath)
    t, err := template.New("tmpl").Parse(string(teml))
    if err != nil {
        fmt.Println(err)
        return
    }
    file, err := os.Create(configPath)
    if err != nil {
        fmt.Println(err)
        return
    }
    defer file.Close()
    err = t.Execute(file, *configTemplate)
    if err != nil {
        fmt.Println(err)
        return
    }
}

五、etcd 最佳实践

5.1 租约机制的使用

租约机制是 etcd 的一个重要特性,用于实现服务的自动清理:

// ListenLeaseRespChan 监听 续租情况
func (s *ServiceRegister) ListenLeaseRespChan() {
    for leaseKeepResp := range s.keepAliveChan {
        log.Println("续约成功", leaseKeepResp)
    }
    log.Println("关闭续租")
}

// Close 注销服务
func (s *ServiceRegister) Close() error {
    //撤销租约
    if _, err := s.cli.Revoke(context.Background(), s.leaseID); err != nil {
        return err
    }
    err := s.cli.Close()
    s.cli = nil
    return err
}

5.2 错误处理和重试机制

在实际项目中,需要处理网络异常、服务不可用等情况:

// 设置租约
func (s *ServiceRegister) putKeyWithLease(lease int64) error {
    //设置租约时间
    resp, err := s.cli.Grant(context.Background(), lease)
    if err != nil {
        return err
    }
    //注册服务并绑定租约
    _, err = s.cli.Put(context.Background(), s.key, s.value, clientv3.WithLease(resp.ID))
    if err != nil {
        return err
    }
    //设置续租 定期发送需求请求
    leaseRespChan, err := s.cli.KeepAlive(context.Background(), resp.ID)

    if err != nil {
        return err
    }
    s.leaseID = resp.ID
    s.keepAliveChan = leaseRespChan
    log.Printf("Put key:%s  val:%s  success!", s.key, s.value)
    return nil
}

5.3 负载均衡策略

项目实现了基于权重的负载均衡:

func (s *MLBConnectorService) getConnectorIdByWeight(excludeId int32, backup bool, rand *random.Random) int32 {
    if rand == nil {
        rand = random.NewRandom(false)
    }
    s.lock.RLock()
    defer s.lock.RUnlock()

    var connectors []*MLBConnector = make([]*MLBConnector, 0, len(s.connectorMap))
    for _, temp := range s.connectorMap {
        if temp.connected && temp.Backup == backup {
            if excludeId > 0 && excludeId == temp.Id {
                continue
            }
            connectors = append(connectors, temp)
        }
    }
    if len(connectors) > 0 {
        // 计算总权重
        totalWeight := int32(0)
        for _, conn := range connectors {
            totalWeight += conn.Weight
        }

        // 根据权重选择连接器
        randomWeight := rand.Int31n(totalWeight)
        currentWeight := int32(0)

        for _, conn := range connectors {
            currentWeight += conn.Weight
            if randomWeight < currentWeight {
                return conn.Id
            }
        }
    }

    return -1
}

六、etcd 集群部署

6.1 单节点部署

# 下载 etcd
wget https://github.com/etcd-io/etcd/releases/download/v3.5.0/etcd-v3.5.0-linux-amd64.tar.gz

# 解压
tar -xzf etcd-v3.5.0-linux-amd64.tar.gz

# 启动 etcd
./etcd --listen-client-urls http://0.0.0.0:2379 --advertise-client-urls http://localhost:2379

6.2 集群部署

# 节点1
./etcd --name infra1 --initial-advertise-peer-urls http://10.0.1.10:2380 \
  --listen-peer-urls http://10.0.1.10:2380 \
  --listen-client-urls http://10.0.1.10:2379,http://127.0.0.1:2379 \
  --advertise-client-urls http://10.0.1.10:2379 \
  --initial-cluster-token etcd-cluster-1 \
  --initial-cluster infra1=http://10.0.1.10:2380,infra2=http://10.0.1.11:2380,infra3=http://10.0.1.12:2380 \
  --initial-cluster-state new

# 节点2
./etcd --name infra2 --initial-advertise-peer-urls http://10.0.1.11:2380 \
  --listen-peer-urls http://10.0.1.11:2380 \
  --listen-client-urls http://10.0.1.11:2379,http://127.0.0.1:2379 \
  --advertise-client-urls http://10.0.1.11:2379 \
  --initial-cluster-token etcd-cluster-1 \
  --initial-cluster infra1=http://10.0.1.10:2380,infra2=http://10.0.1.11:2380,infra3=http://10.0.1.12:2380 \
  --initial-cluster-state new

# 节点3
./etcd --name infra3 --initial-advertise-peer-urls http://10.0.1.12:2380 \
  --listen-peer-urls http://10.0.1.12:2380 \
  --listen-client-urls http://10.0.1.12:2379,http://127.0.0.1:2379 \
  --advertise-client-urls http://10.0.1.12:2379 \
  --initial-cluster-token etcd-cluster-1 \
  --initial-cluster infra1=http://10.0.1.10:2380,infra2=http://10.0.1.11:2380,infra3=http://10.0.1.12:2380 \
  --initial-cluster-state new

6.3 Docker 部署

version: '3'
services:
  etcd:
    image: quay.io/coreos/etcd:v3.5.0
    container_name: etcd
    ports:
      - "2379:2379"
      - "2380:2380"
    environment:
      - ETCD_NAME=etcd0
      - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379
      - ETCD_ADVERTISE_CLIENT_URLS=http://localhost:2379
      - ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380
      - ETCD_INITIAL_ADVERTISE_PEER_URLS=http://localhost:2380
      - ETCD_INITIAL_CLUSTER=etcd0=http://localhost:2380
      - ETCD_INITIAL_CLUSTER_STATE=new
      - ETCD_INITIAL_CLUSTER_TOKEN=etcd-cluster-1
    volumes:
      - ./etcd-data:/etcd-data
    command: etcd -data-dir=/etcd-data

七、监控和维护

7.1 健康检查

# 检查集群健康状态
etcdctl endpoint health

# 查看集群成员
etcdctl member list

# 查看集群状态
etcdctl cluster-health

7.2 性能监控

# 查看 etcd 指标
curl http://localhost:2379/metrics

# 监控 etcd 性能
etcdctl --endpoints=localhost:2379 endpoint status

八、项目中的关键代码分析

8.1 服务注册流程

在项目的逻辑服务中,服务注册的完整流程如下:

// 在 logic_server/main.go 中
func main() {
    // ... 其他初始化代码 ...
    
    netgrpc.Initialize()
    netgrpc.RegisterGrpcServiceToEtcd(sercfg.GetEtcdConfig(config.Etcd), config.Register)
    
    // ... 启动服务 ...
    netgrpc.StartServer(config.Port)
}

8.2 服务发现流程

网关服务通过以下方式发现逻辑服务:

// 在 gate_server/main.go 中
func main() {
    // ... 其他初始化代码 ...
    
    netgrpc.DiscoveryGrpcFromEtcd(sercfg.GetEtcdConfig(gateConfig.Etcd), sercfg.GetDiscoveryConfig(gateConfig.Discovery))
    netgrpc.Initialize(int64(*gateId))
    
    // ... 启动服务 ...
}

8.3 配置管理

项目使用模板化的配置管理方式:

// 在 tools/deploy/main.go 中
func main() {
    gopath := filepath.Dir(os.Args[0])
    if gopath == "" {
        fmt.Println("gopath nil")
        os.Exit(-1)
        return
    }
    Deploy(gopath+"/conf/config.json", gopath+"/template/config_deploy.tmpl", gopath+"/config.yaml")
}

九、性能优化建议

9.1 连接池管理

// 建议的连接池配置
cli, err := clientv3.New(clientv3.Config{
    Endpoints:   endpoints,
    DialTimeout: 5 * time.Second,
    MaxCallSendMsgSize: 1024 * 1024, // 1MB
    MaxCallRecvMsgSize: 1024 * 1024, // 1MB
})

9.2 批量操作

对于大量数据的读写操作,建议使用批量操作:

// 批量写入
ops := []clientv3.Op{
    clientv3.OpPut("key1", "value1"),
    clientv3.OpPut("key2", "value2"),
    clientv3.OpPut("key3", "value3"),
}
_, err := cli.Txn(context.Background()).Then(ops...).Commit()

9.3 监听优化

// 使用带缓冲的监听
rch := cli.Watch(context.Background(), prefix, clientv3.WithPrefix(), clientv3.WithRev(0))
for wresp := range rch {
    // 处理事件
    for _, ev := range wresp.Events {
        // 处理单个事件
    }
}

十、故障排查

10.1 常见问题

  1. 连接超时

    • 检查网络连接
    • 验证 etcd 服务状态
    • 检查防火墙设置
  2. 租约续约失败

    • 检查网络稳定性
    • 验证 etcd 集群健康状态
    • 调整租约时间
  3. 服务发现延迟

    • 检查监听机制
    • 验证事件处理逻辑
    • 优化网络配置

10.2 调试工具

# 查看 etcd 日志
journalctl -u etcd -f

# 检查 etcd 状态
etcdctl endpoint status

# 查看 etcd 指标
curl http://localhost:2379/metrics | grep etcd

十一、总结

etcd 作为一个高可用的分布式键值存储系统,在微服务架构中发挥着重要作用。通过本文的深入分析,我们可以看到:

  1. 服务注册与发现:etcd 通过租约机制和监听机制实现了可靠的服务注册与发现
  2. 配置管理:etcd 可以作为配置中心,统一管理微服务的配置信息
  3. 高可用性:etcd 集群部署保证了服务的高可用性
  4. 一致性:基于 Raft 算法保证了数据的一致性

在实际项目中,etcd 的应用需要考虑:

  • 租约时间设置:根据服务的健康检查频率设置合适的租约时间
  • 错误处理:实现完善的错误处理和重试机制
  • 负载均衡:结合权重实现智能的负载均衡
  • 监控告警:建立完善的监控和告警机制

通过合理使用 etcd,可以构建出稳定、可靠的微服务架构,为业务的发展提供强有力的技术支撑。

参考资料