Clickhouse-keeper源码分析:TCP连接处理流程

发布于:2025-06-29 ⋅ 阅读:(17) ⋅ 点赞:(0)

Keeper网络处理模型

TCPServer的创建和初始化

在keeper启动时,会根据配置文件中的listen_host标签个数创建对应的TCPServer以及HTTPServer等:

std::vector<std::string> listen_hosts = DB::getMultipleValuesFromConfig(config(), "", "listen_host");

1.初始化/创建TCPServer以及HTTPServer(这里省略别的server的创建逻辑):

    for (const auto & listen_host : listen_hosts)

    {

        /// TCP Keeper

        const char * port_name = "keeper_server.tcp_port";

        createServer(listen_host, port_name, listen_try, [&](UInt16 port)

        {

            Poco::Net::ServerSocket socket;

            auto address = socketBindListen(socket, listen_host, port);

            socket.setReceiveTimeout(Poco::Timespan{tcp_receive_timeout, 0});

            socket.setSendTimeout(Poco::Timespan{tcp_send_timeout, 0});

            servers->emplace_back(

                listen_host,

                port_name,

                "Keeper (tcp): " + address.toString(),

                std::make_unique<TCPServer>(

                    new KeeperTCPHandlerFactory(

                        config_getter, global_context->getKeeperDispatcher(),

                        tcp_receive_timeout, tcp_send_timeout, false), server_pool, socket));

        });

        ......

    }

初始TCP Keeper server的时候有这么几个重要的类:

(1)TCPServer:主要作用是监听网络连接(内部会使用epoll / poll / select)

(2)KeeperTCPHandlerFactory:用来处理连接

(3)socket:服务端监听的端口号

2.启动阶段:初始化线程池,之后这个线程池也是TCPServer中的线程池

Poco::ThreadPool server_pool(3, config().getUInt("max_connections", 1024));

之后挨个启动创建好的网络服务器:

    for (auto & server : *servers)

    {

        server.start();

        LOG_INFO(log, "Listening for {}", server.getDescription());

    }

这里主要说明下TCPServer的启动流程。

其中每个server的类型为:ProtocolServerAdapter,它就是一个适配器,对于不同的server会有不同的实现,TCPServer的实现为以下类:

可以看到最终调的还是TCPServer的start函数。

Clickhouse中的TCPServer继承Poco库中的TCPServer:

class TCPServer : public Poco::Net::TCPServer

{

public:

    explicit TCPServer(

        TCPServerConnectionFactory::Ptr factory,

        Poco::ThreadPool & thread_pool,

        Poco::Net::ServerSocket & socket,

        Poco::Net::TCPServerParams::Ptr params = new Poco::Net::TCPServerParams,

        const TCPServerConnectionFilter::Ptr & filter = nullptr);

    /// Close the socket and ask existing connections to stop serving queries

    void stop()

    {

        ......

    }

    bool isOpen() const { return is_open; }

    UInt16 portNumber() const { return port_number; }

    const Poco::Net::ServerSocket& getSocket() { return socket; }

private:

    TCPServerConnectionFactory::Ptr factory;

    Poco::Net::ServerSocket socket;

    std::atomic<bool> is_open;

    UInt16 port_number;

};

所以最终走的还是Poco库中的TCPServer的start接口。

TCPServer的启动流程

这里,我们将目光聚焦于Poco库中的TCPServer的start接口。

void TCPServer::start()

{

    poco_assert (_stopped);

    _stopped = false;

    _thread.start(*this);

}

_thread.start(*this)的实现,请记住这个target就是TCPServer,因为之后会调到它的run接口

void Thread::start(Runnable& target)

{

    startImpl(new RunnableHolder(target));

}

startImpl(new RunnableHolder(target))的实现,Foundation\src\Thread_POSIX.cpp下的

void ThreadImpl::startImpl(SharedPtr<Runnable> pTarget)

{

    ......

    {

        FastMutex::ScopedLock l(_pData->mutex);

        _pData->pRunnableTarget = pTarget;

        int errorCode;

        if ((errorCode = pthread_create(&_pData->thread, &attributes, runnableEntry, this)))

        {

            _pData->pRunnableTarget = 0;

            pthread_attr_destroy(&attributes);

            throw SystemException(Poco::format("cannot start thread (%s)",

                Error::getMessage(errorCode)));

        }

    }

    ......

}

