Rust从入门到精通之精通篇:24.高级异步编程

发布于:2025-03-26 ⋅ 阅读:(33) ⋅ 点赞:(0)

高级异步编程

在 Rust 精通篇中,我们将深入探索 Rust 的高级异步编程技术。Rust 的异步编程模型基于 Future 特征和异步运行时,提供了高效的非阻塞 I/O 和并发处理能力。在本章中,我们将超越基础知识,探索如何构建高性能异步系统和自定义执行器。

异步编程回顾

在深入高级主题之前,让我们简要回顾 Rust 的异步编程模型:

use std::time::Duration;
use tokio::time;

#[tokio::main]
async fn main() {
    // 创建两个异步任务
    let task1 = async {
        time::sleep(Duration::from_millis(100)).await;
        println!("任务 1 完成");
        1
    };
    
    let task2 = async {
        time::sleep(Duration::from_millis(50)).await;
        println!("任务 2 完成");
        2
    };
    
    // 并发执行两个任务
    let (result1, result2) = tokio::join!(task1, task2);
    println!("结果: {} + {} = {}", result1, result2, result1 + result2);
}

Rust 的异步编程基于以下核心概念:

  1. Future 特征:表示可能尚未完成的计算
  2. async/await 语法:简化异步代码的编写
  3. 异步运行时:如 Tokio、async-std 等,负责执行和调度异步任务
  4. 任务(Task):可独立调度的异步执行单元

Future 深入理解

Future 特征的内部机制

Future 特征是 Rust 异步编程的核心:

pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

poll 方法是 Future 的核心,它尝试将异步计算推进到完成状态:

  • 如果 Future 已完成,返回 Poll::Ready(result)
  • 如果 Future 尚未完成,返回 Poll::Pending 并安排在事件发生时重新调用 poll

手动实现 Future

下面是一个简单的 Future 实现示例:

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

struct Delay {
    when: Instant,
}

impl Future for Delay {
    type Output = ();
    
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if Instant::now() >= self.when {
            println!("Future 已完成");
            Poll::Ready(())
        } else {
            // 安排在未来某个时刻重新调用 poll
            let waker = cx.waker().clone();
            let when = self.when;
            
            std::thread::spawn(move || {
                let now = Instant::now();
                if now < when {
                    std::thread::sleep(when - now);
                }
                waker.wake();
            });
            
            println!("Future 尚未完成");
            Poll::Pending
        }
    }
}

#[tokio::main]
async fn main() {
    let future = Delay {
        when: Instant::now() + Duration::from_secs(1),
    };
    
    println!("等待 Future 完成...");
    future.await;
    println!("主函数结束");
}

Pin 和 Unpin

Pin 类型在异步 Rust 中至关重要,它防止自引用结构在内存中被移动:

use std::marker::PhantomPinned;
use std::pin::Pin;

// 自引用结构体
struct SelfReferential {
    data: String,
    // 指向 data 字段的指针
    ptr_to_data: *const String,
    // 标记此类型不能安全地实现 Unpin
    _marker: PhantomPinned,
}

impl SelfReferential {
    fn new(data: String) -> Pin<Box<Self>> {
        let mut boxed = Box::new(SelfReferential {
            data,
            ptr_to_data: std::ptr::null(),
            _marker: PhantomPinned,
        });
        
        let ptr = &boxed.data as *const String;
        boxed.ptr_to_data = ptr;
        
        // 将 Box 转换为 Pin<Box>
        Pin::new(boxed)
    }
    
    fn get_data(self: Pin<&Self>) -> &str {
        // 安全:数据不会被移动,因为它被固定了
        let self_ref = unsafe { self.get_ref() };
        &self_ref.data
    }
    
    fn get_ptr(self: Pin<&Self>) -> *const String {
        self.ptr_to_data
    }
}

