《线程池最终版:使用可变参模板和future优化重构设计》

发布于:2025-03-21 ⋅ 阅读:(17) ⋅ 点赞:(0)

之前设计的线程池,我们设计了很多复杂的东西,比如线程池里提供了一个抽象的Task基类,用户使用的时候,继承基类,再重写run方法,如果要传参数,可以通过派生类的构造函数传,又设计了Result类,Any类,Semaphore类等。

事实上,C++14、C++17提供了Any和Semaphore很多高级的用法,用起来非常简单。

之前设计的线程池提交任务要使用智能指针,定义派生类,还需要传参等等。如何让线程池提交任务更加方便?

int sum1(int a, int b)
{
	return a + b;
}
int sum2(int a, int b, int c)
{
	return a + b + c;
}

int main()
{
	thread t1(sum1, 10, 20);
	thread t2(sum2, 1, 2, 3);
    t1.join();
    t2.join();
	return 0;
}

是不是直接可以将线程要执行的任务,传给线程呢?

/*
如何让线程池提交任务更方便
1.pool.submitTask(sum1,10,20);
  pool.submitTask(sum2,1,2,3);
  submitTask:可变参模板编程
*/

就不需要设计Mytask继承基类Task,重写Run方法,传参的时候,还需要Mytask定义成员变量,构造函数传参,不需要这么麻烦了。直接将用户要执行的方法,作为参数传递给线程,同时使用可变参模板传参数,不受参数的限制。

我们自己写Result及相关类型,有点复杂。

C++11,提供了线程库,thread,可以将任务作为参数传递给线程,但是没办法接收返回值。可以使用packaged_task(类似于function函数对象)来包装一下任务。

#include<iostream>
#include<functional>
#include<thread>
#include<future>
using namespace std;

/*
 如何让线程池提交任务更方便
1.pool.submitTask(sum1,10,20);
  pool.submitTask(sum2,1,2,3);
2.我们自己造了一个Result及相关类型,挺复杂
  C++11 线程库 thread packaged_task(function函数对象)
  使用future代替Result,节省线程池代码
*/

int sum1(int a, int b)
{
	return a + b;
}
int sum2(int a, int b, int c)
{
	return a + b + c;
}

int main()
{
	packaged_task<int(int, int)> task(sum1); // 打包一个函数对象
	// future 类似于我们自己实现的 Result
	future<int> res = task.get_future(); // 返回一个future类型的对象
	task(10, 20);

	cout << res.get() << endl; // res.get() 类似于Result提供的 供用户调用接收返回值的get()方法


	//thread t1(sum1, 10, 20);
	//thread t2(sum2, 1, 2, 3);
	//t1.join();
	//t2.join();
	return 0;
}

packgaed_task与function包装函数不同的是,packaged_task提供了get_future方法可以得到future类型的返回对象。

当任务比较耗时的时候,res.get()方法会阻塞。这也涉及到线程间通信的安全问题,我们自己设计的线程池又实现了Semaphore信号量的wait和post。

我们自己实现的Result类型接收任务对象。

我们自己实现的Result的setVal方法和get方法。

我们自己实现的用于线程间通信安全的Semaphore信号量的wait和post方法。

其实我们自己所实现的就是类似于future的机制。

看到future也有类似setVal和getVal的方法还有信号量的wait方法。既然C++11库中提供了future(异步编程机制),我们直接使用future来代替Result就可以了,节省线程池代码。

future机制

std::future提供了一种在异步操作完成后获取其结果的方式。它可以与多种异步执行机制配合使用,比std::async、std::thread、std::packaged_task等。通过std::future,主线程可以暂停执行,等待异步任务完成并获取其返回值,从而实现异步操作与主线程之间的同步。



#include<iostream>
#include<functional>
#include<thread>
#include<future>
using namespace std;

/*
 如何让线程池提交任务更方便
1.pool.submitTask(sum1,10,20);
  pool.submitTask(sum2,1,2,3);
2.我们自己造了一个Result及相关类型,挺复杂
  C++11 线程库 thread packaged_task(function函数对象)
*/

int sum1(int a, int b)
{
	return a + b;
}
int sum2(int a, int b, int c)
{
	return a + b + c;
}

int main()
{
	packaged_task<int(int, int)> task(sum1); // 打包一个函数对象
	// future 类似于我们自己实现的 Result
	future<int> res = task.get_future(); // 返回一个future类型的对象
	// task(10, 20);
	thread t1(std::move(task), 10, 20);
	t1.detach();
	cout << res.get() << endl; // res.get() 类似于Result提供的 供用户调用接收返回值的get()方法


	//thread t1(sum1, 10, 20);
	//thread t2(sum2, 1, 2, 3);
	//t1.join();
	//t2.join();
	return 0;
}

