为了加深对 RPC 框架的理解,自己动手做了个简单的 RPC 框架,名字随便起个,就叫 lsf 吧。
lsf GitHub 地址:https://github.com/buyulian/lsf
目录
1、注册中心、消费者、生产者 spring bean标签定义
一、整体架构
二、各模块含义
三、提供方demo
1、引入客户端 jar 包
<!-- lsf client library, required on the provider side. -->
<dependency>
<groupId>com.me</groupId>
<artifactId>lsf-client</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
2、api 包定义
3、 接口实现
4、提供者 spring bean 配置
<!-- Registry center location; referenced by id from the provider element below. -->
<lsf:registry id="registry" host="127.0.0.1" port="25000"/>
<!-- Provider declaration for HelloWorldService under alias "test".
     NOTE(review): "ref" presumably names the Spring bean holding the implementation; confirm against the tag parser. -->
<lsf:provider id="helloWorldServiceLsf" alias="test"
interface="com.me.lsf.provider.api.HelloWorldService"
registry="registry"
ref="helloWorldService"/>
5、启动类
四、调用方 demo
1、引入客户端 jar 包和提供者的 api 包
<!-- API artifact published by the provider (contains the service interfaces the consumer calls). -->
<dependency>
<groupId>com.me</groupId>
<artifactId>lsf-provider-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- lsf client library, required on the consumer side as well. -->
<dependency>
<groupId>com.me</groupId>
<artifactId>lsf-client</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
2、消费者 spring bean 配置
<!-- Registry center location; referenced by id from the consumer element below. -->
<lsf:registry id="registry" host="127.0.0.1" port="25000"/>
<!-- Consumer declaration for HelloWorldService; the alias matches the one used in the provider demo ("test"). -->
<lsf:consumer id="helloWorldService"
interface="com.me.lsf.provider.api.HelloWorldService"
registry="registry"
alias="test"/>
3、启动类
五、具体实现
1、注册中心、消费者、生产者 spring bean标签定义
(1) lsf.xsd 文件
<?xml version="1.0" encoding="UTF-8" ?>
<!-- Schema for the custom lsf Spring tags: registry, provider and consumer.
     All attributes are declared as plain strings and none is marked required. -->
<schema xmlns="http://www.w3.org/2001/XMLSchema"
targetNamespace="http://www.me.com/schema/lsf"
elementFormDefault="qualified">
<!-- Registry center: bean id plus host and port. -->
<element name="registry">
<complexType>
<attribute name="id" type="string"/>
<attribute name="host" type="string"/>
<!-- NOTE(review): port is string-typed here; presumably converted to a number by the tag parser (not shown). -->
<attribute name="port" type="string"/>
</complexType>
</element>
<!-- Provider: bean id, alias, service interface name, "ref" and the id of the registry to use. -->
<element name="provider">
<complexType>
<attribute name="id" type="string"/>
<attribute name="alias" type="string"/>
<attribute name="interface" type="string"/>
<attribute name="ref" type="string"/>
<attribute name="registry" type="string"/>
</complexType>
</element>
<!-- Consumer: same attributes as provider except that it has no "ref". -->
<element name="consumer">
<complexType>
<attribute name="id" type="string"/>
<attribute name="alias" type="string"/>
<attribute name="interface" type="string"/>
<attribute name="registry" type="string"/>
</complexType>
</element>
</schema>
(2) 标签解析
2、核心数据结构
(1) rpc参数
/**
* Request payload of one RPC call. The consumer serializes it to JSON and the
* provider parses it back before dispatching the call.
*/
public class RpcParam {
/**
* Class to invoke; the consumer fills in the canonical name of the service interface.
*/
private String rClass;
/**
* Name of the method to invoke.
*/
private String method;
/**
* Argument list, one serialized string per argument.
*/
private String[] args;
/**
* Serialization type code; defaults to JSON_AUTO_TYPE.
*/
private String serializeType = SerializeTypeEnum.JSON_AUTO_TYPE.getCode();
}
(2) 消费者bean
/**
* Consumer-side description of one remote service; the invocation handler reads
* it to build and route RPC calls.
*/
public class Consumerbean {
/**
* Interface being invoked.
*/
private Class interfaceClass;
/**
* Name of the interface being invoked.
*/
private String interfaceName;
/**
* Alias.
*/
private String alias;
/**
* Reserved for future use.
*/
private Boolean register;
/**
* Connections to providers that are currently alive.
*/
private List<LsfConnection> aliveConnectionList;
/**
* Manually specified provider connections.
*/
private List<LsfConnection> fixedConnectionList;
/**
* Parent object; the invocation handler runs methods whose name matches one
* declared on it locally instead of sending them over RPC.
*/
private ParentObject parentObject;
/**
* Serialization type code; defaults to JSON_AUTO_TYPE.
*/
private String serializeType = SerializeTypeEnum.JSON_AUTO_TYPE.getCode();
/**
* Registry center bean.
*/
private RegistryBean registryBean;
}
3、核心处理逻辑
(1) 消费核心处理逻辑
/**
 * JDK dynamic-proxy handler behind every consumer bean.
 *
 * <p>Calls whose method name matches a method declared on the consumer's
 * {@code ParentObject} are executed locally on that object; every other call is
 * turned into an {@link RpcParam}, posted to a provider connection and the
 * response is deserialized into the method's return value.
 */
