【python知识】多进程专题(1)

发布于:2022-12-13 ⋅ 阅读:(271) ⋅ 点赞:(0)

一、提要

        multiprocessing 是一个使用类似于 threading 模块的 API 支持生成进程的包。         multiprocessing 包提供本地和远程并发,通过使用子进程而不是线程来有效地避开全局解释器锁。因此,多处理模块允许程序员充分利用给定机器上的多个处理器。它可以在 Unix 和 Windows 上运行。

二、Pool对象

        多处理模块还引入了在线程模块中没有类似物的 API。一个典型的例子是 Pool 对象,它提供了一种方便的方法,可以跨多个输入值并行执行函数,跨进程分布输入数据(数据并行性)。以下示例演示了在模块中定义此类函数的常见做法,以便子进程可以成功导入该模块。

2.1 这个使用 Pool 的数据并行的基本示例,

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))

output

[1, 4, 9]

2.2 大量启动子进程

        如果要启动大量的子进程,可以用进程池的方式批量创建子进程:

from multiprocessing import Pool
import os, time, random

def long_time_task(name):
    print('Run task %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (name, (end - start)))

if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Pool(4)
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All subprocesses done.')

执行结果如下:

Parent process 669.
Waiting for all subprocesses done...
Run task 0 (671)...
Run task 1 (672)...
Run task 2 (673)...
Run task 3 (674)...
Task 2 runs 0.14 seconds.
Run task 4 (673)...
Task 1 runs 0.27 seconds.
Task 3 runs 0.86 seconds.
Task 0 runs 1.41 seconds.
Task 4 runs 1.91 seconds.
All subprocesses done.

        代码解读:

        对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。

        请注意输出的结果,task 0123是立刻执行的,而task 4要等待前面某个task完成后才执行,这是因为Pool的默认大小在我的电脑上是4,因此,最多同时执行4个进程。这是Pool有意设计的限制,并不是操作系统的限制。如果改成:

p = Pool(5)

        就可以同时跑5个进程。

        由于Pool的默认大小是CPU的核数,如果你不幸拥有8核CPU,你要提交至少9个子进程才能看到上面的等待效果。

三、fork()操作

        仅支持基于Unix核心的系统

        程序每次执行时,操作系统都会创建一个新进程来运行程序指令。进程中可调用os.fork,要求操作系统新建一个子进程.[Windowsc系统中,os模块没有os.fork函数]。

  每个进程都有一个不重复的进程ID号。或称pid,它对进程进行标识。子进程与父进程完全相同,子进程从父进程继承了多个值的拷贝。如全局变量和环境变量。fork后,子进程接收返回值0,而父进程接收子进程的pid作为返回值。

3.1 一般操作

# -*-coding:utf-8-*-
import os
import time
print('before calling')

p = os.fork()  # 主进程,子进程同时向下执行

print('after calling')

if p == 0:
    print('执行子进程, pid={} ppid={} p={}'.format(os.getpid(), os.getppid(), p))
else:
    print('执行主进程, pid={} ppid={} p={}'.format(os.getpid(), os.getppid(), p))

[root@192 ~]# python fork.py
before calling
after calling
执行主进程, pid=1629 ppid=1572 p=1630
after calling
执行子进程, pid=1630 ppid=1629 p=0

        有了fork调用,一个进程在接到新任务时就可以复制出一个子进程来处理新任务,常见的Apache服务器就是由父进程监听端口,每当有新的http请求时,就fork出子进程来处理新的http请求。

        结论:调用os.fork()之后,主进程和子进程同时执行该行以下的代码,子进程中fork函数返回0,父进程中返回1630,即子进程的pid.

3.2 让主进程沉睡一秒钟

# -*-coding:utf-8-*-
import os
import time
print('before calling')

p = os.fork()  # 主进程,子进程同时向下执行

print('after calling')

if p == 0:
    print('执行子进程, pid={} ppid={} p={}'.format(os.getpid(), os.getppid(), p))
    time.sleep(1)
    print('执行子进程, pid={} ppid={} p={}'.format(os.getpid(), os.getppid(), p))
else:
    print('执行主进程, pid={} ppid={} p={}'.format(os.getpid(), os.getppid(), p))

[root@192 ~]# python fork.py 
before calling
after calling
执行主进程, pid=1648 ppid=1572 p=1649
after calling
执行子进程, pid=1649 ppid=1648 p=0
[root@192 ~]# 执行子进程, pid=1649 ppid=1 p=0 

 

四、多进程multiprocessing

        如果你打算编写多进程的服务程序,Unix/Linux无疑是正确的选择。由于Windows没有fork调用,难道在Windows上无法用Python编写多进程的程序?

        由于Python是跨平台的,自然也应该提供一个跨平台的多进程支持。multiprocessing模块就是跨平台版本的多进程模块。

  multiprocessing模块提供了一个Process类来代表一个进程对象,下面的例子演示了启动一个子进程并等待其结束:

from multiprocessing import Process
import os

# 子进程要执行的代码
def run_proc(name):
    print('Run child process %s (%s)...' % (name, os.getpid()))

if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Process(target=run_proc, args=('test',))
    print('Child process will start.')
    p.start()
    p.join()
    print('Child process end.')

执行结果如下:

Parent process 928.
Child process will start.
Run child process test (929)...
Process end.

创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动,这样创建进程比fork()还要简单。

join()方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步。

五、子进程

        很多时候,子进程并不是自身,而是一个外部进程。我们创建了子进程后,还需要控制子进程的输入和输出。

  subprocess模块可以让我们非常方便地启动一个子进程,然后控制其输入和输出。

        下面的例子演示了如何在Python代码中运行命令nslookup www.python.org,这和命令行直接运行的效果是一样的:

import subprocess

print('$ nslookup www.python.org')
r = subprocess.call(['nslookup', 'www.python.org'])
print('Exit code:', r)

        运行结果:

$ nslookup www.python.org
Server:		192.168.19.4
Address:	192.168.19.4#53

Non-authoritative answer:
www.python.org	canonical name = python.map.fastly.net.
Name:	python.map.fastly.net
Address: 199.27.79.223

Exit code: 0

        如果子进程还需要输入,则可以通过communicate()方法输入:

import subprocess

print('$ nslookup')
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
print(output.decode('utf-8'))
print('Exit code:', p.returncode)

        上面的代码相当于在命令行执行命令nslookup,然后手动输入:

set q=mx
python.org
exit

        运行结果如下:

$ nslookup
Server:		192.168.19.4
Address:	192.168.19.4#53

Non-authoritative answer:
python.org	mail exchanger = 50 mail.python.org.

Authoritative answers can be found from:
mail.python.org	internet address = 82.94.164.166
mail.python.org	has AAAA address 2001:888:2000:d::a6


Exit code: 0

六、进程间通信

   Process之间肯定是需要通信的,操作系统提供了很多机制来实现进程间的通信。Python的multiprocessing模块包装了底层的机制,提供了QueuePipes等多种方式来交换数据。

        我们以Queue为例,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据:

from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码:
def write(q):
    print('Process to write: %s' % os.getpid())
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())

# 读数据进程执行的代码:
def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)

if __name__=='__main__':
    # 父进程创建Queue,并传给各个子进程:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 启动子进程pw,写入:
    pw.start()
    # 启动子进程pr,读取:
    pr.start()
    # 等待pw结束:
    pw.join()
    # pr进程里是死循环,无法等待其结束,只能强行终止:
    pr.terminate()

运行结果如下:

Process to write: 50563
Put A to queue...
Process to read: 50564
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.

        在Unix/Linux下,multiprocessing模块封装了fork()调用,使我们不需要关注fork()的细节。由于Windows没有fork调用,因此,multiprocessing需要“模拟”出fork的效果,父进程所有Python对象都必须通过pickle序列化再传到子进程去,所以,如果multiprocessing在Windows下调用失败了,要先考虑是不是pickle失败了。

七、小结

  • 在Unix/Linux下,可以使用fork()调用实现多进程。
  • 要实现跨平台的多进程,可以使用multiprocessing模块。
  • 进程间通信是通过QueuePipes等实现的。

参考文章

https://docs.python.org/3/library/multiprocessing.html

多进程 - 廖雪峰的官方网站


网站公告

今日签到

点亮在社区的每一天
去签到