《手写Mybatis渐进式源码实践》实践笔记(第六章数据源池化技术操作解析-补充)

发布于:2024-12-18 ⋅ 阅读:(14) ⋅ 点赞:(0)


第6章 数据源池化技术操作解析(补充)

mybatis

背景

在第6章节,我们实现了数据源的池化方案实现,出于篇幅考虑,没有对池化操作进行详细内容展开学习。

本文对上一章节进行内容补充,主要是梳理相应的池化技术实现,包括:pushConnectionpopConnectionforceCloseAllpingConnection 的操作处理。

目标

梳理池化技术实现操作细节,理解池化方式的实现,熟悉池化操作的实现流程.

池化操作解析

1. pushConnection 放入连接

实现过程相对比较复杂,对应流程图如图:

image-20241217145836581
  • 在 PooledDataSource#pushConnection 数据源回收的处理中,核心在于判断链接是否有效,以及进行相关的空闲链接校验,判断是否把连接回收到 idle 空闲链接列表中,并通知其他线程来抢占。
  • 如果现在空闲链接充足,那么这个回收的链接则会进行回滚和关闭的处理中。connection.getRealConnection().close();
  • 代码实现:
  /**
     * 放入链接资源.
     *
     * @param connection
     * @throws SQLException
     */
    protected void pushConnection(PooledConnection connection) throws SQLException {
        synchronized (state) {
            // 从活跃连接中移除
            state.activeConnections.remove(connection);
            // 判断连接是否有效
            if (connection.isValid()) {
                // 如果空闲连接小于设定数量,也就是太少时
                if (state.idleConnections.size() < poolMaximumIdleConnections
                        && connection.getConnectionTypeCode() == expectedConnectionTypeCode) {
                    state.accumulatedCheckoutTime += connection.getCheckoutTime();
                    if (!connection.getRealConnection().getAutoCommit()) {
                        connection.getRealConnection().rollback();
                    }

                    // 实例化一个新的DB连接,加入到idle列表.
                    PooledConnection newConnection = new PooledConnection(connection.getRealConnection(), this);
                    state.idleConnections.add(newConnection);
                    newConnection.setCreatedTimestamp(connection.getCreatedTimestamp());
                    newConnection.setLastUsedTimestamp(connection.getLastUsedTimestamp());
                    connection.invalidate();
                    logger.info("Returned connection " + newConnection.getRealHashCode() + " to pool.");

                    // 通知等待的线程,可以来获取DB连接了.
                    state.notifyAll();
                }
                // 如果空闲连接大于设定数量,也就是还比较充足
                else {
                    state.accumulatedCheckoutTime += connection.getCheckoutTime();
                    if (!connection.getRealConnection().getAutoCommit()) {
                        connection.getRealConnection().rollback();
                    }
                    connection.getRealConnection().close();
                    logger.info("Closed connection " + connection.getRealHashCode() + ".");
                    connection.invalidate();
                }
            } else {
                logger.info("A bad connection (" + connection.getRealHashCode() + ") attempted to return to the pool, discarding connection.");
                state.badConnectionCount++;
            }

        }
    }
2. popConnection 取出连接
  • 这块的逻辑比较复杂,对应的流程图如图所示:
image-20241217145836581 - popConnection 获取链接是一个 while 死循环操作,只有获取到链接抛异常才会退出循环。 - 获取链接的过程会`使用 synchronized 进行加锁`,因为所有线程在资源竞争的情况下,都需要进行加锁处理。在加锁的代码块中通过`判断是否还有空闲链接`进行返回,如果没有则会`判断活跃连接数是否充足`,不充足则进行创建后返回。在这里也会遇到活跃链接已经进行循环等待的过程,最后再不能获取则抛出异常。 - **代码实现**
    // 取出链接资源.
    private PooledConnection popConnection(String username, String password) throws SQLException {
        boolean countedWait = false;
        PooledConnection conn = null;
        long t = System.currentTimeMillis();
        int localBadConnectionCount = 0;

        while (conn == null) {
            synchronized (state) {
                // 如果有空闲链接:返回第一个
                if (!state.idleConnections.isEmpty()) {
                    conn = state.idleConnections.remove(0);
                    logger.info("Checked out connection " + conn.getRealHashCode() + " from pool.");
                }
                // 如果无空闲链接:创建新的链接
                else {
                    // 活跃连接数不足
                    if (state.activeConnections.size() < poolMaximumActiveConnections) {
                        conn = new PooledConnection(dataSource.getConnection(), this);
                        logger.info("Created connection " + conn.getRealHashCode() + ".");
                    }
                    // 活跃连接数已满
                    else {
                        // 取得活跃链接列表的第一个,也就是最老的一个连接
                        PooledConnection oldestActiveConnection = state.activeConnections.get(0);
                        long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();
                        // 如果checkout时间过长,则这个链接标记为过期
                        if (longestCheckoutTime > poolMaximumCheckoutTime) {
                            state.claimedOverdueConnectionCount++;
                            state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime;
                            state.accumulatedCheckoutTime += longestCheckoutTime;
                            state.activeConnections.remove(oldestActiveConnection);
                            if (!oldestActiveConnection.getRealConnection().getAutoCommit()) {
                                oldestActiveConnection.getRealConnection().rollback();
                            }
                            // 删掉最老的链接,然后重新实例化一个新的链接
                            conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this);
                            oldestActiveConnection.invalidate();
                            logger.info("Claimed overdue connection " + conn.getRealHashCode() + ".");
                        }
                        // 如果checkout超时时间不够长,则等待
                        else {
                            try {
                                if (!countedWait) {
                                    state.hadToWaitCount++;
                                    countedWait = true;
                                }
                                logger.info("Waiting as long as " + poolTimeToWait + " milliseconds for connection.");
                                long wt = System.currentTimeMillis();
                                state.wait(poolTimeToWait);
                                state.accumulatedWaitTime += System.currentTimeMillis() - wt;
                            } catch (InterruptedException e) {
                                break;
                            }
                        }

                    }
                }
                // 获得到链接
                if (conn != null) {
                    if (conn.isValid()) {
                        if (!conn.getRealConnection().getAutoCommit()) {
                            conn.getRealConnection().rollback();
                        }
                        conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password));
                        // 记录checkout时间
                        conn.setCheckoutTimestamp(System.currentTimeMillis());
                        conn.setLastUsedTimestamp(System.currentTimeMillis());
                        state.activeConnections.add(conn);
                        state.requestCount++;
                        state.accumulatedRequestTime += System.currentTimeMillis() - t;
                    } else {
                        logger.info("A bad connection (" + conn.getRealHashCode() + ") was returned from the pool, getting another connection.");
                        // 如果没拿到,统计信息:失败链接 +1
                        state.badConnectionCount++;
                        localBadConnectionCount++;
                        conn = null;
                        // 失败次数较多,抛异常
                        if (localBadConnectionCount > (poolMaximumIdleConnections + 3)) {
                            logger.debug("PooledDataSource: Could not get a good connection to the database.");
                            throw new SQLException("PooledDataSource: Could not get a good connection to the database.");
                        }
                    }
                }
            }
        }

        if (conn == null) {
            logger.debug("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
            throw new SQLException("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
        }

        return conn;
    }
3. forceCloseAll 强制关闭所有连接**
  • 在我们调整数据库配置信息后,需要调用该方法,将所有空闲+活跃的连接全部回收。

  • 对应的流程图如图所示:

    image-20241217150657465
    • 代码实现
    // 强制关闭所有链接
    public void forceCloseAll() {
        synchronized (state) {
            expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());
            // 关闭活跃链接
            for (int i = state.activeConnections.size(); i > 0; i--) {
                try {
                    PooledConnection conn = state.activeConnections.remove(i - 1);
                    conn.invalidate();

                    Connection realConn = conn.getRealConnection();
                    if (!realConn.getAutoCommit()) {
                        realConn.rollback();
                    }
                    realConn.close();
                } catch (Exception ignore) {

                }
            }
            // 关闭空闲链接
            for (int i = state.idleConnections.size(); i > 0; i--) {
                try {
                    PooledConnection conn = state.idleConnections.remove(i - 1);
                    conn.invalidate();

                    Connection realConn = conn.getRealConnection();
                    if (!realConn.getAutoCommit()) {
                        realConn.rollback();
                    }
                } catch (Exception ignore) {

                }
            }
            logger.info("PooledDataSource forcefully closed/removed all connections.");
        }
    }