public class ConsumerBeanInvocationHandler implements InvocationHandler {

    private static final Logger logger = LoggerFactory.getLogger(ConsumerBeanInvocationHandler.class);

    private Consumerbean consumerbean;

    /**
     * Dispatches a proxied call either locally or over RPC.
     *
     * @param proxy  the proxy instance the method was invoked on (unused)
     * @param method the invoked method
     * @param args   the call arguments, {@code null} for a no-arg call
     * @return the local result, or the deserialized remote result
     * @throws Throwable the original exception of a local call, an
     *         {@link IllegalStateException} when no connection or no parsable
     *         response is available, or a {@link RuntimeException} carrying the
     *         exception text reported by the provider
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // Resolve the target interface of this call.
        Class<?> tClass = consumerbean.getInterfaceClass();
        String canonicalName = tClass.getCanonicalName();
        String methodName = method.getName();

        ParentObject parentObject = consumerbean.getParentObject();
        // Only dispatch locally when the method can really be invoked on parentObject;
        // a name match alone would make method.invoke fail with IllegalArgumentException
        // for an interface method that merely shares its name.
        boolean runLocally = parentObject != null
                && isNoRpc(methodName, parentObject)
                && method.getDeclaringClass().isInstance(parentObject);
        if (runLocally) {
            try {
                return method.invoke(parentObject, args);
            } catch (java.lang.reflect.InvocationTargetException e) {
                // Rethrow the real exception; otherwise the proxy would surface an
                // UndeclaredThrowableException wrapping the reflection wrapper.
                Throwable cause = e.getCause();
                throw cause != null ? cause : e;
            }
        }

        // Avoid building the argument string when debug logging is off.
        if (logger.isDebugEnabled()) {
            logger.debug("执行了 rpc 调用, class {}, method {}, args {}",canonicalName, methodName, Arrays.toString(args));
        }

        // Serialization type configured for this consumer.
        String serializeType = consumerbean.getSerializeType();

        // Assemble the rpc request.
        RpcParam rpcParam = getRpcParam(args, canonicalName, methodName, serializeType, method);

        // Pick a provider connection.
        LsfConnection connection = consumerbean.getConnection();
        if (connection == null) {
            throw new IllegalStateException("no provider connection available for " + canonicalName);
        }

        // Perform the call and parse the response envelope.
        String rpcResponseParamStr = getBody(rpcParam, connection);
        RpcResponseParam rpcResponseParam = JSON.parseObject(rpcResponseParamStr, RpcResponseParam.class);
        if (rpcResponseParam == null) {
            throw new IllegalStateException(
                    "empty rpc response for " + canonicalName + "." + methodName);
        }

        if (ErrorCodeEnum.SUCCESS.getCode().equals(rpcResponseParam.getCode())) {
            // Deserialize the result with the same serializer that produced the request.
            LsfSerialize lsfSerialize = LsfSerializeFactory.get(serializeType);
            return lsfSerialize.deSerializeResult(method, rpcResponseParam.getResult());
        }
        // The provider reported an exception.
        throw new RuntimeException(rpcResponseParam.getException());
    }

    /**
     * Posts the JSON-encoded request to the given connection and returns the raw
     * response body.
     */
    private String getBody(RpcParam rpcParam, LsfConnection connection) {
        LsfClient client = LsfHttpClientFactory.getClient();

        ClientParam clientParam = new ClientParam();
        clientParam.setHost(connection.getHost());
        clientParam.setPort(connection.getPort());
        clientParam.setUrl("/");
        clientParam.setBody(JSON.toJSONString(rpcParam));

        // The network call itself is performed by the client.
        return client.post(clientParam);
    }

