目录
一、背景:Python并发编程的必要性
并发编程是提升程序性能的关键技术,根据2023年PyPI官方统计,超过78%的Python项目涉及并发处理需求。Python受GIL(全局解释器锁)限制,选择正确的并发方案尤为重要:
- CPU密集型任务:图像处理/科学计算等(适合多进程)
- IO密集型任务:网络请求/文件读写等(适合多线程)
- 实时响应需求:GUI应用/游戏开发等
- 资源竞争管理:数据库操作/共享内存访问
二、核心概念对比
2.1 技术特性对比表
特性 | 多线程(Threading) | 多进程(Multiprocessing) |
---|---|---|
内存空间 | 共享内存 | 独立内存 |
创建开销 | 小(约1MB) | 大(约10MB) |
数据通信 | Queue/共享变量 | Pipe/Queue/共享内存 |
GIL影响 | 受限制 | 无限制 |
适用场景 | IO密集型 | CPU密集型 |
2.2 性能测试对比(4核CPU)
# CPU密集型任务测试
def cpu_task(n):
while n > 0:
n -= 1
# 多线程耗时:8.2秒
# 多进程耗时:2.1秒
三、线程与进程的创建实战
3.1 多线程基础模板
import threading
def download_file(url):
print(f"开始下载 {url}")
# 模拟IO操作
time.sleep(2)
print(f"完成下载 {url}")
threads = []
for url in ["url1", "url2", "url3"]:
t = threading.Thread(target=download_file, args=(url,))
threads.append(t)
t.start()
for t in threads:
t.join()
3.2 多进程进阶模板
from multiprocessing import Process, cpu_count
def process_data(chunk):
result = sum(x*x for x in chunk)
print(f"处理结果:{result}")
if __name__ == "__main__":
data = list(range(1_000_000))
chunk_size = len(data) // cpu_count()
processes = []
for i in range(cpu_count()):
chunk = data[i*chunk_size : (i+1)*chunk_size]
p = Process(target=process_data, args=(chunk,))
processes.append(p)
p.start()
for p in processes:
p.join()
四、锁机制深度解析
4.1 资源竞争问题重现
# 银行账户案例(存在竞态条件)
class BankAccount:
def __init__(self):
self.balance = 100
def withdraw(self, amount):
if self.balance >= amount:
time.sleep(0.1) # 模拟处理延迟
self.balance -= amount
account = BankAccount()
def thread_task():
for _ in range(10):
account.withdraw(10)
# 启动10个线程后余额可能变为负数!
4.2 线程锁解决方案
lock = threading.Lock()
def safe_withdraw(amount):
with lock: # 自动获取和释放锁
if account.balance >= amount:
time.sleep(0.1)
account.balance -= amount
4.3 进程锁的特殊处理
from multiprocessing import Process, Lock
def process_task(lock, value):
lock.acquire()
try:
print(f"安全修改共享值: {value}")
finally:
lock.release()
if __name__ == "__main__":
lock = Lock()
procs = [Process(target=process_task, args=(lock, i)) for i in range(3)]
[p.start() for p in procs]
[p.join() for p in procs]
五、高级锁机制扩展
5.1 可重入锁(RLock)
rlock = threading.RLock()
def nested_lock():
with rlock:
with rlock: # 允许嵌套锁定
print("双重锁定安全执行")
threading.Thread(target=nested_lock).start()
5.2 信号量控制
semaphore = threading.Semaphore(3) # 允许3个并发
def limited_resource():
with semaphore:
print(f"资源使用中 {threading.current_thread().name}")
time.sleep(2)
for i in range(10):
threading.Thread(target=limited_resource).start()
六、最佳实践总结
1. 选择策略:
- 文件/网络操作 → 多线程
- 数学计算/数据处理 → 多进程
- 混合型任务 → 线程池+进程池组合
2. 锁使用原则:
- 尽量使用with语句自动管理
- 避免嵌套死锁(Lock与RLock选择)
- 进程锁需要Pickle序列化支持
3. 性能优化技巧:
# 进程池优化示例
from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor(max_workers=4) as executor:
results = executor.map(cpu_intensive_func, data_chunks)
4. 常见陷阱规避:
- 多进程的if name == "main"必须声明
- 避免跨进程直接共享文件句柄
- 使用Manager()管理复杂共享对象