4. pingConnection 检查连接是否有效
  • 流程图如图所示:
image-20241217151119075
  • 代码实现
 // 检查链接是否有效
    protected boolean pingConnection(PooledConnection conn) {
        boolean result = true;

        try {
            result = !conn.getRealConnection().isClosed();
        } catch (SQLException e) {
            logger.info("Connection " + conn.getRealHashCode() + " is BAD: " + e.getMessage());
            result = false;
        }

        if (result) {
            if (poolPingEnabled) {
                if (poolPingConnectionsNotUsedFor >= 0 && conn.getTimeElapsedSinceLastUse() > poolPingConnectionsNotUsedFor) {
                    try {
                        logger.info("Testing connection " + conn.getRealHashCode() + " ...");
                        Connection realConn = conn.getRealConnection();
                        Statement statement = realConn.createStatement();
                        ResultSet resultSet = statement.executeQuery(poolPingQuery);
                        resultSet.close();
                        if (!realConn.getAutoCommit()) {
                            realConn.rollback();
                        }
                        result = true;
                        logger.info("Connection " + conn.getRealHashCode() + " is GOOD!");
                    } catch (Exception e) {
                        logger.info("Execution of ping query '" + poolPingQuery + "' failed: " + e.getMessage());
                        try {
                            conn.getRealConnection().close();
                        } catch (SQLException ignore) {
                        }
                        result = false;
                        logger.info("Connection " + conn.getRealHashCode() + " is BAD: " + e.getMessage());
                    }
                }
            }
        }

        return result;
    }

总结

  • 本文我们详细分析了数据源的池化技术实现,主要包括连接的放入(pushConnection)、取出(popConnection)、强制关闭所有连接(forceCloseAll)以及连接有效性检查(pingConnection)的操作处理。以下是各个操作的详细解析:
    1. pushConnection(放入连接)
      • 该方法负责将连接放回连接池。首先判断连接是否有效,如果有效且空闲连接数量不足,则将其加入空闲连接列表;否则关闭连接。
      • 代码中使用了同步机制来确保线程安全,并在连接回收后通知其他线程。
    2. popConnection(取出连接)
      • 该方法通过循环尝试获取连接,若空闲连接存在则返回;若无,则检查活跃连接数是否足够,必要时创建新连接。
      • 该过程同样使用了同步机制,并在获取连接时进行有效性检查,确保返回的连接是可用的。
    3. forceCloseAll(强制关闭所有连接)
      • 此方法用于在数据库配置调整后,强制关闭所有活跃和空闲连接。通过遍历连接列表,逐一关闭连接并处理异常。
    4. pingConnection(检查连接有效性)
      • 该方法用于检查连接是否仍然有效。通过执行预设的ping查询来验证连接的可用性,并在连接失效时进行相应处理。
  • 通过对池化技术的实现,我们能够有效管理数据库连接,提升系统性能和资源利用率。连接池化不仅减少了连接创建和销毁的开销,还通过有效的连接管理机制,提高了系统的稳定性和响应速度。这些实现细节为理解MyBatis的底层运作提供了重要的参考。

参考书籍:《手写Mybatis渐进式源码实践》

书籍源代码:https://github.com/fuzhengwei/book-small-mybatis