【Rust并发集合】如何在多线程中并发安全地使用集合

发布于:2025-07-30 ⋅ 阅读:(27) ⋅ 点赞:(0)

在这里插入图片描述

✨✨ 欢迎大家来到景天科技苑✨✨

🎈🎈 养成好习惯,先赞后看哦~🎈🎈

🏆 作者简介:景天科技苑
🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。
🏆《博客》:Rust开发,Python全栈,Golang开发,云原生开发,PyQt5和Tkinter桌面开发,小程序开发,人工智能,js逆向,App逆向,网络系统安全,数据分析,Django,fastapi,flask等框架,云原生K8S,linux,shell脚本等实操经验,网站搭建,数据库等分享。

所属的专栏:Rust高性能并发编程
景天的主页:景天科技苑

在这里插入图片描述

Rust并发集合

集合类型是我们编程中常用的数据类型,Rust 中提供了一些集合类型,比如Vec<T>HashMap<K, V>HashSet<T>VecDeque<T>LinkedList<T>BTreeMap<K,V>BTreeSet<T> 等,
它们的特点如下:
• Vec - 这是一种可变大小的数组, 允许在头部或尾部高效地添加和删除元素。它类似于 C++ 的 vector 或 Java 的 ArrayList。
HashMap<K,V> - 这是一个哈希映射, 允许通过键快速查找值。它类似于 C++ 的unordered_map 或 Java 的 HashMap。
• HashSet - 这是一个基于哈希的集, 可以快速判断一个值是否在集合中。它类似于C++ 的 unordered_set 或 Java 的 HashSet。
• VecDeque - 这是一个双端队列, 允许在头部或尾部高效地添加和删除元素。它类似于 C++ 的 deque 或 Java 的 ArrayDeque。
• LinkedList - 这是一个链表数据结构, 允许在头部或尾部快速添加和删除元素。
BTreeMap<K,V> - 这是一个有序的映射, 可以通过键快速查找, 同时保持元素的排序。它使用 B 树作为底层数据结构。
• BTreeSet - 这是一个有序的集合, 元素会自动排序。它使用 B 树作为底层数据结构。

不幸的是这些类型都不是线程安全的,没有办法在线程中共享使用,
有幸的是,我们可以使用前面介绍的并发原语,对这些类型进行包装,使之成为线程安全的。

1、线程安全的 Vec

要实现线程安全的 Vec,可以使用 Arc(原子引用计数)和 Mutex(互斥锁)的组合。
Arc 允许多个线程共享拥有相同数据的所有权,而 Mutex 用于在访问数据时进行同步,确保只有一个线程能够修改数据。
以下是一个简单的例子,演示如何创建线程安全的 Vec:

use std::sync::{ Arc, Mutex };
use std::thread;
fn main() {
    // 使用Arc和Mutex包装Vec
    let shared_vec = Arc::new(Mutex::new(Vec::new()));
    // 创建多个线程,每个线程向Vec中添加一个元素
    let mut handles = vec![];
    for i in 0..5 {
        // 克隆shared_vec,以便在线程中使用
        let shared_vec = Arc::clone(&shared_vec);
        let handle = thread::spawn(move || {
            // 获取锁
            let mut vec = shared_vec.lock().unwrap();
            // 修改Vec
            vec.push(i);
        });
        handles.push(handle);
    }
    // 等待所有线程结束
    for handle in handles {
        handle.join().unwrap();
    }
    // 打印最终的Vec
    let final_vec = shared_vec.lock().unwrap();
    println!("Final Vec: {:?}", *final_vec);
}

在这个例子中,shared_vec 是一个 Mutex 包装的 Arc,使得多个线程能够共享对 Vec的所有权。
每个线程在修改 Vec 之前需要先获取锁,确保同一时刻只有一个线程能够修改数据。
在这里插入图片描述

2、线程安全的 HashMap

