In later releases (16 and 17), PostgreSQL changed the part of the postmaster that waits for incoming client connections and for client/child exits from the select() call to an epoll-based mechanism, the WaitEventSet API (backed by epoll on Linux). The reasons for the change are not covered here.
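To make the difference concrete, here is a minimal standalone sketch contrasting the two wait patterns (listen_fds/nfds are hypothetical names, not PostgreSQL code): select() must rebuild its fd set and rescan every descriptor on each iteration, and is capped at FD_SETSIZE descriptors, while epoll registers each descriptor once and returns only the ready events.

#include <sys/select.h>
#include <sys/epoll.h>

/* select() pattern: re-arm and rescan all fds on every iteration */
void
select_loop(int *listen_fds, int nfds)
{
    for (;;)
    {
        fd_set      rmask;
        int         maxfd = -1;

        FD_ZERO(&rmask);
        for (int i = 0; i < nfds; i++)  /* rebuilt every time around the loop */
        {
            FD_SET(listen_fds[i], &rmask);
            if (listen_fds[i] > maxfd)
                maxfd = listen_fds[i];
        }

        if (select(maxfd + 1, &rmask, NULL, NULL, NULL) > 0)
        {
            for (int i = 0; i < nfds; i++)  /* O(n) rescan to find ready fds */
            {
                if (FD_ISSET(listen_fds[i], &rmask))
                {
                    /* accept a connection on listen_fds[i] ... */
                }
            }
        }
    }
}

/* epoll pattern: register once, then receive only the ready events */
void
epoll_loop(int *listen_fds, int nfds)
{
    int         epfd = epoll_create1(0);

    for (int i = 0; i < nfds; i++)      /* each fd is registered exactly once */
    {
        struct epoll_event ev = {.events = EPOLLIN, .data.fd = listen_fds[i]};

        epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fds[i], &ev);
    }

    for (;;)
    {
        struct epoll_event events[16];
        int         nready = epoll_wait(epfd, events, 16, -1);

        for (int i = 0; i < nready; i++)    /* only ready fds come back */
        {
            /* accept a connection on events[i].data.fd ... */
        }
    }
}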
The actual PostgreSQL code is shown below.
In PG14, ServerLoop() still uses select():
static int
ServerLoop(void)
{
    fd_set      readmask;
    int         nSockets;
    time_t      last_lockfile_recheck_time,
                last_touch_time;

    last_lockfile_recheck_time = last_touch_time = time(NULL);

    nSockets = initMasks(&readmask);

    for (;;)
    {
        fd_set      rmask;
        int         selres;
        time_t      now;

        /*
         * Wait for a connection request to arrive.
         *
         * We block all signals except while sleeping. That makes it safe for
         * signal handlers, which again block all signals while executing, to
         * do nontrivial work.
         *
         * If we are in PM_WAIT_DEAD_END state, then we don't want to accept
         * any new connections, so we don't call select(), and just sleep.
         */
        memcpy((char *) &rmask, (char *) &readmask, sizeof(fd_set));

        if (pmState == PM_WAIT_DEAD_END)
        {
            PG_SETMASK(&UnBlockSig);

            pg_usleep(100000L); /* 100 msec seems reasonable */
            selres = 0;

            PG_SETMASK(&BlockSig);
        }
        else
        {
            /* must set timeout each time; some OSes change it! */
            struct timeval timeout;

            /* Needs to run with blocked signals! */
            DetermineSleepTime(&timeout);

            PG_SETMASK(&UnBlockSig);

            /*
             * Wait on the IPv4, IPv6 and Unix-domain listen sockets. Note
             * that in PG14 MyLatch is not part of the mask; signals simply
             * interrupt the sleep via EINTR.
             */
            selres = select(nSockets, &rmask, NULL, NULL, &timeout);

            PG_SETMASK(&BlockSig);
        }

        /* Now check the select() result */
        if (selres < 0)
        {
            if (errno != EINTR && errno != EWOULDBLOCK)
            {
                ereport(LOG,
                        (errcode_for_socket_access(),
                         errmsg("select() failed in postmaster: %m")));
                return STATUS_ERROR;
            }
        }

        /*
         * New connection pending on any of our sockets? If so, fork a child
         * process to deal with it.
         */
        if (selres > 0)
        {
            int         i;

            for (i = 0; i < MAXLISTEN; i++)
            {
                if (ListenSocket[i] == PGINVALID_SOCKET)
                    break;
                if (FD_ISSET(ListenSocket[i], &rmask))
                {
                    Port       *port;

                    /* an event on a listen socket: set up the new connection */
                    port = ConnCreate(ListenSocket[i]);
                    /* ... (BackendStartup() and the rest of the loop omitted) ... */
                }
            }
        }
    }
}
In PG17, ServerLoop() is instead built on the WaitEventSet API, which uses epoll on Linux (kqueue on BSD/macOS, poll as a fallback):
static int
ServerLoop(void)
{
    time_t      last_lockfile_recheck_time,
                last_touch_time;
    WaitEvent   events[MAXLISTEN];
    int         nevents;

    /* register all wait objects (latch + listen sockets) with the wait set */
    ConfigurePostmasterWaitSet(true);

    last_lockfile_recheck_time = last_touch_time = time(NULL);

    for (;;)
    {
        time_t      now;

        nevents = WaitEventSetWait(pm_wait_set,
                                   DetermineSleepTime(),
                                   events,
                                   lengthof(events),
                                   0 /* postmaster posts no wait_events */ );

        /*
         * Latch set by signal handler, or new connection pending on any of
         * our sockets? If the latter, fork a child process to deal with it.
         */
        for (int i = 0; i < nevents; i++)
        {
            if (events[i].events & WL_LATCH_SET)
                ResetLatch(MyLatch);

            /*
             * The following requests are handled unconditionally, even if we
             * didn't see WL_LATCH_SET. This gives high priority to shutdown
             * and reload requests where the latch happens to appear later in
             * events[] or will be reported by a later call to
             * WaitEventSetWait().
             */
            if (pending_pm_shutdown_request)
                process_pm_shutdown_request();
            if (pending_pm_reload_request)
                process_pm_reload_request();
            if (pending_pm_child_exit)
                process_pm_child_exit();
            if (pending_pm_pmsignal)
                process_pm_pmsignal();

            if (events[i].events & WL_SOCKET_ACCEPT)
            {
                ClientSocket s;

                /* accept the new connection and fork a backend for it */
                if (AcceptConnection(events[i].fd, &s) == STATUS_OK)
                    BackendStartup(&s);

                /* We no longer need the open socket in this process */
                if (s.sock != PGINVALID_SOCKET)
                {
                    if (closesocket(s.sock) != 0)
                        elog(LOG, "could not close client socket: %m");
                }
            }
        }
        /* ... (periodic lock file rechecks etc. omitted) ... */
    }
}
static void
ConfigurePostmasterWaitSet(bool accept_connections)
{
    if (pm_wait_set)
        FreeWaitEventSet(pm_wait_set);
    pm_wait_set = NULL;

    /* create the wait event set (an epoll instance on Linux) */
    pm_wait_set = CreateWaitEventSet(NULL,
                                     accept_connections ? (1 + NumListenSockets) : 1);

    /*
     * Add MyLatch to the set; it is used to react to child exits, config
     * file reloads, and so on. Earlier versions implemented the latch with
     * a self-pipe; later versions use signalfd, see InitializeLatchSupport()
     * in latch.c.
     */
    AddWaitEventToSet(pm_wait_set, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch,
                      NULL);

    /*
     * Add the IPv4, IPv6 and Unix-domain listen sockets to the set, so by
     * default the wait set holds four objects (latch + three sockets).
     */
    if (accept_connections)
    {
        for (int i = 0; i < NumListenSockets; i++)
            AddWaitEventToSet(pm_wait_set, WL_SOCKET_ACCEPT, ListenSockets[i],
                              NULL, NULL);
    }
}
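The comment above mentions the two latch wakeup mechanisms. As a rough illustration, here is a minimal standalone sketch of the signalfd approach (assuming Linux and SIGCHLD only; this is not latch.c code), which turns signal delivery into a file descriptor that epoll can wait on:

#include <signal.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <sys/signalfd.h>

int
main(void)
{
    sigset_t    mask;

    sigemptyset(&mask);
    sigaddset(&mask, SIGCHLD);

    /* block the signal so it is delivered through the fd, not a handler */
    sigprocmask(SIG_BLOCK, &mask, NULL);

    int         sfd = signalfd(-1, &mask, SFD_CLOEXEC);
    int         epfd = epoll_create1(0);
    struct epoll_event ev = {.events = EPOLLIN, .data.fd = sfd};

    epoll_ctl(epfd, EPOLL_CTL_ADD, sfd, &ev);

    for (;;)
    {
        struct epoll_event out;

        /* the same wait call could also watch listen sockets, as the postmaster does */
        if (epoll_wait(epfd, &out, 1, -1) == 1 && out.data.fd == sfd)
        {
            struct signalfd_siginfo info;

            if (read(sfd, &info, sizeof(info)) > 0)
            {
                /* SIGCHLD consumed: reap children here, e.g. with waitpid() */
            }
        }
    }
}

Because the signal becomes an ordinary readable fd, one wait call can observe both new connections and signal activity, which is the property the postmaster's wait set relies on.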
Connection path:
if (AcceptConnection(events[i].fd, &s) == STATUS_OK)
    BackendStartup(&s);
static int
BackendStartup(ClientSocket *client_sock)
{
    Backend    *bn;             /* for backend cleanup */
    pid_t       pid;
    BackendStartupData startup_data;

    /*
     * Create backend data structure. Better before the fork() so we can
     * handle failure cleanly.
     */
    bn = (Backend *) palloc_extended(sizeof(Backend), MCXT_ALLOC_NO_OOM);
    if (!bn)
    {
        ereport(LOG,
                (errcode(ERRCODE_OUT_OF_MEMORY),
                 errmsg("out of memory")));
        return STATUS_ERROR;
    }

    /*
     * Compute the cancel key that will be assigned to this backend. The
     * backend will have its own copy in the forked-off process' value of
     * MyCancelKey, so that it can transmit the key to the frontend.
     */
    if (!RandomCancelKey(&MyCancelKey))
    {
        pfree(bn);
        ereport(LOG,
                (errcode(ERRCODE_INTERNAL_ERROR),
                 errmsg("could not generate random cancel key")));
        return STATUS_ERROR;
    }

    /* Pass down canAcceptConnections state */
    startup_data.canAcceptConnections = canAcceptConnections(BACKEND_TYPE_NORMAL);
    bn->dead_end = (startup_data.canAcceptConnections != CAC_OK);
    bn->cancel_key = MyCancelKey;

    /*
     * Unless it's a dead_end child, assign it a child slot number
     */
    if (!bn->dead_end)
        bn->child_slot = MyPMChildSlot = AssignPostmasterChildSlot();
    else
        bn->child_slot = 0;

    /* Hasn't asked to be notified about any bgworkers yet */
    bn->bgworker_notify = false;

    /* fork the new client backend process */
    pid = postmaster_child_launch(B_BACKEND,
                                  (char *) &startup_data, sizeof(startup_data),
                                  client_sock);
    if (pid < 0)
    {
        /* in parent, fork failed */
        int         save_errno = errno;

        if (!bn->dead_end)
            (void) ReleasePostmasterChildSlot(bn->child_slot);
        pfree(bn);
        errno = save_errno;
        ereport(LOG,
                (errmsg("could not fork new process for connection: %m")));
        report_fork_failure_to_client(client_sock, save_errno);
        return STATUS_ERROR;
    }

    /* in parent, successful fork */
    ereport(DEBUG2,
            (errmsg_internal("forked new backend, pid=%d socket=%d",
                             (int) pid, (int) client_sock->sock)));

    /*
     * Everything's been successful, it's safe to add this backend to our list
     * of backends.
     */
    bn->pid = pid;
    bn->bkend_type = BACKEND_TYPE_NORMAL;   /* Can change later to WALSND */
    dlist_push_head(&BackendList, &bn->elem);

#ifdef EXEC_BACKEND
    if (!bn->dead_end)
        ShmemBackendArrayAdd(bn);
#endif

    return STATUS_OK;
}
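BackendStartup() is PostgreSQL's instance of the classic fork-per-connection pattern: do the bookkeeping that can fail before fork(), hand the client socket to the child, and have the parent close its copy. A minimal generic sketch of that pattern (start_backend and handle_client are hypothetical names, not PostgreSQL code):

#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>

/* hypothetical handler: serves one client inside the child process */
extern void handle_client(int client_fd);

static int
start_backend(int listen_fd)
{
    int         client_fd = accept(listen_fd, NULL, NULL);
    pid_t       pid;

    if (client_fd < 0)
        return -1;

    pid = fork();
    if (pid < 0)
    {
        /* in parent, fork failed: undo the bookkeeping */
        perror("fork");
        close(client_fd);
        return -1;
    }
    if (pid == 0)
    {
        /* in child: the listen socket belongs to the parent only */
        close(listen_fd);
        handle_client(client_fd);
        _exit(0);
    }

    /* in parent: the child owns the client socket now; keep pid for waitpid() */
    close(client_fd);
    return 0;
}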