PostgreSQL 处理连接请求

发布于:2025-05-24 ⋅ 阅读:(17) ⋅ 点赞:(0)

PostgreSQL 的较新版本(16、17)中,postmaster 对客户端连接以及客户端退出等事件的监听处理,由直接调用 select() 改为使用 WaitEventSet 接口(在 Linux 上底层基于 epoll),具体原因不在此介绍。

具体如下:

PG14 版本仍然使用 select():

/*
 * ServerLoop (PG14 excerpt) -- postmaster main loop, select()-based.
 *
 * Each iteration waits in select() on the descriptor set built by
 * initMasks() and then calls ConnCreate() for every ListenSocket[] entry
 * that select() reported as ready.
 *
 * Returns STATUS_ERROR if select() fails with an errno other than EINTR or
 * EWOULDBLOCK.
 *
 * NOTE: this excerpt is cut off inside the accept loop; the remainder of
 * the function is not shown.
 */
static int
ServerLoop(void)
{
        fd_set          readmask;
        int                     nSockets;
        time_t          last_lockfile_recheck_time,
                                last_touch_time;

        last_lockfile_recheck_time = last_touch_time = time(NULL);

        /* Build the master read mask; the result is select()'s first argument */
        nSockets = initMasks(&readmask);

        for (;;)
        {
                fd_set          rmask;
                int                     selres;
                time_t          now;

                /*
                 * Wait for a connection request to arrive.
                 *
                 * We block all signals except while sleeping. That makes it safe for
                 * signal handlers, which again block all signals while executing, to
                 * do nontrivial work.
                 *
                 * If we are in PM_WAIT_DEAD_END state, then we don't want to accept
                 * any new connections, so we don't call select(), and just sleep.
                 */
                memcpy((char *) &rmask, (char *) &readmask, sizeof(fd_set));

                if (pmState == PM_WAIT_DEAD_END)
                {
                        PG_SETMASK(&UnBlockSig);

                        pg_usleep(100000L); /* 100 msec seems reasonable */
                        selres = 0;

                        PG_SETMASK(&BlockSig);
                }
                else
                {
                        /* must set timeout each time; some OSes change it! */
                        struct timeval timeout;

                        /* Needs to run with blocked signals! */
                        DetermineSleepTime(&timeout);

                        PG_SETMASK(&UnBlockSig);

                        /*
                         * Wait for readiness on the descriptors in rmask (a fresh
                         * copy of readmask).  The loop below only tests
                         * ListenSocket[] entries.
                         *
                         * NOTE(review): the original annotation said this watches
                         * the IPv4, IPv6 and Unix-domain sockets *and* MyLatch;
                         * nothing in this excerpt shows a latch being added to the
                         * mask -- confirm against initMasks().
                         */
                        selres = select(nSockets, &rmask, NULL, NULL, &timeout);

                        PG_SETMASK(&BlockSig);
                }

                /* Now check the select() result */
                if (selres < 0)
                {
                        if (errno != EINTR && errno != EWOULDBLOCK)
                        {
                                ereport(LOG,
                                                (errcode_for_socket_access(),
                                                 errmsg("select() failed in postmaster: %m")));
                                return STATUS_ERROR;
                        }
                }

                /*
                 * New connection pending on any of our sockets? If so, fork a child
                 * process to deal with it.
                 */
                if (selres > 0)
                {
                        int                     i;

                        for (i = 0; i < MAXLISTEN; i++)
                        {
                                if (ListenSocket[i] == PGINVALID_SOCKET)
                                        break;
                                if (FD_ISSET(ListenSocket[i], &rmask))
                                {
                                        Port       *port;
                                        /* This listen socket is ready: set up a new connection */
                                        port = ConnCreate(ListenSocket[i]);

PG17

/*
 * ServerLoop (PG17 excerpt) -- postmaster main loop, WaitEventSet-based.
 *
 * Each iteration waits on pm_wait_set via WaitEventSetWait().  For every
 * returned event it resets the latch if WL_LATCH_SET is reported, services
 * any pending postmaster requests, and on WL_SOCKET_ACCEPT accepts the
 * connection and starts a backend for it.
 *
 * NOTE: this excerpt ends before the outer loop and the function are closed.
 */
static int
ServerLoop(void)
{
	time_t		last_lockfile_recheck_time,
				last_touch_time;
	WaitEvent	events[MAXLISTEN];
	int			nevents;

	/*
	 * (Re)build pm_wait_set, asking for listen sockets to be included.
	 * NOTE(review): the original annotation describes this as adding the
	 * watched objects to an epoll instance; the underlying mechanism is not
	 * visible in this excerpt.
	 */
	ConfigurePostmasterWaitSet(true);
	last_lockfile_recheck_time = last_touch_time = time(NULL);

	for (;;)
	{
		time_t		now;

		nevents = WaitEventSetWait(pm_wait_set,
								   DetermineSleepTime(),
								   events,
								   lengthof(events),
								   0 /* postmaster posts no wait_events */ );

		/*
		 * Latch set by signal handler, or new connection pending on any of
		 * our sockets? If the latter, fork a child process to deal with it.
		 */
		for (int i = 0; i < nevents; i++)
		{
			if (events[i].events & WL_LATCH_SET)
				ResetLatch(MyLatch);

			/*
			 * The following requests are handled unconditionally, even if we
			 * didn't see WL_LATCH_SET.  This gives high priority to shutdown
			 * and reload requests where the latch happens to appear later in
			 * events[] or will be reported by a later call to
			 * WaitEventSetWait().
			 */
			if (pending_pm_shutdown_request)
				process_pm_shutdown_request();
			if (pending_pm_reload_request)
				process_pm_reload_request();
			if (pending_pm_child_exit)
				process_pm_child_exit();
			if (pending_pm_pmsignal)
				process_pm_pmsignal();

			if (events[i].events & WL_SOCKET_ACCEPT)
			{
				ClientSocket s;

				/* Accept the connection; on success start a backend for it */
				if (AcceptConnection(events[i].fd, &s) == STATUS_OK)
					BackendStartup(&s);

				/* We no longer need the open socket in this process */
				if (s.sock != PGINVALID_SOCKET)
				{
					if (closesocket(s.sock) != 0)
						elog(LOG, "could not close client socket: %m");
				}
			}
		}

  

/*
 * ConfigurePostmasterWaitSet -- (re)create pm_wait_set.
 *
 * Any existing set is freed first.  The new set always contains MyLatch;
 * when accept_connections is true it additionally contains every entry of
 * ListenSockets[], each registered for WL_SOCKET_ACCEPT.
 */
static void
ConfigurePostmasterWaitSet(bool accept_connections)
{
	int			nslots = 1;		/* the latch always occupies one slot */

	if (accept_connections)
		nslots += NumListenSockets;

	/* Throw away the previous set, if there is one. */
	if (pm_wait_set != NULL)
	{
		FreeWaitEventSet(pm_wait_set);
		pm_wait_set = NULL;
	}

	pm_wait_set = CreateWaitEventSet(NULL, nslots);

	/*
	 * The latch goes in first.  ServerLoop() resets it on WL_LATCH_SET and
	 * then services pending shutdown/reload/child-exit requests.
	 * NOTE(review): how the latch itself is implemented (self-pipe vs.
	 * signalfd) lives in InitializeLatchSupport() in latch.c and is not
	 * visible here.
	 */
	AddWaitEventToSet(pm_wait_set, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch,
					  NULL);

	if (!accept_connections)
		return;

	/*
	 * One entry per listen socket.  Presumably IPv4, IPv6 and a Unix-domain
	 * socket in a default setup (four entries counting the latch) -- this
	 * depends on the listen configuration; verify.
	 */
	for (int sockno = 0; sockno < NumListenSockets; sockno++)
		AddWaitEventToSet(pm_wait_set, WL_SOCKET_ACCEPT,
						  ListenSockets[sockno], NULL, NULL);
}

连接过程:

/* Call site quoted from ServerLoop(): a successfully accepted socket is passed to BackendStartup() */
if (AcceptConnection(events[i].fd, &s) == STATUS_OK)
		BackendStartup(&s);


/*
 * BackendStartup -- launch a backend process for an accepted client socket.
 *
 * client_sock: the accepted client socket; it is handed to
 *		postmaster_child_launch() and, on launch failure, to
 *		report_fork_failure_to_client().
 *
 * Returns STATUS_OK once the child has been launched and its Backend entry
 * pushed onto BackendList.  Returns STATUS_ERROR if the Backend entry cannot
 * be allocated, a cancel key cannot be generated, or the launch fails; in
 * each of those cases the Backend entry is released again.
 */
static int
BackendStartup(ClientSocket *client_sock)
{
	BackendStartupData startup_data;
	Backend    *backend;		/* bookkeeping entry for backend cleanup */
	pid_t		child_pid;

	/*
	 * Allocate the bookkeeping entry up front, so that running out of
	 * memory can be handled cleanly before any process is launched.
	 */
	backend = palloc_extended(sizeof(*backend), MCXT_ALLOC_NO_OOM);
	if (backend == NULL)
	{
		ereport(LOG,
				(errcode(ERRCODE_OUT_OF_MEMORY),
				 errmsg("out of memory")));
		return STATUS_ERROR;
	}

	/*
	 * Pick the cancel key for this backend.  It is written to MyCancelKey
	 * before the launch so the child process has its own copy to send to the
	 * frontend.
	 */
	if (!RandomCancelKey(&MyCancelKey))
	{
		pfree(backend);
		ereport(LOG,
				(errcode(ERRCODE_INTERNAL_ERROR),
				 errmsg("could not generate random cancel key")));
		return STATUS_ERROR;
	}

	/* Record the canAcceptConnections state for the child and for ourselves */
	startup_data.canAcceptConnections = canAcceptConnections(BACKEND_TYPE_NORMAL);
	backend->dead_end = (startup_data.canAcceptConnections != CAC_OK);
	backend->cancel_key = MyCancelKey;

	/* Only a non-dead_end child is given a child slot number */
	if (backend->dead_end)
		backend->child_slot = 0;
	else
	{
		MyPMChildSlot = AssignPostmasterChildSlot();
		backend->child_slot = MyPMChildSlot;
	}

	/* It has not asked to be notified about any bgworkers yet */
	backend->bgworker_notify = false;

	/* Launch the new client backend process */
	child_pid = postmaster_child_launch(B_BACKEND,
										(char *) &startup_data,
										sizeof(startup_data),
										client_sock);
	if (child_pid < 0)
	{
		/* still in the parent: the launch failed */
		int			launch_errno = errno;

		if (!backend->dead_end)
			(void) ReleasePostmasterChildSlot(backend->child_slot);
		pfree(backend);

		/* restore errno so that %m below reports the launch failure */
		errno = launch_errno;
		ereport(LOG,
				(errmsg("could not fork new process for connection: %m")));
		report_fork_failure_to_client(client_sock, launch_errno);
		return STATUS_ERROR;
	}

	/* in the parent, launch succeeded */
	ereport(DEBUG2,
			(errmsg_internal("forked new backend, pid=%d socket=%d",
							 (int) child_pid, (int) client_sock->sock)));

	/*
	 * Nothing can fail from here on, so the entry can now be completed and
	 * added to the list of backends.
	 */
	backend->bkend_type = BACKEND_TYPE_NORMAL;	/* Can change later to WALSND */
	backend->pid = child_pid;
	dlist_push_head(&BackendList, &backend->elem);

#ifdef EXEC_BACKEND
	if (!backend->dead_end)
		ShmemBackendArrayAdd(backend);
#endif

	return STATUS_OK;
}


网站公告

今日签到

点亮在社区的每一天
去签到