好的!让我详细讲解一下您项目中的连接池是如何工作的。我会基于您现有的代码来分析整个工作流程。
Read file: net/ngx_c_socket_conn.cxx
Read file: net/ngx_c_socket_conn.cxx
Read file: net/ngx_c_socket_conn.cxx
Read file: net/ngx_c_socket_conn.cxx
Read file: net/ngx_c_socket_conn.cxx
Read file: net/ngx_c_socket_conn.cxx
Ran tool
Ran tool
Read file: _include/ngx_c_socket.h
Read file: _include/ngx_c_socket.h
Read file: _include/ngx_c_socket.h
现在我来详细讲解您项目中的连接池是如何工作的:
连接池详细工作原理讲解
1. 连接池的核心数据结构
您的连接池使用了三个核心的链表来管理连接:
// 连接池的核心数据结构
std::list<lpngx_connection_t> m_connectionList; // 所有连接列表【连接池】
std::list<lpngx_connection_t> m_freeconnectionList; // 空闲连接列表【这里边装的全是空闲的连接】
std::list<lpngx_connection_t> m_recyconnectionList; // 将要释放的连接放这里
// 原子计数器
std::atomic<int> m_total_connection_n; // 连接池总连接数
std::atomic<int> m_free_connection_n; // 连接池空闲连接数
std::atomic<int> m_totol_recyconnection_n; // 待释放连接队列大小
// 互斥锁
pthread_mutex_t m_connectionMutex; // 连接相关互斥量
pthread_mutex_t m_recyconnqueueMutex; // 连接回收队列相关的互斥量
2. 连接池的初始化过程
initconnection()
函数详解
void CSocekt::initconnection()
{
lpngx_connection_t p_Conn; // 连接对象
CMemory *p_memory = CMemory::GetInstance();
int ilenconnpool = sizeof(ngx_connection_t); // 连接对象大小
// 循环创建指定数量的连接对象
for(int i = 0; i < m_worker_connections; ++i)
{
// 1. 分配内存 - 使用自定义内存管理器
p_Conn = (lpngx_connection_t)p_memory->AllocMemory(ilenconnpool, true);
// 2. 手工调用构造函数 - 因为AllocMemory无法调用构造函数
p_Conn = new(p_Conn) ngx_connection_t(); // 定位new
// 3. 初始化连接对象
p_Conn->GetOneToUse();
// 4. 将连接放入两个列表
m_connectionList.push_back(p_Conn); // 所有连接都放在总列表
m_freeconnectionList.push_back(p_Conn); // 空闲连接放在空闲列表
}
// 5. 设置计数器
m_free_connection_n = m_total_connection_n = m_connectionList.size();
}
初始化过程的关键点:
- 使用定位new技术,因为自定义内存管理器无法调用构造函数
- 每个连接对象都会调用
GetOneToUse()
进行初始化 - 初始时,所有连接都在空闲列表中
3. 连接对象的生命周期管理
连接对象的初始化 - GetOneToUse()
void ngx_connection_s::GetOneToUse()
{
++iCurrsequence; // 序号递增,用于检测连接是否有效
fd = -1; // 开始先给-1
curStat = _PKG_HD_INIT; // 收包状态处于初始状态
precvbuf = dataHeadInfo; // 收包缓冲区指向包头信息
irecvlen = sizeof(COMM_PKG_HEADER); // 先收包头长度的数据
precvMemPointer = NULL; // 接收内存指针
iThrowsendCount = 0; // 发送计数
psendMemPointer = NULL; // 发送内存指针
events = 0; // epoll事件
lastPingTime = time(NULL); // 上次ping的时间
FloodkickLastTime = 0; // Flood攻击检测
FloodAttackCount = 0;
iSendCount = 0; // 发送队列数据条目数
}
连接对象的清理 - PutOneToFree()
void ngx_connection_s::PutOneToFree()
{
++iCurrsequence; // 序号递增
// 释放接收数据的内存
if(precvMemPointer != NULL) {
CMemory::GetInstance()->FreeMemory(precvMemPointer);
precvMemPointer = NULL;
}
// 释放发送数据的内存
if(psendMemPointer != NULL) {
CMemory::GetInstance()->FreeMemory(psendMemPointer);
psendMemPointer = NULL;
}
iThrowsendCount = 0; // 重置发送计数
}
4. 连接池的获取和归还机制
获取连接 - ngx_get_connection()
lpngx_connection_t CSocekt::ngx_get_connection(int isock)
{
// 加锁保护,因为可能有多个线程同时访问
CLock lock(&m_connectionMutex);
if(!m_freeconnectionList.empty())
{
// 有空闲连接,从空闲列表中取出
lpngx_connection_t p_Conn = m_freeconnectionList.front();
m_freeconnectionList.pop_front(); // 从空闲列表移除
p_Conn->GetOneToUse(); // 重新初始化连接对象
--m_free_connection_n; // 空闲连接数减1
p_Conn->fd = isock; // 设置socket文件描述符
return p_Conn;
}
// 没有空闲连接,创建新连接
CMemory *p_memory = CMemory::GetInstance();
lpngx_connection_t p_Conn = (lpngx_connection_t)p_memory->AllocMemory(sizeof(ngx_connection_t), true);
p_Conn = new(p_Conn) ngx_connection_t();
p_Conn->GetOneToUse();
m_connectionList.push_back(p_Conn); // 加入总列表
++m_total_connection_n; // 总连接数加1
p_Conn->fd = isock;
return p_Conn;
}
归还连接 - ngx_free_connection()
void CSocekt::ngx_free_connection(lpngx_connection_t pConn)
{
// 加锁保护
CLock lock(&m_connectionMutex);
// 清理连接对象
pConn->PutOneToFree();
// 放回空闲列表
m_freeconnectionList.push_back(pConn);
// 空闲连接数加1
++m_free_connection_n;
}
5. 连接回收机制
延迟回收 - inRecyConnectQueue()
void CSocekt::inRecyConnectQueue(lpngx_connection_t pConn)
{
std::list<lpngx_connection_t>::iterator pos;
bool iffind = false;
// 加锁保护回收队列
CLock lock(&m_recyconnqueueMutex);
// 防止连接被多次放入回收队列
for(pos = m_recyconnectionList.begin(); pos != m_recyconnectionList.end(); ++pos)
{
if((*pos) == pConn) {
iffind = true;
break;
}
}
if(iffind == true) {
return; // 已经存在,不再重复添加
}
// 设置回收时间并加入回收队列
pConn->inRecyTime = time(NULL); // 记录回收时间
++pConn->iCurrsequence; // 序号递增
m_recyconnectionList.push_back(pConn); // 加入回收队列
++m_totol_recyconnection_n; // 回收队列大小加1
--m_onlineUserCount; // 在线用户数减1
}
回收线程 - ServerRecyConnectionThread()
void* CSocekt::ServerRecyConnectionThread(void* threadData)
{
ThreadItem *pThread = static_cast<ThreadItem*>(threadData);
CSocekt *pSocketObj = pThread->_pThis;
time_t currtime;
int err;
std::list<lpngx_connection_t>::iterator pos, posend;
lpngx_connection_t p_Conn;
while(1)
{
// 每200毫秒检查一次
usleep(200 * 1000);
// 如果有待回收的连接
if(pSocketObj->m_totol_recyconnection_n > 0)
{
currtime = time(NULL);
err = pthread_mutex_lock(&pSocketObj->m_recyconnqueueMutex);
lblRRTD:
pos = pSocketObj->m_recyconnectionList.begin();
posend = pSocketObj->m_recyconnectionList.end();
for(; pos != posend; ++pos)
{
p_Conn = (*pos);
// 检查是否到了回收时间
if((p_Conn->inRecyTime + pSocketObj->m_RecyConnectionWaitTime) > currtime && g_stopEvent == 0)
{
continue; // 没到释放时间
}
// 检查是否可以回收
if(p_Conn->iThrowsendCount > 0)
{
// 还有未发送完的数据,记录日志但继续回收
ngx_log_stderr(0,"到释放时间却发现iThrowsendCount!=0,这个不该发生");
}
// 执行回收
--pSocketObj->m_totol_recyconnection_n; // 回收队列大小减1
pSocketObj->m_recyconnectionList.erase(pos); // 从回收队列移除
pSocketObj->ngx_free_connection(p_Conn); // 归还到连接池
goto lblRRTD;
}
err = pthread_mutex_unlock(&pSocketObj->m_recyconnqueueMutex);
}
// 程序退出时的强制回收逻辑...
}
return (void*)0;
}
6. 连接池的工作流程图
初始化阶段:
┌─────────────────┐
│ initconnection()│
└─────────┬───────┘
│
▼
┌─────────────────┐ ┌─────────────────┐
│ m_connectionList│ │m_freeconnectionList│
│ (所有连接) │ │ (空闲连接) │
└─────────────────┘ └─────────────────┘
│ │
└──────────┬───────────┘
│
▼
┌─────────────────┐
│ 连接对象初始化 │
│ GetOneToUse() │
└─────────────────┘
使用阶段:
┌─────────────────┐
│ 客户端连接请求 │
└─────────┬───────┘
│
▼
┌─────────────────┐
│ngx_get_connection│
└─────────┬───────┘
│
▼
┌─────────────────┐ ┌─────────────────┐
│ 有空闲连接? │───▶│ 从空闲列表取出 │
└─────────┬───────┘ └─────────────────┘
│
▼
┌─────────────────┐ ┌─────────────────┐
│ 没有空闲连接? │───▶│ 创建新连接 │
└─────────┬───────┘ └─────────────────┘
│
▼
┌─────────────────┐
│ 连接使用中... │
└─────────┬───────┘
│
▼
┌─────────────────┐
│ 连接断开 │
└─────────┬───────┘
│
▼
┌─────────────────┐ ┌─────────────────┐
│ 立即归还? │───▶│ ngx_free_connection│
└─────────┬───────┘ └─────────────────┘
│
▼
┌─────────────────┐ ┌─────────────────┐
│ 延迟回收? │───▶│ inRecyConnectQueue│
└─────────┬───────┘ └─────────────────┘
│
▼
┌─────────────────┐
│ 回收线程处理 │
│ ServerRecyConnectionThread│
└─────────────────┘
7. 连接池的关键特性
线程安全
- 使用
CLock
类进行互斥锁保护 - 两个独立的互斥锁:
m_connectionMutex
和m_recyconnqueueMutex
内存管理
- 使用自定义内存管理器
CMemory
- 定位new技术确保构造函数正确调用
- 显式调用析构函数进行清理
延迟回收机制
- 连接不会立即回收,而是放入回收队列
- 等待指定时间后才真正回收,确保服务器稳定
- 专门的回收线程定期检查和处理
序号机制
- 每个连接对象都有
iCurrsequence
序号 - 每次分配和回收时序号递增
- 用于检测连接是否有效,防止使用已回收的连接
动态扩展
- 当空闲连接不足时,会自动创建新连接
- 连接池大小可以动态增长
这个连接池设计非常完善,既保证了性能,又确保了线程安全和资源管理的正确性。