异步编程
Python 的异步编程是一种高效的并发编程方式,特别适合 I/O 密集型任务(如网络请求、文件读写等)。通过异步编程,可以在等待 I/O 操作时释放 CPU 资源,从而提高程序的效率。使用asyncio
是 Python 标准库中用于编写异步代码的模块。
核心概念:
事件循环(Event Loop):管理所有异步任务的执行。
协程(Coroutine):使用
async def
定义的函数,可以在其中使用await
挂起任务。Future:表示异步操作的结果。
Task:对协程的封装,用于调度执行。
使用
asyncio模块官网定义:asyncio 是用来编写并发 代码的库,使用 async/await 语法。asyncio 被用作多个提供高性能 Python 异步框架的基础,包括网络和网站服务,数据库连接库,分布式任务队列等等。asyncio 往往是构建 IO 密集型和高层级 结构化 网络代码的最佳选择。
# 这个模块蛮好记住的 异步 比如js中的都是async实现同步 而python 是根据io的
import asyncio
# 定义一个python的asyncio 定义一个协程
async def main():
print('Hello ...')
# 两个参数分别代表了 一个是多少秒 第二个参数代表返回值 或者返回的内容 await 用于挂起协程,等待异步操作完成。
result = await asyncio.sleep(delay=1,result="hello how are you?")
print('... World!')
print(result)
# help(asyncio.sleep) async sleep(delay, result=None) delay是秒
asyncio.run(main())
并发执行
gather函数
并行执行方式一:使用gather方法gather(*coros_or_futures, return_exceptions=False),执行参数一任务方法、参数二return_exceptions=False代表任务1失败的话 任务2不会在执行 起到一个中断作用如若是True则会继续执行(看具体场景)。
import asyncio
# help(asyncio.gather) gather(*coros_or_futures, return_exceptions=False)
# 定义第一个协程
async def task1():
print("task1 start")
await asyncio.sleep(1)
print("task1 end")
# 定义第二个协程
async def task2():
print("task2 start")
await asyncio.sleep(2)
print("task2 end")
# 运行并行方式一使用gather方法执行 参数一任务方法、参数二return_exceptions=False代表任务1失败的话 任务2不会在执行 起到一个中断作用如若是True则会继续执行
async def main():
await asyncio.gather(task1(), task2())
asyncio.run(main())
create_task函数:
asyncio.create_task(
my_coroutine(),# 协程只能写一个
name="MyTask", # 指定任务名称
context=context # 指定任务运行的上下文 使用的是扩展模块
)
import asyncio
# 上下文模块 具体看下这个位置的函数以及用法很简单:https://docs.python.org/zh-cn/3.13/library/contextvars.html#module-contextvars
import contextvars
# 定义一个上下文变量
var = contextvars.ContextVar('var', default='default')
# 定义一个协程函数
async def my_coroutine():
await asyncio.sleep(1)
# 获取上下文变量的值
value = var.get()
print(f"Task '{asyncio.current_task().get_name()}' 上下文的变量为: {value}")
return "返回值"
async def main():
# 创建一个上下文,并设置上下文变量的值
context = contextvars.copy_context()
context.run(var.set, '新的变量哈')
# 使用 create_task 创建任务,并指定 name 和 context
task = asyncio.create_task(
my_coroutine(),
name="MyTask", # 指定任务名称
context=context # 指定任务运行的上下文
)
# 等待任务完成
result = await task
print(f"Task result: {result}")
# 运行主函数
asyncio.run(main())
异步上下文
异步上下文管理器是Python中用于管理异步资源的一种机制。它通过实现现__aenter__和__aexit__方法,使得我们可以使用async with语法来管理异步资源的获取和释放。(就是使用啊enter方法做一些方法入参操作连接数据库、获取锁操作等、使用aexit方法来释放一些资源)
import asyncio
class AsyncResource:
# 初始化方法
def __init__(self, database):
self.database = database
# 异步进入上下文
async def __aenter__(self):
print(f"获取数据库连接然后连接数据库 {self.database}")
await asyncio.sleep(1) # 模拟异步操作
return self # 返回资源对象
# 异步退出上下文
async def __aexit__(self, exc_type, exc_value, traceback):
print(f"退出后释放该数据库对象的 {self.database}")
await asyncio.sleep(1) # 模拟异步操作
if exc_type is not None:
print(f"An exception occurred: {exc_value}")
# 如果返回 True,异常会被抑制;否则,异常会传播
return True
# 使用异步上下文管理器
async def use_resource():
async with AsyncResource("url:localhost,password=123456,user=root") as resource:
print(f"使用数据库 {resource.database}")
# 这里可以执行一些异步操作
await asyncio.sleep(1)
# 运行异步函数
asyncio.run(use_resource())
Future函数
代表一个异步操作的结果。Future对象通常用于跟踪异步操作的状态(是否完成、是否取消等),并允许我们在操作完成后获取结果或处理异常。
import asyncio
async def set_future_result(future):
await asyncio.sleep(1) # 模拟异步操作
future.set_result("Future is done!") # 设置结果
async def main():
# 创建一个 Future 对象
future = asyncio.Future()
# 启动一个任务来设置 Future 的结果
asyncio.create_task(set_future_result(future))
# 等待 Future 完成并获取结果
print("Waiting for future...")
result = await future
print(f"Future result: {result}")
asyncio.run(main())
核心方法
方法名 | 描述 | 返回值/行为 |
---|---|---|
set_result(result) |
设置 Future 的结果,并标记为完成。 |
无返回值。Future 的状态变为完成,等待中的 await 会继续执行。 |
set_exception(exception) |
设置 Future 的异常,并标记为完成。 |
无返回值。Future 的状态变为完成,等待中的 await 会抛出异常。 |
result() |
获取 Future 的结果。 |
如果 Future 已完成,返回结果;如果未完成,阻塞直到完成;如果取消或异常,抛出相应异常。 |
exception() |
获取 Future 的异常。 |
如果 Future 已完成且抛出异常,返回异常对象;否则返回 None 。 |
done() |
检查 Future 是否已完成(成功、失败或取消)。 |
返回 bool ,True 表示已完成,False 表示未完成。 |
cancelled() |
检查 Future 是否被取消。 |
返回 bool ,True 表示被取消,False 表示未被取消。 |
add_done_callback(callback) |
添加一个回调函数,当 Future 完成时调用。 |
无返回值。回调函数会在 Future 完成时被调用,接收 Future 对象作为参数。 |
cancel() |
取消 Future 。 |
返回 bool ,True 表示取消成功,False 表示 Future 已经完成或无法取消。 |
remove_done_callback(callback) |
移除已添加的回调函数。 | 返回被移除的回调函数的数量。 |
Task函数
task是Future
的子类,专门用于包装协程,并由事件循环调度执行。
使用
asyncio.create_task()
创建Task
。Task
会自动调度协程的执行。
import asyncio
async def task1():
print("Task 1 started")
await asyncio.sleep(2) # 模拟耗时操作
print("Task 1 completed")
return "Result from Task 1"
async def task2():
print("Task 2 started")
await asyncio.sleep(1) # 模拟耗时操作
print("Task 2 completed")
return "Result from Task 2"
async def main():
# 创建两个 Task
t1 = asyncio.create_task(task1(), name="MyTask1") # 设置 Task 名称
t2 = asyncio.create_task(task2(), name="MyTask2")
# 打印 Task 名称
print(f"Task 1 name: {t1.get_name()}")
print(f"Task 2 name: {t2.get_name()}")
# 等待 Task 完成
await asyncio.sleep(0.1) # 让 Task 有机会启动
print("Tasks are running...")
# 获取 Task 结果
result1 = await t1
result2 = await t2
print(f"Result from Task 1: {result1}")
print(f"Result from Task 2: {result2}")
# 检查 Task 状态
print(f"Is Task 1 done? {t1.done()}")
print(f"Is Task 2 done? {t2.done()}")
# 运行主函数
asyncio.run(main())
常用方法
方法名 | 描述 | 返回值/行为 |
---|---|---|
done() |
检查 Task 是否已完成(成功、失败或取消)。 |
返回 bool ,True 表示已完成,False 表示未完成。 |
cancelled() |
检查 Task 是否被取消。 |
返回 bool ,True 表示被取消,False 表示未被取消。 |
result() |
获取 Task 的结果。 |
如果 Task 已完成,返回结果;如果未完成,阻塞直到完成;如果取消或异常,抛出相应异常。 |
exception() |
获取 Task 的异常。 |
如果 Task 已完成且抛出异常,返回异常对象;否则返回 None 。 |
add_done_callback(callback) |
添加一个回调函数,当 Task 完成时调用。 |
无返回值。回调函数会在 Task 完成时被调用,接收 Task 对象作为参数。 |
cancel() |
取消 Task 。 |
返回 bool ,True 表示取消成功,False 表示 Task 已经完成或无法取消。 |
get_name() |
获取 Task 的名称。 |
返回 Task 的名称(字符串)。 |
set_name(name) |
设置 Task 的名称。 |
无返回值。 |