Rpc导读

发布于:2025-02-22 ⋅ 阅读:(16) ⋅ 点赞:(0)

手写Rpc框架 - 导读

git仓库-all-rpc

GTIEE:https://gitee.com/quercus-sp204/all-rpc 【参考源码 yrpc】

1. Rpc概念

RPC 即远程过程调用(Remote Procedure Call) ,就是通过网络从远程计算机程序上请求服务。

  • 本地调用抽象:允许程序像调用本地函数一样调用远程计算机上的函数。开发者无需编写复杂的网络通信代码来处理诸如建立连接、发送请求、等待响应等细节,只需关注业务逻辑的实现。例如,在一个分布式系统中,A 服务器上的程序需要调用 B 服务器上的某个函数来获取数据,使用 RPC 就可以像调用本地函数一样简单。
  • 通信协议与序列化:为了实现这种跨网络的函数调用,RPC 框架通常会使用特定的通信协议(如 TCP/IP)来传输数据,并通过序列化和反序列化技术,将调用的参数和返回值转换为适合在网络上传输的格式。比如,将参数对象转换为字节流进行传输,在接收端再将字节流还原为对象。

那么,它的应用场景有哪些呢?我们平时用到了吗?

  • 微服务架构:在微服务架构中,各个微服务之间通常需要相互通信来完成复杂的业务流程。例如,一个电商系统中,订单服务可能需要调用库存服务来检查商品库存,调用用户服务来验证用户信息等,RPC 可以高效地实现这些微服务间的通信。
  • 分布式系统:在大型分布式系统中,不同节点可能负责不同的功能模块。例如,在搜索引擎系统中,索引构建节点和查询服务节点可能分布在不同的服务器上,通过 RPC 可以实现节点之间的协同工作。

说白了,就是不同服务之间的网络通信嘛。那你可能会问了,假如我的系统有User、Order、Shipment三个服务【爪哇SpringBoot编写的嚯】,如果User想要访问Order上面的函数,我只需要将此函数以Http接口的形式暴露出去,然后User那边使用RestTemplate来访问不就好了吗?何必这么费劲还要用框架呢?

仔细想一下,确实有那么点儿卵道理,但是又仔细一想,

  • 你会发现,你使用RestTemplate的时候,需要手动处理请求的 URL 拼接、请求头设置、参数序列化和响应反序列化等操作,对了,还有异常也需要自己处理,需要自己处理调用失败的情况,如重试几次啊等等
  • HTTP 协议是一种文本协议,存在较多的头部信息,在数据传输时会带来额外的开销。而rpc框架的协议通常相比http协议,是很轻量的。
  • 如果Order部署在了多台机器上面,代码里面肯定是存了这些机器的地址,如果扩容或者缩容,都要修改代码,同时还需要我们手动选择调用哪一台机器上面的Order【需要手动实现负载均衡】

那么,rpc都可以完成这些,并且服务的地址信息那些啊,可以在rpc的注册中心拉取到,动态感知。

综上所述,一个基本的Rpc框架主要应该具有的能力是:

基础通信能力

  • 高效序列化、优化网络传输,采用高效的网络协议和传输方式

  • 稳定性与可靠性

    • 连接管理:具备完善的连接池管理机制,对连接进行复用,减少频繁建立和销毁连接的开销,同时保证连接的稳定性。例如,在高并发场景下,能自动处理连接的超时、重连等问题

服务发现与治理能力

  • 服务注册与发现: 1. 服务提供者能够在启动时自动将自己的服务信息(如服务名称、地址、端口等)注册到服务注册中心,方便服务消费者发现和调用。2.实时感知:调用方能够及时感知调用的服务信息并更新。
  • 负载均衡: 如随机、轮询、最少活跃调用数、一致性哈希
  • 服务容错
    • 熔断机制:当服务提供者出现故障或响应时间过长时,能够自动熔断对该服务的调用,避免大量请求积压,影响整个系统的稳定性。
    • 降级策略:在系统资源紧张或服务出现故障时,能够自动降级服务,提供默认的响应结果或采取其他临时措施,保证系统的基本可用性。

