Python 异步编程完全指南:从原理到实战
Python 异步编程完全指南:从原理到实战
引言
在现代软件开发中,性能和高并发处理能力是衡量系统质量的重要指标。Python 作为一门广泛使用的编程语言,其异步编程能力在 3.5 版本引入 INLINE_CODE_0 语法后得到了质的飞跃。本文将深入讲解 Python 异步编程的核心原理、使用方法和实际应用场景,帮助你真正掌握这项重要技能。
一、什么是异步编程?
1.1 同步与异步的区别
同步编程是最直观的编程方式。当程序执行一个耗时操作(如网络请求、文件读写)时,会阻塞等待操作完成,然后继续执行下一行代码。这种方式简单易懂,但在处理多个耗时操作时效率低下。
异步编程则允许程序在等待某个操作完成时,转而执行其他任务。当等待的操作完成后,程序会收到通知并继续处理结果。这种方式特别适合 I/O 密集型应用。
1.2 为什么需要异步编程?
考虑一个实际场景:你需要从 100 个不同的 API 端点获取数据。使用同步方式,每个请求需要等待前一个完成,总耗时是单个请求时间的 100 倍。而使用异步方式,可以同时发起所有请求,总耗时仅相当于最慢的那个请求的时间。
二、asyncio 核心概念
2.1 Event Loop(事件循环)
事件循环是异步编程的心脏。它负责调度和执行异步任务,监控 I/O 事件,并在适当的时候唤醒等待的任务。可以把它想象成一个餐厅的服务员,不断巡视各个餐桌,为需要的顾客提供服务。
2.2 Coroutine(协程)
协程是一种特殊的函数,可以在执行过程中暂停和恢复。使用 INLINE_CODE_1 定义的函数就是协程函数,调用它返回一个协程对象。
2.3 Task(任务)
Task 是对协程的封装,用于在事件循环中调度执行。创建 Task 后,协程会立即开始执行,而不是等待被 await。
2.4 await 关键字
INLINE_CODE_2 用于等待一个可等待对象(awaitable)完成。当遇到 INLINE_CODE_3 时,当前协程会暂停执行,将控制权交还给事件循环,让其他任务有机会运行。
三、基础语法与实践
3.1 第一个异步程序
import asyncio
import time
async def say_hello(delay, name):
"""异步问候函数"""
await asyncio.sleep(delay) # 模拟耗时操作
print(f"Hello, {name}!")
return f"Greeted {name}"
async def main():
# 方式一:顺序执行
print("=== 顺序执行 ===")
start = time.time()
await say_hello(1, "Alice")
await say_hello(1, "Bob")
print(f"耗时:{time.time() - start:.2f}秒")
# 方式二:并发执行
print("\n=== 并发执行 ===")
start = time.time()
task1 = asyncio.create_task(say_hello(1, "Alice"))
task2 = asyncio.create_task(say_hello(1, "Bob"))
result1 = await task1
result2 = await task2
print(f"{result1}, {result2}")
print(f"耗时:{time.time() - start:.2f}秒")
if __name__ == "__main__":
asyncio.run(main())
输出结果:
=== 顺序执行 ===
Hello, Alice!
Hello, Bob!
耗时:2.00 秒
=== 并发执行 ===
Hello, Alice!
Hello, Bob!
耗时:1.00 秒
这个例子清晰地展示了并发执行的优势:两个 1 秒的任务,顺序执行需要 2 秒,而并发执行只需要 1 秒。
3.2 并发控制:gather 与 wait
当需要同时执行多个任务时,INLINE_CODE_4 是最常用的工具:
import asyncio
import random
async def fetch_data(url_id):
"""模拟从 API 获取数据"""
delay = random.uniform(0.5, 2.0)
await asyncio.sleep(delay)
return f"Data from {url_id} (took {delay:.2f}s)"
async def main():
# 创建 10 个并发任务
tasks = [fetch_data(f"API-{i}") for i in range(10)]
# 等待所有任务完成
results = await asyncio.gather(*tasks)
for result in results:
print(result)
asyncio.run(main())
INLINE_CODE_5 的优势:
- 简洁的语法,一行代码等待多个任务
- 自动收集所有结果并返回列表
- 可以通过 INLINE_CODE_6 捕获异常而不中断其他任务
3.3 异常处理
异步编程中的异常处理需要特别注意:
import asyncio
async def risky_task(task_id, should_fail=False):
await asyncio.sleep(0.5)
if should_fail:
raise ValueError(f"Task {task_id} failed!")
return f"Task {task_id} succeeded"
async def main():
# 方式一:异常会传播到 gather
try:
results = await asyncio.gather(
risky_task(1),
risky_task(2, should_fail=True),
risky_task(3)
)
except ValueError as e:
print(f"捕获异常:{e}")
# 方式二:收集异常而不中断
print("\n使用 return_exceptions=True:")
results = await asyncio.gather(
risky_task(1),
risky_task(2, should_fail=True),
risky_task(3),
return_exceptions=True
)
for i, result in enumerate(results, 1):
if isinstance(result, Exception):
print(f"任务 {i} 异常:{result}")
else:
print(f"任务 {i} 结果:{result}")
asyncio.run(main())
四、实际应用场景
4.1 场景一:批量 API 调用
这是异步编程最典型的应用场景。假设你需要从多个数据源获取信息:
import asyncio
import aiohttp
from datetime import datetime
async def fetch_weather(session, city):
"""获取城市天气"""
url = f"https://api.weather.com/{city}"
async with session.get(url) as response:
data = await response.json()
return {"city": city, "temp": data.get("temp", "N/A")}
async def fetch_stock(session, symbol):
"""获取股票价格"""
url = f"https://api.finance.com/stock/{symbol}"
async with session.get(url) as response:
data = await response.json()
return {"symbol": symbol, "price": data.get("price", "N/A")}
async def fetch_news(session, topic):
"""获取相关新闻"""
url = f"https://api.news.com/search?q={topic}"
async with session.get(url) as response:
data = await response.json()
return {"topic": topic, "count": data.get("total", 0)}
async def dashboard():
async with aiohttp.ClientSession() as session:
# 并发获取所有数据
tasks = [
fetch_weather(session, "Beijing"),
fetch_weather(session, "Shanghai"),
fetch_stock(session, "AAPL"),
fetch_stock(session, "GOOGL"),
fetch_news(session, "AI")
]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 处理结果
for result in results:
if isinstance(result, Exception):
print(f"请求失败:{result}")
else:
print(f"获取数据:{result}")
# asyncio.run(dashboard())
4.2 场景二:WebSocket 实时通信
异步编程是 WebSocket 应用的天然选择:
import asyncio
import websockets
async def websocket_handler(websocket, path):
"""处理 WebSocket 连接"""
print(f"新连接:{path}")
try:
async for message in websocket:
print(f"收到消息:{message}")
# 模拟处理
await asyncio.sleep(0.1)
# 发送响应
response = f"Echo: {message}"
await websocket.send(response)
except websockets.exceptions.ConnectionClosed:
print("连接关闭")
async def broadcast_server():
"""WebSocket 服务器"""
async with websockets.serve(websocket_handler, "localhost", 8765):
print("WebSocket 服务器运行在 ws://localhost:8765")
await asyncio.Future() # 永久运行
# asyncio.run(broadcast_server())
4.3 场景三:异步数据库操作
使用异步数据库驱动可以显著提升数据库密集型应用的性能:
import asyncio
import asyncpg
async def fetch_user_data(pool, user_id):
"""异步获取用户数据"""
async with pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT * FROM users WHERE id = $1",
user_id
)
return dict(row) if row else None
async def batch_user_query(user_ids):
"""批量查询用户数据"""
# 创建连接池
pool = await asyncpg.create_pool(
"postgresql://user:pass@localhost/db",
min_size=5,
max_size=20
)
try:
# 并发查询所有用户
tasks = [fetch_user_data(pool, uid) for uid in user_ids]
results = await asyncio.gather(*tasks)
return [r for r in results if r is not None]
finally:
await pool.close()
# users = asyncio.run(batch_user_query([1, 2, 3, 4, 5]))
4.4 场景四:生产者 - 消费者模式
使用 INLINE_CODE_7 实现高效的任务队列:
import asyncio
import random
async def producer(queue, task_id):
"""生产者:生成任务"""
for i in range(5):
item = f"Task-{task_id}-Item-{i}"
await queue.put(item)
print(f"生产:{item}")
await asyncio.sleep(random.uniform(0.1, 0.3))
await queue.put(None) # 结束标记
async def consumer(queue, consumer_id):
"""消费者:处理任务"""
while True:
item = await queue.get()
if item is None: # 收到结束标记
queue.task_done()
break
# 模拟处理
await asyncio.sleep(random.uniform(0.2, 0.5))
print(f"消费者{consumer_id} 处理:{item}")
queue.task_done()
async def main():
queue = asyncio.Queue(maxsize=10)
# 创建生产者和消费者
producers = [asyncio.create_task(producer(queue, i)) for i in range(2)]
consumers = [asyncio.create_task(consumer(queue, i)) for i in range(3)]
# 等待生产者完成
await asyncio.gather(*producers)
# 等待队列处理完成
await queue.join()
# 取消消费者
for c in consumers:
c.cancel()
# asyncio.run(main())
五、性能优化技巧
5.1 使用信号量控制并发数
当需要限制并发数量时(如 API 有速率限制),使用 INLINE_CODE_8:
import asyncio
import aiohttp
async def fetch_with_limit(session, url, semaphore):
async with semaphore: # 获取信号量
async with session.get(url) as response:
return await response.text()
async def main():
semaphore = asyncio.Semaphore(5) # 最多 5 个并发
async with aiohttp.ClientSession() as session:
urls = [f"https://example.com/page{i}" for i in range(100)]
tasks = [fetch_with_limit(session, url, semaphore) for url in urls]
results = await asyncio.gather(*tasks)
print(f"完成 {len(results)} 个请求")
# asyncio.run(main())
5.2 超时处理
使用 INLINE_CODE_9 设置超时:
import asyncio
async def slow_operation():
await asyncio.sleep(10) # 模拟慢操作
return "完成"
async def main():
try:
result = await asyncio.wait_for(slow_operation(), timeout=3.0)
print(result)
except asyncio.TimeoutError:
print("操作超时!")
# asyncio.run(main())
5.3 使用 asyncio.as_completed()
当需要按完成顺序处理结果时:
import asyncio
import random
async def task(task_id):
delay = random.uniform(0.5, 3.0)
await asyncio.sleep(delay)
return f"Task-{task_id} 完成(耗时{delay:.2f}s)"
async def main():
tasks = [asyncio.create_task(task(i)) for i in range(5)]
# 按完成顺序处理
for coro in asyncio.as_completed(tasks):
result = await coro
print(result)
# asyncio.run(main())
六、常见陷阱与最佳实践
6.1 避免阻塞事件循环
错误示例:
async def bad_example():
time.sleep(1) # ❌ 阻塞整个事件循环
正确示例:
async def good_example():
await asyncio.sleep(1) # ✅ 非阻塞等待
6.2 正确使用 asyncio.create_task()
# ❌ 错误:协程不会被调度执行
async def wrong():
coro = say_hello(1, "Alice") # 只是创建了协程对象
await asyncio.sleep(2)
await coro # 2 秒后才开始执行
# ✅ 正确:立即调度执行
async def right():
task = asyncio.create_task(say_hello(1, "Alice")) # 立即开始
await asyncio.sleep(2) # 同时可以做其他事
await task
6.3 资源清理
使用异步上下文管理器确保资源正确释放:
import aiohttp
async def fetch_data():
async with aiohttp.ClientSession() as session: # 自动关闭
async with session.get("https://example.com") as response:
return await response.text()
# session 自动关闭,无需手动 cleanup
七、总结
Python 异步编程是构建高性能 I/O 密集型应用的关键技术。通过本文的学习,你应该掌握了:
- 核心概念:事件循环、协程、Task、await 的工作原理
- 基础语法:async/await 的使用方法和并发控制
- 实际场景:API 调用、WebSocket、数据库、任务队列等应用
- 优化技巧:信号量、超时处理、按完成顺序处理
- 最佳实践:避免阻塞、正确创建任务、资源清理
异步编程的学习曲线较陡,但一旦掌握,将大大提升你构建高性能应用的能力。建议从简单的场景开始实践,逐步深入复杂的应用。
参考资源
- Python 官方 asyncio 文档:https://docs.python.org/3/library/asyncio.html
- aiohttp 文档:https://docs.aiohttp.org/
- 《Python 异步编程实战》相关书籍
本文示例代码均在 Python 3.8+ 环境下测试通过。实际使用时请根据具体版本调整。