线程池的概念
线程池内部可以预先去进行创建出一批线程,对于每一个线程,它都会周期性的进行我们的任务处理。
线程内部在维护一个任务队列,其中我们外部可以向任务队列里放任务,然后内部的线程从任务队列里取任务,如果任务队列里没有任务时,所有线程全部去休眠,一旦外部生产出来对应的任务,那么就可以唤醒指定的一个或多个线程,让它进行任务的处理。
其实线程池也是一个典型的生产者消费者问题。
线程池是一种线程使用模式。
线程过多会带来调度开销,进而影响缓存局部和整体性能,而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。
线程池的优点
- 线程池避免了在处理短时间任务时创建与销毁线程的代价。
- 线程池不仅能够保证内核充分利用,还能防止过分调用。
- 线程池当中线程的总数是固定的。也就是说,不管未来有多少个任务,线程的数量不会变化,而不是说我有任务就创建线程让线程去处理,这样任务越来越多,线程也会越来越多,这样的话对系统不友好。
注意: 线程池中可用线程的数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
线程池的应用场景
线程池常见的应用场景如下:
- 需要大量的线程来完成任务,且完成任务的时间比较短。
- 对性能要求苛刻的应用,比如要求服务器迅速响应用户请求。
- 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。
相关解释:
- 像Web服务器完成网页请求这样的任务,使用线程池技术使非常合适的。因为单个任务小,而任务数量大,你可以想象一个热门网站的点击次数。
- 处于长时间的任务,比如TeInet连接请求,线程池的优点就不明显了。因为TeInet会话时间比线程的创建时间大多了。
- 突发性大量客户请求,在没有线程池的情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,但短时间内产生大量线程可能使内存达到极限,出现错误。
线程池的实现
下面我们实现一个简单的线程池,线程池中提供了一个任务队列,以及若干个线程(多线程)。
- 线程池中多个线程负责从任务队列中那任务,并将拿到的任务进行处理。
- 线程池对外提供一个Push接口,用于让外部线程能够将任务Push到任务队列中。
设计逻辑
- 线程池初始化与启动:
- 在初始化阶段,线程池会根据给定的线程数创建相应数量的线程,并存储在线程向量中。
- 这些线程并不会立即启动,而是等到
Start
方法被调用时才会开始运行。
- 任务提交:
- 使用
Push
方法向线程池提交任务。 - 如果有线程正在等待任务,则会唤醒一个线程来处理新加入的任务。
- 使用
- 线程间通信与同步:
- 通过互斥锁
_mutex
和条件变量_cond
实现对任务队列的保护以及线程间的同步。 - 当队列为空时,工作线程会等待在条件变量上,直到有新的任务被加入或线程池停止运行。
- 通过互斥锁
- 线程池生命周期管理:
- 通过
_isRuning
标志控制线程池的运行状态。 - 通过
Start
和Stop
方法控制线程池的启动和停止。 Wait
方法确保所有线程都完成了它们的任务之后才返回。
- 通过
类结构
- 模板参数:
T
任务类型,是一个可以被调用的对象,如函数、lambda表达式或者实现operator()
的类。 - 成员变量:
_threadNum
:线程池中的线程数量。_threads
:保存线程池中所有线程的std::vector
。_task_queue
:保存待处理任务的std::queue
。_mutex
:保护_task_queue
的互斥锁。_cond
:条件变量,用于线程间的同步。_waitNum
:当前等待在条件变量上的线程数。_isRunning
:标记线程池是否正在运行。
成员函数
- 构造函数: 初始化线程数量(默认为全局变量
g_defaultThreadNum
),互斥锁和条件变量。并生成对应的日志信息。 - 析构函数: 销毁互斥锁和条件变量。 并生成对应的日志信息。
InitThreadPool
: 创建指定数量的工作线程,每个线程都有一个唯一的名称。线程创建后不会立即开始运行。每个线程通过std::bind
绑定到HandlerTask
方法上,并传入线程的名称(用于日志记录)。Start
: 启动所有之前初始化的线程。这一步才是真正地创建线程。线程启动则代表线程池开始工作了,_isRunning
应该被设置为true
。Stop
: 线程池结束工作。设置_isRunning
为false
,唤醒所有还在等待的线程,通知线程退出循环。因为修改_isRunning
会影响还在等待队列中的线程(共享区内部代码),所以最好在修改_isRunning
前进行加锁。被唤醒的线程,将继续向下执行HandlerTask
剩余逻辑。Wait
: 等待所有线程完成它们的任务并结束。确保线程池完全停止运行。Push
: 用于向任务队列中添加新任务。如果线程池正在运行,任务将被添加到队列中。如果有现成在等待队列上有任务可用,则唤醒其中一个线程。HandlerTask
: 线程执行具体任务处理逻辑。它首先检查任务队列是否为空,如果队列为空且线程池仍在运行,则该线程将挂起等待;如果线程池停止运行并且队列为空,线程退出;如果队列中有任务,则取出并执行该任务。
这里为了简化代码,将之前封装的 Thread 中的模板去除,也去除该类中的 _data 属性,线程传入的函数参数则只需要传入 线程名 参数即可。同时因为 InitThreadPool
在创建线程时,通过 std::bind
将Thread对象绑定到 HandlerTask
方法上,这样不为这个对象传递执行任务参数
,该对象也能执行 HandlerTask
方法。
// Thread.hpp
#ifndef __THREAD_HPP__
#define __THREAD_HPP__
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include <pthread.h>
#include <unistd.h>
namespace ThreadModule
{
using func_t = std::function<void(const std::string& name)>;
class Thread
{
public:
Thread(func_t func, const std::string name = "none-name")
:_func(func), _threadName(name), _stop(true)
{}
void Excute()
{
_func(_threadName);
}
static void* threadRoutine(void* args)
{
Thread* self = static_cast<Thread*>(args);
self->Excute();
return nullptr;
}
bool Start()
{
int n = pthread_create(&_tid, nullptr, threadRoutine, this);
if(!n)
{
_stop = false;
return true;
}
else
return false;
}
void Detach()
{
if(!_stop)
pthread_detach(_tid);
}
void Join()
{
if(!_stop)
pthread_join(_tid, nullptr);
}
const std::string& name()
{
return _threadName;
}
~Thread()
{}
private:
pthread_t _tid;
std::string _threadName;
func_t _func;
bool _stop;
};
}
#endif
为什么线程池中需要有互斥锁和条件变量?
线程池中的任务队列是会被多个执行流同时访问的临界资源,因此我们需要引入互斥锁对任务队列进行保护。
线程池当中的线程要从任务队列里拿任务,前提条件是任务队列中必须要有任务,因此线程池当中的线程在拿任务之前,需要判断任务队列当中是否有任务,若此时队列为空,那么该线程应该进行等待,直到任务队列中有任务时再将其唤醒,因此我们需要引入条件变量。
注意:
- 当某个线程被唤醒时,其可能是被异常或是伪唤醒,或者是一些广播类的唤醒线程操作而导致所有的线程被唤醒,使得在被唤醒的若干线程中,只有个别线程能拿到任务。此时应该让被唤醒的线程再次判断是否满足唤醒条件,所以在判断任务队列是否为空时,应该使用
while
进行判断,而不是if。 pthread_cond_broadcast
函数的作用是唤醒条件变量下的所有线程,而外部可能只Push了一个任务,我们却把全部等待的线程都唤醒,此时这些线程就都会去任务队列获取任务,但最终只有一个线程能得到任务。一瞬间唤醒大量的线程可能会导致系统震荡,这叫做惊群效应。因此在唤醒线程时最好使用pthread_cond_signal
函数唤醒正在等待的一个线程即可。- 当线程从任务队列中拿到任务后,该任务就已经属于当前线程了,与其他线程已经没有关系了,因此应该在解锁之后在进行处理任务,而不是在解锁之前进行。因为处理任务的过程可能会耗费一定的时间,所以我们不要将其放到临界区中。
- 如果将处理任务的过程放到临界区当中,那么当某一线程从任务队列拿到任务后,其他线程还需要等待该线程将任务处理完后,才有机会进入临界区。此时虽然是线程池,但最终我们可能并没有让多线程并行的执行起来。
线程池源代码
#include <vector>
#include <queue>
#include <string>
#include <pthread.h>
#include "Log.hpp"
#include "Thread.hpp"
#include "Task.hpp"
using namespace ThreadModule;
const static int g_defaultThreadNum = 3;
template<typename T>
class ThreadPool
{
private:
void LockQueue()
{
pthread_mutex_lock(&_mutex);
}
void UnlockQueue()
{
pthread_mutex_unlock(&_mutex);
}
void ThreadWakeup()
{
pthread_cond_signal(&_cond);
}
void ThreadWakeupAll()
{
pthread_cond_broadcast(&_cond);
}
void ThreadSleep()
{
pthread_cond_wait(&_cond, &_mutex);
}
public:
ThreadPool(int threadNum = g_defaultThreadNum) : _threadNum(threadNum), _waitNum(0), _isRunning(false)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
LOG(INFO, "ThreadPool Construct()");
}
void HandlerTask(std::string name)
{
LOG(INFO, "%s is running...", name.c_str());
while(true) // 一直处理任务,直到队列中没有任务,退出
{
// 1. 访问任务队列,保证队列安全
LockQueue();
// 2. 判断队列中是否有数据
// 2.1 如果队列为空,并且线程池是工作状态,则该线程等待
while(_task_queue.empty() && _isRunning == true)
{
_waitNum++;
ThreadSleep(); // 在条件变量上等待
// 等待结束,继续判断,防止伪唤醒
_waitNum--;
}
// 2.2 如果队列为空,并且线程池退出,则退出循环
if(_task_queue.empty() && _isRunning == false)
{
UnlockQueue();
break;
}
// 2.3 如果队列不为空,并且线程池退出,此时要先将队列中的任务完成,才能退出循环
// 2.4 如果队列不为空,并且线程池未退出,取出任务,完成
T t = _task_queue.front();
_task_queue.pop();
UnlockQueue();
LOG(DEBUG, "%s, get a task", name.c_str());
// 3. 执行任务
t(); // 重载(), 也可以实现一个方法Run, t.Run()
LOG(DEBUG, "%s has finished a task, result is %s", name.c_str(), t.ResultToString().c_str());
}
}
void InitThreadPool()
{
for(int i = 0; i < _threadNum; i++)
{
std::string name = "thread-" + std::to_string(i + 1);
_threads.emplace_back(std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1), name);
LOG(INFO, "init thread %s done", name.c_str());
}
}
void Start()
{
_isRunning = true;
for(auto& thread: _threads)
{
thread.Start();
}
}
bool Push(const T& t)
{
bool ret = false;
LockQueue();
if(_isRunning == true)
{
_task_queue.push(t);
if(_waitNum > 0)
ThreadWakeup();
LOG(DEBUG, "Push task success");
ret = true;
}
UnlockQueue();
return ret;
}
void Stop()
{
LockQueue();
_isRunning = false;
ThreadWakeupAll();
UnlockQueue();
}
void Wait()
{
for(auto& thread:_threads)
{
thread.Join();
LOG(INFO, "%s is quit...", thread.name().c_str());
}
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
LOG(INFO, "ThreadPool Destruct()");
}
private:
int _threadNum;
std::vector<Thread> _threads;
std::queue<T> _task_queue;
pthread_mutex_t _mutex;
pthread_cond_t _cond;
int _waitNum;
bool _isRunning;
};
其余详情源代码查看:源代码
结果展示:
今天的分享就到这里了,如果,你感觉这篇博客对你有帮助的话,就点个赞吧!感谢感谢……