【brpc】安装与使用

发布于:2025-05-23 ⋅ 阅读:(12) ⋅ 点赞:(0)

1. brpc是什么

brpc 是用 c++语言编写的工业级 RPC 框架,常用于搜索、存储、机器学习、广告、推荐等高性能系统。

brpc具体来说它是一个RPC框架(RPC是一个远程调用的框架)。当客户端想完成某个功能,但是本质来说这个功能并不在客户端完成,而是将这个算力交给服务器来进行。通过网络通信的方式将所需要的参数/数据/输入参数发送给服务器,服务器在针对参数/数据进行处理,然后给客户端返回一个结果。

对客户端来说它的上层并不关心内部网络通信的过程,感觉就是调用了一个接口,就完成了数据的处理拿到了结果。其实就是将数据处理过程交给服务器完成。

就比如下面,服务器内部实现了一个具体的Add功能,而本地也有一个Add接口,但是这个接口内部实现的并不是具体的处理过程,而是在内部通过网络通信向服务器发布一个请求,将参数num1、num2发送给服务器,服务器在调用它内部实现的Add方法去完成具体的处理,处理之后在将结果返回给本地,本地拿到结果,然后它的Add接口在返回最终结果。

在这里插入图片描述

可以使用brpc:

  • 搭建能在一个端口支持多协议的服务, 或访问各种服务
    • restful http/https, h2/gRPC。使用 brpc 的 http 实现比 libcurl 方便多了。从其他语言通过 HTTP/h2+json 访问基于 protobuf 的协议.
    • redis 和 memcached, 线程安全,比官方 client 更方便。
    • rtmp/flv/hls, 可用于搭建流媒体服务.
    • 支持 thrift , 线程安全,比官方 client 更方便
    • 各种百度内使用的协议: baidu_std, streaming_rpc, hulu_pbrpc, sofa_pbrpc, nova_pbrpc, public_pbrpc, ubrpc 和使用 nshead 的各种协议.
    • 基于工业级的 RAFT 算法实现搭建高可用分布式系统,已在 braft 开源。
  • Server 能同步或异步处理请求。
  • Client 支持同步、异步、半同步,或使用组合 channels 简化复杂的分库或并发访问。
  • 通过 http 界面调试服务, 使用 cpu, heap, contention profilers.
  • 获得更好的延时和吞吐.
  • 把你组织中使用的协议快速地加入 brpc,或定制各类组件, 包括命名服务 (dns, zk, etcd), 负载均衡 (rr, random, consistent hashing)

2. 安装

先安装依赖

sudo apt-get install -y git g++ make libssl-dev libprotobuf-dev libprotoc-dev protobuf-compiler libleveldb-dev

安装 brpc

git clone https://github.com/apache/brpc.git
cd brpc
mkdir build && cd build
cmake -DCMAKE_INSTALL_PREFIX=/usr ..
cmake --build . -j6
make
sudo make install

3. 类与接口介绍

3.1 日志输出类与接口

包含头文件: #include <butil/logging.h>

日志输出这里,本质上我们其实用不着 brpc 的日志输出,因此在这里主要介绍如何关闭日志输出。

日志初始化接口InitLogging,里面参数是LoggingSettings类里面包含一个 LoggingDestination(日志要输出到那里的设定),我们给它设置为 LOG_TO_NONE,也就是不进行日志输出。

namespace logging
{
    enum LoggingDestination
    {
        LOG_TO_NONE = 0
    };
    struct BUTIL_EXPORT LoggingSettings
    {
        LoggingSettings();
        LoggingDestination logging_dest;
    };
    bool InitLogging(const LoggingSettings &settings);
}

3.2 protobuf 类与接口

protobuf最主要就是替我们进行某种结构的序列化以及反序列化,除此之外,它还可以生成rpc通信接口来供我们进行rpc调用。就是替我们生成一系列接口,我们只需要创建子类去继承基类,然后重写基类内部虚函数,然后就可以实现client和server远程调用了。用起来还是比较方便的。

关于下面所有接口介绍,我们编写一个简单的proto文件,通过这个简单的proto文件进行一个框架代码生成后,在对这些结构进行介绍。

下面我们以 Echo(输出 hello world)方法为例, 来讲解基础的同步 RPC 请求是如何实现的

创建 proto 文件 - main.proto

//声明proto版本
syntax="proto3";

