使用 python asyncio的一个例子,以及在jupyter lab中使用时的一个常识

发布于:2024-07-04 ⋅ 阅读:(17) ⋅ 点赞:(0)

1 asyncio的优势

asynciothreading 都是 Python 中处理并发的方式,但它们各有优势。

  1. 效率asyncio 是基于单线程的,它通过协程(coroutine)实现并发,协程之间的切换开销小于线程之间的切换开销。在 I/O 密集型任务(如网络请求)中,asyncio 可以更高效地利用 CPU。

  2. 资源占用asyncio 的协程是在一个线程中运行的,因此它占用的系统资源(如内存)通常比多线程要少。

  3. 编程模型asyncio 提供了一种基于事件循环的编程模型,这种模型可以更好地处理异步 I/O 操作。而 threading 则是基于线程的并发模型,更适合于 CPU 密集型任务。

总的来说,asynciothreading 各有适用的场景。如果你的任务主要是 I/O 密集型的,那么 asyncio 可能是一个更好的选择。

2 举例: IDE 和Jupyter lab运行 asyncio的小区别

下面看这么一个例子:

import asyncio


async def my_task(task_id):
    # Your time-consume task
    await asyncio.sleep(2)  # Simulate a long process
    return {task_id: "done"}


async def my_coroutine():
    # launch 100 task
    tasks = []
    for i in range(100):
        tasks.append(my_task(i))

    # wait for all tasks to complete
    results = await asyncio.gather(*tasks)

    total_result = {}
    for result in results:
        total_result.update(result)
    return total_result


final_result = asyncio.run(my_coroutine())
print(final_result)

这个代码在python IDE 中执行, 没有任何错误, 但是在jupyter 中执行会报错。

---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
Cell In[132], line 25
     21         total_result.update(result)
     22     return total_result
---> 25 final_result = asyncio.run(my_coroutine())
     26 print(final_result)

File /data/anaconda3/yes/envs/jupyter/lib/python3.10/asyncio/runners.py:33, in run(main, debug)
      9 """Execute the coroutine and return the result.
     10 
     11 This function runs the passed coroutine, taking care of
   (...)
     30     asyncio.run(main())
     31 """
     32 if events._get_running_loop() is not None:
---> 33     raise RuntimeError(
     34         "asyncio.run() cannot be called from a running event loop")
     36 if not coroutines.iscoroutine(main):
     37     raise ValueError("a coroutine was expected, got {!r}".format(main))

RuntimeError: asyncio.run() cannot be called from a running event loop

原因是 Jupyter Lab 中的事件循环是由其内部的 Tornado 服务器提供的。在 Jupyter Lab 中,事件循环主要用于处理用户的输入、执行代码、更新界面等任务。例如,当你在 Jupyter Lab 中运行一个代码单元时,这个操作就会被添加到事件循环中,然后由事件循环调度 Python 解释器来执行这段代码。在代码执行的同时,事件循环还可以继续处理其他的事件,如用户的其他输入、网络请求等。

因此,如果你在 Jupyter Lab 中使用 asyncio.runloop.run_until_complete 这样的函数来运行协程,会导致错误,因为这些函数会尝试创建和管理一个新的事件循环,而在 Jupyter Lab 中,已经有一个正在运行的事件循环了。在这种情况下,你应该使用 asyncio.create_task 或直接在 cell 中 await 协程来运行协程。比如采用下面方式来执行

import asyncio


async def my_task(task_id):
    # Your time-consume task
    await asyncio.sleep(2)  # Simulate a long process
    return {task_id: "done"}


async def my_coroutine():
    # launch 100 task
    tasks = []
    for i in range(100):
        tasks.append(my_task(i))

    # wait for all tasks to complete
    results = await asyncio.gather(*tasks)

    total_result = {}
    for result in results:
        total_result.update(result)
    return total_result


final_result = await my_coroutine()
print(final_result)

但是呢, 这个代码在python IDE 中会报错 “SyntaxError: ‘await’ outside function
”,直接与asyncio设计的理念冲突了。

3 在asyncio中执行耗时任务

loop.run_in_executor是Python的asyncio库中的一个方法。用于在单独的线程或进程中运行一个函数,并返回一个表示函数执行的Future对象。这允许你运行阻塞或CPU密集型操作,而不会阻塞事件循环。
下面看一个简单的示例:

import asyncio
import concurrent.futures

def cpu_bound_operation(x):
    # 这可以是任何CPU密集型操作
    return x * x

async def main():
    loop = asyncio.get_event_loop()

    # 在单独的线程中运行
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_bound_operation, 5)
        print(f'结果是 {result}')

# 运行main函数直到它完成
asyncio.run(main())

示例中, 建立了一个 python的线程池, 针对一个 CPU 耗时任务, 提交到线程池, 有单独的线程来执行, 而不要同步执行, 进而阻塞事件循环。

4 控制协程的并发数

import asyncio

# 创建一个Semaphore对象,最大并发数为3
sem = asyncio.Semaphore(3)

async def my_coroutine(num):
    async with sem:
        # 模拟异步IO操作
        print(f'Coroutine {num} is running')
        await asyncio.sleep(1)
        print(f'Coroutine {num} is done')

# 创建一个事件循环
loop = asyncio.get_event_loop()

# 创建10个协程,但是由于我们设置了最大并发数为3,所以一次只会有3个协程在运行
coroutines = [my_coroutine(i) for i in range(10)]

# 运行所有协程,直到它们全部完成
loop.run_until_complete(asyncio.gather(*coroutines))