fn main() {
    let pinned = SelfReferential::new("hello".to_string());
    
    // 验证指针确实指向数据
    let data_ptr = &pinned.data as *const String;
    let ptr = pinned.as_ref().get_ptr();
    
    println!("数据指针: {:?}", data_ptr);
    println!("存储的指针: {:?}", ptr);
    println!("数据: {}", pinned.as_ref().get_data());
    
    assert_eq!(data_ptr, ptr);
}

异步运行时深入剖析

执行器(Executor)工作原理

异步执行器负责调度和运行 Future,下面是一个简单执行器的实现:

use futures::future::BoxFuture;
use futures::task::{waker_ref, ArcWake};
use futures::Future;
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{channel, Sender, Receiver};
use std::task::{Context, Poll};

// 任务结构,包含一个 Future
struct Task {
    future: Mutex<Option<BoxFuture<'static, ()>>>,
    sender: Sender<Arc<Task>>,
}

impl ArcWake for Task {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        // 将自己发送到任务队列,以便重新执行
        let cloned = arc_self.clone();
        arc_self.sender.send(cloned).expect("任务队列已满");
    }
}

// 简单的执行器
struct Executor {
    sender: Sender<Arc<Task>>,
    receiver: Receiver<Arc<Task>>,
}

impl Executor {
    fn new() -> Self {
        let (sender, receiver) = channel();
        Executor { sender, receiver }
    }
    
    // 生成新任务
    fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
        let task = Arc::new(Task {
            future: Mutex::new(Some(Box::pin(future))),
            sender: self.sender.clone(),
        });
        
        self.sender.send(task).expect("任务队列已满");
    }
    
    // 运行执行器
    fn run(&self) {
        while let Ok(task) = self.receiver.recv() {
            // 创建 waker 和上下文
            let waker = waker_ref(&task);
            let mut context = Context::from_waker(&waker);
            
            // 尝试推进 Future
            let mut future_slot = task.future.lock().unwrap();
            if let Some(mut future) = future_slot.take() {
                match Future::poll(Pin::new(&mut future), &mut context) {
                    Poll::Pending => {
                        // Future 尚未完成,放回任务中
                        *future_slot = Some(future);
                    }
                    Poll::Ready(()) => {
                        // Future 已完成,丢弃它
                        // 不需要放回 future_slot
                    }
                }
            }
        }
    }
}

fn main() {
    let executor = Executor::new();
    
    // 生成一些任务
    executor.spawn(async {
        println!("任务 1 开始");
        // 模拟异步操作
        futures::future::ready(()).await;
        println!("任务 1 完成");
    });
    
    executor.spawn(async {
        println!("任务 2 开始");
        // 模拟异步操作
        futures::future::ready(()).await;
        println!("任务 2 完成");
    });
    
    // 运行执行器
    executor.run();
}

事件循环与反应器(Reactor)

完整的异步运行时通常包含执行器和反应器两部分:

  • 执行器:负责调度和运行 Future
  • 反应器:负责监听 I/O 事件并唤醒相关任务

下面是一个简化的反应器示例:

use mio::{Events, Interest, Poll, Token};
use mio::net::TcpListener;
use std::collections::HashMap;
use std::io;
use std::sync::{Arc, Mutex};
use std::task::Waker;

// 简化的反应器
struct Reactor {
    poll: Poll,
    wakers: Mutex<HashMap<Token, Waker>>,
}

impl Reactor {
    fn new() -> io::Result<Self> {
        Ok(Reactor {
            poll: Poll::new()?,
            wakers: Mutex::new(HashMap::new()),
        })
    }
    
    // 注册 I/O 资源和唤醒器
    fn register(&self, source: &mut TcpListener, token: Token, waker: Waker) -> io::Result<()> {
        self.poll.registry().register(source, token, Interest::READABLE)?;
        self.wakers.lock().unwrap().insert(token, waker);
        Ok(())
    }
    
    // 运行一次事件循环
    fn run_once(&self) -> io::Result<()> {
        let mut events = Events::with_capacity(1024);
        self.poll.poll(&mut events, None)?;
        
        for event in events.iter() {
            if let Some(waker) = self.wakers.lock().unwrap().get(&event.token()) {
                waker.wake_by_ref();
            }
        }
        
        Ok(())
    }
}

高级异步模式

流(Stream)处理

Stream 特征类似于 Future,但可以产生多个值:

use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
    // 创建一个简单的流
    let mut stream = stream::iter(1..=5);
    
    // 使用 next 方法逐个处理流中的元素
    while let Some(value) = stream.next().await {
        println!("值: {}", value);
    }
    
    // 使用组合子处理流
    let sum = stream::iter(1..=10)
        .map(|x| x * 2)
        .filter(|x| futures::future::ready(*x % 3 == 0))
        .fold(0, |acc, x| async move { acc + x })
        .await;
    
    println!("总和: {}", sum);
}

并发控制模式

信号量(Semaphore)

信号量用于限制并发任务数量:

use std::time::Duration;
use tokio::sync::Semaphore;
use tokio::time;

#[tokio::main]
async fn main() {
    // 创建一个容量为 3 的信号量
    let semaphore = Arc::new(Semaphore::new(3));
    let mut handles = vec![];
    
    for id in 0..8 {
        let semaphore = Arc::clone(&semaphore);
        let handle = tokio::spawn(async move {
            // 获取许可
            let permit = semaphore.acquire().await.unwrap();
            println!("任务 {} 获取许可,开始执行", id);
            
            // 模拟工作
            time::sleep(Duration::from_secs(2)).await;
            
            println!("任务 {} 完成", id);
            // 许可在 permit 被丢弃时自动释放
            drop(permit);
        });
        
        handles.push(handle);
    }
    
    // 等待所有任务完成
    for handle in handles {
        handle.await.unwrap();
    }
}
超时处理

为异步操作设置超时:

use std::time::Duration;
use tokio::time;

async fn long_running_task() -> String {
    time::sleep(Duration::from_secs(5)).await;
    "任务完成".to_string()
}

#[tokio::main]
async fn main() {
    // 使用 timeout 包装异步操作
    match time::timeout(Duration::from_secs(2), long_running_task()).await {
        Ok(result) => println!("任务结果: {}", result),
        Err(_) => println!("任务超时"),
    }
    
    // 使用 select! 实现超时
    tokio::select! {
        result = long_running_task() => {
            println!("任务结果: {}", result);
        }
        _ = time::sleep(Duration::from_secs(2)) => {
            println!("任务超时");
        }
    }
}

取消和超时

在 Tokio 中实现任务取消:

use tokio::sync::oneshot;
use tokio::time;
use std::time::Duration;

async fn cancelable_task(mut cancel_rx: oneshot::Receiver<()>) -> Option<String> {
    tokio::select! {
        _ = &mut cancel_rx => {
            println!("任务被取消");
            None
        }
        _ = async {
            // 模拟长时间运行的任务
            time::sleep(Duration::from_secs(3)).await;
            "任务完成".to_string()
        } => Some("任务完成".to_string()),
    }
}

#[tokio::main]
async fn main() {
    // 创建取消通道
    let (cancel_tx, cancel_rx) = oneshot::channel();
    
    // 生成可取消的任务
    let handle = tokio::spawn(cancelable_task(cancel_rx));
    
    // 等待一段时间后取消任务
    time::sleep(Duration::from_secs(1)).await;
    cancel_tx.send(()).unwrap();
    
    // 等待任务完成
    let result = handle.await.unwrap();
    println!("结果: {:?}", result);
}

自定义异步 I/O

实现异步读取器(AsyncRead)

use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::AsyncRead;

// 自定义异步读取器
struct MyAsyncReader {
    buffer: Vec<u8>,
    position: usize,
}

impl MyAsyncReader {
    fn new(data: Vec<u8>) -> Self {
        MyAsyncReader {
            buffer: data,
            position: 0,
        }
    }
}

