Etcd注册中心基本实现

发布于:2025-02-10 ⋅ 阅读:(32) ⋅ 点赞:(0)

Etcd入门

什么是Etcd

GitHub:https://github.com/etcd-io/etcd

image-20241215131646916

Etcd数据结构与特性

键值对格式,类似文件层次结构。

image-20241215131754070

image-20241215131841194

image-20241215131921418

image-20241215132016035

Etcd如何保证数据一致性?

表面来看,Etcd支持事务操作,能够保证数据一致性。

底层来看,Etcd使用Raft一致性算法保证数据一致性。

image-20241215132528385

官方可视化地址:http://play.etcd.io/play

可以深度了解,raft算法运行机制。

现在是一主两从两stop。

image-20241215132926476

停止主节点

此时主节点挂了,并没有选择新的主节点上线,因为还剩两个节点,一人一票,都没有胜出无法选择出新的Leader,这种现象也成为“脑裂”。

image-20241215133230080

启动node2,发现node3成为了Leader,此时不会有平票的情况。

image-20241215133747216

Etcd基本操作

增删改查。

写数据

image-20241215134412969

读数据

image-20241215134529732

前缀搜索

image-20241215134645914

Etcd安装

安装:https://github.com/etcd-io/etcd/releases

有不同系统安装启动脚本。

image-20241222185118997

安装完成会有三个脚本

  • etcd: etcd服务本身
  • etcdctl:客户端,用户操作etcd,如读写数据
  • etcdutl:备份恢复工具

执行etcd脚本,会启动etcd服务,服务默认占用2379和2380两个端口

2379:提供HTTP API服务,和etcdctl交互

2380:集群中节点通讯

Etcd可视化工具

etcdkeeper:https://github.com/evildecay/etcdkeeper

下载安装启动完毕,访问http://127.0.0.1:8080/etcdkeeper

Etcd Java客户端

jtecd:https://github.com/etcd-io/jetcd

1)引入依赖

<dependency>
    <groupId>io.etcd</groupId>
    <artifactId>jetcd-core</artifactId>
    <version>0.7.7</version>
</dependency>

2)demo

public class EtcdRegistry {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // create client using endpoints
        Client client = Client.builder().endpoints("http://localhost:2379")
                .build();

        KV kvClient = client.getKVClient();
        ByteSequence key = ByteSequence.from("likelong".getBytes());
        ByteSequence value = ByteSequence.from("666".getBytes());

        // put the key-value
        kvClient.put(key, value).get();

        // get the CompletableFuture
        CompletableFuture<GetResponse> getFuture = kvClient.get(key);

        // get the value from CompletableFuture
        GetResponse response = getFuture.get();
        System.out.println("value = " + response);

        // delete the key
        kvClient.delete(key).get();
    }
}

上述代码使用KVClient操作Etcd读写数据,除了KVClient客户端外,Etcd还提供了其他客户端。

image-20241215180012838

3)常用客户端

image-20241215180046293

绝大多数情况,前三个就够用。

Java Etcd数据结构

image-20241215223051370

除了有基本的KV,还有版本、创建版本、修改版本等字段。Etcd中每个键都有一个与之关联的版本号,用于跟踪键的修改历史。当键值发生变化,版本号也会随之增加。

image-20241215180536633

存储结构设计

存储结构设计几个要点:

  1. key如何设计?
  2. value如何设计?
  3. key什么时候过期?

结合Etcd数据存储结构特点(支持层级查询),以及一个服务会有多个服务提供者实例(负载均衡),可以设计为层级结构。

层级结构:将服务理解为文件夹、将服务对应的一个节点理解为文件夹下的文件,可以通过服务名称,用前缀查询的方式查询到某个服务的所有节点。

如下:键名规则可以为:业务前缀/服务名/服务节点地址

image-20241215183321023

如果是Redis作为注册中心,可以设计为列表结构(Redis本身支持列表数据结构)。

列表结构:将所有服务节点以列表的形式整体作为value。

image-20241215184137752

