简单线程池实现

发布于:2025-04-07 ⋅ 阅读:(23) ⋅ 点赞:(0)

线程池的概念

线程池内部可以预先去进行创建出一批线程,对于每一个线程,它都会周期性的进行我们的任务处理。

线程内部在维护一个任务队列,其中我们外部可以向任务队列里放任务,然后内部的线程从任务队列里取任务,如果任务队列里没有任务时,所有线程全部去休眠,一旦外部生产出来对应的任务,那么就可以唤醒指定的一个或多个线程,让它进行任务的处理。

其实线程池也是一个典型的生产者消费者问题。

线程池是一种线程使用模式。

线程过多会带来调度开销,进而影响缓存局部和整体性能,而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。

线程池的优点

  • 线程池避免了在处理短时间任务时创建与销毁线程的代价。
  • 线程池不仅能够保证内核充分利用,还能防止过分调用。
  • 线程池当中线程的总数是固定的。也就是说,不管未来有多少个任务,线程的数量不会变化,而不是说我有任务就创建线程让线程去处理,这样任务越来越多,线程也会越来越多,这样的话对系统不友好。

注意: 线程池中可用线程的数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。

线程池的应用场景

线程池常见的应用场景如下:

  1. 需要大量的线程来完成任务,且完成任务的时间比较短。
  2. 对性能要求苛刻的应用,比如要求服务器迅速响应用户请求。
  3. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。

相关解释:

  • 像Web服务器完成网页请求这样的任务,使用线程池技术使非常合适的。因为单个任务小,而任务数量大,你可以想象一个热门网站的点击次数。
  • 处于长时间的任务,比如TeInet连接请求,线程池的优点就不明显了。因为TeInet会话时间比线程的创建时间大多了。
  • 突发性大量客户请求,在没有线程池的情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,但短时间内产生大量线程可能使内存达到极限,出现错误。

线程池的实现

下面我们实现一个简单的线程池,线程池中提供了一个任务队列,以及若干个线程(多线程)。
在这里插入图片描述

  • 线程池中多个线程负责从任务队列中那任务,并将拿到的任务进行处理。
  • 线程池对外提供一个Push接口,用于让外部线程能够将任务Push到任务队列中。

设计逻辑

  1. 线程池初始化与启动:
    • 在初始化阶段,线程池会根据给定的线程数创建相应数量的线程,并存储在线程向量中。
    • 这些线程并不会立即启动,而是等到 Start 方法被调用时才会开始运行。
  2. 任务提交:
    • 使用 Push 方法向线程池提交任务。
    • 如果有线程正在等待任务,则会唤醒一个线程来处理新加入的任务。
  3. 线程间通信与同步:
    • 通过互斥锁 _mutex 和条件变量 _cond 实现对任务队列的保护以及线程间的同步。
    • 当队列为空时,工作线程会等待在条件变量上,直到有新的任务被加入或线程池停止运行。
  4. 线程池生命周期管理:
    • 通过 _isRuning 标志控制线程池的运行状态。
    • 通过 StartStop 方法控制线程池的启动和停止。
    • Wait 方法确保所有线程都完成了它们的任务之后才返回。

类结构

  • 模板参数: T 任务类型,是一个可以被调用的对象,如函数、lambda表达式或者实现 operator()的类。
  • 成员变量:
    • _threadNum:线程池中的线程数量。
    • _threads:保存线程池中所有线程的std::vector
    • _task_queue:保存待处理任务的std::queue
    • _mutex:保护_task_queue的互斥锁。
    • _cond:条件变量,用于线程间的同步。
    • _waitNum:当前等待在条件变量上的线程数。
    • _isRunning:标记线程池是否正在运行。

