Python协程调度

发布于:2025-03-23 ⋅ 阅读:(47) ⋅ 点赞:(0)

介绍

Python 协程调度指的是 在单线程内利用协程(coroutines)实现并发执行 的机制。这主要依赖于 Python 内置的 asyncio 模块。

协程(Coroutines):使用 async def 定义的函数,这类函数在遇到 await 时会挂起执行,将控制权交还给事件循环,然后在适当时刻恢复执行。

异步编程:利用协程实现非阻塞的代码执行,即使只有一个线程也可以同时处理多个 I/O 密集型任务。

事件循环 是 Python 协程调度的核心,它不断轮询任务,检查哪些任务准备好继续执行。

• 当协程遇到 await 时,它会暂停并注册一个 Future(类似占位符),等到异步操作完成后,事件循环会重新唤醒这个协程继续执行。

任务(Task):事件循环会将协程包装成任务(Task),并调度它们的运行。任务是对协程执行状态的封装,允许你取消、等待和检查结果。

• 调度时采用 协作式多任务:每个协程在自己有 I/O 等待时主动让出控制权,而不是抢占式调度。

关键使用:

•	asyncio.run():启动事件循环并运行主协程。
•	await:暂停协程,等待一个可等待对象(例如 Future、Task、协程)的完成。
•	async for 和 async with:分别用于异步迭代和异步上下文管理。

  1. 任务是并发启动的(create_task 立即调度,不等待)。

  2. await task1 会阻塞当前协程直到 task1 完成,但不会阻塞事件循环。

  3. 所以虽然 await task1 写在最前面,但 task2、task3 已经开始运行了,只是你在等待 task1 的完成。

  4. asyncio.sleep(x) 会挂起该任务 x 秒,不占用线程(不会阻塞其他协程)。

await 和create_task


import asyncio

async def test_async():
    async def say_hello(delay:int=1,name:str="world"):
        if delay == 1:
            await asyncio.sleep(20)
        await asyncio.sleep(delay)
        print(f"hello {name}")
    
    task1 = asyncio.create_task(say_hello(1,"world-1"))
    task2 = asyncio.create_task(say_hello(2,"world-2"))
    task3 = asyncio.create_task(say_hello(3,"world-3"))
    await task1
    await task2
    await task3

if __name__ == "__main__":
    asyncio.run(test_async())

• create_task() 是立即将三个任务放入事件循环中并发执行的。

• 由于 await task1 等待第一个任务结束(1秒后),然后才 await 第二个,再等 2 秒,第三个再等 3 秒。

• 所以它们按顺序等待结束的时间点去打印。

⚠️ 实际是并发启动,但你是顺序 await,所以显得顺序执行。

• create_task 不会阻塞,它立即提交任务到事件循环。

• await taskX 只会阻塞当前协程,不阻塞事件循环,其他任务依然可以执行。

• asyncio.sleep(x) 是非阻塞的延时方法,它只是“挂起”当前协程 x 秒,让出事件循环。

比较两个形式:

async def test_async():
    async def say_hello(delay:int=1,name:str="world"):
        if delay == 1:
            await asyncio.sleep(20)
        await asyncio.sleep(delay)
        print(f"hello {name}")
    
    task1 = asyncio.create_task(say_hello(1,"world-1"))
    task2 = asyncio.create_task(say_hello(2,"world-2"))
    task3 = asyncio.create_task(say_hello(3,"world-3"))
    
    await task1
    await task2
    await task3
    
    
    
    
    await say_hello(1,"world-100")
    await say_hello(2,"world-200")
    await say_hello(3,"world-300")
  •	等 world-100 打印完(20 + 1 秒),才开始执行 world-2002 秒),再执行 world-3003 秒)。
	•	总时间 = 1+20 + 2 + 3 = 26 秒左右。
	•	阻塞后面的每一个任务,因为每一个 await 是顺序执行。
特性 使用 create_task 并发执行 直接 await 顺序执行
是否并发? ✅ 是 ❌ 否
谁先完成谁先输出? ✅ 是 ❌ 必须一个个等
效率 ⏱️ 高(多个任务可同时执行) 🐌 低(串行执行)
适用场景 IO 密集、多个任务无依赖 逻辑强依赖、有先后顺序要求

不过并发一般这样写:

await asyncio.gather(
    say_hello(1, "world-1"),
    say_hello(2, "world-2"),
    say_hello(3, "world-3"),
)

其实我一直不是很懂几个概念,分别是进程 线程 协程,进程我知道 就是 一个fastapi 或者一个main函数运行的程序,在一个系统里面只有1个存在,开多个进程就是 多个worker,线程就是在一个进程里面的,也是并发操作的,但是由于python gil的原因,所以就算开了线程 一般意义也不大,但是

线程操作

注意:哪个线程先完成就先输出对应的 Hello,和 asyncio.create_task 类似。

def test_thread_pool():
    def say_hello(delay:int=1,name:str="world"):
        if delay == 1:
            time.sleep(20)
        time.sleep(delay)
        print(f"hello {name}")
    # 创建线程
    t1 = threading.Thread(target=say_hello, args=(1, "world-1"))
    t2 = threading.Thread(target=say_hello, args=(2, "world-2"))
    t3 = threading.Thread(target=say_hello, args=(3, "world-3"))
    t1.start()
    t2.start()
    t3.start()
    t1.join()
    t2.join()
    t3.join()

线程是操作系统能够进行运算调度的最小单位,它是比进程更小的执行单元。
• 一个进程可以包含多个线程。
• 所有线程共享该进程的内存空间(所以线程之间通信非常快,也有数据竞争风险)。

所谓“抢占式并发”,是指操作系统调度器来决定哪个线程何时运行,而不是程序本身控制。

特点 说明
自动切换 线程运行时,操作系统会在任意时刻暂停一个线程,切换到另一个线程
不可预测 你写的代码不会决定线程先后,操作系统决定
非协作 一个线程不会“让出”执行权,操作系统直接打断

GIL 是 CPython 为了线程安全而设置的一把大锁 —— 同一时刻,只能有一个线程执行 Python 字节码

这意味着:

• 多线程不能并行执行 Python 代码(即使有多核 CPU)。

• 多线程适合 IO 密集型任务,而不是 CPU 密集型任务。

• CPU 密集型任务建议使用 multiprocessing 或 C/C++ 扩展绕过 GIL。

控制不了先后,因为调度由 OS 决定,这就是“抢占式”!

线程vs 协程

对比点 asyncio threading
并发机制 单线程协程切换(协作式) 多线程并发(抢占式)
适合任务类型 IO 密集(如网络请求) IO 密集 or 少量 CPU 密集
切换成本 高(线程上下文切换)
调度顺序 由事件循环控制 由操作系统调度器控制
可否使用 await ✅ 是 ❌ 否
对比项 协程(asyncio) 线程(threading)
调度者 Python 的事件循环(你自己) 操作系统
控制权 协作式,让出执行 抢占式,被打断
是否共享内存
是否受 GIL 限制 没有影响 会被 GIL 限制并发