基于libhv开源库实现的TCP Client & Server sample, TCP Client支持同步和异步数据传输, 同步使用C++11的future特性实现.
libhv介绍
Like libevent, libev, and libuv, libhv provides event-loop with non-blocking IO and timer, but simpler api and richer protocols.
✨ Features
- Cross-platform (Linux, Windows, macOS, Android, iOS, BSD, Solaris)
- High-performance EventLoop (IO, timer, idle, custom, signal)
- TCP/UDP client/server/proxy
- TCP supports heartbeat, reconnect, upstream, MultiThread-safe write and close, etc.
- Built-in common unpacking modes (FixedLength, Delimiter, LengthField)
- RUDP support: WITH_KCP
- SSL/TLS support: (via WITH_OPENSSL or WITH_GNUTLS or WITH_MBEDTLS)
- HTTP client/server (support https http1/x http2 grpc)
- HTTP supports static service, indexof service, forward/reverse proxy service, sync/async API handler
- HTTP supports RESTful, router, middleware, keep-alive, chunked, SSE, etc.
- WebSocket client/server
- MQTT client
异步数据传输实现机制
RpcMessage HTcpClient::SendSyncMessage(RpcMessage& rpc_msg,
const int64_t timeout) {
if (connect_state_ != kConnected) {
LOG(ERROR) << "HTcpClient::SendMessage() - not connected!";
return RpcMessage();
}
std::shared_ptr<MessageFuture> message_future =
std::make_shared<MessageFuture>();
message_future->timeout_ = timeout;
message_future->request_ = rpc_msg;
rpc_msg.request_id_ = counter_.GetAndIncrement();
{
std::lock_guard<std::mutex> lock(mutex_);
map_.insert(std::make_pair(rpc_msg.request_id_, message_future));
}
SendMessage(rpc_msg);
try {
return message_future->Get(timeout);
} catch (TimeoutException& e) {
LOG(ERROR) << "HTcpClient::SendSyncMessage() - timeout exception, timeout: "
<< timeout;
// remove the request from the map
std::lock_guard<std::mutex> lock(mutex_);
size_t erased_count = map_.erase(rpc_msg.request_id_);
LOG(INFO) << "HTcpClient::SendSyncMessage() - erased_count: "
<< erased_count;
return RpcMessage();
}
}
HTcpClient 类的成员函数 SendSyncMessage,用于同步发送 RPC 消息并等待响应。
首先,函数检查客户端的连接状态 connect_state_ 是否为已连接状态。如果未连接,则记录错误日志并返回一个空的 RpcMessage 对象。
然后,创建一个 MessageFuture 对象,并设置其超时时间和请求消息。接着,使用 counter_.GetAndIncrement() 为请求消息生成一个唯一的请求 ID。
在一个受互斥锁 mutex_ 保护的代码块中,将请求 ID 和 MessageFuture 对象插入到映射 map_ 中。
然后,调用 SendMessage 方法发送请求消息。
在尝试获取响应时,调用 message_future->Get(timeout) 方法。如果在指定的超时时间内未收到响应,则捕获 TimeoutException 并记录错误日志。然后,再次锁住互斥量并从映射中移除对应的请求 ID,记录被移除的条目数量。
最终,函数返回一个空的 RpcMessage 对象,表示请求失败或超时。
MessageFuture Get实现
RpcMessage MessageFuture::Get(int64_t timeout) {
std::future<RpcMessage> future = promise_.get_future();
std::future_status status =
future.wait_for(std::chrono::milliseconds(timeout));
if (status == std::future_status::ready) {
return future.get();
} else if (status == std::future_status::timeout) {
// throw timeout exception
throw TimeoutException("timeout exception");
}
}
MessageFuture 类的成员函数 Get,用于在指定的超时时间内获取 RpcMessage 对象。
首先,函数通过 promise_.get_future() 获取一个 std::future 对象 future。std::promise 和 std::future 是 C++ 标准库中的同步机制,用于在线程间传递结果。promise_ 是一个 std::promise 对象,它的 get_future 方法返回一个与之关联的 std::future 对象。
接下来,函数调用 future.wait_for(std::chrono::milliseconds(timeout)),等待指定的超时时间(以毫秒为单位)。wait_for 方法返回一个 std::future_status 枚举值,表示 future 的状态。可能的状态包括 std::future_status::ready(表示结果已准备好)和 std::future_status::timeout(表示等待超时)。
如果 status 等于 std::future_status::ready,则调用 future.get() 获取 RpcMessage 对象并返回。get 方法会阻塞当前线程,直到结果可用,并返回存储在 future 中的值。
如果 status 等于 std::future_status::timeout,则抛出一个 TimeoutException 异常,表示等待超时。异常消息为 “timeout exception”。
通过这种方式,Get 函数能够在指定的超时时间内等待并获取 RpcMessage 对象,如果超时则抛出异常。
使用事项
static void onConnection(const TcpClient::TSocketChannelPtr& channel) {
if (channel->isConnected()) {
LOG(INFO) << "connected";
RpcMessage msg;
msg.command_ = 100;
msg.request_id_ = counter.GetAndIncrement();
msg.payload_ = "hello";
msg.length_ = msg.payload_.size();
// client.SendMessage(msg);
std::thread([channel, &msg] {
LOG(INFO) << "SendSyncMessage Start";
RpcMessage ret = client.SendSyncMessage(msg, 10000);
LOG(INFO) << "SendSyncMessage END" << " request_id: " << ret.request_id_
<< " payload: " << ret.payload_;
}).detach();
} else {
LOG(INFO) << "disconnected";
}
}
同步传输
为什么放在另外的线程中处理?
TCPClient自身会有EventLoopThread, 是在此线程中运行, 而SendAsyncMessage会阻塞此线程, 因此具体数据的处理都放在其它线程中处理; 类似于Linux驱动开发中的中断处理, 分为上半部和下半部.
Reference
基于libhv实现的TCP client和TCP server,支持同步和异步, 帮我点点Star