brynet源码阅读——net组件

发布于:2024-12-07 ⋅ 阅读:(138) ⋅ 点赞:(0)

1、EventLoop.hpp

我理解这是一个通信socket监听和任务调度类,下面先介绍自己对成员变量的理解,直接见注释。

// epoll_wait监听返回的数组参数
std::vector<epoll_event> mEventEntries;
// 红黑树句柄
int mEpollFd;
// epoll_wait唤醒工具,一般都会设计这么一个工具用来从eprll_wait唤醒,这个工具仅仅是唤醒的作用,发送的数据并无实际作用
std::unique_ptr<detail::WakeupChannel> mWakeupChannel;
// 这两个参数的作用是标记loop线程的状态,看它们处于什么状态,是不是可以被wakeup
// 个人认为这里的意义不是特别大,待验证
std::atomic_bool mIsInBlock;
std::atomic_bool mIsAlreadyPostWakeup;
// 异步函数相关
std::mutex mAsyncFunctorsMutex;
std::vector<UserFunctor> mAsyncFunctors;
std::vector<UserFunctor> mCopyAsyncFunctors;
// 每次跳出epoll_wait可能会执行的函数
std::vector<UserFunctor> mAfterLoopFunctors;
std::vector<UserFunctor> mCopyAfterLoopFunctors;
// 初始化相关
std::once_flag mOnceInitThreadID;
std::atomic_bool mSelfThreadIDIsInitialized;
current_thread::THREAD_ID_TYPE mSelfThreadID;
// 定时器,主要作用是添加定时任务以及从epoll_wait那准时跳出
brynet::base::TimerMgr::Ptr mTimer;
// socket和会话的映射,会话中包括很多callback函数,有通用的有定制的
std::unordered_map<BrynetSocketFD, TcpConnectionPtr> mTcpConnections;

下面是这个类中最重要的函数,个人理解以注释的形式加在注释中,代码如下:

 void loop(int64_t milliseconds)
 {
     tryInitThreadID();
     // 这个函数只有在loop线程中执行
     exceptInLoopThread();
     // 有待执行的函数时,不再epoll_wait那阻塞
     if (!mAfterLoopFunctors.empty())
     {
         milliseconds = 0;
     }
     // 等待通信套接字触发或者被唤醒
     int numComplete = epoll_wait(mEpollFd, mEventEntries.data(), mEventEntries.size(), milliseconds);

     mIsInBlock = false;

     for (int i = 0; i < numComplete; ++i)
     {
         auto channel = (Channel*) (mEventEntries[i].data.ptr);
         auto event_data = mEventEntries[i].events;
         // 这里执行各个通信套接字的函数,库中TcpConnection中的函数canRecv、onClose、canSend在这里执行
         if (event_data & EPOLLRDHUP)
         {
             channel->canRecv(true);
             channel->onClose();
             continue;
         }

         if (event_data & EPOLLIN)
         {
             channel->canRecv(false);
         }

         if (event_data & EPOLLOUT)
         {
             channel->canSend();
         }
     }
     // 个人认为这里的作用不大,我理解作者的本意是处理完通信套接字的callback后,才可以继续被唤醒,但是只有当流程继续执行到epoll_wait后,才会继续执行后面的代码,不会造成从callback函数没有执行完就从新被唤醒的情况
     mIsAlreadyPostWakeup = false;
     mIsInBlock = true;
     // 执行异步函数
     processAsyncFunctors();
     processAfterLoopFunctors();

     if (static_cast<size_t>(numComplete) == mEventEntries.size())
     {
         reAllocEventSize(mEventEntries.size() + 128);
     }
     // 执行定时器事件
     mTimer->schedule();
 }

2、TcpConnection.hpp

该文件中定义了会话类,一般的会话类都会既提供服务端会用到的功能函数,也会提供客户端用到的功能函数。下面先记录这个类中用到的几种callback函数。

// 建立连接成功执行的callback
using EnterCallback = std::function<void(Ptr)>;
// 接收到数据后执行的callback函数
using DataCallback = std::function<void(brynet::base::BasePacketReader&)>;
// 断开连接执行的callback函数
using DisconnectedCallback = std::function<void(Ptr)>;
// 数据发送后执行的callback函数
using PacketSendedCallback = std::function<void(void)>;
// 负载过大时执行的callback函数
using HighWaterCallback = std::function<void(void)>;

下面介绍各个成员变量的含义:

// 对端IP
const std::string mIP;
// 通信socket
const TcpSocket::Ptr mSocket;
// socket所被监听的EventLoop
const EventLoop::Ptr mEventLoop;
// 是否可写
bool mCanWrite;
// 会话是不是被关闭
bool mAlreadyClose;
// 接收数据相关
std::unique_ptr<struct brynet::base::buffer_s, BufferDeleter> mRecvBuffer;
double mCurrentTanhXDiff = 0;
size_t mRecvBuffOriginSize = 0;
const size_t mMaxRecvBufferSize;

// 发送数据相关
struct PendingPacket
{
    SendableMsg::Ptr data;
    size_t left;
    PacketSendedCallback mCompleteCallback;
};

std::vector<PacketSendedCallback> pedingCallbacks;
using PacketListType = std::deque<PendingPacket>;
PacketListType mSendList;
size_t mSendingMsgSize;
// callback
EnterCallback mEnterCallback;
DataCallback mDataCallback;
DisconnectedCallback mDisConnectCallback;
HighWaterCallback mHighWaterCallback;
size_t mHighWaterSize;

bool mIsPostFlush;

bool mRecvData;
// 心跳相关
std::chrono::nanoseconds mCheckTime{};
brynet::base::Timer::WeakPtr mTimer;

比较关键的函数介绍:

// 这个函数会将socket放到红黑树中,并且将数据结构拼装好
 bool onEnterEventLoop()
 {
     assert(mEventLoop->isInLoopThread());
     if (!mEventLoop->isInLoopThread())
     {
         throw std::runtime_error("not in io thread call onEnterEventLoop");
     }
     // linkChannel是放到红黑树
     if (!brynet::net::base::SocketNonblock(mSocket->getFD()) ||
         !mEventLoop->linkChannel(mSocket->getFD(), this))
     {
         return false;
     }

     const auto findRet = mEventLoop->getTcpConnection(mSocket->getFD());
     (void) findRet;
     assert(findRet == nullptr);

     if (!checkRead())
     {
         return false;
     }
     // 数据结构拼装好
     mEventLoop->addTcpConnection(mSocket->getFD(), shared_from_this());
     // 调用建立连接成功的函数
     causeEnterCallback();

     return true;
 }
// 发送数据
void sendInLoop(const SendableMsg::Ptr& msg,
                PacketSendedCallback&& callback = nullptr)
{
    if (mAlreadyClose)
    {
        return;
    }

    const auto len = msg->size();
    mSendingMsgSize += len;
    mSendList.emplace_back(PendingPacket{
            msg,
            len,
            std::move(callback)});
    // 上面是将要发送的数据放到对应的数据结构
    // 下面是真正的发送的动作
    runAfterFlush();
    // 判断是不是要执行负载相关的函数
    if (mSendingMsgSize > mHighWaterSize &&
        mHighWaterCallback != nullptr)
    {
        mHighWaterCallback();
    }
}
// 处理接收到的数据,这里的callback是自定义的,有些会在callback中定义返回的数据并返回
void processRecvMessage()
{
    if (mDataCallback != nullptr && buffer_getreadvalidcount(mRecvBuffer.get()) > 0)
    {
        auto reader = brynet::base::BasePacketReader(buffer_getreadptr(mRecvBuffer.get()),
                                                     buffer_getreadvalidcount(mRecvBuffer.get()), false);
        mDataCallback(reader);
        const auto consumedLen = reader.savedPos();
        assert(consumedLen <= reader.size());
        if (consumedLen <= reader.size())
        {
            buffer_addreadpos(mRecvBuffer.get(), consumedLen);
        }
    }
}

3、ConnectionOption.hpp

这个类中主要定义了一些会话相关的参数,比如一些callback、是不是使用ssl、以及接收数据的缓冲区等

4、TCPServiceDetail.hpp

这个文件中的函数HelperAddTcpConnection的主要作用是将socket和它关联的ConnectionOption设置建立会话对象,并将它放在EventLoop中去监听。下面对类中的主要函数进行介绍,理解直接加入到注释中。

// 开启worker_thread,这里可以指定线程的个数
std::vector<brynet::net::EventLoop::Ptr> startWorkerThread(size_t threadNum,
                                                               FrameCallback callback = nullptr)
{
   if (threadNum == 0)
   {
       throw std::runtime_error("thread num is zero");
   }

   std::vector<brynet::net::EventLoop::Ptr> eventLoops;

   std::lock_guard<std::mutex> lck(mServiceGuard);
   std::lock_guard<std::mutex> lock(mIOLoopGuard);

   if (!mIOLoopDatas.empty())
   {
       throw std::runtime_error("worker thread already started");
   }

   mRunIOLoop = std::make_shared<bool>(true);
   // 这里只是初始化数据结构,真正的赋值是在for循环中
   mIOLoopDatas.resize(threadNum);
   // 这里是自己定义的信号量,作用是保住所有的线程都跑起来后再退出函数
   auto wg = brynet::base::WaitGroup::Create();
   for (auto& v : mIOLoopDatas)
   {
       auto eventLoop = std::make_shared<EventLoop>();
       eventLoops.push_back(eventLoop);

       auto runIoLoop = mRunIOLoop;
       wg->add(1);
       v = IOLoopData::Create(eventLoop,
                              std::make_shared<std::thread>(
                                      [wg, callback, runIoLoop, eventLoop]() {
                                          eventLoop->bindCurrentThread();
                                          wg->done();
                                          // 这里的退出由整个服务负责,这里面的线程都数据这个服务
                                          while (*runIoLoop)
                                          {
                                              // 启动线程的等待监听函数
                                              eventLoop->loopCompareNearTimer(sDefaultLoopTimeOutMS);
                                              // 每loop一次,执行哪些一遍callback
                                              if (callback != nullptr)
                                              {
                                                  callback(eventLoop);
                                              }
                                          }
                                      }));
   }
   // 保住所有的线程都跑起来后再退出函数
   wg->wait();

   return eventLoops;
}

