高级异步编程
在 Rust 精通篇中,我们将深入探索 Rust 的高级异步编程技术。Rust 的异步编程模型基于 Future 特征和异步运行时,提供了高效的非阻塞 I/O 和并发处理能力。在本章中,我们将超越基础知识,探索如何构建高性能异步系统和自定义执行器。
异步编程回顾
在深入高级主题之前,让我们简要回顾 Rust 的异步编程模型:
use std::time::Duration;
use tokio::time;
#[tokio::main]
async fn main() {
// 创建两个异步任务
let task1 = async {
time::sleep(Duration::from_millis(100)).await;
println!("任务 1 完成");
1
};
let task2 = async {
time::sleep(Duration::from_millis(50)).await;
println!("任务 2 完成");
2
};
// 并发执行两个任务
let (result1, result2) = tokio::join!(task1, task2);
println!("结果: {} + {} = {}", result1, result2, result1 + result2);
}
Rust 的异步编程基于以下核心概念:
- Future 特征:表示可能尚未完成的计算
- async/await 语法:简化异步代码的编写
- 异步运行时:如 Tokio、async-std 等,负责执行和调度异步任务
- 任务(Task):可独立调度的异步执行单元
Future 深入理解
Future 特征的内部机制
Future
特征是 Rust 异步编程的核心:
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
poll
方法是 Future 的核心,它尝试将异步计算推进到完成状态:
- 如果 Future 已完成,返回
Poll::Ready(result)
- 如果 Future 尚未完成,返回
Poll::Pending
并安排在事件发生时重新调用poll
手动实现 Future
下面是一个简单的 Future 实现示例:
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
struct Delay {
when: Instant,
}
impl Future for Delay {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if Instant::now() >= self.when {
println!("Future 已完成");
Poll::Ready(())
} else {
// 安排在未来某个时刻重新调用 poll
let waker = cx.waker().clone();
let when = self.when;
std::thread::spawn(move || {
let now = Instant::now();
if now < when {
std::thread::sleep(when - now);
}
waker.wake();
});
println!("Future 尚未完成");
Poll::Pending
}
}
}
#[tokio::main]
async fn main() {
let future = Delay {
when: Instant::now() + Duration::from_secs(1),
};
println!("等待 Future 完成...");
future.await;
println!("主函数结束");
}
Pin 和 Unpin
Pin
类型在异步 Rust 中至关重要,它防止自引用结构在内存中被移动:
use std::marker::PhantomPinned;
use std::pin::Pin;
// 自引用结构体
struct SelfReferential {
data: String,
// 指向 data 字段的指针
ptr_to_data: *const String,
// 标记此类型不能安全地实现 Unpin
_marker: PhantomPinned,
}
impl SelfReferential {
fn new(data: String) -> Pin<Box<Self>> {
let mut boxed = Box::new(SelfReferential {
data,
ptr_to_data: std::ptr::null(),
_marker: PhantomPinned,
});
let ptr = &boxed.data as *const String;
boxed.ptr_to_data = ptr;
// 将 Box 转换为 Pin<Box>
Pin::new(boxed)
}
fn get_data(self: Pin<&Self>) -> &str {
// 安全:数据不会被移动,因为它被固定了
let self_ref = unsafe { self.get_ref() };
&self_ref.data
}
fn get_ptr(self: Pin<&Self>) -> *const String {
self.ptr_to_data
}
}
fn main() {
let pinned = SelfReferential::new("hello".to_string());
// 验证指针确实指向数据
let data_ptr = &pinned.data as *const String;
let ptr = pinned.as_ref().get_ptr();
println!("数据指针: {:?}", data_ptr);
println!("存储的指针: {:?}", ptr);
println!("数据: {}", pinned.as_ref().get_data());
assert_eq!(data_ptr, ptr);
}
异步运行时深入剖析
执行器(Executor)工作原理
异步执行器负责调度和运行 Future,下面是一个简单执行器的实现:
use futures::future::BoxFuture;
use futures::task::{waker_ref, ArcWake};
use futures::Future;
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{channel, Sender, Receiver};
use std::task::{Context, Poll};
// 任务结构,包含一个 Future
struct Task {
future: Mutex<Option<BoxFuture<'static, ()>>>,
sender: Sender<Arc<Task>>,
}
impl ArcWake for Task {
fn wake_by_ref(arc_self: &Arc<Self>) {
// 将自己发送到任务队列,以便重新执行
let cloned = arc_self.clone();
arc_self.sender.send(cloned).expect("任务队列已满");
}
}
// 简单的执行器
struct Executor {
sender: Sender<Arc<Task>>,
receiver: Receiver<Arc<Task>>,
}
impl Executor {
fn new() -> Self {
let (sender, receiver) = channel();
Executor { sender, receiver }
}
// 生成新任务
fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
let task = Arc::new(Task {
future: Mutex::new(Some(Box::pin(future))),
sender: self.sender.clone(),
});
self.sender.send(task).expect("任务队列已满");
}
// 运行执行器
fn run(&self) {
while let Ok(task) = self.receiver.recv() {
// 创建 waker 和上下文
let waker = waker_ref(&task);
let mut context = Context::from_waker(&waker);
// 尝试推进 Future
let mut future_slot = task.future.lock().unwrap();
if let Some(mut future) = future_slot.take() {
match Future::poll(Pin::new(&mut future), &mut context) {
Poll::Pending => {
// Future 尚未完成,放回任务中
*future_slot = Some(future);
}
Poll::Ready(()) => {
// Future 已完成,丢弃它
// 不需要放回 future_slot
}
}
}
}
}
}
fn main() {
let executor = Executor::new();
// 生成一些任务
executor.spawn(async {
println!("任务 1 开始");
// 模拟异步操作
futures::future::ready(()).await;
println!("任务 1 完成");
});
executor.spawn(async {
println!("任务 2 开始");
// 模拟异步操作
futures::future::ready(()).await;
println!("任务 2 完成");
});
// 运行执行器
executor.run();
}
事件循环与反应器(Reactor)
完整的异步运行时通常包含执行器和反应器两部分:
- 执行器:负责调度和运行 Future
- 反应器:负责监听 I/O 事件并唤醒相关任务
下面是一个简化的反应器示例:
use mio::{Events, Interest, Poll, Token};
use mio::net::TcpListener;
use std::collections::HashMap;
use std::io;
use std::sync::{Arc, Mutex};
use std::task::Waker;
// 简化的反应器
struct Reactor {
poll: Poll,
wakers: Mutex<HashMap<Token, Waker>>,
}
impl Reactor {
fn new() -> io::Result<Self> {
Ok(Reactor {
poll: Poll::new()?,
wakers: Mutex::new(HashMap::new()),
})
}
// 注册 I/O 资源和唤醒器
fn register(&self, source: &mut TcpListener, token: Token, waker: Waker) -> io::Result<()> {
self.poll.registry().register(source, token, Interest::READABLE)?;
self.wakers.lock().unwrap().insert(token, waker);
Ok(())
}
// 运行一次事件循环
fn run_once(&self) -> io::Result<()> {
let mut events = Events::with_capacity(1024);
self.poll.poll(&mut events, None)?;
for event in events.iter() {
if let Some(waker) = self.wakers.lock().unwrap().get(&event.token()) {
waker.wake_by_ref();
}
}
Ok(())
}
}
高级异步模式
流(Stream)处理
Stream
特征类似于 Future
,但可以产生多个值:
use futures::stream::{self, StreamExt};
#[tokio::main]
async fn main() {
// 创建一个简单的流
let mut stream = stream::iter(1..=5);
// 使用 next 方法逐个处理流中的元素
while let Some(value) = stream.next().await {
println!("值: {}", value);
}
// 使用组合子处理流
let sum = stream::iter(1..=10)
.map(|x| x * 2)
.filter(|x| futures::future::ready(*x % 3 == 0))
.fold(0, |acc, x| async move { acc + x })
.await;
println!("总和: {}", sum);
}
并发控制模式
信号量(Semaphore)
信号量用于限制并发任务数量:
use std::time::Duration;
use tokio::sync::Semaphore;
use tokio::time;
#[tokio::main]
async fn main() {
// 创建一个容量为 3 的信号量
let semaphore = Arc::new(Semaphore::new(3));
let mut handles = vec![];
for id in 0..8 {
let semaphore = Arc::clone(&semaphore);
let handle = tokio::spawn(async move {
// 获取许可
let permit = semaphore.acquire().await.unwrap();
println!("任务 {} 获取许可,开始执行", id);
// 模拟工作
time::sleep(Duration::from_secs(2)).await;
println!("任务 {} 完成", id);
// 许可在 permit 被丢弃时自动释放
drop(permit);
});
handles.push(handle);
}
// 等待所有任务完成
for handle in handles {
handle.await.unwrap();
}
}
超时处理
为异步操作设置超时:
use std::time::Duration;
use tokio::time;
async fn long_running_task() -> String {
time::sleep(Duration::from_secs(5)).await;
"任务完成".to_string()
}
#[tokio::main]
async fn main() {
// 使用 timeout 包装异步操作
match time::timeout(Duration::from_secs(2), long_running_task()).await {
Ok(result) => println!("任务结果: {}", result),
Err(_) => println!("任务超时"),
}
// 使用 select! 实现超时
tokio::select! {
result = long_running_task() => {
println!("任务结果: {}", result);
}
_ = time::sleep(Duration::from_secs(2)) => {
println!("任务超时");
}
}
}
取消和超时
在 Tokio 中实现任务取消:
use tokio::sync::oneshot;
use tokio::time;
use std::time::Duration;
async fn cancelable_task(mut cancel_rx: oneshot::Receiver<()>) -> Option<String> {
tokio::select! {
_ = &mut cancel_rx => {
println!("任务被取消");
None
}
_ = async {
// 模拟长时间运行的任务
time::sleep(Duration::from_secs(3)).await;
"任务完成".to_string()
} => Some("任务完成".to_string()),
}
}
#[tokio::main]
async fn main() {
// 创建取消通道
let (cancel_tx, cancel_rx) = oneshot::channel();
// 生成可取消的任务
let handle = tokio::spawn(cancelable_task(cancel_rx));
// 等待一段时间后取消任务
time::sleep(Duration::from_secs(1)).await;
cancel_tx.send(()).unwrap();
// 等待任务完成
let result = handle.await.unwrap();
println!("结果: {:?}", result);
}
自定义异步 I/O
实现异步读取器(AsyncRead)
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::AsyncRead;
// 自定义异步读取器
struct MyAsyncReader {
buffer: Vec<u8>,
position: usize,
}
impl MyAsyncReader {
fn new(data: Vec<u8>) -> Self {
MyAsyncReader {
buffer: data,
position: 0,
}
}
}
impl AsyncRead for MyAsyncReader {
fn poll_read(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<io::Result<()>> {
if self.position >= self.buffer.len() {
return Poll::Ready(Ok(()));
}
let remaining = &self.buffer[self.position..];
let amt = std::cmp::min(remaining.len(), buf.remaining());
buf.put_slice(&remaining[..amt]);
self.position += amt;
Poll::Ready(Ok(()))
}
}
#[tokio::main]
async fn main() -> io::Result<()> {
use tokio::io::AsyncReadExt;
let data = b"Hello, async world!";
let mut reader = MyAsyncReader::new(data.to_vec());
let mut buffer = Vec::new();
reader.read_to_end(&mut buffer).await?;
println!("读取的数据: {}", String::from_utf8_lossy(&buffer));
Ok(())
}
实现异步写入器(AsyncWrite)
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::AsyncWrite;
// 自定义异步写入器
struct MyAsyncWriter {
buffer: Vec<u8>,
}
impl MyAsyncWriter {
fn new() -> Self {
MyAsyncWriter {
buffer: Vec::new(),
}
}
fn get_written_data(&self) -> &[u8] {
&self.buffer
}
}
impl AsyncWrite for MyAsyncWriter {
fn poll_write(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
self.buffer.extend_from_slice(buf);
Poll::Ready(Ok(buf.len()))
}
fn poll_flush(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
fn poll_shutdown(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
}
#[tokio::main]
async fn main() -> io::Result<()> {
use tokio::io::AsyncWriteExt;
let mut writer = MyAsyncWriter::new();
writer.write_all(b"Hello, async world!").await?;
println!("写入的数据: {}", String::from_utf8_lossy(writer.get_written_data()));
Ok(())
}
异步设计模式
背压(Backpressure)处理
背压是指当生产者产生数据的速度超过消费者处理的速度时,系统需要限制生产者的速度或缓冲数据:
use tokio::sync::mpsc;
use tokio::time;
use std::time::Duration;
#[tokio::main]
async fn main() {
// 创建有界通道,提供背压机制
let (tx, mut rx) = mpsc::channel(5);
// 生产者任务
let producer = tokio::spawn(async move {
for i in 1..=20 {
// 当通道已满时,send 会等待,实现背压
if let Err(e) = tx.send(i).await {
println!("发送错误: {}", e);
return;
}
println!("发送: {}", i);
time::sleep(Duration::from_millis(100)).await;
}
});
// 消费者任务
let consumer = tokio::spawn(async move {
while let Some(value) = rx.recv().await {
println!("接收: {}", value);
// 消费者处理较慢
time::sleep(Duration::from_millis(300)).await;
}
});
// 等待任务完成
let _ = tokio::join!(producer, consumer);
}
优雅关闭(Graceful Shutdown)
实现异步系统的优雅关闭:
use tokio::signal;
use tokio::sync::{mpsc, oneshot};
use tokio::time;
use std::time::Duration;
// 工作任务结构
struct Worker {
id: usize,
shutdown_rx: oneshot::Receiver<()>,
}
impl Worker {
async fn run(&mut self) {
loop {
tokio::select! {
_ = &mut self.shutdown_rx => {
println!("工作者 {} 收到关闭信号", self.id);
// 执行清理操作
time::sleep(Duration::from_millis(500)).await;
println!("工作者 {} 已清理资源并关闭", self.id);
return;
}
_ = time::sleep(Duration::from_secs(1)) => {
println!("工作者 {} 正在处理...", self.id);
}
}
}
}
}
#[tokio::main]
async fn main() {
// 创建关闭通知通道
let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
// 启动工作者
let mut worker_handles = vec![];
let mut shutdown_senders = vec![];
for i in 0..3 {
let (tx, rx) = oneshot::channel();
shutdown_senders.push(tx);
let mut worker = Worker {
id: i,
shutdown_rx: rx,
};
worker_handles.push(tokio::spawn(async move {
worker.run().await;
}));
}
// 等待关闭信号
tokio::select! {
_ = signal::ctrl_c() => {
println!("收到 Ctrl+C,开始优雅关闭");
}
_ = time::sleep(Duration::from_secs(5)) => {
println!("模拟关闭信号,开始优雅关闭");
}
}
// 发送关闭信号给所有工作者
for tx in shutdown_senders {
let _ = tx.send(());
}
// 通知关闭监听器
let _ = shutdown_tx.send(()).await;
// 等待所有工作者完成
for handle in worker_handles {
let _ = handle.await;
}
println!("所有工作者已关闭,程序退出");
}
性能优化技术
任务本地存储(Task-Local Storage)
use tokio::task::LocalKey;
use std::cell::RefCell;
thread_local! {
static COUNTER: RefCell<u32> = RefCell::new(0);
}
#[tokio::main]
async fn main() {
// 在主任务中访问
COUNTER.with(|counter| {
*counter.borrow_mut() += 1;
println!("主任务计数: {}", *counter.borrow());
});
// 在子任务中访问
let task1 = tokio::spawn(async {
COUNTER.with(|counter| {
*counter.borrow_mut() += 1;
println!("任务 1 计数: {}", *counter.borrow());
});
});
let task2 = tokio::spawn(async {
COUNTER.with(|counter| {
*counter.borrow_mut() += 1;
println!("任务 2 计数: {}", *counter.borrow());
});
});
let _ = tokio::join!(task1, task2);
// 再次在主任务中访问
COUNTER.with(|counter| {
println!("最终主任务计数: {}", *counter.borrow());
});
}
异步栈展开(Async Stack Unwinding)
use std::panic;
use tokio::task;
async fn might_panic(should_panic: bool) -> Result<(), String> {
if should_panic {
panic!("任务发生恐慌");
}
Ok(())
}
#[tokio::main]
async fn main() {
// 设置恐慌钩子
panic::set_hook(Box::new(|info| {
println!("捕获到恐慌: {}", info);
}));
// 使用 catch_unwind 捕获异步任务中的恐慌
let result = task::spawn(async {
let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
might_panic(true)
}));
match result {
Ok(future) => {
// 继续处理 future
println!("Future 创建成功");
future
}
Err(e) => {
println!("捕获到恐慌: {:?}", e);
async { Ok(()) }
}
}
}).await;
println!("任务结果: {:?}", result);
// 另一种方法:使用 tokio::spawn 的错误处理
let handle = task::spawn(async {
might_panic(true).await
});
match handle.await {
Ok(result) => println!("任务成功: {:?}", result),
Err(e) => println!("任务失败: {}", e),
}
}
实际应用案例
构建高性能 HTTP 服务器
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use std::error::Error;
async fn handle_connection(mut stream: TcpStream) -> Result<(), Box<dyn Error>> {
let mut buffer = [0; 1024];
// 读取请求
let n = stream.read(&mut buffer).await?;
let request = String::from_utf8_lossy(&buffer[..n]);
println!("收到请求:\n{}", request);
// 构造响应
let response = "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 12\r\n\r\nHello, Rust!";
// 发送响应
stream.write_all(response.as_bytes()).await?;
stream.flush().await?;