在Python中,当我们面临选择使用线程(Thread)还是进程(Process)时,进程往往因其更高的稳定性和可扩展性而被优先考虑。特别是,进程能够跨越多台机器进行分布,而线程则受限于同一台机器的多个CPU核心。Python的multiprocessing
模块不仅支持多进程,其managers
子模块更是提供了将多进程分布到多台机器上的能力。
通过managers
模块,我们可以轻松编写分布式多进程程序,而无需深入了解网络通信的细节。以下是一个具体的例子,展示了如何将原本在同一台机器上运行的使用Queue
进行通信的多进程程序,扩展到两台机器上运行。
服务进程(task_master.py)
服务进程负责启动并注册Queue
到网络上,然后向其中写入任务。
python复制代码
import random, time, queue |
|
from multiprocessing.managers import BaseManager |
|
# 定义任务队列和结果队列 |
|
task_queue = queue.Queue() |
|
result_queue = queue.Queue() |
|
# 创建一个继承自BaseManager的QueueManager类 |
|
class QueueManager(BaseManager): |
|
pass |
|
# 将两个Queue注册到网络上 |
|
QueueManager.register('get_task_queue', callable=lambda: task_queue) |
|
QueueManager.register('get_result_queue', callable=lambda: result_queue) |
|
# 启动QueueManager,绑定端口5000,并设置验证码'abc' |
|
manager = QueueManager(address=('', 5000), authkey=b'abc') |
|
manager.start() |
|
# 获取网络访问的Queue对象 |
|
task = manager.get_task_queue() |
|
result = manager.get_result_queue() |
|
# 向任务队列中添加任务 |
|
for i in range(10): |
|
n = random.randint(0, 10000) |
|
print(f'Put task {n}...') |
|
task.put(n) |
|
# 从结果队列中读取结果 |
|
print('Try getting results...') |
|
for i in range(10): |
|
r = result.get(timeout=10) |
|
print(f'Result: {r}') |
|
# 关闭manager |
|
manager.shutdown() |
|
print('Master exit.') |
注意:在分布式环境中,添加任务到Queue
时,必须通过manager.get_task_queue()
获得的接口进行,而不能直接操作原始的task_queue
。
任务进程(task_worker.py)
任务进程负责连接到服务进程,从任务队列中取任务,并将结果放入结果队列。
python复制代码
import time, sys, queue |
|
from multiprocessing.managers import BaseManager |
|
# 创建QueueManager类 |
|
class QueueManager(BaseManager): |
|
pass |
|
# 注册Queue的访问接口 |
|
QueueManager.register('get_task_queue') |
|
QueueManager.register('get_result_queue') |
|
# 连接到服务进程所在的服务器 |
|
server_addr = '127.0.0.1' # 替换为实际的服务进程IP地址 |
|
print(f'Connecting to server {server_addr}...') |
|
manager = QueueManager(address=(server_addr, 5000), authkey=b'abc') |
|
manager.connect() |
|
# 获取Queue对象 |
|
task = manager.get_task_queue() |
|
result = manager.get_result_queue() |
|
# 处理任务 |
|
for i in range(10): |
|
try: |
|
n = task.get(timeout=1) |
|
print(f'Running task {n} * {n}...') |
|
r = f'{n} * {n} = {n*n}' |
|
time.sleep(1) |
|
result.put(r) |
|
except queue.Empty: |
|
print('Task queue is empty.') |
|
print('Worker exit.') |
运行步骤
- 在一台机器上启动服务进程(
task_master.py
)。 - 在另一台机器(或同一台机器的不同终端)上启动任务进程(
task_worker.py
),并确保其连接到服务进程的正确IP地址和端口。
分布式计算的简单应用
这个Master/Worker模型展示了分布式计算的基本原理。通过简单的改造,可以启动多个worker进程,将任务分布到多台机器上,实现高效的并行处理。例如,将计算n*n
的代码替换为发送邮件的逻辑,就可以构建一个邮件队列的异步发送系统。
注意
- Queue的作用:Queue用于传递任务和接收结果,因此每个任务的描述数据量应尽量小。例如,发送处理日志文件的任务时,应传递日志文件的路径而非文件本身。
- authkey的重要性:authkey用于确保两台机器之间的正常通信,防止恶意干扰。如果任务进程的authkey与服务进程不一致,将无法建立连接。
总结
Python的分布式进程接口简洁且封装良好,非常适合在需要将繁重任务分布到多台机器的环境中应用。通过合理利用这些接口,我们可以轻松实现高效的分布式计算。