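I. Introduction to Cap’n Proto
Official site: Cap’n Proto (https://capnproto.org)
Definition
Cap’n Proto is a data interchange format and RPC framework that bills itself as "infinitely faster". You can think of it as JSON, except that messages are produced directly in binary form. Because the in-memory layout is also the wire format, there is no encoding/decoding step, and that is where most of the performance gain comes from. Its RPC layer also supports asynchronous promise pipelining. To compute bar(foo()) with a traditional RPC system, the client must wait for foo() to return the intermediate result X before it can send bar(X). With Cap’n Proto the client sends both requests at once and immediately holds a promise for the final result; the intermediate result X never has to travel back to the client, so the whole chain costs a single network round trip.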
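To make the round-trip saving concrete, here is a minimal, self-contained C++ sketch. It is a toy model and not Cap’n Proto code: `ToyServer`, `foo`, `bar`, `RunResult`, and the round-trip counters are hypothetical names invented for this illustration. It only counts how often the client has to wait on the network to compute bar(foo()), with and without pipelining.

```cpp
#include <cassert>
#include <functional>
#include <vector>

// Toy model only: these types are NOT part of Cap'n Proto.
// A "round trip" is counted each time the client waits for the server.
struct ToyServer {
  int foo() const { return 20; }          // produces the intermediate result X
  int bar(int x) const { return x + 1; }  // consumes X
};

struct RunResult {
  int value;
  int roundTrips;
};

// Traditional RPC: the client waits for foo(), receives X, then sends bar(X).
RunResult callSequentially(const ToyServer& server) {
  int roundTrips = 0;
  int x = server.foo();
  ++roundTrips;  // waited for X to come back
  int y = server.bar(x);
  ++roundTrips;  // waited for the final result
  return {y, roundTrips};
}

// Pipelined RPC: the client queues both calls; the second one names the
// promised result of the first by its position in the batch. The server
// resolves that reference locally, so X never crosses the network.
RunResult callPipelined(const ToyServer& server) {
  // Each queued call receives the results of the calls queued before it.
  std::vector<std::function<int(const std::vector<int>&)>> batch;
  batch.push_back([&](const std::vector<int>&) { return server.foo(); });
  batch.push_back([&](const std::vector<int>& earlier) {
    assert(!earlier.empty());       // result of call #0 must exist by now
    return server.bar(earlier[0]);  // "promise" for call #0, resolved server-side
  });

  // Server side: run the whole batch in order.
  std::vector<int> results;
  for (const auto& call : batch) {
    results.push_back(call(results));
  }
  int roundTrips = 1;  // the client waited once, for the last result
  return {results.back(), roundTrips};
}
```

Both functions return the same value; only the number of waits differs (two for the sequential version, one for the pipelined version). In the real library the batching is done by the RPC layer when a request is sent on a not-yet-resolved result, as the calculator client later in this article shows.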
Four protocol levels (lowest to highest)
- Object references and promise pipelining
- Persistent capabilities
- Three-way interactions
- Reference equality / joining
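Five communication modes
- 1-to-1, one-way
The server provides capabilities and the client uses them. Only the client can initiate requests on a capability; the server only answers and cannot initiate requests of its own.
- 1-to-1, two-way
The client and the server each provide capabilities (the same or different ones) and call each other's capabilities.
- N-to-1, one-way
One server provides the same or different capabilities to multiple clients; the server cannot initiate requests.
- N-to-1, two-way
One server provides the same or different capabilities to multiple clients, and the clients can also provide capabilities for the server to use.
- N-to-N, one-way or two-way
One server provides the same or different capabilities to multiple clients, and a client can likewise provide capabilities to other parties.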
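The difference between the one-way and two-way modes can be sketched in plain C++ without any networking. The interfaces below (`Function`, `AddFunction`, `PowerFunction`, `Server::powerOfSum`) are hypothetical stand-ins chosen for this illustration; in Cap’n Proto the equivalent types would be generated from a schema and invoked over a connection.

```cpp
#include <cassert>
#include <cmath>
#include <vector>

// Hypothetical local interface; a stand-in for a generated capability type.
struct Function {
  virtual ~Function() = default;
  virtual double call(const std::vector<double>& params) = 0;
};

// One-way: the server owns this capability and clients only call it.
struct AddFunction final : Function {
  double call(const std::vector<double>& params) override {
    assert(params.size() == 2);
    return params[0] + params[1];
  }
};

// Two-way: the client implements this capability and hands it to the server.
struct PowerFunction final : Function {
  double call(const std::vector<double>& params) override {
    assert(params.size() == 2);
    return std::pow(params[0], params[1]);
  }
};

struct Server {
  AddFunction add;  // capability provided by the server

  // Computes base ^ (a + b). The sum uses the server's own capability;
  // the exponentiation is a callback into the capability the client passed in.
  double powerOfSum(Function& clientFn, double base, double a, double b) {
    double exponent = add.call({a, b});
    return clientFn.call({base, exponent});
  }
};
```

With a `Server` and a client-owned `PowerFunction`, `powerOfSum(power, 2, 4, 5)` evaluates 2^(4+5). In one-way mode the `clientFn` argument would not exist and the server could only use capabilities it hosts itself.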
[Figure: performance comparison]
[Figure: communication flow]
[Figure: sequence diagram]
[Figure: component diagram]
II. Building Cap’n Proto
1. Install the GNU autotools that Cap’n Proto requires
sudo apt-get install libtool
sudo apt-get install autoconf
sudo apt-get install automake
2. Download, build, and install Cap'n Proto
git clone https://github.com/capnproto/capnproto.git
cd capnproto/c++
autoreconf -i
./configure
make -j6 check
sudo make install
III. Cap’n Proto Sample
This example comes from the official source tree, at the following path:
capnproto/c++/samples
1. Writing the server and client
(1) calculator.capnp
@0x85150b117366d14b;

interface Calculator {
  # A "simple" mathematical calculator, callable via RPC.
  #
  # But, to show off Cap'n Proto, we add some twists:
  #
  # - You can use the result from one call as the input to the next
  #   without a network round trip. To accomplish this, evaluate()
  #   returns a `Value` object wrapping the actual numeric value.
  #   This object may be used in a subsequent expression. With
  #   promise pipelining, the Value can actually be used before
  #   the evaluate() call that creates it returns!
  #
  # - You can define new functions, and then call them. This again
  #   shows off pipelining, but it also gives the client the
  #   opportunity to define a function on the client side and have
  #   the server call back to it.
  #
  # - The basic arithmetic operators are exposed as Functions, and
  #   you have to call getOperator() to obtain them from the server.
  #   This again demonstrates pipelining -- using getOperator() to
  #   get each operator and then using them in evaluate() still
  #   only takes one network round trip.

  evaluate @0 (expression :Expression) -> (value :Value);
  # Evaluate the given expression and return the result. The
  # result is returned wrapped in a Value interface so that you
  # may pass it back to the server in a pipelined request. To
  # actually get the numeric value, you must call read() on the
  # Value -- but again, this can be pipelined so that it incurs
  # no additional latency.

  struct Expression {
    # A numeric expression.

    union {
      literal @0 :Float64;
      # A literal numeric value.

      previousResult @1 :Value;
      # A value that was (or, will be) returned by a previous
      # evaluate().

      parameter @2 :UInt32;
      # A parameter to the function (only valid in function bodies;
      # see defFunction).

      call :group {
        # Call a function on a list of parameters.
        function @3 :Function;
        params @4 :List(Expression);
      }
    }
  }

  interface Value {
    # Wraps a numeric value in an RPC object. This allows the value
    # to be used in subsequent evaluate() requests without the client
    # waiting for the evaluate() that returns the Value to finish.

    read @0 () -> (value :Float64);
    # Read back the raw numeric value.
  }

  defFunction @1 (paramCount :Int32, body :Expression)
              -> (func :Function);
  # Define a function that takes `paramCount` parameters and returns the
  # evaluation of `body` after substituting these parameters.

  interface Function {
    # An algebraic function. Can be called directly, or can be used inside
    # an Expression.
    #
    # A client can create a Function that runs on the server side using
    # `defFunction()` or `getOperator()`. Alternatively, a client can
    # implement a Function on the client side and the server will call back
    # to it. However, a function defined on the client side will require a
    # network round trip whenever the server needs to call it, whereas
    # functions defined on the server and then passed back to it are called
    # locally.

    call @0 (params :List(Float64)) -> (value :Float64);
    # Call the function on the given parameters.
  }

  getOperator @2 (op :Operator) -> (func :Function);
  # Get a Function representing an arithmetic operator, which can then be
  # used in Expressions.

  enum Operator {
    add @0;
    subtract @1;
    multiply @2;
    divide @3;
  }
}
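The `Expression` union above is a small expression tree. As a rough, local-only sketch of how such a tree is evaluated (no RPC involved), the following C++17 code mirrors three of the four union members with `std::variant`. All names here (`Expr`, `Parameter`, `Call`, `evaluate`, `makeLiteral`, `makeParameter`, `makeCall`) are hypothetical and are not the code generated by `capnp`; `previousResult` is left out because it only makes sense with a remote `Value`.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <utility>
#include <variant>
#include <vector>

// Local stand-ins for the schema types; hypothetical names, not generated code.
struct Expr;
using Function = std::function<double(const std::vector<double>&)>;

struct Parameter { std::size_t index; };  // like `parameter @2 :UInt32`
struct Call {                              // like the `call` group
  Function function;
  std::vector<std::shared_ptr<Expr>> params;
};
struct Expr {
  std::variant<double, Parameter, Call> node;  // double plays the role of `literal`
};

// Evaluate `expr`. `args` are the arguments of the enclosing function
// (empty at top level, matching the sample's evaluate()).
double evaluate(const Expr& expr, const std::vector<double>& args = {}) {
  if (const double* literal = std::get_if<double>(&expr.node)) {
    return *literal;
  }
  if (const Parameter* p = std::get_if<Parameter>(&expr.node)) {
    assert(p->index < args.size());  // parameter index must be in range
    return args[p->index];
  }
  const Call& callNode = std::get<Call>(expr.node);
  std::vector<double> values;
  for (const auto& operand : callNode.params) {
    values.push_back(evaluate(*operand, args));  // evaluate operands first
  }
  return callNode.function(values);
}

// Small helpers for building trees.
std::shared_ptr<Expr> makeLiteral(double v) {
  return std::make_shared<Expr>(Expr{v});
}
std::shared_ptr<Expr> makeParameter(std::size_t i) {
  return std::make_shared<Expr>(Expr{Parameter{i}});
}
std::shared_ptr<Expr> makeCall(Function f,
                               std::vector<std::shared_ptr<Expr>> params) {
  return std::make_shared<Expr>(Expr{Call{std::move(f), std::move(params)}});
}
```

Building `makeCall(sub, {makeCall(add, {makeLiteral(123), makeLiteral(45)}), makeLiteral(67)})` and evaluating it corresponds to the 123 + 45 - 67 request that the sample client sends. The real server does the same recursion, except that every step returns a `kj::Promise<double>` instead of a plain `double`.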
(2) calculator-server.c++
// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#include "calculator.capnp.h"
#include <kj/debug.h>
#include <capnp/ez-rpc.h>
#include <capnp/message.h>
#include <iostream>

typedef unsigned int uint;

kj::Promise<double> readValue(Calculator::Value::Client value) {
  // Helper function to asynchronously call read() on a Calculator::Value and
  // return a promise for the result. (In the future, the generated code might
  // include something like this automatically.)
  return value.readRequest().send()
      .then([](capnp::Response<Calculator::Value::ReadResults> result) {
    return result.getValue();
  });
}
kj::Promise<double> evaluateImpl(
    Calculator::Expression::Reader expression,
    capnp::List<double>::Reader params = capnp::List<double>::Reader()) {
  // Implementation of CalculatorImpl::evaluate(), also shared by
  // FunctionImpl::call(). In the latter case, `params` are the parameter
  // values passed to the function; in the former case, `params` is just an
  // empty list.
  switch (expression.which()) {
    case Calculator::Expression::LITERAL:
      return expression.getLiteral();

    case Calculator::Expression::PREVIOUS_RESULT:
      return readValue(expression.getPreviousResult());

    case Calculator::Expression::PARAMETER: {
      KJ_REQUIRE(expression.getParameter() < params.size(),
                 "Parameter index out-of-range.");
      return params[expression.getParameter()];
    }

    case Calculator::Expression::CALL: {
      auto call = expression.getCall();
      auto func = call.getFunction();

      // Evaluate each parameter.
      kj::Array<kj::Promise<double>> paramPromises =
          KJ_MAP(param, call.getParams()) {
            return evaluateImpl(param, params);
          };

      // Join the array of promises into a promise for an array.
      kj::Promise<kj::Array<double>> joinedParams =
          kj::joinPromises(kj::mv(paramPromises));

      // When the parameters are complete, call the function.
      return joinedParams.then([KJ_CPCAP(func)](kj::Array<double>&& paramValues) mutable {
        auto request = func.callRequest();
        request.setParams(paramValues);
        return request.send().then(
            [](capnp::Response<Calculator::Function::CallResults>&& result) {
          return result.getValue();
        });
      });
    }

    default:
      // Throw an exception.
      KJ_FAIL_REQUIRE("Unknown expression type.");
  }
}
class ValueImpl final: public Calculator::Value::Server {
  // Simple implementation of the Calculator.Value Cap'n Proto interface.
public:
  ValueImpl(double value): value(value) {}

  kj::Promise<void> read(ReadContext context) {
    context.getResults().setValue(value);
    return kj::READY_NOW;
  }

private:
  double value;
};

class FunctionImpl final: public Calculator::Function::Server {
  // Implementation of the Calculator.Function Cap'n Proto interface, where the
  // function is defined by a Calculator.Expression.
public:
  FunctionImpl(uint paramCount, Calculator::Expression::Reader body)
      : paramCount(paramCount) {
    this->body.setRoot(body);
  }

  kj::Promise<void> call(CallContext context) {
    auto params = context.getParams().getParams();
    KJ_REQUIRE(params.size() == paramCount, "Wrong number of parameters.");
    return evaluateImpl(body.getRoot<Calculator::Expression>(), params)
        .then([KJ_CPCAP(context)](double value) mutable {
      context.getResults().setValue(value);
    });
  }

private:
  uint paramCount;
  // The function's arity.
  capnp::MallocMessageBuilder body;
  // Stores a permanent copy of the function body.
};

class OperatorImpl final: public Calculator::Function::Server {
  // Implementation of the Calculator.Function Cap'n Proto interface, wrapping
  // basic binary arithmetic operators.
public:
  OperatorImpl(Calculator::Operator op): op(op) {}

  kj::Promise<void> call(CallContext context) {
    auto params = context.getParams().getParams();
    KJ_REQUIRE(params.size() == 2, "Wrong number of parameters.");
    double result;
    switch (op) {
      case Calculator::Operator::ADD:     result = params[0] + params[1]; break;
      case Calculator::Operator::SUBTRACT:result = params[0] - params[1]; break;
      case Calculator::Operator::MULTIPLY:result = params[0] * params[1]; break;
      case Calculator::Operator::DIVIDE:  result = params[0] / params[1]; break;
      default:
        KJ_FAIL_REQUIRE("Unknown operator.");
    }
    context.getResults().setValue(result);
    return kj::READY_NOW;
  }

private:
  Calculator::Operator op;
};
class CalculatorImpl final: public Calculator::Server {
  // Implementation of the Calculator Cap'n Proto interface.
public:
  kj::Promise<void> evaluate(EvaluateContext context) override {
    return evaluateImpl(context.getParams().getExpression())
        .then([KJ_CPCAP(context)](double value) mutable {
      context.getResults().setValue(kj::heap<ValueImpl>(value));
    });
  }

  kj::Promise<void> defFunction(DefFunctionContext context) override {
    auto params = context.getParams();
    context.getResults().setFunc(kj::heap<FunctionImpl>(
        params.getParamCount(), params.getBody()));
    return kj::READY_NOW;
  }

  kj::Promise<void> getOperator(GetOperatorContext context) override {
    context.getResults().setFunc(kj::heap<OperatorImpl>(
        context.getParams().getOp()));
    return kj::READY_NOW;
  }
};

int main(int argc, const char* argv[]) {
  if (argc != 2) {
    std::cerr << "usage: " << argv[0] << " ADDRESS[:PORT]\n"
        "Runs the server bound to the given address/port.\n"
        "ADDRESS may be '*' to bind to all local addresses.\n"
        ":PORT may be omitted to choose a port automatically." << std::endl;
    return 1;
  }

  // Set up a server.
  capnp::EzRpcServer server(kj::heap<CalculatorImpl>(), argv[1]);

  // Write the port number to stdout, in case it was chosen automatically.
  auto& waitScope = server.getWaitScope();
  uint port = server.getPort().wait(waitScope);
  if (port == 0) {
    // The address format "unix:/path/to/socket" opens a unix domain socket,
    // in which case the port will be zero.
    std::cout << "Listening on Unix socket..." << std::endl;
  } else {
    std::cout << "Listening on port " << port << "..." << std::endl;
  }

  // Run forever, accepting connections and handling requests.
  kj::NEVER_DONE.wait(waitScope);
}
(3) calculator-client.c++
#include "calculator.capnp.h"
#include <capnp/ez-rpc.h>
#include <kj/debug.h>
#include <math.h>
#include <iostream>

class PowerFunction final: public Calculator::Function::Server {
  // An implementation of the Function interface wrapping pow(). It lives on
  // the client side; a reference to it is passed to the server, which can
  // then make calls back to the client.
public:
  kj::Promise<void> call(CallContext context) {
    auto params = context.getParams().getParams();
    KJ_REQUIRE(params.size() == 2, "Wrong number of parameters.");
    context.getResults().setValue(pow(params[0], params[1]));
    return kj::READY_NOW;
  }
};
int main(int argc, const char* argv[]) {
  if (argc != 2) {
    std::cerr << "usage: " << argv[0] << " HOST:PORT\n"
        "Connects to the Calculator server at the given address and "
        "does some RPCs." << std::endl;
    return 1;
  }

  capnp::EzRpcClient client(argv[1]);
  Calculator::Client calculator = client.getMain<Calculator>();

  // Keep an eye on `waitScope`. Whenever you see it used is a place where we
  // stop and wait for the server to respond. If a line of code does not use
  // `waitScope`, then it does not block!
  auto& waitScope = client.getWaitScope();
  {
    // Make a request that just evaluates the literal value 123.
    //
    // What's interesting here is that evaluate() returns a "Value", which is
    // another interface and therefore points back to an object living on the
    // server. We then have to call read() on that object to read it.
    // However, even though we are making two RPC's, this block executes in
    // *one* network round trip because of promise pipelining: we do not wait
    // for the first call to complete before we send the second call to the
    // server.

    std::cout << "Evaluating a literal... ";
    std::cout.flush();

    // Set up the request.
    auto request = calculator.evaluateRequest();
    request.getExpression().setLiteral(123);

    // Send it, which returns a promise for the result (without blocking).
    auto evalPromise = request.send();

    // Using the promise, create a pipelined request to call read() on the
    // returned object, and then send that.
    auto readPromise = evalPromise.getValue().readRequest().send();

    // Now that we've sent all the requests, wait for the response. Until this
    // point, we haven't waited at all!
    auto response = readPromise.wait(waitScope);
    KJ_ASSERT(response.getValue() == 123);

    std::cout << "PASS" << std::endl;
  }
  {
    // Make a request to evaluate 123 + 45 - 67.
    //
    // The Calculator interface requires that we first call getOperator() to
    // get the addition and subtraction functions, then call evaluate() to use
    // them. But, once again, we can get both functions, call evaluate(), and
    // then read() the result -- four RPCs -- in the time of *one* network
    // round trip, because of promise pipelining.

    std::cout << "Using add and subtract... ";
    std::cout.flush();

    Calculator::Function::Client add = nullptr;
    Calculator::Function::Client subtract = nullptr;

    {
      // Get the "add" function from the server.
      auto request = calculator.getOperatorRequest();
      request.setOp(Calculator::Operator::ADD);
      add = request.send().getFunc();
    }

    {
      // Get the "subtract" function from the server.
      auto request = calculator.getOperatorRequest();
      request.setOp(Calculator::Operator::SUBTRACT);
      subtract = request.send().getFunc();
    }

    // Build the request to evaluate 123 + 45 - 67.
    auto request = calculator.evaluateRequest();

    auto subtractCall = request.getExpression().initCall();
    subtractCall.setFunction(subtract);
    auto subtractParams = subtractCall.initParams(2);
    subtractParams[1].setLiteral(67);

    auto addCall = subtractParams[0].initCall();
    addCall.setFunction(add);
    auto addParams = addCall.initParams(2);
    addParams[0].setLiteral(123);
    addParams[1].setLiteral(45);

    // Send the evaluate() request, read() the result, and wait for read() to
    // finish.
    auto evalPromise = request.send();
    auto readPromise = evalPromise.getValue().readRequest().send();

    auto response = readPromise.wait(waitScope);
    KJ_ASSERT(response.getValue() == 101);

    std::cout << "PASS" << std::endl;
  }
  {
    // Make a request to evaluate 4 * 6, then use the result in two more
    // requests that add 3 and 5.
    //
    // Since evaluate() returns its result wrapped in a `Value`, we can pass
    // that `Value` back to the server in subsequent requests before the first
    // `evaluate()` has actually returned. Thus, this example again does only
    // one network round trip.

    std::cout << "Pipelining eval() calls... ";
    std::cout.flush();

    Calculator::Function::Client add = nullptr;
    Calculator::Function::Client multiply = nullptr;

    {
      // Get the "add" function from the server.
      auto request = calculator.getOperatorRequest();
      request.setOp(Calculator::Operator::ADD);
      add = request.send().getFunc();
    }

    {
      // Get the "multiply" function from the server.
      auto request = calculator.getOperatorRequest();
      request.setOp(Calculator::Operator::MULTIPLY);
      multiply = request.send().getFunc();
    }

    // Build the request to evaluate 4 * 6
    auto request = calculator.evaluateRequest();

    auto multiplyCall = request.getExpression().initCall();
    multiplyCall.setFunction(multiply);
    auto multiplyParams = multiplyCall.initParams(2);
    multiplyParams[0].setLiteral(4);
    multiplyParams[1].setLiteral(6);

    auto multiplyResult = request.send().getValue();

    // Use the result in two calls that add 3 and add 5.
    auto add3Request = calculator.evaluateRequest();
    auto add3Call = add3Request.getExpression().initCall();
    add3Call.setFunction(add);
    auto add3Params = add3Call.initParams(2);
    add3Params[0].setPreviousResult(multiplyResult);
    add3Params[1].setLiteral(3);
    auto add3Promise = add3Request.send().getValue().readRequest().send();

    auto add5Request = calculator.evaluateRequest();
    auto add5Call = add5Request.getExpression().initCall();
    add5Call.setFunction(add);
    auto add5Params = add5Call.initParams(2);
    add5Params[0].setPreviousResult(multiplyResult);
    add5Params[1].setLiteral(5);
    auto add5Promise = add5Request.send().getValue().readRequest().send();

    // Now wait for the results.
    KJ_ASSERT(add3Promise.wait(waitScope).getValue() == 27);
    KJ_ASSERT(add5Promise.wait(waitScope).getValue() == 29);

    std::cout << "PASS" << std::endl;
  }
  {
    // Our calculator interface supports defining functions. Here we use it
    // to define two functions and then make calls to them as follows:
    //
    //   f(x, y) = x * 100 + y
    //   g(x) = f(x, x + 1) * 2;
    //   f(12, 34)
    //   g(21)
    //
    // Once again, the whole thing takes only one network round trip.

    std::cout << "Defining functions... ";
    std::cout.flush();

    Calculator::Function::Client add = nullptr;
    Calculator::Function::Client multiply = nullptr;
    Calculator::Function::Client f = nullptr;
    Calculator::Function::Client g = nullptr;

    {
      // Get the "add" function from the server.
      auto request = calculator.getOperatorRequest();
      request.setOp(Calculator::Operator::ADD);
      add = request.send().getFunc();
    }

    {
      // Get the "multiply" function from the server.
      auto request = calculator.getOperatorRequest();
      request.setOp(Calculator::Operator::MULTIPLY);
      multiply = request.send().getFunc();
    }

    {
      // Define f.
      auto request = calculator.defFunctionRequest();
      request.setParamCount(2);

      {
        // Build the function body.
        auto addCall = request.getBody().initCall();
        addCall.setFunction(add);
        auto addParams = addCall.initParams(2);
        addParams[1].setParameter(1);  // y

        auto multiplyCall = addParams[0].initCall();
        multiplyCall.setFunction(multiply);
        auto multiplyParams = multiplyCall.initParams(2);
        multiplyParams[0].setParameter(0);  // x
        multiplyParams[1].setLiteral(100);
      }

      f = request.send().getFunc();
    }

    {
      // Define g.
      auto request = calculator.defFunctionRequest();
      request.setParamCount(1);

      {
        // Build the function body.
        auto multiplyCall = request.getBody().initCall();
        multiplyCall.setFunction(multiply);
        auto multiplyParams = multiplyCall.initParams(2);
        multiplyParams[1].setLiteral(2);

        auto fCall = multiplyParams[0].initCall();
        fCall.setFunction(f);
        auto fParams = fCall.initParams(2);
        fParams[0].setParameter(0);

        auto addCall = fParams[1].initCall();
        addCall.setFunction(add);
        auto addParams = addCall.initParams(2);
        addParams[0].setParameter(0);
        addParams[1].setLiteral(1);
      }

      g = request.send().getFunc();
    }

    // OK, we've defined all our functions. Now create our eval requests.

    // f(12, 34)
    auto fEvalRequest = calculator.evaluateRequest();
    auto fCall = fEvalRequest.initExpression().initCall();
    fCall.setFunction(f);
    auto fParams = fCall.initParams(2);
    fParams[0].setLiteral(12);
    fParams[1].setLiteral(34);
    auto fEvalPromise = fEvalRequest.send().getValue().readRequest().send();

    // g(21)
    auto gEvalRequest = calculator.evaluateRequest();
    auto gCall = gEvalRequest.initExpression().initCall();
    gCall.setFunction(g);
    gCall.initParams(1)[0].setLiteral(21);
    auto gEvalPromise = gEvalRequest.send().getValue().readRequest().send();

    // Wait for the results.
    KJ_ASSERT(fEvalPromise.wait(waitScope).getValue() == 1234);
    KJ_ASSERT(gEvalPromise.wait(waitScope).getValue() == 4244);

    std::cout << "PASS" << std::endl;
  }
  {
    // Make a request that will call back to a function defined locally.
    //
    // Specifically, we will compute 2^(4 + 5). However, exponent is not
    // defined by the Calculator server. So, we'll implement the Function
    // interface locally and pass it to the server for it to use when
    // evaluating the expression.
    //
    // This example requires two network round trips to complete, because the
    // server calls back to the client once before finishing. In this
    // particular case, this could potentially be optimized by using a tail
    // call on the server side -- see CallContext::tailCall(). However, to
    // keep the example simpler, we haven't implemented this optimization in
    // the sample server.

    std::cout << "Using a callback... ";
    std::cout.flush();

    Calculator::Function::Client add = nullptr;

    {
      // Get the "add" function from the server.
      auto request = calculator.getOperatorRequest();
      request.setOp(Calculator::Operator::ADD);
      add = request.send().getFunc();
    }

    // Build the eval request for 2^(4+5).
    auto request = calculator.evaluateRequest();

    auto powCall = request.getExpression().initCall();
    powCall.setFunction(kj::heap<PowerFunction>());
    auto powParams = powCall.initParams(2);
    powParams[0].setLiteral(2);

    auto addCall = powParams[1].initCall();
    addCall.setFunction(add);
    auto addParams = addCall.initParams(2);
    addParams[0].setLiteral(4);
    addParams[1].setLiteral(5);

    // Send the request and wait.
    auto response = request.send().getValue().readRequest()
                           .send().wait(waitScope);
    KJ_ASSERT(response.getValue() == 512);

    std::cout << "PASS" << std::endl;
  }

  return 0;
}
2. Building and running the server and client (CMake)
CMakeLists.txt
cmake_minimum_required(VERSION 3.1)
project("Cap'n Proto Samples" CXX)
find_package(CapnProto CONFIG REQUIRED)
if(TARGET CapnProto::capnp-rpc)
  capnp_generate_cpp(calculatorSources calculatorHeaders calculator.capnp)
  add_executable(calculator-client calculator-client.c++ ${calculatorSources})
  add_executable(calculator-server calculator-server.c++ ${calculatorSources})
  target_link_libraries(calculator-client PRIVATE CapnProto::capnp-rpc)
  target_link_libraries(calculator-server PRIVATE CapnProto::capnp-rpc)
  target_include_directories(calculator-client PRIVATE ${CMAKE_CURRENT_BINARY_DIR})
  target_include_directories(calculator-server PRIVATE ${CMAKE_CURRENT_BINARY_DIR})
endif()
3. Test run
test.sh
capnpc -oc++ calculator.capnp
c++ -std=c++14 -Wall calculator-client.c++ calculator.capnp.c++ \
    $(pkg-config --cflags --libs capnp-rpc) -o calculator-client
c++ -std=c++14 -Wall calculator-server.c++ calculator.capnp.c++ \
    $(pkg-config --cflags --libs capnp-rpc) -o calculator-server
rm -f /tmp/capnp-calculator-example-$$
./calculator-server unix:/tmp/capnp-calculator-example-$$ &
sleep 0.1
./calculator-client unix:/tmp/capnp-calculator-example-$$
kill %+
wait %+ || true
rm calculator-client calculator-server calculator.capnp.c++ calculator.capnp.h /tmp/capnp-calculator-example-$$
IV. Cap’n Proto Multi-stream Flow Control
Official site: Cap’n Proto 0.8 Multi-stream Flow Control
1. Writing the server and client
(1) MyInterface.capnp
@0x9b7efa693ff3d429;

interface MyInterface {
  streamingCall @0 (callback :Callback) -> (chunk :Data);

  struct Callback {
    literal @0 :Float64;
  }

  interface Data {
    read @0 () -> (data :Float64);
  }
}
(2) server.c++
#include "MyInterface.capnp.h"
#include <kj/debug.h>
#include <capnp/ez-rpc.h>
#include <capnp/message.h>
#include <iostream>

typedef unsigned int uint;

kj::Promise<double> readData(MyInterface::Data::Client data)
{
  // Helper function to asynchronously call read() on a MyInterface::Data and
  // return a promise for the result. It is not called by this sample server.
  return data.readRequest().send().then([](capnp::Response<MyInterface::Data::ReadResults> result)
                                        { return result.getData(); });
}

class DataImpl final : public MyInterface::Data::Server
{
  // Simple implementation of the MyInterface.Data Cap'n Proto interface.
public:
  DataImpl(double data) : data(data) {}

  kj::Promise<void> read(ReadContext context)
  {
    context.getResults().setData(data);
    return kj::READY_NOW;
  }

private:
  double data;
};

kj::Promise<double> streamingCallImpl(
    MyInterface::Callback::Reader data,
    capnp::List<double>::Reader params = capnp::List<double>::Reader())
{
  // Implementation of MyInterfaceImpl::streamingCall(): returns the literal
  // carried by the Callback struct. `params` is unused in this sample.
  return data.getLiteral();
}

class MyInterfaceImpl final : public MyInterface::Server
{
  // Implementation of the MyInterface Cap'n Proto interface.
public:
  kj::Promise<void> streamingCall(StreamingCallContext context) override
  {
    return streamingCallImpl(context.getParams().getCallback())
        .then([KJ_CPCAP(context)](double value) mutable
              { context.getResults().setChunk(kj::heap<DataImpl>(value)); });
  }
};

int main(int argc, const char *argv[])
{
  // argv[1] is read below, so refuse to start without it.
  if (argc != 2)
  {
    std::cerr << "usage: " << argv[0] << " ADDRESS[:PORT]" << std::endl;
    return 1;
  }

  // Set up a server.
  capnp::EzRpcServer server(kj::heap<MyInterfaceImpl>(), argv[1]);

  // Write the port number to stdout, in case it was chosen automatically.
  auto &waitScope = server.getWaitScope();
  uint port = server.getPort().wait(waitScope);
  if (port == 0)
  {
    // The address format "unix:/path/to/socket" opens a unix domain socket,
    // in which case the port will be zero.
    std::cout << "Listening on Unix socket..." << std::endl;
  }
  else
  {
    std::cout << "Listening on port " << port << "..." << std::endl;
  }

  // Run forever, accepting connections and handling requests.
  kj::NEVER_DONE.wait(waitScope);
}
(3) client.c++
#include "MyInterface.capnp.h"
#include <capnp/ez-rpc.h>
#include <kj/debug.h>
#include <iostream>
#include <unistd.h>

class MyInterfaceImpl final : public MyInterface::Server
{
  // A client-side implementation of MyInterface. main() below never hands it
  // to the server; it only shows what a client-hosted capability looks like.
public:
  kj::Promise<void> streamingCall(StreamingCallContext context)
  {
    std::cerr << "client sendChunk" << std::endl;
    return kj::READY_NOW;
  }
};

int main(int argc, const char *argv[])
{
  if (argc != 2)
  {
    std::cerr << "usage: " << argv[0] << " HOST:PORT\n"
                 "Connects to the MyInterface server at the given address and "
                 "does some RPCs."
              << std::endl;
    return 1;
  }

  capnp::EzRpcClient client(argv[1]);
  MyInterface::Client myinterface = client.getMain<MyInterface>();

  // Keep an eye on `waitScope`. Whenever you see it used is a place where we
  // stop and wait for the server to respond. If a line of code does not use
  // `waitScope`, then it does not block!
  auto &waitScope = client.getWaitScope();

  {
    // Make a request that sends the literal value 123 and reads it back.
    std::cout << "Evaluating a literal... ";
    std::cout.flush();

    // Set up the request.
    auto request = myinterface.streamingCallRequest();
    request.getCallback().setLiteral(123);

    // Send it, which returns a promise for the result (without blocking).
    auto evalPromise = request.send();

    // Using the promise, create a pipelined request to call read() on the
    // returned object, and then send that.
    auto readPromise = evalPromise.getChunk().readRequest().send();

    // Now that we've sent all the requests, wait for the response. Until this
    // point, we haven't waited at all!
    auto response = readPromise.wait(waitScope);
    KJ_ASSERT(response.getData() == 123);

    std::cout << "PASS" << std::endl;
  }

  return 0;
}
2. Building and running the server and client (CMake)
(4) CMakeLists.txt
cmake_minimum_required(VERSION 3.1)
project("Multi-stream Flow Control Samples" CXX)
find_package(CapnProto CONFIG REQUIRED)
if(TARGET CapnProto::capnp-rpc)
  capnp_generate_cpp(MyInterfaceSources MyInterfaceHeaders MyInterface.capnp)
  add_executable(client client.c++ ${MyInterfaceSources})
  add_executable(server server.c++ ${MyInterfaceSources})
  target_link_libraries(client PRIVATE CapnProto::capnp-rpc)
  target_link_libraries(server PRIVATE CapnProto::capnp-rpc)
  target_include_directories(client PRIVATE ${CMAKE_CURRENT_BINARY_DIR})
  target_include_directories(server PRIVATE ${CMAKE_CURRENT_BINARY_DIR})
endif()
3. Test run
(5) test.sh
capnpc -oc++ MyInterface.capnp
c++ -std=c++14 -Wall client.c++ MyInterface.capnp.c++ \
    $(pkg-config --cflags --libs capnp-rpc) -o client
c++ -std=c++14 -Wall server.c++ MyInterface.capnp.c++ \
    $(pkg-config --cflags --libs capnp-rpc) -o server
rm -f /tmp/capnp-MyInterface-example-$$
./server unix:/tmp/capnp-MyInterface-example-$$ &
sleep 0.1
./client unix:/tmp/capnp-MyInterface-example-$$
kill %+
wait %+ || true
rm client server MyInterface.capnp.c++ MyInterface.capnp.h /tmp/capnp-MyInterface-example-$$