目录
1.2. 不同语言中的 Copy-on-Write 实现示例
1.3. Copy-on-Write 在 Java 中的典型应用
2.4. Python中threading.local()示例
1. Copy-on-Write(CoW)模式
项目 |
描述 |
本质 |
写时才复制(延迟+共享) |
优点 |
读性能高、线程安全 |
缺点 |
占内存、写性能低 |
应用领域 |
Java并发容器、操作系统fork、函数式编程、Docker、Git 等 |
1.1. Copy-on-Write的概念
Copy-On-Write(COW)的核心理念是牺牲一致性来换取并发性能。这意味着:
COW 的策略 |
带来的效果 |
写时复制 |
不阻塞读线程 |
读到旧数据 |
但读线程看到的是一致、稳定的历史快照 |
Copy-on-Write的定义:
- Copy-on-Write(COW,写时复制)是一种 延迟复制(Lazy Copy)策略。
- 核心思想:多个调用者共享同一份数据对象,直到其中一个调用者尝试修改数据时,才真正复制一份副本,修改操作只对副本进行,原始对象保持不变。
Copy-on-Write 的优势:
- 读操作无锁,效率高,非常适合 读多写少 的并发场景。
- 节省资源:只有在必要时才复制数据,避免无效复制。
实现机制:
- 共享不可变对象。
- 写操作触发复制。
- 更新后的数据指向新对象,旧对象仍可被其他线程使用。
1.2. 不同语言中的 Copy-on-Write 实现示例
1. Python 示例
class CowList:
def __init__(self, data):
self._data = data
self._copied = False
def write(self, index, value):
if not self._copied:
self._data = self._data[:]
self._copied = True
self._data[index] = value
def read(self, index):
return self._data[index]
2. Java 示例
List<String> list = new CopyOnWriteArrayList<>();
list.add("a");
list.add("b");
for (String s : list) {
System.out.println(s);
}
3. Go 示例(伪实现)
type CowSlice struct {
data []int
copied bool
}
func (c *CowSlice) Write(index int, val int) {
if !c.copied {
newData := make([]int, len(c.data))
copy(newData, c.data)
c.data = newData
c.copied = true
}
c.data[index] = val
}
1.3. Copy-on-Write 在 Java 中的典型应用
1. Java 中的 CopyOnWrite 容器
CopyOnWriteArrayList
CopyOnWriteArraySet
特点:
- 读操作无锁
- 写操作复制整个数组并替换旧数组
- 适用于 读多写少、弱一致性要求 的并发场景。
场景举例:
- 路由表缓存(读多写少):
ConcurrentHashMap<String, CopyOnWriteArraySet<Router>> rt = new ConcurrentHashMap<>();
2. Java 中字符串的 replace 方法
String.replace()
实际是通过创建新字符串(复制)来实现修改的,原字符串不可变。
3. Java 中的包装类(如 Integer、Long)
Long.valueOf(long)
使用了享元 + 不变性策略(缓存 -128 到 127 的对象)。- 实质上也是一种 Copy-on-Write 策略的优化体现,避免重复创建。
Copy-on-Write 的使用建议
✅ 适用场景
- 读多写少
- 对一致性要求不高
- 要求高读性能
❌ 不适用场景
- 写操作频繁
- 数据结构本身较大
2. 线程本地存储模式(ThreadLocal)
- ThreadLocal 不是“线程安全工具”,而是“避免共享”的工具。
- 通过避免共享,天然就消除了竞争与并发问题,是并发编程中非常有用的一种模式。但使用时必须谨慎管理生命周期,尤其在线程池中,务必记得清理!
2.1. ThreadLocal的概念
ThreadLocal的定义:
ThreadLocal
是 Java 提供的一种 线程本地存储机制,用于让每个线程拥有一份自己的变量副本。- 它的核心思想是:避免多线程共享同一资源,从而避免并发冲突。
- 类似“每人发一个球”,避免争抢——这就是“没有共享,就没有伤害”。
ThreadLocal关键词记忆:
- 线程隔离:每个线程独立拥有自己的变量副本。
- 数据独享:避免竞争,提高线程安全性。
- 用于存储线程私有数据,如:用户登录信息、数据库连接、格式化器等。
ThreadLocal 的使用方法
static class ThreadId {
static final AtomicLong nextId = new AtomicLong(0);
static final ThreadLocal<Long> tl = ThreadLocal.withInitial(
() -> nextId.getAndIncrement()
);
static long get() {
return tl.get(); // 返回当前线程自己的 Id
}
}
- 每个线程调用
get()
获取的值是自己的唯一 ID。
2.2. ThreadLocal 的工作原理(结构与设计)
设计核心:
- 每个线程
Thread
对象中维护了一个ThreadLocalMap
(即线程本地变量存储容器)。 ThreadLocalMap
的 Key 是 ThreadLocal 实例本身,Value 是该线程保存的变量值。
为什么不让 ThreadLocal
持有 Map?
- 若
ThreadLocal
持有 Map,会强引用线程对象,线程无法被 GC,容易 内存泄漏。 - Java 实现中,
Thread
持有 Map,Map 中对ThreadLocal
是弱引用,更合理、安全。
2.3. ThreadLocal 与内存泄漏
内存泄漏的风险来源:
ThreadLocalMap
中的 Key 是ThreadLocal
的弱引用,但 Value 是强引用。- 若
ThreadLocal
被回收,而你没有手动调用remove()
,那Value
将无法被释放 ➜ 内存泄漏!
尤其在线程池中风险更高:
- 线程池中的线程不会被销毁,因此
ThreadLocalMap
会长期存在,积累大量无法释放的值。
正确做法(记住这个模板):
ThreadLocal<MyObject> tl = new ThreadLocal<>();
executor.execute(() -> {
tl.set(new MyObject());
try {
// 使用 tl.get() 进行业务处理
} finally {
tl.remove(); // 防止内存泄漏
}
});
- 总结一句话:记得 remove(),ThreadLocal 才安全!
2.4. Python中threading.local()示例
import threading
import time
# 创建一个 ThreadLocal 对象
thread_local_data = threading.local()
def worker(name):
# 为当前线程设置 thread_local 的属性
thread_local_data.value = f"我是线程{name}的数据"
time.sleep(0.1)
print(f"线程 {name} 读取 thread_local_data.value = {thread_local_data.value}")
# 创建两个线程
t1 = threading.Thread(target=worker, args=("T1",))
t2 = threading.Thread(target=worker, args=("T2",))
# 启动线程
t1.start()
t2.start()
# 等待两个线程执行完毕
t1.join()
t2.join()
输出示例(可能每次顺序不同):
线程 T1 读取 thread_local_data.value = 我是线程T1的数据
线程 T2 读取 thread_local_data.value = 我是线程T2的数据
说明:
threading.local()
创建了一个 线程局部变量容器。- 每个线程对
thread_local_data.value
的设置和读取 互不干扰。 - 本质上,Python 为每个线程创建了
thread_local_data
的独立副本空间。
3. 生成者-消费者模式
3.1. 核心思想和模型优点
核心思想
生产者-消费者模式类比于工厂流水线:
- 生产者线程负责生产任务(数据、日志、请求等),放入任务队列;
- 消费者线程从任务队列中取任务并处理。
这种设计主要用于:
- 异步处理
- 解耦模块间关系
- 平衡不同速率的处理单元
模式优点
1. 解耦生产者和消费者
- 两者之间没有直接依赖,仅通过队列通信
- 模块职责清晰,方便扩展、测试、维护
2. 支持异步
- 生产者不阻塞地将任务放入队列
- 消费者异步处理,可并行提高吞吐量
3. 平衡速度差异
- 队列作为“缓冲区”,避免快慢不一致导致资源浪费或阻塞
- 可根据实际需求调整线程数量
3.2. 应用实例
批量执行任务(典型场景:批量写数据库)
- 目的:将多个小任务合并成一次批量执行,提升效率
- 应用示例:一次性批量
INSERT
1000 条数据,比逐条插入快得多
1. Java实现
// 任务队列
BlockingQueue<Task> bq = new LinkedBlockingQueue<>(2000);
// 启动5个消费者线程
void start() {
ExecutorService es = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
es.execute(() -> {
while (true) {
List<Task> ts = pollTasks(); // 批量获取任务
execTasks(ts); // 执行任务
}
});
}
}
// 获取任务(至少一个阻塞,后续非阻塞)
List<Task> pollTasks() throws InterruptedException {
List<Task> ts = new LinkedList<>();
Task t = bq.take(); // 阻塞等待一个任务
while (t != null) {
ts.add(t);
t = bq.poll(); // 非阻塞获取后续任务
}
return ts;
}
2. Python 实现(用 queue.Queue
+ threading
)
import threading
import queue
import time
import random
# 定义一个模拟任务类
class Task:
def __init__(self, name):
self.name = name
def run(self):
print(f"[{threading.current_thread().name}] 正在处理任务: {self.name}")
time.sleep(random.uniform(0.1, 0.5)) # 模拟任务耗时
# 创建阻塞队列(最多2000个任务)
task_queue = queue.Queue(maxsize=2000)
# 批量拉取任务:阻塞获取1个,再非阻塞拉多个
def poll_tasks():
tasks = []
try:
# 第一个任务是阻塞式获取
t = task_queue.get(block=True)
tasks.append(t)
# 后续非阻塞获取
while True:
t = task_queue.get(block=False)
tasks.append(t)
except queue.Empty:
pass # 没有更多任务就停止拉取
return tasks
# 消费者线程逻辑
def worker():
while True:
tasks = poll_tasks()
for task in tasks:
task.run()
# 启动 5 个消费者线程
def start_consumers():
for i in range(5):
t = threading.Thread(target=worker, name=f"Worker-{i+1}", daemon=True)
t.start()
# 模拟生产者:不断往队列添加任务
def start_producer():
def produce():
task_id = 1
while True:
task = Task(f"任务-{task_id}")
task_queue.put(task)
print(f"[生产者] 添加了 {task.name}")
task_id += 1
time.sleep(random.uniform(0.1, 0.3)) # 控制生产节奏
threading.Thread(target=produce, name="Producer", daemon=True).start()
# 启动程序
if __name__ == "__main__":
start_consumers()
start_producer()
while True:
time.sleep(1) # 主线程保持运行
当你运行这段代码,会看到类似如下输出:
[生产者] 添加了 任务-1
[生产者] 添加了 任务-2
[Worker-1] 正在处理任务: 任务-1
[Worker-1] 正在处理任务: 任务-2
[生产者] 添加了 任务-3
[Worker-3] 正在处理任务: 任务-3
...