Qt 线程池设计与实现

发布于:2025-07-30 ⋅ 阅读:(18) ⋅ 点赞:(0)

在 Qt 多线程编程中,线程池是一种高效管理和复用线程的技术,能够减少线程创建和销毁的开销,提高应用性能和响应性。本文将深入探讨 Qt 线程池的设计与实现,包括核心组件、任务调度、线程管理和性能优化等方面。

一、线程池基本原理

1. 线程池核心组件
  • 工作线程:预先创建的一组线程,等待执行任务
  • 任务队列:存储待执行的任务,线程从队列中获取任务执行
  • 管理器:负责线程的创建、销毁和任务的分配调度
2. 线程池工作流程
  1. 初始化时创建一定数量的工作线程
  2. 外部提交任务到任务队列
  3. 工作线程从任务队列获取任务并执行
  4. 任务执行完毕后,线程返回线程池等待下一个任务
  5. 根据系统负载和任务特性,动态调整线程数量

二、简单线程池实现

1. 任务接口定义
class Runnable {
public:
    virtual ~Runnable() {}
    virtual void run() = 0;
};
2. 线程池实现
class ThreadPool {
public:
    explicit ThreadPool(int threadCount = QThread::idealThreadCount())
        : m_stop(false) {
        // 创建工作线程
        for (int i = 0; i < threadCount; ++i) {
            QThread *thread = new QThread(this);
            thread->start();
            m_threads.append(thread);
            
            // 创建工作者并移动到线程
            Worker *worker = new Worker(&m_taskQueue, &m_mutex, &m_condition, &m_stop);
            worker->moveToThread(thread);
            
            // 连接信号槽
            connect(worker, &Worker::taskFinished, this, &ThreadPool::taskFinished);
            m_workers.append(worker);
            
            // 启动工作者
            QMetaObject::invokeMethod(worker, "run", Qt::QueuedConnection);
        }
    }
    
    ~ThreadPool() {
        // 停止线程池
        {
            QMutexLocker locker(&m_mutex);
            m_stop = true;
            m_condition.wakeAll();
        }
        
        // 等待所有线程完成
        foreach (QThread *thread, m_threads) {
            thread->quit();
            thread->wait();
        }
    }
    
    void enqueueTask(Runnable *task) {
        QMutexLocker locker(&m_mutex);
        m_taskQueue.enqueue(task);
        m_condition.wakeOne();
    }
    
signals:
    void taskFinished(Runnable *task);
    
private:
    QList<QThread*> m_threads;
    QList<Worker*> m_workers;
    QQueue<Runnable*> m_taskQueue;
    QMutex m_mutex;
    QWaitCondition m_condition;
    bool m_stop;
};
3. 工作者实现
class Worker : public QObject {
    Q_OBJECT
public:
    explicit Worker(QQueue<Runnable*> *taskQueue, QMutex *mutex, 
                  QWaitCondition *condition, bool *stop, QObject *parent = nullptr)
        : QObject(parent), m_taskQueue(taskQueue), m_mutex(mutex), 
          m_condition(condition), m_stop(stop) {}
    
public slots:
    void run() {
        while (true) {
            Runnable *task = nullptr;
            
            {
                QMutexLocker locker(m_mutex);
                
                // 等待任务或停止信号
                while (m_taskQueue->isEmpty() && !*m_stop) {
                    m_condition->wait(m_mutex);
                }
                
                // 检查是否需要停止
                if (*m_stop && m_taskQueue->isEmpty()) {
                    return;
                }
                
                // 获取任务
                task = m_taskQueue->dequeue();
            }
            
            // 执行任务
            if (task) {
                task->run();
                emit taskFinished(task);
            }
        }
    }
    
signals:
    void taskFinished(Runnable *task);
    
private:
    QQueue<Runnable*> *m_taskQueue;
    QMutex *m_mutex;
    QWaitCondition *m_condition;
    bool *m_stop;
};

三、高级线程池实现

1. 带优先级的任务队列
template <typename T>
class PriorityQueue {
public:
    void enqueue(const T &task, int priority = 0) {
        QMutexLocker locker(&m_mutex);
        
        // 按优先级插入任务
        for (auto it = m_tasks.begin(); it != m_tasks.end(); ++it) {
            if (priority > it->second) {
                m_tasks.insert(it, qMakePair(task, priority));
                return;
            }
        }
        
        // 优先级最低,插入到末尾
        m_tasks.append(qMakePair(task, priority));
        m_condition.wakeOne();
    }
    