//声明命名空间,protobuf会针对proto编译,使用C++生成一系列框架,框架里面包含很多类,为了避免这些类与其他冲突,所以让我们设置一个命名空间
package example;
 
//生成相关rpc接口
option cc_generic_services = true;

//定义 Echo 方法请求对象,请求对象内部有那些字段
message EchoRequest {
 string message = 1;//protobuf会针对我们所写的内容进行编译,会将各个字段名称和消息都记录起来,因此需要我们给每个字段一个编号
};


//定义 Echo 方法响应对象,响应对象内部有那些字段
message EchoResponse {
 string message = 1;
};

//定义 RPC 远端调用接口,(EchoService是接口基类名称)
service EchoService {
 //到时候会生成一个远程的Echo调用接口,请求的时候需要传EchoRequest进行请求,最终会返回一个EchoResponse
 rpc Echo(EchoRequest) returns (EchoResponse);
};

执行下面指令,就会生成两个文件,mian.pb.h 是头文件,main.pb.cc是实现

protoc --cpp_out=./ main.proto

这是刚才的命名空间,里面有EchoRequset类继承protobuf中的Message

在这里插入图片描述

这里我们只有message一个字段,有获取message和设置message,这里都给我们生成了。

在这里插入图片描述

当前这里还有生成的EchoResponse类,里面也有个message字段

在这里插入图片描述

还有EchoService继承protobuf中的Service类,这个就是远程服务器使用的一个类,这里面包含了我们设置的Echo接口,不过它是一个虚函数。到时候我们编写的时候创建一个子类继承EchoService去实现内部的Echo虚函数,针对requset请求做出什么处理放到response里。最终发送给客户端。

在这里插入图片描述

EchoService_Stub是客户端使用的一个类,内部也有一个Echo接口让用户去使用,用户自己定义一个request请求,最终得到一个response响应。

在这里插入图片描述

不管是服务端还是客户端里面都会涉及到protobuf类的一些接口

在这里插入图片描述

protobuf相关类:

RpcController:上下文管理类–目前主要用于判断rpc请求是否是OK的
Closure:客户端-主要用于设置异步处理回调;服务端:通过Run()接口宣告请求处理完毕

namespace google
{
    namespace protobuf
    {
        class PROTOBUF_EXPORT Closure
        {
        public:
            Closure() {}
            virtual ~Closure();
            virtual void Run() = 0;
        };
        inline Closure *NewCallback(void (*function)());
        class PROTOBUF_EXPORT RpcController
        {
            bool Failed();
            std::string ErrorText();
        }
    }
}

3.3 服务端类与接口

这里只介绍主要用到的成员与接口。

服务端相关类:
Server类:服务器类;
ServerOptions:服务器参数配置类
ClosureGuard:Closure对象的智能管理类

namespace brpc
{
	//服务器参数设置
    struct ServerOptions
    {
        // 无数据传输,则指定时间后关闭连接
        int idle_timeout_sec; // Default: -1 (disabled)
        int num_threads;      // Default: #cpu-cores
                              //....
    } 
    enum ServiceOwnership {
        // 添加服务失败时,服务器将负责删除服务对象
        SERVER_OWNS_SERVICE,
        // 添加服务失败时,服务器也不会删除服务对象
        SERVER_DOESNT_OWN_SERVICE
    };
    class Server
    {
    	//添加什么服务,才会处理什么请求
        int AddService(google::protobuf::Service *service,
                       ServiceOwnership ownership);
        //启动服务器
        int Start(int port, const ServerOptions *opt);
        int Stop(int closewait_ms /*not used anymore*/);
        int Join();
        // 休眠直到 ctrl+c 按下,或者 stop 和 join 服务器
        void RunUntilAskedToQuit();
    } 
    class ClosureGuard
    {
        explicit ClosureGuard(google::protobuf::Closure *done);
        ~ClosureGuard()
        {
            if (_done)
                _done->Run();
        }
    } 
    class HttpHeader
    {
        void set_content_type(const std::string &type)
            const std::string *GetHeader(const std::string &key) void SetHeader(const std::string &key,
                                                                                const std::string &value);
        const URI &uri() const { return _uri; }
        HttpMethod method() const { return _method; }
        void set_method(const HttpMethod method) int status_code() void set_status_code(int status_code);

    } 
    class Controller : public google::protobuf::RpcController
    {
        void set_timeout_ms(int64_t timeout_ms);
        void set_max_retry(int max_retry);
        google::protobuf::Message *response();
        HttpHeader &http_response();
        HttpHeader &http_request();
        //判断请求是否成功
        bool Failed();
        //获取错误信息
        std::string ErrorText();
		