易用性与可扩展性

  • 简单的编程模型:让开发者能够像调用本地函数一样调用远程服务,无需关注底层的网络通信细节,降低开发难度,提高开发效率
  • 插件化与扩展性
    • SPI 机制:具备良好的插件化架构,通过服务提供者接口(SPI)机制,允许开发者根据实际需求扩展框架的功能,如自定义序列化方式、负载均衡策略、过滤器等。

在这里插入图片描述

现在就按照上面的能力,来一个一个实现,最终将其组合成一个框架。

2. 角色

一个Rpc框架的大致角色分布:

在这里插入图片描述

服务提供者将自己的数据信息【例如,端口、ip、接口等信息提供给注册中心】(服务注册)

消费者从注册中心拉取到可用的服务信息(服务发现),然后选择一个合适的服务(负载均衡),发送网络请求【请求里面封装了需要调用的接口,参数等等】,

服务提供者接收到请求之后,本地调用方法,然后通过网络把响应结果过传输给消费者

最后消费者解析响应结果。

注册中心: (本文就选择zookeeper为注册中心)

3. 注册中心的接入

zookeeper的安装与启动,就不在这里赘述了。

思考注册中心的主要能力:【服务注册,服务发现】

定义接口

Registry接口

/*
 * 注册中心的能力: 注册服务, 拉取服务列表
 */
public interface Registry {
    /**
     * 注册服务
     * @param serviceConfig 服务的配置内容
     */
    void register(ServiceConfig<?> serviceConfig);
    
    /**
     * 从注册中心拉取服务列表
     * @param serviceName 服务的名称
     * @return 服务的地址
     */
    List<InetSocketAddress> lookup(String serviceName, String group);
}

ServiceConfig 封装服务信息的class

/*
  服务信息
 */
public class ServiceConfig<T> {
    // 接口的类型
    /*
    比如UserServiceImpl实现了UserService, 真实对象就是实现类,interfaceProvider就是UserService.class
    */
    private Class<?> interfaceProvider;
    private Object ref; // 真实对象
    private String group = "default"; // 分组
	// get set.....
}

Zookeeper注册中心的实现类 ZookeeperRegistry

服务注册在zookeeper上面的节点如图所示

trpc根节点
==▶消费者节点
==▼生产者节点
====▼接口的全限定名 
======▼分组
========▼地址信息1...

在这里插入图片描述

@Slf4j
public class ZookeeperRegistry extends AbstractRegistry {
    // 维护一个zk实例
    private ZooKeeper zooKeeper;
    
    public ZookeeperRegistry() {
        this.zooKeeper = ZookeeperUtil.createZookeeper();
    }
    
    public ZookeeperRegistry(String connectString,int timeout) {
        this.zooKeeper = ZookeeperUtil.createZookeeper(connectString,timeout);
    }
    
    @Override
    public void register(ServiceConfig<?> service) {
        // 服务名称的节点 ----  "/tprc-metadata/providers/接口全限定名"
        String parentNode = Constant.BASE_PROVIDERS_PATH +"/"+service.getInterface().getName();
        // 建立服务节点这个节点应该是一个持久节点
        if(!ZookeeperUtil.exists(zooKeeper,parentNode,null)){
            ZookeeperNode zookeeperNode = new ZookeeperNode(parentNode,null);
            ZookeeperUtil.createNode(zooKeeper, zookeeperNode, null, CreateMode.PERSISTENT);
        }
        // 建立分组节点
        parentNode = parentNode + "/" + service.getGroup();
        if(!ZookeeperUtil.exists(zooKeeper,parentNode,null)){
            ZookeeperNode zookeeperNode = new ZookeeperNode(parentNode,null);
            ZookeeperUtil.createNode(zooKeeper, zookeeperNode, null, CreateMode.PERSISTENT);
        }
        // 创建本机的临时节点, ip:port ,
        // 服务提供方的端口一般自己设定,我们还需要一个获取ip的方法
        // ip我们通常是需要一个局域网ip,不是127.0.0.1,也不是ipv6
        // 192.168.12.121
        String node = parentNode + "/" + NetUtils.getIp() + ":" + TrpcBootstrap.getInstance().getConfiguration().getPort();
        if(!ZookeeperUtil.exists(zooKeeper,node,null)){
            ZookeeperNode zookeeperNode = new ZookeeperNode(node,null);
            ZookeeperUtil.createNode(zooKeeper, zookeeperNode, null, CreateMode.EPHEMERAL);
        }
        log.info("服务{},注册ok",service.getInterface().getName());
    }
    
