C++线程池执行步骤分析,总结线程池流程

发布于:2025-07-21 ⋅ 阅读:(17) ⋅ 点赞:(0)

线程池流程总结:

1、构造函数中创建线程,并添加到线程池(构造函数返回时,线程自动启动,并停在等待wait:从线程池取出一个任务处);
2、主线程中添加任务,到任务队列。并用“条件变量”通知一个线程,从线程池取出一个任务;
3、取出任务后,执行线程的任务函数 =》回调添加的“实际的线程函数”;
4、主线程执行完,return返回 =》调用线程池析构函数;
5、“条件变量”通知所有线程停止,使得线程循环退出,并等待所有线程完成任务;
6、主线程main结束。

一、C++线程池1

1、用c++封装线程池

#include <iostream>
#include <vector>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <functional>

using namespace std;

class ThreadPool {
public:
    ThreadPool(size_t threads) : stop(false) 
    {
        for (size_t i = 0; i < threads; ++i) 
        {
            //一、lambda表达式返回值是一个线程对象,为什么没有看见创建线程的语句?
            // thread是什么时候创建的呢?
            // 
            //二、lambda表达式什么时候执行?
            // 1.当线程池的构造函数返回时,线程池中的线程才开始运行
            // 2.当你创建一个std::thread对象并传入一个函数时(对象的实例化),
            // 这个线程会自动开始执行该函数。因此,通常你不需要显式调用start()方法!
            
            //lambda表达式创建线程,并将线程加入线程池
            workers.emplace_back([this] {
                //线程循环,不断从任务队列中取出任务并执行
                while (true) {
                    //取出任务
                    std::function<void()> task;
                    {
                        //互斥锁保护:任务队列和线程池停止状态
                        std::unique_lock<std::mutex> lock(this->queueMutex);
                        
                        bool empty = this->tasks.empty();
                        bool stopped = this->stop;
                        //等待条件变量通知或线程池停止
                        this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });
                        
                        //线程池停止且任务队列为空,退出线程循环
                        if (this->stop && this->tasks.empty()) 
                            return;
                        //取出任务
                        task = std::move(this->tasks.front());
                        //从任务队列中删除任务
                        this->tasks.pop();
                    }

                    //执行任务
                    task();
                }
            });
        }
    }

    //定义任务入队函数模板
    template<class F, class... Args>
    void enqueue(F&& fun, Args&&... args) // 添加任务到任务队列(传递:线程函数、参数)
    {
        //将任务封装成std::function
        auto task = std::bind(std::forward<F>(fun), std::forward<Args>(args)...);
        {
            //互斥锁保护:任务队列和线程池停止状态
            std::unique_lock<std::mutex> lock(queueMutex);
            //线程池停止
            if (stop) throw std::runtime_error("Enqueue on stopped ThreadPool");
            //将任务加入任务队列
            tasks.emplace(task);
        }
        //通知一个线程
        condition.notify_one();
    }

    // 析构函数:等待所有线程完成任务,并停止线程池
    ~ThreadPool() {
        {
            //互斥锁保护:线程池停止状态
            std::unique_lock<std::mutex> lock(queueMutex);
            stop = true;
        }
        //通知所有线程
        condition.notify_all();
        //等待所有线程完成任务
        for (std::thread& worker : workers) {
            worker.join();
        }
    }
private:
    std::vector<std::thread> workers;//线程池
    std::queue<std::function<void()>> tasks;//任务队列
    std::mutex queueMutex;//互斥锁(保护任务队列tasks 和线程池停止状态stop)
    std::condition_variable condition;//条件变量(加入任务到任务队列时通知一个线程)
    bool stop;//线程池是否停止
};

2、main测试

int main() {
    ThreadPool pool(2); // 创建一个包含4个线程的线程池

    for (int i = 0; i < 2; ++i) { // 添加2个任务到线程池中执行
        //任务入队函数模板:输出任务编号和线程ID
        pool.enqueue([i] { 
            std::cout << "Task " << i << " is executed by thread " << std::this_thread::get_id() << std::endl; 
            });
    }

    system("pause");
    return 0; // 主线程等待所有工作线程完成(在析构函数中处理)
}

3、通过测试,可以看出“线程池执行步骤”:

