C++实现分布式网络通信框架RPC(3)--rpc调用端

发布于:2025-06-11 ⋅ 阅读:(27) ⋅ 点赞:(0)

目录

一、前言

二、UserServiceRpc_Stub

三、 CallMethod方法的重写

头文件

实现

四、rpc调用端的调用

实现

五、 google::protobuf::RpcController *controller

头文件

实现

六、总结


一、前言

在前边的文章中,我们已经大致实现了rpc服务端的各项功能代码,接下来我们就来看看,如果一个rpc调用端想要调用都要干什么。

二、UserServiceRpc_Stub

我们在预备知识就提到过我们通过在 .proto 文件中使用service关键字定义了用来描述rpc方法的类型后

 service UserServiceRpc
 {
    rpc Login(LoginRequest) returns(LoginResponse);
 }

使用 protoc 编译后会自动生成两个服务类,分别是 UserServiceRpc 、UserServiceRpc_Stub

UserServiceRpc是继承自Service这个类的,我们讲到过这个类是在rpc发布端使用的,而UserServiceRpc_Stub是继承自UserServiceRpc的,这个类是在rpc调用端使用的。

可以看到 UserServiceRpc_Stub这个类是没有默认构造函数的,想要构造这个类它必须传入  ::PROTOBUF_NAMESPACE_ID::RpcChannel* channel

我们再来看看这个RpcChannel是什么

class PROTOBUF_EXPORT RpcChannel {
 public:
  inline RpcChannel() {}
  virtual ~RpcChannel();

  // Call the given method of the remote service.  The signature of this
  // procedure looks the same as Service::CallMethod(), but the requirements
  // are less strict in one important way:  the request and response objects
  // need not be of any specific class as long as their descriptors are
  // method->input_type() and method->output_type().
  virtual void CallMethod(const MethodDescriptor* method,
                          RpcController* controller, const Message* request,
                          Message* response, Closure* done) = 0;

 private:
  GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(RpcChannel);
};

可以看到它是一个抽象类,所以我们肯定还要定义一个类Mprpcchannel来继承这个类并对该类中的CallMethod方法进行重写。

预备知识一节中,我们已经提到了,无论是用service 关键字定义用来描述rpc方法的类型,无论定义几个该服务下的方法,最后对这些方法的调用都最终调用的是channel的CallMethod方法,即是在调用在创建 UserServiceRpc_Stub对象时传入的我们创建的Mrpcchannel中的CallMethod方法,所以我们就对CallMethod方法进行重写,集中来做所有rpc方法调用的参数的序列化和反序列化操作。

三、 CallMethod方法的重写

我们创建一个类Mrpcchannel用来继承UserServiceRpc_Stub类中的RpcChannel类并对类中的 CallMethod方法进行重写,在该方法中我们集中来做所有rpc方法调用的参数的序列化和反序列化操作。

CallMethod方法中需要做的事情:

  1. 将用户发送的消息按照双方协定的消息格式(header_size + header_str + args_str)序列化好。
  2. 通过网络发送,因为我们是客户端是不需要处理高并发的情况的,所以这里我们采用tcp编程就行。
  3. 接下来就是阻塞等待数据的响应。
  4. 最后拿到响应数据后反序列化之后返回给用户。

头文件

//mprpcchannel.h
class MprpcChannel:public google::protobuf::RpcChannel
{
    //所有通过stub代理对象调用的rpc方法,都是走到了这里,统一做rpc方法调用的数据序列化和网络发送
    void CallMethod(const google::protobuf::MethodDescriptor* method,
        google::protobuf::RpcController* controller, 
        const google::protobuf::Message* request,
        google::protobuf::Message* response, 
        google::protobuf::Closure* done);
};
      

实现