    /**
     * 拉取合适的服务列表
     * @param serviceName 服务名称
     * @return 服务列表
     */
    @Override
    public List<InetSocketAddress> lookup(String serviceName,String group) {
        // 1、找到服务对应的节点
        String serviceNode = Constant.BASE_PROVIDERS_PATH + "/" + serviceName + "/" +group;

        List<String> children = ZookeeperUtil.getChildren(zooKeeper, serviceNode, null);
        // 获取了所有的可用的服务列表
        List<InetSocketAddress> inetSocketAddresses = children.stream().map(ipString -> {
           String[] ipAndPort = ipString.split(":");
           String ip = ipAndPort[0];
           int port = Integer.parseInt(ipAndPort[1]);
           return new InetSocketAddress(ip, port);
        }).toList();

        if(inetSocketAddresses.isEmpty()){
           throw new DiscoveryException("未发现任何可用的服务主机.");
        }
        return inetSocketAddresses;
    }
}

上面的ZookeeperUtil是自定义的操作Zookeeper的工具类。-- 详情见源码里面的注释,值得说明一下,Zookeeper要先有父结点才能创建子节点,不能把路径直接写全了直接创建,故在源码里面会用createRoot方法初始化所有的父结点。

public static ZooKeeper createZookeeper(String connectPath, int timeout) {
    CountDownLatch countDownLatch = new CountDownLatch(1);
    try {
        .................
        // 连接成功就创建根节点,检查是否存在rpc根节点  /trpc-metadata/providers  &&  /trpc-metadata/consumers
        createRoot(zooKeeper);
        ...............
    } catch (IOException | InterruptedException e) {
        log.info("创建zookeeper实例时发生异常:",e);
        throw new ZookeeperException("创建zookeeper实例时发生异常");
    }
}

至此,注册中心的两个基本方法就可以告一段落了。

4.Trpc框架启动器

既然是一个框架,那么,我们必然有一个入口来启动这一套流程。

①服务提供方

- 基本功能信息appName、registry

形如: all-tRpc-demo / demo-simple-provider / …/ProviderApplication.java 这样来启动我们的提供方。

public class ProviderApplication {
    public static void main(String[] args) {
        TrpcBootstrap.getInstance() 
                .appName("user-provider")
                // 配置注册中心
                .registry(new RegistryConfig("zookeeper://127.0.0.1:2181"))
                // 扫描包下的类,然后批量发布接口
                .scan("com.feng.demo")
                // 启动服务
                .start();
    }
}

在TrpcBootstrap.java里面

@Slf4j
public class TrpcBootstrap {
    // 单例,每个应用程序只有一个
    private static final TrpcBootstrap trpcBootstrap = new TrpcBootstrap();
    
    // 配置
    private final RpcConfiguration configuration;
    
    // 获取实例
    public static TrpcBootstrap getInstance() {
        return trpcBootstrap;
    }
    
    // 设置应用名称 *****
    public TrpcBootstrap appName( String appName ) {
        configuration.setAppName(appName);
        return this;
    }
    // 配置注册中心 *****
    public TrpcBootstrap registry( RegistryConfig registryConfig ) {
        // 传递过来的参数registryConfig,还没有创建连接,在这里创建与注册中心的连接
        registryConfig.createRegistryConnection(); // 创建注册中心的连接
        configuration.setRegistryConfig(registryConfig); // 设置服务注册中心
        return this;
    }
}

RpcConfiguration是封装的配置信息

// 全局的配置类,代码配置 --> xml配置 --> 默认项
@Data
public class RpcConfiguration {
    // 配置信息-->端口号
    private int port = 8094;
    // 配置信息-->应用程序的名字
    private String appName = "default";
    // 分组信息
    private String group = "default";
    // 配置信息-->注册中心
    private RegistryConfig registryConfig = new RegistryConfig("zookeeper://127.0.0.1:2181"); // 默认的
    // 配置信息-->序列化协议
    private String serializeType = "jdk";
    // 配置信息-->压缩使用的协议
    private String compressType = "gzip";
    // 配置信息
    @Getter
    public IdGenerator idGenerator = new IdGenerator(1, 2);

