在进行大规模数据处理或需要并行化的任务时,Python 提供了非常强大的多进程支持。multiprocessing
模块可以帮助我们在多核 CPU 上并行处理任务,打破全局解释器锁(GIL)的限制,充分利用系统资源。
1. multiprocessing.Process
Process
类是 multiprocessing
模块的核心,它用于创建和管理独立的进程,每个进程在其自己的内存空间中运行,互不干扰。
from multiprocessing import Process
import time
def worker(name):
print(f'Worker {name} started')
time.sleep(2)
print(f'Worker {name} finished')
if __name__ == '__main__':
p = Process(target=worker, args=('A',)) # 创建进程
p.start() # 启动进程
p.join() # 等待进程结束
2. multiprocessing.Pool
Pool
类提供了一种便捷的方式来管理多个进程。它通过池化进程的方式,避免频繁创建和销毁进程的开销。Pool
适合并发执行多个任务,并且可以通过 map
或 apply_async
等方法方便地处理并行任务。
from multiprocessing import Pool
def square(x):
return x * x
if __name__ == '__main__':
with Pool(4) as p:
result = p.map(square, [1, 2, 3, 4])
print(result) # [1, 4, 9, 16]
3. multiprocessing.Queue
Queue
是一种安全的进程间通信方式。它提供了先进先出的队列机制,允许进程之间发送和接收数据,非常适合需要在进程间传递数据的场景。
from multiprocessing import Process, Queue
def worker(q):
q.put('Hello from worker')
if __name__ == '__main__':
q = Queue()
p = Process(target=worker, args=(q,))
p.start()
print(q.get()) # 从队列中获取数据
p.join()
4. multiprocessing.Pipe
Pipe
提供了双向通信管道,允许两个进程之间通过管道进行通信。它是另一种用于进程间数据传输的方式。
from multiprocessing import Process, Pipe
def worker(conn):
conn.send('Hello from worker')
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=worker, args=(child_conn,))
p.start()
print(parent_conn.recv()) # 接收来自子进程的数据
p.join()
5. multiprocessing.Lock
在并发编程中,多个进程可能会同时访问共享资源。Lock
是一个同步原语,用于保证一次只有一个进程可以访问某个共享资源,防止竞争条件。
from multiprocessing import Process, Lock
def worker(lock, num):
with lock:
print(f'Process {num} is working')
if __name__ == '__main__':
lock = Lock()
processes = [Process(target=worker, args=(lock, i)) for i in range(5)]
for p in processes:
p.start()
for p in processes:
p.join()
6. multiprocessing.Value
和 multiprocessing.Array
Value
和 Array
提供了一种在进程之间共享数据的方式。Value
用于共享一个变量,Array
用于共享数组,确保多个进程可以安全地访问和修改这些共享变量。
from multiprocessing import Process, Value, Array
def worker(val, arr):
val.value += 1
for i in range(len(arr)):
arr[i] += 1
if __name__ == '__main__':
val = Value('i', 0) # 'i'表示整数类型
arr = Array('i', [0, 1, 2, 3])
p = Process(target=worker, args=(val, arr))
p.start()
p.join()
print(val.value) # 1
print(arr[:]) # [1, 2, 3, 4]
7. multiprocessing.Manager
Manager
提供了一种更高级的进程间共享数据的方式。它允许我们在进程之间共享更复杂的 Python 对象,如列表、字典等。Manager
管理的对象支持进程间同步,因此操作是安全的。
from multiprocessing import Manager, Process
def worker(shared_dict, key, value):
shared_dict[key] = value
if __name__ == '__main__':
with Manager() as manager:
shared_dict = manager.dict()
processes = [Process(target=worker, args=(shared_dict, i, i*i)) for i in range(5)]
for p in processes:
p.start()
for p in processes:
p.join()
print(shared_dict) # {0: 0, 1: 1, 2: 4, 3: 9, 4: 16}
8. multiprocessing.Event
Event
是一种用于在进程间实现同步的工具,它允许一个或多个进程等待某个状态的改变。wait()
方法可以阻塞进程,直到事件被设置。
from multiprocessing import Process, Event
import time
def worker(event):
print('Waiting for event to be set...')
event.wait() # 等待事件被设置
print('Event is set, proceeding')
if __name__ == '__main__':
event = Event()
p = Process(target=worker, args=(event,))
p.start()
time.sleep(2)
print('Setting event')
event.set() # 设置事件,解除子进程的等待
p.join()
9. multiprocessing.Semaphore
Semaphore
是一种用于控制对共享资源并发访问的同步工具。它允许最多 n
个进程同时访问资源。
from multiprocessing import Process, Semaphore
import time
def worker(sem, num):
with sem:
print(f'Worker {num} is accessing the resource')
time.sleep(1)
if __name__ == '__main__':
sem = Semaphore(2) # 同时允许2个进程访问
processes = [Process(target=worker, args=(sem, i)) for i in range(5)]
for p in processes:
p.start()
for p in processes:
p.join()
总结
Python 的 multiprocessing
模块为我们提供了丰富的工具来处理多进程编程中的各种需求。从基础的 Process
类,到高级的进程间通信、同步机制,每种工具都有其适用的场景。以下是一些常见工具的总结:
Process
:创建和管理独立进程。Pool
:通过进程池并发执行多个任务。Queue
和Pipe
:实现进程间的安全通信。Lock
和Semaphore
:用于进程间的同步和共享资源控制。Value
、Array
和Manager
:在进程间共享数据。