设置key过期超时时间,如30s,当服务宕机时,超时自动移除。

etcd选择层级结构。

开发实现

1. 注册中心开发

1)注册信息定义

ServiceMetaInfo类,封装服务注册信息,包括服务名称、服务版本号、服务地址(域名和端口)、服务分组等。

/**
 * 服务元信息(注册信息)
 */
public class ServiceMetaInfo {

    /**
     * 服务名称
     */
    private String serviceName;

    /**
     * 服务版本号
     */
    private String serviceVersion = "1.0";

    /**
     * 服务域名
     */
    private String serviceHost;

    /**
     * 服务端口号
     */
    private Integer servicePort;

    /**
     * 服务分组(暂未实现)
     */
    private String serviceGroup = "default";

}

添加方法,获取服务键名、获取服务注册节点键名以及获取服务访问地址。

/**
 * 获取服务键名
 */
public String getServiceKey() {
    // 后续可扩展服务分组
    // return String.format("%s:%s:%s", serviceName, serviceVersion, serviceGroup);
    return String.format("%s:%s", serviceName, serviceVersion);
}

/**
 * 获取服务注册节点键名
 */
public String getServiceNodeKey() {
    return String.format("%s/%s:%s", getServiceKey(), serviceHost, servicePor	t);
}

/**
 * 获取完整服务地址(服务调用会用到)
 */
public String getServiceAddress() {
    if (!StrUtil.contains(serviceHost, "http")) {
        return String.format("http://%s:%s", serviceHost, servicePort);
    }
    return String.format("%s:%s", serviceHost, servicePort);
}

2)注册中心配置

/**
 * RPC 框架注册中心配置
 */
public class RegistryConfig {

    /**
     * 注册中心类别
     */
    private String registry = "etcd";

    /**
     * 注册中心地址
     */
    private String address = "http://localhost:2380";

    /**
     * 用户名
     */
    private String username;

    /**
     * 密码
     */
    private String password;

    /**
     * 超时时间(单位毫秒)
     */
    private Long timeout = 10000L;
}

3)注册中心接口

定义注册中心接口,后续可以实现多种不同的注册中心。可以使用SPI机制,动态加载

提供注册中心初始化、注册服务、注销服务、服务发现(获取服务节点列表)、服务销毁等方法。

/**
 * 注册中心接口
 */
public interface Registry {

    /**
     * 初始化
     *
     * @param registryConfig
     */
    void init(RegistryConfig registryConfig);

    /**
     * 注册服务(服务端)
     *
     * @param serviceMetaInfo
     */
    void register(ServiceMetaInfo serviceMetaInfo) throws Exception;

    /**
     * 注销服务(服务端)
     *
     * @param serviceMetaInfo
     */
    void unRegister(ServiceMetaInfo serviceMetaInfo);

    /**
     * 服务发现(获取某服务的所有节点,消费端)
     *
     * @param serviceKey 服务键名
     * @return
     */
    List<ServiceMetaInfo> serviceDiscovery(String serviceKey);

    /**
     * 服务销毁
     */
    void destroy();
}

4)Etcd注册中心实现

public class EtcdRegistry implements Registry {

    private static final Logger logger = LoggerFactory.getLogger(EtcdRegistry.class);

    private Client client;

    private KV kvClient;

    /**
     * 根节点
     */
    private static final String ETCD_ROOT_PATH = "/rpc/";

    @Override
    public void init(RegistryConfig registryConfig) {
        logger.info("etcd注册中心初始化...");
        client = Client.builder().endpoints(registryConfig.getAddress()).connectTimeout(Duration.ofMillis(registryConfig.getTimeout())).build();
        kvClient = client.getKVClient();
    }