void stopWorkerThread()
{
   std::lock_guard<std::mutex> lck(mServiceGuard);
   std::lock_guard<std::mutex> lock(mIOLoopGuard);

   *mRunIOLoop = false;

   for (const auto& v : mIOLoopDatas)
   {
       // 这里唤醒的目的是执行完每个线程中哪些异步函数、定时器事件等,epoll_wait那
       v->getEventLoop()->wakeup();
       try
       {
           if (v->getIOThread()->joinable())
           {
               v->getIOThread()->join();
           }
       }
       catch (std::system_error& e)
       {
           (void) e;
       }
   }
   mIOLoopDatas.clear();
}
// 添加会话函数,但是可以通过参数指定添加到当前线程中
void addTcpConnection(TcpSocket::Ptr socket, ConnectionOption option)
{
   EventLoop::Ptr eventLoop;
   if (option.forceSameThreadLoop)
   {
       eventLoop = getSameThreadEventLoop();
   }
   else
   {
       eventLoop = getRandomEventLoop();
   }
   return HelperAddTcpConnection(eventLoop, std::move(socket), std::move(option));
}

5、ConnectorWorkInfo.hpp和ConnectorDetail.hpp

ConnectorWorkInfo.hpp文件中定义了ConnectOption、ConnectorWorkInfo类和RunOnceCheckConnect函数。ConnectorDetail.hpp文件中定义了AsyncConnectorDetail类,下面分别进行介绍

5.1 ConnectOption

这个类中主要是一些参数,这些参数是用做客户端建立会话的一些参数,主要包括服务端ip、端口、请求超时时间、各种callback

5.2 ConnectorWorkInfo

这个类主要的目的是处理客户端向服务器发送连接请求流程,可以设定连接成功处理哪些函数,失败处理哪些函数,其中mConnectionInfos主要用于存储在连接过程中的连接,checkConnectStatus函数可以定期的检查其中的连接是否连接成功,并且可以实现清楚的目的。关键函数理解如下:

void processConnect(const AsyncConnectAddr& addr)
{
    // 个人理解,放在第一if后面好一些
    struct sockaddr_in server_addr = sockaddr_in(); 
    BrynetSocketFD clientfd = BRYNET_INVALID_SOCKET;
    
    const int ExpectedError = EINPROGRESS;
    int n = 0;

    brynet::net::base::InitSocket();

    clientfd = brynet::net::base::SocketCreate(AF_INET, SOCK_STREAM, 0);
    if (clientfd == BRYNET_INVALID_SOCKET)
    {
        goto FAILED;
    }
    // 初始化相关资源
    brynet::net::base::SocketNonblock(clientfd);
    server_addr.sin_family = AF_INET;
    inet_pton(AF_INET, addr.getIP().c_str(), &server_addr.sin_addr.s_addr);
    server_addr.sin_port = static_cast<decltype(server_addr.sin_port)>(htons(addr.getPort()));
    // 连接
    n = connect(clientfd, (struct sockaddr*) &server_addr, sizeof(struct sockaddr));
    if (n == 0)
    {
        // 大部分走这里,连接成功,然后判断是不是自连接
        if (brynet::net::base::IsSelfConnect(clientfd))
        {
            goto FAILED;
        }
    }
    else if (BRYNET_ERRNO != ExpectedError)
    {
        goto FAILED;
    }
    else
    {
        // 连接没有成功,处于正在连接的状态,把套接字放到mConnectingInfos,等待连接成功
        ConnectingInfo ci;
        ci.startConnectTime = std::chrono::steady_clock::now();
        ci.successCB = addr.getSuccessCB();
        ci.failedCB = addr.getFailedCB();
        ci.timeout = addr.getTimeout();
        ci.processCallbacks = addr.getProcessCallbacks();

        mConnectingInfos[clientfd] = ci;
        poller_add(mPoller.get(), clientfd, brynet::base::WriteCheck);

        return;
    }
    // 连接成功走到这,处理callback
    if (addr.getSuccessCB() != nullptr)
    {
        auto tcpSocket = TcpSocket::Create(clientfd, false);
        for (const auto& process : addr.getProcessCallbacks())
        {
            process(*tcpSocket);
        }
        addr.getSuccessCB()(std::move(tcpSocket));
    }
    return;

FAILED:
    if (clientfd != BRYNET_INVALID_SOCKET)
    {
        brynet::net::base::SocketClose(clientfd);
        clientfd = BRYNET_INVALID_SOCKET;
        (void) clientfd;
    }
    if (addr.getFailedCB() != nullptr)
    {
        addr.getFailedCB()();
    }
}