    /**
     * Builds the request object: target class, method name, serialization type and
     * the arguments serialized with the serializer registered for that type.
     */
    private RpcParam getRpcParam(Object[] args, String canonicalName, String methodName, String serializeType, Method method) {
        RpcParam rpcParam = new RpcParam();
        rpcParam.setrClass(canonicalName);
        rpcParam.setMethod(methodName);
        rpcParam.setSerializeType(serializeType);

        LsfSerialize lsfSerialize = LsfSerializeFactory.get(serializeType);
        rpcParam.setArgs(lsfSerialize.serializeParam(method, args));
        return rpcParam;
    }

    /**
     * Returns {@code true} when the parent object's class declares a method with
     * the given name.
     */
    private boolean isNoRpc(String methodName, ParentObject parentObject) {
        for (Method declaredMethod : parentObject.getClass().getDeclaredMethods()) {
            if (declaredMethod.getName().equals(methodName)) {
                return true;
            }
        }
        return false;
    }
}
(2) 生产者核心处理逻辑
/**
 * Handles one rpc request on the provider side.
 *
 * <p>Parses the JSON body into an {@link RpcParam}, looks up the provider for the
 * requested class, resolves the target method by reflection, deserializes the
 * arguments, invokes the method and returns the JSON-encoded
 * {@link RpcResponseParam}. Failures while resolving or invoking the method are
 * reported inside the response with the EXCEPTION code rather than thrown.
 *
 * @param body JSON-encoded {@link RpcParam}
 * @return JSON-encoded {@link RpcResponseParam}
 * @throws RuntimeException if the body cannot be parsed into a request or no
 *         provider is registered for the requested class
 */
public static String dealRequest(String body) {
    logger.info("center asyncDeal request {}",body);

    // Parse the rpc call parameters.
    RpcParam rpcParam = JSON.parseObject(body, RpcParam.class);
    if (rpcParam == null) {
        throw new RuntimeException("rpc request body is empty or not a valid RpcParam");
    }

    String rClassStr = rpcParam.getrClass();
    Object provider = getProvider(rClassStr);
    if (provider == null) {
        throw new RuntimeException("没有 对应的 provider");
    }

    String method = rpcParam.getMethod();
    String[] argsStr = rpcParam.getArgs();
    // A no-arg call is transported with a null argument array.
    int argCount = argsStr == null ? 0 : argsStr.length;

    RpcResponseParam rpcResponseParam = new RpcResponseParam();
    try {
        // Resolve the target method. getDeclaredMethods() has no defined order, so a
        // name-only match can pick the wrong overload or a compiler-generated bridge
        // method; prefer a non-bridge method with the right number of parameters and
        // fall back to the first name match only if none exists.
        Method declaredMethod = null;
        Method firstNameMatch = null;
        for (Method candidate : provider.getClass().getDeclaredMethods()) {
            if (!candidate.getName().equals(method)) {
                continue;
            }
            if (firstNameMatch == null) {
                firstNameMatch = candidate;
            }
            if (!candidate.isBridge() && candidate.getParameterCount() == argCount) {
                declaredMethod = candidate;
                break;
            }
        }
        if (declaredMethod == null) {
            declaredMethod = firstNameMatch;
        }
        if (declaredMethod == null) {
            throw new RuntimeException("没有这个方法 " + method);
        }

        // Serializer chosen by the consumer.
        LsfSerialize lsfSerialize = LsfSerializeFactory.get(rpcParam.getSerializeType());

        // Deserialize the arguments and invoke the implementation.
        Object[] inArgs = lsfSerialize.deSerializeParam(declaredMethod, argsStr);
        Object result = declaredMethod.invoke(provider, inArgs);

        // Serialize the result.
        String serializedResult = lsfSerialize.serializeResult(declaredMethod, result);
        rpcResponseParam.setCode(ErrorCodeEnum.SUCCESS.getCode());
        rpcResponseParam.setResult(serializedResult);
    } catch (Exception e) {
        // Report the exception thrown by the implementation itself, not the
        // reflection wrapper, whose toString() carries no useful message.
        Throwable reported = e;
        if (e instanceof java.lang.reflect.InvocationTargetException && e.getCause() != null) {
            reported = e.getCause();
        }
        rpcResponseParam.setCode(ErrorCodeEnum.EXCEPTION.getCode());
        rpcResponseParam.setException(reported.toString());
        logger.error("lsf rpc exception rpc param {}", JSON.toJSONString(rpcParam), e);
    }

    String resultStr = JSON.toJSONString(rpcResponseParam);
    logger.info("center asyncDeal result {}", resultStr);
    return resultStr;
}
(3) 序列化接口定义
对扩展开放。新的序列化方式可通过实现这个接口,并注册到序列化工厂里去实现。
/**
* Serialization strategy for rpc arguments and results. Implementations are
* obtained through LsfSerializeFactory by serialization type code.
*/
public interface LsfSerialize {
/**
* Serializes the call arguments; called on the consumer side when the request is built.
*
* @param method the invoked method
* @param args the call arguments
* @return the serialized arguments
*/
String[] serializeParam(Method method, Object[] args);
/**
* Deserializes the call arguments; called on the provider side before the method is invoked.
*
* @param method the method about to be invoked
* @param contents the serialized arguments
* @return the arguments to pass to the method
*/
Object[] deSerializeParam(Method method, String[] contents);
/**
* Serializes the invocation result; called on the provider side.
*
* @param method the invoked method
* @param result the value it returned
* @return the serialized result
*/
String serializeResult(Method method, Object result);
/**
* Deserializes the invocation result; called on the consumer side.
*
* @param method the invoked method
* @param content the serialized result
* @return the value to hand back to the caller
*/
Object deSerializeResult(Method method, String content);
}
(4) FastJson autoType 序列化实现
/**
 * {@link LsfSerialize} implementation based on fastjson with class names written
 * into the JSON ({@code WriteClassName}) and autoType enabled on the parser.
 */
