[并发与并行] python如何构建并发管道处理多阶段任务?

发布于:2025-02-11 ⋅ 阅读:(28) ⋅ 点赞:(0)

1. 背景&目标

背景:一个任务可分为多个阶段(各个阶段非CPU密集型任务,而是属于IO密集型任务),希望每个阶段能够交给各自的线程去执行。
目标:构建支持多并发的稳定的程序高效处理上述问题的程序,要求能够灵活设置并发。

2. show me the code

假设任务分为三个阶段,分别是download、resize和upload,代码采用管道将三个阶段进行拼接。
在这里插入图片描述
三个阶段的处理简化为三个函数,处理结果通过Queue传到下个阶段,各个阶段可以创建不同的线程去消费Queue中的数据,直到所有数据处理完成。代码如下:

from threading import Thread
from queue import Queue
import time

my_queue = Queue()


class ClosableQueue(Queue):
    SENTINEL = object()

    def close(self):
        self.put(self.SENTINEL)
    
    def __iter__(self):
        while True:
            item = self.get()
            try:
                if item is self.SENTINEL:
                    # 退出机制
                    return
                yield item
            finally:
                self.task_done()


class Worker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue
    
    def run(self):
        # 确保线程退出的机制:在生产者-消费者模型中,通常需要使用特殊标志(如 SENTINEL)通知消费者线程结束循环。
        for item in self.in_queue:
            result = self.func(item)
            self.out_queue.put(result)


def download(obj):
    print(f'[download] id= {id(obj)}')
    time.sleep(0.1)
    return obj


def resize(obj):
    print(f'[resize] id= {id(obj)}')
    time.sleep(0.01)
    return obj


def upload(obj):
    print(f'[upload] id= {id(obj)}')
    time.sleep(1)
    return obj

def start_threads(count, *args):
    threads = [Worker(*args) for _ in range(count)]
    for thread in threads:
        thread.start()
    return threads

def stop_threads(closable_queue, threads):
    # close次数根据threads次数来,保障每个每个线程都能正确关闭
    for _ in threads:
        closable_queue.close()

    # 阻塞调用线程,直到队列中的所有任务都被处理完成
    # 每次向队列中添加一个任务(通过 .put()),队列内部的任务计数器会增加 1。
    # 每次调用 .task_done(),任务计数器会减少 1。
    # .join() 方法会一直阻塞,直到任务计数器降为 0,也就是队列中的所有任务都被标记为完成(通过调用 .task_done())
    closable_queue.join()

    for thread in threads:
        thread.join()


if __name__ == '__main__':
    download_queue = ClosableQueue()
    resize_queue = ClosableQueue()
    upload_queue = ClosableQueue()
    done_queue = ClosableQueue()

    download_threads = start_threads(3, download, download_queue, resize_queue)
    resize_threads = start_threads(4, resize, resize_queue, upload_queue)
    upload_threads = start_threads(5, upload, upload_queue, done_queue)

    obj = object()
    for _ in range(1000):
        download_queue.put(obj)

    stop_threads(download_queue, download_threads)
    stop_threads(resize_queue, resize_threads)
    stop_threads(upload_queue, upload_threads)

    print(done_queue.qsize(), 'items fininished')

3.知识点

上述代码涉及到几个知识点,挺有意思的:

  1. 当我们想用queue来传递数据时,头疼的点在于:
    ①下游任务该怎么判断上游生产了数据呢?轮巡有点不优雅,可能会造成性能影响。
    ②上游任务啥时候告诉下游数据生产完毕了呢?可以通过插入一个特殊的数据告诉下游生产完毕了。
    ③队列该设置多大呢?如果下游数据消费不过来,上游一直生产数据插入到队列,容易oom。
    ④怎么判断中间队列的数据消费完毕了呢?即如何优雅地结束程序。

上述代码利用Queue类的特性比较优雅地解决了上述的几个问题:
① Queue非常优雅,设置size之后,如果size满了,put方法会阻塞,直到数据被消费了才可以往里面添加数据。当queue为空,get方法会阻塞,直到有数据进来。
② 如代码中SENTINEL对象,如果调用close方法,就往队列中插入一个哨兵对象,告诉下游,上游数据生产完毕了。
③ 代码中没有设置队列大小,但是Queue支持设置。
④ Queue中.join() 方法会一直阻塞,直到任务计数器降为 0,也就是队列中的所有任务都被标记为完成。
其中,任务计数器的原理为:

  • 每次向队列中添加一个任务(通过 .put()),队列内部的任务计数器会增加 1。
  • 每次调用 .task_done(),任务计数器会减少 1。

注意看,改写__iter__方法时,每次获取一个元素都调用了一次task_done()方法,即告诉任务计数器需要-1了。

4. 总结

这段代码还是比较精髓,其中关于队列Queue的用法,关于threads的用法和线程中共享数据的用法值得学习。


网站公告

今日签到

点亮在社区的每一天
去签到