高级并发模式
在 Rust 精通篇中,我们将深入探索 Rust 的高级并发编程模式。Rust 的所有权系统和类型系统为并发编程提供了强大的安全保障,使我们能够在编译时捕获大多数并发错误。在本章中,我们将超越基本的并发原语,探索更复杂的并发模式和无锁数据结构。
并发模型回顾
在深入高级主题之前,让我们简要回顾 Rust 的并发模型:
use std::thread;
use std::sync::{Arc, Mutex};
use std::time::Duration;
fn main() {
// 共享状态并发
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
thread::sleep(Duration::from_millis(10));
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("最终计数: {}", *counter.lock().unwrap());
}
Rust 的并发模型基于以下核心概念:
- 线程安全:通过所有权和类型系统在编译时防止数据竞争
- 消息传递:使用通道(channels)在线程间传递消息
- 共享状态:使用
Mutex
、RwLock
等同步原语安全地共享数据 - Send 和 Sync trait:控制类型在线程间的安全传递和共享
高级同步原语
读写锁(RwLock)
RwLock
允许多个读取器或单个写入器同时访问数据,适用于读多写少的场景:
use std::sync::{Arc, RwLock};
use std::thread;
fn main() {
let data = Arc::new(RwLock::new(vec![1, 2, 3]));
let mut handles = vec![];
// 创建多个读取线程
for i in 0..3 {
let data = Arc::clone(&data);
handles.push(thread::spawn(move || {
let data = data.read().unwrap();
println!("读取线程 {}: {:?}", i, *data);
}));
}
// 创建一个写入线程
let data = Arc::clone(&data);
handles.push(thread::spawn(move || {
let mut data = data.write().unwrap();
data.push(4);
println!("写入线程: {:?}", *data);
}));
for handle in handles {
handle.join().unwrap();
}
}
条件变量(Condvar)
条件变量允许线程等待特定条件发生:
use std::sync::{Arc, Mutex, Condvar};
use std::thread;
fn main() {
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair_clone = Arc::clone(&pair);
// 消费者线程
let consumer = thread::spawn(move || {
let (lock, cvar) = &*pair_clone;
let mut started = lock.lock().unwrap();
// 等待直到条件变为 true
while !*started {
started = cvar.wait(started).unwrap();
}
println!("消费者: 条件已满足,继续执行");
});
// 给生产者一些时间启动
thread::sleep(std::time::Duration::from_secs(1));
// 生产者线程
let (lock, cvar) = &*pair;
let mut started = lock.lock().unwrap();
*started = true;
cvar.notify_one();
println!("生产者: 条件已设置,通知消费者");
consumer.join().unwrap();
}
屏障(Barrier)
屏障用于同步多个线程,确保它们同时到达某个点:
use std::sync::{Arc, Barrier};
use std::thread;
fn main() {
let barrier = Arc::new(Barrier::new(3));
let mut handles = vec![];
for i in 0..3 {
let barrier = Arc::clone(&barrier);
handles.push(thread::spawn(move || {
println!("线程 {} 正在准备...", i);
thread::sleep(std::time::Duration::from_secs(i));
// 等待所有线程到达屏障
let wait_result = barrier.wait();
// 只有一个线程会收到 BarrierWaitResult::Leader
if wait_result.is_leader() {
println!("所有线程已就绪,继续执行!");
}
println!("线程 {} 继续执行", i);
}));
}
for handle in handles {
handle.join().unwrap();
}
}
无锁数据结构
无锁(lock-free)数据结构通过原子操作而非互斥锁实现线程安全,通常具有更好的性能和可伸缩性。
原子类型
Rust 标准库提供了多种原子类型,如 AtomicBool
、AtomicUsize
等:
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use std::sync::Arc;
fn main() {
let counter = Arc::new(AtomicUsize::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
handles.push(thread::spawn(move || {
for _ in 0..1000 {
// 无锁递增
counter.fetch_add(1, Ordering::SeqCst);
}
}));
}
for handle in handles {
handle.join().unwrap();
}
println!("最终计数: {}", counter.load(Ordering::SeqCst));
}
内存排序(Memory Ordering)
原子操作的内存排序模型控制操作的可见性和顺序:
Ordering::Relaxed
:最弱的排序,只保证原子性Ordering::Release
:写操作使用,确保之前的操作不会被重排到此操作之后Ordering::Acquire
:读操作使用,确保之后的操作不会被重排到此操作之前Ordering::AcqRel
:结合 Acquire 和 Release 语义Ordering::SeqCst
:最强的排序,提供全局一致的顺序
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
fn main() {
static READY: AtomicBool = AtomicBool::new(false);
static DATA: AtomicBool = AtomicBool::new(false);
thread::spawn(|| {
// 准备数据
DATA.store(true, Ordering::Release);
// 发出信号表明数据已准备好
READY.store(true, Ordering::Release);
});
// 等待数据准备好
while !READY.load(Ordering::Acquire) {
thread::yield_now();
}
// 数据现在可以安全访问
assert!(DATA.load(Ordering::Acquire));
println!("数据已成功同步");
}
实现简单的无锁队列
下面是一个基于原子操作的简单无锁队列实现:
use std::sync::atomic::{AtomicPtr, Ordering};
use std::ptr::null_mut;
pub struct Node<T> {
data: T,
next: AtomicPtr<Node<T>>,
}
pub struct LockFreeQueue<T> {
head: AtomicPtr<Node<T>>,
tail: AtomicPtr<Node<T>>,
}
impl<T> LockFreeQueue<T> {
pub fn new() -> Self {
let sentinel = Box::into_raw(Box::new(Node {
data: unsafe { std::mem::zeroed() },
next: AtomicPtr::new(null_mut()),
}));
LockFreeQueue {
head: AtomicPtr::new(sentinel),
tail: AtomicPtr::new(sentinel),
}
}
pub fn enqueue(&self, data: T) {
let new_node = Box::into_raw(Box::new(Node {
data,
next: AtomicPtr::new(null_mut()),
}));
loop {
let tail = self.tail.load(Ordering::Acquire);
let next = unsafe { (*tail).next.load(Ordering::Acquire) };
if tail == self.tail.load(Ordering::Acquire) {
if next.is_null() {
// 尝试将新节点添加到尾部
if unsafe { (*tail).next.compare_exchange(
null_mut(),
new_node,
Ordering::Release,
Ordering::Relaxed,
) }.is_ok() {
// 更新尾指针
let _ = self.tail.compare_exchange(
tail,
new_node,
Ordering::Release,
Ordering::Relaxed,
);
return;
}
} else {
// 尾指针落后,帮助更新
let _ = self.tail.compare_exchange(
tail,
next,
Ordering::Release,
Ordering::Relaxed,
);
}
}
}
}
pub fn dequeue(&self) -> Option<T> {
loop {
let head = self.head.load(Ordering::Acquire);
let tail = self.tail.load(Ordering::Acquire);
let next = unsafe { (*head).next.load(Ordering::Acquire) };
if head == self.head.load(Ordering::Acquire) {
if head == tail {
if next.is_null() {
// 队列为空
return None;
}
// 尾指针落后,帮助更新
let _ = self.tail.compare_exchange(
tail,
next,
Ordering::Release,
Ordering::Relaxed,
);
} else {
// 读取数据
let data = unsafe { std::ptr::read(&(*next).data) };
// 尝试更新头指针
if self.head.compare_exchange(
head,
next,
Ordering::Release,
Ordering::Relaxed,
).is_ok() {
// 释放旧的哨兵节点
unsafe { Box::from_raw(head) };
return Some(data);
}
}
}
}
}
}
impl<T> Drop for LockFreeQueue<T> {
fn drop(&mut self) {
while self.dequeue().is_some() {}
// 释放哨兵节点
let sentinel = self.head.load(Ordering::Relaxed);
if !sentinel.is_null() {
unsafe { Box::from_raw(sentinel) };
}
}
}
Actor 模型
Actor 模型是一种并发编程范式,其中每个 actor 是一个独立的计算单元,通过消息传递进行通信。
使用 Actix 实现 Actor 系统
Actix 是 Rust 中流行的 Actor 框架:
use actix::prelude::*;
// 定义消息
#[derive(Message)]
#[rtype(result = "String")]
struct Ping(String);
// 定义 Actor
struct MyActor;
impl Actor for MyActor {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
println!("Actor 已启动");
}
}
// 实现消息处理
impl Handler<Ping> for MyActor {
type Result = String;
fn handle(&mut self, msg: Ping, _ctx: &mut Context<Self>) -> Self::Result {
println!("收到消息: {}", msg.0);
format!("Pong: {}", msg.0)
}
}
#[actix_rt::main]
async fn main() {
// 创建 actor
let addr = MyActor.start();
// 发送消息并等待响应
let res = addr.send(Ping("Hello Actor".to_string())).await;
match res {
Ok(result) => println!("收到响应: {}", result),
Err(err) => println!("错误: {}", err),
}
System::current().stop();
}
工作窃取(Work Stealing)
工作窃取是一种任务调度算法,允许空闲线程从繁忙线程的队列中