基于可变参数模板以及future,下面我们来优化重构线程池,这次线程池设计成开源的。

线程池项目最终版

threadpool.hpp:

#pragma once

#ifndef THREADPOOL_H
#define THREADPOOL_H

#include<iostream>
#include<vector>
#include<queue>
#include<memory>
#include<atomic>
#include<mutex>
#include<condition_variable>
#include<functional>
#include<unordered_map>
#include<thread>
#include<future>
using namespace std;

// 任务队列数量的上限阈值
const int TASK_MAX_THRESHHOLD = INT32_MAX;
// 线程数量的上限阈值
const int THREAD_MAX_THRESHHOLD = 1024;
// 线程最大空闲时间
const int THREAD_MAX_IDLE_TIME = 10; // 单位:秒

// 线程池支持的模式
enum PoolMode
{
	MODE_FIXED, // 1.fixed模式  线程数量固定
	MODE_CACHED,// 2.cached模式 线程数量可动态增长
};

// 线程类型
class Thread
{
public:
	// 线程函数对象类型
	using ThreadFunc = std::function<void(int)>;

	// 线程构造
	Thread(ThreadFunc func)
		:func_(func)
		, threadId_(generateId_++)
	{}
	// 线程析构
	~Thread() = default;

	// 线程启动
	void start()
	{
		// 创建一个线程来执行线程函数
		std::thread t(func_, threadId_);
		t.detach();// 设置线程分离
	}

	// 获取线程id
	int getId() const
	{
		return threadId_;
	}
private:
	static int generateId_;
	ThreadFunc func_;
	int threadId_; // 保存线程id
};


// 线程池类型
class ThreadPool
{
public:
	// 线程池构造
	ThreadPool()
		: initThreadSize_(0)
		, taskSize_(0)
		, idleThreadSize_(0)
		, curThreadSize_(0)
		, taskQueMaxThreshHold_(TASK_MAX_THRESHHOLD)
		, threadSizeHold_(THREAD_MAX_THRESHHOLD)
		, poolMode_(PoolMode::MODE_FIXED)
		, isPoolRunning_(false)
	{}

	// 线程池析构
	 ~ThreadPool()
	 {
		isPoolRunning_ = false;
		// 等待线程池里面所有的线程返回 
		std::unique_lock<std::mutex> lock(taskQueMtx_);
		notEmpty_.notify_all();
		// 唤醒所有阻塞等待的线程
		exitCond_.wait(lock, [&]()->bool {return threads_.size() == 0; });
	 }

	// 设置线程池的工作模式
	void setMode(PoolMode mode)
	{
		if (checkRunningState())
			return;
		poolMode_ = mode;
	}

	// 设置task任务队列的上线阈值
	void setTaskQueMaxThreshHold(int threshhold)
	{
		if (checkRunningState())
			return;
		taskQueMaxThreshHold_ = threshhold;
	}

	// 设置线程池cached模式下线程阈值
	void setThreadSizeThreshHold(int threshhold)
	{
		if (checkRunningState())
			return;
		if (poolMode_ == PoolMode::MODE_CACHED)
		{
			threadSizeHold_ = threshhold;
		}

	}

