异步编程的概念
什么是异步?
- 异步编程是一种编程范式,允许程序在等待某些操作时继续执行其它任务,而不是阻塞或等待这些操作完成。
异步编程vs同步编程?
- 在传统的同步编程中,代码按顺序同步执行,每个操作需要等待前一个操作完成。这种方式在处理I/O操作、网络请求、计算密集型任务时可能会导致程序的性能瓶颈。举个例子,当程序需要从网上获取数据时,需要等待数据返回后才能继续执行,等待期间CPU可能处于空闲状态,浪费了资源。
- 而异步编程不同,它允许程序在操作未完成期间继续执行其它任务。例如,程序可以发起一个网络请求,然后继续执行其它操作,等到网络请求完成时,再处理获取的数据。
异步编程的优点?
- 提高性能,并发执行多个任务,更高效地利用CPU资源
- 提高响应速度,程序在等待某些操作完成时继续响应用户输入,提高用户体验
实现异步编程
在介绍异步编程工具之前,我们先来回答一个问题:
为什么主线程无法直接获取新线程的计算结果?
内存隔离性:线程拥有独立的栈空间,新线程中局部变量的生命周期仅限于该线程。来看一下下面这个例子:
int res; // 定义在主线程中
// 引用传递res
thread t([&res]() {
res = 42;
});
t.join();
cout << res << endl; // 可以输出42,但存在风险!
这种方式看似可行,实则存在严重问题:
- 竞态条件:如果没有正确使用join,主线程可能会在t修改res执行前就读取它
- 悬挂引用:如果res是局部变量且线程未及时完成,新线程可能访问已销毁的内存。
同步缺失问题:及时使用全局变量或堆内存,仍需手动同步:
mutex mtx;
int* res = new int(0); // 在堆上创建变量
thread t([&mtx, res](){
unique_lock<mutex> lock(mtx);
*res = 42;
});
// 主线程需要轮询检查res,效率极低
核心机制:std::future
std::future提供了一种标准化的线程间通信机制,其核心原理是共享状态,当异步任务完成后,结果会被写入共享状态,future通过检查该状态安全地传递结果。
std::future是一个模板类,用于表示异步操作的结果,允许开发者在未来的某个时刻查询异步操作的状态、等待操作完成或获取操作结果。通常我们不直接创建future对象 ,而是与std::async、std::packaged_task或std::promise配合使用。
任务启动器:std::async
std::async是一种将任务与std::future关联的简单方法,创建并运行一个异步任务,并返回一个与该任务结果关联的std::future对象。async的任务是否同步运行取决于传递的参数:
- std::launch::deferred:表明该函数会被延迟调用,知道future对象上调用get或wait方法才会开始执行任务
- std::launch::async:表明函数会在创建的新线程上运行
- std::launch::deferred|std::launch::async:内部由操作系统自动选择策略
延迟调用:
#include <iostream>
#include <future>
#include <unistd.h>
using std::cout;
using std::endl;
int myadd(int num1, int num2)
{
cout << "add start!" << endl;
return num1 + num2;
}
int main()
{
cout << "---------1----------" << endl;
std::future<int> fut = std::async(std::launch::deferred, myadd, 1, 2);
sleep(1);
cout << "---------2----------" << endl;
cout << fut.get() << endl;
return 0;
}
执行结果:
不难发现,直到我们调用了get方法,才执行myadd函数
异步执行:
#include <iostream>
#include <future>
#include <unistd.h>
using std::cout;
using std::endl;
int myadd(int num1, int num2)
{
cout << "add start!" << endl;
return num1 + num2;
}
int main()
{
cout << "---------1----------" << endl;
std::future<int> fut = std::async(std::launch::async, myadd, 1, 2);
sleep(1);
cout << "---------2----------" << endl;
cout << fut.get() << endl;
return 0;
}
执行结果:
在调用之后创建的线程立即执行了myadd函数。
结果传递器:std::promise
std::promise是一个用于设置异步操作结果的机制。允许我们在一个线程中设置值或异常,然后再另一个线程中通过future对象检索这些值或异常,通常与std::async、std::thread等结合使用,在异步操作中传递结果。
#include <iostream>
#include <thread>
#include <future>
//通过在线程中对promise对象设置数据,其他线程中通过future获取设置数据的方式实现获取异步任务执行结果的功能
void Add(int num1, int num2, std::promise<int> &prom) {
std::this_thread::sleep_for(std::chrono::seconds(3));
prom.set_value(num1 + num2);
return ;
}
int main()
{
std::promise<int> prom;
std::future<int> fu = prom.get_future();
std::thread thr(Add, 11, 22, std::ref(prom));
int res = fu.get();
std::cout << "sum: " << res << std::endl;
thr.join();
return 0;
}
注意事项
- std::promise的生命周期:需要确保promise对象在future对象需要使用它的时候保持有效,一旦promise对象销毁,任何尝试通过future访问其结果的操作都将失败。
- 线程安全:std::promise的set_value和set_exception方法是线程安全的,但仍应该避免在多个线程中同时调用它们,这意味着设计存在问题。
- 将std::promise对象传给线程函数时,通常使用std::move或std::ref来避免不必要的复制。、
任务封装器:std::packaged_task
std::packaged_task是一个模板类,主要用于将一个可调用对象包装起来,以便异步执行,并能够获取其返回结果。它和std::future、std::thread紧密相关,常用于多线程编程中。
使用std::packaged_task的流程:
- 创建packaged_task对象:创建packaged_task对象需要传递一个可调用对象,将其封装为异步任务。
- 获取future对象:使用get_future方法可以获取与packaged_task关联的future对象,用于获取异步操作的结果。
- 执行任务:通过operator()或调用thread在一个新线程中执行。注意packed_task对象是不能复制的,所以需要通过std::move或智能指针传递。
- 获取结果:主线程种调用future对象的get方法可以等待异步任务完成并获取其返回值。如果任务尚未完成,get方法会阻塞直到结果可用。
#include <iostream>
#include <thread>
#include <future>
#include <memory>
//pakcaged_task的使用
// pakcaged_task 是一个模板类,实例化的对象可以对一个函数进行二次封装,
//pakcaged_task可以通过get_future获取一个future对象,来获取封装的这个函数的异步执行结果
int Add(int num1, int num2) {
std::this_thread::sleep_for(std::chrono::seconds(3));
return num1 + num2;
}
int main()
{
//std::packaged_task<int(int,int)> task(Add);
//std::future<int> fu = task.get_future();
//task(11, 22); task可以当作一个可调用对象来调用执行任务
//但是它又不能完全的当作一个函数来使用
//std::async(std::launch::async, task, 11, 22);
//std::thread thr(task, 11, 22);
//但是我们可以把task定义成为一个指针,传递到线程中,然后进行解引用执行
//但是如果单纯指针指向一个对象,存在生命周期的问题,很有可能出现风险
//思想就是在堆上new对象,用智能指针管理它的生命周期
auto ptask = std::make_shared<std::packaged_task<int(int,int)>>(Add);
std::future<int> fu = ptask->get_future();
std::thread thr([ptask](){
(*ptask)(11, 22);
});
int sum = fu.get();
std::cout << sum << std::endl;
thr.join();
return 0;
}
三种异步工具的比较
std::async
- 自动任务调度:async提供了一种简单方便地方式来创建异步任务,只需要调用async,传入函数和参数,就会自动执行异步任务,并返回future对象用于得到异步操作结果。
- 灵活性有限:尽管简单,但灵活性有限,无法完全控制任务的调度方式(如任务在哪个线程运行)
- 适用场景:适用于简单的异步任务,不需要复杂的任务调度和管理。
std::promise
- 手动设置结果:promise是一种更底层的机制,允许手动设置异步操作的结果,并将结果传递给与之关联的future对象,使用时需要将promise和异步任务的逻辑结合在一起。
- 更多的代码管理:使用promise需要手动管理任务的执行和结果的传递,因此比async更灵活和复杂。
- 使用场景:适用于需要手动控制任务结果传递的场景,或异步任务的结果是由多个步骤或线程决定的。
std::packaged_task
- 封装可调用对象:packaged_task可以将一个可调用对象封装起来,并通过future对象传递执行结果,这使它可以用于复杂的异步任务调度。
- 与其他工具结合使用:packaged_task的设计使得它可以很容易地与std::thread、自定义线程池、任务队列等结合使用,灵活地管理任务的执行。
- 使用场景:适合需要高度灵活的任务管理、封装任务并手动控制任务执行的场景,特别适用于实现自定义线程池。
异步线程池设计方案
线程池需要管理的数据:
- 控制线程停止的变量:支持原子操作,保证关闭线程池操作的线程安全
- 任务池:存放待执行的异步任务
- 互斥锁与条件变量:保证线程安全与同步
- 一批工作线程:用于执行异步任务
线程池的实现思路:
- 在启动时预先创建一批工作线程,执行线程入口函数:不断从任务池中取出任务进行执行,没有任务则等待条件变量就绪
- 用户通过Push方法可以将要执行的任务传入线程池,先将传入的任务封装为packaged_task异步任务后,通过packaged_task的get_future方法可以获得future对象,然后将异步任务放入任务池,唤醒工作线程执行异步任务。
- 将future对象返回给使用者,使用者可以通过get方法获取异步任务的执行结果。
#include <vector>
#include <iostream>
#include <thread>
#include <atomic>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <memory>
using namespace std;
class ThreadPool
{
public:
using Functor = function<void()>;
ThreadPool(int threadNum = 1): _stop(false)
{
// 创建一批执行线程入口函数的线程
for(int i = 0; i < threadNum; i++)
{
_threads.emplace_back(&ThreadPool::entry, this);
}
}
// 万能引用
template<typename F, typename ...Args>
auto Push(F&& func, Args&& ...args) -> future<decltype(func(args...))> // 编译时推导返回值类型
{
using return_type = decltype(func(args...));
// 完美转发
auto tmp_func = bind(forward<F>(func), forward<Args>(args)...);
auto task = make_shared<packaged_task<return_type()>>(tmp_func);
future<return_type> fut = task->get_future();
{
unique_lock<mutex> lock(_mtx);
_tasks.push_back([task](){
(*task)();
});
}
_cv.notify_one();
return fut;
}
~ThreadPool()
{
Stop();
}
void Stop()
{
if(_stop == true) return ;
_stop = true;
_cv.notify_all(); // 唤醒所有线程,进行回收
for(auto& thread : _threads)
{
if(thread.joinable())
{
thread.join(); // 回收线程
}
}
}
private:
// 不断从任务池中取出任务执行
void entry()
{
while(!_stop)
{
vector<Functor> tmp_tasks;
{
unique_lock<mutex> lock(_mtx);
_cv.wait(lock, [this](){
return _stop || !_tasks.empty(); // 当线程池停止(要回收线程)或任务池有任务时唤醒线程
});
tmp_tasks.swap(_tasks);
}
for(auto& task : tmp_tasks)
{
task();
}
}
}
private:
atomic<bool> _stop;
vector<Functor> _tasks;
vector<thread> _threads;
mutex _mtx;
condition_variable _cv;
};
int add(int a, int b)
{
return a + b;
}
int main()
{
ThreadPool pool;
for(int i = 0; i < 10; i++)
{
future<int> fut = pool.Push(add, 10, i);
cout << fut.get() << endl;
}
return 0;
}