    T dequeue() {
        QMutexLocker locker(&m_mutex);
        
        // 等待任务
        while (m_tasks.isEmpty()) {
            m_condition.wait(&m_mutex);
        }
        
        // 获取最高优先级任务
        T task = m_tasks.takeFirst().first();
        return task;
    }
    
    bool isEmpty() const {
        QMutexLocker locker(&m_mutex);
        return m_tasks.isEmpty();
    }
    
private:
    QList<QPair<T, int>> m_tasks;
    mutable QMutex m_mutex;
    QWaitCondition m_condition;
};
2. 动态调整线程数量的线程池
class DynamicThreadPool {
public:
    explicit DynamicThreadPool(int minThreads = 4, int maxThreads = 16)
        : m_minThreads(minThreads), m_maxThreads(maxThreads), 
          m_activeThreads(0), m_stop(false) {
        // 创建最小数量的线程
        for (int i = 0; i < minThreads; ++i) {
            createWorker();
        }
    }
    
    ~DynamicThreadPool() {
        // 停止线程池
        {
            QMutexLocker locker(&m_mutex);
            m_stop = true;
            m_condition.wakeAll();
        }
        
        // 等待所有线程完成
        foreach (QThread *thread, m_threads) {
            thread->quit();
            thread->wait();
        }
    }
    
    void enqueueTask(Runnable *task) {
        QMutexLocker locker(&m_mutex);
        
        // 如果任务队列太长且线程数未达到最大值,创建新线程
        if (m_taskQueue.size() > m_activeThreads && m_threads.size() < m_maxThreads) {
            createWorker();
        }
        
        m_taskQueue.enqueue(task);
        m_condition.wakeOne();
    }
    
private:
    void createWorker() {
        QThread *thread = new QThread(this);
        thread->start();
        m_threads.append(thread);
        
        Worker *worker = new Worker(&m_taskQueue, &m_mutex, &m_condition, 
                                  &m_stop, &m_activeThreads);
        worker->moveToThread(thread);
        
        connect(worker, &Worker::taskStarted, this, &DynamicThreadPool::taskStarted);
        connect(worker, &Worker::taskFinished, this, &DynamicThreadPool::taskFinished);
        connect(worker, &Worker::idleTimeout, this, &DynamicThreadPool::workerIdleTimeout);
        
        m_workers.append(worker);
        QMetaObject::invokeMethod(worker, "run", Qt::QueuedConnection);
    }
    
    void workerIdleTimeout(Worker *worker) {
        QMutexLocker locker(&m_mutex);
        
        // 如果线程数超过最小值,删除空闲线程
        if (m_threads.size() > m_minThreads) {
            int index = m_workers.indexOf(worker);
            if (index >= 0) {
                QThread *thread = m_threads.takeAt(index);
                m_workers.takeAt(index)->deleteLater();
                
                thread->quit();
                thread->wait();
                thread->deleteLater();
            }
        }
    }
    
private:
    QList<QThread*> m_threads;
    QList<Worker*> m_workers;
    QQueue<Runnable*> m_taskQueue;
    QMutex m_mutex;
    QWaitCondition m_condition;
    int m_minThreads;
    int m_maxThreads;
    int m_activeThreads;
    bool m_stop;
};

四、使用 QtConcurrent 的线程池

1. QtConcurrent 基础用法
#include <QtConcurrent>

// 定义一个函数
int compute(int value) {
    // 模拟耗时计算
    QThread::sleep(1);
    return value * value;
}

// 使用 QtConcurrent 运行函数
void useQtConcurrent() {
    // 在全局线程池中运行任务
    QFuture<int> future = QtConcurrent::run(compute, 42);
    
    // 可以继续执行其他代码...
    
    // 获取结果
    int result = future.result();
    qDebug() << "Result:" << result;
}