	// 给线程池提交任务
	// 使用可变参模板编程,让submitTask可以接收任意任务函数和任意数量的参数
	template<typename Func,typename... Args>
	auto submitTask(Func&& func, Args&&... args) -> std::future<decltype(func(args...))> // 推导submitTask的返回值
	{
		// 打包任务,放入任务队列里面
		using RType = decltype(func(args...));
		auto task = std::make_shared<std::packaged_task<RType()>> (std::bind(std::forward<Func>(func), std::forward<Args>(args)...));
		std::future<RType> result = task->get_future();

		// 获取锁
		std::unique_lock<std::mutex> lock(taskQueMtx_);

		// 用户提交任务,最长不能阻塞超过1s,否则判断提交任务失败,返回
		if (!notFull_.wait_for(lock, std::chrono::seconds(1), [&]()->bool { return taskQue_.size() < (size_t)taskQueMaxThreshHold_; }))
		{
			// 表示notFull_等待1s钟,条件仍然不满足
			std::cerr << "task queue is full, submit task faile." << std::endl;
			auto task = std::make_shared<std::packaged_task<RType()>>([]()->RType{return RType(); });
			return task->get_future();
		}

		// 如果有空余,把任务放进任务队列
		// taskQue_.emplace(sp);
		// using Task = std::function<void()>
		// 不能直接将task放进任务队列,因为task是有返回值的,返回值类型不同。
		// 我们任务队列设计的时候,接收的任务对象就是返回值为void的任务。
		// 但是我们怎么把带返回值的任务传进去呢?
		// 增加一个中间层,返回值是void不带参数的Lambda表达式函数对象,将实际要执行的任务封装起来即可。
		taskQue_.emplace([task]() {(*task)(); });

		taskSize_++;

		// 因为新放了任务,任务队列肯定不为空了,在notEmpty_上进行通知,赶快分配线程执行任务吧
		notEmpty_.notify_all();

		// cached模式 任务处理比较紧急 场景:小而快的任务 需要根据任务数量和空闲线程数量,判断是否需要创建新的线程出来
		if (poolMode_ == PoolMode::MODE_CACHED
			&& taskSize_ > idleThreadSize_
			&& curThreadSize_ < threadSizeHold_)
		{
			// 这行打印用于测试
			std::cout << ">>> create new thread..." << std::endl;

			// 创建新线程
			auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::threadFunc, this, std::placeholders::_1));
			// threads_.emplace_back(std::move(ptr));
			// threads_[curThreadSize_]->start();
			int threadId = ptr->getId();
			threads_.emplace(threadId, std::move(ptr));
			// 启动线程
			threads_[threadId]->start();
			// 修改线程数量相关变量
			curThreadSize_++;
			idleThreadSize_++;
		}

		// 返回任务的Result对象
		// return task->getResult();
		return result;
	} 


	// 开启线程池
	void start(int initThreadSize = std::thread::hardware_concurrency())
	{
		// 设置线程池的运行状态
		isPoolRunning_ = true;

		// 记录初始线程个数
		initThreadSize_ = initThreadSize;
		// 记录线程池里面线程的总数量
		curThreadSize_ = initThreadSize;

		// 创建线程对象  std::vector<Thread*> threads_;
		for (int i = 0; i < initThreadSize_; i++)
		{
			// 创建thread线程对象的时候,把线程函数给到thread线程对象
			auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::threadFunc, this, std::placeholders::_1));
			int threadId = ptr->getId();
			threads_.emplace(threadId, std::move(ptr));
			// threads_.emplace_back(std::move(ptr));
		}

		// 启动所有线程	 std::vector<Thread*> threads_;
		for (int i = 0; i < initThreadSize_; i++)
		{
			threads_[i]->start();// 需要去执行一个线程函数
			idleThreadSize_++;
		}
	}
	ThreadPool(const ThreadPool&) = delete;
	ThreadPool& operator=(const ThreadPool&) = delete;
private:
	// 线程函数
	void threadFunc(int threadid)
	{
		auto lastTime = std::chrono::high_resolution_clock().now();
		// while (isPoolRunning_)
		for (;;)
		{
			Task task;
			{
				// 先获取锁
				std::unique_lock<std::mutex> lock(taskQueMtx_);

				// 下面测试用的
				std::cout << "tid: " << std::this_thread::get_id() << "尝试获取任务..." << std::endl;

				// cached模式下,有可能已经创建了很多线程,但是空闲线程如果超过了60s,就应该把多余的线 程
				// 结束回收掉(超过initThreadSize_的线程要进行回收)
				// 当前时间 - 上一次线程执行完的时间 > 60s
				// 每秒钟返回一次    怎么区分超时返回,还是有任务待执行返回?

				// 锁+双重判断
				while (taskQue_.size() == 0)
				{
					if (!isPoolRunning_)
					{
						// 2.线程正在执行任务 线程池要结束 回收线程资源
						threads_.erase(threadid);
						std::cout << "threadid:" << std::this_thread::get_id() << " exit" << std::endl;
						// 唤醒线程池
						exitCond_.notify_all();
						return;
					}
					if (poolMode_ == PoolMode::MODE_CACHED)
					{
						// 条件变量 超时返回了
						if (std::cv_status::timeout == notEmpty_.wait_for(lock, std::chrono::seconds(1)))
						{
							// 当前时间
							auto now = std::chrono::high_resolution_clock().now();
							// 空闲时间
							auto dur = std::chrono::duration_cast<std::chrono::seconds>(now - lastTime);
							if (dur.count() >= THREAD_MAX_IDLE_TIME && curThreadSize_ > initThreadSize_)
							{
								// 开始回收当前线程
								// 记录线程数量的相关变量的值修改
								// 把线程对象从线程列表删除
								threads_.erase(threadid);
								curThreadSize_--;
								idleThreadSize_--;
								std::cout << "threadid:" << std::this_thread::get_id() << " exit" << std::endl;
								return;
							}
						}
					}
					else
					{
						// 等待任务队列不为空  notEmpty条件 
						notEmpty_.wait(lock);
					}
				}

				idleThreadSize_--;

				// 下面测试用的
				std::cout << "tid: " << std::this_thread::get_id() << "获取任务成功..." << std::endl;

				// 从任务队列获取任务
				task = taskQue_.front();
				taskQue_.pop();
				taskSize_--;

				// 如果依然有剩余任务,继续通知其他线程来获取执行任务,提高了多线程同时并发获取任务、处理任务的能力
				if (taskQue_.size() > 0)
				{
					notEmpty_.notify_all();
				}

				// 取出一个任务之后,进行通知,通知其他用户可以继续提交任务
				notFull_.notify_all();

			}// 一个线程取完任务之后,此刻就应该把锁释放掉,让多线程可以同时并发获取任务、处理任务

			// 当前线程负责执行该任务
			if (task != nullptr)
			{
				// task->run(); // 执行任务,把任务的返回值通过setVal方法给Result
				// task->exec();
				task(); // 执行function<void()>
			}

			idleThreadSize_++;
			lastTime = std::chrono::high_resolution_clock().now(); // 更新线程执行完任务的时间
		}
	}

	// 检查pool的运行状态
	bool checkRunningState() const
	{
		return isPoolRunning_;
	}
