多线程-4-线程池

发布于:2025-07-20 ⋅ 阅读:(16) ⋅ 点赞:(0)

线程池

在 Python 中,线程池是一种用于管理多个线程资源、提高并发效率的重要工具,尤其适用于 I/O 密集型任务 (如网络请求、文件读写等)。

什么是线程池?

线程池是一组预先创建并处于等待状态的线程,它们等待任务被分配给它们执行。使用线程池可以:

  • 减少线程创建和销毁的开销
  • 控制并发数量,避免资源耗尽
  • 提高程序响应速度

为什么需要线程池?

在没有线程池的情况下,每次执行一个任务都需要手动创建线程,任务完成后销毁线程,这会带来较大的开销。而线程池可以复用线程,提高效率。

基本使用

Python 提供了多种实现线程池的方式,其中最推荐的是:concurrent.futures.ThreadPoolExecutor这是 Python 3.2+ 内置的高级线程池接口,简洁易用,功能强大。

concurrent包中仅有futures一个模块用于启动并行任务。

from concurrent.futures import ThreadPoolExecutor,as_ompleted

# 创建一个最大线程数为5的线程池
executor = ThreadPoolExecutor(max_workers=5)
# 提交任务 
future = executor.submit(task_function,arg1,arg2)

case:

from  concurrent.futures import ThreadPoolExecutor,as_completed
import time

def func(time_,num):
    time.sleep(1)
    print(f"\033[32m{num}:{time_}\033[0m")
    return time_

executor = ThreadPoolExecutor(max_workers=20)
future = [executor.submit(func,time_,100) for time_ in range(100)]
print([f.result() for f in as_completed(future)])
  1. future = [executor.submit(func, time_, 100) for time_ in range(100)] 这行代码会向线程池提交100个任务,每个任务会调用 func 函数。 executor.submit 返回的是 Future 对象。在这个代码中future代表的就是Future列表,即future:list[Future]
  2. Future 对象代表一个异步执行的任务。它包含了任务的执行状态(如是否完成)、返回结果(如果已完成)、异常信息(如果有异常)等内容。你可以通过 future.result() 获取任务的返回值,如果任务还没完成,这个方法会阻塞直到任务完成。
  3. as_completed(future) 是一个生成器,会在每个 Future 对象完成时依次产出它。这样你可以按任务完成的顺序处理结果,而不是提交的顺序。这对于并发任务来说很有用,因为有些任务可能先完成,有些后完成。

如果不使用as_completed,直接遍历future,获取的future.result是按任务submit顺序输出的,反之是按照任务完成顺序输出的,效率更高(如果先提交了但是未完成会有一定时间的阻塞)。

# 按完成顺序获取结果
for f in as_completed(future):
    print(f.result())

# 按提交顺序获取结果
for f in future:
    print(f.result())

future.result()返回的值与函数返回值对应。future.result返回的是返回值object信息,加上括号()才是函数返回值。

case2:

线程池结合with语句

import time
from concurrent.futures import ThreadPoolExecutor,as_completed
import random
import json
def func(args:list[dict]):
    num = random.randint(1,3)
    time.sleep(num*2)
    print(f"\033[3{num}m:{args}\033[0m")
    return args

if __name__ == "__main__":
    with open('./education_self.json','r',encoding="utf-8") as f:
        data = json.load(f)
    with ThreadPoolExecutor(max_workers=5) as executor:
        future = [executor.submit(func,arg) for  arg in data]
    for f in as_completed(future):
        print(f.result)

case3:

线程池结合map方法,map方法会将参数逐一传递,返回的不再是future对象而是future.result(),也就是这一步被省略了,直接返回函数结果。

import time
from concurrent.futures import ThreadPoolExecutor,as_completed
import random
import json
def func(args:dict):
    num = random.randint(1,3)
    # time.sleep(num*2)
    print(f"\033[3{num}m:{args}\033[0m")
    return args

if __name__ == "__main__":
    with open('./education_self.json','r',encoding="utf-8") as f:
        data = json.load(f)
    with ThreadPoolExecutor(max_workers=5) as executor:
        for res in executor.map(func,data):
            print(res)

case4

import concurrent.futures


def test_function(arguments: tuple):
    test_value, function = arguments
    """Print the test value 1000 times"""
    for i in range(0, 1000):
        print(f"Function {function}, {test_value}, iteration {i}")

    return test_value


def main():
    """Main function"""
    
    # Context manager for parallel tasks
    with concurrent.futures.ThreadPoolExecutor() as executor:

        # Submit example. Executes the function calls asynchronously
        result = [executor.submit(test_function, (i, "submit")) for i in range(1, 21)]

        # Map example. 
        # Takes an iterable as argument that will execute the function once for each item
        result_2 = executor.map(test_function, [(i, "map") for i in range(1, 21)])

    for future in concurrent.futures.as_completed(result):
        print(f"Submit: Process {future.result()} completed")

    for future in result_2:
        print(f"Map: Process {future} completed")


if __name__ == '__main__':
    main()


网站公告

今日签到

点亮在社区的每一天
去签到