1、主线程 ThreadPool pool(2); // 创建一个包含4个线程的线程池
ThreadPool构造函数中:向“线程池workers”中添加lambda表达式形式的线程
//1.1线程循环,不断从任务队列中取出任务并执行
while(true)
{
    //1.2等待:条件变量通知或线程池停止
    this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });
}

2、主线程中添加任务,到任务队列 函数模板:输出任务编号和线程ID
pool.enqueue([i] { 
    std::cout << "Task " << i << " is executed by thread " << std::this_thread::get_id() << std::endl; 
    });
    
//2.1将任务加入任务队列(先用包装器function包装任务)
tasks.emplace(task);

//2.2条件变量通知一个线程(每加入一个任务,就通知一次线程执行任务!!)
condition.notify_one();

//2.3线程循环中的等待(条件变量通知),满足条件,开始向下执行!
this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });

//2.4从任务队列取出任务
task = std::move(this->tasks.front());

 //2.5 执行任务(回调)
 task();
 =》回调“实际的线程函数”
 =》就是添加进来的lambda表达式:
 std::cout << "Task " << i << " is executed by thread " << std::this_thread::get_id() << std::endl; 

3、主线程执行完。
 return 0; // 主线程返回
 //3.1 调用线程池析构函数
~ThreadPool() 
{
    //1.设置线程停止标识为true
	stop = true;
	//2.通知所有线程
	condition.notify_all();
	=》
		//线程循环
		while(true)
		{
			//while循环一直在这等待!(lock满足条件,向下执行!)
			this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });
			
			//当线程任务执行完成:stop = true;
			//线程池停止且任务队列为空,退出线程循环
			if (this->stop && this->tasks.empty()) 
				return;
		}
	3.//等待所有线程完成任务
	worker.join();
}

4、主线程结束

二、C++线程池2

1、线程池.h头文件

#ifndef _thread_pool_HPP
#define _thread_pool_HPP

#include <vector>
#include <deque>
#include <thread>
#include <functional>
#include <mutex>
#include <condition_variable>

//!
//! convenience macro to log with file and line information
//!
#ifdef __SOLA_LOGGING_ENABLED
#define __SOLA_LOG(level, msg) sola::level(msg, __FILE__, __LINE__);
#else
#define __SOLA_LOG(level, msg)
#endif /* __SOLA_LOGGING_ENABLED */

namespace sola {

class logger_iface {
public:
  //! ctor
  logger_iface(void) = default;
  //! dtor
  virtual ~logger_iface(void) = default;

  //! copy ctor
  logger_iface(const logger_iface&) = default;
  //! assignment operator
  logger_iface& operator=(const logger_iface&) = default;

public:
  //!
  //! debug logging
  //!
  //! \param msg message to be logged
  //! \param file file from which the message is coming
  //! \param line line in the file of the message
  //!
  virtual void debug(const std::string& msg, const std::string& file, std::size_t line) = 0;

  //!
  //! info logging
  //!
  //! \param msg message to be logged
  //! \param file file from which the message is coming
  //! \param line line in the file of the message
  //!
  virtual void info(const std::string& msg, const std::string& file, std::size_t line) = 0;

  //!
  //! warn logging
  //!
  //! \param msg message to be logged
  //! \param file file from which the message is coming
  //! \param line line in the file of the message
  //!
  virtual void warn(const std::string& msg, const std::string& file, std::size_t line) = 0;

  //!
  //! error logging
  //!
  //! \param msg message to be logged
  //! \param file file from which the message is coming
  //! \param line line in the file of the message
  //!
  virtual void error(const std::string& msg, const std::string& file, std::size_t line) = 0;
};

//!
//! default logger class provided by the library
//!
class logger : public logger_iface {
public:
  //!
  //! log level
  //!
  enum class log_level {
    error = 0,
    warn  = 1,
    info  = 2,
    debug = 3
  };

public:
  //! ctor
  logger(log_level level = log_level::info);
  //! dtor
  ~logger(void) = default;

  //! copy ctor
  logger(const logger&) = default;
  //! assignment operator
  logger& operator=(const logger&) = default;

public:
  //!
  //! debug logging
  //!
  //! \param msg message to be logged
  //! \param file file from which the message is coming
  //! \param line line in the file of the message
  //!
  void debug(const std::string& msg, const std::string& file, std::size_t line);

  //!
  //! info logging
  //!
  //! \param msg message to be logged
  //! \param file file from which the message is coming
  //! \param line line in the file of the message
  //!
  void info(const std::string& msg, const std::string& file, std::size_t line);