要实现线程安全的 HashMap,可以使用 Arc(原子引用计数)和 Mutex(互斥锁)的组合,或者使用 RwLock(读写锁)来提供更细粒度的并发控制。
以下是使用 Arc 和Mutex 的简单示例:

use std::collections::HashMap;
use std::sync::{ Arc, Mutex };
use std::thread;
fn main() {
    // 使用Arc和Mutex包装HashMap
    let shared_map = Arc::new(Mutex::new(HashMap::new()));
    // 创建多个线程,每个线程向HashMap中添加键值对
    let mut handles = vec![];
    for i in 0..5 {
        let shared_map = Arc::clone(&shared_map);
        let handle = thread::spawn(move || {
            // 获取锁
            let mut map = shared_map.lock().unwrap();
            // 修改HashMap
            map.insert(i, i * i);
        });
        handles.push(handle);
    }
    // 等待所有线程结束
    for handle in handles {
        handle.join().unwrap();
    }
    // 获取HashMap,并输出结果
    let final_map = shared_map.lock().unwrap();
    println!("Final HashMap: {:?}", *final_map);
}

在这里插入图片描述

你发觉处理的套路都是一样的,就是使用 Arc<Mutex<T>> 实现。
使用 Arc<Mutex<T>>组合是一种常见的方式来实现线程安全的集合类型,但不是唯一的选择。
这种组合的基本思想是使用 Arc(原子引用计数)来实现多线程间的所有权共享,而 Mutex 则提供了互斥锁,确保在任何时刻只有一个线程能够修改数据。
后面的几种集合类型都可以这么去实现。
有些场景下你可能使用 Arc<RwLock<T>> 更合适,允许多个线程同时读取数据,但只有一个线程能够写入数据。
适用于读操作频繁、写操作较少的场景。

3、dashmap

3.1 dashmap介绍

dashmap 是极快的 Rust 并发 map 实现。
DashMap 是一个 Rust 中并发关联 array/hashmap 的实现。
DashMap 试图实现一个类似于 std::collections::HashMap 的简单易用的 API, 并做了一些细微的改变来处理并发。
DashMap 的目标是变得非常简单易用, 并可直接替代 RwLock<HashMap<K, V>>。
为实现这些目标, 所有方法采用 &self 而不是修改方法采用 &mut self。
这允许您将一个DashMap 放入一个 Arc 中, 并在线程之间共享它, 同时仍然能够修改它。
DashMap 非常注重性能, 并旨在尽可能快。

dashmap 是 Rust 中一个高性能并发哈希表,类似于 std::collections::HashMap,但支持多线程并发读写,内部采用了分段锁(sharded lock)机制以提高并发性能。

特点:
多线程安全
自动分片锁(sharding),每个 shard 是一个 RwLock<HashMap<…>>
支持常见的 HashMap 操作:插入、查找、删除、遍历等

3.2 基本用法

  1. 创建 DashMap
use dashmap::DashMap;

fn main() {
    let map: DashMap<String, i32> = DashMap::new();
}
  1. 插入元素
map.insert("apple".to_string(), 3);
map.insert("banana".into(), 5);
  1. 获取元素(只读)
if let Some(value) = map.get("apple") {
    println!("apple = {}", *value);
}

返回的是 dashmap::mapref:1️⃣:Ref<K, V>,实现了 Deref。

  1. 修改元素(写)
if let Some(mut v) = map.get_mut("banana") {
    *v += 1;
}
  1. 删除元素
map.remove("apple"); // 返回 Option<(K, V)>

3.3 并发使用

线程安全示例:

use dashmap::DashMap;
use std::sync::Arc;
use std::thread;

