brpc安装与使用
1. brpc是什么
brpc 是用 c++语言编写的工业级 RPC 框架,常用于搜索、存储、机器学习、广告、推荐等高性能系统。
brpc具体来说它是一个RPC框架(RPC是一个远程调用的框架)。当客户端想完成某个功能,但是本质来说这个功能并不在客户端完成,而是将这个算力交给服务器来进行。通过网络通信的方式将所需要的参数/数据/输入参数发送给服务器,服务器在针对参数/数据进行处理,然后给客户端返回一个结果。
对客户端来说它的上层并不关心内部网络通信的过程,感觉就是调用了一个接口,就完成了数据的处理拿到了结果。其实就是将数据处理过程交给服务器完成。
就比如下面,服务器内部实现了一个具体的Add功能,而本地也有一个Add接口,但是这个接口内部实现的并不是具体的处理过程,而是在内部通过网络通信向服务器发布一个请求,将参数num1、num2发送给服务器,服务器在调用它内部实现的Add方法去完成具体的处理,处理之后在将结果返回给本地,本地拿到结果,然后它的Add接口在返回最终结果。
可以使用brpc:
- 搭建能在一个端口支持多协议的服务, 或访问各种服务
-
- restful http/https, h2/gRPC。使用 brpc 的 http 实现比 libcurl 方便多了。从其他语言通过 HTTP/h2+json 访问基于 protobuf 的协议.
-
- redis 和 memcached, 线程安全,比官方 client 更方便。
-
- rtmp/flv/hls, 可用于搭建流媒体服务.
-
- 支持 thrift , 线程安全,比官方 client 更方便
-
- 各种百度内使用的协议: baidu_std, streaming_rpc, hulu_pbrpc, sofa_pbrpc, nova_pbrpc, public_pbrpc, ubrpc 和使用 nshead 的各种协议.
-
- 基于工业级的 RAFT 算法实现搭建高可用分布式系统,已在 braft 开源。
- Server 能同步或异步处理请求。
- Client 支持同步、异步、半同步,或使用组合 channels 简化复杂的分库或并发访问。
- 通过 http 界面调试服务, 使用 cpu, heap, contention profilers.
- 获得更好的延时和吞吐.
- 把你组织中使用的协议快速地加入 brpc,或定制各类组件, 包括命名服务 (dns, zk, etcd), 负载均衡 (rr, random, consistent hashing)
2. 安装
先安装依赖
sudo apt-get install -y git g++ make libssl-dev libprotobuf-dev libprotoc-dev protobuf-compiler libleveldb-dev
安装 brpc
git clone https://github.com/apache/brpc.git
cd brpc
mkdir build && cd build
cmake -DCMAKE_INSTALL_PREFIX=/usr ..
cmake --build . -j6
make
sudo make install
3. 类与接口介绍
3.1 日志输出类与接口
包含头文件: #include <butil/logging.h>
日志输出这里,本质上我们其实用不着 brpc 的日志输出,因此在这里主要介绍如何关闭日志输出。
日志初始化接口InitLogging,里面参数是LoggingSettings类里面包含一个 LoggingDestination(日志要输出到那里的设定),我们给它设置为 LOG_TO_NONE,也就是不进行日志输出。
namespace logging
{
enum LoggingDestination
{
LOG_TO_NONE = 0
};
struct BUTIL_EXPORT LoggingSettings
{
LoggingSettings();
LoggingDestination logging_dest;
};
bool InitLogging(const LoggingSettings &settings);
}
3.2 protobuf 类与接口
protobuf最主要就是替我们进行某种结构的序列化以及反序列化,除此之外,它还可以生成rpc通信接口来供我们进行rpc调用。就是替我们生成一系列接口,我们只需要创建子类去继承基类,然后重写基类内部虚函数,然后就可以实现client和server远程调用了。用起来还是比较方便的。
关于下面所有接口介绍,我们编写一个简单的proto文件,通过这个简单的proto文件进行一个框架代码生成后,在对这些结构进行介绍。
下面我们以 Echo(输出 hello world)方法为例, 来讲解基础的同步 RPC 请求是如何实现的
创建 proto 文件 - main.proto
//声明proto版本
syntax="proto3";
//声明命名空间,protobuf会针对proto编译,使用C++生成一系列框架,框架里面包含很多类,为了避免这些类与其他冲突,所以让我们设置一个命名空间
package example;
//生成相关rpc接口
option cc_generic_services = true;
//定义 Echo 方法请求对象,请求对象内部有那些字段
message EchoRequest {
string message = 1;//protobuf会针对我们所写的内容进行编译,会将各个字段名称和消息都记录起来,因此需要我们给每个字段一个编号
};
//定义 Echo 方法响应对象,响应对象内部有那些字段
message EchoResponse {
string message = 1;
};
//定义 RPC 远端调用接口,(EchoService是接口基类名称)
service EchoService {
//到时候会生成一个远程的Echo调用接口,请求的时候需要传EchoRequest进行请求,最终会返回一个EchoResponse
rpc Echo(EchoRequest) returns (EchoResponse);
};
执行下面指令,就会生成两个文件,mian.pb.h 是头文件,main.pb.cc是实现
protoc --cpp_out=./ main.proto
这是刚才的命名空间,里面有EchoRequset类继承protobuf中的Message
这里我们只有message一个字段,有获取message和设置message,这里都给我们生成了。
当前这里还有生成的EchoResponse类,里面也有个message字段
还有EchoService继承protobuf中的Service类,这个就是远程服务器使用的一个类,这里面包含了我们设置的Echo接口,不过它是一个虚函数。到时候我们编写的时候创建一个子类继承EchoService去实现内部的Echo虚函数,针对requset请求做出什么处理放到response里。最终发送给客户端。
EchoService_Stub是客户端使用的一个类,内部也有一个Echo接口让用户去使用,用户自己定义一个request请求,最终得到一个response响应。
不管是服务端还是客户端里面都会涉及到protobuf类的一些接口
protobuf相关类:
RpcController:上下文管理类–目前主要用于判断rpc请求是否是OK的
Closure:客户端-主要用于设置异步处理回调;服务端:通过Run()接口宣告请求处理完毕
namespace google
{
namespace protobuf
{
class PROTOBUF_EXPORT Closure
{
public:
Closure() {}
virtual ~Closure();
virtual void Run() = 0;
};
inline Closure *NewCallback(void (*function)());
class PROTOBUF_EXPORT RpcController
{
bool Failed();
std::string ErrorText();
}
}
}
3.3 服务端类与接口
这里只介绍主要用到的成员与接口。
服务端相关类:
Server类:服务器类;
ServerOptions:服务器参数配置类
ClosureGuard:Closure对象的智能管理类
namespace brpc
{
//服务器参数设置
struct ServerOptions
{
// 无数据传输,则指定时间后关闭连接
int idle_timeout_sec; // Default: -1 (disabled)
int num_threads; // Default: #cpu-cores
//....
}
enum ServiceOwnership {
// 添加服务失败时,服务器将负责删除服务对象
SERVER_OWNS_SERVICE,
// 添加服务失败时,服务器也不会删除服务对象
SERVER_DOESNT_OWN_SERVICE
};
class Server
{
//添加什么服务,才会处理什么请求
int AddService(google::protobuf::Service *service,
ServiceOwnership ownership);
//启动服务器
int Start(int port, const ServerOptions *opt);
int Stop(int closewait_ms /*not used anymore*/);
int Join();
// 休眠直到 ctrl+c 按下,或者 stop 和 join 服务器
void RunUntilAskedToQuit();
}
class ClosureGuard
{
explicit ClosureGuard(google::protobuf::Closure *done);
~ClosureGuard()
{
if (_done)
_done->Run();
}
}
class HttpHeader
{
void set_content_type(const std::string &type)
const std::string *GetHeader(const std::string &key) void SetHeader(const std::string &key,
const std::string &value);
const URI &uri() const { return _uri; }
HttpMethod method() const { return _method; }
void set_method(const HttpMethod method) int status_code() void set_status_code(int status_code);
}
class Controller : public google::protobuf::RpcController
{
void set_timeout_ms(int64_t timeout_ms);
void set_max_retry(int max_retry);
google::protobuf::Message *response();
HttpHeader &http_response();
HttpHeader &http_request();
//判断请求是否成功
bool Failed();
//获取错误信息
std::string ErrorText();
//可以设置对应回调函数,当响应被发送完毕进行调用
using AfterRpcRespFnType = std::function<
void(Controller *cntl,
const google::protobuf::Message *req,
const google::protobuf::Message *res)>;
void set_after_rpc_resp_fn(AfterRpcRespFnType &&fn)
}
}
3.4 客户端类与接口
客户端相关类:
Channel:客户端与服务器网络通信信道类;
ChannelOptions:信道配置类
namespace brpc
{
struct ChannelOptions
{
// 请求连接超时时间
int32_t connect_timeout_ms; // Default: 200 (milliseconds)
// rpc 请求超时时间
int32_t timeout_ms; // Default: 500 (milliseconds)
// 最大重试次数
int max_retry; // Default: 3
// 序列化协议类型 options.protocol = "baidu_std";
AdaptiveProtocolType protocol;
//....
}
//以信道方式与服务器进行网络通信
class Channel : public ChannelBase
{
// 初始化接口,成功返回 0;
int Init(const char *server_addr_and_port,
const ChannelOptions *options);
}
}
4. 使用
Rpc调用实现样例:
服务端:
- 创建rpc服务子类继承pb中的EchoService服务类,并实现内部的业务接口逻辑
- 创建rpc服务器类,搭建服务器
- 向服务器类中添加rpc子服务对象–告诉服务器收到什么请求用哪个接口处理
- 启动服务器
客户端:
- 创建网络通信信道
- 实例化pb中的EchoService_Stub类对象
- 发起rpc请求,获取响应进行处理
4.1 同步调用
同步调用是指客户端会阻塞收到 server 端的响应或发生错误。
服务器:
#include <brpc/server.h>
#include <butil/logging.h>
#include "main.pb.h"
//1. 继承与EchoService创建一个子类,并实现rpc调用的业务功能
class EchoServerImpl : public example::EchoService
{
public:
EchoServerImpl(){}
virtual ~EchoServerImpl(){}
// controller:包含除了 request 和 response 之外的参数集合
// request: 请求,只读的,来自 client 端的数据包
// response: 回复。需要用户填充,如果存在 required 字段没有被设置,该次调用会失败。
/* done: done 由框架创建,递给服务回调,包含了调用服务回调后的后续
动作,包括检查 response 正确性,序列化,打包,发送等逻辑。不管成功失败,
done->Run()必须在请求处理完成后被用户调用一次*/
void Echo(google::protobuf::RpcController* controller,
const ::example::EchoRequest* request,
::example::EchoResponse* response,
::google::protobuf::Closure* don)
{
//将don管理起来,析构的时候自动调用don->run();
brpc::ClosureGuard rpc_guard(don);
std::cout<<"收到消息: "<<request->message()<<std::endl;
std::string str = request->message() + "--这是响应";
response->set_message(str);
}
};
int main(int argc,char* argv[])
{
//关闭brpc的默认日志输出
logging::LoggingSettings settings;
settings.logging_dest = logging::LoggingDestination::LOG_TO_NONE;
logging::InitLogging(settings);
//2. 构造服务器对象
brpc::Server server;
//3. 向服务器对象新增EchoService服务
EchoServerImpl echo_service;
if(server.AddService(&echo_service, brpc::ServiceOwnership::SERVER_DOESNT_OWN_SERVICE) == -1)
{
std::cout << "添加Rpc服务失败!\n";
return -1;
}
//4. 启动服务器
brpc::ServerOptions options;
options.idle_timeout_sec = -1;//连接空闲超时时间-超时后连接被关闭
options.num_threads = 1;// IO线程数量
if(server.Start(8080,&options) == -1)
{
std::cout << "启动服务器失败!\n";
return -1;
}
server.RunUntilAskedToQuit();//修改等待运行结束
}
客户端:
#include <brpc/channel.h>
#include <thread>
#include "main.pb.h"
int main(int argc,char* argv[])
{
//1. 构造Channel信道,连接服务器
brpc::ChannelOptions options;
options.connect_timeout_ms = -1;// 连接等待超时时间,-1表示一直等待
options.timeout_ms = -1;//rpc请求等待超时时间,-1表示一直等待
options.max_retry = 3;//请求重试次数
options.protocol = "baidu_std";//序列化协议,默认使用baidu_std
brpc::Channel channel;
if(channel.Init("127.0.0.1:8080",&options) == -1)
{
std::cout << "初始化信道失败!\n";
return -1;
}
//2. 构造EchoService_Stub对象,用于进行rpc调用
example::EchoService_Stub stub(&channel);
//3. 进行Rpc调用,发起Rpc请求,获取响应进行处理
example::EchoRequest req;
req.set_message("你好,吃饭了吗");
brpc::Controller cntl;
example::EchoResponse rsp;
stub.Echo(&cntl,&req,&rsp,nullptr);//同步调用,阻塞等待响应
if(cntl->Failed())
{
std::cout<<"Rpc调用失败: "<<cntl->ErrorText()<<std::endl;
return -1;
}
std::cout<<"收到响应: "<<rsp->message()<<std::endl;
delete cntl;
delete rsp;
return 0;
}
all : server client
server : server.cc main.pb.cc
g++ -std=c++17 $^ -o $@ -lbrpc -lgflags -lssl -lcrypto -lprotobuf -lleveldb
client : client.cc main.pb.cc
g++ -std=c++17 $^ -o $@ -lbrpc -lgflags -lssl -lcrypto -lprotobuf -lleveldb
4.2 异步调用
异步调用是指客户端注册一个响应处理回调函数, 当调用一个 RPC 接口时立即返回,不会阻塞等待响应, 当 server 端返回响应时会调用传入的回调函数处理响应。
#include <brpc/channel.h>
#include <thread>
#include "main.pb.h"
void callback(brpc::Controller* cntl, ::example::EchoResponse* response)
{
//使用智能指针管理不然调用失败,两个指针指向对象没释放造成内存泄漏
std::unique_ptr<brpc::Controller> cntl_guard(cntl);
std::unique_ptr<example::EchoResponse> resp_guard(response);
if (cntl->Failed())
{
std::cout << "Rpc调用失败:" << cntl->ErrorText() << std::endl;
return;
}
std::cout << "收到响应: " << response->message() << std::endl;
}
int main(int argc,char* argv[])
{
//1. 构造Channel信道,连接服务器
brpc::ChannelOptions options;
options.connect_timeout_ms = -1;// 连接等待超时时间,-1表示一直等待
options.timeout_ms = -1;//rpc请求等待超时时间,-1表示一直等待
options.max_retry = 3;//请求重试次数
options.protocol = "baidu_std";//序列化协议,默认使用baidu_std
brpc::Channel channel;
if(channel.Init("127.0.0.1:8080",&options) == -1)
{
std::cout << "初始化信道失败!\n";
return -1;
}
//2. 构造EchoService_Stub对象,用于进行rpc调用
example::EchoService_Stub stub(&channel);
//3. 进行Rpc调用,发起Rpc请求,获取响应进行处理
example::EchoRequest req;
req.set_message("你好,吃饭了吗");
//异步调用,并不会阻塞在Echo,如果下面两个用的是局部变量就有可能出了作用域就销毁了
//但是异步调用接口还要用,因此在堆上申请
brpc::Controller* cntl = new brpc::Controller;
example::EchoResponse* rsp = new example::EchoResponse;
// stub.Echo(cntl,&req,rsp,nullptr);//同步调用,阻塞等待响应
// if(cntl->Failed())
// {
// std::cout<<"Rpc调用失败: "<<cntl->ErrorText()<<std::endl;
// return -1;
// }
// std::cout<<"收到响应: "<<rsp->message()<<std::endl;
// delete cntl;
// delete rsp;
auto clusure = google::protobuf::NewCallback(callback,cntl,rsp);
stub.Echo(cntl, &req, rsp, clusure); //异步调用
std::cout << "异步调用结束!\n";
std::this_thread::sleep_for(std::chrono::seconds(3));
return 0;
}