什么时候需要用到 multiprocessing?

发布于:2025-07-14 ⋅ 阅读:(15) ⋅ 点赞:(0)


场景速查表:

场景 是否需要 multiprocessing 原因
CPU 密集型任务(加密、科学计算、图像处理) ✅ 必须 GIL 限制,多线程无法并行
I/O 密集型任务(文件、网络) ❌ 优先用多线程/异步 线程切换开销更小
需要多个 Python 解释器隔离 ✅ 必须 每个进程独立内存空间
需要共享大量复杂对象 ⚠️ 谨慎 进程间共享成本高于线程

结论:只要你要榨干多核 CPU,就绕不开 multiprocessing。否则可以先考虑线程或协程。


必须记忆的 7 组核心 API 与实战代码

下面把「面试时写得出、项目里用得着」的知识点浓缩成一篇 CSDN 风格的速查博客。每个点都给出最小可运行示例(MRE:Minimal Runnable Example),复制即可运行。


1. Process 基本用法(记住两段模板)

模板 A:无参函数

import multiprocessing, time

def worker():
    print('Worker start', multiprocessing.current_process().name)
    time.sleep(1)

if __name__ == '__main__':
    p = multiprocessing.Process(target=worker, name='worker-A')
    p.start()
    p.join()

模板 B:带参函数(注意可 pickle)

import multiprocessing

def power(base, exp):
    print(f'{base}^{exp} = {base**exp}')

if __name__ == '__main__':
    for pair in [(2, 10), (3, 5)]:
        multiprocessing.Process(target=power, args=pair).start()

2. 守护进程 Daemon(后台任务心跳)

import multiprocessing, time

def heartbeat():
    while True:
        print('heartbeat', time.strftime('%H:%M:%S'))
        time.sleep(1)

if __name__ == '__main__':
    d = multiprocessing.Process(target=heartbeat, daemon=True)
    d.start()
    time.sleep(3)      # 主进程 3 秒后退出,守护进程随之被杀

3. 优雅退出:join、terminate、exitcode

import multiprocessing, time, signal

def slow():
    time.sleep(5)

if __name__ == '__main__':
    p = multiprocessing.Process(target=slow)
    p.start()
    time.sleep(1)
    p.terminate()          # 向子进程发 SIGTERM
    p.join()               # 回收资源
    print('exitcode:', p.exitcode)   # -15 代表被信号 15 杀死

4. 进程间通信:Queue(生产者-消费者模式)

import multiprocessing, random, time

def producer(q):
    for i in range(5):
        q.put(random.randint(1, 100))
        time.sleep(0.2)

def consumer(q):
    while True:
        item = q.get()
        if item is None:          # 毒丸
            break
        print('Consumed', item)

if __name__ == '__main__':
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=producer, args=(q,))
    c = multiprocessing.Process(target=consumer, args=(q,))
    p.start(); c.start()
    p.join()
    q.put(None)                  # 发毒丸
    c.join()

5. 共享状态:Manager 与 Lock

import multiprocessing

def add_to_dict(d, key, lock):
    with lock:                # 防止竞态
        d[key] = key * key

if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    shared_dict = mgr.dict()
    lock = multiprocessing.Lock()

    jobs = [multiprocessing.Process(target=add_to_dict, args=(shared_dict, i, lock))
            for i in range(10)]
    for j in jobs: j.start()
    for j in jobs: j.join()
    print(shared_dict)

6. 进程池 Pool(MapReduce 雏形)

import multiprocessing, os

def square(x):
    return x * x

if __name__ == '__main__':
    with multiprocessing.Pool(processes=os.cpu_count()) as pool:
        result = pool.map(square, range(10))
    print(result)     # [0, 1, 4, 9, ... 81]

7. 自定义 Process 子类(可重用组件)

import multiprocessing, time

class TimerProcess(multiprocessing.Process):
    def __init__(self, interval):
        super().__init__()
        self.interval = interval

    def run(self):
        for i in range(3):
            print(f'{self.name}: {i+1}/{3}')
            time.sleep(self.interval)

if __name__ == '__main__':
    TimerProcess(1).start()

一张思维导图总结

multiprocessing
├─ 基本对象
│  ├─ Process
│  ├─ Queue / JoinableQueue
│  ├─ Lock / Semaphore / Condition / Event
│  └─ Manager
├─ 高级抽象
│  ├─ Pool
│  └─ 自定义 Process 子类
└─ 生命周期
   ├─ start / join / terminate
   └─ daemon / exitcode

结语:记忆策略

  1. 先背 3 个模板(Process、Queue、Pool)。
  2. 再背 生命周期:start → join/terminate → exitcode。
  3. 其余 API 用的时候查表即可。

网站公告

今日签到

点亮在社区的每一天
去签到