在C++中,相关的接口被如下组织:
promise & future
要在C++中使用该组异步接口,首先需要定义一个promise结构体,它表示某一个数据的输入端。//头文件
#include <future>
//定义一个promise,传递int类型的未来值:
std::promise<int> p;
//定义一个future,并于p绑定(直接调用了p的接口获取)
std::future<int> f = p.get_future();
当某个值准备好后,调用promise的set_value接口,设置好未来值,随后,就能使用future的接口将该值读取出来。
#include <future>
#include <iostream>
int main()
{
std::promise<int> p;
std::future<int> f = p.get_future();
p.set_value(42); // 这个值对p而言是只写的
std::cout << f.get() << std::endl; // 这个值对f而言是只读的
}
假设我们将p传入到某一个线程,在这个线程内部,调用p的set_value接口,在另一个线程内部,调用f的get接口,就可以实现线程间的异步同步处理。
#include <future>
#include <iostream>
#include <thread>
int main()
{
std::promise<int> p;
std::future<int> f = p.get_future();
std::thread t(
[](std::promise<int> p) {
p.set_value(42);
},
std::move(p));
std::cout << f.get() << std::endl;
t.join();
}
需要注意的是,对应的一套 p.set_value 和 f.get 只能被使用一次,否则会抛出future_error的exception。
wait、wait_for、wait_until
由于异步之间效率的不同,某一个未来值可能迟迟不出现,这就会出现另一个线程的阻塞等待,C++也提供了相应的接口,并给出了超时语义。- void wait():阻塞式的等待,直到future的值可以被读取
- future_status wait_for(const std::chrono::duration<…> &):等待存在最大时间限制,返回future_status
- future_status_wait_until(const std::chrono::time_point<…> &):等待到某个时间,返回future_status
future_status可以有以下取值:
future_status | 定义 |
---|---|
deferred | 这是一个惰性估值(async中会使用) |
ready | 已经可以读取值 |
timeout | 等待超时 |
shared_future
std::future 是不可拷贝的,不是线程安全的,只能通过移动(std::move) 语义传递。只能由单一消费者消费结果,如果存在对一个future的多个线程同时访问需求,需要使用到shared_futrue。shared_future 支持共享访问,允许多个线程共享访问异步操作的结果,它允许被拷贝,每个线程都可以对不同的shared_future拷贝获取相同的结果。
#include <iostream>
#include <future>
#include <thread>
// 模拟一个耗时计算的函数
int compute()
{
return 42;
}
int main()
{
// 创建一个 std::promise 和与之关联的 std::future
std::promise<int> promise;
std::future<int> fut = promise.get_future();
// 将 std::future 转换为 std::shared_future
std::shared_future<int> shared_fut = fut.share();
// 在另一个线程中设置 promise 的值
std::thread t([&promise]() {
int result = compute(); // 计算结果
promise.set_value(result); // 将结果传递给 promise
});
// 多个地方获取结果
std::cout << "Thread 1: " << shared_fut.get() << std::endl;
std::cout << "Thread 2: " << shared_fut.get() << std::endl;
// 共享的 future 可以拷贝
std::shared_future<int> shared_fut_copy = shared_fut;
std::cout << "Thread 3 (copy): " << shared_fut_copy.get() << std::endl;
t.join(); // 等待线程结束
return 0;
}
output:
Thread 1: 42
Thread 2: 42
Thread 3 (copy): 42
packaged_task
在上面的例子中,我们总是希望在某个方法中异步的完成它的值,因此,我们需要设计出这个方法,packaged_task类试图将promise和实现promise的方法进一步封装为一个完整的结构。#include <future>
#include <iostream>
int compute(int a, int b) {
return 42 + a + b;
}
int main() {
// 相当于定义了promise 并绑定了对应的方法
std::packaged_task<int(int, int)> task(compute);
std::future<int> f = task.get_future();
// task内部重载了(),将直接调用compute
task(3, 4);
std::cout << f.get() << std::endl;
return 0;
}
简化版实现如下:
#include <exception>
#include <functional>
#include <future>
template <typename Func>
class my_packaged_task;
template <typename Ret, typename... Args>
class my_packaged_task<Ret(Args...)> {
private:
std::promise<Ret> promise_;
std::function<Ret(Args...)> func_;
public:
my_packaged_task(std::function<Ret(Args...)> func)
: func_(std::move(func)) {}
// 重载 () 操作
void operator()(Args&&... args) {
try {
promise_.set_value(func_(std::forward<Args&&>(args)...));
} catch (...) {
promise_.set_exception(std::current_exception());
}
}
std::future<Ret> get_future() {
return promise_.get_future();
}
};
因此,promise被进一步封装,对外不再可见,当需要调用预设的方法为future赋值时,只需要简单的调用task重载后的函数即可。
async
在将promise封装为packaged_task后,我们依然有大量的需求是创建一个线程,去执行异步操作,在计算机的世界里面,没有什么是再封一层做不到的,如果有,那一定就是再封两层,所以我们将packaged_task和thread再向上封装,就形成了最后的async接口,这个接口直接返回future,然后在**某个时间点**完成异步操作。某个时间点取决于 async 当前的执行策略:
- std::launch::async:建立一个线程执行指定的异步操作回传到future。
2. std::launch::deferred:将该操作的触发时间,延迟到future.get被调用的时候。
分析如下代码:
#include <iostream>
#include <future>
#include <thread>
#include <chrono>
// 模拟一个耗时任务
int compute(int x) {
std::cout << "Start computation with x = " << x << " in thread " << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(2)); // 模拟耗时操作
return x * x;
}
int main() {
// 使用 std::async 的 std::launch::async 模式
std::future<int> async_future = std::async(std::launch::async, compute, 10);
// 使用 std::async 的 std::launch::deferred 模式
std::future<int> deferred_future = std::async(std::launch::deferred, compute, 20);
// 主线程继续执行其他任务
std::cout << "Main thread ID: " << std::this_thread::get_id() << std::endl;
// deferred 模式任务尚未启动,只有调用 `get()` 时才会运行
std::cout << "Waiting for deferred result..." << std::endl;
std::cout << "Deferred result: " << deferred_future.get() << std::endl; // 在调用 get() 时任务才运行
// 查看任务是否已启动(async 模式的任务已经开始)
std::cout << "Waiting for async result..." << std::endl;
std::cout << "Async result: " << async_future.get() << std::endl; // 获取结果并等待 async 模式完成
return 0;
}
output:
Main thread ID: 135193224783680
Waiting for deferred result...
Deferred result: Start computation with x = 20 in thread 135193224783680
Start computation with x = 10 in thread 135193218250304
400
Waiting for async result...
Async result: 100
可以观察到,在输出deferred模式的future值时会感受到2秒的明显停顿且thread id和主线程相同,而async模式的future值瞬间输出,输出的线程thread id和主线程不同,因为deferred模式下,并没有额外的线程产生,依靠主线程执行了对future的赋值操作,在async模式下,会额外创建一个新的线程异步执行操作。