5.3 RunOnceCheckConnect

static void RunOnceCheckConnect(
        const std::shared_ptr<brynet::net::EventLoop>& eventLoop,
        const std::shared_ptr<ConnectorWorkInfo>& workerInfo)
{
    // 任务处理
    eventLoop->loop(std::chrono::milliseconds(10).count());
    // 处理连接问题
    workerInfo->checkConnectStatus(0);
    // 处理正在连接的连接超时问题
    workerInfo->checkTimeout();
}

5.4 AsyncConnectorDetail

该类主要作用是可以添加客户端的连接,我的理解也可以用做一个客户端的封装类,主要函数注释如下:

// 启动客户端线程
 void startWorkerThread()
 {
      std::lock_guard<std::mutex> lck(mThreadGuard);

      if (mThread != nullptr)
      {
          throw std::runtime_error("connect thread already started");
      }

      mIsRun = std::make_shared<bool>(true);
      mWorkInfo = std::make_shared<detail::ConnectorWorkInfo>();
      mEventLoop = std::make_shared<EventLoop>();

      auto eventLoop = mEventLoop;
      auto workerInfo = mWorkInfo;
      auto isRun = mIsRun;

      auto wg = brynet::base::WaitGroup::Create();
      wg->add(1);
      mThread = std::make_shared<std::thread>([wg, eventLoop, workerInfo, isRun]() {
          eventLoop->bindCurrentThread();
          wg->done();

          while (*isRun)
          {
              // 当客户端线程没有停的时候一直执行,loop动作、建立连接相关的功能函数
              detail::RunOnceCheckConnect(eventLoop, workerInfo);
          }

          workerInfo->causeAllFailed();
      });
      // 这里作用和前面介绍的一样,保证线程跑起来后退出啊函数
      wg->wait();
  }

6、ListenThreadDeatil.hpp

该文件中封装了一个监听线程,关键代码如下:

class ListenThreadDetail : public brynet::base::NonCopyable
{
protected:
    using AccepCallback = std::function<void(TcpSocket::Ptr)>;
    using TcpSocketProcessCallback = std::function<void(TcpSocket&)>;
 void startListen()
    {
        std::lock_guard<std::mutex> lck(mListenThreadGuard);

        if (mListenThread != nullptr)
        {
            throw std::runtime_error("listen thread already started");
        }

        const auto fd = brynet::net::base::Listen(mIsIPV6, mIP.c_str(), mPort, 512, mEnabledReusePort);
        if (fd == BRYNET_INVALID_SOCKET)
        {
            throw BrynetCommonException(
                    std::string("listen error of:") + std::to_string(BRYNET_ERRNO));
        }

        mRunListen = std::make_shared<bool>(true);

        auto listenSocket = std::shared_ptr<ListenSocket>(ListenSocket::Create(fd));
        auto isRunListen = mRunListen;
        auto callback = mCallback;
        auto processCallbacks = mProcessCallbacks;
        mListenThread = std::make_shared<std::thread>(
                [isRunListen, listenSocket, callback, processCallbacks]() mutable {
                    while (*isRunListen)
                    { 
                        // 这里是阻塞的,直到有请求或者返回的是nullptr
                        auto clientSocket = runOnceListen(listenSocket);
                        if (clientSocket == nullptr)
                        {
                            continue;
                        }

                        if (*isRunListen)
                        {
                            for (const auto& process : processCallbacks)
                            {
                                process(*clientSocket);
                            }
                            // 这里每次建立连接成功,都把套接字方法监听树中,这样就可以监听客户端发来的数据了
                            callback(std::move(clientSocket));
                        }
                    }
                });
    }
private:
    const bool mIsIPV6;
    // 监听的ip和端口
    const std::string mIP;
    const int mPort;
    // 接收到连接请求执行的,我理解主要是将用于通信的socket添加到红黑树中
    const AccepCallback mCallback;
    // 连接成功执行的callback
    const std::vector<TcpSocketProcessCallback> mProcessCallbacks;
    const bool mEnabledReusePort;

    std::shared_ptr<bool> mRunListen;
    std::shared_ptr<std::thread> mListenThread;
    std::mutex mListenThreadGuard;
};

网站公告

今日签到

点亮在社区的每一天
去签到