介绍
Python 协程调度指的是 在单线程内利用协程(coroutines)实现并发执行 的机制。这主要依赖于 Python 内置的 asyncio 模块。
• 协程(Coroutines):使用 async def 定义的函数,这类函数在遇到 await 时会挂起执行,将控制权交还给事件循环,然后在适当时刻恢复执行。
• 异步编程:利用协程实现非阻塞的代码执行,即使只有一个线程也可以同时处理多个 I/O 密集型任务。
• 事件循环 是 Python 协程调度的核心,它不断轮询任务,检查哪些任务准备好继续执行。
• 当协程遇到 await 时,它会暂停并注册一个 Future(类似占位符),等到异步操作完成后,事件循环会重新唤醒这个协程继续执行。
• 任务(Task):事件循环会将协程包装成任务(Task),并调度它们的运行。任务是对协程执行状态的封装,允许你取消、等待和检查结果。
• 调度时采用 协作式多任务:每个协程在自己有 I/O 等待时主动让出控制权,而不是抢占式调度。
关键使用:
• asyncio.run():启动事件循环并运行主协程。
• await:暂停协程,等待一个可等待对象(例如 Future、Task、协程)的完成。
• async for 和 async with:分别用于异步迭代和异步上下文管理。
任务是并发启动的(create_task 立即调度,不等待)。
await task1 会阻塞当前协程直到 task1 完成,但不会阻塞事件循环。
所以虽然 await task1 写在最前面,但 task2、task3 已经开始运行了,只是你在等待 task1 的完成。
asyncio.sleep(x) 会挂起该任务 x 秒,不占用线程(不会阻塞其他协程)。
await 和create_task
import asyncio
async def test_async():
async def say_hello(delay:int=1,name:str="world"):
if delay == 1:
await asyncio.sleep(20)
await asyncio.sleep(delay)
print(f"hello {name}")
task1 = asyncio.create_task(say_hello(1,"world-1"))
task2 = asyncio.create_task(say_hello(2,"world-2"))
task3 = asyncio.create_task(say_hello(3,"world-3"))
await task1
await task2
await task3
if __name__ == "__main__":
asyncio.run(test_async())
• create_task() 是立即将三个任务放入事件循环中并发执行的。
• 由于 await task1 等待第一个任务结束(1秒后),然后才 await 第二个,再等 2 秒,第三个再等 3 秒。
• 所以它们按顺序等待结束的时间点去打印。
⚠️ 实际是并发启动,但你是顺序 await,所以显得顺序执行。
• create_task 不会阻塞,它立即提交任务到事件循环。
• await taskX 只会阻塞当前协程,不阻塞事件循环,其他任务依然可以执行。
• asyncio.sleep(x) 是非阻塞的延时方法,它只是“挂起”当前协程 x 秒,让出事件循环。
比较两个形式:
async def test_async():
async def say_hello(delay:int=1,name:str="world"):
if delay == 1:
await asyncio.sleep(20)
await asyncio.sleep(delay)
print(f"hello {name}")
task1 = asyncio.create_task(say_hello(1,"world-1"))
task2 = asyncio.create_task(say_hello(2,"world-2"))
task3 = asyncio.create_task(say_hello(3,"world-3"))
await task1
await task2
await task3
await say_hello(1,"world-100")
await say_hello(2,"world-200")
await say_hello(3,"world-300")
• 等 world-100 打印完(20 + 1 秒),才开始执行 world-200(2 秒),再执行 world-300(3 秒)。
• 总时间 = 1+20 + 2 + 3 = 26 秒左右。
• 阻塞后面的每一个任务,因为每一个 await 是顺序执行。
特性 | 使用 create_task 并发执行 | 直接 await 顺序执行 |
---|---|---|
是否并发? | ✅ 是 | ❌ 否 |
谁先完成谁先输出? | ✅ 是 | ❌ 必须一个个等 |
效率 | ⏱️ 高(多个任务可同时执行) | 🐌 低(串行执行) |
适用场景 | IO 密集、多个任务无依赖 | 逻辑强依赖、有先后顺序要求 |
不过并发一般这样写:
await asyncio.gather(
say_hello(1, "world-1"),
say_hello(2, "world-2"),
say_hello(3, "world-3"),
)
其实我一直不是很懂几个概念,分别是进程 线程 协程,进程我知道 就是 一个fastapi 或者一个main函数运行的程序,在一个系统里面只有1个存在,开多个进程就是 多个worker,线程就是在一个进程里面的,也是并发操作的,但是由于python gil的原因,所以就算开了线程 一般意义也不大,但是
线程操作
注意:哪个线程先完成就先输出对应的 Hello,和 asyncio.create_task 类似。
def test_thread_pool():
def say_hello(delay:int=1,name:str="world"):
if delay == 1:
time.sleep(20)
time.sleep(delay)
print(f"hello {name}")
# 创建线程
t1 = threading.Thread(target=say_hello, args=(1, "world-1"))
t2 = threading.Thread(target=say_hello, args=(2, "world-2"))
t3 = threading.Thread(target=say_hello, args=(3, "world-3"))
t1.start()
t2.start()
t3.start()
t1.join()
t2.join()
t3.join()
线程是操作系统能够进行运算调度的最小单位,它是比进程更小的执行单元。
• 一个进程可以包含多个线程。
• 所有线程共享该进程的内存空间(所以线程之间通信非常快,也有数据竞争风险)。
所谓“抢占式并发”,是指操作系统调度器来决定哪个线程何时运行,而不是程序本身控制。
特点 | 说明 |
---|---|
自动切换 | 线程运行时,操作系统会在任意时刻暂停一个线程,切换到另一个线程 |
不可预测 | 你写的代码不会决定线程先后,操作系统决定 |
非协作 | 一个线程不会“让出”执行权,操作系统直接打断 |
GIL 是 CPython 为了线程安全而设置的一把大锁 —— 同一时刻,只能有一个线程执行 Python 字节码。
这意味着:
• 多线程不能并行执行 Python 代码(即使有多核 CPU)。
• 多线程适合 IO 密集型任务,而不是 CPU 密集型任务。
• CPU 密集型任务建议使用 multiprocessing 或 C/C++ 扩展绕过 GIL。
控制不了先后,因为调度由 OS 决定,这就是“抢占式”!
线程vs 协程
对比点 | asyncio | threading |
---|---|---|
并发机制 | 单线程协程切换(协作式) | 多线程并发(抢占式) |
适合任务类型 | IO 密集(如网络请求) | IO 密集 or 少量 CPU 密集 |
切换成本 | 低 | 高(线程上下文切换) |
调度顺序 | 由事件循环控制 | 由操作系统调度器控制 |
可否使用 await | ✅ 是 | ❌ 否 |
对比项 | 协程(asyncio) | 线程(threading) |
---|---|---|
调度者 | Python 的事件循环(你自己) | 操作系统 |
控制权 | 协作式,让出执行 | 抢占式,被打断 |
是否共享内存 | 是 | 是 |
是否受 GIL 限制 | 没有影响 | 会被 GIL 限制并发 |