impl AsyncRead for MyAsyncReader {
    fn poll_read(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        if self.position >= self.buffer.len() {
            return Poll::Ready(Ok(()));
        }
        
        let remaining = &self.buffer[self.position..];
        let amt = std::cmp::min(remaining.len(), buf.remaining());
        
        buf.put_slice(&remaining[..amt]);
        self.position += amt;
        
        Poll::Ready(Ok(()))
    }
}

#[tokio::main]
async fn main() -> io::Result<()> {
    use tokio::io::AsyncReadExt;
    
    let data = b"Hello, async world!";
    let mut reader = MyAsyncReader::new(data.to_vec());
    
    let mut buffer = Vec::new();
    reader.read_to_end(&mut buffer).await?;
    
    println!("读取的数据: {}", String::from_utf8_lossy(&buffer));
    Ok(())
}

实现异步写入器(AsyncWrite)

use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::AsyncWrite;

// 自定义异步写入器
struct MyAsyncWriter {
    buffer: Vec<u8>,
}

impl MyAsyncWriter {
    fn new() -> Self {
        MyAsyncWriter {
            buffer: Vec::new(),
        }
    }
    
    fn get_written_data(&self) -> &[u8] {
        &self.buffer
    }
}

impl AsyncWrite for MyAsyncWriter {
    fn poll_write(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        self.buffer.extend_from_slice(buf);
        Poll::Ready(Ok(buf.len()))
    }
    
    fn poll_flush(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }
    
    fn poll_shutdown(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }
}

#[tokio::main]
async fn main() -> io::Result<()> {
    use tokio::io::AsyncWriteExt;
    
    let mut writer = MyAsyncWriter::new();
    writer.write_all(b"Hello, async world!").await?;
    
    println!("写入的数据: {}", String::from_utf8_lossy(writer.get_written_data()));
    Ok(())
}

异步设计模式

背压(Backpressure)处理

背压是指当生产者产生数据的速度超过消费者处理的速度时,系统需要限制生产者的速度或缓冲数据:

use tokio::sync::mpsc;
use tokio::time;
use std::time::Duration;

#[tokio::main]
async fn main() {
    // 创建有界通道,提供背压机制
    let (tx, mut rx) = mpsc::channel(5);
    
    // 生产者任务
    let producer = tokio::spawn(async move {
        for i in 1..=20 {
            // 当通道已满时,send 会等待,实现背压
            if let Err(e) = tx.send(i).await {
                println!("发送错误: {}", e);
                return;
            }
            println!("发送: {}", i);
            time::sleep(Duration::from_millis(100)).await;
        }
    });
    
    // 消费者任务
    let consumer = tokio::spawn(async move {
        while let Some(value) = rx.recv().await {
            println!("接收: {}", value);
            // 消费者处理较慢
            time::sleep(Duration::from_millis(300)).await;
        }
    });
    
    // 等待任务完成
    let _ = tokio::join!(producer, consumer);
}

优雅关闭(Graceful Shutdown)

实现异步系统的优雅关闭:

use tokio::signal;
use tokio::sync::{mpsc, oneshot};
use tokio::time;
use std::time::Duration;

// 工作任务结构
struct Worker {
    id: usize,
    shutdown_rx: oneshot::Receiver<()>,
}

impl Worker {
    async fn run(&mut self) {
        loop {
            tokio::select! {
                _ = &mut self.shutdown_rx => {
                    println!("工作者 {} 收到关闭信号", self.id);
                    // 执行清理操作
                    time::sleep(Duration::from_millis(500)).await;
                    println!("工作者 {} 已清理资源并关闭", self.id);
                    return;
                }
                _ = time::sleep(Duration::from_secs(1)) => {
                    println!("工作者 {} 正在处理...", self.id);
                }
            }
        }
    }
}