可以看到最终使用pthread_create创建一个线程执行runnableEntry,runnableEntry是一个函数,会做以下事情,也就是调用Poco::Net::TCPServer的run接口

void* ThreadImpl::runnableEntry(void* pThread)

{

    ......   

    try

    {

        pData->pRunnableTarget->run();

    }

   ......

}

pData->pRunnableTarget在startImpl设置的:

所以总结一下:start接口就是创建一个线程去执行run接口,后续的其他类的start接口与之逻辑类似。

此时理一下TCPServer::run() [Net\src\TCPServer.cpp] 做了什么。

可以看到主要就是监听连接,如果连接被接受,就执行_pDispatcher->enqueue(ss)。

我们在创建TCPServer的时候,没有指定filter,所以_pConnectionFilter为nullptr。

先看一下:SocketImpl::poll((const Poco::Timespan& timeout, int mode)

根据POCO_HAVE_FD_EPOLL,POCO_HAVE_FD_POLL的设置采用不同的监听手段:

(1)epoll

Net\include\Poco\Net\Net.h

#if (POCO_OS == POCO_OS_LINUX) || (POCO_OS == POCO_OS_WINDOWS_NT) || (POCO_OS == POCO_OS_ANDROID)

    #define POCO_HAVE_FD_EPOLL 1

#endif

(2)poll

Net\include\Poco\Net\Net.h

#if defined(POCO_OS_FAMILY_BSD)

    #ifndef POCO_HAVE_FD_POLL

        #define POCO_HAVE_FD_POLL 1

    #endif

#endif

(3)select


监听到连接后,接受连接:

SocketImpl* SocketImpl::acceptConnection(SocketAddress& clientAddr)

{

    if (_sockfd == POCO_INVALID_SOCKET) throw InvalidSocketException();

    sockaddr_storage buffer;

    struct sockaddr* pSA = reinterpret_cast<struct sockaddr*>(&buffer);

    poco_socklen_t saLen = sizeof(buffer);

    poco_socket_t sd;

    do

    {

        sd = ::accept(_sockfd, pSA, &saLen);

    }

    while (sd == POCO_INVALID_SOCKET && lastError() == POCO_EINTR);

    if (sd != POCO_INVALID_SOCKET)

    {

        clientAddr = SocketAddress(pSA, saLen);

        return new StreamSocketImpl(sd);

    }

    error(); // will throw

    return nullptr;

}


最后便是处理连接,

处理连接主要在TCPServerDispatcher::enqueue(const StreamSocket& socket)中进行

简短调用栈为:

hreadPool::startWithPriority()        ->

         PooledThread::start(...)        ->

         |       getThread()        ->

         |               createThread();

         |               pThread->start();

         |                       Thread::start(Runnable& target)        ->

         |                               startImpl()        ->

         |                                       PooledThread::run()

         |                                               _targetReady.wait();

         |                                               pTarget->run();

         |                       _started.wait();

         | 

         PooledThread::start()        ->

                _targetReady.set()

在pTarget->run()之前之前会卡在_targetReady.wait(),直到_targetReady.set()。

而pTarget就是TCPServerDispatcher,所以pTarget->run()即为TCPServerDispatcher::run(),

TCPServerDispatcher::run()主要来处理连接:

_pConnectionFactory即为KeeperTCPHandlerFactory,

 KeeperTCPHandlerFactory会为连接创建一个处理线程KeeperTCPHandler,

 TCPServerDispatcher::run()调用pConnection->start()使用1个线程来处理连接,pConnection也就是KeeperTCPHandler,而KeeperTCPHandler继承于Poco::Net::TCPServerConnection,所以KeeperTCPHandler的start就是Poco::Net::TCPServerConnection的start。

Poco::Net::TCPServerConnection的start参考以前的逻辑,最终会调用到KeeperTCPHandler的run接口,简短KeeperTCPHandler::run的调用栈为:

 KeeperTCPHandler::run()        ->

        KeeperTCPHandler::runImpl()

KeeperTCPHandler处理连接

调用栈:

KeeperHandler监听两种fd:(1)socketfd(来自当前连接的请求)(2)来自KeeperRspT通过pipe的响应请求:

Keeper端通过KeeperRspT线程异步处理请求,所以当KeeperRspT处理完请求之后通过pipe将响应请求写到pipe的writeFd,之后KeeperHandler监听pipe的ReadFd。

TODO:KeeperTCPHandler线程处理请求逻辑。


网站公告

今日签到

点亮在社区的每一天
去签到