fn main() {
    // 创建一个 DashMap
    let map = Arc::new(DashMap::new());

    // 创建多个线程,每个线程向 DashMap 插入一个元素
    let mut handles = vec![];

    for i in 0..10 {
        // 克隆 DashMap
        let map = Arc::clone(&map);
        let handle = thread::spawn(move || {
            map.insert(i, i * 10);
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    // 遍历所有元素
    for entry in map.iter() {
        println!("{} -> {}", entry.key(), entry.value());
    }
}

在这里插入图片描述

3.3 dashset

DashSet 是 DashMap 提供的一个并发集合实现(类似于 HashSet),其内部本质是 DashMap<T, ()>,即值部分固定为 unit type (),只关心元素是否存在。
1)添加依赖
[dependencies]
dashmap = “6.1.0”

2)快速上手

use dashmap::DashSet;

fn main() {
    let set = DashSet::new();

    set.insert("apple");
    set.insert("banana");

    println!("contains apple? {}", set.contains("apple"));
    println!("len: {}", set.len());

    set.remove("banana");

    for item in set.iter() {
        //item是RefMulti<'_, T> 是 DashMap 为了分片并发而引入的引用包装类型,用于在迭代过程中保持读锁。
        //它没有实现 Display 是为了避免误用或误打印其内部状态。
        //应该通过 .key() 或解引用来访问其真实值。
        println!("item: {}", *item);
        println!("item: {}", item.key());
    }
}

在这里插入图片描述

3)常用 API 列表
在这里插入图片描述

4)线程安全并发使用示例

use dashmap::DashSet;
use std::sync::Arc;
use std::thread;

fn main() {
    let set = Arc::new(DashSet::new());

    let mut handles = vec![];

    for i in 0..10 {
        let set = Arc::clone(&set);
        let handle = thread::spawn(move || {
            set.insert(i);
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    //查询set中的元素
    for item in set.iter() {
        println!("Set item: {}", item.key());
    }
}

每个线程都可以安全插入,DashSet 内部使用分片加锁。
在这里插入图片描述

5)使用 iter() 注意事项
DashSet 的迭代器项是 RefMulti<T>,可以 .deref() 成 &T:

for item in set.iter() {
    println!("Item: {}", *item);
}

也可以使用 .key():

for item in set.iter() {
    println!("Item: {}", item.key());
}

6)使用 retain() 进行条件保留

let set = DashSet::from_iter(0..10);

set.retain(|v| *v % 2 == 0); // 保留偶数

assert_eq!(set.len(), 5);

7)常见问题

  1. insert() 返回的是 bool
    true 表示新增,false 表示重复。

  2. DashSet<T> 必须要求 T: Eq + Hash + Send + Sync
    可用于多线程,元素类型需要实现相应 trait。

  3. 为什么不用 Mutex<HashSet>
    性能问题。DashSet 使用多分片锁,允许高并发访问;Mutex<HashSet> 只允许一次一个线程访问。

4、arc-swap

arc-swap 是一个 Rust 库,提供了基于 Arc 和 Atomic 的数据结构,用于在多线程之间原子地交换数据。
它的设计目的是提供一种高效的方式来实现线程间的共享数据更新,避免锁的开销。
你可以把它看成 Atomic<Arc<T>> 或者 RwLock<Arc<T>>。在许多情况下, 可能需要一些数据结构, 这些数据结构经常被读取而很少更新。
一些例子可能是服务的配置, 路由表, 每几分钟更新一次的某些数据的快照等。

在所有这些情况下, 需要: - 快速、频繁并从多个线程并发读取数据结构的当前值。

  • 在更长的时间内使用数据结构的相同版本 —— 查询应该由数据的一致版本回答, 数据包应该由旧版本或新版本的路由表路由, 而不是由组合路由。
  • 在不中断处理的情况下执行更新。

第一个想法是使用 RwLock<T> 并在整个处理时间内保持读锁。但是更新会暂停所有处理直到完成。
更好的选择是使用 RwLock<Arc<T>>。然后可以获取锁, 克隆 Arc 并解锁。
这会受到 CPU 级别的争用 (锁和 Arc 的引用计数) 的影响, 从而相对较慢。根据实现的不同, 稳定的 reader 流入可能会阻塞更新任意长的时间。
可以使用 ArcSwap 替代, 它解决了上述问题, 在竞争和非竞争场景下, 性能特征都优于RwLock。