    /**
     * 服务注册(默认30s自动剔除)
     *
     * @param serviceMetaInfo 服务元信息
     */
    @Override
    public void register(ServiceMetaInfo serviceMetaInfo) throws Exception {
        // 创建 Lease 和 KV 客户端
        Lease leaseClient = client.getLeaseClient();

        // 创建一个 30 秒的租约
        long leaseId = leaseClient.grant(30).get().getID();

        // 设置要存储的键值对
        String registerKey = ETCD_ROOT_PATH + serviceMetaInfo.getServiceNodeKey();
        ByteSequence key = ByteSequence.from(registerKey, StandardCharsets.UTF_8);
        ByteSequence value = ByteSequence.from(JSONUtil.toJsonStr(serviceMetaInfo), StandardCharsets.UTF_8);

        // 将键值对与租约关联起来,并设置过期时间
        PutOption putOption = PutOption.builder().withLeaseId(leaseId).build();
        kvClient.put(key, value, putOption).get();
    }

    public void unRegister(ServiceMetaInfo serviceMetaInfo) {
        kvClient.delete(ByteSequence.from(ETCD_ROOT_PATH + serviceMetaInfo.getServiceNodeKey(), StandardCharsets.UTF_8)).get();
    }

    public List<ServiceMetaInfo> serviceDiscovery(String serviceKey) {
        // 前缀搜索,结尾一定要加 '/'
        String searchPrefix = ETCD_ROOT_PATH + serviceKey + "/";

        try {
            // 前缀查询
            GetOption getOption = GetOption.builder().isPrefix(true).build();
            List<KeyValue> keyValues = kvClient.get(
                            ByteSequence.from(searchPrefix, StandardCharsets.UTF_8),
                            getOption)
                    .get()
                    .getKvs();
            // 解析服务信息
            return keyValues.stream()
                    .map(keyValue -> {
                        String value = keyValue.getValue().toString(StandardCharsets.UTF_8);
                        return JSONUtil.toBean(value, ServiceMetaInfo.class);
                    })
                    .collect(Collectors.toList());
        } catch (Exception e) {
            throw new RuntimeException(String.format("serviceKey=%s, 获取服务列表失败", serviceKey), e);
        }
    }

    public void destroy() {
        logger.info("etcd注册中心下线...");
        // 释放资源
        if (kvClient != null) {
            kvClient.close();
        }
        if (client != null) {
            client.close();
        }
    }

}

2.SPI机制配置和扩展注册中心

使用SPI机制,读取配置文件,初始化对应的注册中心。

对应类属性位置:RpcConfig#registryConfig#registry

可以自己实现接口,自行扩展注册中心,也可以使用默认etcd注册中心。

SPI机制:添加对应SPI配置文件。

image-20241221181629344

注册中心工厂

类似序列化器,创建注册中心工厂,可以通过配置文件配置注册中心类型,指定对应注册中心

/**
 * 注册中心工厂(用于获取注册中心对象)
 */
public class RegistryFactory {

    static {
        SpiLoader.load(Registry.class);
    }

    /**
     * 默认注册中心
     */
    private static final Registry DEFAULT_REGISTRY = new EtcdRegistry();

    /**
     * 获取实例
     *
     * @param key 注册中心键值
     * @return 注册中心实例
     */
    public static Registry getInstance(String key) {
        return SpiLoader.getInstance(Registry.class, key);
    }

}

3.RPC调用

1)服务代理类

服务代理类,使用jdk动态代理(实现InvocationHandler类),用于生成代理对象实现远程调用

/**
 * 服务代理(JDK 动态代理)
 */
public class ServiceProxy implements InvocationHandler {