成员函数

  • 构造函数: 初始化线程数量(默认为全局变量 g_defaultThreadNum),互斥锁和条件变量。并生成对应的日志信息。
  • 析构函数: 销毁互斥锁和条件变量。 并生成对应的日志信息。
  • InitThreadPool 创建指定数量的工作线程,每个线程都有一个唯一的名称。线程创建后不会立即开始运行。每个线程通过 std::bind 绑定到 HandlerTask 方法上,并传入线程的名称(用于日志记录)。
  • Start 启动所有之前初始化的线程。这一步才是真正地创建线程。线程启动则代表线程池开始工作了, _isRunning应该被设置为true
  • Stop 线程池结束工作。设置 _isRunningfalse,唤醒所有还在等待的线程,通知线程退出循环。因为修改 _isRunning 会影响还在等待队列中的线程(共享区内部代码),所以最好在修改 _isRunning 前进行加锁。被唤醒的线程,将继续向下执行HandlerTask剩余逻辑。
  • Wait 等待所有线程完成它们的任务并结束。确保线程池完全停止运行。
  • Push 用于向任务队列中添加新任务。如果线程池正在运行,任务将被添加到队列中。如果有现成在等待队列上有任务可用,则唤醒其中一个线程。
  • HandlerTask 线程执行具体任务处理逻辑。它首先检查任务队列是否为空,如果队列为空且线程池仍在运行,则该线程将挂起等待;如果线程池停止运行并且队列为空,线程退出;如果队列中有任务,则取出并执行该任务。

这里为了简化代码,将之前封装的 Thread 中的模板去除,也去除该类中的 _data 属性,线程传入的函数参数则只需要传入 线程名 参数即可。同时因为 InitThreadPool 在创建线程时,通过 std::bind 将Thread对象绑定到 HandlerTask 方法上,这样不为这个对象传递执行任务参数,该对象也能执行 HandlerTask 方法。

// Thread.hpp
#ifndef __THREAD_HPP__
#define __THREAD_HPP__ 

#pragma once

#include <iostream>
#include <string>
#include <functional>
#include <pthread.h>
#include <unistd.h>
namespace ThreadModule
{

    using func_t = std::function<void(const std::string& name)>;

    class Thread 
    {
     public:
        Thread(func_t func, const std::string name = "none-name")
            :_func(func), _threadName(name), _stop(true)
        {}

        void Excute()
        {
            _func(_threadName);
        }

        static void* threadRoutine(void* args) 
        {
            Thread* self = static_cast<Thread*>(args);
            self->Excute();
            return nullptr;
        }

        bool Start()
        {
            int n = pthread_create(&_tid, nullptr, threadRoutine, this);
            if(!n)
            {
                _stop = false;
                return true;
            }
            else 
                return false;

        }

        void Detach()
        {
            if(!_stop)
                pthread_detach(_tid);
        }

        void Join()
        {
            if(!_stop)
                pthread_join(_tid, nullptr);
        }

        const std::string& name()
        {
            return _threadName;
        }

        ~Thread()
        {}
     private:
         pthread_t _tid;
         std::string _threadName;
         func_t _func;
         bool _stop;
    };
}

#endif 

为什么线程池中需要有互斥锁和条件变量?

线程池中的任务队列是会被多个执行流同时访问的临界资源,因此我们需要引入互斥锁对任务队列进行保护。

线程池当中的线程要从任务队列里拿任务,前提条件是任务队列中必须要有任务,因此线程池当中的线程在拿任务之前,需要判断任务队列当中是否有任务,若此时队列为空,那么该线程应该进行等待,直到任务队列中有任务时再将其唤醒,因此我们需要引入条件变量。

注意:

  • 当某个线程被唤醒时,其可能是被异常或是伪唤醒,或者是一些广播类的唤醒线程操作而导致所有的线程被唤醒,使得在被唤醒的若干线程中,只有个别线程能拿到任务。此时应该让被唤醒的线程再次判断是否满足唤醒条件,所以在判断任务队列是否为空时,应该使用while进行判断,而不是if。
  • pthread_cond_broadcast函数的作用是唤醒条件变量下的所有线程,而外部可能只Push了一个任务,我们却把全部等待的线程都唤醒,此时这些线程就都会去任务队列获取任务,但最终只有一个线程能得到任务。一瞬间唤醒大量的线程可能会导致系统震荡,这叫做惊群效应。因此在唤醒线程时最好使用 pthread_cond_signal 函数唤醒正在等待的一个线程即可。
  • 当线程从任务队列中拿到任务后,该任务就已经属于当前线程了,与其他线程已经没有关系了,因此应该在解锁之后在进行处理任务,而不是在解锁之前进行。因为处理任务的过程可能会耗费一定的时间,所以我们不要将其放到临界区中。
  • 如果将处理任务的过程放到临界区当中,那么当某一线程从任务队列拿到任务后,其他线程还需要等待该线程将任务处理完后,才有机会进入临界区。此时虽然是线程池,但最终我们可能并没有让多线程并行的执行起来。