  //!
  //! warn logging
  //!
  //! \param msg message to be logged
  //! \param file file from which the message is coming
  //! \param line line in the file of the message
  //!
  void warn(const std::string& msg, const std::string& file, std::size_t line);

  //!
  //! error logging
  //!
  //! \param msg message to be logged
  //! \param file file from which the message is coming
  //! \param line line in the file of the message
  //!
  void error(const std::string& msg, const std::string& file, std::size_t line);

private:
  //!
  //! current log level in use
  //!
  log_level m_level;

  //!
  //! mutex used to serialize logs in multithreaded environment
  //!
  std::mutex m_mutex;
};

//!
//! variable containing the current logger
//! by default, not set (no logs)
//!
extern std::unique_ptr<logger_iface> active_logger;

//!
//! debug logging
//! convenience function used internally to call the logger
//!
//! \param msg message to be logged
//! \param file file from which the message is coming
//! \param line line in the file of the message
//!
void debug(const std::string& msg, const std::string& file, std::size_t line);

//!
//! info logging
//! convenience function used internally to call the logger
//!
//! \param msg message to be logged
//! \param file file from which the message is coming
//! \param line line in the file of the message
//!
void info(const std::string& msg, const std::string& file, std::size_t line);

//!
//! warn logging
//! convenience function used internally to call the logger
//!
//! \param msg message to be logged
//! \param file file from which the message is coming
//! \param line line in the file of the message
//!
void warn(const std::string& msg, const std::string& file, std::size_t line);

//!
//! error logging
//! convenience function used internally to call the logger
//!
//! \param msg message to be logged
//! \param file file from which the message is coming
//! \param line line in the file of the message
//!
void error(const std::string& msg, const std::string& file, std::size_t line);


class thread_pool{
    public:
      //任务包装器
      typedef std::function<void()> task_t;

      thread_pool(int init_size = 3);
      ~thread_pool();

      void stop();
      void add_task(const task_t&);  //thread safe; 添加任务

    private:
      thread_pool(const thread_pool&);//禁止复制拷贝.
      const thread_pool& operator=(const thread_pool&);
  
      bool is_started() { return m_is_started; }
      void start();//启动线程池

      void thread_loop();//线程循环函数
      task_t take();//取任务函数

private:
      typedef std::vector<std::thread*> threads_t;//线程容器
      typedef std::deque<task_t> tasks_t;//任务队列

      int m_init_threads_size;//初始线程数量

      threads_t m_threads;//线程容器
      tasks_t m_tasks;//任务队列

      std::mutex m_mutex;//互斥锁
      std::condition_variable m_cond;//条件变量
      bool m_is_started;//线程池是否启动
    };

}
#endif

2、线程池cpp文件

#include <assert.h>
#include <iostream>
#include <sstream>
#include "thread_pool.hpp"

namespace sola {

    std::unique_ptr<logger_iface> active_logger = nullptr;

    static const char black[]  = {0x1b, '[', '1', ';', '3', '0', 'm', 0};
    static const char red[]    = {0x1b, '[', '1', ';', '3', '1', 'm', 0};
    static const char yellow[] = {0x1b, '[', '1', ';', '3', '3', 'm', 0};
    static const char blue[]   = {0x1b, '[', '1', ';', '3', '4', 'm', 0};
    static const char normal[] = {0x1b, '[', '0', ';', '3', '9', 'm', 0};

    logger::logger(log_level level)
    : m_level(level) {}

    void
    logger::debug(const std::string& msg, const std::string& file, std::size_t line) {
      if (m_level >= log_level::debug) {
        std::lock_guard<std::mutex> lock(m_mutex);
        std::cout << "[" << black << "DEBUG" << normal << "][sola::logger][" << file << ":" << line << "] " << msg << std::endl;
      }
    }

    void
    logger::info(const std::string& msg, const std::string& file, std::size_t line) {
      if (m_level >= log_level::info) {
        std::lock_guard<std::mutex> lock(m_mutex);
        std::cout << "[" << blue << "INFO " << normal << "][sola::logger][" << file << ":" << line << "] " << msg << std::endl;
      }
    }

    void
    logger::warn(const std::string& msg, const std::string& file, std::size_t line) {
      if (m_level >= log_level::warn) {
        std::lock_guard<std::mutex> lock(m_mutex);
        std::cout << "[" << yellow << "WARN " << normal << "][sola::logger][" << file << ":" << line << "] " << msg << std::endl;
      }
    }

