目录
前言:
昨天我们完成了对日志功能的实现,但实际上昨天的代码只是为了给今天我们实现线程池提供助力。
话不多说,让我们开始在系统部分的最后一舞:线程池的实现吧!!
我们要先来了解一下什么是线程池。
一、什么是线程池
线程池是一种多线程处理形式,它预先创建一组线程并管理它们的生命周期,用于执行大量短期异步任务。
他与线程池的设计模式是一样的,二者都是遵循的消费者生产者模型。
大家可以把我们的进程池代码拿过来看,二者其实还是挺相似的。
那么,接下来我们就来实现一下这个线程池,以便帮助大家更好的理解。
二、线程池的实现
我们接下来就来实现一下我们的线程池代码。
首先还是老规矩,我们把我们的命名空间与需要用到的头文件之类的全部加上去。
随后我们先创建一个ThreadPool类
我们的线程池里面首先先把最基础的框架写出来,最后一步一步的来完善具体的接口。
线程池,线程池,那里面一定会有很多线程,所以我们可以先定义一个变量num表示这个线程池里的线程数目,如果可以,我们可以定义一个缺省的值,在线程池进行构造的时候就创建出我们的相应数量的线程。
随后将我们的一系列的接口,比如增加任务,启动线程池,停止线程池的接口名写出来。
这么多线程如何管理呢?
如同我们实现队列一样,我们可以通过容器来进行管理,我们这里就通过vector数组来进行存储,并且,我们创建一个智能指针来对这个线程进行管理(我们这里所使用的线程,条件变量等,都是我们自己封装的),所以我们的代码可以先暂时写成这样:
#ifndef _THREAD_POOL_HPP_
#define _THREAD_POOL_HPP_
#pragma once
#include "log.hpp"
#include "sem.hpp"
#include "mutex.hpp"
#include "Cond.hpp"
#include "Mythread.hpp"
#include <vector>
namespace ThreadPoolModule
{
using thread_t = std::shared_ptr<ThreadModule::Mythread>;
using namespace LogModule;
using namespace SemMudule;
using namespace MutexModule;
using namespace CondModule;
using namespace ThreadModule;
static int default_num = 5;
class ThreadPool
{
public:
ThreadPool(int num = default_num)
: _num(num)
{
for (int i = 0; i < _num; ++i)
{
_threadpool.push_back(std::make_shared<Mythread>());
}
}
void Start()//线程池的启动
{
}
void Equeue()//新增任务
{
}
void Wait()//线程池的停止
{
}
private:
std::vector<thread_t> _threadpool;
int _num;
};
}
#endif
2.1、线程池的启动与等待停止
我们创建线程池对象的时候会自动调用构造函数,并给我们分配五个线程,那我们怎么样启动我们的线程池开始工作呢?
这里我们就要完善我们的Start接口了。
如何启动?思路是什么?很简单,我们调用的是我们封装的线程类,所以我们可以直接for循环启动啊!!
void Start()
{
for(auto &it:_threadpool)
{
it->start();
}
}
为了测试我们的启动是否成功,我们可以在创建线程的智能指针时传进去一个func_t = std::function<void()>;该类型的参数(这个代码我们是在线程的封装时写进去的),为了简便,我们可以直接写一个lambda表达式:
ThreadPool(int num = default_num)
: _num(num)
{
for (int i = 0; i < _num; ++i)
{
_threadpool.push_back(std::make_shared<Mythread>([]()
{
while(true)
{
ENABLE_CONSOLE_LOG();
LOG(LogLevel::INFO)<<"线程池线程开始运行";
sleep(1);
}
}));
}
}
void Start()
{
for(auto &it:_threadpool)
{
it->start();
}
}
test.cc:
#include "ThreadPool.hpp"
using namespace ThreadPoolModule;
int main()
{
ThreadPool tp(3);
tp.Start();
sleep(10);
return 0;
}
运行结果为:
这意味着我们的线程其实就跑起来了.
我们还可以继续往我们的构造函数中写日志来打印相关信息。
与线程池的启动类似,我们同样可以通过一个for循环,来调用封装的线程内部的Join函数接口,实现等待线程。
void Wait()
{
for(auto &it:_threadpool)
{
it->join();
LOG(LogLevel::INFO)<<"等待线程"<<it->Name()<<"...成功";
}
}
2.2、线程池的新增任务
作为一个线程池,你必须实现从外界获取任务并让内部线程调用的功能。
为了管理我们这新增的任务,我们可以使用队列来收集,所以新增任务就可以变成队列的插入。
但是,为了让我们的线程去自动的获取任务,执行任务,我们需要实现一个回调函数,我们会把这个回调函数在线程创建时传给线程,这样每个线程会去运行回调函数,并在回调函数中通过一系列我们之前所学的知识调用任务,运行任务。
我们这里统一的把任务类型规定为
using task_t=std::function<void()>;
并手动实现一个模范任务为:
namespace TaskModule
{
using namespace LogModule;
using task_t=std::function<void()>;
void task()
{
LOG(LogLevel::INFO)<<"任务开始执行...";
}
}
随后新增我们的任务队列变量:
std::queue<T> _taskqueue;
这里我们选择使用模板,也就是说把线程池变成一个模板,方便我们后面执行不同返回值的任务,而不是一味的只能执行task_t
这里的模板划分是根据我们的任务的类型来划分的。
所以涉及到新增任务的Equeue接口自然要使用T来接收可能的任务类型:
void Equeue(T && task)//我们这里的线程池是一个模板,这个模板的划分是根据我们传进来的任务类型来划分的。
//所以我们这里要使用模板参数T
//这里的T && task是一个语法:引用折叠
//如果我们传进来的是一个左值,那么T && task会被折叠成T & &&,根据引用折叠规则,会被折叠成T &
//如果我们传进来的是一个右值,那么T && task会被折叠成T && &&,根据引用折叠规则,会被折叠成T &&
//如果我们传进来的是一个task_t,T&& 就是普通的右值引用(task_t&&)
{
LockGuard lock(_mutex);
_taskqueue.push(std::forward<T>(task));//我们这里使用std::forward来进行完美转发
}
这里需要上锁,是因为可能会有多个线程并发访问我们的共享区资源:_taskqueue(比如取任务的线程与新增任务撞车了)。
另外,我们可以先把回调函数声明一下,等会进行实现,并且在线程创建时把我们之前暂时传递的lambda表达式改成我们的回调函数
这是我们现在已经写出的代码,供大家查漏补缺:
#ifndef _THREAD_POOL_HPP_
#define _THREAD_POOL_HPP_
#pragma once
#include "log.hpp"
#include "sem.hpp"
#include "mutex.hpp"
#include "Cond.hpp"
#include <unistd.h>
#include "Mythread.hpp"
#include <queue>
#include <vector>
#include <functional>
namespace TaskModule
{
using namespace LogModule;
using task_t = std::function<void()>;
void task()
{
LOG(LogLevel::INFO) << "任务开始执行...";
}
}
namespace ThreadPoolModule
{
using thread_t = std::shared_ptr<ThreadModule::Mythread>;
using namespace LogModule;
using namespace TaskModule;
using namespace MutexModule;
using namespace CondModule;
using namespace ThreadModule;
static int default_num = 5;
template <typename T>
class ThreadPool
{
private:
void HandlerTask() // 回调函数,负责在线程初始化时调用
{
while (true)
{
}
}
public:
ThreadPool(int num = default_num)
: _num(num)
{
for (int i = 0; i < _num; ++i)
{
_threadpool.push_back(std::make_shared<Mythread>(std::bind(&ThreadPool::HandlerTask, this)));
//我们这里使用bind的原因是因为HandlerTask是一个类成员函数,只传递函数名会导致函数签名不匹配,
//我们想要在不是这个类的线程变量中调用这个函数,需要需要 this 提供调用上下文
//我们需要将this指针绑定到HandlerTask函数中,这样才能在HandlerTask函数中访问到线程池的成员变量
}
}
void Start()
{
for (auto &it : _threadpool)
{
LOG(LogLevel::INFO) << "启动线程" << it->Name() << "...成功";
it->start();
}
}
void Equeue(T &&task) // 我们这里的线程池是一个模板,这个模板的划分是根据我们传进来的任务类型来划分的。
// 所以我们这里要使用模板参数T
// 这里的T && task是一个语法:引用折叠
// 如果我们传进来的是一个左值,那么T && task会被折叠成T & &&,根据引用折叠规则,会被折叠成T &
// 如果我们传进来的是一个右值,那么T && task会被折叠成T && &&,根据引用折叠规则,会被折叠成T &&
// 如果我们传进来的是一个task_t,T&& 就是普通的右值引用(task_t&&)
{
LockGuard lock(_mutex);
_taskqueue.push(std::forward<T>(task)); // 我们这里使用std::forward来进行完美转发
LOG(LogLevel::INFO) << "任务入队列成功";
}
void Wait()
{
for (auto &it : _threadpool)
{
it->join();
LOG(LogLevel::INFO) << "回收线程" << it->Name() << "...成功";
}
}
private:
std::vector<thread_t> _threadpool; // 线程管理数组
std::queue<T> _taskqueue; // 任务队列
int _num; // 线程数量
Mutex _mutex; // 锁
};
}
#endif
目前为止需要注意的点是Equeue时新增的任务需要使用&&,包含了一个小语法引用折叠 。
以及我们想要让线程调用类成员方法,需要进行绑定,提供this指针
2.3、线程池的回调函数实现
我们接下来实现一下线程池的回调函数接口,这个函数就是我们之前写的逻辑,在while循环中不断判断是否有任务,有的话就执行,没的话就进入条件变量中等待。
所以在这里我们需要用到条件变量cond,锁,进行等待的线程数量,以及一个判断任务队列是否为空的小接口:
Cond _cond;
int wait_num; // 进行等待的线程数量
bool IsEmpty()//判断任务队列是否为空
{
return _taskqueue.empty();
}
void HandlerTask() // 回调函数,负责在线程初始化时调用
{
while (true)
{
T t;
{
LockGuard lock(_mutex);
while (IsEmpty())
{
wait_num++;
_cond.wait(_mutex);
wait_num--;
}
t=_taskqueue.front();//拿出任务
_taskqueue.pop();//删除任务
}
//执行任务不用上锁
t();//执行任务,我们这里规定传进来的任务必须重载()运算符
}
}
值得注意是是我们加入了一个新参数wait_num来记录此时等待的线程数量。
根据我们所学的消费者生产者模型,可以知道,这些等待的线程如何被唤醒?
当然是生产者生产之后,此时一定有数据,所以我们需要对Equeue进行补充,在新增任务后,判断waitnum的数量,进行唤醒工作。
void Equeue(T &&task) // 我们这里的线程池是一个模板,这个模板的划分是根据我们传进来的任务类型来划分的。
// 所以我们这里要使用模板参数T
// 这里的T && task是一个语法:引用折叠
// 如果我们传进来的是一个左值,那么T && task会被折叠成T & &&,根据引用折叠规则,会被折叠成T &
// 如果我们传进来的是一个右值,那么T && task会被折叠成T && &&,根据引用折叠规则,会被折叠成T &&
// 如果我们传进来的是一个task_t,T&& 就是普通的右值引用(task_t&&)
{
LockGuard lock(_mutex);
_taskqueue.push(std::forward<T>(task)); // 我们这里使用std::forward来进行完美转发
LOG(LogLevel::INFO) << "任务入队列成功";
if(wait_num>0)
{
_cond.SignalAll();//唤醒所有线程
}
}
此时我们写一个主函数代码来测试:
#include "ThreadPool.hpp"
using namespace ThreadPoolModule;
using namespace TaskModule;
int main()
{
ENABLE_CONSOLE_LOG();
// ENABLE_FILE_LOG();
std::unique_ptr<ThreadPool<task_t>> tp = std::make_unique<ThreadPool<task_t>>();
tp->Start();
int cnt = 10;
char c;
while (true)
{
tp->Equeue(TaskModule::task);
cnt--;
sleep(1);
}
return 0;
}
运行结果为:
2.4、线程池的停止运行
任务新增完毕了,或者我们中途想体质线程池的运行,此时我们应该怎么做呢?
我们要完成我们线程池的Stop()接口,通过调用它来实现线程池的退出程序,最后通过Wait来回收线程。
如何做到?我们应该如何思考。
我们可以通过一个标记变量,来进行判断此时的线程池的运行状态。
bool is_running; // 线程池是否在运行
在初始化时对这个标记变量初始化为false表示运行状态,此时还未运行,真正的运行是在start时,如果我们调用stop,可以先判断是否是出于运行状态,涉及到的函数改变:
ThreadPool(int num = default_num)
: _num(num), is_running(false),wait_num(0)
{
for (int i = 0; i < _num; ++i)
{
_threadpool.push_back(std::make_shared<Mythread>(std::bind(&ThreadPool::HandlerTask, this,std::placeholders::_1)));
// 我们这里使用bind的原因是因为HandlerTask是一个类成员函数,只传递函数名会导致函数签名不匹配,
// 我们想要在不是这个类的线程变量中调用这个函数,需要需要 this 提供调用上下文
// 我们需要将this指针绑定到HandlerTask函数中,这样才能在HandlerTask函数中访问到线程池的成员变量
}
}
void Start()
{
is_running=true;
for (auto &it : _threadpool)
{
LOG(LogLevel::INFO) << "启动线程" << it->Name() << "...成功";
it->start();
}
}
当我们第一次进入stop,进行判断,结果为true表示还在运行,我们就执行停止工作,将状态置为false,此时我们就不能再新入任务了,所以在Equeue中应该在最前面先判断状态,此外,在stop中,由于我们执行stop时还有历史任务未完成,所以此时我们需要先唤醒所有的线程(如果有),让它们快速把任务执行完,于是全部陷入阻塞状态。
之后我们就可以调用wait进行线程回收了。
涉及到的代码改变为:
void Equeue(T &&task) // 我们这里的线程池是一个模板,这个模板的划分是根据我们传进来的任务类型来划分的。
// 所以我们这里要使用模板参数T
// 这里的T && task是一个语法:引用折叠
// 如果我们传进来的是一个左值,那么T && task会被折叠成T & &&,根据引用折叠规则,会被折叠成T &
// 如果我们传进来的是一个右值,那么T && task会被折叠成T && &&,根据引用折叠规则,会被折叠成T &&
// 如果我们传进来的是一个task_t,T&& 就是普通的右值引用(task_t&&)
{
LockGuard lock(_mutex);
if(!is_running)
{
return;//如果不为运行状态,就不能新入任务
}
_taskqueue.push(std::forward<T>(task)); // 我们这里使用std::forward来进行完美转发
LOG(LogLevel::INFO) << "任务入队列成功";
if(wait_num>0)
{
_cond.SignalAll();//唤醒所有线程
}
}
void Stop()
{
if(is_running)
{
is_running = false;
//此时不能再入任务队列了
if(wait_num>0)
{
_cond.SignalAll();//唤醒所有线程
}
}
}
值得注意的是,我们的回调函数也会受到运行状态的影响。
我们进行等待判断的逻辑就应该变为:如果任务队列为空,并且我们是在运行状态,我们才能进行等待,如果不是运行状态,但此时没有任务了,我们也要继续执行,因为我们需要在下面新增一个判断。
在我们从任务队列拿任务之前,我们需要判断:此时任务队列是否为空&&是否不再运行。
如果此时该条件中的两个都满足,表示任务执行完毕,应该结束回调函数,所以我们进行break。
此时在本条判断中,不可能出现任务为空,还在运行的情况。因为这样就在上条while循环中出不来了。
所以只会另外出现二种情况:任务不为空,且此时还在运行;任务不为空,且不在运行。
这两种情况我们都需要把任务做完,并且后续的拿任务不会出错。
涉及到的代码:
void HandlerTask() // 回调函数,负责在线程初始化时调用
{
LOG(LogLevel::INFO) << "线程开始执行回调函数逻辑";
while (true)
{
T t;
{
LockGuard lock(_mutex);
while (IsEmpty() && is_running)
{
wait_num++;
_cond.Wait(_mutex);
wait_num--;
}
if (IsEmpty() && !is_running)
{
break; // 退出回调函数
}
// 此时只会出现有任务的情况,无任务且is_running的情况已经在while循环中处理了,无任务且!is_running的情况再上面if中处理了
t = _taskqueue.front(); // 拿出任务
_taskqueue.pop(); // 删除任务
}
// 执行任务不用上锁
t(); // 执行任务,我们这里规定传进来的任务必须重载()运算符
}
LOG(LogLevel::INFO) << "线程执行回调函数逻辑结束";
}
至此,我们的线程池就已经写的差不多了,
完整代码如下:
#ifndef _THREAD_POOL_HPP_
#define _THREAD_POOL_HPP_
#pragma once
#include "log.hpp"
#include "sem.hpp"
#include "mutex.hpp"
#include "Cond.hpp"
#include <unistd.h>
#include "Mythread.hpp"
#include <queue>
#include <vector>
#include <functional>
namespace TaskModule
{
using namespace LogModule;
using task_t = std::function<void()>;
void task()
{
LOG(LogLevel::INFO) << "任务开始执行...";
}
}
namespace ThreadPoolModule
{
using thread_t = std::shared_ptr<ThreadModule::Mythread>;
using namespace LogModule;
using namespace TaskModule;
using namespace MutexModule;
using namespace CondModule;
using namespace ThreadModule;
static int default_num = 5;
template <typename T>
class ThreadPool
{
private:
bool IsEmpty() // 判断任务队列是否为空
{
return _taskqueue.empty();
}
void HandlerTask() // 回调函数,负责在线程初始化时调用
{
LOG(LogLevel::INFO) << "线程开始执行回调函数逻辑";
while (true)
{
T t;
{
LockGuard lock(_mutex);
while (IsEmpty() && is_running)
{
wait_num++;
_cond.Wait(_mutex);
wait_num--;
}
if (IsEmpty() && !is_running)
{
break; // 退出回调函数
}
// 此时只会出现有任务的情况,无任务且is_running的情况已经在while循环中处理了,无任务且!is_running的情况再上面if中处理了
t = _taskqueue.front(); // 拿出任务
_taskqueue.pop(); // 删除任务
}
// 执行任务不用上锁
t(); // 执行任务,我们这里规定传进来的任务必须重载()运算符
}
LOG(LogLevel::INFO) << "线程执行回调函数逻辑结束";
}
public:
ThreadPool(int num = default_num)
: _num(num), is_running(false),wait_num(0)
{
for (int i = 0; i < _num; ++i)
{
_threadpool.push_back(std::make_shared<Mythread>(std::bind(&ThreadPool::HandlerTask, this,std::placeholders::_1)));
// 我们这里使用bind的原因是因为HandlerTask是一个类成员函数,只传递函数名会导致函数签名不匹配,
// 我们想要在不是这个类的线程变量中调用这个函数,需要需要 this 提供调用上下文
// 我们需要将this指针绑定到HandlerTask函数中,这样才能在HandlerTask函数中访问到线程池的成员变量
}
}
void Stop()
{
if (is_running)
{
is_running = false;
// 此时不能再入任务队列了
if (wait_num > 0)
{
_cond.SignalAll(); // 唤醒所有线程
}
}
}
void Start()
{
is_running = true;
for (auto &it : _threadpool)
{
LOG(LogLevel::INFO) << "启动线程" << it->Name() << "...成功";
it->start();
}
}
void Equeue(T &&task) // 我们这里的线程池是一个模板,这个模板的划分是根据我们传进来的任务类型来划分的。
// 所以我们这里要使用模板参数T
// 这里的T && task是一个语法:引用折叠
// 如果我们传进来的是一个左值,那么T && task会被折叠成T & &&,根据引用折叠规则,会被折叠成T &
// 如果我们传进来的是一个右值,那么T && task会被折叠成T && &&,根据引用折叠规则,会被折叠成T &&
// 如果我们传进来的是一个task_t,T&& 就是普通的右值引用(task_t&&)
{
LockGuard lock(_mutex);
if (!is_running)
{
return; // 如果不为运行状态,就不能新入任务
}
_taskqueue.push(std::forward<T>(task)); // 我们这里使用std::forward来进行完美转发
LOG(LogLevel::INFO) << "任务入队列成功";
if (wait_num > 0)
{
_cond.SignalAll(); // 唤醒所有线程
}
}
void Wait()
{
for (auto &it : _threadpool)
{
it->join();
LOG(LogLevel::INFO) << "回收线程" << it->Name() << "...成功";
}
}
private:
std::vector<thread_t> _threadpool; // 线程管理数组
std::queue<T> _taskqueue; // 任务队列
int _num; // 线程数量
Mutex _mutex; // 锁
Cond _cond; // 条件变量
int wait_num; // 进行等待的线程数量
bool is_running; // 线程池是否在运行
};
}
#endif
我们可以进行测试:
#include "ThreadPool.hpp"
using namespace ThreadPoolModule;
using namespace TaskModule;
int main()
{
ENABLE_CONSOLE_LOG();
// ENABLE_FILE_LOG();
std::unique_ptr<ThreadPool<task_t>> tp = std::make_unique<ThreadPool<task_t>>();
tp->Start();
int cnt = 10;
char c;
while (cnt)
{
tp->Equeue(TaskModule::task);
cnt--;
sleep(1);
}
tp->Stop();
sleep(3);
tp->Wait();
return 0;
}
在这代码中我们默认给线程池创建了五个线程,并每隔一秒分配一个任务。
之后进行停止,三秒后回收,我们看一下日志打印信息:
2.4、优化
我们可以对打印进行优化一下,分辨一下具体是哪个线程执行的任务,所以我们需要改变一下回调函数的参数,传递线程的名字进去。
void HandlerTask(std::string name) // 回调函数,负责在线程初始化时调用
{
LOG(LogLevel::INFO) << "线程开始执行回调函数逻辑";
while (true)
{
T t;
{
LockGuard lock(_mutex);
while (IsEmpty() && is_running)
{
wait_num++;
_cond.Wait(_mutex);
wait_num--;
}
if (IsEmpty() && !is_running)
{
break; // 退出回调函数
}
// 此时只会出现有任务的情况,无任务且is_running的情况已经在while循环中处理了,无任务且!is_running的情况再上面if中处理了
t = _taskqueue.front(); // 拿出任务
_taskqueue.pop(); // 删除任务
}
// 执行任务不用上锁
t(name); // 执行任务,我们这里规定传进来的任务必须重载()运算符
}
LOG(LogLevel::INFO) << "线程执行回调函数逻辑结束";
}
public:
ThreadPool(int num = default_num)
: _num(num), is_running(false),wait_num(0)
{
for (int i = 0; i < _num; ++i)
{
_threadpool.push_back(std::make_shared<Mythread>(std::bind(&ThreadPool::HandlerTask, this,std::placeholders::_1)));
// 我们这里使用bind的原因是因为HandlerTask是一个类成员函数,只传递函数名会导致函数签名不匹配,
// 我们想要在不是这个类的线程变量中调用这个函数,需要需要 this 提供调用上下文
// 我们需要将this指针绑定到HandlerTask函数中,这样才能在HandlerTask函数中访问到线程池的成员变量
}
}
所以此时我们传递的任务类型也需要增加一个对应的参数,否则会报错,
namespace TaskModule
{
using namespace LogModule;
using task_t = std::function<void(std::string)>;
void task(std::string name)
{
LOG(LogLevel::INFO) << "任务开始执行...";
}
}
不止如此,我们之前写的线程封装代码是这样的:
class Mythread
{
private:
static void *Routine(void*args)
{
Mythread *t = static_cast<Mythread *>(args);
t->_status = TSTATUS::RUNNING;
t->_func();
return nullptr;
}
public:
Mythread(func_t func):_func(func), _status(TSTATUS::NEW), _joinable(true)
{
_name = "Thread-" + std::to_string(number++);
_pid = getpid();
}
bool start()//负责线程的创建
{
if (_status != TSTATUS::RUNNING)
{
int n = ::pthread_create(&_tid, nullptr, Routine, this); // TODO
if (n != 0)
return false;
return true;
}
return false;
}
注意看这一段代码,我们在线程池中的start中轮番调用了我们线程的start,此时会创建线程并让其执行Routine回调函数,这里面的_func就是我们实际上执行的函数,但我们已经把任务函数的参数变成了void (std::string)了,而不再是void (),所以我们此时应该把名字传递进去。
这里的func_t我们当时命名的是
这里其实是不符合现在类型function<void(std::string)>的,所以我们可以把这里用到的func_t换成task_t。
为了两个头文件不重复包含,我们这里新开一个任务头文件Task.hpp:
#include"log.hpp"
#include<string>
#include<functional>
namespace TaskModule
{
using namespace LogModule;
using task_t = std::function<void(std::string)>;
void task(std::string name)
{
LOG(LogLevel::INFO) << "任务开始执行...";
}
}
随后让我们的线程头文件使用task_t而不是func_t.
改动的地方主要有两处:
运行代码:
整体的threadpool代码如下:
#ifndef _THREAD_POOL_HPP_
#define _THREAD_POOL_HPP_
#pragma once
#include "log.hpp"
#include "sem.hpp"
#include "mutex.hpp"
#include "Cond.hpp"
#include <unistd.h>
#include "Mythread.hpp"
#include <queue>
#include "Task.hpp"
#include <vector>
#include <functional>
namespace ThreadPoolModule
{
using thread_t = std::shared_ptr<ThreadModule::Mythread>;
using namespace LogModule;
using namespace TaskModule;
using namespace MutexModule;
using namespace CondModule;
using namespace ThreadModule;
static int default_num = 5;
template <typename T>
class ThreadPool
{
private:
bool IsEmpty() // 判断任务队列是否为空
{
return _taskqueue.empty();
}
void HandlerTask(std::string name) // 回调函数,负责在线程初始化时调用
{
LOG(LogLevel::INFO) << "线程"<<name<<"开始执行回调函数逻辑";
while (true)
{
T t;
{
LockGuard lock(_mutex);
while (IsEmpty() && is_running)
{
wait_num++;
_cond.Wait(_mutex);
wait_num--;
}
if (IsEmpty() && !is_running)
{
break; // 退出回调函数
}
// 此时只会出现有任务的情况,无任务且is_running的情况已经在while循环中处理了,无任务且!is_running的情况再上面if中处理了
t = _taskqueue.front(); // 拿出任务
_taskqueue.pop(); // 删除任务
}
// 执行任务不用上锁
t(name); // 执行任务,我们这里规定传进来的任务必须重载()运算符
}
LOG(LogLevel::INFO) << "线程"<<name<<"执行回调函数逻辑结束";
}
public:
ThreadPool(int num = default_num)
: _num(num), is_running(false),wait_num(0)
{
for (int i = 0; i < _num; ++i)
{
_threadpool.push_back(std::make_shared<Mythread>(std::bind(&ThreadPool::HandlerTask, this,std::placeholders::_1)));
// 我们这里使用bind的原因是因为HandlerTask是一个类成员函数,只传递函数名会导致函数签名不匹配,
// 我们想要在不是这个类的线程变量中调用这个函数,需要需要 this 提供调用上下文
// 我们需要将this指针绑定到HandlerTask函数中,这样才能在HandlerTask函数中访问到线程池的成员变量
}
}
void Stop()
{
if (is_running)
{
is_running = false;
// 此时不能再入任务队列了
if (wait_num > 0)
{
_cond.SignalAll(); // 唤醒所有线程
}
}
}
void Start()
{
is_running = true;
for (auto &it : _threadpool)
{
LOG(LogLevel::INFO) << "启动线程" << it->Name() << "...成功";
it->start();
}
}
void Equeue(T &&task) // 我们这里的线程池是一个模板,这个模板的划分是根据我们传进来的任务类型来划分的。
// 所以我们这里要使用模板参数T
// 这里的T && task是一个语法:引用折叠
// 如果我们传进来的是一个左值,那么T && task会被折叠成T & &&,根据引用折叠规则,会被折叠成T &
// 如果我们传进来的是一个右值,那么T && task会被折叠成T && &&,根据引用折叠规则,会被折叠成T &&
// 如果我们传进来的是一个task_t,T&& 就是普通的右值引用(task_t&&)
{
LockGuard lock(_mutex);
if (!is_running)
{
return; // 如果不为运行状态,就不能新入任务
}
_taskqueue.push(std::forward<T>(task)); // 我们这里使用std::forward来进行完美转发
LOG(LogLevel::INFO) << "任务入队列成功";
if (wait_num > 0)
{
_cond.SignalAll(); // 唤醒所有线程
}
}
void Wait()
{
for (auto &it : _threadpool)
{
it->join();
LOG(LogLevel::INFO) << "回收线程" << it->Name() << "...成功";
}
}
private:
std::vector<thread_t> _threadpool; // 线程管理数组
std::queue<T> _taskqueue; // 任务队列
int _num; // 线程数量
Mutex _mutex; // 锁
Cond _cond; // 条件变量
int wait_num; // 进行等待的线程数量
bool is_running; // 线程池是否在运行
};
}
#endif
task.hpp不再赘述。
三、线程安全的单例模式
3.1、什么是单例模式
某些类, 只应该具有⼀个对象(实例), 就称之为单例。
例如⼀个男⼈只能有⼀个媳妇。
它常用于管理全局资源,如日志系统、数据库连接、线程池等。
3.2、懒汉实现方式与饿汉实现方式
class Singleton
{
public:
static Singleton* getInstance()
{
return instance; // 直接返回预先创建好的实例
}
private:
Singleton() {} // 私有构造
static Singleton* instance; // 静态实例
};
// 程序一启动就初始化(类似“吃完饭立刻洗碗”)
Singleton* Singleton::instance = new Singleton();
template <typename T>
class Singleton
{
static T* inst;
public:
static T* GetInstance()
{
if (inst == NULL)
{
inst = new T();
}
return inst;
}
};
但这个代码存在一个严重问题,在多线程下会不安全。
第一次调用GetInstance时,如果多个线程同时调用,可能会创建出多份T对象事例。
但是后续调用就没问题了
// 懒汉模式, 线程安全
template <typename T>
class Singleton
{
volatile static T *inst; // 需要设置 volatile 关键字, 否则可能被编译器优化.
static std::mutex lock;
public:
static T *GetInstance()
{
if (inst == NULL)
{ // 双重判定空指针, 降低锁冲突的概率, 提⾼性能.
lock.lock(); // 使⽤互斥锁, 保证多线程情况下也只调⽤⼀次 new.
if (inst == NULL)
{
inst = new T();
}
lock.unlock();
}
return inst;
}
};
3.3、单例化线程池
如果我们想在原来代码基础上实现一个单例化的线程池,该如何实现?
首先由于是只有一个对象,为了安全我们必须禁掉他的复制拷贝与拷贝构造函数。
并且,我们还需要把构造函数放在private中去,防止外部直接new创建对象。这样一来我们只能通过内部的函数调用来创建。
所以我们需要在public中写一个接口实现创建且只创建一个线程池对象,并且新增一把专门保护视获取实例的锁和一个实例指针,指向该实例,存储并维护这个唯一实例的地址。
并且这个接口必须被static修饰,否则我们就必须要先有一个对象才能调用该成员函数。
另外,我们保护线程安全用到的锁,以及将会被我们初始化的唯一一个实例,也需要被static修饰,这是为了让所有的 ThreadPool<T>
类共享同一个 instance
指针。确保无论调用多少次 getInstance()
,返回的都是同一个实例。
这里需要注意上面的线程安全问题,所以代码如下:
class ThreadPool
{
private:
......
ThreadPool(const ThreadPool &) = delete; // 禁止拷贝函数
ThreadPool<T> &operator=(const ThreadPool &) = delete; // 禁止赋值函数
ThreadPool(int num = default_num) // 将构造函数放在私人权限中防止外部随意创建对象
: _num(num), is_running(false), wait_num(0)
{
for (int i = 0; i < _num; ++i)
{
_threadpool.push_back(std::make_shared<Mythread>(std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1)));
// 我们这里使用bind的原因是因为HandlerTask是一个类成员函数,只传递函数名会导致函数签名不匹配,
// 我们想要在不是这个类的线程变量中调用这个函数,需要需要 this 提供调用上下文
// 我们需要将this指针绑定到HandlerTask函数中,这样才能在HandlerTask函数中访问到线程池的成员变量
}
}
public:
static ThreadPool<T> *getInstance()
{
if (instance == NULL)
{
LockGuard lockguard(mutex);
if (instance == NULL)
{
LOG(LogLevel::INFO) << "单例首次被执行,需要加载对象...";
instance = new ThreadPool<T>();
}
}
return instance;
}
.....
private:
.....
static ThreadPool<T> *instance; //实例指针
static Mutex mutex; // 只用来保护单例
};
注意,static修饰的成员变量的声明在类内,定义在类外:
template <typename T> // static的类成员变量的初始化需要放在类外
ThreadPool<T> *ThreadPool<T>::instance = NULL;
template <typename T>
Mutex ThreadPool<T>::mutex; // 只用来保护单例
测试一下:
#include "ThreadPool.hpp"
using namespace ThreadPoolModule;
using namespace TaskModule;
int main()
{
ENABLE_CONSOLE_LOG();
ThreadPool<task_t>::getInstance()->Start();
char c;
int cnt = 5;
while (cnt)
{
// std::cin >> c;
ThreadPool<task_t>::getInstance()->Equeue(TaskModule::task);
cnt--;
sleep(1);
}
ThreadPool<task_t>::getInstance()->Stop();
ThreadPool<task_t>::getInstance()->Wait();
return 0;
}
单例化线程池的代码如下:
#ifndef _THREAD_POOL_HPP_
#define _THREAD_POOL_HPP_
#pragma once
#include "log.hpp"
#include "mutex.hpp"
#include "Cond.hpp"
#include <unistd.h>
#include "Mythread.hpp"
#include <queue>
#include "Task.hpp"
#include <vector>
#include <functional>
namespace ThreadPoolModule
{
using thread_t = std::shared_ptr<ThreadModule::Mythread>;
using namespace LogModule;
using namespace TaskModule;
using namespace MutexModule;
using namespace CondModule;
using namespace ThreadModule;
static int default_num = 5;
template <typename T>
class ThreadPool
{
private:
bool IsEmpty() // 判断任务队列是否为空
{
return _taskqueue.empty();
}
void HandlerTask(std::string name) // 回调函数,负责在线程初始化时调用
{
LOG(LogLevel::INFO) << "线程" << name << "开始执行回调函数逻辑";
while (true)
{
T t;
{
LockGuard lock(_mutex);
while (IsEmpty() && is_running)
{
wait_num++;
_cond.Wait(_mutex);
wait_num--;
}
if (IsEmpty() && !is_running)
{
break; // 退出回调函数
}
// 此时只会出现有任务的情况,无任务且is_running的情况已经在while循环中处理了,无任务且!is_running的情况再上面if中处理了
t = _taskqueue.front(); // 拿出任务
_taskqueue.pop(); // 删除任务
}
// 执行任务不用上锁
t(name); // 执行任务,我们这里规定传进来的任务必须重载()运算符
}
LOG(LogLevel::INFO) << "线程" << name << "执行回调函数逻辑结束";
}
ThreadPool(const ThreadPool &) = delete; // 禁止拷贝函数
ThreadPool<T> &operator=(const ThreadPool &) = delete; // 禁止赋值函数
ThreadPool(int num = default_num) // 将构造函数放在私人权限中防止外部随意创建对象
: _num(num), is_running(false), wait_num(0)
{
for (int i = 0; i < _num; ++i)
{
_threadpool.push_back(std::make_shared<Mythread>(std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1)));
// 我们这里使用bind的原因是因为HandlerTask是一个类成员函数,只传递函数名会导致函数签名不匹配,
// 我们想要在不是这个类的线程变量中调用这个函数,需要需要 this 提供调用上下文
// 我们需要将this指针绑定到HandlerTask函数中,这样才能在HandlerTask函数中访问到线程池的成员变量
}
}
public:
static ThreadPool<T> *getInstance()
{
if (instance == NULL)
{
LockGuard lockguard(mutex);
if (instance == NULL)
{
LOG(LogLevel::INFO) << "单例首次被执行,需要加载对象...";
instance = new ThreadPool<T>();
}
}
return instance;
}
void Stop()
{
if (is_running)
{
is_running = false;
// 此时不能再入任务队列了
if (wait_num > 0)
{
_cond.SignalAll(); // 唤醒所有线程
}
}
}
void Start()
{
is_running = true;
for (auto &it : _threadpool)
{
LOG(LogLevel::INFO) << "启动线程" << it->Name() << "...成功";
it->start();
}
}
void Equeue(T &&task) // 我们这里的线程池是一个模板,这个模板的划分是根据我们传进来的任务类型来划分的。
// 所以我们这里要使用模板参数T
// 这里的T && task是一个语法:引用折叠
// 如果我们传进来的是一个左值,那么T && task会被折叠成T & &&,根据引用折叠规则,会被折叠成T &
// 如果我们传进来的是一个右值,那么T && task会被折叠成T && &&,根据引用折叠规则,会被折叠成T &&
// 如果我们传进来的是一个task_t,T&& 就是普通的右值引用(task_t&&)
{
LockGuard lock(_mutex);
if (!is_running)
{
return; // 如果不为运行状态,就不能新入任务
}
_taskqueue.push(std::forward<T>(task)); // 我们这里使用std::forward来进行完美转发
LOG(LogLevel::INFO) << "任务入队列成功";
if (wait_num > 0)
{
_cond.SignalAll(); // 唤醒所有线程
}
}
void Wait()
{
for (auto &it : _threadpool)
{
it->join();
LOG(LogLevel::INFO) << "回收线程" << it->Name() << "...成功";
}
}
private:
std::vector<thread_t> _threadpool; // 线程管理数组
std::queue<T> _taskqueue; // 任务队列
int _num; // 线程数量
Mutex _mutex; // 锁
Cond _cond; // 条件变量
int wait_num; // 进行等待的线程数量
bool is_running; // 线程池是否在运行
static ThreadPool<T> *instance;
static Mutex mutex; // 只用来保护单例
};
template <typename T> // static的类成员变量的初始化需要放在类外
ThreadPool<T> *ThreadPool<T>::instance = NULL;
template <typename T>
Mutex ThreadPool<T>::mutex; // 只用来保护单例
}
#endif
总结:
很高兴告诉大家,我们目前为止,关于linux系统部分的内容,就已经学完了。
剩下一些知识点相比之下并不是很重要,我们也许会放到加餐课中进行学习研究。
从明天开始,我们将会进入Linux的另外一种大山:网络编程。
今天的内容希望对大家有所帮助!!