// 并行处理容器中的元素
void processContainer() {
    QList<int> inputList = {1, 2, 3, 4, 5};
    
    // 并行处理每个元素
    QFuture<void> future = QtConcurrent::map(inputList, [](int &value) {
        value = value * value;
    });
    
    // 等待所有任务完成
    future.waitForFinished();
    
    qDebug() << "Processed list:" << inputList;
}
2. 使用 QFutureWatcher 监控任务
void useFutureWatcher() {
    // 创建并启动任务
    QFuture<int> future = QtConcurrent::run([]() {
        // 模拟耗时操作
        QThread::sleep(3);
        return 42;
    });
    
    // 创建监听器
    QFutureWatcher<int> *watcher = new QFutureWatcher<int>(this);
    
    // 连接信号槽
    connect(watcher, &QFutureWatcher<int>::finished, this, [this, watcher]() {
        int result = watcher->result();
        qDebug() << "Task finished with result:" << result;
        watcher->deleteLater();
    });
    
    // 设置监听器
    watcher->setFuture(future);
}

五、线程池性能优化

1. 任务队列优化
// 使用无锁队列提高性能
template <typename T>
class LockFreeQueue {
public:
    void enqueue(const T &value) {
        Node *newNode = new Node(value);
        Node *oldTail = m_tail.load();
        
        while (!m_tail.compare_exchange_weak(oldTail, newNode)) {
            // CAS 失败,重试
        }
        
        oldTail->next = newNode;
    }
    
    bool dequeue(T &value) {
        Node *oldHead = m_head.load();
        
        if (oldHead == m_tail.load()) {
            return false;  // 队列为空
        }
        
        Node *newHead = oldHead->next;
        
        if (m_head.compare_exchange_weak(oldHead, newHead)) {
            value = oldHead->data;
            delete oldHead;
            return true;
        }
        
        return false;  // CAS 失败,重试
    }
    
private:
    struct Node {
        T data;
        Node *next;
        
        explicit Node(const T &value) : data(value), next(nullptr) {}
    };
    
    std::atomic<Node*> m_head;
    std::atomic<Node*> m_tail;
};
2. 线程亲和性设置
// 设置线程亲和性
void setThreadAffinity(QThread *thread, int cpuCore) {
    #ifdef Q_OS_LINUX
    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);
    CPU_SET(cpuCore, &cpuset);
    pthread_setaffinity_np(thread->threadId(), sizeof(cpu_set_t), &cpuset);
    #endif
}

// 在线程启动时设置亲和性
void Worker::run() {
    // 设置线程亲和性
    setThreadAffinity(QThread::currentThread(), m_cpuCore);
    
    // 线程执行代码
    // ...
}

六、线程池最佳实践

1. 任务分割策略
// 将大任务分割为多个小任务
void splitTask() {
    const int taskCount = 100;
    const int batchSize = 10;
    
    for (int i = 0; i < taskCount; i += batchSize) {
        threadPool->enqueueTask(new BatchTask(i, batchSize));
    }
}

// 批处理任务实现
class BatchTask : public Runnable {
public:
    BatchTask(int startIndex, int count)
        : m_startIndex(startIndex), m_count(count) {}
    
    void run() override {
        for (int i = 0; i < m_count; ++i) {
            // 处理单个任务
            processItem(m_startIndex + i);
        }
    }
    
private:
    int m_startIndex;
    int m_count;
};
2. 线程池大小配置
// 根据 CPU 核心数配置线程池大小
int determineThreadPoolSize() {
    int coreCount = QThread::idealThreadCount();
    
    // I/O 密集型任务可以设置比 CPU 核心数大的线程数
    if (isIoIntensive()) {
        return coreCount * 2;
    }
    
    // CPU 密集型任务通常设置为 CPU 核心数
    return coreCount;
}

七、总结

线程池是 Qt 多线程编程中的重要技术,能够有效管理线程资源,提高应用性能。本文介绍了线程池的基本原理、实现方法和优化策略,主要内容包括:

  1. 线程池核心组件:工作线程、任务队列和管理器
  2. 线程池实现方式:从简单实现到动态调整线程数量的高级实现
  3. QtConcurrent 线程池:Qt 提供的高级线程池接口
  4. 性能优化技术:无锁队列、线程亲和性设置等
  5. 最佳实践:任务分割、合理配置线程池大小等

在实际开发中,应根据应用的特点和需求选择合适的线程池实现方式,并通过性能测试不断优化线程池配置,以达到最佳的性能和资源利用率。


网站公告

今日签到

点亮在社区的每一天
去签到