13-netty基础-手写rpc-消费方生成代理-05

发布于:2025-08-10 ⋅ 阅读:(16) ⋅ 点赞:(0)

 netty系列文章:

01-netty基础-socket
02-netty基础-java四种IO模型
03-netty基础-多路复用select、poll、epoll
04-netty基础-Reactor三种模型
05-netty基础-ByteBuf数据结构
06-netty基础-编码解码
07-netty基础-自定义编解码器
08-netty基础-自定义序列化和反序列化
09-netty基础-手写rpc-原理-01
10-netty基础-手写rpc-定义协议头-02
11-netty基础-手写rpc-支持多序列化协议-03
12-netty基础-手写rpc-编解码-04
13-netty基础-手写rpc-消费方生成代理-05
14-netty基础-手写rpc-提供方(服务端)-06

1 功能逻辑

在客户端启动的时候要为添加了BonnieRemoteReference注解的属性生成一个代理类;代理类的主要功能:在spring容器加载完BeanDefinition之后,在Bean初始化之前,触发生成代理类。
逻辑:

  1. 获取到所有的BeanDefinition
  2. 拿到BeanDefinition对应的class
  3. 遍历class下的所有被BonnieRemoteReference修饰的属性(成员变量)
  4. 为被BonnieRemoteReference修饰的属性,使用BeanDefinitionBuilder构建BeanDefinition,设置interfaceClass、serviceAddress、servicePort属性,并放入到spring容器中,对象的类型为SpringRpcReferenceBean;
  5. SpringRpcReferenceBean实现FactoryBean接口,然后在getObject中返回代理对象。
  6. 编写NettyClient代码

补充:

Spring 的 FactoryBean 是一个工厂 bean 接口,用于自定义 bean 的创建逻辑。它的核心作用是:

  • 当容器获取该 bean 时(如 getBean("xxx")),实际返回的是 getObject() 方法创建的对象,而非 SpringRpcReferenceBean 自身实例。
  • 常用于创建复杂对象(如远程服务代理、数据库连接池等)

2 重点代码介绍

2.1 触发生成代理类入口代码

在spring容器加载BeanDefinition之后,在Bean初始化之前执行,实现接口BeanFactoryPostProcessor接口中postProcessBeanFactory方法即可
 

获取所有的beanDefinitionNames
String[] beanDefinitionNames = beanFactory.getBeanDefinitionNames();

获取beanClassName对应的类信息
Class<?> clazz = ClassUtils.resolveClassName(beanClassName, this.classLoader);

获取clazz上的所有属性(成员变量)
ReflectionUtils.doWithFields(clazz, this::parseRpcReference);

当前这个field是否被BonnieRemoteReference注解修饰
BonnieRemoteReference remoteReference = AnnotationUtils.getAnnotation(field, BonnieRemoteReference.class);

生成SpringRpcReferenceBean的BeanDefinition
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(SpringRpcReferenceBean.class)
放入属性,远程调用中需要的内容,比如是那个类,以及地址端口信息
builder.addPropertyValue("interfaceClass", field.getType());
builder.addPropertyValue("serviceAddress", rpcClientProperties.getServiceAddress());
builder.addPropertyValue("servicePort", rpcClientProperties.getServicePort());
BeanDefinition beanDefinition = builder.getBeanDefinition();
rpcRefBeanDefinitionMap.put(field.getName(), beanDefinition);

放入到spring容器中
registry.registerBeanDefinition(entry.getKey(), entry.getValue());

package com.bonnie.protocol.spring.reference;

import com.bonnie.protocol.annotation.BonnieRemoteReference;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanClassLoaderAware;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.util.ClassUtils;
import org.springframework.util.ReflectionUtils;

import java.lang.reflect.Field;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
public class SpringRpcReferencePostProcessor implements ApplicationContextAware, BeanClassLoaderAware, BeanFactoryPostProcessor {

    private ApplicationContext applicationContext;
    private ClassLoader classLoader;
    //保存发布的引用bean的信息
    private final Map<String, BeanDefinition> rpcRefBeanDefinitionMap = new ConcurrentHashMap<>();
    private RpcClientProperties rpcClientProperties;

    public SpringRpcReferencePostProcessor(RpcClientProperties rpcClientProperties) {
        this.rpcClientProperties = rpcClientProperties;
    }

    /**
     * 实现postProcessBeanFactory方法,spring容器加载了bean的定义文件之后, 在bean实例化之前执行
     * 1、将类型的存在的BonnieRemoteReference注解的属性,构造BeanDefinition放在容器中,beanName是类的全限定名, BeanDefinition(类的全限定名,客户端IP,客户端端口号)
     * @param beanFactory
     * @throws BeansException
     */
    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        // 获取到所有的beanDefinition
        String[] beanDefinitionNames = beanFactory.getBeanDefinitionNames();
        // 遍历
        for (String beanDefinitionName : beanDefinitionNames) {
            BeanDefinition beanDefinition = beanFactory.getBeanDefinition(beanDefinitionName);
            String beanClassName = beanDefinition.getBeanClassName();
            if (Objects.nonNull(beanClassName)) {
                // 获取到这个类的所有field
                Class<?> clazz = ClassUtils.resolveClassName(beanClassName, this.classLoader);
                // 该方法遍历class对象中的所有的field属性,并且作为参数传入到parseRpcReference方法中
                ReflectionUtils.doWithFields(clazz, this::parseRpcReference);
            }
        }