		//可以设置对应回调函数,当响应被发送完毕进行调用
        using AfterRpcRespFnType = std::function<
            void(Controller *cntl,
                 const google::protobuf::Message *req,
                 const google::protobuf::Message *res)>;
        void set_after_rpc_resp_fn(AfterRpcRespFnType &&fn)
    }
}

3.4 客户端类与接口

客户端相关类:
Channel:客户端与服务器网络通信信道类;
ChannelOptions:信道配置类

namespace brpc
{
    struct ChannelOptions
    {
        // 请求连接超时时间
        int32_t connect_timeout_ms; // Default: 200 (milliseconds)
        // rpc 请求超时时间
        int32_t timeout_ms; // Default: 500 (milliseconds)
        // 最大重试次数
        int max_retry; // Default: 3
        // 序列化协议类型 options.protocol = "baidu_std";
        AdaptiveProtocolType protocol;
        //....
    } 
    //以信道方式与服务器进行网络通信
    class Channel : public ChannelBase
    {
        // 初始化接口,成功返回 0;
        int Init(const char *server_addr_and_port,
                 const ChannelOptions *options);
    }
}

4. 使用

Rpc调用实现样例:

服务端:

  1. 创建rpc服务子类继承pb中的EchoService服务类,并实现内部的业务接口逻辑
  2. 创建rpc服务器类,搭建服务器
  3. 向服务器类中添加rpc子服务对象–告诉服务器收到什么请求用哪个接口处理
  4. 启动服务器

客户端:

  1. 创建网络通信信道
  2. 实例化pb中的EchoService_Stub类对象
  3. 发起rpc请求,获取响应进行处理

4.1 同步调用

同步调用是指客户端会阻塞收到 server 端的响应或发生错误。

服务器:

#include <brpc/server.h>
#include <butil/logging.h>
#include "main.pb.h"


//1. 继承与EchoService创建一个子类,并实现rpc调用的业务功能
class EchoServerImpl : public example::EchoService
{
    public:
        EchoServerImpl(){}
        virtual ~EchoServerImpl(){}
        
        // controller:包含除了 request 和 response 之外的参数集合
        // request: 请求,只读的,来自 client 端的数据包
        // response: 回复。需要用户填充,如果存在 required 字段没有被设置,该次调用会失败。
        /* done: done 由框架创建,递给服务回调,包含了调用服务回调后的后续
        动作,包括检查 response 正确性,序列化,打包,发送等逻辑。不管成功失败,
        done->Run()必须在请求处理完成后被用户调用一次*/
        void Echo(google::protobuf::RpcController* controller,
            const ::example::EchoRequest* request,
            ::example::EchoResponse* response,
            ::google::protobuf::Closure* don)
        {
            //将don管理起来,析构的时候自动调用don->run();
            brpc::ClosureGuard rpc_guard(don);
            std::cout<<"收到消息: "<<request->message()<<std::endl;
            std::string str = request->message() + "--这是响应";
            response->set_message(str);
        }
};


int main(int argc,char* argv[])
{
    //关闭brpc的默认日志输出
    logging::LoggingSettings settings;
    settings.logging_dest = logging::LoggingDestination::LOG_TO_NONE;
    logging::InitLogging(settings);

    //2. 构造服务器对象
    brpc::Server server;
    //3. 向服务器对象新增EchoService服务
    EchoServerImpl echo_service;
    if(server.AddService(&echo_service, brpc::ServiceOwnership::SERVER_DOESNT_OWN_SERVICE)  == -1)
    {
        std::cout << "添加Rpc服务失败!\n";
        return -1;
    }
    //4. 启动服务器
    brpc::ServerOptions options;
    options.idle_timeout_sec = -1;//连接空闲超时时间-超时后连接被关闭
    options.num_threads = 1;// IO线程数量
    if(server.Start(8080,&options) == -1)
    {
        std::cout << "启动服务器失败!\n";
        return -1;
    }
    server.RunUntilAskedToQuit();//修改等待运行结束
}

客户端:

#include <brpc/channel.h>
#include <thread>
#include "main.pb.h"