private:
	// std::vector<std::unique_ptr<Thread>> threads_;  // 线程列表
	std::unordered_map<int, std::unique_ptr<Thread>> threads_; // 线程列表

	int initThreadSize_;  // 初始的线程数量
	int threadSizeHold_;  // 线程数量上限阈值
	std::atomic_int curThreadSize_; // 记录当前线程池里面的线程总数量
	std::atomic_int idleThreadSize_; // 记录空闲线程的数量


	// Task任务 =》函数对象
	// 放进任务队列里的任务返回值设计成void,因为不确定任务的返回值。参数可以不传,可以直接绑定给要执行的函数。 
	using Task = std::function<void()>;
	std::queue<Task> taskQue_; // 不需要使用智能指针了,因为之前是用户传入的任务,生命周期不确定。现在这个任务是我们自己线程池封装维护的。

	std::atomic_int taskSize_; // 任务的数量
	int taskQueMaxThreshHold_; // 任务队列数量的上线阈值

	std::mutex taskQueMtx_; // 保证任务队列的线程安全
	std::condition_variable notFull_; // 表示任务队列不满
	std::condition_variable notEmpty_; // 表示任务队列不空
	std::condition_variable exitCond_; // 等待线程资源全部回收

	PoolMode poolMode_; // 当前线程池的工作模式
	std::atomic_bool isPoolRunning_; // 表示线程池的启动状态

};




#endif

 线程池项目-最终版.cpp:



#include<iostream>
#include<functional>
#include<thread>
#include<future>
using namespace std;

#include"threadpool.hpp"

int Thread::generateId_ = 0;

/*
 如何让线程池提交任务更方便
1.pool.submitTask(sum1,10,20);
  pool.submitTask(sum2,1,2,3);
2.我们自己造了一个Result及相关类型,挺复杂
  C++11 线程库 thread packaged_task(function函数对象)
*/

int sum1(int a, int b)
{
	return a + b;
}
int sum2(int a, int b, int c)
{
	return a + b + c;
}

int main()
{
	ThreadPool pool;
	pool.start(4);

	future<int> res1 = pool.submitTask(sum1, 1, 2);
	future<int> res2 = pool.submitTask(sum2, 1, 2, 3);
	future<int> res3 = pool.submitTask([](int begin, int end)->int {
			int sum = 0;
			for (int i = begin; i <= end; i++)
				sum += i;
			return sum;
		}, 1, 100);

	cout << res1.get() << endl;
	cout << res2.get() << endl;
	cout << res3.get() << endl;


	//packaged_task<int(int, int)> task(sum1); // 打包一个函数对象
	 future 类似于我们自己实现的 Result
	//future<int> res = task.get_future(); // 返回一个future类型的对象
	 task(10, 20);
	//thread t1(std::move(task), 10, 20);
	//t1.detach();
	//cout << res.get() << endl; // res.get() 类似于Result提供的 供用户调用接收返回值的get()方法


	//thread t1(sum1, 10, 20);
	//thread t2(sum2, 1, 2, 3);
	//t1.join();
	//t2.join();
	return 0;
}

 再来测试一下cached模式。



