etcd是什么
etcd是一个分布式、高可用的键值存储系统,常用于配置中心、服务注册、Leader选举等场景。
分布式系统
采用etcd作为分布式一致性KV存储,它基于Raft算法,可以保证在主从节点之间数据一致性。我们常把服务注册信息、配置中心的数据保存在etcd中,多个服务节点通过watch等机制保持一致状态。
分布式锁
数据一致性
Raft算法
代码实例
package main
import (
"context"
"log"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
)
// 服务注册
func registerService(client *clientv3.Client, serviceName, serviceAddr string, ttl int64) error {
// 创建租约
leaseResp, err := client.Grant(context.TODO(), ttl)
if err != nil {
return err
}
// 服务路径
key := "/services/" + serviceName + "/" + serviceAddr
// 将服务地址注册到etcd,并与租约关联
_, err = client.Put(context.TODO(), key, serviceAddr, clientv3.WithLease(leaseResp.ID))
if err != nil {
return err
}
// 自动续约
ch, err := client.KeepAlive(context.TODO(), leaseResp.ID)
if err != nil {
return err
}
// 处理续约应答
go func() {
for range ch {
// 续约成功
log.Printf("Service %s renewed", serviceAddr)
}
log.Printf("Service %s lease expired", serviceAddr)
}()
return nil
}
// 服务发现
func discoverServices(client *clientv3.Client, serviceName string) ([]string, error) {
// 前缀查询
resp, err := client.Get(context.TODO(), "/services/"+serviceName+"/", clientv3.WithPrefix())
if err != nil {
return nil, err
}
var services []string
for _, kv := range resp.Kvs {
services = append(services, string(kv.Value))
}
return services, nil
}
// 监听服务变化
func watchServices(client *clientv3.Client, serviceName string) {
rch := client.Watch(context.TODO(), "/services/"+serviceName+"/", clientv3.WithPrefix())
for wresp := range rch {
for _, ev := range wresp.Events {
switch ev.Type {
case clientv3.EventTypePut:
log.Printf("Service added: %s", ev.Kv.Value)
case clientv3.EventTypeDelete:
log.Printf("Service removed: %s", ev.Kv.Key)
}
}
}
}
func main() {
// 连接etcd
client, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"}, // etcd服务器地址
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatalf("Failed to connect to etcd: %v", err)
}
defer client.Close()
// 注册服务
serviceName := "user-service"
serviceAddr := "127.0.0.1:8080"
err = registerService(client, serviceName, serviceAddr, 10) // 10秒租约
if err != nil {
log.Fatalf("Failed to register service: %v", err)
}
log.Printf("Service %s registered successfully", serviceAddr)
// 启动服务监听
go watchServices(client, serviceName)
// 发现服务
services, err := discoverServices(client, serviceName)
if err != nil {
log.Fatalf("Failed to discover services: %v", err)
}
log.Printf("Discovered services: %v", services)
// 保持程序运行
select {}
}
代码功能
- 服务注册
1. 使用etcd的租约机制注册(为了避免死锁,使用租约)
2. 设置服务的存活时间(租约有效时间,到期前如果未续费就自动断开)
3. 通过自动续约机制保持服务在线状态
4. 服务路径格式为/services/服务名/服务地址 - 服务发现
- 通过前缀查询获取指定服务名的所有实例
- 可以获取当前可用的服务列表
- 服务监听
- 实时监控服务的变化(新增或移除)
- 当服务上下线时能及时收到通知
上线与注册流程说明
- 连接etcd集群
client, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"}, // etcd服务器地址
DialTimeout: 5 * time.Second,
})
创建了一个 etcd 客户端,通过指定的端点 (Endpoints) 连接到 etcd 集群。
- 服务注册实现
// 创建租约
leaseResp, err := client.Grant(context.TODO(), ttl)
// 写入服务信息并关联租约
_, err = client.Put(context.TODO(), key, serviceAddr, clientv3.WithLease(leaseResp.ID))
// 自动续约
ch, err := client.KeepAlive(context.TODO(), leaseResp.ID)
// 处理续约应答
go func() {
for range ch {
// 续约成功
}
}()
- Grant() 创建租约,设置服务存活时间
- Put() 将服务信息写入 etcd,并通过WithLease()关联租约
- KeepAlive() 启动自动续约机制
- 用 goroutine 处理续约响应,保持服务活跃
- 服务发现实现
// 前缀查询获取服务实例
resp, err := client.Get(context.TODO(), "/services/"+serviceName+"/", clientv3.WithPrefix())
// 提取服务地址
var services []string
for _, kv := range resp.Kvs {
services = append(services, string(kv.Value))
}
- Get() 方法配合WithPrefix()参数实现前缀查询
- 遍历查询结果,提取所有服务实例地址
- 监听服务变化
rch := client.Watch(context.TODO(), "/services/"+serviceName+"/", clientv3.WithPrefix())
for wresp := range rch {
for _, ev := range wresp.Events {
switch ev.Type {
case clientv3.EventTypePut: // 服务新增
case clientv3.EventTypeDelete: // 服务移除
}
}
}
- Watch() 方法建立对指定前缀路径的监听
- 通过循环读取事件通道,处理服务新增和移除事件
- 服务下线