✨✨ 欢迎大家来到景天科技苑✨✨
🎈🎈 养成好习惯,先赞后看哦~🎈🎈
🏆 作者简介:景天科技苑
🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。
🏆《博客》:Rust开发,Python全栈,Golang开发,云原生开发,PyQt5和Tkinter桌面开发,小程序开发,人工智能,js逆向,App逆向,网络系统安全,数据分析,Django,fastapi,flask等框架,云原生K8S,linux,shell脚本等实操经验,网站搭建,数据库等分享。所属的专栏:Rust高性能并发编程
景天的主页:景天科技苑
文章目录
Rust并发集合
集合类型是我们编程中常用的数据类型,Rust 中提供了一些集合类型,比如Vec<T>
、HashMap<K, V>
、HashSet<T>
、VecDeque<T>
、LinkedList<T>
、BTreeMap<K,V>
、BTreeSet<T>
等,
它们的特点如下:
• Vec - 这是一种可变大小的数组, 允许在头部或尾部高效地添加和删除元素。它类似于 C++ 的 vector 或 Java 的 ArrayList。
• HashMap<K,V>
- 这是一个哈希映射, 允许通过键快速查找值。它类似于 C++ 的unordered_map 或 Java 的 HashMap。
• HashSet - 这是一个基于哈希的集, 可以快速判断一个值是否在集合中。它类似于C++ 的 unordered_set 或 Java 的 HashSet。
• VecDeque - 这是一个双端队列, 允许在头部或尾部高效地添加和删除元素。它类似于 C++ 的 deque 或 Java 的 ArrayDeque。
• LinkedList - 这是一个链表数据结构, 允许在头部或尾部快速添加和删除元素。
• BTreeMap<K,V>
- 这是一个有序的映射, 可以通过键快速查找, 同时保持元素的排序。它使用 B 树作为底层数据结构。
• BTreeSet - 这是一个有序的集合, 元素会自动排序。它使用 B 树作为底层数据结构。
不幸的是这些类型都不是线程安全的,没有办法在线程中共享使用,
有幸的是,我们可以使用前面介绍的并发原语,对这些类型进行包装,使之成为线程安全的。
1、线程安全的 Vec
要实现线程安全的 Vec,可以使用 Arc(原子引用计数)和 Mutex(互斥锁)的组合。
Arc 允许多个线程共享拥有相同数据的所有权,而 Mutex 用于在访问数据时进行同步,确保只有一个线程能够修改数据。
以下是一个简单的例子,演示如何创建线程安全的 Vec:
use std::sync::{ Arc, Mutex };
use std::thread;
fn main() {
// 使用Arc和Mutex包装Vec
let shared_vec = Arc::new(Mutex::new(Vec::new()));
// 创建多个线程,每个线程向Vec中添加一个元素
let mut handles = vec![];
for i in 0..5 {
// 克隆shared_vec,以便在线程中使用
let shared_vec = Arc::clone(&shared_vec);
let handle = thread::spawn(move || {
// 获取锁
let mut vec = shared_vec.lock().unwrap();
// 修改Vec
vec.push(i);
});
handles.push(handle);
}
// 等待所有线程结束
for handle in handles {
handle.join().unwrap();
}
// 打印最终的Vec
let final_vec = shared_vec.lock().unwrap();
println!("Final Vec: {:?}", *final_vec);
}
在这个例子中,shared_vec 是一个 Mutex 包装的 Arc,使得多个线程能够共享对 Vec的所有权。
每个线程在修改 Vec 之前需要先获取锁,确保同一时刻只有一个线程能够修改数据。
2、线程安全的 HashMap
要实现线程安全的 HashMap,可以使用 Arc(原子引用计数)和 Mutex(互斥锁)的组合,或者使用 RwLock(读写锁)来提供更细粒度的并发控制。
以下是使用 Arc 和Mutex 的简单示例:
use std::collections::HashMap;
use std::sync::{ Arc, Mutex };
use std::thread;
fn main() {
// 使用Arc和Mutex包装HashMap
let shared_map = Arc::new(Mutex::new(HashMap::new()));
// 创建多个线程,每个线程向HashMap中添加键值对
let mut handles = vec![];
for i in 0..5 {
let shared_map = Arc::clone(&shared_map);
let handle = thread::spawn(move || {
// 获取锁
let mut map = shared_map.lock().unwrap();
// 修改HashMap
map.insert(i, i * i);
});
handles.push(handle);
}
// 等待所有线程结束
for handle in handles {
handle.join().unwrap();
}
// 获取HashMap,并输出结果
let final_map = shared_map.lock().unwrap();
println!("Final HashMap: {:?}", *final_map);
}
你发觉处理的套路都是一样的,就是使用 Arc<Mutex<T>>
实现。
使用 Arc<Mutex<T>>
组合是一种常见的方式来实现线程安全的集合类型,但不是唯一的选择。
这种组合的基本思想是使用 Arc(原子引用计数)来实现多线程间的所有权共享,而 Mutex 则提供了互斥锁,确保在任何时刻只有一个线程能够修改数据。
后面的几种集合类型都可以这么去实现。
有些场景下你可能使用 Arc<RwLock<T>>
更合适,允许多个线程同时读取数据,但只有一个线程能够写入数据。
适用于读操作频繁、写操作较少的场景。
3、dashmap
3.1 dashmap介绍
dashmap 是极快的 Rust 并发 map 实现。
DashMap 是一个 Rust 中并发关联 array/hashmap 的实现。
DashMap 试图实现一个类似于 std::collections::HashMap 的简单易用的 API, 并做了一些细微的改变来处理并发。
DashMap 的目标是变得非常简单易用, 并可直接替代 RwLock<HashMap<K, V>>。
为实现这些目标, 所有方法采用 &self 而不是修改方法采用 &mut self。
这允许您将一个DashMap 放入一个 Arc 中, 并在线程之间共享它, 同时仍然能够修改它。
DashMap 非常注重性能, 并旨在尽可能快。
dashmap 是 Rust 中一个高性能并发哈希表,类似于 std::collections::HashMap,但支持多线程并发读写,内部采用了分段锁(sharded lock)机制以提高并发性能。
特点:
多线程安全
自动分片锁(sharding),每个 shard 是一个 RwLock<HashMap<…>>
支持常见的 HashMap 操作:插入、查找、删除、遍历等
3.2 基本用法
- 创建 DashMap
use dashmap::DashMap;
fn main() {
let map: DashMap<String, i32> = DashMap::new();
}
- 插入元素
map.insert("apple".to_string(), 3);
map.insert("banana".into(), 5);
- 获取元素(只读)
if let Some(value) = map.get("apple") {
println!("apple = {}", *value);
}
返回的是 dashmap::mapref:1️⃣:Ref<K, V>,实现了 Deref。
- 修改元素(写)
if let Some(mut v) = map.get_mut("banana") {
*v += 1;
}
- 删除元素
map.remove("apple"); // 返回 Option<(K, V)>
3.3 并发使用
线程安全示例:
use dashmap::DashMap;
use std::sync::Arc;
use std::thread;
fn main() {
// 创建一个 DashMap
let map = Arc::new(DashMap::new());
// 创建多个线程,每个线程向 DashMap 插入一个元素
let mut handles = vec![];
for i in 0..10 {
// 克隆 DashMap
let map = Arc::clone(&map);
let handle = thread::spawn(move || {
map.insert(i, i * 10);
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
// 遍历所有元素
for entry in map.iter() {
println!("{} -> {}", entry.key(), entry.value());
}
}
3.3 dashset
DashSet 是 DashMap 提供的一个并发集合实现(类似于 HashSet),其内部本质是 DashMap<T, ()>,即值部分固定为 unit type (),只关心元素是否存在。
1)添加依赖
[dependencies]
dashmap = “6.1.0”
2)快速上手
use dashmap::DashSet;
fn main() {
let set = DashSet::new();
set.insert("apple");
set.insert("banana");
println!("contains apple? {}", set.contains("apple"));
println!("len: {}", set.len());
set.remove("banana");
for item in set.iter() {
//item是RefMulti<'_, T> 是 DashMap 为了分片并发而引入的引用包装类型,用于在迭代过程中保持读锁。
//它没有实现 Display 是为了避免误用或误打印其内部状态。
//应该通过 .key() 或解引用来访问其真实值。
println!("item: {}", *item);
println!("item: {}", item.key());
}
}
3)常用 API 列表
4)线程安全并发使用示例
use dashmap::DashSet;
use std::sync::Arc;
use std::thread;
fn main() {
let set = Arc::new(DashSet::new());
let mut handles = vec![];
for i in 0..10 {
let set = Arc::clone(&set);
let handle = thread::spawn(move || {
set.insert(i);
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
//查询set中的元素
for item in set.iter() {
println!("Set item: {}", item.key());
}
}
每个线程都可以安全插入,DashSet 内部使用分片加锁。
5)使用 iter() 注意事项
DashSet 的迭代器项是 RefMulti<T>
,可以 .deref() 成 &T:
for item in set.iter() {
println!("Item: {}", *item);
}
也可以使用 .key():
for item in set.iter() {
println!("Item: {}", item.key());
}
6)使用 retain() 进行条件保留
let set = DashSet::from_iter(0..10);
set.retain(|v| *v % 2 == 0); // 保留偶数
assert_eq!(set.len(), 5);
7)常见问题
insert() 返回的是 bool
true 表示新增,false 表示重复。DashSet<T>
必须要求 T: Eq + Hash + Send + Sync
可用于多线程,元素类型需要实现相应 trait。为什么不用
Mutex<HashSet>
?
性能问题。DashSet 使用多分片锁,允许高并发访问;Mutex<HashSet>
只允许一次一个线程访问。
4、arc-swap
arc-swap 是一个 Rust 库,提供了基于 Arc 和 Atomic 的数据结构,用于在多线程之间原子地交换数据。
它的设计目的是提供一种高效的方式来实现线程间的共享数据更新,避免锁的开销。
你可以把它看成 Atomic<Arc<T>>
或者 RwLock<Arc<T>>
。在许多情况下, 可能需要一些数据结构, 这些数据结构经常被读取而很少更新。
一些例子可能是服务的配置, 路由表, 每几分钟更新一次的某些数据的快照等。
在所有这些情况下, 需要: - 快速、频繁并从多个线程并发读取数据结构的当前值。
- 在更长的时间内使用数据结构的相同版本 —— 查询应该由数据的一致版本回答, 数据包应该由旧版本或新版本的路由表路由, 而不是由组合路由。
- 在不中断处理的情况下执行更新。
第一个想法是使用 RwLock<T>
并在整个处理时间内保持读锁。但是更新会暂停所有处理直到完成。
更好的选择是使用 RwLock<Arc<T>>
。然后可以获取锁, 克隆 Arc 并解锁。
这会受到 CPU 级别的争用 (锁和 Arc 的引用计数) 的影响, 从而相对较慢。根据实现的不同, 稳定的 reader 流入可能会阻塞更新任意长的时间。
可以使用 ArcSwap 替代, 它解决了上述问题, 在竞争和非竞争场景下, 性能特征都优于RwLock。
arc-swap 是 Rust 社区中的一个高性能、线程安全的共享数据更新库,专为多线程场景中读多写少的场合设计。
它解决了在多线程中频繁读取共享数据时的性能瓶颈,使用 Arc 实现数据共享,用 atomic 实现高效的无锁读取,并提供了安全的“写时复制(copy-on-write)”更新机制。
4.1 主要特性
4.2 适用场景
适用于:
多线程读多写少的场景
全局配置共享
高性能读取需求
原子更新逻辑,如:AB测试配置切换、策略下发、只读缓存等
4.3 基本用法示例
4.3.1 添加依赖
# Cargo.toml
[dependencies]
arc-swap = "1.7.1"
4.3.2 基础示例
use arc_swap::ArcSwap;
use std::sync::Arc;
use std::thread;
fn main() {
// 初始化配置
let config = Arc::new("v1".to_string());
// 创建 ArcSwap
let shared = Arc::new(ArcSwap::from(config));
// 多线程读取
let mut handles = vec![];
for _ in 0..5 {
let shared = Arc::clone(&shared);
let handle = thread::spawn(move || {
let val = shared.load();
println!("线程读到配置: {}", val);
});
handles.push(handle);
}
// 写线程:更新配置
{
let new_config = Arc::new("v2".to_string());
shared.store(new_config);
}
for handle in handles {
handle.join().unwrap();
}
println!("主线程最终配置: {}", shared.load());
}
4.4 重要类型与方法详解
4.4.1 ArcSwap<T>
主类型,封装一个 Arc<T>
,提供原子性读写。
let arc_swap = ArcSwap::from(Arc::new(“data”.to_string()));
4.4.2 load() -> Arc<T>
获取当前 Arc<T>
的 克隆副本(高效原子加载)。
let arc = arc_swap.load(); // 返回 Arc<T>
4.4.3 store(new: Arc<T>
)
原子性替换内部的 Arc<T>
,旧值自动释放(可能延迟)。
arc_swap.store(Arc::new(“new_data”.to_string()));
4.4.4 swap(new: Arc<T>
) -> Arc<T>
原子替换并返回旧值。
let old = arc_swap.swap(Arc::new(“newer_data”.to_string()));
4.4.5 compare_and_swap
提供乐观锁式的 compare-and-swap:
let old = arc_swap.load();
let new = Arc::new(“maybe update”.to_string());
let result = arc_swap.compare_and_swap(&old, new);
如果旧值匹配就更新,返回旧值。
4.5 进阶用法
4.5.1 使用 lease 避免重复克隆
let lease = arc_swap.load(); // 获取 Arc<T>
let val: &str = &lease; // 可以解引用使用
或者使用 lease() 直接获得引用(不产生 Arc 拷贝):
let lease = arc_swap.lease();
println!("配置值: {}", **lease);
4.5.2 使用 rc::Weak 做缓存控制
可以搭配 Arc::downgrade 实现缓存的生命周期控制,减少不必要内存持有。
4.6 和标准库对比
4.7 实战案例
4.7.1 热更新配置中心
use arc_swap::ArcSwap;
use std::sync::Arc;
#[allow(dead_code)]
#[derive(Debug)]
struct Config {
log_level: String,
cache_size: usize,
}
fn main() {
let config = Arc::new(Config {
log_level: "INFO".to_string(),
cache_size: 1024,
});
let global_config = Arc::new(ArcSwap::from(config));
// 更新配置
{
let new_config = Arc::new(Config {
log_level: "DEBUG".to_string(),
cache_size: 2048,
});
global_config.store(new_config);
}
// 在其他线程读取配置
let config_read = global_config.load();
println!("当前配置: {:?}", config_read);
}