    // 配置信息-->负载均衡策略
    private LoadBalancer loadBalancer = new RoundRobinLoadBalancer();

    // 为每一个ip配置一个限流器
    private final Map<SocketAddress, RateLimiter> everyIpRateLimiter = new ConcurrentHashMap<>(16);
    // 为每一个ip配置一个断路器,熔断
    private final Map<SocketAddress, CircuitBreaker> everyIpCircuitBreaker = new ConcurrentHashMap<>(16);

    // 读xml,dom4j
    public RpcConfiguration() {
        ............
    }
}

// 里面又持有注册中心的类
@Slf4j
public class RegistryConfig {
    // 定义连接的 url zookeeper://127.0.0.1:2181  redis://192.168.12.125:3306
    private final String connectString;
    // 持有一个 Registry
    private Registry registry;
    public RegistryConfig(String connectString) {
        this.connectString = connectString;
    }

    public Registry getRegistry() {
        if ( registry == null ) createRegistryConnection();
        return registry;
    }

    /**
     * 可以使用简单工厂来完成
     * @return 具体的注册中心实例
     */
    public void createRegistryConnection() {
        if ( connectString == null ) throw new DiscoveryException("未配置注册中心!");
        // 1、获取注册中心的类型
        String registryType = getRegistryType(connectString,true).toLowerCase().trim();
        log.info("【创建与注册中心的连接~~~】 注册中心的类型: {}", registryType);
        // 2、通过类型获取具体注册中心
        if( registryType.equals("zookeeper") ){
            String host = getRegistryType(connectString, false);
            this.registry = new ZookeeperRegistry(host, Constant.ZK_TIME_OUT);
        } else if (registryType.equals("nacos")){
            String host = getRegistryType(connectString, false);
            this.registry = new NacosRegistry(host, Constant.ZK_TIME_OUT);
        } else {
            throw new DiscoveryException("未发现合适的注册中心。");
        }
    }

    private String getRegistryType(String connectString,boolean ifType){
        String[] typeAndHost = connectString.split("://");
        if(typeAndHost.length != 2){
            throw new RuntimeException("给定的注册中心连接url不合法");
        }
        if(ifType){
            return typeAndHost[0];
        } else {
            return typeAndHost[1];
        }
    }
}
- 扫描接口并发布scan
// 扫描项目指定包下面的接口,并且将他们发布到注册中心
public TrpcBootstrap scan(String packageName) {
    // 1. 获取指定包 path 下的所有类的全限定名
    List<String> classNames = getAllClassName(packageName);
    // 2.1 拿到所有标注了TrpcApi注解的类
    List<? extends Class<?>> classes = getTrpcClassesByList(classNames);
    //  2.2遍历这些构建实例
    for (Class<?> clazz : classes) {
        Class<?>[] interfaces = clazz.getInterfaces(); // 获取到他的接口
        Object instance;
        try {
            instance = clazz.getConstructor().newInstance(); // 通过无参构造器创建一个实例
        } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
            throw new RuntimeException(e);
        }
        // 获取注解的group
        TrpcApi annotation = clazz.getAnnotation(TrpcApi.class);
        String group = annotation.group();
        // 3.将这些接口发布
        for (Class<?> anInterface : interfaces) {
            ServiceConfig<?> serviceConfig = new ServiceConfig<>();
            serviceConfig.setInterface(anInterface);
            serviceConfig.setRef(instance);
            serviceConfig.setGroup(group);
            if (log.isDebugEnabled()){
                log.debug("---->已经通过包扫描,将服务【{}】发布.",anInterface);
            }
            // 3、发布
            publish(serviceConfig);
        }
    }
    return this;
}

//
private TrpcBootstrap publish( ServiceConfig<?> service ) {
    configuration.getRegistryConfig().getRegistry().register(service);
    // 维护一个映射关系
    SERVERS_LIST.put(service.getInterface().getName(), service);
    return this;
}

具体可以看源码里面的实现