    void
    logger::error(const std::string& msg, const std::string& file, std::size_t line) {
      if (m_level >= log_level::error) {
        std::lock_guard<std::mutex> lock(m_mutex);
        std::cerr << "[" << red << "ERROR" << normal << "][sola::logger][" << file << ":" << line << "] " << msg << std::endl;
      }
    }

    void
    debug(const std::string& msg, const std::string& file, std::size_t line) {
      if (active_logger)
        active_logger->debug(msg, file, line);
    }

    void
    info(const std::string& msg, const std::string& file, std::size_t line) {
      if (active_logger)
        active_logger->info(msg, file, line);
    }

    void
    warn(const std::string& msg, const std::string& file, std::size_t line) {
      if (active_logger)
        active_logger->warn(msg, file, line);
    }

    void
    error(const std::string& msg, const std::string& file, std::size_t line) {
      if (active_logger)
        active_logger->error(msg, file, line);
    }

    static std::string
    get_tid(){
      std::stringstream tmp;
      tmp << std::this_thread::get_id();
      return tmp.str();
    }

    //线程池构造函数
    thread_pool::thread_pool(int init_size)
      :m_init_threads_size(init_size),
      m_mutex(),
      m_cond(),
      m_is_started(false)
    {
        //构造函数中自动启动线程池
        start();
    }

    thread_pool::~thread_pool()
    {
        //如果线程池已经启动,则先停止
        if(m_is_started)
        {
            stop();
        }
    }

    //启动线程池
    void thread_pool::start()
    {
        assert(m_threads.empty());
        m_is_started = true;
        m_threads.reserve(m_init_threads_size);//预先给线程容器分配空间
        for (int i = 0; i < m_init_threads_size; ++i)
        {
            //创建线程并加入线程容器(线程函数为thread_loop)
            m_threads.push_back(new std::thread(std::bind(&thread_pool::thread_loop, this)));
        }
    }

    //停止线程池
    void thread_pool::stop()
    {
        __SOLA_LOG(debug, "thread_pool::stop() stop.");
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            m_is_started = false;
            //通知所有线程停止
            m_cond.notify_all();
            __SOLA_LOG(debug, "thread_pool::stop() notifyAll().");
        }

        for (threads_t::iterator it = m_threads.begin(); it != m_threads.end() ; ++it)
        {
            (*it)->join(); //等待线程退出
            delete *it; //释放线程资源
        }
        m_threads.clear();//清空线程容器
    }

    //线程池线程函数
    void thread_pool::thread_loop()
    {
        __SOLA_LOG(debug, "thread_pool::threadLoop() tid : " + get_tid() + " start.");
        //线程池线程循环
        while(m_is_started)
        {
            //从任务队列中取出一个任务
            task_t task = take();
            //如果取出的任务不为空,则执行任务(std::function类型可以直接判断是否为空!)
            if(task)
            {
                //执行任务:回调实际的任务函数
                task();
            }
        }
        __SOLA_LOG(debug, "thread_pool::threadLoop() tid : " + get_tid() + " exit.");
    }

    //向线程池添加任务
    void thread_pool::add_task(const task_t& task)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        /*while(m_tasks.isFull())
        {//when m_tasks have maxsize
            cond2.notify_one();
        }
        */
        //向任务队列中添加任务
        m_tasks.push_back(task);
        //通知一个线程
        m_cond.notify_one();
    }

    //从线程池取出一个任务
    thread_pool::task_t thread_pool::take()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        //always use a while-loop, due to spurious wakeup
        //如果任务队列为空 + 线程池没有停止,则等待
        while(m_tasks.empty() && m_is_started)
        {
            __SOLA_LOG(debug, "thread_pool::take() tid : " + get_tid() + " wait.");
            m_cond.wait(lock);
        }

        __SOLA_LOG(debug, "thread_pool::take() tid : " + get_tid() + " wakeup.");

        task_t task;
        tasks_t::size_type size = m_tasks.size();
        //如果任务队列不为空 + 线程池没有停止,则取出一个任务
        if(!m_tasks.empty() && m_is_started)
        {
            task = m_tasks.front();
            m_tasks.pop_front();
            assert(size - 1 == m_tasks.size());
            /*if (TaskQueueSize_ > 0)
            {
                cond2.notify_one();
            }*/
        }

        return task;
    }
}

