3.1.3 MYSQL连接池

发布于:2025-03-29 ⋅ 阅读:(22) ⋅ 点赞:(0)

3.1.3 MYSQL连接池

1.维持管理固定数量的链接,复用连接资源

  1. MySQL 连接池的核心是通过 MySQL 驱动 来管理数据库连接。MySQL 服务器端和客户端(如应用程序)需要通过 特定的协议 进行通信,而这个协议需要处理 编码(encode)解码(decode),以保证数据的正确传输。

  2. 连接池对应着MYSQL里面的一个database,一个连接池通常对应一个 database,多个database就要多个连接池,可以访问database中的任何表(有权限)

  3. 在数据库应用中,每次建立和释放数据库连接的成本很高。连接池的主要作用是:

    • 预先创建 一定数量的数据库连接,避免频繁创建销毁连接带来的开销。
    • 复用已有的连接,提高数据库访问效率。
    • 控制最大连接数,防止数据库服务器过载

2. MYSQL网络模型

  1. 主线程的作用
    MySQL 服务器采用 事件驱动 的 Reactor 模型 来管理客户端连接,其主线程的主要任务包括:

    1. 监听新的客户端连接(listenfd 监听文件描述符)
    2. 检查已有连接是否有新数据(select 或其他 I/O 多路复用机制,如 epoll)
    3. 将新的连接分配给工作线程
  2. 同一条连接收到多个请求,串行执行。不同连接之间的请求是可以并行执行的,因为它们是由不同的工作线程处理的


3. MYSQL连接驱动使用

  1. ubuntu安装libmysqlclient-dev
  2. 加载mysql.h以及静态库或者动态库
  3. 初始化连接mysql_library_init
  4. 使用mysql连接驱动与MYSQL进行交互(执行SQL语句)
  5. 释放连接资源mysql_library_end

4. 同步连接池

1. 执行一个sql语句,怎么拿到数据库返回值?

同步 vs. 异步,执行 SQL 语句时:
- 同步 (Synchronous):调用 mysql_query() 直接等待 MySQL 返回结果。
- 异步 (Asynchronous):在 线程池 或 事件驱动 机制中执行 SQL,调用线程不会被阻塞

2. MySQL 执行流程

特征:使用阻塞的io
mysql_query:

  1. sql通过MYSQL协议打包
  2. 数据send/write
  3. read会阻塞线程等待MYSQL的返回
  4. 确定是一个完整的回应包之后,协议解析,将结果返回
    所以是一个耗时操作,需要优化

在这里插入图片描述

3. 同步连接池的使用场景

适用于服务器初始化

  1. 配置加载:如拉取用户权限、系统参数、缓存预加载等。
  2. 数据预处理:如初始化 热点数据,避免后续频繁数据库查询。
  3. 启动阶段任务:必须确保 数据加载完成 后,服务器才能提供服务

4. 连接池的线程管理

  1. 当前线程获取数据库连接
    数据库连接池 维护 一定数量的数据库连接,避免频繁创建/销毁连接的开销。
    线程 从连接池获取可用连接,执行 SQL 语句,完成后释放回池
  2. 线程安全
    加锁管理连接池,防止多个线程同时访问数据库时发生竞争。
    允许多个线程/协程访问数据库,但最大连接数受 数据库服务器配置 限制
对比项 线程(Thread) 协程(Coroutine)
调度方式 操作系统内核 负责调度 用户态 调度(应用层控制)
切换成本 (涉及内核态切换) (仅寄存器、堆栈切换)
并行性 可以利用多核 CPU 单线程运行,但可以高效调度
适用场景 CPU 计算密集型(如多线程计算、数据处理) I/O 密集型(如爬虫、数据库连接池)
资源占用 线程占用大量系统资源(栈、寄存器) 协程占用资源少(轻量级任务)
创建销毁开销 较大(需要系统调用) 较小(在用户态创建/销毁)
编程模型 传统 多线程编程,容易产生 锁竞争、死锁 异步编程,代码更复杂,但性能更高

5. 为什么服务器初始化要同步,不用异步连接池

  1. 服务器启动必须确保所有依赖数据已准备好
    同步执行可以保证数据完整性
    同步连接池会等待 SQL 查询执行完毕,确保所有关键数据已经正确加载。
    如果使用异步连接池,数据库查询可能在服务已启动后仍未完成,导致数据未准备好,影响服务的稳定性。

  2. 需要阻塞等待初始化完成
    避免并发问题
    服务器初始化阶段不需要高并发处理,只需要保证数据正确性。
    如果用异步连接池,某些数据可能在服务器启动后才返回,导致访问时出现未初始化数据错误

5. 异步连接池

在这里插入图片描述

1. 使用场景