//mprpcchannel.cc
/*我们约定好在进行网络传输的时候的规则是 header_size + service_name + method_name + args_size +args*/
void MprpcChannel::CallMethod(const google::protobuf::MethodDescriptor *method,
                              google::protobuf::RpcController *controller,
                              const google::protobuf::Message *request,
                              google::protobuf::Message *response,
                              google::protobuf::Closure *done)//在调用端最后一个参数没用
{
     //通过method的service方法得到方法属于的服务类
    const google::protobuf::ServiceDescriptor *sd = method->service();
    std::string service_name = sd->name();    // service_name
    std::string method_name = method->name(); // method_name

    // 获取参数的序列化字符串长度 args_size
    uint32_t args_size = 0;
    std::string args_str;
    if (request->SerializeToString(&args_str))
    {
        // 序列化成功
        args_size = args_str.size();
    }
    else
    {
        controller->SetFailed("Serialize request error!");
        return;
    }

    // 定义rpc的请求header
    mprpc::RpcHeader rpcHeader;
    rpcHeader.set_service_name(service_name);
    rpcHeader.set_method_name(method_name);
    rpcHeader.set_args_size(args_size);

    // 进行序列化
    uint32_t header_size = 0;
    std::string rpc_header_str;
    if (rpcHeader.SerializeToString(&rpc_header_str)) // 将东西设置进rpcHeader后进行序列化
    {
        header_size = rpc_header_str.size();
    }
    else
    {
        controller->SetFailed("Serialize rpc header error!");
        return;
    }

    // 组织待发送的rpc请求字符串
    std::string send_rpc_str;
    send_rpc_str.insert(0, std::string((char *)&header_size, 4));
    send_rpc_str += rpc_header_str;
    send_rpc_str += args_str;

    // 打印调试信息
    std::cout << "==================================================" << std::endl;
    std::cout << "header_size::" << header_size << std::endl;
    std::cout << "rpc_header_str::" << rpcHeader.DebugString() << std::endl;
    std::cout << "service_name::" << service_name << std::endl;
    std::cout << "method_name::" << method_name << std::endl;
    std::cout << "args_size::" << args_size << std::endl;
    std::cout << "args_str::" << args_str << std::endl;
    std::cout << "==================================================" << std::endl;

    // 由于这里是客户端,所以不需要处理高并发的情况,所以这边的网络发送我们采用tcp编程,完成rpc方法的远程调用
    int clientfd = socket(AF_INET, SOCK_STREAM, 0);
    if (-1 == clientfd)
    {
        char errtxt[512] = {0};
        sprintf(errtxt, "creat socket error! errno:%d", errno);
        controller->SetFailed(errtxt);
        return;
    }
    // 将服务器的ip地址和端口号读一下
    // std::string ip = MprpcApplication::GetInstance().GetConfig().Load("rpcserverip");
    // uint16_t port= atoi(MprpcApplication::GetInstance().GetConfig().Load("rpcserverport").c_str());
    // 之前都是使用上面的方法来获取配置文件中的参数,但是现在我们想调用什么,就去zk上查询该服务所在的host信息
    zkClient zkCli;
    zkCli.Start();
    std::string method_path = "/" + service_name + "/" + method_name;
    std::string host_data = zkCli.GetData(method_path.c_str());
    if (host_data == "")
    {
        controller->SetFailed(method_name + "is not exist!");
        return;
    }
    int idx = host_data.find(":");
    if (idx == -1)
    {
        controller->SetFailed(method_path + "address is invalid!");
         return;
    }
    std::string ip = host_data.substr(0, idx);
    uint16_t port = atoi(host_data.substr(idx + 1, host_data.size() - idx).c_str());

    struct sockaddr_in server_addr;
    server_addr.sin_addr.s_addr = inet_addr(ip.c_str());
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(port);

    // 连接rpc服务节点
    if (-1 == connect(clientfd, (struct sockaddr *)&server_addr, sizeof(server_addr)))
    {
        close(clientfd);
        char errtxt[512] = {0};
        sprintf(errtxt, "connect error! errno:%d", errno);
        controller->SetFailed(errtxt);
        return;
    }

    // 发送rpc请求
    if (-1 == send(clientfd, send_rpc_str.c_str(), send_rpc_str.size(), 0))
    {
        close(clientfd);
        char errtxt[512] = {0};
        sprintf(errtxt, "send error! errno:%d", errno);
        controller->SetFailed(errtxt);
        // exit(EXIT_FAILURE);这里不能因为没有成功发送而整个的服务全部推出了
        return;
    }
    // 接收rpc端发来的请求响应值
    char recv_buf[1024] = {0};
    int recv_size = 0; // 整个接收的缓冲区中不可能全是发送过来的数据吧,所以记录一下多少数据
    if (-1 == (recv_size = recv(clientfd, recv_buf, 1024, 0)))
    {
        close(clientfd);
        char errtxt[512] = {0};
        sprintf(errtxt, "recv error! errno:%d", errno);
        controller->SetFailed(errtxt);
        return;
    }

    // 接下来就是把接收到的数据填进response中,这样框架就可以通过response知道响应值并返回给用户
    // std::string response_str(recv_buf,0,recv_size);//string有构造函数可以使用recv_buf来初始化,即将recv_buf的从0到recv_size的这一段数据初始化response_str
    // 有bug,recv_buf在遇到\0后面的数据就存不下来了****std::string response_str(recv_buf,recv_size);也可以
    // if(!response->ParseFromString(response_str))反序列化rpc调用的响应数据
    if (!response->ParseFromArray(recv_buf, recv_size))
    {
        close(clientfd);
        char errtxt[512] = {0};
        sprintf(errtxt, "parse error! response_str:%d", recv_buf);
        controller->SetFailed(errtxt);
        return;
    }
    close(clientfd);
}