- 启动netty
public void start() {
    // 1、创建eventLoop,老板只负责处理请求,之后会将请求分发至worker
    EventLoopGroup boss = new NioEventLoopGroup(2);
    EventLoopGroup worker = new NioEventLoopGroup(10);
    try {
        ServerBootstrap bootstrap = new ServerBootstrap();
        // 3.配置服务
        bootstrap = bootstrap.group(boss, worker)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // 是核心,我们需要添加很多入站和出站的handler
                        socketChannel.pipeline().addLast(new LoggingHandler()) // 打印日志
                            .addLast(new TrpcRequestDecoder())  // 请求过来,需要解码
                            // 根据请求进行方法调用
                            .addLast(new MethodCallHandler())
                            .addLast(new TrpcResponseEncoder()) // 响应回去,需要编码
                        ;
                    }
                });
        // 4.绑定端口
        ChannelFuture channelFuture = bootstrap.bind(configuration.getPort()).sync();
        // 5.关闭
        channelFuture.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    } finally {
        boss.shutdownGracefully();
        worker.shutdownGracefully();
    }
}

② 服务消费方

形如 all-tRpc-demo / demo-simple-consumer /…/ConsumerApplication.java

public class ConsumerApplication {
    public static void main(String[] args) {
        ReferenceConfig<UserService> reference = new ReferenceConfig<>();
        reference.setReference(UserService.class);

        // 1、连接注册中心
        // 2、拉取服务列表
        // 3、选择一个服务并建立连接
        // 4、发送请求,携带一些信息(接口名,参数列表,方法的名字),获得结果
        TrpcBootstrap.getInstance()
                .appName("user-consumer")
                .registry(new RegistryConfig("zookeeper://127.0.0.1:2181"))
                .reference(reference);

        UserService userService = reference.get();
        System.out.println("=======================================");
        for (int i = 0; i < 10; i++) {
            System.out.println("【rpc调用开始=============】");
            // 开始时间
            long start = System.currentTimeMillis();
            List<User> users = userService.getUserByName("田小锋q" + i);
            for (User user : users) {
                System.out.println(user);
            }
            // 结束时间
            long end = System.currentTimeMillis();
            System.out.println("rpc执行耗时:" + (end - start));
            System.out.println("【rpc调用=============结束-----】");
        }

    }
}

@Slf4j
public class ReferenceConfig<T> {
    // 接口类型
    private Class<T> interfaceRef;
    // 注册中心
    private Registry registry;
    // 分组信息
    private String group;
    /**
     * 代理设计模式
     * @return 代理对象
     */
    public T get() {
        // 此处一定是使用动态代理完成了一些工作
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); // 类加载器
        Class<T>[] classes = new Class[]{interfaceRef}; // 接口类型
        InvocationHandler handler = new ProxyConsumerInvocationHandler(registry,interfaceRef,group);
        // 使用动态代理生成代理对象
        Object helloProxy = Proxy.newProxyInstance(classLoader, classes, handler);
        return (T)helloProxy;
    }
}

主要是jdk动态代理 在invoke里面实现我们的远程调用

5. 序列化&&压缩

在 RPC(远程过程调用)框架中,序列化和压缩是两个重要的概念,它们在数据传输过程中起着关键作用

序列化

上图里面可以看出来,序列化是将对象转换为字节流的过程,以便在网络上传输或存储到文件中。在 RPC 中,客户端调用远程服务时,需要将调用的参数对象序列化为字节流,通过网络发送到服务端;服务端接收到字节流后,再将其反序列化为对象进行处理。处理完后,又将结果对象序列化为字节流返回给客户端,客户端再反序列化得到结果。

常见的序列化方式

那么我们就定义一下序列化的接口

public interface Serializer {
    /**
     * 抽象的用来做序列化的方法
     */
    byte[] serialize(Object object);

    /**
     * 反序列化的方法
     */
    <T> T deserialize(byte[] bytes, Class<T> clazz);
}
1. JDK序列化
@Slf4j // lombok里面的日志注解
public class JdkSerializer implements Serializer {
    @Override
    public byte[] serialize(Object object) {
        if (object == null) return null;
        try (
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            ObjectOutputStream outputStream = new ObjectOutputStream(baos);
        ) { // try - with写法
            outputStream.writeObject(object);
            byte[] result = baos.toByteArray();
            if(log.isInfoEnabled()){
                log.info("对象【{}】已经完成了序列化操作,序列化后的字节数为【{}】",object,result.length);
            }
            return result;
        } catch (IOException e) {
            log.error("序列化对象【{}】时放生异常.",object);
            throw new SerializeException(e);
        }
    }