public class JsonAutoTypeSerialize implements LsfSerialize {

    private static final Logger logger = LoggerFactory.getLogger(JsonAutoTypeSerialize.class);

    // The parser configuration is global, so set it once when the class is
    // initialized instead of on every instantiation.
    //
    // SECURITY NOTE(review): turning safeMode off and autoType on lets the JSON input
    // choose which classes get instantiated. That is unsafe for untrusted peers;
    // consider an explicit accept list of allowed packages. Left unchanged here
    // because this serializer depends on autoType by design.
    static {
        ParserConfig.getGlobalInstance().setSafeMode(false);
        ParserConfig.getGlobalInstance().setAutoTypeSupport(true);
    }

    /**
     * Serializes each argument to JSON including its class name.
     *
     * @return one JSON string per argument, or {@code null} when {@code args} is {@code null}
     */
    @Override
    public String[] serializeParam(Method method, Object[] args) {
        if (args == null) {
            return null;
        }
        String[] argsStrs = new String[args.length];
        for (int i = 0; i < args.length; i++) {
            argsStrs[i] = JSON.toJSONString(args[i], SerializerFeature.WriteClassName);
        }
        return argsStrs;
    }

    /**
     * Deserializes the arguments against the parameter types of {@code method}.
     *
     * @return the argument values, or {@code null} when {@code contents} is {@code null}
     * @throws IllegalArgumentException if the number of serialized arguments differs
     *         from the number of method parameters
     */
    @Override
    public Object[] deSerializeParam(Method method, String[] contents) {
        Class<?>[] parameterTypes = method.getParameterTypes();
        return getInArgs(contents, parameterTypes, method);
    }

    /** Converts every serialized argument to the type of the matching parameter. */
    private Object[] getInArgs(String[] strs, Class<?>[] parameterTypes, Method method) {
        if (strs == null) {
            return null;
        }
        // Fail with a clear message instead of an ArrayIndexOutOfBoundsException
        // (too few arguments) or a later reflective arity error (too many).
        if (strs.length != parameterTypes.length) {
            throw new IllegalArgumentException("method " + method.getName() + " expects "
                    + parameterTypes.length + " argument(s) but got " + strs.length);
        }
        Type[] genericParameterTypes = method.getGenericParameterTypes();
        Object[] inArgs = new Object[strs.length];
        for (int i = 0; i < parameterTypes.length; i++) {
            inArgs[i] = getObjectSuper(strs[i], parameterTypes[i], genericParameterTypes[i]);
        }
        return inArgs;
    }

    /** Serializes the result to JSON including its class name. */
    @Override
    public String serializeResult(Method method, Object result) {
        return JSON.toJSONString(result, SerializerFeature.WriteClassName);
    }

    /** Deserializes the result against the return type of {@code method}. */
    @Override
    public Object deSerializeResult(Method method, String content) {
        Type genericReturnType = method.getGenericReturnType();
        Class<?> returnType = method.getReturnType();
        return getObjectSuper(content, returnType, genericReturnType);
    }

    /**
     * Parses {@code content} into the requested type. The generic type is preferred
     * so that parameterized types keep their type arguments; the raw class is only
     * used when no generic type is supplied.
     */
    private Object getObjectSuper(String content, Class<?> returnType, Type genericReturnType) {
        Type targetType = genericReturnType != null ? genericReturnType : returnType;
        return JSON.parseObject(content, targetType);
    }
}
六、代码启动方式
先启动注册中心,然后启动生产者,最后启动消费者。