我们在调用rpc服务的时候, 我们用的是相应的描述rpc的服务的 UserServiceRpc_Stub这个类,在使用这个类的时候,我们需要传入一个Rpcchannel这个类

UserServiceRpc_Stub(::PROTOBUF_NAMESPACE_ID::RpcChannel* channel);

然后在这个UserServiceRpc_Stub这个类中调用的所有的 rpc 方法,最终都跑到了调用channel的CallMethod方法下。举个例子如下

UserServiceRpc_Stub stub(new MprpcChannel());
stub.Login(nullptr,&request,&response,nullptr);

像这样的通过stub调用的Login方法都是调用的channel的CallMethod方法。

四、rpc调用端的调用

在这里我们需要考虑如何去发起一个rpc调用,去调用某个服务下的某个方法,下面都以UserServiceRpc服务下的Login方法为例,我们同样要定义一个与发布端相同的 user.proto文件。然后使用 protoc 编译,接着

  1. 对调用的rpc框架进行初始化操作
  2. 实例化出一个代理对象  fixbug::UserServiceRpc_Stub stub;后面都会通过stub来调用rpc方法。
  3. 填写请求方法的参数
  4. 用stub代理调用Login方法
  5. 等待读取返回值。

实现

//calleruserservice.cc
int main(int argc,char* argv[])
{
    //先调用框架的初始化类且全局只调用这一次
    //MprpcApplication& myin= MprpcApplication::GetInstance();
    //myin.Init(argc,argv);它是静态函数,也就是说它和具体对象无关,可以直接通过类名调用。
    //因为你拿到对象后并没有使用它的状态,只是用来调用静态函数。下面的方法就做到了单例懒加载,且只初始化执行一次
    MprpcApplication::Init(argc,argv);

    //演示调用远程发布rpc的方法Login,UserServiceRpc_Stub是专门用来协助rpc客户端的
    fixbug::UserServiceRpc_Stub stub(new MprpcChannel());//生成一个代理对象,以后通过stub来调用rpc方法

    //rpc方法的请求参数
    fixbug::LoginRequest request;//用户端发起调用,request肯定是这边给
    request.set_name("zhangsan");
    request.set_pwd("123456");

    //rpc方法的响应
    fixbug::LoginResponse response;

    //发起rpc方法的调用,下面是同步的rpc方法调用过程,即它的底层是MprpcChannel::callMethod
    stub.Login(nullptr,&request,&response,nullptr);//集中来做rpc方法调用的参数的序列化和网络发送了

    //走到这里表示一次rpc调用完成,读取调用的结果
    if(0 == response.result().errcode())
    {
        std::cout<<"rpc login response success:"<<response.success()<<std::endl;
    }
    else
    {
        std::cout<<"rpc login response error:"<<response.result().errmsg()<<std::endl;
    }

    //演示调用远程发布的rpc方法Register
    fixbug::RegisterRequset req;
    req.set_id(2000);
    req.set_name("lisi");
    req.set_pwd("12314");

    fixbug::RegisterResponse rsp;

    //以同步的方式发起rpc调用请求,等待返回结果
    stub.Register(nullptr,&req,&rsp,nullptr);
    if(0 == rsp.result().errcode())
    {
        std::cout<<"rpc register response success:"<<rsp.success()<<std::endl;
    }
    else
    {
        std::cout<<"rpc register response error:"<<rsp.result().errmsg()<<std::endl;
    }

    return 0;
}

五、 google::protobuf::RpcController *controller

前面还有最后一个坑需要填上,google::protobuf::RpcController *controller是个什么东西呢?

我们先不着急回答这个问题,我们思考一下在  calleruserservice.cc 中的代码有没有什么问题?

我们是在使用stub对象发起调用Login方法后,就开始等待着读取返回的响应了,但是我们在CallMethod方法中也看到了,在许多地方失败时都会有return直接返回的情况,比如反序列化失败,套接字创建失败等,所以这里一旦失败,直接返回。我们还有必要进行后面的工作吗,即访问响应response,此时还没有response呢。所以我们需要在这里得到一些控制信息。

我们可以看到调用的这个Login方法的第一个参数就是google::protobuf::RpcController *controller

从他的名字就可以猜到它可以存储一些控制信息,让我们清楚地知道当前rpc调用处于什么状态。

下面是 RpcController类,可以看到它的成员函数都是虚函数,它是一个抽象类,无法实例化出对象,所以还需要我们创建一个类来继承它,并且重写它的成员函数。

也就是说这个 RpcController可以携带我们rpc调用过程中的一些信息,所以接下来我们创建类继承它并对方法进行重写。

头文件

//mprpccontroller.h

class MprpcController : public google::protobuf::RpcController
{
    public:
        MprpcController();
        void Reset();
        bool Failed() const;
        std::string ErrorText() const;
        void SetFailed(const std::string& reason);

        //对于这三个方法目前未实现具体的功能,所以暂时不用
        void StartCancel();
        bool IsCanceled() const;
        void NotifyOnCancel(google::protobuf::Closure* callback);
    private:
        bool m_failed;//RPC方法执行过程中的状态
        std::string m_errText;//RPC方法执行过程中的错误信息

};  

实现

//mprpccontroller.cc
MprpcController::MprpcController()
{
    m_failed=false;
    m_errText="";
}

void MprpcController::Reset()//重置成刚开始的样子
{
    m_failed=false;
    m_errText="";
}
bool MprpcController::Failed() const//判断当前调用成功与否,返回状态,true就是发生问题了
{
    return m_failed;
}
std::string MprpcController::ErrorText() const//返回错误信息
{
    return m_errText;
}

void MprpcController::SetFailed(const std::string&reason)//设置错误
{
    m_failed=true;
    m_errText=reason;
}

//对于我们目前尚未实现的功能,我们实现成空函数就可以了
void MprpcController::StartCancel(){};
bool MprpcController::IsCanceled() const{};
void MprpcController::NotifyOnCancel(google::protobuf::Closure* callback){};

 定义很简单,使用方法就是在 mprpcchannel.cc 中的使用,也比较简单。

之前我们在 calleruserservice.cc文件中使用stub调用Login方法时,第一个参数传的是空指针,现在传controller就行了

MprpcController controller;//实例化出一个控制对象
stub.Login(&controller,&request,&response,nullptr);

这样在 return之前就可以知道rpc调用过程中的状态了,后面我们就要这样子调用了

if (controller.Failed())
{
    // 表示出问题
    std::cout << controller.ErrorText() << std::endl;
}
else
{
    if (0 == response.result().errcode())
    {
        std::cout << "rpc login response success:" << response.success() << std::endl;
    }
    else
    {
        std::cout << "rpc login response error:" << response.result().errmsg() << std::endl;
    }
}

六、总结

到这里我们基本已经实现了整个框架,包括rpc发布端rpc方法的发布和rpc调用端的调用。后面我们还要给整个框架添加必不可少的一个模块——日志模块,还有zookeeper在本项目上的应用。见下一文!


感谢阅读!