        // 将生成的BeanDefinition放入到容器中
        BeanDefinitionRegistry registry = (BeanDefinitionRegistry) beanFactory;
        Set<Map.Entry<String, BeanDefinition>> entries = this.rpcRefBeanDefinitionMap.entrySet();
        for (Map.Entry<String, BeanDefinition> entry : entries) {
            if (applicationContext.containsBean(entry.getKey())) {
                log.warn("SpringContext already register bean {}", entry.getKey());
            } else {
                registry.registerBeanDefinition(entry.getKey(), entry.getValue());
                log.info("registered RpcReferenceBean {} success", entry.getKey());
            }
        }
    }

    private void parseRpcReference(Field field) {
        // 当前这个field是否被BonnieRemoteReference注解修饰
        BonnieRemoteReference remoteReference = AnnotationUtils.getAnnotation(field, BonnieRemoteReference.class);
        // BonnieRemoteReference注解修饰
        if (Objects.nonNull(remoteReference)) {
            BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(SpringRpcReferenceBean.class);
            builder.addPropertyValue("interfaceClass", field.getType());
            builder.addPropertyValue("serviceAddress", rpcClientProperties.getServiceAddress());
            builder.addPropertyValue("servicePort", rpcClientProperties.getServicePort());
            BeanDefinition beanDefinition = builder.getBeanDefinition();
            rpcRefBeanDefinitionMap.put(field.getName(), beanDefinition);
        }
    }

    @Override
    public void setBeanClassLoader(ClassLoader classLoader) {
        this.classLoader = classLoader;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

}

2.2 生成代理类代码

上面会被BonnieRemoteReference修饰的属性(Field)为生成SpringRpcReferenceBean对象,并添加相关的属性。

实现FactoryBean接口,当spring获取SpringRpcReferenceBean对象的时候,调用的就是里面的getObject对象,在getObject里面生成一个代理类,即可代理被BonnieRemoteReference修饰的类。

package com.bonnie.protocol.spring.reference;

import lombok.Setter;
import org.springframework.beans.factory.FactoryBean;

import java.lang.reflect.Proxy;

/**
 * 创建SpringRpcReferenceBean的代理对象
 */
@Setter
public class SpringRpcReferenceBean implements FactoryBean<Object> {

    private String serviceAddress;
    private Integer servicePort;
    private Class<?> interfaceClass;

    /**
     * 返回由工厂创建的目标Bean实例
     * @return
     * @throws Exception
     */
    @Override
    public Object getObject() throws Exception {
        System.out.println("代理类 serviceAddress "+serviceAddress);
        System.out.println("代理类 servicePort "+servicePort);
        System.out.println("代理类 interfaceClass "+interfaceClass);
        // 为BonnieRemoteReference生成一个代理类
        return Proxy.newProxyInstance(interfaceClass.getClassLoader(),
                new Class<?>[]{interfaceClass},
                new RpcInvokerProxy(serviceAddress, servicePort));
    }

    /**
     * 返回目标Bean的类型
     * @return
     */
    @Override
    public Class<?> getObjectType() {
        return this.interfaceClass;
    }

}

2.3 代理类handler

这块主要是在发生rpc调用的时候,组装请求信息,并通过nettyClient向服务端发起连接并且发送请求。

package com.bonnie.protocol.spring.reference;

import com.alibaba.fastjson.JSONObject;
import com.bonnie.protocol.core.Header;
import com.bonnie.protocol.core.RpcProtocol;
import com.bonnie.protocol.core.RpcRequest;
import com.bonnie.protocol.core.RpcResponse;
import com.bonnie.protocol.enums.ReqTypeEnum;
import com.bonnie.protocol.enums.RpcConstant;
import com.bonnie.protocol.enums.SerialTypeEnum;
import com.bonnie.protocol.netty.NettyClient;
import io.netty.channel.DefaultEventLoop;
import io.netty.util.concurrent.DefaultPromise;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

public class RpcInvokerProxy implements InvocationHandler {

    private String host;
    private Integer port;