线程池源代码

#include <vector>
#include <queue>
#include <string>
#include <pthread.h>
#include "Log.hpp"
#include "Thread.hpp"
#include "Task.hpp"
using namespace ThreadModule;

const static int g_defaultThreadNum = 3;


template<typename T>
class ThreadPool
{
private:
    void LockQueue()
    {
        pthread_mutex_lock(&_mutex);
    }
    void UnlockQueue()
    {
        pthread_mutex_unlock(&_mutex);
    }

    void ThreadWakeup()
    {
        pthread_cond_signal(&_cond);
    }

    void ThreadWakeupAll()
    {
        pthread_cond_broadcast(&_cond);
    }

    void ThreadSleep()
    {
        pthread_cond_wait(&_cond, &_mutex);
    }

public:
    ThreadPool(int threadNum = g_defaultThreadNum) : _threadNum(threadNum), _waitNum(0), _isRunning(false)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);
        LOG(INFO, "ThreadPool Construct()");
    }

    void HandlerTask(std::string name)
    {
        LOG(INFO, "%s is running...", name.c_str());
        while(true) // 一直处理任务,直到队列中没有任务,退出
        {
            // 1. 访问任务队列,保证队列安全
            LockQueue();
            // 2. 判断队列中是否有数据
            // 2.1 如果队列为空,并且线程池是工作状态,则该线程等待
            while(_task_queue.empty() && _isRunning == true) 
            {
                _waitNum++;
                ThreadSleep(); // 在条件变量上等待

                // 等待结束,继续判断,防止伪唤醒
                _waitNum--;
            }

            // 2.2 如果队列为空,并且线程池退出,则退出循环
            if(_task_queue.empty() && _isRunning == false)
            {
                UnlockQueue();
                break;                
            }

            // 2.3 如果队列不为空,并且线程池退出,此时要先将队列中的任务完成,才能退出循环
            // 2.4 如果队列不为空,并且线程池未退出,取出任务,完成
            T t = _task_queue.front();
            _task_queue.pop();
            UnlockQueue();

            LOG(DEBUG, "%s, get a task", name.c_str());

            // 3. 执行任务
            t(); // 重载(), 也可以实现一个方法Run, t.Run()
            LOG(DEBUG, "%s has finished a task, result is %s", name.c_str(), t.ResultToString().c_str());
        }
    }

    void InitThreadPool()
    {
        for(int i = 0; i < _threadNum; i++)
        {
            std::string name = "thread-" + std::to_string(i + 1);
            _threads.emplace_back(std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1), name);
            LOG(INFO, "init thread %s done", name.c_str());
        }
    }

    void Start()
    {
        _isRunning = true;
        for(auto& thread: _threads)
        {
            thread.Start();
        }
    }

    bool Push(const T& t)
    {
        bool ret = false;
        LockQueue();
        if(_isRunning == true)
        {
            _task_queue.push(t);
            if(_waitNum > 0)
                ThreadWakeup();

            LOG(DEBUG, "Push task success");
            ret = true;
        }
        UnlockQueue();
        return ret;
    }

    void Stop()
    {
        LockQueue();
        _isRunning = false;
        ThreadWakeupAll();
        UnlockQueue();
    }

    void Wait()
    {
        for(auto& thread:_threads)
        {
            thread.Join();
            LOG(INFO, "%s is quit...", thread.name().c_str());
        }
    }

    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
        LOG(INFO, "ThreadPool Destruct()");
    }
private:
    int _threadNum;
    std::vector<Thread> _threads;
    std::queue<T> _task_queue;
    pthread_mutex_t _mutex;
    pthread_cond_t _cond;

    int _waitNum;
    bool _isRunning;
};

其余详情源代码查看:源代码

结果展示:
在这里插入图片描述


今天的分享就到这里了,如果,你感觉这篇博客对你有帮助的话,就点个赞吧!感谢感谢……