前言:本文将要封装的线程池相关接口是基于之前一篇文章中的实现(文章链接:Linux多线程编程的艺术:封装线程、锁、条件变量和信号量的工程实践-CSDN博客)。不过无需担心,即使您没有阅读过前文,只要具备线程、互斥锁和条件变量等基础知识,完全可以理解本文内容。
目录
一、理解线程池
池化技术
所谓“池化技术”,就是程序先向系统申请过量的资源,然后⾃⼰管理,以备不时之需。之所以要申请过量的资源,是因为每次申请该资源都有较⼤的开销,不如提前申请好了,这样使⽤时就会变得⾮常快捷,⼤⼤提⾼程序运⾏效率。
线程池的意义:
降低资源消耗:复用已创建的线程,减少线程创建销毁的开销。
提高响应速度:任务到达时可以直接执行,无需等待线程创建。
提高线程可管理性:统一分配、调优和监控线程。
二、框架设计
线程池本质就是一个类,它管理了批量的线程和任务,如何管理?先描述,再组织。我们使用数组来存储线程,队列来存储任务(任务本质就是函数,lambda,或仿函数等)。
其次线程之间需要保持互斥,线程和任务队列需要保持同步,所以还需要锁,条件变量。
核心成员变量:
- 数组:用来存储线程。
- 队列:用来存储任务。
- 锁
- 条件变量
核心成员函数
- HandlerTask:从队列中获取并执行任务,让单个线程执行。
- ThreadPool:构造函数,用于创建批量线程,并将HandlerTask函数交给线程。
- Push:外部给线程池派发任务(即入队)。
- Join:线程等待。
如下:
#define NUM 5 // 线程个数
namespace my_threadpool
{
template <typename T> //任务类型
class ThreadPool
{
public:
ThreadPool(int num = NUM) {}
void HandlerTask()
bool Push(const T &task);
void Join();
private:
vector<thread<T>> _threads;
queue<T> _taskq;
Mutex _lock;
Cond _cond;
};
}
当然这些只是核心成员,在后面实现这些函数时还会延伸出更多的成员变量和成员函数。
线程池相当于生产者消费者模型中的交易场所和消费者这一部分,关于生产者消费者模型:
多线程编程实战:基于阻塞队列与环形队列的生产者消费者模型详解_环形数组 生产者消费者-CSDN博客
三、成员函数实现
构造函数 ThreadPool(int num = NUM)
循环创建num个线程,用lambda包装HandlerTask()作为线程函数,然后push到数组中。
ThreadPool(int num = NUM)
{
for (int i = 0; i < num; i++)
{
_threads.emplace_back([this]()
{
HandlerTask();
});
}
}
这里也可以不使用lambda表达式。
void HandlerTask()
为了代码的健壮性,我们添加一个参数(成员变量_isruning)用来标记线程池是否正在运行,初始化为false,在程序执行到HandlerTask后设为true。这个参数用来防止用户对已经回收的线程池进行任务派发或线程回收。
HandlerTask是单个线程在执行,线程要通过这个函数获取并执行任务,这个逻辑需要重复执行,所以做一个死循环。接下来到队列中取任务,队列是临界资源需要加锁,这里使用RAII模式。其次为了和队列保持同步,当队列为空时需要进行等待。如下:
void HandlerTask()
{
_isrunning = true;
while (true)
{
T tk;
LockGuard lock(_lock);
{
while (_taskq.empty())
{
_cond.Wait(_lock);
}
tk = _taskq.front();
_taskq.pop();
}
tk();
}
}
注意:必须选择循环等待,因为可能会有这样的情况:只派发了一个任务但多个线程一起被唤醒,进行循环判断可以解决。
这里HandlerTask函数实现不是最终形态,在后文会根据需求进行改进。
Push实现
向任务队列推送任务,要保证_isrunning为true,其次访问临界资源要加锁,然后向队列里push任务。因为队列有任务了,接下来需要将线程唤醒,这里我们还是优雅一点,不要粗鲁地直接把线程都唤醒。
添加变量_sleepnum来记录等待的线程个数,如果所有线程都在等待再进行唤醒,并且只唤醒一个就行。
_sleepnum在HandlerTask函数中进行维护,即:
while (_taskq.empty())
{
_sleepnum++;
_cond.Wait(_lock);
_sleepnum--;
}
封装一个函数用来唤醒一个线程,如下:
void WakeOnceThread()
{
_cond.Signal();
}
Push实现如下:
bool Push(const T &task)
{
if (_isrunning)
{
LockGuard lock(_lock);
{
_taskq.push(task);
if (_sleepnum == _threads.size())
WakeOnceThread();// 唤醒一个线程
}
return true;
}
return false;
}
Join实现
线程池的回收,遍历每个线程的join就行,在此之前要满足两个条件:
- 线程池_isrunning为true。
- 保证任务队列里的任务已经做完了,然后让线程执行的函数退出。
对第1点直接用if判断就能解决。
针对第2点,我们可以封装一个Stop函数用来等任务做完后,让线程函数退出。
Stop的实现
Stop函数的主要目的是让线程函数(HandlerTask)退出,所以就需要把全部线程唤醒,让它们往下执行,当然这还不够,我们要让它退出循环判断队空的逻辑,这个时候_isrunning就派上大用处了,把_Stop函数中把isrunning设为false。在HandlerTask函数中如果_isrunning为false则退出函数,当然还要保证把队列任务做完,具体实现如下:
void Stop()
{
_isrunning = false;
WakeAllThread();
cout << "所有线程被唤醒" << endl;
}
void HandlerTask()
{
_isrunning = true;
while (true)
{
T tk;
LockGuard lock(_lock);
{
while (_taskq.empty() && _isrunning)
{
_sleepnum++;
_cond.Wait(_lock);
_sleepnum--;
}
if (!_taskq.empty())
{
tk = _taskq.front();
_taskq.pop();
}
else break;
}
tk();
}
}
Join实现如下:
void Join()
{
if (!_isrunning) return;
Stop();
for (auto &thread : _threads)
{
thread.Join();
cout << "线程" << thread.getname() << "等待成功" << endl;
}
}
Stop,WakeOneThread,WakeAllThread,HandlerTask这些函数都是为实现主函数而延伸的,将这些函数设为私有更为适宜,不要暴露给用户。
四、单例模式设计
对于线程池我们希望它只能创建一份,即单例模式。
单例模式:一个类只能实例化出一个对象,类似⼀个男⼈只能有⼀个媳妇。 单例模式的实现有两种方法:
- 饿汉模式:吃完饭, ⽴刻洗碗, 这种就是饿汉⽅式. 因为下⼀顿吃的时候可以⽴刻拿着碗就能吃饭。
- 懒汉模式:吃完饭, 先把碗放下, 然后下⼀顿饭⽤到这个碗了再洗碗, 就是懒汉⽅式。
饿汉模式会在程序运行后就为线程池开辟空间,创建线程池。而懒汉模式只是做简单的声明,到程序执行到需要用线程池的代码时再进行创建。
这里我们选择使用懒汉模式,因为它延迟加载的特点,可以增加内存资源的利用率(在申请这块内存之前可以先用来存储其他数据,完成很多事情)。
单例模式需要注意的事项:
- 把构造函数设为私有,不让它默认生成拷贝构造函数,拷贝赋值函数。
- 定义全局的静态指针变量(这是针对懒汉模式,饿汉模式直接定义对象实体)。
- 需要定义一个获取单例对象的方法。
在类成员变量中声明:
- static ThreadPool<T> *_pthreadpool;
在类外,命名空间中定义:
- template <typename T>
- ThreadPool<T>* ThreadPool<T>::_pthreadpool = nullptr;
接下来是封装一个获取对象的函数。这个函数有很多细节需要注意:
如果对象已经创建(指针不为空)直接返回,如果不是则进行创建对象,注意这里需要加锁,因为可能有多个线程使用线程池(生产者消费者模型),它们同时执行该函数。加锁后还需要判断指针是否为空,因为加锁只保证了不同时创建,其他线程拿到锁后可能会再次创建。
注意这里的锁是拿给外部线程用的,而不是拿给线程池用的锁,所以同样地把这个锁定义成静态全局变量,如:
在类成员变量中声明:
- static Mutex _lock2;
在类外,命名空间中定义:
- template <typename T>
- Mutex ThreadPool<T>::_lock2;
static ThreadPool<T> *GetTreadPool()
{
if (_pthreadpool == nullptr)
{
LockGuard lock(_lock2);
if (_pthreadpool == nullptr)
{
_pthreadpool = new ThreadPool<T>();
cout << "ThreadPool创建" << endl;
}
}
return _pthreadpool;
}
最后有一点,在调用GetTreadPool成员函数之前是没有对象的,所以不能通过对象来访问,这里就将它设为静态成员函数。
五、测试
测试代码:
using task_t = std::function<void()>;
void Download()
{
cout << "我是一个下载任务..." <<endl;
}
int main()
{
int count=5;
while(count--)
{
ThreadPool<task_t>::GetTreadPool()->Push(Download);
sleep(1);
}
ThreadPool<task_t>::GetTreadPool()->Join();
return 0;
}
测试结果:
六、源码
ThreadPool.hpp
#pragma once
#include <iostream>
#include <pthread.h>
#include <queue>
#include <vector>
#include "Cond.hpp"
#include "Mutex.hpp"
#include "Thread.hpp"
using namespace std;
using namespace my_cond;
using namespace my_mutex;
using namespace my_thread;
#define NUM 5
namespace my_threadpool
{
template <typename T>
class ThreadPool
{
private:
ThreadPool(const ThreadPool<T> &) = delete;
ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;
ThreadPool(int num = NUM)
: _sleepnum(0), _isrunning(false)
{
for (int i = 0; i < num; i++)
{
_threads.emplace_back(
[this]()
{
HandlerTask();
});
}
}
void WakeAllThread()
{
LockGuard lock(_lock);
_cond.Broadcast();
}
void WakeOnceThread()
{
_cond.Signal();
}
void HandlerTask()
{
_isrunning = true;
while (true)
{
T tk;
LockGuard lock(_lock);
{
while (_taskq.empty() && _isrunning)
{
_sleepnum++;
_cond.Wait(_lock);
_sleepnum--;
}
if (!_taskq.empty())
{
tk = _taskq.front();
_taskq.pop();
}
else
break;
}
tk();
}
}
void Stop()
{
_isrunning = false;
WakeAllThread();
cout << "所有线程被唤醒" << endl;
}
~ThreadPool()
{
}
public:
static ThreadPool<T> *GetTreadPool()
{
if (_pthreadpool == nullptr)
{
LockGuard lock(_lock2);
if (_pthreadpool == nullptr)
{
cout << "ThreadPool创建" << endl;
_pthreadpool = new ThreadPool<T>();
}
}
return _pthreadpool;
}
bool Push(const T &task)
{
if (_isrunning)
{
LockGuard lock(_lock);
{
_taskq.push(task);
if (_sleepnum == _threads.size())
WakeOnceThread(); // 唤醒一个线程
}
return true;
}
return false;
}
void Join()
{
if (!_isrunning)
return;
Stop();
for (auto &thread : _threads)
{
thread.Join();
cout << "线程" << thread.getname() << "等待成功" << endl;
}
}
private:
vector<thread> _threads;
queue<T> _taskq;
Mutex _lock;
Cond _cond;
int _sleepnum;
bool _isrunning;
static ThreadPool<T> *_pthreadpool;
static Mutex _lock2;
};
template <typename T>
ThreadPool<T> *ThreadPool<T>::_pthreadpool = nullptr;
template <typename T>
Mutex ThreadPool<T>::_lock2;
}
test.cc
#include "ThreadPool.hpp"
#include <functional>
using namespace my_threadpool;
using task_t = std::function<void()>;
void Download()
{
cout << "我是一个下载任务..." <<endl;
}
int main()
{
int count=5;
while(count--)
{
ThreadPool<task_t>::GetTreadPool()->Push(Download);
sleep(1);
}
ThreadPool<task_t>::GetTreadPool()->Join();
return 0;
}
需要其他头文件的小伙伴到下文自取: