基于 etcd 实现的服务发现,按照非规范化的 etcd key 实现,详细见代码注释。
package discovery
import (
"context"
"encoding/json"
"fmt"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc/resolver"
"strings"
"time"
)
// gRPC 的服务一般会使用 protobuf 作为数据传输的介质
// gRPC 服务定义在 proto 的文件中,例如:service RoutingService {}
// protoc 将 proto 后缀文件转为 go 文件,文件内自动生成了 gRPC 服务的描述信息、服务注册的函数、客户端声明的函数等内容
// 如下,它们的格式是固定的,注意函数的参数
// 服务描述信息:RoutingService_ServiceDesc,格式:服务名_ServiceDesc
// 服务注册函数:RegisterRoutingServiceServer,格式:Register你服务名Server
// 客户端声明函数:NewRoutingServiceClient,格式:New服务名Client
// 其中客户端声明函数的参数是 gRPC 连接,返回值是 gRPC 服务的客户端接口,这样就可以调用客户端接口定义的 rpc 方法了
// gRPC 连接不会与某个 gRPC 服务绑定,它只是一个连接。
// 获取 gRPC 连接的方式如下两种,第一个参数就是 gRPC 服务的地址,可以写死 ip + port,也可以使用服务发现来获取 gRPC 服务的地址。
// grpc.NewClient(fmt.Sprintf("%s:///%s", scheme, serviceName))
// grpc.Dial(fmt.Sprintf("%s:///%s", scheme, serviceName))(废弃)
// 服务发现是实现 Builder 和 Resolver 接口,Builder 用于创建 Resolver 实例,Resolver 用于解析服务地址。
// Builder 的 Scheme 方法返回值是 与 grpc.NewClient 中的 scheme 对应
// Builder 的 Build 第一个参数 target.Endpoint() 得到的结果是 grpc.NewClient 中的 serviceName,Build 方法的触发分情况:
// grpc.NewClient 声明不会触发 Build 方法,首次调用 rpc 方法时触发 Build
// grpc.Dial 声明会触发 Build 方法,但已经废弃了
// Resolver 的 ResolveNow 方法是 gRPC 主动调用的,我们可以使用它动态去 etcd 中获取服务地址,也可以不实现它,自定义服务发现的逻辑
// 服务发现的实现方式:
// 假如我们有三个应用,user-center、device-center、网关,user-center 和 device-center 暴露了很多 gRPC 服务,网关需要调用它们的服务
// 假如我们使用 etcd 作为注册中心,同时规范化 etcd 的 key ,例如:grpc/services/{serviceName}/实例ID
// grpc/services/user-center/实例1
// grpc/services/user-center/实例2
// grpc/services/device-center/实例1
// grpc/services/device-center/实例2
// 网关中分别实现 Builder 和 Resolver,并将 Builder 的实例注册在 grpc 的地址解析中,resolver.Register(Builder实现的实例)
// 获取 user-center 和 device-center 的 grpc 连接
// grpc.NewClient(fmt.Sprintf("%s:///%s", "grpc", "user-center"))
// grpc.NewClient(fmt.Sprintf("%s:///%s", "grpc", "device-center"))
// 当 gRPC 连接建立时,gRPC 会调用 Builder 的 Build 方法,我们获取 target.Endpoint() 就是 serviceName
// 这样 fmt.Sprintf("grpc/services/%s", serviceName) 获取 serviceName 的 etcd 的 key 前缀
// 如:grpc/services/user-center/
// Build 方法中按前缀匹配查询 etcd 的数据,这样就获取到了 user-center 的所有实例的地址,再同步到 Resolver 中
// 如上就实现了规范化 etcd 的 key 前缀的服务发现,不管有多少个应用,代码中只需要一个服务发现的实例
// 如果没有规范化 etcd 的 key 前缀,那么我们需要为各个服务声明不同的 scheme,每个 scheme 对应一个服务发现的实例
// Builder 的实现必须包含 etcd 的 key 前缀 ,不能利用 target.Endpoint() 去实现服务发现
// 如:ServiceDiscovery 实现了 Builder
// type ServiceDiscovery struct {
// serverKey string
// }
// grpc/services/user-center/ 固定写死赋值给 serverKey
// 声明 ServiceDiscovery { serverKey },注册 resolver.Register(ServiceDiscovery实例)
// grpc.NewClient(fmt.Sprintf("%s:///%s", "user", "user-center"))
// 普通 rpc 调用时,服务端挂掉:
// 服务发现找不到数据时:rpc error: code = Unavailable desc = no children to pick from
// 服务挂掉但etcd/服务发现还有数据:rpc error: code = Unavailable desc = connection error: desc = "transport: Error while dialing: dial tcp 10.202.160.190:6888: connect: connection refused"
// 服务重启后客户端连接可以恢复
// 当某个服务节点不可用时,可以自动连接到可用的服务节点
// 流式 rpc 调用,服务端挂掉:
// 客户端发送方:EOF
// 客户端接收方:rpc error: code = Unavailable desc = error reading from server: EOF
// 服务重启后客户端连接不可恢复,必须重新建立连接
// ServiceDiscovery is a gRPC resolver that uses etcd for service discovery.
// 配合 grpc.NewClient(fmt.Sprintf("%s:///%s", scheme, serviceName) 来使用
// Build 方法的 target.Endpoint() 就是 serviceName
type ServiceDiscovery struct {
scheme string // 自定义的 grpc 服务的 scheme,例如:grpc.NewClient(fmt.Sprintf("%s:///%s", scheme, serviceName)
serviceKey string // etcd 中服务的 key 前缀,例如:grpc/maxwell-ai/GatewayInfoService/1.0/
etcdClient *clientv3.Client
}
// ServiceResolver is a gRPC resolver that resolves service addresses from etcd.
// 一个 scheme 对应一个 ServiceResolver,当 grpc 建立连接时触发 ServiceDiscovery 的 Build 方法
// 注意:
// grpc.NewClient 不会触发 Build 方法
// grpc.Dial 会触发 Build 方法,但已经废弃了
type ServiceResolver struct {
scheme string
serviceKey string
target resolver.Target
client *clientv3.Client
cc resolver.ClientConn
addrMap map[string]resolver.Address
closed chan struct{}
}
func NewServiceDiscovery(scheme string, serviceKey string, etcdClient *clientv3.Client) *ServiceDiscovery {
return &ServiceDiscovery{
scheme: scheme,
serviceKey: serviceKey,
etcdClient: etcdClient,
}
}
// Build creates a new ServiceDiscovery resolver.
// grpc.NewClient 不会触发 Build 方法
// grpc.Dial 会触发 Build 方法,但已经废弃了
// target: grpc.NewClient(fmt.Sprintf("%s:///%s", scheme, serviceName) 中的 serviceName 就是 target
func (s *ServiceDiscovery) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
// 创建服务解析器
serviceResolver := &ServiceResolver{
target: target,
cc: cc,
scheme: s.scheme,
serviceKey: s.serviceKey,
client: s.etcdClient,
closed: make(chan struct{}),
addrMap: make(map[string]resolver.Address),
}
// 首次拉取所有数据
if err := serviceResolver.rePull(); err != nil {
return nil, err
}
// 开启 watcher 监听 etcd 中的服务地址变化
go serviceResolver.watcher()
return serviceResolver, nil
}
// Scheme returns the scheme of the resolver.
// scheme 是 grpc.NewClient(fmt.Sprintf("%s:///%s", scheme, serviceName)
func (s *ServiceDiscovery) Scheme() string {
return s.scheme
}
// ResolveNow is called by gRPC to resolve the service address immediately.
// grpc 主动调用去解析服务地址,这里可以实现从 etcd 获取服务地址的逻辑
// 但是不在这里实现,因为这里实现有同步和异步从 etcd 中查询数据
// 同步会阻塞
// 异步会开启很多 goroutine,可能会导致 goroutine 泄漏
func (s *ServiceResolver) ResolveNow(options resolver.ResolveNowOptions) {
}
func (s *ServiceResolver) Close() {
close(s.closed)
}
func (s *ServiceResolver) rePull() error {
ctx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFunc()
resp, err := s.client.Get(ctx, s.serviceKey, clientv3.WithPrefix())
if err != nil {
return err
}
s.addrMap = make(map[string]resolver.Address)
for _, ev := range resp.Kvs {
key := strings.TrimPrefix(string(ev.Key), s.serviceKey)
s.addServer(key, ev.Value)
}
s.syncToGrpc()
return nil
}
func (s *ServiceResolver) addServer(key string, value []byte) {
var si ServiceInfo
if err := json.Unmarshal(value, &si); err != nil {
return
}
s.addrMap[key] = resolver.Address{
Addr: fmt.Sprintf("%s:%d", si.Ip, si.Port),
}
}
func (s *ServiceResolver) delServer(key string) {
if _, ok := s.addrMap[key]; ok {
delete(s.addrMap, key)
}
}
func (s *ServiceResolver) syncToGrpc() {
addrSlice := make([]resolver.Address, 0, 10)
for _, v := range s.addrMap {
addrSlice = append(addrSlice, v)
}
err := s.cc.UpdateState(resolver.State{Addresses: addrSlice})
if err != nil {
return
}
}
func (s *ServiceResolver) watcher() {
rePull := false
for {
select {
case <-s.closed:
return
default:
}
if rePull {
if err := s.rePull(); err != nil {
time.Sleep(5 * time.Second)
continue
}
}
rch := s.client.Watch(context.Background(), s.serviceKey, clientv3.WithPrefix())
loop:
for {
select {
case <-s.closed:
return
case resp, ok := <-rch:
if !ok {
rePull = true
break loop
}
for _, ev := range resp.Events {
key := strings.TrimPrefix(string(ev.Kv.Key), s.serviceKey)
switch ev.Type {
case mvccpb.PUT:
s.addServer(key, ev.Kv.Value)
case mvccpb.DELETE:
s.delServer(key)
}
}
s.syncToGrpc()
}
}
}
}