线程池流程总结:
1、构造函数中创建线程,并添加到线程池(构造函数返回时,线程自动启动,并停在等待wait:从线程池取出一个任务处);
2、主线程中添加任务,到任务队列。并用“条件变量”通知一个线程,从线程池取出一个任务;
3、取出任务后,执行线程的任务函数 =》回调添加的“实际的线程函数”;
4、主线程执行完,return返回 =》调用线程池析构函数;
5、“条件变量”通知所有线程停止,使得线程循环退出,并等待所有线程完成任务;
6、主线程main结束。
一、C++线程池1
1、用c++封装线程池
#include <iostream>
#include <vector>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <functional>
using namespace std;
class ThreadPool {
public:
ThreadPool(size_t threads) : stop(false)
{
for (size_t i = 0; i < threads; ++i)
{
//一、lambda表达式返回值是一个线程对象,为什么没有看见创建线程的语句?
// thread是什么时候创建的呢?
//
//二、lambda表达式什么时候执行?
// 1.当线程池的构造函数返回时,线程池中的线程才开始运行
// 2.当你创建一个std::thread对象并传入一个函数时(对象的实例化),
// 这个线程会自动开始执行该函数。因此,通常你不需要显式调用start()方法!
//lambda表达式创建线程,并将线程加入线程池
workers.emplace_back([this] {
//线程循环,不断从任务队列中取出任务并执行
while (true) {
//取出任务
std::function<void()> task;
{
//互斥锁保护:任务队列和线程池停止状态
std::unique_lock<std::mutex> lock(this->queueMutex);
bool empty = this->tasks.empty();
bool stopped = this->stop;
//等待条件变量通知或线程池停止
this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });
//线程池停止且任务队列为空,退出线程循环
if (this->stop && this->tasks.empty())
return;
//取出任务
task = std::move(this->tasks.front());
//从任务队列中删除任务
this->tasks.pop();
}
//执行任务
task();
}
});
}
}
//定义任务入队函数模板
template<class F, class... Args>
void enqueue(F&& fun, Args&&... args) // 添加任务到任务队列(传递:线程函数、参数)
{
//将任务封装成std::function
auto task = std::bind(std::forward<F>(fun), std::forward<Args>(args)...);
{
//互斥锁保护:任务队列和线程池停止状态
std::unique_lock<std::mutex> lock(queueMutex);
//线程池停止
if (stop) throw std::runtime_error("Enqueue on stopped ThreadPool");
//将任务加入任务队列
tasks.emplace(task);
}
//通知一个线程
condition.notify_one();
}
// 析构函数:等待所有线程完成任务,并停止线程池
~ThreadPool() {
{
//互斥锁保护:线程池停止状态
std::unique_lock<std::mutex> lock(queueMutex);
stop = true;
}
//通知所有线程
condition.notify_all();
//等待所有线程完成任务
for (std::thread& worker : workers) {
worker.join();
}
}
private:
std::vector<std::thread> workers;//线程池
std::queue<std::function<void()>> tasks;//任务队列
std::mutex queueMutex;//互斥锁(保护任务队列tasks 和线程池停止状态stop)
std::condition_variable condition;//条件变量(加入任务到任务队列时通知一个线程)
bool stop;//线程池是否停止
};
2、main测试
int main() {
ThreadPool pool(2); // 创建一个包含4个线程的线程池
for (int i = 0; i < 2; ++i) { // 添加2个任务到线程池中执行
//任务入队函数模板:输出任务编号和线程ID
pool.enqueue([i] {
std::cout << "Task " << i << " is executed by thread " << std::this_thread::get_id() << std::endl;
});
}
system("pause");
return 0; // 主线程等待所有工作线程完成(在析构函数中处理)
}
3、通过测试,可以看出“线程池执行步骤”:
1、主线程 ThreadPool pool(2); // 创建一个包含4个线程的线程池
ThreadPool构造函数中:向“线程池workers”中添加lambda表达式形式的线程
//1.1线程循环,不断从任务队列中取出任务并执行
while(true)
{
//1.2等待:条件变量通知或线程池停止
this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });
}
2、主线程中添加任务,到任务队列 函数模板:输出任务编号和线程ID
pool.enqueue([i] {
std::cout << "Task " << i << " is executed by thread " << std::this_thread::get_id() << std::endl;
});
//2.1将任务加入任务队列(先用包装器function包装任务)
tasks.emplace(task);
//2.2条件变量通知一个线程(每加入一个任务,就通知一次线程执行任务!!)
condition.notify_one();
//2.3线程循环中的等待(条件变量通知),满足条件,开始向下执行!
this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });
//2.4从任务队列取出任务
task = std::move(this->tasks.front());
//2.5 执行任务(回调)
task();
=》回调“实际的线程函数”
=》就是添加进来的lambda表达式:
std::cout << "Task " << i << " is executed by thread " << std::this_thread::get_id() << std::endl;
3、主线程执行完。
return 0; // 主线程返回
//3.1 调用线程池析构函数
~ThreadPool()
{
//1.设置线程停止标识为true
stop = true;
//2.通知所有线程
condition.notify_all();
=》
//线程循环
while(true)
{
//while循环一直在这等待!(lock满足条件,向下执行!)
this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });
//当线程任务执行完成:stop = true;
//线程池停止且任务队列为空,退出线程循环
if (this->stop && this->tasks.empty())
return;
}
3.//等待所有线程完成任务
worker.join();
}
4、主线程结束
二、C++线程池2
1、线程池.h头文件
#ifndef _thread_pool_HPP
#define _thread_pool_HPP
#include <vector>
#include <deque>
#include <thread>
#include <functional>
#include <mutex>
#include <condition_variable>
//!
//! convenience macro to log with file and line information
//!
#ifdef __SOLA_LOGGING_ENABLED
#define __SOLA_LOG(level, msg) sola::level(msg, __FILE__, __LINE__);
#else
#define __SOLA_LOG(level, msg)
#endif /* __SOLA_LOGGING_ENABLED */
namespace sola {
class logger_iface {
public:
//! ctor
logger_iface(void) = default;
//! dtor
virtual ~logger_iface(void) = default;
//! copy ctor
logger_iface(const logger_iface&) = default;
//! assignment operator
logger_iface& operator=(const logger_iface&) = default;
public:
//!
//! debug logging
//!
//! \param msg message to be logged
//! \param file file from which the message is coming
//! \param line line in the file of the message
//!
virtual void debug(const std::string& msg, const std::string& file, std::size_t line) = 0;
//!
//! info logging
//!
//! \param msg message to be logged
//! \param file file from which the message is coming
//! \param line line in the file of the message
//!
virtual void info(const std::string& msg, const std::string& file, std::size_t line) = 0;
//!
//! warn logging
//!
//! \param msg message to be logged
//! \param file file from which the message is coming
//! \param line line in the file of the message
//!
virtual void warn(const std::string& msg, const std::string& file, std::size_t line) = 0;
//!
//! error logging
//!
//! \param msg message to be logged
//! \param file file from which the message is coming
//! \param line line in the file of the message
//!
virtual void error(const std::string& msg, const std::string& file, std::size_t line) = 0;
};
//!
//! default logger class provided by the library
//!
class logger : public logger_iface {
public:
//!
//! log level
//!
enum class log_level {
error = 0,
warn = 1,
info = 2,
debug = 3
};
public:
//! ctor
logger(log_level level = log_level::info);
//! dtor
~logger(void) = default;
//! copy ctor
logger(const logger&) = default;
//! assignment operator
logger& operator=(const logger&) = default;
public:
//!
//! debug logging
//!
//! \param msg message to be logged
//! \param file file from which the message is coming
//! \param line line in the file of the message
//!
void debug(const std::string& msg, const std::string& file, std::size_t line);
//!
//! info logging
//!
//! \param msg message to be logged
//! \param file file from which the message is coming
//! \param line line in the file of the message
//!
void info(const std::string& msg, const std::string& file, std::size_t line);
//!
//! warn logging
//!
//! \param msg message to be logged
//! \param file file from which the message is coming
//! \param line line in the file of the message
//!
void warn(const std::string& msg, const std::string& file, std::size_t line);
//!
//! error logging
//!
//! \param msg message to be logged
//! \param file file from which the message is coming
//! \param line line in the file of the message
//!
void error(const std::string& msg, const std::string& file, std::size_t line);
private:
//!
//! current log level in use
//!
log_level m_level;
//!
//! mutex used to serialize logs in multithreaded environment
//!
std::mutex m_mutex;
};
//!
//! variable containing the current logger
//! by default, not set (no logs)
//!
extern std::unique_ptr<logger_iface> active_logger;
//!
//! debug logging
//! convenience function used internally to call the logger
//!
//! \param msg message to be logged
//! \param file file from which the message is coming
//! \param line line in the file of the message
//!
void debug(const std::string& msg, const std::string& file, std::size_t line);
//!
//! info logging
//! convenience function used internally to call the logger
//!
//! \param msg message to be logged
//! \param file file from which the message is coming
//! \param line line in the file of the message
//!
void info(const std::string& msg, const std::string& file, std::size_t line);
//!
//! warn logging
//! convenience function used internally to call the logger
//!
//! \param msg message to be logged
//! \param file file from which the message is coming
//! \param line line in the file of the message
//!
void warn(const std::string& msg, const std::string& file, std::size_t line);
//!
//! error logging
//! convenience function used internally to call the logger
//!
//! \param msg message to be logged
//! \param file file from which the message is coming
//! \param line line in the file of the message
//!
void error(const std::string& msg, const std::string& file, std::size_t line);
class thread_pool{
public:
//任务包装器
typedef std::function<void()> task_t;
thread_pool(int init_size = 3);
~thread_pool();
void stop();
void add_task(const task_t&); //thread safe; 添加任务
private:
thread_pool(const thread_pool&);//禁止复制拷贝.
const thread_pool& operator=(const thread_pool&);
bool is_started() { return m_is_started; }
void start();//启动线程池
void thread_loop();//线程循环函数
task_t take();//取任务函数
private:
typedef std::vector<std::thread*> threads_t;//线程容器
typedef std::deque<task_t> tasks_t;//任务队列
int m_init_threads_size;//初始线程数量
threads_t m_threads;//线程容器
tasks_t m_tasks;//任务队列
std::mutex m_mutex;//互斥锁
std::condition_variable m_cond;//条件变量
bool m_is_started;//线程池是否启动
};
}
#endif
2、线程池cpp文件
#include <assert.h>
#include <iostream>
#include <sstream>
#include "thread_pool.hpp"
namespace sola {
std::unique_ptr<logger_iface> active_logger = nullptr;
static const char black[] = {0x1b, '[', '1', ';', '3', '0', 'm', 0};
static const char red[] = {0x1b, '[', '1', ';', '3', '1', 'm', 0};
static const char yellow[] = {0x1b, '[', '1', ';', '3', '3', 'm', 0};
static const char blue[] = {0x1b, '[', '1', ';', '3', '4', 'm', 0};
static const char normal[] = {0x1b, '[', '0', ';', '3', '9', 'm', 0};
logger::logger(log_level level)
: m_level(level) {}
void
logger::debug(const std::string& msg, const std::string& file, std::size_t line) {
if (m_level >= log_level::debug) {
std::lock_guard<std::mutex> lock(m_mutex);
std::cout << "[" << black << "DEBUG" << normal << "][sola::logger][" << file << ":" << line << "] " << msg << std::endl;
}
}
void
logger::info(const std::string& msg, const std::string& file, std::size_t line) {
if (m_level >= log_level::info) {
std::lock_guard<std::mutex> lock(m_mutex);
std::cout << "[" << blue << "INFO " << normal << "][sola::logger][" << file << ":" << line << "] " << msg << std::endl;
}
}
void
logger::warn(const std::string& msg, const std::string& file, std::size_t line) {
if (m_level >= log_level::warn) {
std::lock_guard<std::mutex> lock(m_mutex);
std::cout << "[" << yellow << "WARN " << normal << "][sola::logger][" << file << ":" << line << "] " << msg << std::endl;
}
}
void
logger::error(const std::string& msg, const std::string& file, std::size_t line) {
if (m_level >= log_level::error) {
std::lock_guard<std::mutex> lock(m_mutex);
std::cerr << "[" << red << "ERROR" << normal << "][sola::logger][" << file << ":" << line << "] " << msg << std::endl;
}
}
void
debug(const std::string& msg, const std::string& file, std::size_t line) {
if (active_logger)
active_logger->debug(msg, file, line);
}
void
info(const std::string& msg, const std::string& file, std::size_t line) {
if (active_logger)
active_logger->info(msg, file, line);
}
void
warn(const std::string& msg, const std::string& file, std::size_t line) {
if (active_logger)
active_logger->warn(msg, file, line);
}
void
error(const std::string& msg, const std::string& file, std::size_t line) {
if (active_logger)
active_logger->error(msg, file, line);
}
static std::string
get_tid(){
std::stringstream tmp;
tmp << std::this_thread::get_id();
return tmp.str();
}
//线程池构造函数
thread_pool::thread_pool(int init_size)
:m_init_threads_size(init_size),
m_mutex(),
m_cond(),
m_is_started(false)
{
//构造函数中自动启动线程池
start();
}
thread_pool::~thread_pool()
{
//如果线程池已经启动,则先停止
if(m_is_started)
{
stop();
}
}
//启动线程池
void thread_pool::start()
{
assert(m_threads.empty());
m_is_started = true;
m_threads.reserve(m_init_threads_size);//预先给线程容器分配空间
for (int i = 0; i < m_init_threads_size; ++i)
{
//创建线程并加入线程容器(线程函数为thread_loop)
m_threads.push_back(new std::thread(std::bind(&thread_pool::thread_loop, this)));
}
}
//停止线程池
void thread_pool::stop()
{
__SOLA_LOG(debug, "thread_pool::stop() stop.");
{
std::unique_lock<std::mutex> lock(m_mutex);
m_is_started = false;
//通知所有线程停止
m_cond.notify_all();
__SOLA_LOG(debug, "thread_pool::stop() notifyAll().");
}
for (threads_t::iterator it = m_threads.begin(); it != m_threads.end() ; ++it)
{
(*it)->join(); //等待线程退出
delete *it; //释放线程资源
}
m_threads.clear();//清空线程容器
}
//线程池线程函数
void thread_pool::thread_loop()
{
__SOLA_LOG(debug, "thread_pool::threadLoop() tid : " + get_tid() + " start.");
//线程池线程循环
while(m_is_started)
{
//从任务队列中取出一个任务
task_t task = take();
//如果取出的任务不为空,则执行任务(std::function类型可以直接判断是否为空!)
if(task)
{
//执行任务:回调实际的任务函数
task();
}
}
__SOLA_LOG(debug, "thread_pool::threadLoop() tid : " + get_tid() + " exit.");
}
//向线程池添加任务
void thread_pool::add_task(const task_t& task)
{
std::unique_lock<std::mutex> lock(m_mutex);
/*while(m_tasks.isFull())
{//when m_tasks have maxsize
cond2.notify_one();
}
*/
//向任务队列中添加任务
m_tasks.push_back(task);
//通知一个线程
m_cond.notify_one();
}
//从线程池取出一个任务
thread_pool::task_t thread_pool::take()
{
std::unique_lock<std::mutex> lock(m_mutex);
//always use a while-loop, due to spurious wakeup
//如果任务队列为空 + 线程池没有停止,则等待
while(m_tasks.empty() && m_is_started)
{
__SOLA_LOG(debug, "thread_pool::take() tid : " + get_tid() + " wait.");
m_cond.wait(lock);
}
__SOLA_LOG(debug, "thread_pool::take() tid : " + get_tid() + " wakeup.");
task_t task;
tasks_t::size_type size = m_tasks.size();
//如果任务队列不为空 + 线程池没有停止,则取出一个任务
if(!m_tasks.empty() && m_is_started)
{
task = m_tasks.front();
m_tasks.pop_front();
assert(size - 1 == m_tasks.size());
/*if (TaskQueueSize_ > 0)
{
cond2.notify_one();
}*/
}
return task;
}
}
3、测试main函数
#include <iostream>
#include <chrono>
#include <condition_variable>
#include "thread_pool.hpp"
std::mutex g_mutex;
void priorityFunc()
{
for (int i = 1; i < 4; ++i)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
std::lock_guard<std::mutex> lock(g_mutex);
std::cout << "priorityFunc() [" << i << "] at thread [ " << std::this_thread::get_id() << "] output" << std::endl;
}
}
void testFunc()
{
// loop to print character after a random period of time
for (int i = 1; i < 4; ++i)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
std::lock_guard<std::mutex> lock(g_mutex);
std::cout << "testFunc() [" << i << "] at thread [ " << std::this_thread::get_id() << "] output" << std::endl;
}
}
int main()
{
sola::active_logger = std::unique_ptr<sola::logger>(new sola::logger(sola::logger::log_level::debug));
sola::thread_pool thread_pool;
// add tasks to the thread pool
for(int i = 0; i < 2 ; i++)
thread_pool.add_task(testFunc);
system("pause");
return 0;
}
4、执行流程
1、主线程构造函数
thread_pool thread_pool;
{
//构造函数中自动启动线程池
start();
}
start()
{
//创建线程并加入线程容器(线程函数为thread_loop)
m_threads.push_back(new std::thread(std::bind(&thread_pool::thread_loop, this)));
}
//线程池线程函数
void thread_pool::thread_loop()
{
//线程池线程循环
while(m_is_started)
{
//从任务队列中取出一个任务
task_t task = take();
//如果取出的任务不为空,则执行任务(std::function类型可以直接判断是否为空!)
if(task)
{
//执行任务:回调实际的任务函数
task();
}
}
}
//等待:从线程池取出一个任务
thread_pool::task_t thread_pool::take()
{
//如果任务队列为空 + 线程池没有停止,则等待
while(m_tasks.empty() && m_is_started)
{
m_cond.wait(lock);
}
}
2、主线程中添加任务,到任务队列
thread_pool.add_task(testFunc);
thread_pool::add_task(testFunc)
{
//向任务队列中添加任务
m_tasks.push_back(task);
//通知一个线程
m_cond.notify_one();
}
//wait条件满足:从线程池取出一个任务
thread_pool::task_t thread_pool::take()
{
//如果任务队列为空 + 线程池没有停止,则等待
while(m_tasks.empty() && m_is_started)
{
m_cond.wait(lock);
}
//如果任务队列不为空 + 线程池没有停止,则取出一个任务
if(!m_tasks.empty() && m_is_started)
{
task = m_tasks.front();
m_tasks.pop_front();
}
return task;
}
//线程池线程函数:执行线程的任务函数
void thread_pool::thread_loop()
{
//线程池线程循环
while(m_is_started)
{
//从任务队列中取出一个任务
task_t task = take();
//如果取出的任务不为空,则执行任务(std::function类型可以直接判断是否为空!)
if(task)
{
//执行任务:回调实际的任务函数
task();
}
}
}
//task()回调任务函数
void testFunc()
{
}
3、主线程执行完。
return 0; // 主线程等待所有工作线程完成(在析构函数中处理)
//3.1 调用线程池析构函数
~ThreadPool()
{
//1.调用stop
stop()
{
//1.1通知所有线程停止
m_cond.notify_all();
=》
//1.2使得线程循环退出!
while(true)
{
//while循环一直在这等待!(lock满足条件,向下执行!)
//如果任务队列为空 + 线程池没有停止,则等待
while(m_tasks.empty() && m_is_started)
{
m_cond.wait(lock);
}
//当线程任务执行完成:stop = true;
//线程池停止且任务队列为空,退出线程循环
//如果任务队列不为空 + 线程池没有停止,则取出一个任务
if(!m_tasks.empty() && m_is_started)
{
task = m_tasks.front();
m_tasks.pop_front();
}
return task;
}
1.3//等待所有线程完成任务
(*it)->join(); //等待线程退出
}
4、主线程结束