#[tokio::main]
async fn main() {
    // 创建关闭通知通道
    let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
    
    // 启动工作者
    let mut worker_handles = vec![];
    let mut shutdown_senders = vec![];
    
    for i in 0..3 {
        let (tx, rx) = oneshot::channel();
        shutdown_senders.push(tx);
        
        let mut worker = Worker {
            id: i,
            shutdown_rx: rx,
        };
        
        worker_handles.push(tokio::spawn(async move {
            worker.run().await;
        }));
    }
    
    // 等待关闭信号
    tokio::select! {
        _ = signal::ctrl_c() => {
            println!("收到 Ctrl+C,开始优雅关闭");
        }
        _ = time::sleep(Duration::from_secs(5)) => {
            println!("模拟关闭信号,开始优雅关闭");
        }
    }
    
    // 发送关闭信号给所有工作者
    for tx in shutdown_senders {
        let _ = tx.send(());
    }
    
    // 通知关闭监听器
    let _ = shutdown_tx.send(()).await;
    
    // 等待所有工作者完成
    for handle in worker_handles {
        let _ = handle.await;
    }
    
    println!("所有工作者已关闭,程序退出");
}

性能优化技术

任务本地存储(Task-Local Storage)

use tokio::task::LocalKey;
use std::cell::RefCell;

thread_local! {
    static COUNTER: RefCell<u32> = RefCell::new(0);
}

#[tokio::main]
async fn main() {
    // 在主任务中访问
    COUNTER.with(|counter| {
        *counter.borrow_mut() += 1;
        println!("主任务计数: {}", *counter.borrow());
    });
    
    // 在子任务中访问
    let task1 = tokio::spawn(async {
        COUNTER.with(|counter| {
            *counter.borrow_mut() += 1;
            println!("任务 1 计数: {}", *counter.borrow());
        });
    });
    
    let task2 = tokio::spawn(async {
        COUNTER.with(|counter| {
            *counter.borrow_mut() += 1;
            println!("任务 2 计数: {}", *counter.borrow());
        });
    });
    
    let _ = tokio::join!(task1, task2);
    
    // 再次在主任务中访问
    COUNTER.with(|counter| {
        println!("最终主任务计数: {}", *counter.borrow());
    });
}

异步栈展开(Async Stack Unwinding)

use std::panic;
use tokio::task;

async fn might_panic(should_panic: bool) -> Result<(), String> {
    if should_panic {
        panic!("任务发生恐慌");
    }
    Ok(())
}

#[tokio::main]
async fn main() {
    // 设置恐慌钩子
    panic::set_hook(Box::new(|info| {
        println!("捕获到恐慌: {}", info);
    }));
    
    // 使用 catch_unwind 捕获异步任务中的恐慌
    let result = task::spawn(async {
        let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
            might_panic(true)
        }));
        
        match result {
            Ok(future) => {
                // 继续处理 future
                println!("Future 创建成功");
                future
            }
            Err(e) => {
                println!("捕获到恐慌: {:?}", e);
                async { Ok(()) }
            }
        }
    }).await;
    
    println!("任务结果: {:?}", result);
    
    // 另一种方法:使用 tokio::spawn 的错误处理
    let handle = task::spawn(async {
        might_panic(true).await
    });
    
    match handle.await {
        Ok(result) => println!("任务成功: {:?}", result),
        Err(e) => println!("任务失败: {}", e),
    }
}

实际应用案例

构建高性能 HTTP 服务器

use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use std::error::Error;

async fn handle_connection(mut stream: TcpStream) -> Result<(), Box<dyn Error>> {
    let mut buffer = [0; 1024];
    
    // 读取请求
    let n = stream.read(&mut buffer).await?;
    let request = String::from_utf8_lossy(&buffer[..n]);
    println!("收到请求:\n{}", request);
    
    // 构造响应
    let response = "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 12\r\n\r\nHello, Rust!";
    
    // 发送响应
    stream.write_all(response.as_bytes()).await?;
    stream.flush().await?;