int main(int argc,char* argv[])
{
    //1. 构造Channel信道,连接服务器
    brpc::ChannelOptions options;
    options.connect_timeout_ms = -1;// 连接等待超时时间,-1表示一直等待
    options.timeout_ms = -1;//rpc请求等待超时时间,-1表示一直等待
    options.max_retry = 3;//请求重试次数
    options.protocol = "baidu_std";//序列化协议,默认使用baidu_std
    brpc::Channel channel;
    if(channel.Init("127.0.0.1:8080",&options) == -1)
    {
        std::cout << "初始化信道失败!\n";
        return -1;
    }
    //2. 构造EchoService_Stub对象,用于进行rpc调用
    example::EchoService_Stub stub(&channel);
    //3. 进行Rpc调用,发起Rpc请求,获取响应进行处理
    example::EchoRequest req;
    req.set_message("你好,吃饭了吗");

    brpc::Controller cntl;
    example::EchoResponse rsp;
    stub.Echo(&cntl,&req,&rsp,nullptr);//同步调用,阻塞等待响应
    if(cntl->Failed())
    {
        std::cout<<"Rpc调用失败: "<<cntl->ErrorText()<<std::endl;
        return -1;
    }
    std::cout<<"收到响应: "<<rsp->message()<<std::endl;
    delete cntl;
    delete rsp;
    return 0;
}
all : server client
server : server.cc main.pb.cc
	g++ -std=c++17 $^ -o $@ -lbrpc -lgflags -lssl -lcrypto -lprotobuf -lleveldb
client : client.cc main.pb.cc
	g++ -std=c++17 $^ -o $@ -lbrpc -lgflags -lssl -lcrypto -lprotobuf -lleveldb

4.2 异步调用

异步调用是指客户端注册一个响应处理回调函数, 当调用一个 RPC 接口时立即返回,不会阻塞等待响应, 当 server 端返回响应时会调用传入的回调函数处理响应。

#include <brpc/channel.h>
#include <thread>
#include "main.pb.h"


void callback(brpc::Controller* cntl, ::example::EchoResponse* response)
{
    //使用智能指针管理不然调用失败,两个指针指向对象没释放造成内存泄漏
    std::unique_ptr<brpc::Controller> cntl_guard(cntl);
    std::unique_ptr<example::EchoResponse> resp_guard(response);
    if (cntl->Failed()) 
    {
        std::cout << "Rpc调用失败:" << cntl->ErrorText() << std::endl;
        return;
    }
    std::cout << "收到响应: " << response->message() << std::endl;
}

int main(int argc,char* argv[])
{
    //1. 构造Channel信道,连接服务器
    brpc::ChannelOptions options;
    options.connect_timeout_ms = -1;// 连接等待超时时间,-1表示一直等待
    options.timeout_ms = -1;//rpc请求等待超时时间,-1表示一直等待
    options.max_retry = 3;//请求重试次数
    options.protocol = "baidu_std";//序列化协议,默认使用baidu_std
    brpc::Channel channel;
    if(channel.Init("127.0.0.1:8080",&options) == -1)
    {
        std::cout << "初始化信道失败!\n";
        return -1;
    }
    //2. 构造EchoService_Stub对象,用于进行rpc调用
    example::EchoService_Stub stub(&channel);
    //3. 进行Rpc调用,发起Rpc请求,获取响应进行处理
    example::EchoRequest req;
    req.set_message("你好,吃饭了吗");

    //异步调用,并不会阻塞在Echo,如果下面两个用的是局部变量就有可能出了作用域就销毁了
    //但是异步调用接口还要用,因此在堆上申请
    brpc::Controller* cntl = new brpc::Controller;
    example::EchoResponse* rsp = new example::EchoResponse;
    // stub.Echo(cntl,&req,rsp,nullptr);//同步调用,阻塞等待响应
    // if(cntl->Failed())
    // {
    //     std::cout<<"Rpc调用失败: "<<cntl->ErrorText()<<std::endl;
    //     return -1;
    // }
    // std::cout<<"收到响应: "<<rsp->message()<<std::endl;
    // delete cntl;
    // delete rsp;

    auto clusure = google::protobuf::NewCallback(callback,cntl,rsp);
    stub.Echo(cntl, &req, rsp, clusure); //异步调用
    std::cout << "异步调用结束!\n";
    std::this_thread::sleep_for(std::chrono::seconds(3));
    return 0;
}

网站公告

今日签到

点亮在社区的每一天
去签到