线程池
本节重点:
- 设计日志和线程池。
- 理解线程安全和可重入,掌握锁相关概念。
一.线程池
在写线程池之前,我们要做如下准备:
- 准备线程的封装。
- 准备锁和条件变量的封装。
- 引入日志,对线程进行封装。
1.日志和策略模式
- 日志:记录系统和软件运行中发生事件的文件,主要作用是监控运行状态、记录异常信息,帮助快速定位问题并支持程序员进行问题修复。它是系统维护、故障排查和安全管理的重要工具。
- 日志格式中的某些指标是必须有:时间戳、日志等级、日志内容。存在几个指标是可选的:文件名行号、进程,线程相关id信息等。
- 日志有现成的解决方案:spdlog、glog、Boost.Log、Log4cxx等。日志位于 /var/log/ 路径下
- 设计模式:在软件开发过程中,针对反复出现的问题所总结归纳出的通用解决方案。
策略模式:
- 抽象策略类(基类):包含一个或多个纯虚函数,用于声明具体策略类需要实现的接口。
- 具体策略类(派生类):重写了抽象策略类中定义的接口,每个具体策略类代表一个具体的接口。
- 上下文类:持有一个抽象策略类的指针/引用,负责根据需要选择和使用具体的策略类。
抽象策略类的作用:定义统一接口,运行时多态,提高代码的可维护性和可扩展性。
这里采用 设计模式 - 策略模式 来进行日志的设计,我们想要的日志格式如下:
[可读性很好的时间] [日志等级] [进程pid] [打印对应日志的文件名][行号] - 消息内容, 支持可变参数
[2025-03-08 00:43:30] [DEBUG] [882217] [Main.cc] [9] - hello world
[2025-03-08 00:43:30] [DEBUG] [882217] [Main.cc] [10] - hello world
[2025-03-08 00:43:30] [DEBUG] [882217] [Main.cc] [11] - hello world
[2025-03-08 00:43:30] [DEBUG] [882217] [Main.cc] [12] - hello world
// Log.hpp
#pragma once
#include <iostream>
#include <cstdio>
#include <string>
#include <filesystem> // C++17文件系统
#include <fstream> // 文件流
#include <sstream> // 字符串流
#include <memory>
#include <unistd.h>
#include <time.h>
#include "Mutex.hpp"
namespace LogModule
{
using namespace MutexModule;
// 获取系统时间
std::string CurrentTime()
{
time_t time_stamp = ::time(nullptr); // 获取时间戳
struct tm curr;
localtime_r(&time_stamp, &curr); // 将时间戳转化为可读性强的信息
char buffer[1024];
snprintf(buffer, sizeof(buffer), "%4d-%02d-%02d %02d:%02d:%02d",
curr.tm_year + 1900,
curr.tm_mon + 1,
curr.tm_mday,
curr.tm_hour,
curr.tm_min,
curr.tm_sec);
return buffer;
}
// 日志文件: 默认路径和默认文件名
const std::string defaultlogpath = "./log/";
const std::string defaultlogname = "log.txt";
// 日志等级
enum class LogLevel
{
DEBUG = 1,
INFO,
WARNING,
ERROR,
FATAL
};
std::string Level2String(LogLevel level)
{
switch (level)
{
case LogLevel::DEBUG:
return "DEBUG";
case LogLevel::INFO:
return "INFO";
case LogLevel::WARNING:
return "WARNING";
case LogLevel::ERROR:
return "ERROR";
case LogLevel::FATAL:
return "FATAL";
default:
return "NONE";
}
}
// 3. 策略模式: 刷新策略
class LogStrategy
{
public:
virtual ~LogStrategy() = default; //???
// 纯虚函数: 无法实例化对象, 派生类可以重载该函数, 实现不同的刷新方式
virtual void SyncLog(const std::string &message) = 0;
};
// 3.1 控制台策略
class ConsoleLogStrategy : public LogStrategy
{
public:
ConsoleLogStrategy() {}
~ConsoleLogStrategy() {}
void SyncLog(const std::string &message) override
{
LockGuard lockguard(_mutex);
std::cout << message << std::endl;
}
private:
Mutex _mutex;
};
// 3.2 文件级(磁盘)策略
class FileLogStrategy : public LogStrategy
{
public:
FileLogStrategy(const std::string &logpath = defaultlogpath, const std::string &logname = defaultlogname)
: _logpath(logpath), _logname(logname)
{
// 判断_logpath目录是否存在
if (std::filesystem::exists(_logpath))
{
return;
}
try
{
std::filesystem::create_directories(_logpath);
}
catch (std::filesystem::filesystem_error &e)
{
std::cerr << e.what() << "\n";
}
}
~FileLogStrategy() {}
void SyncLog(const std::string &message) override
{
LockGuard lockguard(_mutex);
std::string log = _logpath + _logname;
std::ofstream out(log, std::ios::app); // 以追加的方式打开文件
if (!out.is_open())
{
return;
}
out << message << "\n"; // 将信息刷新到out流中
out.close();
}
private:
std::string _logpath;
std::string _logname;
Mutex _mutex;
};
// 4. 日志类: 构建日志字符串, 根据策略进行刷新
class Logger
{
public:
Logger()
{
// 默认往控制台上刷新
_strategy = std::make_shared<ConsoleLogStrategy>();
}
~Logger() {}
void EnableConsoleLog()
{
_strategy = std::make_shared<ConsoleLogStrategy>();
}
void EnableFileLog()
{
_strategy = std::make_shared<FileLogStrategy>();
}
// 内部类: 记录完整的日志信息
class LogMessage
{
public:
LogMessage(LogLevel level, const std::string &filename, int line, Logger &logger)
: _currtime(CurrentTime()), _level(level), _pid(::getpid())
, _filename(filename), _line(line), _logger(logger)
{
std::stringstream ssbuffer;
ssbuffer << "[" << _currtime << "] "
<< "[" << Level2String(_level) << "] "
<< "[" << _pid << "] "
<< "[" << _filename << "] "
<< "[" << _line << "] - ";
_loginfo = ssbuffer.str();
}
~LogMessage()
{
if(_logger._strategy)
{
_logger._strategy->SyncLog(_loginfo);
}
}
template <class T>
LogMessage &operator<<(const T &info)
{
std::stringstream ssbuffer;
ssbuffer << info;
_loginfo += ssbuffer.str();
return *this;
}
private:
std::string _currtime; // 当前日志时间
LogLevel _level; // 日志水平
pid_t _pid; // 进程pid
std::string _filename; // 文件名
uint32_t _line; // 日志行号
Logger &_logger; // 负责根据不同的策略进行刷新
std::string _loginfo; // 日志信息
};
// 故意拷贝, 形成LogMessage临时对象, 后续在被<<时,会被持续引用,
// 直到完成输入,才会自动析构临时LogMessage, 至此完成了日志的刷新,
// 同时形成的临时对象内包含独立日志数据, 未来采用宏替换, 获取文件名和代码行数
LogMessage operator()(LogLevel level, const std::string &filename, int line)
{
return LogMessage(level, filename, line, *this);
}
private:
// 纯虚类不能实例化对象, 但是可以定义指针
std::shared_ptr<LogStrategy> _strategy; // 日志刷新策略方案
};
// 定义全局logger对象
Logger logger;
// 编译时进行宏替换: 方便随时获取行号和文件名
#define LOG(level) logger(level, __FILE__, __LINE__)
// 提供选择使用何种日志策略的方法
#define ENABLE_CONSOLE_LOG() logger.EnableConsoleLog()
#define ENABLE_FILE_LOG() logger.EnableFileLog()
}
// Main.cc
#include <iostream>
#include "Log.hpp"
using namespace LogModule;
int main()
{
// 往显示器中写入
ENABLE_CONSOLE_LOG();
LOG(LogLevel::DEBUG) << "hello world";
LOG(LogLevel::DEBUG) << "hello world";
LOG(LogLevel::DEBUG) << "hello world";
LOG(LogLevel::DEBUG) << "hello world";
// 往文件中写入
ENABLE_FILE_LOG();
LOG(LogLevel::DEBUG) << "hello world";
LOG(LogLevel::DEBUG) << "hello world";
LOG(LogLevel::DEBUG) << "hello world";
LOG(LogLevel::DEBUG) << "hello world";
return 0;
}
xzy@hcss-ecs-b3aa:~$ ./testLog
[2025-03-08 00:43:30] [DEBUG] [882217] [Main.cc] [9] - hello world
[2025-03-08 00:43:30] [DEBUG] [882217] [Main.cc] [10] - hello world
[2025-03-08 00:43:30] [DEBUG] [882217] [Main.cc] [11] - hello world
[2025-03-08 00:43:30] [DEBUG] [882217] [Main.cc] [12] - hello world
2.线程池
- 线程池:创建一定数量线程,这些线程处于等待任务的状态。如果没有任务,线程在条件变量下等待,直到有任务到来。当有新的任务到来时,线程池会唤醒一个线程执行任务。当任务执行完毕后,线程不会被销毁,而是返回到线程池中等待下一个任务,从而实现线程的复用。
线程池使用场景:
- 高并发场景:在处理大量并发请求的场景中,如 Web 服务器、数据库服务器等,使用线程池可以有效地处理并发请求,提高系统的吞吐量。
- 任务执行频繁的场景:当程序中需要频繁地执行一些小任务时,使用线程池可以避免频繁地创建和销毁线程,提高程序的效率。
- 需要控制线程数量的场景:在一些对系统资源有限制的场景中,如嵌入式系统、移动设备等,使用线程池可以控制线程的数量,避免系统资源耗尽。
这里我们实现:创建固定数量线程池,循环从任务队列中获取任务对象,获取到任务对象后,执行任务对象中的任务接口。
1.Task.hpp
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include "Log.hpp"
using namespace LogModule;
void MySql(std::string name)
{
LOG(LogLevel::DEBUG) << "我是一个数据任务, 我正在被执行" << "[" << name << "]";
}
void UpLoad(std::string name)
{
LOG(LogLevel::DEBUG) << "我是一个上传任务, 我正在被执行" << "[" << name << "]";
}
void DownLoad(std::string name)
{
LOG(LogLevel::DEBUG) << "我是一个下载任务, 我正在被执行" << "[" << name << "]";
}
using task_t = std::function<void(std::string name)>;
std::vector<task_t> tasks;
2.Thread.hpp
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include <pthread.h>
#include <sys/types.h>
#include <unistd.h>
namespace ThreadModule
{
using func_t = std::function<void(std::string)>;
static int number = 1;
// 强类型枚举: 枚举的成员名称被限定在枚举类型的作用域内
enum class TSTATUS
{
NEW,
RUNNING,
STOP
};
class Thread
{
private:
// 成员方法: 需要加上static表示不需要this指针, 否则回调函数报错
// 而要执行_func()函数又需要由this指针, 所以Routine函数传this指针
static void *Routine(void *args)
{
Thread *t = static_cast<Thread *>(args);
t->_func(t->Name());
return nullptr;
}
void EnableDetach() { _joinable = false; }
public:
Thread(func_t func)
: _func(func), _status(TSTATUS::NEW), _joinable(true)
{
_name = "Thread-" + std::to_string(number++);
_pid = getpid();
}
~Thread() {}
// 线程创建
bool Start()
{
if (_status != TSTATUS::RUNNING)
{
int n = pthread_create(&_tid, nullptr, Routine, this);
if (n != 0)
return false;
_status = TSTATUS::RUNNING;
return true;
}
return false;
}
// 线程退出
bool Stop()
{
if (_status == TSTATUS::RUNNING)
{
int n = ::pthread_cancel(_tid);
if (n != 0)
return false;
_status = TSTATUS::STOP;
return true;
}
return false;
}
// 线程等待
bool Join()
{
if (_joinable)
{
int n = ::pthread_join(_tid, nullptr);
if (n != 0)
return false;
_status = TSTATUS::STOP;
return true;
}
return false;
}
// 线程分离
bool Detach()
{
EnableDetach();
int n = ::pthread_detach(_tid);
if (n != 0)
return false;
return true;
}
// 线程是否分离
bool IsJoinable() { return _joinable; }
std::string Name() { return _name; }
private:
std::string _name;
pthread_t _tid;
pid_t _pid;
bool _joinable; // 线程是否是分离的, 默认不是
func_t _func;
TSTATUS _status;
};
}
3.ThreadPool.hpp
#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <queue>
#include <memory>
#include "Mutex.hpp"
#include "Cond.hpp"
#include "Thread.hpp"
#include "Log.hpp"
namespace ThreadPoolModule
{
using namespace MutexModule;
using namespace CondModule;
using namespace ThreadModule;
using namespace LogModule;
using thread_t = std::shared_ptr<Thread>;
const static int defaultnum = 5;
template <class T>
class ThreadPool
{
private:
bool IsEmpty() { return _taskq.empty(); }
void HandlerTask(std::string name)
{
LOG(LogLevel::INFO) << "线程: " << name << ", 进入HandlerTask执行逻辑";
while (true)
{
// 1. 拿任务: 访问共享资源, 需要加锁
T task;
{
LockGuard lockguard(_mutex);
while (IsEmpty() && _isrunning) // while替代if: 防止伪唤醒
{
_wait_num++;
_cond.Wait(_mutex); // 没任务时: 线程在条件变量上阻塞等待
_wait_num--;
}
// 2. 任务队列不为空 && 线程池退出
if (IsEmpty() && !_isrunning)
break;
task = _taskq.front();
_taskq.pop();
}
// 3. 处理任务: 并发处理, 不需要持有锁
task(name);
}
LOG(LogLevel::INFO) << "线程: " << name << ", 退出";
}
public:
ThreadPool(int num = defaultnum)
: _num(num), _wait_num(0), _isrunning(false)
{
for (int i = 0; i < _num; i++)
{
// 在类中: bind类的公有方法, 需要取地址 + 传入this指针
// 在类外: bind类的公有方法, 需要取地址 + 传入类的匿名对象
_threads.push_back(std::make_shared<Thread>(std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1))); // push_back()会调用移动构造
LOG(LogLevel::INFO) << "构建线程" << _threads.back()->Name() << "对象...成功";
}
}
~ThreadPool() {}
void Equeue(const T &in)
{
LockGuard lockguard(_mutex);
if (!_isrunning) return;
_taskq.push(in);
if (_wait_num > 0)
{
_cond.Signal(); // 唤醒线程
}
}
void Start()
{
if (_isrunning) return;
_isrunning = true;
for (auto &thread_ptr : _threads)
{
thread_ptr->Start();
LOG(LogLevel::INFO) << "启动线程" << thread_ptr->Name() << "...成功";
}
}
void Stop()
{
LockGuard lockguard(_mutex);
if (_isrunning)
{
// 1. 不能再新增任务了
_isrunning = false;
// 2. 让线程自己退出(唤醒所有的线程) && 历史任务被执行完
if (_wait_num > 0)
{
_cond.Broadcast();
}
}
}
void Wait()
{
for (auto &thread_ptr : _threads)
{
thread_ptr->Join();
LOG(LogLevel::INFO) << "回收线程" << thread_ptr->Name() << "...成功";
}
}
private:
int _num; // 线程的个数
std::vector<thread_t> _threads; // 线程池
std::queue<T> _taskq; // 共享资源: 任务队列
int _wait_num; // 等待的线程数目
bool _isrunning; // 线程池是否运行
Mutex _mutex; // 锁
Cond _cond; // 条件变量
};
}
4.ThreadPool.cc
#include <iostream>
#include <memory>
#include <unistd.h>
#include "ThreadPool.hpp"
#include "Task.hpp"
using namespace ThreadPoolModule;
int main()
{
ENABLE_CONSOLE_LOG();
tasks.push_back(MySql);
tasks.push_back(UpLoad);
tasks.push_back(DownLoad);
std::shared_ptr<ThreadPool<task_t>> tp = std::make_shared<ThreadPool<task_t>>(3);
tp->Start();
int cnt = 0;
while(cnt < 6)
{
tp->Equeue(tasks[cnt % 3]);
cnt++;
sleep(1);
}
tp->Stop();
sleep(3);
tp->Wait();
return 0;
}
xzy@hcss-ecs-b3aa:~$ ./thread_pool
[2025-03-11 17:32:01] [INFO] [1030242] [ThreadPool.hpp] [67] - 构建线程Thread-1对象...成功
[2025-03-11 17:32:01] [INFO] [1030242] [ThreadPool.hpp] [67] - 构建线程Thread-2对象...成功
[2025-03-11 17:32:01] [INFO] [1030242] [ThreadPool.hpp] [67] - 构建线程Thread-3对象...成功
[2025-03-11 17:32:01] [INFO] [1030242] [ThreadPool.hpp] [91] - 启动线程Thread-1...成功
[2025-03-11 17:32:01] [INFO] [1030242] [ThreadPool.hpp] [91] - 启动线程Thread-2...成功
[2025-03-11 17:32:01] [INFO] [1030242] [ThreadPool.hpp] [91] - 启动线程Thread-3...成功
[2025-03-11 17:32:01] [INFO] [1030242] [ThreadPool.hpp] [31] - 线程: Thread-1, 进入HandlerTask执行逻辑
[2025-03-11 17:32:01] [INFO] [1030242] [ThreadPool.hpp] [31] - 线程: Thread-2, 进入HandlerTask执行逻辑
[2025-03-11 17:32:01] [INFO] [1030242] [ThreadPool.hpp] [31] - 线程: Thread-3, 进入HandlerTask执行逻辑
[2025-03-11 17:32:01] [DEBUG] [1030242] [Task.hpp] [12] - 我是一个数据任务, 我正在被执行[Thread-1]
[2025-03-11 17:32:02] [DEBUG] [1030242] [Task.hpp] [17] - 我是一个上传任务, 我正在被执行[Thread-2]
[2025-03-11 17:32:03] [DEBUG] [1030242] [Task.hpp] [22] - 我是一个下载任务, 我正在被执行[Thread-3]
[2025-03-11 17:32:04] [DEBUG] [1030242] [Task.hpp] [12] - 我是一个数据任务, 我正在被执行[Thread-1]
[2025-03-11 17:32:05] [DEBUG] [1030242] [Task.hpp] [17] - 我是一个上传任务, 我正在被执行[Thread-2]
[2025-03-11 17:32:06] [DEBUG] [1030242] [Task.hpp] [22] - 我是一个下载任务, 我正在被执行[Thread-3]
[2025-03-11 17:32:07] [INFO] [1030242] [ThreadPool.hpp] [55] - 线程: Thread-2, 退出
[2025-03-11 17:32:07] [INFO] [1030242] [ThreadPool.hpp] [55] - 线程: Thread-3, 退出
[2025-03-11 17:32:07] [INFO] [1030242] [ThreadPool.hpp] [55] - 线程: Thread-1, 退出
[2025-03-11 17:32:10] [INFO] [1030242] [ThreadPool.hpp] [116] - 回收线程Thread-1...成功
[2025-03-11 17:32:10] [INFO] [1030242] [ThreadPool.hpp] [116] - 回收线程Thread-2...成功
[2025-03-11 17:32:10] [INFO] [1030242] [ThreadPool.hpp] [116] - 回收线程Thread-3...成功
二.线程安全与重入问题
- 线程安全:就是多个线程在访问共享资源时,能够正确地执行,不会相互干扰或破坏彼此的执行结果。一般而言,多个线程并发同一段只有局部变量的代码时,不会出现不同的结果。但是对全局变量或者静态变量进行操作,并且没有锁保护的情况下,容易出现该问题。
- 重入:同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则,是不可入函数。
重入的两种情况:
- 多线程重入函数。
- 信号导致一个执行流重复进入函数。
注意:在多进程中,可能会发生重入,但是访问共享资源会发生写时拷贝,不会出现问题!
三.线程安全的单例模式
- 单例模式:只能创建一个对象!
- 在很多服务器开发场景中,经常需要让服务器加载很多的数据(上百G)到内存中。此时往往要用一个单例的类来管理这些数据。
1.饿汉模式
- 初始化时机:在程序启动时,类被加载到内存后就立即初始化单例对象,无论后续是否会使用到该对象。
- 线程安全性:由于是在程序启动时就完成初始化,在多线程环境下,不存在多个线程同时创建单例对象的问题,所以通常是线程安全的。
// 饿汉模式存在问题
// 1. 多个饿汉模式的单例,某个对象初始化内容较多(读文件),会导致程序启动慢
// 2. A和B两个饿汉,对象初始化存在依赖关系,要求A先初始化,B再初始化,无法保证
class InfoMar
{
public:
static InfoMar* GetInstance()
{
return &_ins;
}
private:
InfoMar()
{
cout << "ip:" << _ip << endl;
cout << "port: " << _port << endl;
}
// 禁用: 拷贝构造和拷贝赋值
InfoMar(const InfoMar&) = delete;
InfoMar& operator=(const InfoMar&) = delete;
private:
string _ip = "127.0.0.1";
int _port = 8080;
// 静态对象与普通对象不同: 不在类中, 而是在静态区, 类域的限制
static InfoMar _ins;
};
// 静态成员: 类内声明, 类外定义
InfoMar InfoMar::_ins;
int main()
{
// 程序运行在该行: 静态对象就被创建好了
return 0;
}
2.懒汉模式
- 初始化时机:单例对象在第一次被使用时才进行初始化,在未被使用之前,不会创建对象。
- 线程安全性:如果不做额外的线程安全处理,在多线程环境下,当多个线程同时访问获取单例对象的方法,且此时单例对象尚未初始化时,就可能会导致多个线程同时创建单例对象,破坏单例模式的唯一性。因此,懒汉模式需要通过加锁等机制来保证线程安全。
// 懒汉模式: 解决饿汉模式的两个问题
class InfoMar
{
public:
static InfoMar* GetInstance()
{
// 若单例为空: 需要加锁创建单例对象
if (_pins == nullptr)
{
mutex.lock();
if (_pins == nullptr)
{
_pins = new InfoMar;
}
mutex.unlock();
}
// 若单例不为空: 直接返回单例对象
return _pins;
}
static void DelInstance()
{
delete _pins;
_pins = nullptr;
}
private:
InfoMar()
{
cout << "ip:" << _ip << endl;
cout << "port: " << _port << endl;
}
// 禁用: 拷贝构造和拷贝赋值
InfoMar(const InfoMar&) = delete;
InfoMar& operator=(const InfoMar&) = delete;
private:
string _ip = "127.0.0.1";
int _port = 8080;
static InfoMar* _pins; // 单例对象
static Mutex mutex; // 保护单例对象
};
InfoMar* InfoMar::_pins = nullptr;
Mutex InfoMar::mutex;
int main()
{
InfoMar::GetInstance();
return 0;
}
3.懒汉模式线程池
1.ThreadPool.hpp
#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <queue>
#include <memory>
#include "Mutex.hpp"
#include "Cond.hpp"
#include "Thread.hpp"
#include "Log.hpp"
namespace ThreadPoolModule
{
using namespace MutexModule;
using namespace CondModule;
using namespace ThreadModule;
using namespace LogModule;
using thread_t = std::shared_ptr<Thread>;
const static int defaultnum = 3;
template <class T>
class ThreadPool
{
private:
bool IsEmpty() { return _taskq.empty(); }
void HandlerTask(std::string name)
{
LOG(LogLevel::INFO) << "线程: " << name << ", 进入HandlerTask执行逻辑";
while (true)
{
// 1. 拿任务: 访问共享资源, 需要加锁
T task;
{
LockGuard lockguard(_mutex);
while (IsEmpty() && _isrunning) // while替代if: 防止伪唤醒
{
_wait_num++;
_cond.Wait(_mutex); // 没任务时: 线程在条件变量上阻塞等待
_wait_num--;
}
// 2. 任务队列不为空 && 线程池退出
if (IsEmpty() && !_isrunning)
break;
task = _taskq.front();
_taskq.pop();
}
// 3. 处理任务: 并发处理, 不需要持有锁
task(name);
}
LOG(LogLevel::INFO) << "线程: " << name << ", 退出";
}
ThreadPool(int num = defaultnum)
: _num(num), _wait_num(0), _isrunning(false)
{
for (int i = 0; i < _num; i++)
{
// 在类中: bind类的公有方法, 需要取地址 + 传入this指针
// 在类外: bind类的公有方法, 需要取地址 + 传入类的匿名对象
_threads.push_back(std::make_shared<Thread>(std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1))); // push_back()会调用移动构造
LOG(LogLevel::INFO) << "构建线程" << _threads.back()->Name() << "对象...成功";
}
}
ThreadPool<T>(const ThreadPool<T> &) = delete;
ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;
public:
~ThreadPool() {}
// 获取单例对象
static ThreadPool<T> *GetInstance()
{
// 若单例为空: 需要加锁创建单例对象
if(instance == nullptr)
{
LockGuard lockguard(_lock);
if(instance == nullptr)
{
LOG(LogLevel::INFO) << "单例首次被执行, 需要加载对象...";
instance = new ThreadPool<T>();
}
}
// 若单例不为空: 直接返回单例对象
return instance;
}
void Equeue(const T &in)
{
LockGuard lockguard(_mutex);
if (!_isrunning) return;
_taskq.push(in);
if (_wait_num > 0)
{
_cond.Signal(); // 唤醒线程
}
}
void Start()
{
if (_isrunning) return;
_isrunning = true;
for (auto &thread_ptr : _threads)
{
thread_ptr->Start();
LOG(LogLevel::INFO) << "启动线程" << thread_ptr->Name() << "...成功";
}
}
void Stop()
{
LockGuard lockguard(_mutex);
if (_isrunning)
{
// 1. 不能再新增任务了
_isrunning = false;
// 2. 让线程自己退出(唤醒所有的线程) && 历史任务被执行完
if (_wait_num > 0)
{
_cond.Broadcast();
}
}
}
void Wait()
{
for (auto &thread_ptr : _threads)
{
thread_ptr->Join();
LOG(LogLevel::INFO) << "回收线程" << thread_ptr->Name() << "...成功";
}
}
private:
int _num; // 线程的个数
std::vector<thread_t> _threads; // 线程池
std::queue<T> _taskq; // 共享资源: 任务队列
int _wait_num; // 等待的线程数目
bool _isrunning; // 线程池是否运行
Mutex _mutex; // 锁
Cond _cond; // 条件变量
static ThreadPool<T> *instance; // 单例对象
static Mutex _lock; // 用来保护单例
};
// 静态成员: 类内声明, 类外定义
template<class T>
ThreadPool<T> *ThreadPool<T>::instance = nullptr;
template<class T>
Mutex ThreadPool<T>::_lock;
}
2.ThreadPool.cc
#include <iostream>
#include <memory>
#include <unistd.h>
#include "ThreadPool.hpp"
#include "Task.hpp"
using namespace ThreadPoolModule;
int main()
{
ENABLE_CONSOLE_LOG();
tasks.push_back(MySql);
tasks.push_back(UpLoad);
tasks.push_back(DownLoad);
ThreadPool<task_t>::GetInstance()->Start();
int cnt = 0;
while(cnt < 6)
{
ThreadPool<task_t>::GetInstance()->Equeue(tasks[cnt % 3]);
cnt++;
sleep(1);
}
ThreadPool<task_t>::GetInstance()->Stop();
sleep(3);
ThreadPool<task_t>::GetInstance()->Wait();
return 0;
}
xzy@hcss-ecs-b3aa:~$ ./thread_pool
[2025-03-12 20:46:40] [INFO] [1077712] [ThreadPool.hpp] [85] - 单例首次被执行, 需要加载对象...
[2025-03-12 20:46:40] [INFO] [1077712] [ThreadPool.hpp] [66] - 构建线程Thread-1对象...成功
[2025-03-12 20:46:40] [INFO] [1077712] [ThreadPool.hpp] [66] - 构建线程Thread-2对象...成功
[2025-03-12 20:46:40] [INFO] [1077712] [ThreadPool.hpp] [66] - 构建线程Thread-3对象...成功
[2025-03-12 20:46:40] [INFO] [1077712] [ThreadPool.hpp] [111] - 启动线程Thread-1...成功
[2025-03-12 20:46:40] [INFO] [1077712] [ThreadPool.hpp] [111] - 启动线程Thread-2...成功
[2025-03-12 20:46:40] [INFO] [1077712] [ThreadPool.hpp] [111] - 启动线程Thread-3...成功
[2025-03-12 20:46:40] [INFO] [1077712] [ThreadPool.hpp] [31] - 线程: Thread-3, 进入HandlerTask执行逻辑
[2025-03-12 20:46:40] [DEBUG] [1077712] [Task.hpp] [12] - 我是一个数据任务, 我正在被执行[Thread-3]
[2025-03-12 20:46:40] [INFO] [1077712] [ThreadPool.hpp] [31] - 线程: Thread-2, 进入HandlerTask执行逻辑
[2025-03-12 20:46:40] [INFO] [1077712] [ThreadPool.hpp] [31] - 线程: Thread-1, 进入HandlerTask执行逻辑
[2025-03-12 20:46:41] [DEBUG] [1077712] [Task.hpp] [17] - 我是一个上传任务, 我正在被执行[Thread-3]
[2025-03-12 20:46:42] [DEBUG] [1077712] [Task.hpp] [22] - 我是一个下载任务, 我正在被执行[Thread-2]
[2025-03-12 20:46:43] [DEBUG] [1077712] [Task.hpp] [12] - 我是一个数据任务, 我正在被执行[Thread-1]
[2025-03-12 20:46:44] [DEBUG] [1077712] [Task.hpp] [17] - 我是一个上传任务, 我正在被执行[Thread-3]
[2025-03-12 20:46:45] [DEBUG] [1077712] [Task.hpp] [22] - 我是一个下载任务, 我正在被执行[Thread-2]
[2025-03-12 20:46:46] [INFO] [1077712] [ThreadPool.hpp] [55] - 线程: Thread-1, 退出
[2025-03-12 20:46:46] [INFO] [1077712] [ThreadPool.hpp] [55] - 线程: Thread-3, 退出
[2025-03-12 20:46:46] [INFO] [1077712] [ThreadPool.hpp] [55] - 线程: Thread-2, 退出
[2025-03-12 20:46:49] [INFO] [1077712] [ThreadPool.hpp] [136] - 回收线程Thread-1...成功
[2025-03-12 20:46:49] [INFO] [1077712] [ThreadPool.hpp] [136] - 回收线程Thread-2...成功
[2025-03-12 20:46:49] [INFO] [1077712] [ThreadPool.hpp] [136] - 回收线程Thread-3...成功
四.死锁的概念
1.死锁
- 死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所站用不会释放的资源而处于的一种永久等待状态。
- 为了发便表述,假设现在线程A,线程B必须同时持有锁1和锁2,才能进行后续资源的访问。
申请一把锁是原子的,但是申请两把锁就不一定了:
造成的结果是:
2.死锁的四个必要条件
- 互斥条件:一个资源每次只能被一个执行流使用。
- 请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放。
- 不剥夺条件:一个执行流已获得的资源,在末使用完之前,不能强行剥夺。
- 循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系。
3.避免死锁
破坏死锁的四个必要条件
◦ 破坏循环等待条件问题:资源⼀次性分配,使⽤超时机制、加锁顺序⼀致
五.STL、智能指针和线程安全?
- STL 的设计初衷是将性能挖掘到极致,而一旦涉及到加锁保证线程安全,会对性能造成巨大的影响。而且对于不同的容器,加锁方式的不同,性能可能也不同(例如hash表的锁表和锁桶),因此 STL 默认不是线程安全。如果需要在多线程环境下使用,往往需要调用者自行保证线程安全。
智能指针是否是线程安全的:
- 对于 unique_ptr,不支持拷贝,只支持移动,所以它不会在多个线程之间共享,也就不存在多个线程同时访问导致数据竞争等线程安全问题。
- 对于 shared_ptr,多个对象需要共用一个引用计数变量,所以会存在线程安全问题。但是标准库实现的时候考虑到了这个问题,基于原子操作(CAS)的方式保证 shared_ptr 能够高效,原子的操作引用计数。
- 注意:这里的线程安全是指智能指针的接口是线程安全的,但是智能指针指向的对象未必是线程安全的!