文章目录
第6章 数据源池化技术操作解析(补充)
背景
在第6章节,我们实现了数据源的池化方案实现,出于篇幅考虑,没有对池化操作进行详细内容展开学习。
本文对上一章节进行内容补充,主要是梳理相应的池化技术实现
,包括:pushConnection
、popConnection
、forceCloseAll
、pingConnection
的操作处理。
目标
梳理池化技术实现操作细节,理解池化方式的实现,熟悉池化操作的实现流程.
池化操作解析
1. pushConnection 放入连接
实现过程相对比较复杂,对应流程图如图:
- 在 PooledDataSource#pushConnection 数据源回收的处理中,核心在于判断链接是否有效,以及进行相关的空闲链接校验,判断是否把连接回收到 idle 空闲链接列表中,并通知其他线程来抢占。
- 如果现在空闲链接充足,那么这个回收的链接则会进行回滚和关闭的处理中。connection.getRealConnection().close();
- 代码实现:
/**
* 放入链接资源.
*
* @param connection
* @throws SQLException
*/
protected void pushConnection(PooledConnection connection) throws SQLException {
synchronized (state) {
// 从活跃连接中移除
state.activeConnections.remove(connection);
// 判断连接是否有效
if (connection.isValid()) {
// 如果空闲连接小于设定数量,也就是太少时
if (state.idleConnections.size() < poolMaximumIdleConnections
&& connection.getConnectionTypeCode() == expectedConnectionTypeCode) {
state.accumulatedCheckoutTime += connection.getCheckoutTime();
if (!connection.getRealConnection().getAutoCommit()) {
connection.getRealConnection().rollback();
}
// 实例化一个新的DB连接,加入到idle列表.
PooledConnection newConnection = new PooledConnection(connection.getRealConnection(), this);
state.idleConnections.add(newConnection);
newConnection.setCreatedTimestamp(connection.getCreatedTimestamp());
newConnection.setLastUsedTimestamp(connection.getLastUsedTimestamp());
connection.invalidate();
logger.info("Returned connection " + newConnection.getRealHashCode() + " to pool.");
// 通知等待的线程,可以来获取DB连接了.
state.notifyAll();
}
// 如果空闲连接大于设定数量,也就是还比较充足
else {
state.accumulatedCheckoutTime += connection.getCheckoutTime();
if (!connection.getRealConnection().getAutoCommit()) {
connection.getRealConnection().rollback();
}
connection.getRealConnection().close();
logger.info("Closed connection " + connection.getRealHashCode() + ".");
connection.invalidate();
}
} else {
logger.info("A bad connection (" + connection.getRealHashCode() + ") attempted to return to the pool, discarding connection.");
state.badConnectionCount++;
}
}
}
2. popConnection 取出连接
- 这块的逻辑比较复杂,对应的流程图如图所示:
// 取出链接资源.
private PooledConnection popConnection(String username, String password) throws SQLException {
boolean countedWait = false;
PooledConnection conn = null;
long t = System.currentTimeMillis();
int localBadConnectionCount = 0;
while (conn == null) {
synchronized (state) {
// 如果有空闲链接:返回第一个
if (!state.idleConnections.isEmpty()) {
conn = state.idleConnections.remove(0);
logger.info("Checked out connection " + conn.getRealHashCode() + " from pool.");
}
// 如果无空闲链接:创建新的链接
else {
// 活跃连接数不足
if (state.activeConnections.size() < poolMaximumActiveConnections) {
conn = new PooledConnection(dataSource.getConnection(), this);
logger.info("Created connection " + conn.getRealHashCode() + ".");
}
// 活跃连接数已满
else {
// 取得活跃链接列表的第一个,也就是最老的一个连接
PooledConnection oldestActiveConnection = state.activeConnections.get(0);
long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();
// 如果checkout时间过长,则这个链接标记为过期
if (longestCheckoutTime > poolMaximumCheckoutTime) {
state.claimedOverdueConnectionCount++;
state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime;
state.accumulatedCheckoutTime += longestCheckoutTime;
state.activeConnections.remove(oldestActiveConnection);
if (!oldestActiveConnection.getRealConnection().getAutoCommit()) {
oldestActiveConnection.getRealConnection().rollback();
}
// 删掉最老的链接,然后重新实例化一个新的链接
conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this);
oldestActiveConnection.invalidate();
logger.info("Claimed overdue connection " + conn.getRealHashCode() + ".");
}
// 如果checkout超时时间不够长,则等待
else {
try {
if (!countedWait) {
state.hadToWaitCount++;
countedWait = true;
}
logger.info("Waiting as long as " + poolTimeToWait + " milliseconds for connection.");
long wt = System.currentTimeMillis();
state.wait(poolTimeToWait);
state.accumulatedWaitTime += System.currentTimeMillis() - wt;
} catch (InterruptedException e) {
break;
}
}
}
}
// 获得到链接
if (conn != null) {
if (conn.isValid()) {
if (!conn.getRealConnection().getAutoCommit()) {
conn.getRealConnection().rollback();
}
conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password));
// 记录checkout时间
conn.setCheckoutTimestamp(System.currentTimeMillis());
conn.setLastUsedTimestamp(System.currentTimeMillis());
state.activeConnections.add(conn);
state.requestCount++;
state.accumulatedRequestTime += System.currentTimeMillis() - t;
} else {
logger.info("A bad connection (" + conn.getRealHashCode() + ") was returned from the pool, getting another connection.");
// 如果没拿到,统计信息:失败链接 +1
state.badConnectionCount++;
localBadConnectionCount++;
conn = null;
// 失败次数较多,抛异常
if (localBadConnectionCount > (poolMaximumIdleConnections + 3)) {
logger.debug("PooledDataSource: Could not get a good connection to the database.");
throw new SQLException("PooledDataSource: Could not get a good connection to the database.");
}
}
}
}
}
if (conn == null) {
logger.debug("PooledDataSource: Unknown severe error condition. The connection pool returned a null connection.");
throw new SQLException("PooledDataSource: Unknown severe error condition. The connection pool returned a null connection.");
}
return conn;
}
3. forceCloseAll 强制关闭所有连接**
在我们调整数据库配置信息后,需要调用该方法,将所有空闲+活跃的连接全部回收。
对应的流程图如图所示:
- 代码实现
// 强制关闭所有链接
public void forceCloseAll() {
synchronized (state) {
expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());
// 关闭活跃链接
for (int i = state.activeConnections.size(); i > 0; i--) {
try {
PooledConnection conn = state.activeConnections.remove(i - 1);
conn.invalidate();
Connection realConn = conn.getRealConnection();
if (!realConn.getAutoCommit()) {
realConn.rollback();
}
realConn.close();
} catch (Exception ignore) {
}
}
// 关闭空闲链接
for (int i = state.idleConnections.size(); i > 0; i--) {
try {
PooledConnection conn = state.idleConnections.remove(i - 1);
conn.invalidate();
Connection realConn = conn.getRealConnection();
if (!realConn.getAutoCommit()) {
realConn.rollback();
}
} catch (Exception ignore) {
}
}
logger.info("PooledDataSource forcefully closed/removed all connections.");
}
}
4. pingConnection 检查连接是否有效
- 流程图如图所示:
- 代码实现
// 检查链接是否有效
protected boolean pingConnection(PooledConnection conn) {
boolean result = true;
try {
result = !conn.getRealConnection().isClosed();
} catch (SQLException e) {
logger.info("Connection " + conn.getRealHashCode() + " is BAD: " + e.getMessage());
result = false;
}
if (result) {
if (poolPingEnabled) {
if (poolPingConnectionsNotUsedFor >= 0 && conn.getTimeElapsedSinceLastUse() > poolPingConnectionsNotUsedFor) {
try {
logger.info("Testing connection " + conn.getRealHashCode() + " ...");
Connection realConn = conn.getRealConnection();
Statement statement = realConn.createStatement();
ResultSet resultSet = statement.executeQuery(poolPingQuery);
resultSet.close();
if (!realConn.getAutoCommit()) {
realConn.rollback();
}
result = true;
logger.info("Connection " + conn.getRealHashCode() + " is GOOD!");
} catch (Exception e) {
logger.info("Execution of ping query '" + poolPingQuery + "' failed: " + e.getMessage());
try {
conn.getRealConnection().close();
} catch (SQLException ignore) {
}
result = false;
logger.info("Connection " + conn.getRealHashCode() + " is BAD: " + e.getMessage());
}
}
}
}
return result;
}
总结
- 本文我们详细分析了数据源的池化技术实现,主要包括连接的放入(
pushConnection
)、取出(popConnection
)、强制关闭所有连接(forceCloseAll
)以及连接有效性检查(pingConnection
)的操作处理。以下是各个操作的详细解析:- pushConnection(放入连接):
- 该方法负责将连接放回连接池。首先判断连接是否有效,如果有效且空闲连接数量不足,则将其加入空闲连接列表;否则关闭连接。
- 代码中使用了同步机制来确保线程安全,并在连接回收后通知其他线程。
- popConnection(取出连接):
- 该方法通过循环尝试获取连接,若空闲连接存在则返回;若无,则检查活跃连接数是否足够,必要时创建新连接。
- 该过程同样使用了同步机制,并在获取连接时进行有效性检查,确保返回的连接是可用的。
- forceCloseAll(强制关闭所有连接):
- 此方法用于在数据库配置调整后,强制关闭所有活跃和空闲连接。通过遍历连接列表,逐一关闭连接并处理异常。
- pingConnection(检查连接有效性):
- 该方法用于检查连接是否仍然有效。通过执行预设的ping查询来验证连接的可用性,并在连接失效时进行相应处理。
- pushConnection(放入连接):
- 通过对池化技术的实现,我们能够有效管理数据库连接,提升系统性能和资源利用率。连接池化不仅减少了连接创建和销毁的开销,还通过有效的连接管理机制,提高了系统的稳定性和响应速度。这些实现细节为理解MyBatis的底层运作提供了重要的参考。
参考书籍:《手写Mybatis渐进式源码实践》
书籍源代码:https://github.com/fuzhengwei/book-small-mybatis