传统异步回调代码的问题以及什么是协程
同步线性模型
// 1. 理想中的“线性”逻辑(同步思维)
void fetchUserData() {
Connection conn = connectToDatabase(); // 阻塞
User user = getUser(conn, "userId"); // 阻塞
Avatar avatar = getAvatar(user); // 阻塞
display(avatar);
closeConnection(conn); // 阻塞
}
首先,最早期的“同步线性逻辑”模型设计粗糙,连接的生命周期与线程的生命周期强绑定,缺乏更细粒度的管理。
这种模式无法应对著名的 C10k问题 (即同时处理1万个客户端连接)。线程数量与连接数呈1:1线性增长,而线程资源是有限的,很快就会达到操作系统瓶颈。
异步回调模型
为了突破C10k问题,人们提出了事件驱动的异步模型。其核心思想是:
“用少量的线程(甚至一个线程),通过非阻塞I/O和事件循环,来管理成千上万个连接。”
也就是著名的 Reactor模式。[可参考文档 ]
但如果一个任务中包含多个异步回调过程,其代码编写时可能是下面这样的:
// 2. 回调下的“碎片化”现实
void fetchUserData() {
async_connectToDatabase("db_url", [](Error e, Connection conn) {
if (e) { /* handle error */ return; }
async_getUser(conn, "userId", [](Error e, User user) {
if (e) { /* handle error */ return; }
async_getAvatar(user, [](Error e, Avatar avatar) {
if (e) { /* handle error */ return; }
display(avatar);
// 关闭连接!但如果外部操作失败了怎么办??
async_closeConnection(conn, [](Error e) { /* ... */ });
});
});
});
}
不难理解,回调破坏了代码的自然顺序流和局部上下文,将一段本应线性的逻辑拆解成多个在不同时间、不同调用栈上执行的碎片。这导致:
代码可读性差与结构维护困难(“回调地狱” - Callback Hell):
逻辑流断裂: 你必须从下往上、从里往外地阅读代码,才能拼凑出完整的业务逻辑(“先连接数据库,成功后获取用户,成功后获取头像,最后显示”)。
缩进灾难: 每增加一个异步步骤,代码就嵌套一层,很快变得无法管理。
变量传递: 外层的数据(如 conn)需要手动向内层传递,依赖闭包捕获。如果嵌套很深,需要哪些变量会变得很不直观。
错误处理复杂且重复
样板代码泛滥: 每个回调都必须以 if (error) { … } 开始,重复且啰嗦。
错误处理不统一: 很难在同一个地方对所有可能的错误进行统一处理。你可能在每个层级都以不同的方式处理相同的错误。
容易遗漏: 很容易忘记检查某个回调的错误,导致程序在发生异常时静默失败或行为异常,难以追踪问题根源。
调试体验极其反直觉
调用栈断裂: 当在 async_getAvatar 的回调里触发断点时,你的调用栈(Call Stack)几乎是空的。你只能看到事件循环(或线程池)是如何调用到这个匿名函数的,完全丢失了“是谁发起了 getAvatar?”以及“更早之前是谁发起了 getUser?”的上下文信息。调试器无法告诉你完整的执行路径。
无法单步跟踪: 你无法从 async_connectToDatabase 一行按“下一步”直接跳到 async_getUser。因为它们之间是“注册回调”而非“调用”,执行是中断的。你必须在一个个回调内部打断点,失去了“步进”整个流程的能力。
上下文丢失: 在回调内部,你只能看到当前回调捕获的变量。想了解整个任务的中间状态非常困难。
流程控制变得困难
- 循环: 在一个回调里启动下一次循环迭代,通常需要递归或额外的计数器,代码不再简洁。比如顺序处理一个序列数据,你需要在每一层回调中都将索引传递下去。
void fetchMultipleUsers(vector<string> ids, size_t index = 0) { if (index >= ids.size()) return; async_getUser(conn, ids[index], [index](Error e, User user) { // ... 处理 user ... fetchMultipleUsers(ids, index + 1); // 递归调用,进行下一次循环 }); }
- 条件分支: 简单的 if-else 也需要拆解到不同的回调中。
- 并行与同步: 实现“等待所有A、B、C三个异步操作完成后再执行D”(Promise.all 的功能)需要手动维护计数器或使用第三方库,引入了额外的复杂性。
资源管理生命周期复杂
在上面的例子中,Connection conn 应该在所有操作完成后被关闭。但在深度嵌套的回调中,很难找到一个合适的位置来写关闭连接的代码。你不得不把它写在最内层的回调里(但这不一定正确,如果外部操作失败了怎么办?),或者再次嵌套一个回调。
如果操作中途失败,需要确保所有已分配的资源都被正确清理,这会导致每个错误处理分支上都有大量的清理代码。
协程模型
协程是一种可以暂停执行并在之后恢复执行的函数。与线程不同,协程的挂起和恢复完全由程序员控制,不需要操作系统介入,因此更加轻量级。
我们可以将协程描述为一个可以暂停的函数,想象这么一个执行过程:
你调用该协程,它开始读数据。
发现数据还没来,它说:“我先暂停(挂起) 在这里,等数据来了你再叫我。”
它返回执行权给调用者,调用者可以去做其他事情。
数据到达后,它被恢复,从刚才暂停的地方继续执行,处理接收到的数据。
处理完所有数据后,它最终返回。
使用协程后,之前采用回调方式编写的代码变为如下形式:
// 3.使用协程的异步代码
task<Result> fetchUserData() {
Connection conn = co_await async_connect(host, port);
/* handle error */
Result login_result = co_await async_login(conn, credentials);
/* handle error */
Data data = co_await async_query(conn, query);
/* handle error */
Result process_result = co_await async_process(data);
/* handle error */
Result close_result = co_await async_closeConnection(conn);
/* handle error */
co_return process_result; // 可以返回一个值,类似于函数的return
}
显然,对比于采用异步回调的代码,利用协程写的代码像同步一样简洁,错误处理可用try-catch,且逻辑清晰、易维护。
对比:回调 vs. 协程
特性 | 异步回调 | 协程 |
---|---|---|
代码结构 | 嵌套金字塔,逻辑碎片化 | 扁平线性,如同步代码 |
可读性 | 差,必须反向推理 | 极好,顺序执行 |
错误处理 | 分散、重复、易遗漏 | 集中,可使用 try/catch |
调试 | 调用栈断裂,难以跟踪 | 完整调用栈,可步进调试 |
控制流 | 复杂(循环、条件、并行) | 简单(可直接使用 for 、if 等) |
上下文管理 | 手动传递,依赖闭包 | 自动保存和恢复局部变量 |
心智模型 | 跳跃的、事件驱动的 | 直观的、顺序执行的 |
传统的异步回调模式迫使开发者将业务逻辑的“控制流”和“状态管理”的负担从语言运行时转移到了开发者的大脑中。开发者需要手动模拟编译器为同步代码所做的所有工作(保存上下文、管理状态机、处理跳转),这不仅极易出错,也使得代码难以编写、阅读、调试和维护。
而协程通过语言层面的支持,让编译器来替开发者完成“将线性代码转换为状态机”的繁重工作,使得开发者可以用同步的思维模式和代码风格去编写高效的异步程序,这是其最大的优势所在。
C++协程的原理及实现
初步认识协程的概念
从下图可以很直观地看出为什么我们称协程为可以暂停的函数。
(此图来源于网络)
基于此,我们可以很方便地实现生成器,比如一个斐波那契数列的生成器:
Generator<uint64_t> fibonacci() {
uint64_t a = 0, b = 1;
while (true) {
co_yield a; // 产出当前值,并挂起
auto next = a + b;
a = b;
b = next;
}
}
// 使用,基于范围的for循环
for (auto i : fibonacci()) {
std::cout << i << std::endl;
}
这对于在C++代码中的range-based for循环是非常有用的!特别是对于斐波那契数列这种无限序列,可以做到只有在需要时才产生一个值并返回,而不用提前生成一个序列再进行遍历,节省存储空间,提高效率,代码亦是非常简洁!
C++ 协程的实现
1 三个关键操作
- co_yield:产生一个值并暂停。是一种输出语义,用于提供一个输出值给调用者
- co_await:暂停执行直到awaitable操作完成。是一种输入语义,一般用于将中间结果交给另一个执行者执行另一个异步过程,待该异步过程执行完毕后,该执行者返回到此处继续执行剩下的操作
- co_return:返回最终结果并结束协程。
2 协程函数(Coroutine Function)
怎么知道一个函数是协程?当一个用户编写的函数中出现co_await
、co_yield
或co_return
以上三个关键字中的任意一个时,编译器便知道该段代码是一个协程(coroutine),它会将协程中代码转换为一个“傀儡函数”和一段有限状态机代码(FSM)。
3 一个用于包装协程句柄的类型(Wrapper Type)
当程序调用一个协程时(调用一个函数类似),首先会执行“傀儡函数”来完成初始化,并生成一个返回给调用者的对象,该对象主要是用于包装协程句柄,从而在调用者和协程之间建立联系。从语义上我们可以认为这个对象的类型可定义一个Task:
template<typename T>
class Task<T>; // T是调用者希望协程最终返回给调用者的结果类型
比如有这样一段协程:
Task<int> my_coroutine(int x) {
int result = co_await some_async_operation(x);
co_return result * 2;
}
那么在程序中调用协程的方法便如下所示:
Task<int> task = my_coroutine(0);
乍一看有点抽象,因为my_coroutine(int x)
最终返回的结果类型应该是int,但实际调用my_coroutine(0)
返回了一个Task<int>
。这是因为此处调用my_coroutine(0)
只是完成了协程的前置初始化过程,可能(注意是可能,也可能已经执行了)并没有真正执行my_coroutine(int x)
函数体中的代码,这也是为什么前文形象地称这个过程为“傀儡函数”。
4 协程的Promise对象(Promise Type)
前面我们了解到,编译器将协程函数中的代码转换为有限状态机,现在我们已经执行了”傀儡函数“完成了前置过程,此时该如何让这个状态机跑起来?我们需要在class Task<T>
中定义一个嵌套类型struct promise_type
来定制协程的行为,从而取得对协程的控制权。
promise_type
必须提供以下关键方法:
get_return_object()
: 如何生成返回给调用者的对象(如Task<T>
)。通常在class Task<T>
的构造函数中会保存协程句柄,从而建立联系。initial_suspend()
: 返回一个awaiter,决定协程开始执行时是立即执行还是立即挂起。final_suspend()
: 返回一个awaiter,决定协程在co_return或未捕获异常退出后是最终挂起还是直接销毁。return_value(T value) / return_void()
: 如何处理co_return的值。unhandled_exception()
: 如何处理协程体内未捕获的异常。
一个极简的promise_type定义如下所示:
template<typename T>
class Task<T> {
public:
struct promise_type {
// 协程内部产生的返回值,是调用者真正需要的输出结果
T value_;
// 存储异常
std::exception_ptr exception_;
// 1. 获取返回给外部的对象
Task<T> get_return_object() {
// 使用 from_promise 静态方法将 promise 和句柄关联
return Task<T>{std::coroutine_handle<promise_type>::from_promise(*this)};
}
// 2. 初始挂起策略:立刻挂起,让调用者决定何时开始
std::suspend_always initial_suspend() { return {}; } // 初始化完先挂起
// std::suspend_never initial_suspend() { return {}; } // 初始化完不挂起,直接开始运行协程
// 3. 最终挂起策略:挂起。这样我们可以在协程结束后通过句柄检查状态或获取值。
std::suspend_always final_suspend() noexcept { return {}; }
// 4. 处理 co_yield (等价于 co_await promise.yield_value(...))
std::suspend_always yield_value(T value) {
value_ = std::move(value);
return {}; // 返回一个等待器,告诉协程挂起
}
// 5. 处理 co_return
void return_void() {}
// 6. 处理异常
void unhandled_exception() { exception_ = std::current_exception(); }
}
std::coroutine_handle<promise_type> handle;
};
5 协程句柄(Coroutine Handle)
std::coroutine_handle<promise_type>
是一个轻量级的、无所有权的指针,指向底层的协程状态(协程帧)。它是你手动恢复或销毁一个已挂起协程的唯一凭据,一个遥控器,可以完成的主要操作:handle.resume()
(恢复执行),handle.destroy()
(销毁协程帧),handle.done()
(检查协程是否已执行完)。
6 等待器(Awaitable 和 Awaiter)
等待器决定协程如何挂起和恢复。它定义了co_await
的行为。一个极简的Awaiter定义如下:
struct Awaiter {
// co_await 工作流程的三个关键方法
// 1. 检查操作是否已经完成
bool await_ready() const {
return false; // 返回false说明没完成,需要挂起,即执行 await_suspend 并挂起
}
// 2. 挂起协程,安排异步操作
void await_suspend(std::coroutine_handle<> handle) const {
// 挂起前的安排,比如起一个线程执行异步操作,并在完成后通过handle返回
}
// 3. 恢复后产生的结果
void await_resume() const {
// 可以是一些恢复后的操作,或者返回一个值,也就是 co_await 表达式的返回值
}
};
bool await_ready()
: 操作是否已完成?如果返回true,则协程不会挂起,直接继续执行。void/bool/coroutine_handle<> await_suspend(coroutine_handle<> h)
: “没准备好的话,怎么挂起?” 在这里安排恢复的任务(例如,将恢复操作交给异步I/O的回调)。T await_resume()
: 当协程恢复时,此方法的返回值就是co_await表达式的结果(T可以是void)。
STL中已经帮我们定义了两个Awaiter,也是最常用到的,我们在promise_type定义的讲解中已经见到了:
这两个Awaiter虽然非常简单,但了解清楚也有助于我们理解co_await的运作机制。
此外,还应该了解的是,co_yield a
其实就是co_await promise.yield_value(a)
,本质上还是一个co_await行为,也正因此yield_value()
返回的也是一个Awaiter。
7 协程帧(Coroutine Frame)/ 协程状态(Coroutine State)
Task<int> my_coroutine(int x) {
int result = co_await some_async_operation(x);
co_return result * 2;
}
依然以上面这段极简协程代码为例。当遇到协程调用时,编译器首先会在堆上分配一个结构体,称为“协程帧”。其中包含了:
参数:函数入参(
x
)会被移动或拷贝到这个状态中。局部变量:所有局部变量(如
result
)也成为这个状态的成员。这就是为什么协程挂起后局部变量还能保持的原因。promise_type
对象:struct promise_type
的实例,前面已介绍过。恢复点(Resumption Point):一个整数或标签,用于记录当前协程在状态机中执行到的位置,以便通过handle恢复时能跳转到正确的地方。
其他内部信息。
8 编译器代码转换(以一个极简的协程案例来说明执行过程)
为方便阅读,我们将之前的协程函数及其调用再简单写一遍:
- 调用:
Task<int> task = my_coroutine(0);
//...
task.handle.resume(); // 因为initial_suspend返回挂起,所以需要执行resume来让协程真正开始执行
//...主线程在协程挂起后(开启异步任务之后),可以先去做其他事
//...
//...
if (task.handle.done()) // 在适当的时机,可以通过promise拿到结果值
auto result = task.handle.promise()._value;
//...
- 协程
Task<int> my_coroutine(int x) {
//...
int result = co_await some_async_operation(x); // 完成await_suspend()后主线程会返回
co_return result * 2; // 这部分将由完成异步任务的线程继续执行
}
- promise_type:
struct promise_type {
T _value;
Task<T> get_return_object() { return Task<T>(handle_from_promise(*this)); }
std::suspend_always initial_suspend() { return {}; } // 初始化完先挂起
// std::suspend_never initial_suspend() { return {}; } // 初始化完不挂起,直接开始运行协程
std::suspend_always final_suspend() noexcept { return {}; }
std::suspend_always yield_value(T value) {}
void return_void() {}
void return_value(T value) { _value = value; }
};
将异步任务定义到一个Awaiter中:
struct some_async_operation {
int x;
int result;
bool await_ready() const {
return false; // 返回false说明没完成,需要挂起,即执行 await_suspend 并挂起
}
// 2. 挂起协程,安排异步操作
void await_suspend(std::coroutine_handle<> handle) const {
// 开启异步操作,传入handle便于在完成后调用handle.resume()
// 比如将异步操作函数丢给一个任务队列去执行,并由完成任务的线程执行handle.resume()
}
// 3. 恢复后产生的结果
int await_resume() const { return result; }
};
前面我们已经说过,当用户编写了上面的Task<int> my_coroutine(int x)
代码后,编译器会为你生成一个“傀儡”函数和一段有限状态机代码。这个过程对你是完全透明的。为了便于理解,我们来看一下相关过程的伪代码:
// 注意:这是编译器生成逻辑的伪代码,并非实际代码
Task<int> my_coroutine(int x) {
// 1. 在堆上分配协程帧,并初始化
__coroutine_state* __state = __allocate_coroutine_state();
__state->x = x;
__state->__resume_point = 0; // 起始点标记为0
// 2. 构造promise对象,并从promise中获取返回对象(即task<int>)
__state->__promise.constructor();
Task<int> __return_obj = __state->__promise.get_return_object();
// 3. 调用co_await promise.initial_suspend();
// 这允许协程在开始执行后就立即挂起(lazy启动),或立即执行(eager启动)
__await_suspend_result = await_suspend(__state->__promise.initial_suspend(), ...);
// 4. 根据initial_suspend的结果,决定是立即恢复执行还是返回
if (__await_suspend_result == __suspend_always) {
return __return_obj; // 此时协程被挂起在初始位置,控制权返回给调用者
} else {
__resume(__state); // 跳转到状态机继续执行
}
return __return_obj;
}
// 5. 真正的函数体被转换成一个状态机函数
void __resume(__coroutine_state* __state) {
switch (__state->__resume_point) {
case 0: // 起始点
// ... 执行co_await之前的代码
goto label0;
label0:
// 处理 co_await some_async_operation(x);
__awaitable = some_async_operation(__state->x);
if (!__awaitable.await_ready()) {
// 如果操作尚未完成,则挂起
__state->__resume_point = 1; // 记录恢复点为1
__awaitable.await_suspend(__state->__coroutine_handle); // 传递一个可恢复此状态的句柄
return; // 挂起并返回,控制权交给await_suspend所指定的地方
}
// 如果操作已经完成,则直接 fallthrough
case 1: // 恢复点1
// 获取await的结果,并赋值给局部变量
__state->result = __awaitable.await_resume();
// ... 执行 result * 2 的逻辑
// 遇到 co_return
__state->__promise.return_value(__state->result * 2);
goto final_suspend;
}
final_suspend:
// 6. 执行co_await promise.final_suspend();
// 这允许协程在结束前执行一些清理工作或通知完成状态
co_await __state->__promise.final_suspend();
// 协程生命周期结束,自动销毁协程状态
}
就这样我们一顿操作猛如虎,用协程完成了一个异步任务并取得了结果。
小结
再总结下协程运用于复杂异步操作场景时的优势:
可读性:代码逻辑是顺序的,符合人类思维,极大提升了可维护性。
错误处理:可以直接使用try/catch来捕获异步操作中的异常,而无需复杂的错误传递机制。
上下文管理:局部变量自然保存在协程状态中,无需手动绑定到闭包里。
尽管C++的协程在使用上有一些抽象,也需要一系列前置操作,不免让人望而生畏,但回过头去再对比一下本文开头对于一个复杂任务采用传统异步回调实现和采用协程实现的对比,你大概率会觉得封装一个合适的协程操作是值得的,因为协程的代码显然在编写和维护上更友好。
传统异步操作:
void fetchUserData() {
async_connectToDatabase("db_url", [](Error e, Connection conn) {
if (e) { /* handle error */ return; }
async_getUser(conn, "userId", [](Error e, User user) {
if (e) { /* handle error */ return; }
async_getAvatar(user, [](Error e, Avatar avatar) {
if (e) { /* handle error */ return; }
display(avatar);
// 关闭连接!但如果外部操作失败了怎么办??
async_closeConnection(conn, [](Error e) { /* ... */ });
});
});
});
}
协程代码
// 3.使用协程的异步代码
Task<Result> fetchUserData() {
Connection conn = co_await async_connectToDatabase("db_url");
/* handle error */
User user = co_await async_getUser(conn, "userId");
/* handle error */
Avatar avatar = co_await async_getAvatar(conn, user);
/* handle error */
Result process_result = display(avatar);
/* handle error */
Result close_result = co_await async_closeConnection(conn);
/* handle error */
co_return process_result;
}
C++20协程是一把锋利的瑞士军刀,它提供了底层的强大原语,而上层的高级抽象则需要你自己或社区来构建。虽然学习曲线稍陡,但它为编写清晰、高效、可维护的异步代码带来了革命性的改变,绝对值得你投入时间学习。
其他应用举例
简化状态机
用线性的代码来表达状态流转,而不是使用复杂的switch-case
:
Task<void> user_session_flow(connection conn) {
co_await authenticate(conn);
// 状态:已认证
while (true) {
auto request = co_await read_request(conn);
// 状态:等待请求
if (request.is_type("quit")) break;
auto response = co_await process_request(request);
// 状态:处理中
co_await write_response(conn, response);
// 状态:等待写入完成
}
// 状态:结束
conn.close();
}
复杂操作的代码简化
(此图来源于网络)
比如要实现交叉遍历两个数组,如果用传统的函数思维实现,最直接的想法就是把两个数组交叉merge后再进行迭代,但显然效率不够高。当然也可以考虑利用static变量来记录数组的编号和索引,但代码显然会比较丑陋。而如果利用协程来实现一个生成器,代码就会相当优雅。
Generator interleave(std::vector<int> a, std::Vector<int> b)
{
for (int i = 0; i < a.size() && i < b.size(); i++) {
co_yield a[i];
co_yield b[i];
}
}
当然,你需要一个预先定义好的Generator,并写好迭代器才能用于range-based for循环,但这通常不是大问题。
还有一种高级的写法如下: