Python进程与线程:分布式进程

发布于:2024-12-22 ⋅ 阅读:(13) ⋅ 点赞:(0)

在Python中,当我们面临选择使用线程(Thread)还是进程(Process)时,进程往往因其更高的稳定性和可扩展性而被优先考虑。特别是,进程能够跨越多台机器进行分布,而线程则受限于同一台机器的多个CPU核心。Python的multiprocessing模块不仅支持多进程,其managers子模块更是提供了将多进程分布到多台机器上的能力。

通过managers模块,我们可以轻松编写分布式多进程程序,而无需深入了解网络通信的细节。以下是一个具体的例子,展示了如何将原本在同一台机器上运行的使用Queue进行通信的多进程程序,扩展到两台机器上运行。

服务进程(task_master.py)

服务进程负责启动并注册Queue到网络上,然后向其中写入任务。


python复制代码

import random, time, queue
from multiprocessing.managers import BaseManager
# 定义任务队列和结果队列
task_queue = queue.Queue()
result_queue = queue.Queue()
# 创建一个继承自BaseManager的QueueManager类
class QueueManager(BaseManager):
pass
# 将两个Queue注册到网络上
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 启动QueueManager,绑定端口5000,并设置验证码'abc'
manager = QueueManager(address=('', 5000), authkey=b'abc')
manager.start()
# 获取网络访问的Queue对象
task = manager.get_task_queue()
result = manager.get_result_queue()
# 向任务队列中添加任务
for i in range(10):
n = random.randint(0, 10000)
print(f'Put task {n}...')
task.put(n)
# 从结果队列中读取结果
print('Try getting results...')
for i in range(10):
r = result.get(timeout=10)
print(f'Result: {r}')
# 关闭manager
manager.shutdown()
print('Master exit.')

注意:在分布式环境中,添加任务到Queue时,必须通过manager.get_task_queue()获得的接口进行,而不能直接操作原始的task_queue

任务进程(task_worker.py)

任务进程负责连接到服务进程,从任务队列中取任务,并将结果放入结果队列。


python复制代码

import time, sys, queue
from multiprocessing.managers import BaseManager
# 创建QueueManager类
class QueueManager(BaseManager):
pass
# 注册Queue的访问接口
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
# 连接到服务进程所在的服务器
server_addr = '127.0.0.1' # 替换为实际的服务进程IP地址
print(f'Connecting to server {server_addr}...')
manager = QueueManager(address=(server_addr, 5000), authkey=b'abc')
manager.connect()
# 获取Queue对象
task = manager.get_task_queue()
result = manager.get_result_queue()
# 处理任务
for i in range(10):
try:
n = task.get(timeout=1)
print(f'Running task {n} * {n}...')
r = f'{n} * {n} = {n*n}'
time.sleep(1)
result.put(r)
except queue.Empty:
print('Task queue is empty.')
print('Worker exit.')

运行步骤

  1. 在一台机器上启动服务进程(task_master.py)。
  2. 在另一台机器(或同一台机器的不同终端)上启动任务进程(task_worker.py),并确保其连接到服务进程的正确IP地址和端口。

分布式计算的简单应用

这个Master/Worker模型展示了分布式计算的基本原理。通过简单的改造,可以启动多个worker进程,将任务分布到多台机器上,实现高效的并行处理。例如,将计算n*n的代码替换为发送邮件的逻辑,就可以构建一个邮件队列的异步发送系统。

注意

  • Queue的作用:Queue用于传递任务和接收结果,因此每个任务的描述数据量应尽量小。例如,发送处理日志文件的任务时,应传递日志文件的路径而非文件本身。
  • authkey的重要性:authkey用于确保两台机器之间的正常通信,防止恶意干扰。如果任务进程的authkey与服务进程不一致,将无法建立连接。

总结

Python的分布式进程接口简洁且封装良好,非常适合在需要将繁重任务分布到多台机器的环境中应用。通过合理利用这些接口,我们可以轻松实现高效的分布式计算。