概念解析
异步编程是一种
允许程序在等待 I/O 操作
(如网络请求、文件读写)时不被阻塞
,转而执行其他任务的编程范式。
在 Python 中,其核心实现概念如下:
协程(Coroutines)
- 定义:可暂停执行并在合适时机恢复的函数,通过
async def
关键字声明。- 特点:非抢占式调度,协程主动通过
await
让出执行权,适用于处理高并发I/O
场景。
事件循环(Event Loop)
- 定义:异步编程的 “调度器”,负责管理协程的执行顺序、监控 I/O 事件状态,并在事件就绪时恢复对应协程。
- 核心逻辑:循环检查可执行的协程任务,按事件触发顺序调度执行。
任务(Tasks)
- 定义:对协程的封装,代表一个独立的异步操作单元,通过
asyncio.create_task()
创建。- 功能:支持取消任务、等待任务完成(
await task
)或获取任务状态。
Future 对象
- 定义:表示一个尚未完成的异步操作结果,本质是协程间传递状态的载体。
- 作用:当协程需要等待某个异步操作的结果时,可通过
await Future
暂停执行,待操作完成后获取结果。
await 表达式
- 定义:协程中用于暂停执行的关键字,用于等待另一个协程、Task 或 Future 对象的完成。
- 示例:
result = await async_function()
,表示暂停当前协程,直到async_function
执行完毕并返回结果。
应用场景
异步编程尤其
适用于I/O 密集型任务
(如 HTTP 请求、数据库查询、文件操作等)。
通过减少线程切换开销和 CPU 闲置时间,显著提升程序的并发处理能力。
相比多线程编程,异步编程在高并发场景下更轻量、更高效。
asyncio库详解
Python 3.4
引入了asyncio
库,作为异步编程的核心组件,事件循环是asyncio
的核心。
在Windows
上使用ProactorEventLoop
,在Unix
上使用SelectorEventLoop
。
Unix系统(SelectorEventLoop)
- 基于
selectors 模块对底层 I/O 多路复用机制
(如select、poll、epoll、kqueue)的抽象。 - 采用 “就绪通知” 机制:监视文件描述符(如套接字)状态,当
I/O
操作就绪时(如可读 / 可写)通知应用程序。 - 优势场景1:擅长处理大量并发连接(如
Linux
的epoll机
制支持高效事件驱动)。 - 优势场景2:适用于网络服务器、高并发
I/O
场景。
Windows系统(ProactorEventLoop)
- 基于
Windows 专有 I/O 完成端口(IOCP)机制
,属于系统级异步I/O
框架。 - 采用 “完成通知” 机制:异步启动
I/O
操作,操作完成后由系统主动通知程序。 - 优势场景1:深度优化
Windows
平台特性,支持全类型异步I/O
操作(含文件I/O
)。 - 优势场景1:在
Windows
环境下性能通常优于SelectorEventLoop
。
两种事件循环的关键差异
维度 | SelectorEventLoop(Unix) | ProactorEventLoop(Windows) |
---|---|---|
通知机制 | 等待 I/O 就绪后执行操作(“询问式”:Can I read/write?) |
异步启动 I/O 操作,完成后被动接收通知(“回调式”:Notify when done) |
API 覆盖范围 | 部分平台可能不支持全类型文件 I/O 操作 |
原生支持 Windows 所有异步 I/O 操作(如管道、套接字、文件) |
性能特点 | Unix 系统下,依赖epoll/kqueue 等机制实现高效并发 |
Windows 下利用 IOCP 机制实现低延迟、高吞吐量 |
事件循环管理
import asyncio
# 获取事件循环
loop = asyncio.get_event_loop()
# 运行协程直到完成
loop.run_until_complete(my_coroutine())
# 运行事件循环直到stop()被调用
loop.run_forever()
# 关闭事件循环
loop.close()
协程定义与执行
async def fetch_data(url):
print(f"开始获取数据: {url}")
await asyncio.sleep(2) # 模拟I/O操作
print(f"数据获取完成: {url}")
return f"来自 {url} 的数据"
# Python 3.7+ 推荐方式
async def main():
result = await fetch_data("example.com")
print(result)
asyncio.run(main()) # Python 3.7+引入,简化了事件循环管理
任务创建与管理
async def main():
# 创建任务
task1 = asyncio.create_task(fetch_data("site1.com"))
task2 = asyncio.create_task(fetch_data("site2.com"))
# 等待所有任务完成
results = await asyncio.gather(task1, task2)
print(results)
# 并发运行多个协程
results = await asyncio.gather(
fetch_data("site3.com"),
fetch_data("site4.com")
)
超时管理
async def main():
try:
# 设置超时
result = await asyncio.wait_for(fetch_data("example.com"), timeout=1.0)
except asyncio.TimeoutError:
print("操作超时")
同步与异步代码结合
import concurrent.futures
def cpu_bound_task(x):
# 计算密集型任务
return x * x
async def main():
# 使用线程池执行阻塞I/O
loop = asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor() as pool:
result = await loop.run_in_executor(pool, cpu_bound_task, 10)
print(result)
高并发场景实战案例
案例1: 并发网络请求
import asyncio
import aiohttp
import time
async def fetch(session, url):
async with session.get(url) as response:
return await response.text()
async def fetch_all(urls):
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
# 测试URLs
urls = [
"https://www.google.com",
"https://www.github.com",
"https://www.python.org",
# 可添加更多URL
] * 5 # 重复请求以增加数量
async def main():
start = time.time()
results = await fetch_all(urls)
end = time.time()
print(f"获取了 {len(results)} 个页面,耗时: {end - start:.2f} 秒")
# 运行
asyncio.run(main())
案例2: 异步数据库操作
使用
asyncpg
进行PostgreSQL
异步操作
import asyncio
import asyncpg
async def create_tables(conn):
await conn.execute('''
CREATE TABLE IF NOT EXISTS users(
id SERIAL PRIMARY KEY,
name TEXT,
email TEXT
)
''')
async def insert_users(conn, users):
# 批量插入
await conn.executemany(
'INSERT INTO users(name, email) VALUES($1, $2)',
users
)
async def fetch_users(conn):
return await conn.fetch('SELECT * FROM users')
async def main():
# 连接数据库
conn = await asyncpg.connect(
user='postgres',
password='password',
database='testdb',
host='127.0.0.1'
)
# 创建表
await create_tables(conn)
# 生成测试数据
test_users = [
('User1', 'user1@example.com'),
('User2', 'user2@example.com'),
('User3', 'user3@example.com'),
]
# 插入数据
await insert_users(conn, test_users)
# 查询数据
users = await fetch_users(conn)
for user in users:
print(f"ID: {user['id']}, Name: {user['name']}, Email: {user['email']}")
# 关闭连接
await conn.close()
# 运行
asyncio.run(main())
案例3: 异步Web爬虫
import asyncio
import aiohttp
from bs4 import BeautifulSoup
import time
async def fetch(session, url):
async with session.get(url) as response:
return await response.text()
async def parse(html):
# 使用BeautifulSoup解析HTML
soup = BeautifulSoup(html, 'html.parser')
# 获取所有链接
links = [a.get('href') for a in soup.find_all('a') if a.get('href')]
return links
async def crawl(url, max_depth=2):
visited = set()
async def _crawl(current_url, depth):
if depth > max_depth or current_url in visited:
return
visited.add(current_url)
print(f"正在爬取: {current_url}")
try:
async with aiohttp.ClientSession() as session:
html = await fetch(session, current_url)
links = await parse(html)
# 过滤出同域名链接
base_url = '/'.join(current_url.split('/')[:3])
same_domain_links = [
link if link.startswith('http') else f"{base_url}{link}"
for link in links if link and (link.startswith('http') or link.startswith('/'))
]
# 创建子任务继续爬取
tasks = [
_crawl(link, depth + 1)
for link in same_domain_links[:5] # 限制每页最多爬5个链接
]
await asyncio.gather(*tasks)
except Exception as e:
print(f"爬取 {current_url} 出错: {e}")
await _crawl(url, 0)
return visited
async def main():
start = time.time()
visited = await crawl("https://python.org", max_depth=1)
end = time.time()
print(f"爬取了 {len(visited)} 个页面,耗时: {end - start:.2f} 秒")
# 运行
asyncio.run(main())
案例4: 异步API服务器处理大量并发请求
使用
FastAPI
构建高并发API
服务
from fastapi import FastAPI, BackgroundTasks
import asyncio
import uvicorn
import time
import random
app = FastAPI()
# 模拟数据库
db = {}
# 模拟异步数据库操作
async def db_operation(key, delay=None):
if delay is None:
delay = random.uniform(0.1, 0.5) # 随机延迟模拟真实场景
await asyncio.sleep(delay)
return db.get(key)
# 模拟耗时任务
async def process_item(item_id):
print(f"开始处理项目 {item_id}")
await asyncio.sleep(5) # 模拟耗时操作
print(f"项目 {item_id} 处理完成")
return {"item_id": item_id, "status": "processed"}
# 常规端点
@app.get("/items/{item_id}")
async def read_item(item_id: str):
result = await db_operation(item_id)
return {"item_id": item_id, "value": result}
# 批量操作端点
@app.get("/batch")
async def batch_operation(items: str):
item_ids = items.split(",")
tasks = [db_operation(item_id) for item_id in item_ids]
results = await asyncio.gather(*tasks)
return dict(zip(item_ids, results))
# 后台任务
@app.post("/items/{item_id}/process")
async def process(item_id: str, background_tasks: BackgroundTasks):
background_tasks.add_task(process_item, item_id)
return {"message": f"Processing item {item_id} in the background"}
# 负载测试端点
@app.get("/load-test/{count}")
async def load_test(count: int):
start = time.time()
# 创建多个并发任务
tasks = []
for i in range(count):
# 随机延迟
delay = random.uniform(0.1, 0.5)
tasks.append(asyncio.sleep(delay))
# 并发执行所有任务
await asyncio.gather(*tasks)
end = time.time()
return {
"tasks_completed": count,
"time_taken": f"{end - start:.2f} 秒",
"tasks_per_second": f"{count / (end - start):.2f}"
}
# 初始化一些测试数据
@app.on_event("startup")
async def startup_event():
for i in range(1000):
db[str(i)] = f"value-{i}"
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)