3、测试main函数

#include <iostream>
#include <chrono>
#include <condition_variable>
#include "thread_pool.hpp"

std::mutex g_mutex;

void priorityFunc()
{
  for (int i = 1; i < 4; ++i)
  {
      std::this_thread::sleep_for(std::chrono::seconds(1));
      std::lock_guard<std::mutex> lock(g_mutex);
      std::cout << "priorityFunc() [" << i << "] at thread [ " << std::this_thread::get_id() << "] output" << std::endl;
  }

}

void testFunc()
{
    // loop to print character after a random period of time
    for (int i = 1; i < 4; ++i)
    {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        std::lock_guard<std::mutex> lock(g_mutex);
        std::cout << "testFunc() [" << i << "] at thread [ " << std::this_thread::get_id() << "] output" << std::endl;
    }

}


int main()
{
    sola::active_logger = std::unique_ptr<sola::logger>(new sola::logger(sola::logger::log_level::debug));

    sola::thread_pool thread_pool;

    // add tasks to the thread pool
    for(int i = 0; i < 2 ; i++)
        thread_pool.add_task(testFunc);

    system("pause");
    return 0;
}

4、执行流程

1、主线程构造函数
thread_pool thread_pool;
{
	//构造函数中自动启动线程池
	start();
}

start()
{
	//创建线程并加入线程容器(线程函数为thread_loop)
	m_threads.push_back(new std::thread(std::bind(&thread_pool::thread_loop, this)));
}

//线程池线程函数
void thread_pool::thread_loop()
{
	//线程池线程循环
	while(m_is_started)
	{
		//从任务队列中取出一个任务
		task_t task = take();
		//如果取出的任务不为空,则执行任务(std::function类型可以直接判断是否为空!)
		if(task)
		{
			//执行任务:回调实际的任务函数
			task();
		}
	}
}

//等待:从线程池取出一个任务
thread_pool::task_t thread_pool::take()
{
    //如果任务队列为空 + 线程池没有停止,则等待
    while(m_tasks.empty() && m_is_started)
    {
        m_cond.wait(lock);
    }
}

2、主线程中添加任务,到任务队列
thread_pool.add_task(testFunc);

thread_pool::add_task(testFunc)
{
	//向任务队列中添加任务
	m_tasks.push_back(task);
	//通知一个线程
	m_cond.notify_one();
}

//wait条件满足:从线程池取出一个任务
thread_pool::task_t thread_pool::take()
{
    //如果任务队列为空 + 线程池没有停止,则等待
    while(m_tasks.empty() && m_is_started)
    {
        m_cond.wait(lock);
    }

	//如果任务队列不为空 + 线程池没有停止,则取出一个任务
	if(!m_tasks.empty() && m_is_started)
	{
		task = m_tasks.front();
		m_tasks.pop_front();
	}
	return task;
}

//线程池线程函数:执行线程的任务函数
void thread_pool::thread_loop()
{
	//线程池线程循环
	while(m_is_started)
	{
		//从任务队列中取出一个任务
		task_t task = take();
		//如果取出的任务不为空,则执行任务(std::function类型可以直接判断是否为空!)
		if(task)
		{
			//执行任务:回调实际的任务函数
			task();
		}
	}
}

//task()回调任务函数
void testFunc()
{
}

3、主线程执行完。
 return 0; // 主线程等待所有工作线程完成(在析构函数中处理)
 //3.1 调用线程池析构函数
~ThreadPool() 
{
//1.调用stop
stop()
{
	//1.1通知所有线程停止
	m_cond.notify_all();

	=》
	//1.2使得线程循环退出!
	while(true)
	{
		//while循环一直在这等待!(lock满足条件,向下执行!)
		//如果任务队列为空 + 线程池没有停止,则等待
		while(m_tasks.empty() && m_is_started)
		{
			m_cond.wait(lock);
		}
		
		//当线程任务执行完成:stop = true;
		//线程池停止且任务队列为空,退出线程循环
		//如果任务队列不为空 + 线程池没有停止,则取出一个任务
		if(!m_tasks.empty() && m_is_started)
		{
			task = m_tasks.front();
			m_tasks.pop_front();
		}
		return task;
	}
	1.3//等待所有线程完成任务
	 (*it)->join(); //等待线程退出
}

4、主线程结束


网站公告

今日签到

点亮在社区的每一天
去签到