    /**
     * 调用代理
     *
     * @return
     * @throws Throwable
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        RpcConfig rpcConfig = RpcApplication.getRpcConfig();
        // 指定序列化器
        final Serializer serializer = SerializerFactory.getInstance(rpcConfig.getSerializer());

        // 构造请求
        String serviceName = method.getDeclaringClass().getName();
        RpcRequest rpcRequest = new RpcRequest();
        rpcRequest.setServiceName(serviceName);
        rpcRequest.setMethodName(method.getName());
        rpcRequest.setParameterTypes(method.getParameterTypes());
        rpcRequest.setArgs(args);

        try {
            // 序列化
            byte[] bodyBytes = serializer.serialize(rpcRequest);

            // 从注册中心获取服务提供者请求地址
            Registry registry = RegistryFactory.getInstance(rpcConfig.getRegistryConfig().getRegistry());
            ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
            serviceMetaInfo.setServiceName(serviceName);
            serviceMetaInfo.setServiceVersion(RpcConstants.DEFAULT_SERVICE_VERSION);
            List<ServiceMetaInfo> serviceMetaInfoList = registry.serviceDiscovery(serviceMetaInfo.getServiceKey());
            if (CollUtil.isEmpty(serviceMetaInfoList)) {
                throw new RuntimeException("暂无服务地址");
            }
            // 先默认取第一个,后续优化
            ServiceMetaInfo selectedServiceMetaInfo = serviceMetaInfoList.get(0);

            // 发送请求
            try (HttpResponse httpResponse = HttpRequest.post(selectedServiceMetaInfo.getServiceAddress())
                    .body(bodyBytes)
                    .execute()) {
                byte[] result = httpResponse.bodyBytes();
                // 反序列化
                RpcResponse rpcResponse = serializer.deserialize(result, RpcResponse.class);
                return rpcResponse.getData();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

        return null;
    }
}

2)服务代理工厂

/**
 * 服务代理工厂(工厂模式,用于创建代理对象)
 */
public class ServiceProxyFactory {

    /**
     * 根据服务类获取代理对象
     *
     * @param serviceClass
     * @param <T>
     * @return
     */
    public static <T> T getProxy(Class<T> serviceClass) {
        return (T) Proxy.newProxyInstance(
                serviceClass.getClassLoader(),
                new Class[]{serviceClass},
                new ServiceProxy());
    }

}

基于服务代理类,生成代理对象,实现服务远程调用

3)本地注册中心

用于存放接口名与具体实现类映射,便于获取

/**
 * 本地注册中心
 */
public class LocalRegistry {

    /**
     * 注册信息存储
     */
    private static final Map<String, Class<?>> map = new ConcurrentHashMap<>();

    /**
     * 注册服务
     *
     * @param serviceName 接口
     * @param implClass   实现类
     */
    public static void register(String serviceName, Class<?> implClass) {
        map.put(serviceName, implClass);
    }

    /**
     * 获取服务
     *
     * @param serviceName
     * @return
     */
    public static Class<?> get(String serviceName) {
        return map.get(serviceName);
    }

    /**
     * 删除服务
     *
     * @param serviceName
     */
    public static void remove(String serviceName) {
        map.remove(serviceName);
    }
}

4)Vertx实现web服务器

Vertx官方文档:https://vertx.io/

使用Vertx实现web服务器

添加依赖

        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-core</artifactId>
            <version>4.5.1</version>
        </dependency>
/**
 * HTTP 服务器接口
 */
public interface HttpServer {

    /**
     * 启动服务器
     *
     * @param port
     */
    void doStart(int port);
}
/**
 * Vertx HTTP 服务器
 */
public class VertxHttpServer implements HttpServer {

    private static final Logger logger = LoggerFactory.getLogger(VertxHttpServer.class);

    /**
     * 启动服务器
     *
     * @param port 端口
     */
    public void doStart(int port) {
        // 创建 Vert.x 实例
        Vertx vertx = Vertx.vertx();

        // 创建 HTTP 服务器
        io.vertx.core.http.HttpServer server = vertx.createHttpServer();

        // 处理请求
        server.requestHandler(new HttpServerHandler());

        // 启动 HTTP 服务器并监听指定端口
        server.listen(port, result -> {
            if (result.succeeded()) {
                logger.info("Server is now listening on port " + port);
            } else {
                logger.error("Failed to start server: " + result.cause());
            }
        });
    }
}

具体业务处理逻辑

/**
 * HTTP 请求处理器
 */
public class HttpServerHandler implements Handler<HttpServerRequest> {

    private static final Logger logger = LoggerFactory.getLogger(HttpServerHandler.class);

