multiprocessing
是 Python 标准库中的一个模块,用于实现多进程并行计算,可以在多核 CPU 上显著提升程序性能,尤其适用于 CPU 密集型任务。Python 的多线程由于 GIL(全局解释器锁)限制,在进行 CPU 密集型任务时无法真正实现并行。而 multiprocessing
模块通过创建多个子进程,每个子进程拥有独立的 Python 解释器,因此可以实现真正的并行运行。
常用组件一览
组件 | 用途 |
---|---|
Process |
创建单个进程 |
Pool |
创建进程池,适合大量任务并行处理 |
Queue |
进程间通信(FIFO) |
Pipe |
进程间双向通信 |
Manager |
管理共享数据结构 |
Lock / RLock |
进程同步,避免资源竞争 |
常见用法
1. 使用 Process
创建子进程
from multiprocessing import Process
def worker(name):
print(f"Hello from {name}")
if __name__ == "__main__":
p = Process(target=worker, args=("Process-1",))
p.start()
p.join()
调用 start() 方法启动子进程,调用 join() 方法等待子进程结束。
2. 使用 Pool
创建进程池(适合大规模任务)
from multiprocessing import Pool
def square(x):
return x * x
if __name__ == "__main__":
with Pool(processes=4) as pool:
results = pool.map(square, [1, 2, 3, 4, 5])
print(results) # [1, 4, 9, 16, 25]
Pool.map()
类似于内置的 map()
,但是会并行运行。
3. imap
/ imap_unordered
(流式并行)
for result in pool.imap_unordered(square, range(10)):
print(result)
imap()
保持顺序imap_unordered()
不保持顺序(更快)
4. 使用 Queue
进行进程通信
from multiprocessing import Process, Queue
def producer(q):
q.put("Data from producer")
def consumer(q):
print(q.get())
if __name__ == "__main__":
q = Queue()
p1 = Process(target=producer, args=(q,))
p2 = Process(target=consumer, args=(q,))
p1.start()
p2.start()
p1.join()
p2.join()
5. 使用 Manager
实现共享变量
from multiprocessing import Manager, Process
def worker(shared_list):
shared_list.append("hello")
if __name__ == "__main__":
with Manager() as manager:
lst = manager.list()
p = Process(target=worker, args=(lst,))
p.start()
p.join()
print(lst) # ['hello']
注意事项
多进程必须加
if __name__ == "__main__":
,防止无限递归。进程间不共享内存空间,需要通过
Queue
、Pipe
或Manager
传递数据。不适合 I/O 密集型任务,I/O 密集可考虑
asyncio
或threading
。
适用场景
大规模数据处理:如批量解析结构文件、图像处理、日志分析。
训练模型并行跑实验:多组参数同时启动训练进程。
Web 抓取中的页面解析等 CPU 操作。