    public RpcInvokerProxy(String host, Integer port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        /**
         * 构建发送的请求报文,首先去创建RequestHold类,在这个类定义一个原子自增的RequestId,
         * 在一个就是每次请求都会有结果,那么请求id和结果的关系要有一个映射关系
         */
        RpcProtocol<RpcRequest> reqProtocol = new RpcProtocol<>();
        long requestId = RequestHolder.REQUEST_ID.incrementAndGet();
        System.out.println("生成的requestId:" + requestId);

        Header header = new Header();
        header.setMagic(RpcConstant.MAGIC);
        header.setSerialType(SerialTypeEnum.JAVA_SERIAL.getCode());
        header.setReqType(ReqTypeEnum.REQUEST.getCode());
        header.setRequestId(requestId);
        header.setLength(0);

        RpcRequest rpcRequest = new RpcRequest();
        rpcRequest.setClassName(method.getDeclaringClass().getName());
        rpcRequest.setMethodName(method.getName());
        rpcRequest.setParams(args);
        rpcRequest.setParameterTypes(method.getParameterTypes());

        reqProtocol.setHeader(header);
        reqProtocol.setContent(rpcRequest);
        // 发起远程调用
        NettyClient nettyClient = new NettyClient(host, port);
        System.out.println("代理发送到服务端请求内容:" + JSONObject.toJSONString(reqProtocol));
        // new DefaultEventLoop(),是用来去执行监听器的
        RpcFuture<RpcResponse> future = new RpcFuture<>(new DefaultPromise<RpcResponse>(new DefaultEventLoop()));
        // 在发起请求之前,添加映射关系到map中
        RequestHolder.REQUEST_MAP.put(header.getRequestId(), future);
        // 客户端发送数据
        nettyClient.sendRequest(reqProtocol);
        // 通过promise,异步等待服务端发送数据来,不然就会一直在此等待
        // get方法得到的是RpcResponse类,然后调用getData方法获取到数据
        return future.getPromise().get().getData();
    }
}

2.4 netty客户端代码

这块主要包含创建客户端、向服务端发起连接、发送请求,也会设置前文中自定义编解码、序列化的操作

package com.bonnie.protocol.netty;

import com.bonnie.protocol.code.BonnieDecoder;
import com.bonnie.protocol.code.BonnieEncoder;
import com.bonnie.protocol.core.RpcProtocol;
import com.bonnie.protocol.core.RpcRequest;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class NettyClient {

    private final Bootstrap bootstrap;
    private final NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
    private String serviceAddress;
    private Integer servicePort;

    public NettyClient(String serviceAddress, Integer servicePort) {
        log.info("开始初始化NettyClient======");
        bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        log.info("开始初始化RpcClientInitializer======");
                        ch.pipeline()
                                .addLast(new LoggingHandler())
                                .addLast(new BonnieEncoder())
                                .addLast(new BonnieDecoder())
                                .addLast(new RpcClientHandler())
                        ;
                    }
                });
        this.serviceAddress = serviceAddress;
        this.servicePort = servicePort;
    }

    /**
     * 发送数据
     * @param protocol
     * @throws Exception
     */
    public void sendRequest(RpcProtocol<RpcRequest> protocol) {
        try {
            System.out.println(this.serviceAddress+ "===="+this.servicePort);
            final ChannelFuture channelFuture  = bootstrap.connect(this.serviceAddress, this.servicePort).sync();
            // 注册一个监听器,如果出问题就关闭group
            channelFuture.addListener(listener -> {
                if (channelFuture.isSuccess()) {
                    log.info("connect rpc server {} success.",this.serviceAddress);
                } else {
                    log.error("connect rpc server {} failed. ",this.servicePort);
                    channelFuture.cause().printStackTrace();
                    eventLoopGroup.shutdownGracefully();
                }
            });
            log.info("begin transfer data");
            // 向服务端发送数据
            channelFuture.channel().writeAndFlush(protocol);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

}

2.5 netty客户端接收服务端响应数据

package com.bonnie.protocol.netty;

import com.alibaba.fastjson.JSONObject;
import com.bonnie.protocol.core.RpcProtocol;
import com.bonnie.protocol.core.RpcResponse;
import com.bonnie.protocol.spring.reference.RequestHolder;
import com.bonnie.protocol.spring.reference.RpcFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class RpcClientHandler extends SimpleChannelInboundHandler<RpcProtocol<RpcResponse>> {

    /**
     * 接收服务端响应数据
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcProtocol<RpcResponse> msg) throws Exception {
        long requestId = msg.getHeader().getRequestId();
        log.info("接收服务端响应的结果====== requestId {} {}", requestId, JSONObject.toJSONString(msg));
        // 删除映射关系
        RpcFuture<RpcResponse> future = RequestHolder.REQUEST_MAP.remove(requestId);
        // 我们之前说异步等待服务端发送数据过来,那么只要服务端发送数据过来,就会调用管道RpcClentHandler的read方法
        // 那么当初future.getPromise().get()如果不再阻塞获取数据呢?就是通过给Promise中的Success设置值,同时会唤醒阻塞的线程
        // 一当唤醒线程, future.getPromise().get()就会不再阻塞,就获取到服务端返回的数据
        future.getPromise().setSuccess(msg.getContent());
    }

}


网站公告

今日签到

点亮在社区的每一天
去签到