    @Override
    public void handle(HttpServerRequest request) {
        // 指定序列化器
        final Serializer serializer = SerializerFactory.getInstance(RpcApplication.getRpcConfig().getSerializer());

        // 记录日志
        logger.info("Received request: " + request.method() + " " + request.uri());

        // 异步处理 HTTP 请求
        request.bodyHandler(body -> {
            byte[] bytes = body.getBytes();
            RpcRequest rpcRequest = null;
            try {
                rpcRequest = serializer.deserialize(bytes, RpcRequest.class);
            } catch (Exception e) {
                e.printStackTrace();
            }

            // 构造响应结果对象
            RpcResponse rpcResponse = new RpcResponse();
            // 如果请求为 null,直接返回
            if (rpcRequest == null) {
                rpcResponse.setMessage("rpcRequest is null");
                doResponse(request, rpcResponse, serializer);
                return;
            }

            try {
                // 获取要调用的服务实现类,通过反射调用
                Class<?> implClass = LocalRegistry.get(rpcRequest.getServiceName());
                Method method = implClass.getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());
                Object result = method.invoke(implClass.newInstance(), rpcRequest.getArgs());
                // 封装返回结果
                rpcResponse.setData(result);
                rpcResponse.setDataType(method.getReturnType());
                rpcResponse.setMessage("ok");
            } catch (Exception e) {
                e.printStackTrace();
                rpcResponse.setMessage(e.getMessage());
                rpcResponse.setException(e);
            }
            // 响应
            doResponse(request, rpcResponse, serializer);
        });
    }

    /**
     * 响应
     *
     * @param request
     * @param rpcResponse
     * @param serializer
     */
    void doResponse(HttpServerRequest request, RpcResponse rpcResponse, Serializer serializer) {
        HttpServerResponse httpServerResponse = request.response()
                .putHeader("content-type", "application/json");
        try {
            // 序列化
            byte[] serialized = serializer.serialize(rpcResponse);
            httpServerResponse.end(Buffer.buffer(serialized));
        } catch (IOException e) {
            e.printStackTrace();
            httpServerResponse.end(Buffer.buffer());
        }
    }
}

5)服务提供者服务注册

创建服务提供者module,引入starlink-rpc-core模块

image-20241221175357225

RPC相关配置application.properties

rpc.name=starlink
rpc.version=1.0
rpc.serverPort=8081
rpc.serializer=hessian

服务接口及实现类

public class HelloServiceImpl implements HelloService {
    @Override
    public String hello(String name) {
        return "hello, " + name;
    }
}

服务注册

public class ServiceStarter {
    public static void main(String[] args) {
        RpcApplication.init();

        RpcConfig rpcConfig = RpcApplication.getRpcConfig();
        Registry registry = RegistryFactory.getInstance(rpcConfig.getRegistryConfig().getRegistry());


        // 注册服务
        String serviceName = HelloService.class.getName();
        LocalRegistry.register(serviceName, HelloServiceImpl.class);

        ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
        serviceMetaInfo.setServiceHost(rpcConfig.getServerHost());
        serviceMetaInfo.setServicePort(rpcConfig.getServerPort());
        serviceMetaInfo.setServiceName(serviceName);
        serviceMetaInfo.setServiceVersion(rpcConfig.getVersion());
        try {
            registry.register(serviceMetaInfo);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

        // 启动Http服务
        HttpServer httpServer = new VertxHttpServer();
        httpServer.doStart(rpcConfig.getServerPort());
    }
}

客户端查看,注册成功,类似文件夹结构。

image-20241221175319016

6)服务调用者远程调用

依旧创建module,引入starlink-rpc-core及服务提供者example-consumer依赖

image-20241221180145283

先启动服务提供者,再远程调用

public class TestService {
    public static void main(String[] args) {
        // 创建代理对象
        HelloService helloService = ServiceProxyFactory.getProxy(HelloService.class);
        System.out.println(helloService.hello("Jack"));
    }
}

远程调用成功。

image-20241221180523273

核心:动态代理【ServiceProxy】 + 反射调用【HttpServerHandler】

基本可用,后续优化。


网站公告

今日签到

点亮在社区的每一天
去签到