为什么C++
有协程
时需要发送者?
这一切都太复杂了!
所以把它具体化.这里展示如何把一个生硬的旧C风格
的异步API
引入发送者
的世界,及你可能想要这样的原因.
C(经典)
异步API
过去,我做了很多Win32
编程.Win32
有多种异步模型
待选,但最简单
的是好的回调
.异步IOAPI
(如ReadFileEx
)的形状如下:
//带回调的旧式异步`CAPI/`(如`Win32`的`ReadFileEx)`
struct overlapped {
//...操作系统`内部结构`在此...
};
using overlapped_callback =
void(int status, int bytes, overlapped* user);
int read_file(FILE*, char* buffer, int bytes,
overlapped* user, overlapped_callback* cb);
协议非常简单
:传入一般的参数
加上两个额外的参数
来调用read_file
:"覆盖"结构的指针和回调
.操作系统将使用覆盖的结构
,但用户也可在其中填充数据
,这样回调
可稍后使用它.像这样:
struct my_overlapped : overlapped {
//...额外`数据`在此......
};
void my_callback(int status, int bytes, overlapped* data) {
auto* my_data = static_cast<my_overlapped*>(data);
//...使用放入`"my_overlapped"`对象中的额外内容......
delete my_data;
//清理
}
void enqueue_read(FILE* pfile) {
//分配和初化`my_data...`
auto* my_data =
new my_overlapped{{}, /*`数据`在此*/ };
int status =
read_file(pfile, buff, bytes, my_data, my_callback);
//...
}
原理:read_file
导致操作系统排队IO
操作,并保存覆盖(overlapped)
和overlapped_callback
指针.IO
完成后,OS
传入覆盖结构指针
并调用回调.
我已写过数百次此代码
了.你可能也有.
这很简单
.它有效.为什么要让它更复杂
,对吧?
C(混乱)
异步API
回调API
没有任何问题.问题是,每个公开异步的库都使用略有不同
的回调API
.如果想链接两个不同库的两个异步操作
,需要编写一堆胶水代码
来按另一个异步抽象
映射该异步抽象
.
这是巴别塔问题.
解决方法是让C++
标准认可一个异步抽象
.然后,所有公开异步的库都可按标准抽象
映射它们的抽象
,这样都可相互通信.
这就是C++
的标准化
委员会对它感兴趣的原因.实际上,只能通过标准
来解决的问题.
C(组合)
异步API
因此引入发送者
.它们很高效
,结构化,且是组合的!,这里展示一些代码,让代码说话.
如果查看上面的read_file,API
,可识别出一些不同部分
:
1,分配异步操作
期望的任何资源.
2,操作时,数据
必须在稳定的地址(即覆盖
结构),
3,初化将异步IO
排入队列的异步操作
,及处理异常,
4,异步操作
完成后执行用户提供的延续
(即回调).
5,回收第1步中分配的任何资源
.
发送者
也有所有这些部件
,但形状统一,因此可通用地使用它们
.事实上,发送者
和C风格API
之间唯一区别是,发送者
中不是一个回调,而是三个:成功,失败和取消
各一个.
第1步:分配
重新构想的read_fileAPI
将如下:
read_file_sender
async_read_file(FILE* file, char* buffer, int size)
{
return {file, buffer, size};
}
工作是将参数放入发送器形状
的对象
中,该对象如下:
namespace stdex = std::execution;
struct read_file_sender
{
using sender_concept = stdex::sender_t;
//`(1)`
using completion_signatures =
//`(2)`
stdex::completion_signatures<
stdex::set_value_t( int, char* ),
stdex::set_error_t( int ) >;
auto connect( stdex::receiver auto rcvr )
//`(3)`
{
return read_file_operation{{}, {}, pfile, buffer, size, std::move(rcvr)};
}
FILE* pfile;
char* buffer;
int size;
};
发送者
的工作是描述异步操作
.(它也是操作状态
的工厂,但这是第2步.在标有"(1)"
的行上,按发送者声明此类型.在标有"(2)"
的行上,声明了完成此异步操作
的方式.
使用函数类型
列表来完成:
stdex::set_value_t( int, char* )
…声明此异步操作
可通过将整
和char*
传递给值回调
来成功完成.记住,有三个回调.还有该:
stdex::set_error_t( int )
…声明此异步操作
可能会通过向错误回调
传递整
来在错误中完成.如果此异步操作
是可取消
的,它将使用stdex::set_stopped_t()
声明.
第2步:数据
在上面标有"(3)"
的行上,连接,成员函数
接受"接收者"
并返回"操作状态"
.接收者
是三个回调
的合并:值,错误和停止
(大致是已取消
).
连接发送者和接收者
的结果是操作状态
.操作状态
,与CAPI
中的覆盖
结构一样,是异步操作
的数据
.它必须保存在持续时间
内的稳定的地址
.
连接
函数返回一个read_file_operation
对象.连接
的调用者
负责确保保活此对象
,且在执行其中一个回调
前不会移动.
read_file_operation
类型如下:
struct immovable {
immovable() = default;
immovable(immovable&&) = delete;
};
template <class Receiver>
struct read_file_operation : overlapped, immovable
//`(1)`
{
static void _callback(int status, int bytes,
//`(2)`
overlapped* data)
{
auto* op =
static_cast<read_file_operation*>(data);
//`(3)`
if (status == OK)
stdex::set_value(std::move(op->rcvr),
//`(4)`
bytes, op->buffer);
else
stdex::set_error(std::move(op->rcvr), status);
}
void start() noexcept
//`(5)`
{
int status =
read_file(pfile, buffer, size, this, &_callback);
if (status != OK)
stdex::set_error(std::move(rcvr), status);
}
FILE* pfile;
char* buffer;
int size;
Receiver rcvr;
};
操作状态
存储初化异步操作
的期望参数
及接收者
(三个回调).按行分析一下.
1,"(1)"
:操作状态
从覆盖
继承,因此可把它的指针传递进read_file
.它还从不可移动(immovable)
的结构继承.虽然不是绝对必要
的,但这可确保不会意外移动操作状态
.
2,"(2)"
:按类静态函数
定义传递给read_file
的overlapped_callback
.
3,"(3)"
:在回调中,将覆盖的指针
下转至指向read_file_operation
对象的指针.
4,"(4)"
:在回调中,检查状态
,看看是否成功完成操作
,并适当调用接收者
的set_value
或set_error
.
5,"(5)"
:在必须要有所有操作状态
的start()
函数中,实际初化读操作
.如果初化失败
,因为不会执行回调,会立即将错误传递给接收者
.
第3步:初化
注意,当调用发送者
版本的async_read_file
函数时,只是在构造一个发送者
.并未开始实际工作.
然后用接收者调用连接
并返回操作状态
,但仍没有开始任何工作.
只是刚刚排好队,确保一切都在稳定的地址
,这样可开始工作.在操作状态
调用.start()
之前,不会初化工作.只有这样,才会调用C风格的read_fileAPI
,从而排队IO
操作.
一旦开始构建发送者
的管道和任务图
,所有这些就很重要.把初化工作
与构建操作状态
分开,可按一个包含整个任务图
期望的所有数据
的状态,聚集大量操作状态
,从而在开始任何工作
前将所有内容
旋转到位.
即可仅使用单个动态分配
或有时不分配
,就启动大量有复杂依赖的异步工作
.
必须说明,当我说连接
的调用者直到执行其中一个回调,要在稳定地址
保持操作状态
时,我有点胡言乱语.
只有在调用.start()
后,这才会变成现实.只要尚未调用.start()
,将发送者
连接到接收者
,然后在地板上删除操作状态
是完全可接受的.
但是调用.start()
后,你就承诺了..start()
发射火箭.没有退路了.
好的,已构造了操作状态
,并在其上调用了.start()
.
第4步:延续
操作系统发挥其IO
神奇.时间流逝.完成IO
后,它将用状态码
,覆盖结构指针
(read_file_operation
),调用_callback
函数,如果成功,读取了字节数
.
_callback
将完成信息传递给连接到发送者
的接收者,圆圈完成.
但是等等,"第5步:释放"呢从一开始就没有真正分配过任何东西!连接
函数按值返回操作状态
.由连接的调用者,无论是谁,来保活它.
可通过在堆上放置它
来完成,此时,他们负责清理它.或,如果此异步操作
是任务图
的一部分,他们可通过在更大的状态
中聚集操作状态
来完成.
第6步:获利!
此时,你会问,这一切的意义何在.发送者和接收者
,麻烦的生命期
期望操作状态
,连接,初化,三个不同回调
,谁想要管理
所有这些?CAPI
要简单得多
.是真!
则,为什么我对这一切如此激动?
async_read_file
的调用者不需要关心这些
.
终端用户
,即async_read_file
的调用者,不关心接收者
和操作状态
.他们只是在协程
中等待发送者
.如下代码
使用stdexec
库中的协程
任务类型.
exec::task< std::string > process_file( FILE* pfile )
{
std::string str;
str.resize( MAX_BUFFER );
auto [bytes, buff] =
co_await async_read_file(pfile, str.data(), str.size());
str.resize( bytes );
co_return str;
}
这有什么神奇?写了一个发送者
,而不是一个可等待
,对吧,但这是工作代码!
这是可从标准异步模型
编程中受益的地方.通用代码,无论是来自标准库
还是来自第三方库
,都将与发送者
一起使用.
上例,stdexec
库中的task<>
类型知道如何等待任何像发送者的东西
.如果你有发送者
,则无需执行任何额外工作
,即可co_await
它.
针对常见异步模式
,P2300
还附带了一小部分
通用异步算法
,如链接(then
),动态选择下个任务
(let_value
),按(when_all
)分组发送者
,及阻止直到发送者完成
(sync_wait
).
可以肯定的是,这是一个很简单
的集合,但它会随着未来的标准
而增长.随着第三方库
开始带该模型,越来越多
的异步代码将协同工作
.
为什么要使用它?
你想用发送者,因为可用其他库
的通用算法
将异步操作
与其他库中的其他操作
拼接在一起.因此,你可无需编写额外的代码行
,在协程
中co_await
异步操作.
为什么C++
有协程
时需要发送者?
我承认实现发送者
比使用普通C风格
回调更复杂
.但是使用发送者
与输入co_await
一样简单,或像传递参数给sync_wait()
此类算法一样简单.
选入发送者
就是选入一个不断增长
的可重用
代码的生态系统
.
因此发送者
很有趣.
毕竟,在发送者
中包装read_file
不难.