GitHub推荐C++项目:基于muduo、protobuf、zookeeper实现RPC框架

发布于:2025-02-15 ⋅ 阅读:(16) ⋅ 点赞:(0)

项目地址: GitHub - attackoncs/rpc: 基于muduo、protobuf、zookeeper的rpc实现

GitHub - terryup/rpc: 基于muduo+protobuf+zookeeper的一个rpc分布式网络通信框架

因为这个项目在GitHub有多个链接,老廖综合了各家的资料并做了整理.

C++学到什么程度可以就业-以C++RPC项目(可写简历)为例讲解 整理, 更多项目资料可以点击查看.

1 分布式网络通信rpc框架

1.1 快速开始

安装zookeeper、muduo、protobuf、cmake,

参考 第2章节 <<环境搭建>>

1.2 分布式网络通信rpc框架

项目是分布式网络通信rpc框架(项目源代码链接博文中提到单机服务器的缺点:

  1. 硬件资源的限制影响并发:受限于硬件资源,聊天服务器承受的用户的并发有限

  2. 模块的编译部署难:任何模块小的修改,都导致整个项目代码重新编译、部署

  3. 模块对硬件资源的需求不同:各模块是CPU或IO密集型,各模块对资源需求不同

尽管集群服务器可以扩展硬件资源,提高用户的并发,但缺点2和3仍存在,此时就引出分布式服务器,分布式系统中系统由“微服务”组成,常用RPC(remote procedure call)解决分布式系统中微服务之间的调用问题(当然基于HTTP的restful形式的广义远程调用也可,这暂时不提),简言之就是开发者能像调用本地方法一样调用远程服务。

1.3 RPC概述

RPC调用过程

完整的RPC过程如下图:

远程调用需传递服务对象、函数方法、函数参数,经序列化成字节流后传给提供服务的服务器,服务器接收到数据后反序列化成服务对象、函数方法、函数参数,并发起本地调用,将响应结果序列化成字节流,发送给调用方,调用方接收到后反序列化得到结果,并传给本地调用。

序列化和反序列化

  • 序列化:对象转为字节序列称为对象的序列化

  • 反序列化:字节序列转为对象称为对象的反序列化

常见序列化和反序列化协议有XML、JSON、PB,相比于其他PB更有优势:跨平台语言支持,序列化和反序列化效率高速度快,且序列化后体积比XML和JSON都小很多,适合网络传输。

XML

JSON

PB

保存方式

文本

文本

二进制

可读性

较好

较好

不可读

解析效率

一般

语言支持

所有语言

所有语言

C++/Java/Python及第三方支持

适用范围

文件存储、数据交互

文件存储、数据交互

文件存储、数据交互

注意:序列化和反序列化可能对系统的消耗较大,因此原则是:远程调用函数传入参数和返回值对象要尽量简单,具体来说应避免:

  • 远程调用函数传入参数和返回值对象体积较大,如传入参数是List或Map,序列化后字节长度较长,对网络负担较大

  • 远程调用函数传入参数和返回值对象有复杂关系,传入参数和返回值对象有复杂的嵌套、包含、聚合关系等,性能开销大

  • 远程调用函数传入参数和返回值对象继承关系复杂,性能开销大

数据传输格式

考虑到可靠性和长连接,因此使用TCP协议,而TCP是字节流协议,因此需自己处理拆包粘包问题,即自定义数据传输格式!如图:

定义protobuf类型的结构体消息RpcHeader,包含服务对象、函数方法、函数参数,因为参数可变,参数长不定,因此不能和RpcHeader一起定义,否则多少函数就有多少RpcHeader,因此需为每个函数定义不同protobuf结构体消息,然后对该结构体消息序列化(字符串形式存储),就得到两个序列化后的二进制字符串,拼接起来就是要发送的消息,同时消息前需记录序列化后的RpcHeader数据的长度,这样才能分开RpcHeader和函数参数的二进制数据,从而反序列化得到函数参数

//消息头
message RpcHeader
{
  bytes service_name = 1;
  bytes method_name = 2;
  uint32 args_size = 3;
}

1.4 框架

RPC通信过程中的代码调用流程图如图:

业务层实现

数据结构定义

以user.proto中的Login函数请求参数为例说明参数结构体定义,其他函数类似:

//Login函数的参数
message LoginRequest
{
  bytes name = 1;
  bytes pwd = 2;
}

使用protoc编译proto文件:

protoc user.proto -I ./ -cpp_out=./user

得到user.pb.cc和user.pb.h,每个message结构体都生成个类,如上面说的LoginRequest生成继承自google::protobuf::Message的LoginRequest类,主要包含定义的私有成员变量及读取设置变量的成员函数,如name对应的name()和set_name()两个读取和设置函数;每个service结构体都生成两个关键类,以user.proto中定义的UserServiceRpc为例,生成继承自google::protobuf::Service的UserServiceRpc和继承自UserServiceRpc的UserServiceRpc_Stub类,前者给服务方callee使用,后者给调用方caller使用,并且都生成Login和Register虚函数,UserServiceRpc类中还包括重要的CallMethod方法

service UserServiceRpc
{
  rpc Login(LoginRequest) returns(LoginResponse);
  rpc Register(RegisterRequest) returns(RegisterResponse);
}

以Login方法为例,Caller调用远程方法Login,Callee中的Login接收LoginRequest消息体,执行完Login后将结果写入LoginResponse消息体,再返回给Caller

caller

调用方将继承自google::protobuf::RpcChannel的MprpcChannel的对象,传入UserServiceRpc_Stub构造函数生成对象stub,设置远程调用Login的请求参数request,并由stub调用成员函数Login一直阻塞等待远程调用的响应,而Login函数实际被传入的channel对象调用CallMethod,在CallMethod中设置controller对象和response对象,前者在函数出错时设置rpc调用过程的状态,后者是远程调用的响应

int main(int argc, char **argv)
{
    // 整个程序启动以后,想使用mprpc框架来享受rpc服务调用,一定需要先调用框架的初始化函数(只初始化一次)
    MprpcApplication::Init(argc, argv);

    // 演示调用远程发布的rpc方法Login
    UserServiceRpc_Stub stub(new MprpcChannel());
    // rpc方法的请求参数
    LoginRequest request;
    request.set_name("zhang san");
    request.set_pwd("123456");
    // rpc方法的响应
    LoginResponse response;
    // 发起rpc方法的调用  同步的rpc调用过程  MprpcChannel::callmethod
    MprpcController controller;
    stub.Login(&controller, &request, &response, nullptr); // RpcChannel->RpcChannel::callMethod 集中来做所有rpc方法调用的参数序列化和网络发送
    // 一次rpc调用完成,读调用的结果
    if (controller.Failed())
    {
        std::cout << controller.ErrorText() << std::endl;
    }
    else
    {
        // 一次rpc调用完成,读调用的结果
        if (0 == response.result().errcode())
        {
            std::cout << "rpc login response success:" << response.sucess() << std::endl;
        }
        else
        {
            std::cout << "rpc login response error : " << response.result().errmsg() << std::endl;
        }
    }
}

所有通过stub代理对象调用的rpc方法,通过C++多态最终都会通过调用CallMethod实现,该函数首先序列化并拼接发送的 send_rpc_str字符串,其次从zk服务器中拿到注册的rpc服务端的 ip 和 port,连接到rpc服务器并发送请求,接受服务端返回的字节流,并反序列化响应response

// 所有通过stub代理对象调用的rpc方法,都走到这里了,统一做rpc方法调用的数据数据序列化和网络发送 
void MprpcChannel::CallMethod(const google::protobuf::MethodDescriptor* method,
    google::protobuf::RpcController* controller, 
    const google::protobuf::Message* request,
    google::protobuf::Message* response,
    google::protobuf:: Closure* done)
{
    const google::protobuf::ServiceDescriptor* sd = method->service();
    std::string service_name = sd->name(); // service_name
    std::string method_name = method->name(); // method_name

    // 获取参数的序列化字符串长度 args_size
    uint32_t args_size = 0;
    std::string args_str;
    if (request->SerializeToString(&args_str))
    {
        args_size = args_str.size();
    }
    else
    {
        controller->SetFailed("serialize request error!");
        return;
    }

    // 定义rpc的请求header
    mprpc::RpcHeader rpcHeader;
    rpcHeader.set_service_name(service_name);
    rpcHeader.set_method_name(method_name);
    rpcHeader.set_args_size(args_size);

    uint32_t header_size = 0;
    std::string rpc_header_str;
    if (rpcHeader.SerializeToString(&rpc_header_str))
    {
        header_size = rpc_header_str.size();
    }
    else
    {
        controller->SetFailed("serialize rpc header error!");
        return;
    }

    // 组织待发送的rpc请求的字符串
    std::string send_rpc_str;
    send_rpc_str.insert(0, std::string((char*)&header_size, 4)); // header_size
    send_rpc_str += rpc_header_str; // rpcheader
    send_rpc_str += args_str; // args

    // 使用tcp编程,完成rpc方法的远程调用
    int clientfd = socket(AF_INET, SOCK_STREAM, 0);
    if (-1 == clientfd)
    {
        char errtxt[512] = {0};
        sprintf(errtxt, "create socket error! errno:%d", errno);
        controller->SetFailed(errtxt);
        return;
    }

    // rpc调用方想调用service_name的method_name服务,需要查询zk上该服务所在的host信息
    ZkClient zkCli;
    zkCli.Start();
    //  /UserServiceRpc/Login
    std::string method_path = "/" + service_name + "/" + method_name;
    // 127.0.0.1:8000
    std::string host_data = zkCli.GetData(method_path.c_str());
    if (host_data == "")
    {
        controller->SetFailed(method_path + " is not exist!");
        return;
    }
    int idx = host_data.find(":");
    if (idx == -1)
    {
        controller->SetFailed(method_path + " address is invalid!");
        return;
    }
    std::string ip = host_data.substr(0, idx);
    uint16_t port = atoi(host_data.substr(idx+1, host_data.size()-idx).c_str()); 

    struct sockaddr_in server_addr;
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(port);
    server_addr.sin_addr.s_addr = inet_addr(ip.c_str());

    // 连接rpc服务节点
    if (-1 == connect(clientfd, (struct sockaddr*)&server_addr, sizeof(server_addr)))
    {
        close(clientfd);
        char errtxt[512] = {0};
        sprintf(errtxt, "connect error! errno:%d", errno);
        controller->SetFailed(errtxt);
        return;
    }

        // 发送rpc请求
    if (-1 == send(clientfd, send_rpc_str.c_str(), send_rpc_str.size(), 0))
    {
        close(clientfd);
        char errtxt[512] = {0};
        sprintf(errtxt, "send error! errno:%d", errno);
        controller->SetFailed(errtxt);
        return;
    }

    // 接收rpc请求的响应值
    char recv_buf[1024] = {0};
    int recv_size = 0;
    if (-1 == (recv_size = recv(clientfd, recv_buf, 1024, 0)))
    {
        close(clientfd);
        char errtxt[512] = {0};
        sprintf(errtxt, "recv error! errno:%d", errno);
        controller->SetFailed(errtxt);
        return;
    }

    // 反序列化rpc调用的响应数据
    if (!response->ParseFromArray(recv_buf, recv_size))
    {
        close(clientfd);
        char errtxt[512] = {0};
        sprintf(errtxt, "parse error! response_str:%s", recv_buf);
        controller->SetFailed(errtxt);
        return;
    }

    close(clientfd);
}

callee

服务方将继承自UserServiceRpc的UserService类的对象,传入RpcProvider类的构造函数生成对象provider,provider是rpc服务对象,调用NotifyService将UserService对象发布到rpc节点上,调用Run启动rpc服务节点,提供rpc远程调用服务

int main(int argc, char **argv)
{
    // 调用框架的初始化操作
    MprpcApplication::Init(argc, argv);

    // provider是一个rpc网络服务对象。把UserService对象发布到rpc节点上
    RpcProvider provider;
    provider.NotifyService(new UserService());
    // 启动一个rpc服务发布节点   Run以后,进程进入阻塞状态,等待远程的rpc调用请求
    provider.Run();

    return 0;
}

NotifyService将传入进来的服务对象service发布到rpc节点上。其实就是将服务对象及其方法的抽象描述,存储在map中

// 这里是框架提供给外部使用的,可以发布rpc方法的函数接口
void RpcProvider::NotifyService(google::protobuf::Service *service)
{
    ServiceInfo service_info;

    // 获取了服务对象的描述信息
    const google::protobuf::ServiceDescriptor *pserviceDesc = service->GetDescriptor();
    // 获取服务的名字
    std::string service_name = pserviceDesc->name();
    // 获取服务对象service的方法的数量
    int methodCnt = pserviceDesc->method_count();

    LOG_INFO("service_name:%s", service_name.c_str());

    for (int i=0; i < methodCnt; ++i)
        {
            // 获取了服务对象指定下标的服务方法的描述(抽象描述) UserService   Login
            const google::protobuf::MethodDescriptor* pmethodDesc = pserviceDesc->method(i);
            std::string method_name = pmethodDesc->name();
            service_info.m_methodMap.insert({method_name, pmethodDesc});

            LOG_INFO("method_name:%s", method_name.c_str());
        }
    service_info.m_service = service;
    m_serviceMap.insert({service_name, service_info});
}

Run创建TcpServer对象并绑定连接回调和消息可读回调及线程数,将rpc节点上要发布的服务全注册到zk服务器上,并启动网络服务和事件循环,等待客户端的连接和写入,从而触发对应的回调

// 启动rpc服务节点,开始提供rpc远程网络调用服务
void RpcProvider::Run()
{
    // 读取配置文件rpcserver的信息
    std::string ip = MprpcApplication::GetInstance().GetConfig().Load("rpcserverip");
    uint16_t port = atoi(MprpcApplication::GetInstance().GetConfig().Load("rpcserverport").c_str());
    muduo::net::InetAddress address(ip, port);

    // 创建TcpServer对象
    muduo::net::TcpServer server(&m_eventLoop, address, "RpcProvider");

    // 绑定连接回调和消息读写回调方法  分离了网络代码和业务代码
    server.setConnectionCallback(std::bind(&RpcProvider::OnConnection, this, std::placeholders::_1));
    server.setMessageCallback(std::bind(&RpcProvider::OnMessage, this, std::placeholders::_1, 
        std::placeholders::_2, std::placeholders::_3));

    // 设置muduo库的线程数量
    server.setThreadNum(4);

    // 把当前rpc节点上要发布的服务全部注册到zk上面,让rpc client可以从zk上发现服务
    ZkClient zkCli;
    zkCli.Start();
    // service_name为永久性节点    method_name为临时性节点
    for (auto &sp : m_serviceMap) 
        {
            // /service_name   /UserServiceRpc
            std::string service_path = "/" + sp.first;
            zkCli.Create(service_path.c_str(), nullptr, 0);
            for (auto &mp : sp.second.m_methodMap)
                {
                    // /service_name/method_name   /UserServiceRpc/Login 存储当前这个rpc服务节点主机的ip和port
                    std::string method_path = service_path + "/" + mp.first;
                    char method_path_data[128] = {0};
                    sprintf(method_path_data, "%s:%d", ip.c_str(), port);
                    // ZOO_EPHEMERAL表示znode是一个临时性节点
                    zkCli.Create(method_path.c_str(), method_path_data, strlen(method_path_data), ZOO_EPHEMERAL);
                }
        }

    // rpc服务端准备启动,打印信息
    std::cout << "RpcProvider start service at ip:" << ip << " port:" << port << std::endl;

    // 启动网络服务
    server.start();
    m_eventLoop.loop(); 
}

连接回调教简单,不在赘述,主要解释可读事件的回调OnMessage,接收远程rpc调用请求的字节流并反序列化 ,解析出service_name 和 method_name 和 args_str参数,并查找存储服务对象的map,找到服务对象service和方法对象描述符method,生成rpc方法调用的请求request和响应response,并设置发送响应的回调SendRpcResponse,并调用CallMethod

void RpcProvider::OnMessage(const muduo::net::TcpConnectionPtr &conn, 
    muduo::net::Buffer *buffer, 
    muduo::Timestamp)
{
    // 网络上接收的远程rpc调用请求的字符流    Login args
    std::string recv_buf = buffer->retrieveAllAsString();

    // 从字符流中读取前4个字节的内容
    uint32_t header_size = 0;
    recv_buf.copy((char*)&header_size, 4, 0);

    // 根据header_size读取数据头的原始字符流,反序列化数据,得到rpc请求的详细信息
    std::string rpc_header_str = recv_buf.substr(4, header_size);
    mprpc::RpcHeader rpcHeader;
    std::string service_name;
    std::string method_name;
    uint32_t args_size;
    if (rpcHeader.ParseFromString(rpc_header_str))
    {
        // 数据头反序列化成功
        service_name = rpcHeader.service_name();
        method_name = rpcHeader.method_name();
        args_size = rpcHeader.args_size();
    }
    else
    {
        // 数据头反序列化失败
        std::cout << "rpc_header_str:" << rpc_header_str << " parse error!" << std::endl;
        return;
    }

    // 获取rpc方法参数的字符流数据
    std::string args_str = recv_buf.substr(4 + header_size, args_size);

    // 获取service对象和method对象
    auto it = m_serviceMap.find(service_name);
    if (it == m_serviceMap.end())
    {
        std::cout << service_name << " is not exist!" << std::endl;
        return;
    }

    auto mit = it->second.m_methodMap.find(method_name);
    if (mit == it->second.m_methodMap.end())
    {
        std::cout << service_name << ":" << method_name << " is not exist!" << std::endl;
        return;
    }

    google::protobuf::Service *service = it->second.m_service; // 获取service对象  new UserService
    const google::protobuf::MethodDescriptor *method = mit->second; // 获取method对象  Login

    // 生成rpc方法调用的请求request和响应response参数
    google::protobuf::Message *request = service->GetRequestPrototype(method).New();
    if (!request->ParseFromString(args_str))
    {
        std::cout << "request parse error, content:" << args_str << std::endl;
        return;
    }
    google::protobuf::Message *response = service->GetResponsePrototype(method).New();

    // 给下面的method方法的调用,绑定一个Closure的回调函数
    google::protobuf::Closure *done = google::protobuf::NewCallback<RpcProvider, 
    const muduo::net::TcpConnectionPtr&, 
    google::protobuf::Message*>
    (this, 
    &RpcProvider::SendRpcResponse, 
    conn, response);

    // 在框架上根据远端rpc请求,调用当前rpc节点上发布的方法
    // new UserService().Login(controller, request, response, done)
    service->CallMethod(method, nullptr, request, response, done);
}

通过CallMethod调用UserService中的Login函数,获取参数并调用本地的Login函数,并写入响应,调用前面设置的response回调函数SendRpcResponse

void Login(::google::protobuf::RpcController* controller,
    const ::fixbug::LoginRequest* request,
    ::fixbug::LoginResponse* response,
    ::google::protobuf::Closure* done)
{
    // 框架给业务上报了请求参数LoginRequest,应用获取相应数据做本地业务
    std::string name = request->name();
    std::string pwd = request->pwd();

    // 做本地业务
    bool login_result = Login(name, pwd); 

    // 把响应写入  包括错误码、错误消息、返回值
    fixbug::ResultCode *code = response->mutable_result();
    code->set_errcode(0);
    code->set_errmsg("");
    response->set_sucess(login_result);

    // 执行回调操作   执行响应对象数据的序列化和网络发送(都是由框架来完成的)
    done->Run();
}

SendRpcResponse函数用于将响应序列化并发送出去

// Closure的回调操作,用于序列化rpc的响应和网络发送
void RpcProvider::SendRpcResponse(const muduo::net::TcpConnectionPtr& conn, 
    google::protobuf::Message *response)
{
    std::string response_str;
    if (response->SerializeToString(&response_str)) // response进行序列化
    {
        // 序列化成功后,通过网络把rpc方法执行的结果发送会rpc的调用方
        conn->send(response_str);
    }
    else
    {
        std::cout << "serialize response_str error!" << std::endl; 
    }
    conn->shutdown(); // 模拟http的短链接服务,由rpcprovider主动断开连接
}

zookeeper

实现的rpc,发起的rpc请求需知道请求的服务在哪台机器,所以需要分布式服务配置中心,所有提供rpc的节点,都需向配置中心注册服务,ip+port+服务,当然zookeeper不止分布式服务配置,还有其他协调功能,如分布式锁,这里不细述。

callee启动时,将UserService对象发布到rpc节点上,也就是将每个类的方法所对应的分布式节点地址和端口记录在zk服务器上,当调用远程rpc方法时,就去 zk服务器上面查询对应要调用的服务的ip和端口

注册服务:

// 把当前rpc节点上要发布的服务全部注册到zk上面,让rpc client可以从zk上发现服务
ZkClient zkCli;
zkCli.Start();
// service_name为永久性节点    method_name为临时性节点
for (auto &sp : m_serviceMap) 
{
    // /service_name   /UserServiceRpc
    std::string service_path = "/" + sp.first;
    zkCli.Create(service_path.c_str(), nullptr, 0);
    for (auto &mp : sp.second.m_methodMap)
        {
            // /service_name/method_name   /UserServiceRpc/Login 存储当前这个rpc服务节点主机的ip和port
            std::string method_path = service_path + "/" + mp.first;
            char method_path_data[128] = {0};
            sprintf(method_path_data, "%s:%d", ip.c_str(), port);
            // ZOO_EPHEMERAL表示znode是一个临时性节点
            zkCli.Create(method_path.c_str(), method_path_data, strlen(method_path_data), ZOO_EPHEMERAL);
        }
}

查询服务:

// rpc调用方想调用service_name的method_name服务,需要查询zk上该服务所在的host信息
ZkClient zkCli;
zkCli.Start();
//  /UserServiceRpc/Login
std::string method_path = "/" + service_name + "/" + method_name;
// 127.0.0.1:8000
std::string host_data = zkCli.GetData(method_path.c_str());

日志

借助线程安全的消息队列(日志队列为空则线程进入wait状态,否则唤醒线程),将要写的日志都压入消息队列中,再单独开一个线程负责将消息队列的日志写入到磁盘文件中

Logger::Logger()
{
    // 启动专门的写日志线程
    std::thread writeLogTask([&](){
        for (;;)
            {
                // 获取当前的日期,然后取日志信息,写入相应的日志文件当中 a+
                time_t now = time(nullptr);
                tm *nowtm = localtime(&now);

                char file_name[128];
                sprintf(file_name, "%d-%d-%d-log.txt", nowtm->tm_year+1900, nowtm->tm_mon+1, nowtm->tm_mday);

                FILE *pf = fopen(file_name, "a+");
                if (pf == nullptr)
                {
                    std::cout << "logger file : " << file_name << " open error!" << std::endl;
                    exit(EXIT_FAILURE);
                }

                std::string msg = m_lckQue.Pop();

                char time_buf[128] = {0};
                sprintf(time_buf, "%d:%d:%d =>[%s] ", 
                    nowtm->tm_hour, 
                    nowtm->tm_min, 
                    nowtm->tm_sec,
                    (m_loglevel == INFO ? "info" : "error"));
                msg.insert(0, time_buf);
                msg.append("\n");

                fputs(msg.c_str(), pf);
                fclose(pf);
            }
    });
    // 设置分离线程,守护线程
    writeLogTask.detach();
}

1.5 总结

整个系统时序图如下所示,整个项目源代码的链接

2 环境搭建

2.1 下载安装Protobuf

GitHub地址:GitHub - protocolbuffers/protobuf: Protocol Buffers - Google's data interchange format

这里我用的是3.19.4的版本:Releases · protocolbuffers/protobuf · GitHub

(用最新的版本也可以, 但要注意后续学习grpc时protobuf版本兼容性的问题)

老廖提供的百度云链接资料也包含了protobuf-cpp-3.19.4.tar.gz 压缩包.

  1. 解压安装包:tar zxf protobuf-cpp-3.19.4.tar.gz

  2. 进入解压后的文件夹:cd protobuf-3.19.4

  3. 安装所需工具:sudo apt-get install autoconf automake libtool curl make g++ unzip

  4. 自动生成confifigure配置文件:./autogen.sh

  5. 配置环境:./configure

  6. 编译源代码:make

  7. 安装:sudo make install

  8. 刷新动态库:sudo ldconfifig

2.2 Protobuf的使用

创建test.proto文件,添加以下信息:

syntax = "proto3";//声明protobuf的版本

package fixbug; //声明代码所在的包(对于c++来说是namespace)

//定义下面的选项,表示生成service服务类和rpc方法描述,默认不生成
option cc_generic_services = true;

message Resultcode{
  int32 errcode = 1;
  string errmsg = 2;
}

//数据  列表    映射表
//定义登陆请求消息类型 name pwd
message LoginRequest{
  string name = 1;
  string pwd = 2;
}

//定义登陆响应消息类型
message LoginResponse{
  Resultcode result = 1;
  bool success = 3;
}


//在protobuf里面怎么定义描述rpc方法的类型   -service
service UserServiceRpc{
rpc login(LoginRequest) returns(LoginResponse);
}

在命令行进入test.proto所在目录下,使用此命令

ubuntu% protoc test.proto --cpp_out=./
ubuntu% ls                            
a.out  main.cpp  test.pb.cc  test.pb.h  test.proto

此时protobuf自动生成了test.pb.cc和test.pb.h两个文件,我们可以使用peotobuf给我们提供的接口

#include "test.pb.h"
#include <iostream>
#include <string>

int main(){
    //封装了Login请求对象的数据
    fixbug::LoginRequest req;
    req.set_name("zixuan");
    req.set_pwd("1231231");

    //对象数据序列化->char*
    std::string send_str;
    if(req.SerializeToString(&send_str)){
        std::cout << send_str.c_str() << std::endl;
    }

    //从send_str反序列化一个Login请求对象
    fixbug::LoginRequest reqB;
    if(reqB.ParseFromString(send_str)){
        std::cout << reqB.name() << std::endl;
        std::cout << reqB.pwd() << std::endl;
    }

    return 0;
}

2.3 下载安装ZooKeeper

https://dlcdn.apache.org/zookeeper/zookeeper-3.8.4/apache-zookeeper-3.8.4-bin.tar.gz

下载解压后

lqf@ubuntu:~$ tar zxf apache-zookeeper-3.8.4-bin.tar.gz
lqf@ubuntu:~$ cd apache-zookeeper-3.8.4-bin/
lqf@ubuntu:~/apache-zookeeper-3.8.4-bin$ cd conf/
lqf@ubuntu:~/apache-zookeeper-3.8.4-bin/conf$ cp zoo_sample.cfg zoo.cfg
lqf@ubuntu:~/apache-zookeeper-3.8.4-bin/conf$ cd ..
lqf@ubuntu:~/apache-zookeeper-3.8.4-bin$ cd bin/
lqf@ubuntu:~/apache-zookeeper-3.8.4-bin/bin$ ls
README.txt    zkCli.sh   zkServer.cmd            zkSnapshotComparer.cmd  zkSnapShotToolkit.sh
zkCleanup.sh  zkEnv.cmd  zkServer-initialize.sh  zkSnapshotComparer.sh   zkTxnLogToolkit.cmd
zkCli.cmd     zkEnv.sh   zkServer.sh             zkSnapShotToolkit.cmd   zkTxnLogToolkit.sh

# 使用管理员权限启动
lqf@ubuntu:~/apache-zookeeper-3.8.4-bin/bin$ sudo ./zkServer.sh start
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /home/lqf/apache-zookeeper-3.8.4-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

打印STARTED说明启动成功,

也可以查看端口2181是否被监听:

lqf@ubuntu:~/apache-zookeeper-3.8.4-bin/bin$ sudo lsof -i:2181
COMMAND     PID USER   FD   TYPE  DEVICE SIZE/OFF NODE NAME
java     158443 root   65u  IPv6 2302952      0t0  TCP *:2181 (LISTEN)

这也说明zookeeper是正常工作的.

2.4 编译项目

原有的编译方式大家参考:

GitHub - attackoncs/rpc: 基于muduo、protobuf、zookeeper的rpc实现

GitHub - terryup/rpc: 基于muduo+protobuf+zookeeper的一个rpc分布式网络通信框架

老廖这里已经把mudo库集成到rpc里,目的是方便编译和调试学习.

# 解压
tar -zxf rpc-main.tar.gz
# 进到项目
cd rpc-main
# 该项目基于cmake构建
mkdir build
cd build 
cmake ..
make -j4

# 编译成功后在bin目录生成consumer  provider  test
ls bin/
consumer  provider  test

启动rpc server端:

provider: 这个是rpc server,需要将服务注册到zookeeper, 启动时需要通过 -i 指定配置文件, 配置文件路径在rpc-main/app/example/test.conf

# 先拷贝配置文件test.conf到当前build目录
lqf@ubuntu:~/rpc-main/build$ cp ../app/example/test.conf  .

# 指定配置文件启动rpc server
lqf@ubuntu:~/rpc-main/build$ ./bin/provider -i test.conf 
2025-01-20 23:05:40,477:171547(0x7f37faeca40):ZOO_INFO@log_env@753: Client environment:z 
zookeeper_init success!
... 省略部分
znode create success... path:/FiendServiceRpc/GetFriendsList
RpcProvider start service at ip:127.0.0.1 port:8000  有这个打印说明rpc server启动成功

启动rpc client端:

consumer: 这个是rpc 客户端, 需要连接zookeeper获取服务信息, 启动时需要通过-i 指定配置文件, 配置文件和provider用的一样, 启动命令: ./bin/consumer -i test.conf

lqf@ubuntu:~/long/rpc-main/build$ ./bin/consumer -i test.conf 
============================================
header_size: 35
rpc_header_str: 
FiendServiceRpcGetFriendsList
service_name: FiendServiceRpc
method_name: GetFriendsList
args_str:�
============================================
.......省略
zookeeper_init success!
2025-01-20 23:06:54,888:172008(0x7ff733962780):ZOO_INFO@zookeeper_close@2563: Closing zookeeper sessionId=0x100020839f40003 to [127.0.0.1:2181]

rpc GetFriendsList response success!
index:1 name:gao yang
index:2 name:liu hong
index:3 name:wang shuo

2.5 zookeeper命令查看节点情况

启动zookeeper 命令行客户端 ./zkCli.sh -server localhost:2181

lqf@ubuntu:~/long/apache-zookeeper-3.8.4-bin/bin$ ./zkCli.sh -server localhost:2181

命令ls / 查看当前的根目录情况

[zk: localhost:2181(CONNECTED) 1] ls /
[FiendServiceRpc, zookeeper]

命令ls /FiendServiceRpc 查看对应service的方法

[zk: localhost:2181(CONNECTED) 2] ls  /FiendServiceRpc
[GetFriendsList]

命令get获取当前方法的调用地址

[zk: localhost:2181(CONNECTED) 3] get  /FiendServiceRpc/GetFriendsList
127.0.0.1:8000

3 框架加载配置文件

MprpcApplication类

MprpcApplication类负责mprpc框架的一些初始化操作,例如mprpc服务器需要监听的端口号还有所在地址。这些信息由配置文件来载入例如test.conf.

项目初始化的时候,会根据传入的命令行参数信息找到配置文件。如符合规范则找到配置文件会调用MprpcConfig::LoadConfigFile方法来解析加载配置文件。

//  mprpc框架基础类     负责框架的一些初始化操作
class MprpcApplication{
public:
    static void Init(int argc, char **argv);
    static MprpcApplication& GetInstance();
    static MprpcConfig& GetConfig();
private:
    static MprpcConfig m_config;
    
    MprpcApplication(){}
    MprpcApplication(const MprpcApplication&) = delete;
    MprpcApplication(MprpcApplication&&) = delete;
};

命令行输入的合法性检测

void MprpcApplication::Init(int argc, char **argv){
    if(argc < 2){
        ShowArgsHelp();
        exit(EXIT_FAILURE);
    }

    int c = 0;
    std::string config_file;
    if((c = getopt(argc, argv, "i:")) != -1){
        switch (c)
        {
        case 'i':
            config_file = optarg;
            break;
        case '?':
            ShowArgsHelp();
            exit(EXIT_FAILURE);
        case ':':
            ShowArgsHelp();
            exit(EXIT_FAILURE);
        default:
            break;
        }
    }

    //  开始加载配置文件了   rpcserver_ip=  rpcserver_port=  zookeeper_ip=  zookeeper_port=
    m_config.LoadConfigFile(config_file.c_str());
}

MprpcConfig类

用于读取配置文件,需要去掉注释和字符串前后多余的空格,还需检测配置项是否合法。

//rpcserverip   rpcserverport   zookeeperip     zookeeperport   
//框架读取配置文件类
class MprpcConfig{
public:
    //负责解析加载配置文件
    void LoadConfigFile(const char *config_file);

    //查询配置项信息
    std::string Load(const std::string &key);

private:
    std::unordered_map<std::string, std::string>m_configMap;

    //去掉字符串前后的空格
    void Trim(std::string &src_buf);
};

测试配置文件加载功能

编写test.conf文件

# rpc节点的ip地址
rpcserverip=127.0.0.1
# rpc节点的port端口号
rpcserverport=8080
# zk的IP地址
zookeeperip=127.0.0.1
# zk的port端口号
zookeeperport=2181

启动服务端,测试成功

ubuntu% ./provider -i test.conf 
rpcserverip:127.0.0.1
rpcserverport:8080
zookeeperip:127.0.0.1
zookeeperport:2181

4 RpcProvider类

RpcProvider是一个框架专门为发布rpc服务的网络对象类。在服务端会用此类来注册服务,故RpcProvider类需要支持所有服务的发布。因此设计的NotifyService方法的参数必须要是这些服务的基类,也就是google::protobuf::Service。

因为protobuf只是提供了数据的序列化和反序列化还有RPC接口,并没有提供网络传输的相关代码。所以此项目用了muduo库实现网络传输。(后期可能会自己实现高并发网络传输)

同时还需要将服务注册到zookeeper上。

RpcProvider类源码

class RpcProvider{
public:
    //  这里是框架提供给外部使用的,可以发布RPC方法的接口
    void NotifyService(google::protobuf::Service *service);
    //  启动RPC服务结点,开始提供RPC远程网络调用服务
    void Run();
private:
    //  组合了EventLoop
    muduo::net::EventLoop m_eventLoop;
    //  服务类型信息
    struct ServiceInfo{
        google::protobuf::Service *m_service;    //  保存服务对象
        std::unordered_map<std::string, const google::protobuf::MethodDescriptor*> m_methodMap;   //  保存服务方法
    };
    //  存储注册成功的服务对象和其他服务方法的所有信息
    std::unordered_map<std::string, ServiceInfo> m_serviceMap;
    //  新的socket连接回调
    void OnConnection(const muduo::net::TcpConnectionPtr&);
    //  已建立连接用户的读写事件回调
    void OnMessage(const muduo::net::TcpConnectionPtr&, muduo::net::Buffer*, muduo::Timestamp);
    //  Closure的回调操作,用于序列化RPC的响应和网络发送
    void SendRpcResponse(const muduo::net::TcpConnectionPtr&, google::protobuf::Message*);

};

NotifyService

  1. 利用NotifyService发布服务

  2. 从*service获取服务对象的描述信息,此接口由protobuf提供。

  3. 从描述信息中获取到服务名字和服务对象service的方法和数量。

  4. 遍历service获取服务对象指定虾苗的服务方法描述,并将其注册到m_methodMap上,例如FriendServiceRpc/GetFriendsList.

  5. 最后将其加入服务对象集合m_serviceMap中。

void RpcProvider::NotifyService(google::protobuf::Service *service){
    ServiceInfo service_info;
    const google::protobuf::ServiceDescriptor *perviceDesc = service->GetDescriptor();
    std::string service_name = perviceDesc->name();
    int methodCnt = perviceDesc->method_count();

    LOG_INFO("service_name:%s", service_name.c_str());

    for (int i = 0; i < methodCnt; ++i){
        const google::protobuf::MethodDescriptor* pmethodDesc =  perviceDesc->method(i);
        std::string method_name = pmethodDesc->name();
        service_info.m_methodMap.insert({method_name, pmethodDesc});

        LOG_INFO("method_name:%s", method_name.c_str());
    }
    service_info.m_service = service;
    m_serviceMap.insert({service_name, service_info});
}

开启远程服务

  1. 从RPC的框架中获取到IP和PORT,创建TCPserver对象

  2. 设置muduo库的线程数量

  3. 把当前结点要发布的服务注册到zookeeper上,让客户端可以从zookeeper上发现服务

  4. 启动网络服务

void RpcProvider::Run(){
    std::string ip = MprpcApplication::GetInstance().GetConfig().Load("rpcserverip");
    uint16_t port = atoi(MprpcApplication::GetInstance().GetConfig().Load("rpcserverport").c_str());
    muduo::net::InetAddress address(ip, port);

    muduo::net::TcpServer server(&m_eventLoop, address, "RpcProvider");

    server.setConnectionCallback(std::bind(&RpcProvider::OnConnection, this, std::placeholders::_1));
    server.setMessageCallback(std::bind(&RpcProvider::OnMessage, this, std::placeholders::_1,
             std::placeholders::_2, std::placeholders::_3));
    
    server.setThreadNum(4);

    ZkClient zkcli;
    zkcli.Start();

    for(auto &sp : m_serviceMap){
        std::string service_path = "/" + sp.first;
        zkcli.Create(service_path.c_str(), nullptr, 0);
        for(auto &mp : sp.second.m_methodMap){
            std::string method_name = service_path + "/" + mp.first;
            char method_path_data[128] = {0};
            sprintf(method_path_data, "%s:%d", ip.c_str(), port);
            zkcli.Create(method_name.c_str(), method_path_data, strlen(method_path_data), ZOO_EPHEMERAL);
        }
    }

    LOG_INFO("RpcProvider start service at ip::%s port:%d", ip.c_str(), port);

    server.start();
    m_eventLoop.loop();
}

客户端发起请求服务端接收到

当接收到客户端的请求时。OnMessage回调函数会被调用,可以从客户端发过来的数据得知他想要调用那一个类的那一个方法以及其参数是什么。

为了防止TCP的粘包问题需要在自定义一个协议,本项目采用了将消息分为消息头和消息体,消息头包含此消息的总长度,每次都需要先读消息头,从而得知我们这次发过来的消息要读到那里。

  1. 从网络上接收远程的RPC调用请求的字符串。

  2. 从字符串中先读取前四个字节的内容,从而得知此次消息的长度。

  3. 根据header_size读取数据头的原始字符串,反序列化数据得到RPC请求的详细消息。

  4. 获取service对象和method对象。

  5. 生成RPC方法调用请求request和相应response参数。

  6. 在框架上根据远端的RPC请求调用当前的RPC结点方法。

void RpcProvider::OnMessage(const muduo::net::TcpConnectionPtr& conn, 
                            muduo::net::Buffer *buffer,
                             muduo::Timestamp){
    std::string recv_buf = buffer->retrieveAllAsString();

    uint32_t header_size = 0;
    recv_buf.copy((char *)&header_size, 4, 0);

    std::string rpc_header_str = recv_buf.substr(4, header_size);
    mprpc::RpcHeader rpcHeader;
    std::string service_name;
    std::string method_name;
    uint32_t args_size;
    if(rpcHeader.ParseFromString(rpc_header_str)){
        service_name = rpcHeader.service_name();
        method_name = rpcHeader.method_name();
        args_size = rpcHeader.args_size();
    }
    else{
        LOG_ERR("rec_header_str:%sparse error!", rpc_header_str.c_str());
        return;
    }

    std::string args_str = recv_buf.substr(4 + header_size, args_size);

    auto it = m_serviceMap.find(service_name);
    if(it == m_serviceMap.end()){
        LOG_ERR("%sis not exist", service_name.c_str());
        return;
    }

    auto mit = it->second.m_methodMap.find(method_name);
    if(mit == it->second.m_methodMap.end()){
        LOG_ERR("%s:%sis not exist", service_name.c_str(), method_name.c_str());
        return;
    }

    google::protobuf::Service *service = it->second.m_service;  
    const google::protobuf::MethodDescriptor *method = mit->second; 

    google::protobuf::Message *request = service->GetRequestPrototype(method).New();
    if(!request->ParseFromString(args_str)){
        LOG_ERR("request parse error! content:%s", args_str.c_str());
        return;
    }
    google::protobuf::Message *response = service->GetResponsePrototype(method).New();

    //  给下面的method的方法的调用,绑定一个Closure的回调函数
    google::protobuf::Closure *done = google::protobuf::NewCallback<RpcProvider, const muduo::net::TcpConnectionPtr&, google::protobuf::Message*>(this,&RpcProvider::SendRpcResponse, conn, response);

    service->CallMethod(method, nullptr, request, response, done);
}

服务端处理完请求返回数据给客户端

当service->CallMethod执行完毕后,调用通过done绑定的回调函数。将服务器处理后的结果序列化,然后通过muduo网络库发回给客户端。

void RpcProvider::SendRpcResponse(const muduo::net::TcpConnectionPtr& conn, google::protobuf::Message *response){
    std::string response_str;
    if(response->SerializeToString(&response_str)){
        conn->send(response_str);
    }
    else{
        LOG_ERR("serialize response_str error!");
    }
    conn->shutdown();  
}

5 MprpcController模块

MprpcContrller模块继承于google::protobuf::RpcController,他声明于service.h文件下,而RpcController是一个抽象类,他的成员都是纯虚函数,需要我们自己重写实现,我们可以通过RpcController的方法得到RPC方法执行过程中的状态和RPC方法执行过程中的错误信息。

class PROTOBUF_EXPORT RpcController {
 public:
  inline RpcController() {}
  virtual ~RpcController();

  virtual void Reset() = 0;
  virtual bool Failed() const = 0;
  virtual std::string ErrorText() const = 0;
  virtual void StartCancel() = 0;
  virtual void SetFailed(const std::string& reason) = 0;
  virtual bool IsCanceled() const = 0;
  virtual void NotifyOnCancel(Closure* callback) = 0;

 private:
  GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(RpcController);
};

RpcController的API

这里只提及本项目涉及到的接口

Reset()可以将RpcController重新设定为初始状态,以便他可以被重用。他不能在RPC进行时调用。

virtual void Reset() = 0;

Failed()在一个调用结束以后,如果调用失败则返回ture。失败的原因取决于RPC的实现。Failed()不能在调用结束前被调用。如果返回true则响应消息的内容未被定义。

virtual bool Failed() const = 0;

如果Failed()返回为true此方法则返回一个用户可读的错误描述。

virtual std::string ErrorText() const = 0;

StartCancel()通知RPC系统,调用者希望RPC调用取消,RPC系统可以立刻取消,也可以等待一段时间后再取消调用,也可以不取消。如果调用被取消,done回调任然会被调用,RpcController会表明当时的调用失败。

virtual void StartCancel() = 0;

MprpcController声明

class MprpcController : public google::protobuf::RpcController{
public:
    MprpcController();
    void Reset();
    bool Failed() const;
    std::string ErrorText() const;
    void SetFailed(const std::string& reason);

    //  目前未实现具体的功能
    void StartCancel();
    bool IsCanceled() const;
    void NotifyOnCancel(google::protobuf::Closure* callback);


private:
    bool m_failed;  //  RPC方法执行过程中的状态
    std::string m_errText; //  RPC方法执行过程中的错误信息
};

MprpcController实现

MprpcController::MprpcController(){
    m_failed = false;
    m_errText = "";
}

void MprpcController::Reset(){
    m_failed = false;
    m_errText = "";
}

bool MprpcController::Failed() const{
    return m_failed;
}

std::string MprpcController::ErrorText() const{
    return m_errText;
}

void MprpcController::SetFailed(const std::string& reason){
    m_failed = true;
    m_errText = reason;
}

//  目前未实现具体的功能
void MprpcController::StartCancel(){}
bool MprpcController::IsCanceled() const{return false;}
void MprpcController::NotifyOnCancel(google::protobuf::Closure* callback){}

6 MprpcChannel模块

MprpcChannel模块继承于google::protobuf::RpcChannel他是一个RPC通道的抽象接口,表示一个到服务的通信线路,这个线路用于客户端远程调用服务端的方法。我们需要继承这个类并重写他的CallMethod方法。

class PROTOBUF_EXPORT RpcChannel {
 public:
  inline RpcChannel() {}
  virtual ~RpcChannel();

  virtual void CallMethod(const MethodDescriptor* method,
                          RpcController* controller, const Message* request,
                          Message* response, Closure* done) = 0;

 private:
  GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(RpcChannel);
};

MprpcChannel类

class MprpcChannel : public google::protobuf::RpcChannel{
public:
    void CallMethod(const google::protobuf::MethodDescriptor* method,
                          google::protobuf::RpcController* controller, const google::protobuf::Message* request,
                          google::protobuf::Message* response, google::protobuf::Closure* done);


};

CallMethod方法

所有通过stub代理对象调用的RPC方法都走到了这里,统一做RPC方法调用的数据序列化和网络发送

获取客户端请求的方法和序列化

  1. 从CallMethod的参数method获取service_name和method_name;

  2. 将获取到的参数序列化为字符串,并获取他的长度。

  3. 定义RPC的请求header.

  4. 组织待发送的RPC请求的字符串

const google::protobuf::ServiceDescriptor* sd = method->service();
    std::string service_name = sd->name(); 
    std::string method_name = method->name(); 

    uint32_t args_size = 0;
    std::string args_str;
    if(request->SerializeToString(&args_str)){
        args_size = args_str.size();
    }
    else{
        controller->SetFailed("serialize request error!");
        return;
    }

    mprpc::RpcHeader rpcHeader;
    rpcHeader.set_service_name(service_name);
    rpcHeader.set_method_name(method_name);
    rpcHeader.set_args_size(args_size);

    uint32_t header_size = 0;
    std::string rpc_header_str;
    if(rpcHeader.SerializeToString(&rpc_header_str)){
        header_size = rpc_header_str.size();
    }
    else{
        controller->SetFailed("serialize rpc header error!");
        return;
    }

    std::string send_rpc_str;
    send_rpc_str.insert(0, std::string((char*)&header_size, 4)); 
    send_rpc_str += rpc_header_str; 
    send_rpc_str += args_str;

使用TCP编程完成RPC方法的远程调用

因为CallMethod方法用于客户端远程调用服务端的方法,考虑到这里不需要高并发,故没有使用muduo网络库。

  1. 通过 socket 函数在内核中创建一个套接字

  2. RPC调用方法想要调用service_name的method_name服务,需要到zookeeper上查询该服务的所在的host信息。

  3. 查询到了mathod_name服务的IP和PORT后,连接RPC服务结点

  4. 发送RPC请求

  5. 接收RPC请求的响应值

  6. 最后反序列化服务器发回来的响应数据

int client_fd = socket(AF_INET, SOCK_STREAM, 0);
    if(client_fd == -1){
        char errtxt[512] = {0};
        sprintf(errtxt, "create socket error! error:%d", errno);
        controller->SetFailed(errtxt);
        return;
    }

    ZkClient zkcli;
    zkcli.Start();
    std::string method_path = "/" + service_name + "/" + method_name;
    //  127.0.0.1:8080  
    std::string host_data = zkcli.GetData(method_path.c_str());
    if(host_data == ""){
        controller->SetFailed(method_path + "is not exist!");
        return;
    }
    int idx = host_data.find(":");
    if(idx == -1){
        controller->SetFailed(method_path + "address is invalid!");
        return;
    }
    std::string ip = host_data.substr(0, idx);
    uint16_t port = atoi(host_data.substr(idx + 1, host_data.size() - idx).c_str());

    struct sockaddr_in server_addr;
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(port);
    server_addr.sin_addr.s_addr = inet_addr(ip.c_str());

    if(connect(client_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1){
        close(client_fd);
        char errtxt[512] = {0};
        sprintf(errtxt, "connect error! error:%d", errno);
        controller->SetFailed(errtxt);
        return;
    }

    if(send(client_fd, send_rpc_str.c_str(), send_rpc_str.size(), 0) == -1){
        close(client_fd);
        char errtxt[512] = {0};
        sprintf(errtxt, "send error! errno:%d", errno);
        controller->SetFailed(errtxt);
        return;
    }

    char recv_buf[1024] = {0};
    int recv_size = 0;
    if((recv_size = recv(client_fd, recv_buf, 1024, 0)) == -1){
        close(client_fd);
        char errtxt[512] = {0};
        sprintf(errtxt, "recv error! error:%d", errno);
        controller->SetFailed(errtxt);
        return;
    }

    if(!response->ParseFromArray(recv_buf, recv_size)){
        close(client_fd);
        char errtxt[512] = {0};
        sprintf(errtxt, "parse error! response_str%s", recv_buf);
        controller->SetFailed(errtxt);
        return;
    }
    close(client_fd);

7 日志模块

同步日志与缺点

同步日志通常也会被称为传统日志,每次调用一次打印日志API就对应一次系统的调用write写日志文件,如果日志产生频率低的话没什么问题。

但是如果日志打印非常频繁,会发生如下问题:

  • 大量的日志打印陷入等量的write系统调用,有一定的系统开销

  • 打印日志的线程附带了大量同步的磁盘IO,严重影响性能

对以上问题我们的解决方案是,引入异步日志。

异步日志与队列

异步日志,我个人理解就是工作线程的日志打印接口负责生产日志数据(作为日志的生产者),而日志的实现操作则留给另一个后台进程去完成(作为日志的消费者),用一个典型的生产者-消费者问题就能解决。

这样一来工作线程调用日志打印接口成为非阻塞的操作,同步的磁盘IO可以从主线程分离出来,提高性能。对于异步日志我们借助队列来实现:工作线程写日志到队列,队列本身使用条件变量为通知机制,当有数据入队列时就通知消费者线程去消费日志,模型如下:

这样一来也会发生一定的问题:

  • queue如果没有保证线程安全就会出现数据安全问题

  • 如果消费者线程一直占用锁就会造成生产者线程阻塞,影响性能

lockQueue模块

LockQueue模块是为了解决queue的线程安全问题,本项目的LockQueue实现相对简单。

首先是生产者线程,将会有多个生产线程去争一把锁,将数据写入队列。

void Push(const T &data){
    std::lock_guard<std::mutex> lock(m_mutex);
    m_queue.push(data);
    m_condvariable.notify_one();
}

其次是消费者线程,用一个消费者线程去读日志队列,将其写入日志文件。当日志队列为空时,将消费者线程置为wait状态。

T Pop(){
    std::unique_lock<std::mutex> lock(m_mutex);
    while (m_queue.empty()){
        m_condvariable.wait(lock);
    }

    T data = m_queue.front();
    m_queue.pop();
    return data;
}

整体实现代码:

template<typename T>
class LockQueue{
public:
    void Push(const T &data){
        std::lock_guard<std::mutex> lock(m_mutex);
        m_queue.push(data);
        m_condvariable.notify_one();
    }
    T Pop(){
        std::unique_lock<std::mutex> lock(m_mutex);
        while (m_queue.empty()){
            m_condvariable.wait(lock);
        }
        T data = m_queue.front();
        m_queue.pop();
        return data;
    }
private:
    std::queue<T> m_queue;
    std::mutex m_mutex;
    std::condition_variable m_condvariable;
};

Logger模块

Logger模块实现了MPRPC框架的日志系统,它提供了三个方法和一个构造函数。

GetInstance()方法可以获取日志的单例。他的实现如下:

Logger& Logger::GetInstance(){
    static Logger logger;
    return logger;
}

SetLogLevel(LogLevel level)方法可以获取日志的级别,而LogLevel我们定义了一个枚举,INFO是普通的日志信息,而ERROR则是错误的日志信息。他的实现如下:

enum LogLevel{
    INFO,
    ERROR,
};
void Logger::SetLogLevel(LogLevel level){
    m_loglevel = level;
}

Log(std::string msg)方法则是给生产者线程提供的写日志的方法。实现如下:

void Logger::Log(std::string msg){
    m_lockQue.Push(msg);
}

构造函数则为启用一个专门写日志的线程,这个线程是一个后台线程(守护线程),我们希望我们的日志文件的文件名是一个这样子的形式:

2023-10-1-log.txt
2023-10-2-log.txt
2023-20-3-log.txt

这样以便与我们后期查阅,且我们希望日志文件内包含有时、分、秒的信息,以便与我们后期快速找到问题所在。构造函数实现如下:

Logger::Logger(){
    std::thread writeLogTask([&](){
        for(;;){
            time_t now = time(nullptr);
            tm *nowtm = localtime(&now);
            char file_name[128];
            sprintf(file_name, "%d-%d-%d-log.txt", nowtm->tm_year + 1900, nowtm->tm_mon + 1, nowtm->tm_mday);

            FILE *pf = fopen(file_name, "a+");
            if(pf == nullptr){
                std::cout << "logger file:" << file_name << "open error!" << std::endl;
                exit(EXIT_FAILURE);
            }
            std::string msg = m_lockQue.Pop();
            char time_buf[128] = {0};
            sprintf(time_buf, "%d-%d-%d => [%s]", 
                nowtm->tm_hour, 
                nowtm->tm_min, 
                nowtm->tm_sec,
                (m_loglevel == INFO ? "info" : "error"));
            msg.insert(0, time_buf);
            msg.append("\n");

            fputs(msg.c_str(), pf);
            fclose(pf);
        }
    });
    writeLogTask.detach();
}

8 ZooKeeper分布式协调服务

ZooKeeper是一个分布式的应用程序协调服务,我们client在调用RPC框架服务的时候需要一个服务配置中心来记录那个服务器提供了那个服务,通俗些讲就是client需要知道他想要远程调用的服务被放在了哪一台服务器上他的IP:PORT是什么,所以我们需要一个中间件ZooKeeper来告诉client他想要调用的服务在哪。

ZooKeeper提供了什么

正如上文所说,zookeeper为我们提供文件系统和通知机制

  • 文件系统

zookeeper提供了一个多层级的命名空间(结点znode)。与文件系统不同的是,这些结点都可以设置一个关联的数据,而文件系统只有叶子结点可以存放数据目录结点则不行。zookeeper为了保持高吞吐了低延迟,在内存中维护了这个树状的树形结构。这种特质的原因使得zookeeper每个结点只能存储1MB的数据。

  • 通知机制

  • client端会对某一个znode建立一个watcher事件,当znode发生变化时,client会接收到zk发过来的通知,从而根据znode的变化做出业务上的改变。

结点类型

zookeeper节点类型可以分为持久节点(PERSISTENT)、临时节点(EPHEMERAL)和顺序节点(SEQUENTIAL)三大类,而本项目只会用到前两类。

持久节点(PERSISTENT)

所谓持久性结点就是指该数据节点被创建了之后,会一直保留在zookeeper服务器上,直到有删除操作来主动清除这个节点。例如项目中的service_name也就是/FriendSerciceRpc就会被注册为持久结点,这里即使RPC结点超时未发送心跳,zk也不会删除这个结点。(心跳概念见下文)

临时节点(EPHEMERAL)

和持久性节点不同的是,临时结点的生命周期和客户端的会话绑定在一起的。因此只要客户端会话失效,那么这个节点就会被自动清理掉。注意,这里提到的是客户端会话失效,而非TCP连接断开。同时zookeeper规定了不能在临时结点上创建子结点,即临时结点只能作为叶子结点。我们这里做一个测试。

  • 通过自述文件的方法启动zookeeper。(这里不做演示)

  • 启动provider发布服务到zk上,这里能看到我们已经发布成功了。

ubuntu% ./provider -i test.conf 
rpcserverip:127.0.0.1
rpcserverport:8080
zookeeperip:127.0.0.1
zookeeperport:2181
2023-04-23 00:00:22,262:4806(0x7f7333731a00):ZOO_INFO@log_env@726: Client environment:zookeeper.version=zookeeper C client 3.4.10
2023-04-23 00:00:22,262:4806(0x7f7333731a00):ZOO_INFO@log_env@730: Client environment:host.name=ubuntu
2023-04-23 00:00:22,262:4806(0x7f7333731a00):ZOO_INFO@log_env@737: Client environment:os.name=Linux
2023-04-23 00:00:22,262:4806(0x7f7333731a00):ZOO_INFO@log_env@738: Client environment:os.arch=5.4.0-146-generic
2023-04-23 00:00:22,262:4806(0x7f7333731a00):ZOO_INFO@log_env@739: Client environment:os.version=#163~18.04.1-Ubuntu SMP Mon Mar 20 15:02:59 UTC 2023
2023-04-23 00:00:22,263:4806(0x7f7333731a00):ZOO_INFO@log_env@747: Client environment:user.name=zixuanhuang
2023-04-23 00:00:22,263:4806(0x7f7333731a00):ZOO_INFO@log_env@755: Client environment:user.home=/home/zixuanhuang
2023-04-23 00:00:22,263:4806(0x7f7333731a00):ZOO_INFO@log_env@767: Client environment:user.dir=/home/zixuanhuang/mprpc/bin
2023-04-23 00:00:22,263:4806(0x7f7333731a00):ZOO_INFO@zookeeper_init@800: Initiating client connection, host=127.0.0.1:2181 sessionTimeout=30000 watcher=0x55c06a84ef18 sessionId=0 sessionPasswd=<null> context=(nil) flags=0
2023-04-23 00:00:22,263:4806(0x7f7330ecf700):ZOO_INFO@check_events@1728: initiated connection to server [127.0.0.1:2181]
2023-04-23 00:00:22,266:4806(0x7f7330ecf700):ZOO_INFO@check_events@1775: session establishment complete on server [127.0.0.1:2181], sessionId=0x1879d16838c0045, negotiated timeout=30000
zookeeper_init sucess!
znode create success... path:/FriendServiceRpc
znode create success... path:/FriendServiceRpc/GetFriendsList
  • 回到zkcli.sh查看是否注册了这个节点,可以看到已经注册成功了。

[zk: localhost:2181(CONNECTED) 6] ls /
[zookeeper, FriendServiceRpc]
[zk: localhost:2181(CONNECTED) 7] ls /FriendServiceRpc/GetFriendsList
[]
[zk: localhost:2181(CONNECTED) 8]
  • 这个时候我们将provider的会话关掉,可以看到/FriendServiceRpc目录下已经为空。

provider:
^C
ubuntu% 
zkcli.sh:
[zk: localhost:2181(CONNECTED) 8] ls /
[zookeeper, FriendServiceRpc]
[zk: localhost:2181(CONNECTED) 9] ls /FriendServiceRpc
[]

心跳消息

client和ZooKeeper之间通信,需要创建一个Session,这个Session会有一个超时时间,因为Zookeeper集群会把Client的Session信息持久化,所以在Session没超时之前,client与Zookeeper server的连接可以在各个Zookeeper server之间透明地移动。在实际的应用中,如果client与server之间的通信足够频繁,Session的维护就不需要其他额外的消息了。否则,ZooKeeper client每t/3ms就需要发一次心跳给Service,如果超过了t的事件Service还没有接收到client发过来的心跳消息,那么ZooKeeper Service就会认为这个client失效了,从而注销掉他的服务。

ZooKeeper组织结构

远程zkClient API存在的问题

  1. 设置监听watcher只能是一次性的,每次触发后需要重复设置

  2. .znode节点只存储简单的byte字节数组,如果存储对象,需要自己转换对象生成字节数组

项目应用

Roc_provider中注册到了unordered_map中,这里需要连接ZkClient,注册到ZooKeeper中。这里需要创建指定的路径和数据。

路径为:/FriendServiceRpc/GetFriendList

数据为:127.0.0.1:2181

对于提供RPC服务端,在RpcProvider的Run()方法做以下修改

ZkClient zkcli;
zkcli.Start();
for(auto &sp : m_serviceMap){
    std::string service_path = "/" + sp.first;
    zkcli.Create(service_path.c_str(), nullptr, 0);
    for(auto &mp : sp.second.m_methodMap){
        std::string method_name = service_path + "/" + mp.first;
        char method_path_data[128] = {0};
        sprintf(method_path_data, "%s:%d", ip.c_str(), port);
        zkcli.Create(method_name.c_str(), method_path_data, strlen(method_path_data), ZOO_EPHEMERAL);
    }
}

对于调用RPC方法的客户端,在MprpcChannel的CallMethod方法做以下修改

ZkClient zkcli;
zkcli.Start();
std::string method_path = "/" + service_name + "/" + method_name;
std::string host_data = zkcli.GetData(method_path.c_str());
if(host_data == ""){
    controller->SetFailed(method_path + "is not exist!");
    return;
}
int idx = host_data.find(":");
if(idx == -1){
    controller->SetFailed(method_path + "address is invalid!");
    return;
}
std::string ip = host_data.substr(0, idx);
uint16_t port = atoi(host_data.substr(idx + 1, host_data.size() - idx).c_str());