    @Override
    public <T> T deserialize(byte[] bytes, Class<T> clazz) {
        if(bytes == null || clazz == null) return null;
        try (
            ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
            ObjectInputStream objectInputStream = new ObjectInputStream(bais);
        ) {
            Object object = objectInputStream.readObject();
            if(log.isInfoEnabled()){
                log.info("类【{}】已经完成了反序列化操作.",clazz);
            }
            return (T)object;
        } catch (IOException | ClassNotFoundException e) {
            log.error("反序列化对象【{}】时放生异常.",clazz);
            throw new SerializeException(e);
        }
    }
}
2.JSON序列化

这里就用fastjson了

@Slf4j
public class JsonSerializer implements Serializer {
    @Override
    public byte[] serialize(Object object) {
        if (object == null) return null;
        byte[] result = JSON.toJSONBytes(object);
        if (log.isInfoEnabled()) {
            log.info("对象【{}】已经完成了序列化操作,序列化后的字节数为【{}】", object, result.length);
        }
        return result;
    }

    @Override
    public <T> T deserialize(byte[] bytes, Class<T> clazz) {
        if (bytes == null || clazz == null) return null;
        T t = JSON.parseObject(bytes, clazz);
        if (log.isInfoEnabled()) {
            log.info("类【{}】已经完成了反序列化操作.", clazz);
        }
        return t;
    }
}
3.Hessian序列化

Hessian是一个轻量级的、基于HTTP的RPC(远程过程调用)框架,由Resin开源提供。它使用一个简单的、基于二进制的协议来序列化对象,并通过HTTP进行传输。Hessian的设计目标是提供一种高效、可靠且易于使用的远程服务调用机制。

maven依赖

<dependency>
    <groupId>com.caucho</groupId>
    <artifactId>hessian</artifactId>
    <version>版本号</version>  <!-- 4.0.66 -->
</dependency>
@Slf4j
public class HessianSerializer implements Serializer {
    @Override
    public byte[] serialize(Object object) {
        if (object == null) return null;
        try (
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
        ) {
            Hessian2Output hessian2Output = new Hessian2Output(baos);
            hessian2Output.writeObject(object);
            hessian2Output.flush();
            byte[] result = baos.toByteArray();
            if(log.isInfoEnabled())
                log.info("对象【{}】已经完成了序列化操作,序列化后的字节数为【{}】",object,result.length);
            return result;
        } catch (IOException e) {
            log.error("使用hessian进行序列化对象【{}】时放生异常.",object);
            throw new SerializeException(e);
        }
    }

    @Override
    public <T> T deserialize(byte[] bytes, Class<T> clazz) {
        if(bytes == null || clazz == null) return null;
        try (
            ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
        ) {

            Hessian2Input hessian2Input = new Hessian2Input(bais);
            T t = (T) hessian2Input.readObject();
            if(log.isInfoEnabled()
                log.info("类【{}】已经使用hessian完成了反序列化操作.",clazz);
            return t;
        } catch (IOException  e) {
            log.error("使用hessian进行反序列化对象【{}】时发生异常.",clazz);
            throw new SerializeException(e);
        }
    }
}
4.序列化工厂

转念一想嚯,我们这是实现一个框架额,肯定是要有对外扩展的能力的,那么,我们就将序列化的所有方式默认加载到内存中去,通过一个工厂类来获取指定的序列化方法就可以了,然后暴露添加其他序列化方法的接口。

/**
 * @version 1.0
 * @Author txf
 * @Date 2025/2/10 15:17
 * @注释 序列化工厂
 */
@Slf4j
public class SerializerFactory {
    private final static ConcurrentHashMap<String, ObjectWrapper<Serializer>> SERIALIZER_CACHE = new ConcurrentHashMap<>(8);
    private final static ConcurrentHashMap<Byte, ObjectWrapper<Serializer>> SERIALIZER_CACHE_CODE = new ConcurrentHashMap<>(8);