arc-swap 是 Rust 社区中的一个高性能、线程安全的共享数据更新库,专为多线程场景中读多写少的场合设计。
它解决了在多线程中频繁读取共享数据时的性能瓶颈,使用 Arc 实现数据共享,用 atomic 实现高效的无锁读取,并提供了安全的“写时复制(copy-on-write)”更新机制。

4.1 主要特性

在这里插入图片描述

4.2 适用场景

适用于:
多线程读多写少的场景
全局配置共享
高性能读取需求
原子更新逻辑,如:AB测试配置切换、策略下发、只读缓存等

4.3 基本用法示例

4.3.1 添加依赖

# Cargo.toml
[dependencies]
arc-swap = "1.7.1"

4.3.2 基础示例

use arc_swap::ArcSwap;
use std::sync::Arc;
use std::thread;

fn main() {
    // 初始化配置
    let config = Arc::new("v1".to_string());
    // 创建 ArcSwap
    let shared = Arc::new(ArcSwap::from(config));

    // 多线程读取
    let mut handles = vec![];
    for _ in 0..5 {
        let shared = Arc::clone(&shared);
        let handle = thread::spawn(move || {
            let val = shared.load();
            println!("线程读到配置: {}", val);
        });
        handles.push(handle);
    }

    // 写线程:更新配置
    {
        let new_config = Arc::new("v2".to_string());
        shared.store(new_config);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("主线程最终配置: {}", shared.load());
}

在这里插入图片描述

4.4 重要类型与方法详解

4.4.1 ArcSwap<T>
主类型,封装一个 Arc<T>,提供原子性读写。
let arc_swap = ArcSwap::from(Arc::new(“data”.to_string()));

4.4.2 load() -> Arc<T>
获取当前 Arc<T> 的 克隆副本(高效原子加载)。
let arc = arc_swap.load(); // 返回 Arc<T>

4.4.3 store(new: Arc<T>)
原子性替换内部的 Arc<T>,旧值自动释放(可能延迟)。
arc_swap.store(Arc::new(“new_data”.to_string()));

4.4.4 swap(new: Arc<T>) -> Arc<T>
原子替换并返回旧值。
let old = arc_swap.swap(Arc::new(“newer_data”.to_string()));

4.4.5 compare_and_swap
提供乐观锁式的 compare-and-swap:
let old = arc_swap.load();
let new = Arc::new(“maybe update”.to_string());
let result = arc_swap.compare_and_swap(&old, new);
如果旧值匹配就更新,返回旧值。

4.5 进阶用法

4.5.1 使用 lease 避免重复克隆

let lease = arc_swap.load(); // 获取 Arc<T>
let val: &str = &lease; // 可以解引用使用

或者使用 lease() 直接获得引用(不产生 Arc 拷贝):

let lease = arc_swap.lease();
println!("配置值: {}", **lease);

4.5.2 使用 rc::Weak 做缓存控制
可以搭配 Arc::downgrade 实现缓存的生命周期控制,减少不必要内存持有。

4.6 和标准库对比

在这里插入图片描述

4.7 实战案例

4.7.1 热更新配置中心

use arc_swap::ArcSwap;
use std::sync::Arc;

#[allow(dead_code)]
#[derive(Debug)]
struct Config {
    log_level: String,
    cache_size: usize,
}

fn main() {
    let config = Arc::new(Config {
        log_level: "INFO".to_string(),
        cache_size: 1024,
    });
    let global_config = Arc::new(ArcSwap::from(config));

    // 更新配置
    {
        let new_config = Arc::new(Config {
            log_level: "DEBUG".to_string(),
            cache_size: 2048,
        });
        global_config.store(new_config);
    }

    // 在其他线程读取配置
    let config_read = global_config.load();
    println!("当前配置: {:?}", config_read);
}

在这里插入图片描述


网站公告

今日签到

点亮在社区的每一天
去签到