1 asyncio的优势
asyncio
和 threading
都是 Python 中处理并发的方式,但它们各有优势。
效率:
asyncio
是基于单线程的,它通过协程(coroutine)实现并发,协程之间的切换开销小于线程之间的切换开销。在 I/O 密集型任务(如网络请求)中,asyncio
可以更高效地利用 CPU。资源占用:
asyncio
的协程是在一个线程中运行的,因此它占用的系统资源(如内存)通常比多线程要少。编程模型:
asyncio
提供了一种基于事件循环的编程模型,这种模型可以更好地处理异步 I/O 操作。而threading
则是基于线程的并发模型,更适合于 CPU 密集型任务。
总的来说,asyncio
和 threading
各有适用的场景。如果你的任务主要是 I/O 密集型的,那么 asyncio
可能是一个更好的选择。
2 举例: IDE 和Jupyter lab运行 asyncio的小区别
下面看这么一个例子:
import asyncio
async def my_task(task_id):
# Your time-consume task
await asyncio.sleep(2) # Simulate a long process
return {task_id: "done"}
async def my_coroutine():
# launch 100 task
tasks = []
for i in range(100):
tasks.append(my_task(i))
# wait for all tasks to complete
results = await asyncio.gather(*tasks)
total_result = {}
for result in results:
total_result.update(result)
return total_result
final_result = asyncio.run(my_coroutine())
print(final_result)
这个代码在python IDE 中执行, 没有任何错误, 但是在jupyter 中执行会报错。
---------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
Cell In[132], line 25
21 total_result.update(result)
22 return total_result
---> 25 final_result = asyncio.run(my_coroutine())
26 print(final_result)
File /data/anaconda3/yes/envs/jupyter/lib/python3.10/asyncio/runners.py:33, in run(main, debug)
9 """Execute the coroutine and return the result.
10
11 This function runs the passed coroutine, taking care of
(...)
30 asyncio.run(main())
31 """
32 if events._get_running_loop() is not None:
---> 33 raise RuntimeError(
34 "asyncio.run() cannot be called from a running event loop")
36 if not coroutines.iscoroutine(main):
37 raise ValueError("a coroutine was expected, got {!r}".format(main))
RuntimeError: asyncio.run() cannot be called from a running event loop
原因是 Jupyter Lab 中的事件循环是由其内部的 Tornado 服务器提供的。在 Jupyter Lab 中,事件循环主要用于处理用户的输入、执行代码、更新界面等任务。例如,当你在 Jupyter Lab 中运行一个代码单元时,这个操作就会被添加到事件循环中,然后由事件循环调度 Python 解释器来执行这段代码。在代码执行的同时,事件循环还可以继续处理其他的事件,如用户的其他输入、网络请求等。
因此,如果你在 Jupyter Lab 中使用 asyncio.run
或 loop.run_until_complete
这样的函数来运行协程,会导致错误,因为这些函数会尝试创建和管理一个新的事件循环,而在 Jupyter Lab 中,已经有一个正在运行的事件循环了。在这种情况下,你应该使用 asyncio.create_task
或直接在 cell 中 await
协程来运行协程。比如采用下面方式来执行
import asyncio
async def my_task(task_id):
# Your time-consume task
await asyncio.sleep(2) # Simulate a long process
return {task_id: "done"}
async def my_coroutine():
# launch 100 task
tasks = []
for i in range(100):
tasks.append(my_task(i))
# wait for all tasks to complete
results = await asyncio.gather(*tasks)
total_result = {}
for result in results:
total_result.update(result)
return total_result
final_result = await my_coroutine()
print(final_result)
但是呢, 这个代码在python IDE 中会报错 “SyntaxError: ‘await’ outside function
”,直接与asyncio设计的理念冲突了。
3 在asyncio中执行耗时任务
loop.run_in_executor是Python的asyncio库中的一个方法。用于在单独的线程或进程中运行一个函数,并返回一个表示函数执行的Future对象。这允许你运行阻塞或CPU密集型操作,而不会阻塞事件循环。
下面看一个简单的示例:
import asyncio
import concurrent.futures
def cpu_bound_operation(x):
# 这可以是任何CPU密集型操作
return x * x
async def main():
loop = asyncio.get_event_loop()
# 在单独的线程中运行
with concurrent.futures.ThreadPoolExecutor() as pool:
result = await loop.run_in_executor(pool, cpu_bound_operation, 5)
print(f'结果是 {result}')
# 运行main函数直到它完成
asyncio.run(main())
示例中, 建立了一个 python的线程池, 针对一个 CPU 耗时任务, 提交到线程池, 有单独的线程来执行, 而不要同步执行, 进而阻塞事件循环。
4 控制协程的并发数
import asyncio
# 创建一个Semaphore对象,最大并发数为3
sem = asyncio.Semaphore(3)
async def my_coroutine(num):
async with sem:
# 模拟异步IO操作
print(f'Coroutine {num} is running')
await asyncio.sleep(1)
print(f'Coroutine {num} is done')
# 创建一个事件循环
loop = asyncio.get_event_loop()
# 创建10个协程,但是由于我们设置了最大并发数为3,所以一次只会有3个协程在运行
coroutines = [my_coroutine(i) for i in range(10)]
# 运行所有协程,直到它们全部完成
loop.run_until_complete(asyncio.gather(*coroutines))