#include<iostream>
#include<functional>
#include<thread>
#include<future>
#include<chrono>
using namespace std;

#include"threadpool.hpp"

int Thread::generateId_ = 0;

/*
 如何让线程池提交任务更方便
1.pool.submitTask(sum1,10,20);
  pool.submitTask(sum2,1,2,3);
2.我们自己造了一个Result及相关类型,挺复杂
  C++11 线程库 thread packaged_task(function函数对象)
*/

int sum1(int a, int b)
{
	this_thread::sleep_for(std::chrono::seconds(2));
	return a + b;
}
int sum2(int a, int b, int c)
{
	this_thread::sleep_for(std::chrono::seconds(2));
	return a + b + c;
}

int main()
{
	ThreadPool pool;
	pool.setMode(PoolMode::MODE_CACHED);
	pool.start(2);

	future<int> res1 = pool.submitTask(sum1, 1, 2);
	future<int> res2 = pool.submitTask(sum2, 1, 2, 3);
	future<int> res3 = pool.submitTask([](int begin, int end)->int {
			int sum = 0;
			for (int i = begin; i <= end; i++)
				sum += i;
			return sum;
		}, 1, 100);
	
	cout << res1.get() << endl;
	cout << res2.get() << endl;
	cout << res3.get() << endl;


	//packaged_task<int(int, int)> task(sum1); // 打包一个函数对象
	 future 类似于我们自己实现的 Result
	//future<int> res = task.get_future(); // 返回一个future类型的对象
	 task(10, 20);
	//thread t1(std::move(task), 10, 20);
	//t1.detach();
	//cout << res.get() << endl; // res.get() 类似于Result提供的 供用户调用接收返回值的get()方法


	//thread t1(sum1, 10, 20);
	//thread t2(sum2, 1, 2, 3);
	//t1.join();
	//t2.join();
	return 0;
}

再来测试一次任务提交失败情况。 

 



#include<iostream>
#include<functional>
#include<thread>
#include<future>
#include<chrono>
using namespace std;

#include"threadpool.hpp"

int Thread::generateId_ = 0;

/*
 如何让线程池提交任务更方便
1.pool.submitTask(sum1,10,20);
  pool.submitTask(sum2,1,2,3);
2.我们自己造了一个Result及相关类型,挺复杂
  C++11 线程库 thread packaged_task(function函数对象)
*/

int sum1(int a, int b)
{
	this_thread::sleep_for(std::chrono::seconds(2));
	return a + b;
}
int sum2(int a, int b, int c)
{
	this_thread::sleep_for(std::chrono::seconds(2));
	return a + b + c;
}

int main()
{
	ThreadPool pool;
	pool.start(2);

	future<int> res1 = pool.submitTask(sum1, 1, 2);
	future<int> res2 = pool.submitTask(sum2, 1, 2, 3);
	future<int> res3 = pool.submitTask([](int begin, int end)->int {
			int sum = 0;
			for (int i = begin; i <= end; i++)
				sum += i;
			return sum;
		}, 1, 100);
	future<int> res4 = pool.submitTask([](int begin, int end)->int {
		int sum = 0;
		for (int i = begin; i <= end; i++)
			sum += i;
		return sum;
		}, 1, 100);
	future<int> res5 = pool.submitTask([](int begin, int end)->int {
		int sum = 0;
		for (int i = begin; i <= end; i++)
			sum += i;
		return sum;
		}, 1, 100);
	cout << res1.get() << endl;
	cout << res2.get() << endl;
	cout << res3.get() << endl;
	cout << res4.get() << endl;
	cout << res5.get() << endl;


	//packaged_task<int(int, int)> task(sum1); // 打包一个函数对象
	 future 类似于我们自己实现的 Result
	//future<int> res = task.get_future(); // 返回一个future类型的对象
	 task(10, 20);
	//thread t1(std::move(task), 10, 20);
	//t1.detach();
	//cout << res.get() << endl; // res.get() 类似于Result提供的 供用户调用接收返回值的get()方法


	//thread t1(sum1, 10, 20);
	//thread t2(sum2, 1, 2, 3);
	//t1.join();
	//t2.join();
	return 0;
}

自此,我们将线程池改造成了基于可变参模板和future机制的线程池,使线程池变得更简洁,完成了最终版本的线程池。