适用于服务器正常运行后,所有涉及数据库查询的业务逻辑都应该采用异步连接池,以提高并发能力和响应速度。

2. 核心特点

核心线程数少:可以是一个或少量几个线程,通过事件驱动处理多个请求。

单位时间内请求数受限:核心线程主要用于处理数据库返回值,而非阻塞等待数据库响应。

高吞吐量:避免阻塞,每个线程可以同时管理多个数据库查询。

3. 线程池机制

  1. 线程池管理任务队列:

任务(SQL 查询请求)进入任务队列;
线程池从任务队列中取出任务执行。

2 .发生阻塞的情况:
任务队列为空 → 线程等待新任务;
等待数据库返回 → 线程会切换处理其他任务,不会阻塞等待。

4. 线程安全保障

多线程环境必须保证线程安全:

采用**互斥锁(Mutex)**防止资源竞争;

采用**原子变量(Atomic)**保证操作的原子性。

5. 任务调度方式

具体执行的SQL 任务与数据库连接无关,可以动态分配连接。
允许多个连接同时访问数据库,提高并发处理能力。

5. 连接池实现

1. 连接池类型

同步连接池:请求阻塞等待数据库返回结果。
异步连接池:请求非阻塞,线程继续处理其他任务,数据库返回结果后进行回调处理。

2. SQL 处理流程

SQL 语句在数据库执行前,会经过以下几个阶段:

  1. 输入 SQL 语句:用户提交 SQL 查询。

  2. 解析阶段:
    词法分析(Lexical Analysis):将 SQL 语句拆分为关键字、标识符、运算符等。
    语法分析(Syntax Analysis):构建 SQL 语法树,检查 SQL 是否符合语法规则。

  3. 查询优化:

过滤器(Filter):剔除无效的 SQL 语句,提升查询效率。
优化器(Optimizer):选择最优的执行计划,提高查询性能。
存储引擎执行 SQL:根据执行计划,调用存储引擎执行查询。

  1. 返回结果:数据库执行完成后,返回查询结果。

3. C++11 连接池异步任务管理

在 C++11 及以上版本,异步任务管理可借助 Promise-Future 机制 实现:
promise 任务执行后存储结果
future 读取 promise 传递的结果

#include <iostream>
#include <future>
#include <thread>

int database_query() {
    std::this_thread::sleep_for(std::chrono::seconds(2)); // 模拟 SQL 查询耗时
    return 42; // 查询结果
}

int main() {
    std::promise<int> prom;
    std::future<int> fut = prom.get_future();

    std::thread t([&prom]() {
        int result = database_query();
        prom.set_value(result); // 传递查询结果
    });

    std::cout << "Waiting for database query result..." << std::endl;
    std::cout << "Query result: " << fut.get() << std::endl; // 获取 SQL 结果

    t.join();
    return 0;
}

运行流程
主线程创建 promise 并获取 future。
新线程执行 database_query(),查询完成后调用 prom.set_value(result) 传递查询结果。
主线程阻塞等待 fut.get() 获取 SQL 查询结果

#include <vector>
#include <memory>
#include <mutex>
#include <queue>
#include <condition_variable>
#include <mysql.h>

/**
 * @brief 数据库连接池类模板
 * @tparam T 数据库类型
 */
template <class T>
class DatabaseWorkerPool {
public:
    /**
     * @brief 构造函数
     */
    DatabaseWorkerPool();

    /**
     * @brief 析构函数
     */
    ~DatabaseWorkerPool();

    /**
     * @brief 设置连接信息
     * @param infoString 连接字符串
     * @param asyncThreads 异步连接数
     * @param synchThreads 同步连接数
     */
    void SetConnectionInfo(const std::string& infoString, uint8 asyncThreads, uint8 synchThreads);

    /**
     * @brief 打开连接池
     * @return 错误码,0表示成功
     */
    uint32 Open();

    /**
     * @brief 关闭连接池
     */
    void Close();

    /**
     * @brief 获取一个空闲连接
     * @return 数据库连接指针
     */
    T* GetFreeConnection();

    /**
     * @brief 执行查询
     * @param sql SQL语句
     * @return 查询结果
     */
    QueryResult Query(const char* sql);

private:
    /**
     * @brief 打开指定数量的连接
     * @param type 连接类型(同步/异步)
     * @param numConnections 连接数量
     * @return 错误码,0表示成功
     */
    uint32 OpenConnections(int type, uint8 numConnections);

    /**
     * @brief 获取数据库名称
     * @return 数据库名称
     */
    const char* GetDatabaseName() const;

    // 连接类型枚举
    enum {
        IDX_SYNCH = 0,  // 同步连接
        IDX_ASYNC = 1,  // 异步连接
        IDX_MAX         // 最大索引
    };