    static {
        ObjectWrapper<Serializer> jdk = new  ObjectWrapper<>((byte) 1, "jdk", new JdkSerializer());
        ObjectWrapper<Serializer> json = new  ObjectWrapper<>((byte) 2, "json", new JsonSerializer());
        ObjectWrapper<Serializer> hessian = new  ObjectWrapper<>((byte) 3, "hessian", new HessianSerializer());
        SERIALIZER_CACHE.put("jdk",jdk);
        SERIALIZER_CACHE.put("json",json);
        SERIALIZER_CACHE.put("hessian",hessian);

        SERIALIZER_CACHE_CODE.put((byte) 1, jdk);
        SERIALIZER_CACHE_CODE.put((byte) 2, json);
        SERIALIZER_CACHE_CODE.put((byte) 3, hessian);
    }

    /**
     * 使用工厂方法获取一个SerializerWrapper
     * @param serializeType 序列化的类型
     * @return SerializerWrapper
     */
    public static  ObjectWrapper<Serializer> getSerializer(String serializeType) {
        ObjectWrapper<Serializer> serializerWrapper = SERIALIZER_CACHE.get(serializeType);
        if(serializerWrapper == null){
            log.error("未找到您配置的【{}】序列化工具,默认选用jdk的序列化方式。",serializeType);
            return SERIALIZER_CACHE.get("jdk");
        }

        return SERIALIZER_CACHE.get(serializeType);
    }

    public static  ObjectWrapper<Serializer> getSerializer(Byte serializeCode) {
        ObjectWrapper<Serializer> serializerWrapper = SERIALIZER_CACHE_CODE.get(serializeCode);
        if(serializerWrapper == null){
            log.error("未找到您配置的【{}】序列化工具,默认选用jdk的序列化方式。",serializeCode);
            return SERIALIZER_CACHE.get("jdk");
        }
        return SERIALIZER_CACHE_CODE.get(serializeCode);
    }

    /**
     * 新增一个新的序列化器
     * @param serializerObjectWrapper 序列化器的包装
     */
    public static void addSerializer(ObjectWrapper<Serializer> serializerObjectWrapper){
        SERIALIZER_CACHE.put(serializerObjectWrapper.getName(),serializerObjectWrapper);
        SERIALIZER_CACHE_CODE.put(serializerObjectWrapper.getCode(),serializerObjectWrapper);
    }
}

压缩

压缩是指通过特定的算法对数据进行处理,减少数据的存储空间或传输带宽。在 RPC 中,对序列化后的字节流进行压缩可以进一步减少数据传输量,提高传输效率。说白了,就是传的少了。RPC嘛,将性能追求到极致!!!!

设计方式同序列化一样的。

public interface Compressor {
    // 序列化后的字节数组压缩
    byte[] compress(byte[] bytes);
    // 解压缩
    byte[] decompress(byte[] bytes);
}

Gzip压缩

@Slf4j
public class GzipCompressor implements Compressor {
    @Override
    public byte[] compress(byte[] bytes) {
        try (
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            GZIPOutputStream gzipOutputStream = new GZIPOutputStream(baos);
        ) {
            gzipOutputStream.write(bytes);
            gzipOutputStream.finish();
            byte[] result = baos.toByteArray();
            if(log.isInfoEnabled())
                log.info("对字节数组进行了压缩长度由【{}】压缩至【{}】.",bytes.length,result.length);
            return result;
        } catch (IOException e){
            log.error("对字节数组进行压缩时发生异常",e);
            throw new CompressException(e);
        }
    }
    @Override
    public byte[] decompress(byte[] bytes) {
        try (
            ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
            GZIPInputStream gzipInputStream = new GZIPInputStream(bais);
        ) {
            byte[] result = gzipInputStream.readAllBytes();
            if(log.isInfoEnabled())
                log.info("对字节数组进行了解压缩长度由【{}】变为【{}】.",bytes.length,result.length);
            return result;
        } catch (IOException e){
            log.error("对字节数组进行压缩时发生异常",e);
            throw new CompressException(e);
        }
    }
}

压缩工厂就不在这里写了。

导读部分结束了。。