    // 连接信息结构体
    struct MySQLConnectionInfo {
        std::string host;
        std::string user;
        std::string password;
        std::string database;
        uint16 port;
        std::string ssl;
    };

    std::unique_ptr<MySQLConnectionInfo> _connectionInfo;  // 连接信息
    std::vector<std::unique_ptr<T>> _connections[IDX_MAX]; // 连接池(同步和异步)
    uint8 _async_threads;  // 异步连接数
    uint8 _synch_threads;  // 同步连接数
    std::mutex _mutex;     // 互斥锁
    std::condition_variable _condition; // 条件变量
};

// 实现部分

template <class T>
DatabaseWorkerPool<T>::DatabaseWorkerPool()
    : _async_threads(0), _synch_threads(0) {
    // 检查MySQL库是否线程安全
    if (!mysql_thread_safe()) {
        throw std::runtime_error("MySQL library is not thread-safe");
    }
}

template <class T>
DatabaseWorkerPool<T>::~DatabaseWorkerPool() {
    Close();
}

template <class T>
void DatabaseWorkerPool<T>::SetConnectionInfo(const std::string& infoString, 
                                           uint8 asyncThreads, uint8 synchThreads) {
    // 解析连接字符串并存储
    _connectionInfo = std::make_unique<MySQLConnectionInfo>();
    // 这里简化处理,实际应该解析infoString
    _connectionInfo->host = "localhost";
    _connectionInfo->user = "root";
    _connectionInfo->password = "";
    _connectionInfo->database = "test";
    _connectionInfo->port = 3306;

    _async_threads = asyncThreads;
    _synch_threads = synchThreads;
}

template <class T>
uint32 DatabaseWorkerPool<T>::Open() {
    if (!_connectionInfo) {
        throw std::runtime_error("Connection info was not set!");
    }

    // 打开异步连接
    uint32 error = OpenConnections(IDX_ASYNC, _async_threads);
    if (error) return error;

    // 打开同步连接
    error = OpenConnections(IDX_SYNCH, _synch_threads);
    return error;
}

template <class T>
uint32 DatabaseWorkerPool<T>::OpenConnections(int type, uint8 numConnections) {
    std::lock_guard<std::mutex> lock(_mutex);

    for (uint8 i = 0; i < numConnections; ++i) {
        // 创建新连接
        auto connection = std::make_unique<T>();
        
        // 初始化连接
        if (!connection->Open(_connectionInfo.get())) {
            return 1; // 简化错误处理
        }

        // 添加到连接池
        _connections[type].push_back(std::move(connection));
    }

    return 0;
}

template <class T>
void DatabaseWorkerPool<T>::Close() {
    std::lock_guard<std::mutex> lock(_mutex);

    // 关闭所有连接
    for (auto& connList : _connections) {
        connList.clear();
    }
}

template <class T>
T* DatabaseWorkerPool<T>::GetFreeConnection() {
    std::unique_lock<std::mutex> lock(_mutex);

    // 优先从同步连接池获取
    for (auto& conn : _connections[IDX_SYNCH]) {
        if (conn->TryLock()) {
            return conn.get();
        }
    }

    // 如果没有可用的同步连接,等待
    _condition.wait(lock, [this]() {
        for (auto& conn : _connections[IDX_SYNCH]) {
            if (conn->TryLock()) {
                return true;
            }
        }
        return false;
    });

    // 再次尝试获取
    for (auto& conn : _connections[IDX_SYNCH]) {
        if (conn->TryLock()) {
            return conn.get();
        }
    }

    return nullptr; // 不应该执行到这里
}

template <class T>
QueryResult DatabaseWorkerPool<T>::Query(const char* sql) {
    auto connection = GetFreeConnection();
    if (!connection) {
        return QueryResult(nullptr);
    }

    // 执行查询
    ResultSet* result = connection->Query(sql);
    connection->Unlock();
    _condition.notify_one(); // 通知其他等待的线程

    if (!result || !result->GetRowCount() || !result->NextRow()) {
        delete result;
        return QueryResult(nullptr);
    }

    return QueryResult(result);
}

template <class T>
const char* DatabaseWorkerPool<T>::GetDatabaseName() const {
    return _connectionInfo ? _connectionInfo->database.c_str() : "";
}
Client DatabaseWorkerPool ConnectionPool MySQLConnection SetConnectionInfo() 存储连接配置 Open() OpenConnections(IDX_ASYNC) 创建连接对象 连接成功 OpenConnections(IDX_SYNCH) 创建连接对象 连接成功 Query(SQL) GetFreeConnection() 返回连接 等待(_condition.wait) alt [有可用连接] [无可用连接] Execute(SQL) 返回结果 ReleaseConnection() _condition.notify_one() 返回结果 Close() 清理所有连接 Client DatabaseWorkerPool ConnectionPool MySQLConnection