折腾侠
技术教程

Python 异步编程完全指南:从原理到实战

折腾侠
2026/03/29 发布
13约 9 分钟1585 字 / 1007 词00

Python 异步编程完全指南:从原理到实战

引言

在现代软件开发中,性能和高并发处理能力是衡量系统质量的重要指标。Python 作为一门广泛使用的编程语言,其异步编程能力在 3.5 版本引入 INLINE_CODE_0 语法后得到了质的飞跃。本文将深入讲解 Python 异步编程的核心原理、使用方法和实际应用场景,帮助你真正掌握这项重要技能。

一、什么是异步编程?

1.1 同步与异步的区别

同步编程是最直观的编程方式。当程序执行一个耗时操作(如网络请求、文件读写)时,会阻塞等待操作完成,然后继续执行下一行代码。这种方式简单易懂,但在处理多个耗时操作时效率低下。

异步编程则允许程序在等待某个操作完成时,转而执行其他任务。当等待的操作完成后,程序会收到通知并继续处理结果。这种方式特别适合 I/O 密集型应用。

1.2 为什么需要异步编程?

考虑一个实际场景:你需要从 100 个不同的 API 端点获取数据。使用同步方式,每个请求需要等待前一个完成,总耗时是单个请求时间的 100 倍。而使用异步方式,可以同时发起所有请求,总耗时仅相当于最慢的那个请求的时间。

二、asyncio 核心概念

2.1 Event Loop(事件循环)

事件循环是异步编程的心脏。它负责调度和执行异步任务,监控 I/O 事件,并在适当的时候唤醒等待的任务。可以把它想象成一个餐厅的服务员,不断巡视各个餐桌,为需要的顾客提供服务。

2.2 Coroutine(协程)

协程是一种特殊的函数,可以在执行过程中暂停和恢复。使用 INLINE_CODE_1 定义的函数就是协程函数,调用它返回一个协程对象。

2.3 Task(任务)

Task 是对协程的封装,用于在事件循环中调度执行。创建 Task 后,协程会立即开始执行,而不是等待被 await。

2.4 await 关键字

INLINE_CODE_2 用于等待一个可等待对象(awaitable)完成。当遇到 INLINE_CODE_3 时,当前协程会暂停执行,将控制权交还给事件循环,让其他任务有机会运行。

三、基础语法与实践

3.1 第一个异步程序

Python
import asyncio
import time

async def say_hello(delay, name):
    """异步问候函数"""
    await asyncio.sleep(delay)  # 模拟耗时操作
    print(f"Hello, {name}!")
    return f"Greeted {name}"

async def main():
    # 方式一:顺序执行
    print("=== 顺序执行 ===")
    start = time.time()
    await say_hello(1, "Alice")
    await say_hello(1, "Bob")
    print(f"耗时:{time.time() - start:.2f}秒")
    
    # 方式二:并发执行
    print("\n=== 并发执行 ===")
    start = time.time()
    task1 = asyncio.create_task(say_hello(1, "Alice"))
    task2 = asyncio.create_task(say_hello(1, "Bob"))
    result1 = await task1
    result2 = await task2
    print(f"{result1}, {result2}")
    print(f"耗时:{time.time() - start:.2f}秒")

if __name__ == "__main__":
    asyncio.run(main())

输出结果

=== 顺序执行 ===
Hello, Alice!
Hello, Bob!
耗时:2.00 秒

=== 并发执行 ===
Hello, Alice!
Hello, Bob!
耗时:1.00 秒

这个例子清晰地展示了并发执行的优势:两个 1 秒的任务,顺序执行需要 2 秒,而并发执行只需要 1 秒。

3.2 并发控制:gather 与 wait

当需要同时执行多个任务时,INLINE_CODE_4 是最常用的工具:

Python
import asyncio
import random

async def fetch_data(url_id):
    """模拟从 API 获取数据"""
    delay = random.uniform(0.5, 2.0)
    await asyncio.sleep(delay)
    return f"Data from {url_id} (took {delay:.2f}s)"

async def main():
    # 创建 10 个并发任务
    tasks = [fetch_data(f"API-{i}") for i in range(10)]
    
    # 等待所有任务完成
    results = await asyncio.gather(*tasks)
    
    for result in results:
        print(result)

asyncio.run(main())

INLINE_CODE_5 的优势:

  • 简洁的语法,一行代码等待多个任务
  • 自动收集所有结果并返回列表
  • 可以通过 INLINE_CODE_6 捕获异常而不中断其他任务

3.3 异常处理

异步编程中的异常处理需要特别注意:

Python
import asyncio

async def risky_task(task_id, should_fail=False):
    await asyncio.sleep(0.5)
    if should_fail:
        raise ValueError(f"Task {task_id} failed!")
    return f"Task {task_id} succeeded"

async def main():
    # 方式一:异常会传播到 gather
    try:
        results = await asyncio.gather(
            risky_task(1),
            risky_task(2, should_fail=True),
            risky_task(3)
        )
    except ValueError as e:
        print(f"捕获异常:{e}")
    
    # 方式二:收集异常而不中断
    print("\n使用 return_exceptions=True:")
    results = await asyncio.gather(
        risky_task(1),
        risky_task(2, should_fail=True),
        risky_task(3),
        return_exceptions=True
    )
    
    for i, result in enumerate(results, 1):
        if isinstance(result, Exception):
            print(f"任务 {i} 异常:{result}")
        else:
            print(f"任务 {i} 结果:{result}")

asyncio.run(main())

四、实际应用场景

4.1 场景一:批量 API 调用

这是异步编程最典型的应用场景。假设你需要从多个数据源获取信息:

Python
import asyncio
import aiohttp
from datetime import datetime

async def fetch_weather(session, city):
    """获取城市天气"""
    url = f"https://api.weather.com/{city}"
    async with session.get(url) as response:
        data = await response.json()
        return {"city": city, "temp": data.get("temp", "N/A")}

async def fetch_stock(session, symbol):
    """获取股票价格"""
    url = f"https://api.finance.com/stock/{symbol}"
    async with session.get(url) as response:
        data = await response.json()
        return {"symbol": symbol, "price": data.get("price", "N/A")}

async def fetch_news(session, topic):
    """获取相关新闻"""
    url = f"https://api.news.com/search?q={topic}"
    async with session.get(url) as response:
        data = await response.json()
        return {"topic": topic, "count": data.get("total", 0)}

async def dashboard():
    async with aiohttp.ClientSession() as session:
        # 并发获取所有数据
        tasks = [
            fetch_weather(session, "Beijing"),
            fetch_weather(session, "Shanghai"),
            fetch_stock(session, "AAPL"),
            fetch_stock(session, "GOOGL"),
            fetch_news(session, "AI")
        ]
        
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # 处理结果
        for result in results:
            if isinstance(result, Exception):
                print(f"请求失败:{result}")
            else:
                print(f"获取数据:{result}")

# asyncio.run(dashboard())

4.2 场景二:WebSocket 实时通信

异步编程是 WebSocket 应用的天然选择:

Python
import asyncio
import websockets

async def websocket_handler(websocket, path):
    """处理 WebSocket 连接"""
    print(f"新连接:{path}")
    
    try:
        async for message in websocket:
            print(f"收到消息:{message}")
            
            # 模拟处理
            await asyncio.sleep(0.1)
            
            # 发送响应
            response = f"Echo: {message}"
            await websocket.send(response)
            
    except websockets.exceptions.ConnectionClosed:
        print("连接关闭")

async def broadcast_server():
    """WebSocket 服务器"""
    async with websockets.serve(websocket_handler, "localhost", 8765):
        print("WebSocket 服务器运行在 ws://localhost:8765")
        await asyncio.Future()  # 永久运行

# asyncio.run(broadcast_server())

4.3 场景三:异步数据库操作

使用异步数据库驱动可以显著提升数据库密集型应用的性能:

Python
import asyncio
import asyncpg

async def fetch_user_data(pool, user_id):
    """异步获取用户数据"""
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            "SELECT * FROM users WHERE id = $1",
            user_id
        )
        return dict(row) if row else None

async def batch_user_query(user_ids):
    """批量查询用户数据"""
    # 创建连接池
    pool = await asyncpg.create_pool(
        "postgresql://user:pass@localhost/db",
        min_size=5,
        max_size=20
    )
    
    try:
        # 并发查询所有用户
        tasks = [fetch_user_data(pool, uid) for uid in user_ids]
        results = await asyncio.gather(*tasks)
        
        return [r for r in results if r is not None]
    finally:
        await pool.close()

# users = asyncio.run(batch_user_query([1, 2, 3, 4, 5]))

4.4 场景四:生产者 - 消费者模式

使用 INLINE_CODE_7 实现高效的任务队列:

Python
import asyncio
import random

async def producer(queue, task_id):
    """生产者:生成任务"""
    for i in range(5):
        item = f"Task-{task_id}-Item-{i}"
        await queue.put(item)
        print(f"生产:{item}")
        await asyncio.sleep(random.uniform(0.1, 0.3))
    
    await queue.put(None)  # 结束标记

async def consumer(queue, consumer_id):
    """消费者:处理任务"""
    while True:
        item = await queue.get()
        
        if item is None:  # 收到结束标记
            queue.task_done()
            break
        
        # 模拟处理
        await asyncio.sleep(random.uniform(0.2, 0.5))
        print(f"消费者{consumer_id} 处理:{item}")
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=10)
    
    # 创建生产者和消费者
    producers = [asyncio.create_task(producer(queue, i)) for i in range(2)]
    consumers = [asyncio.create_task(consumer(queue, i)) for i in range(3)]
    
    # 等待生产者完成
    await asyncio.gather(*producers)
    
    # 等待队列处理完成
    await queue.join()
    
    # 取消消费者
    for c in consumers:
        c.cancel()

# asyncio.run(main())

五、性能优化技巧

5.1 使用信号量控制并发数

当需要限制并发数量时(如 API 有速率限制),使用 INLINE_CODE_8

Python
import asyncio
import aiohttp

async def fetch_with_limit(session, url, semaphore):
    async with semaphore:  # 获取信号量
        async with session.get(url) as response:
            return await response.text()

async def main():
    semaphore = asyncio.Semaphore(5)  # 最多 5 个并发
    
    async with aiohttp.ClientSession() as session:
        urls = [f"https://example.com/page{i}" for i in range(100)]
        tasks = [fetch_with_limit(session, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks)
        
    print(f"完成 {len(results)} 个请求")

# asyncio.run(main())

5.2 超时处理

使用 INLINE_CODE_9 设置超时:

Python
import asyncio

async def slow_operation():
    await asyncio.sleep(10)  # 模拟慢操作
    return "完成"

async def main():
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=3.0)
        print(result)
    except asyncio.TimeoutError:
        print("操作超时!")

# asyncio.run(main())

5.3 使用 asyncio.as_completed()

当需要按完成顺序处理结果时:

Python
import asyncio
import random

async def task(task_id):
    delay = random.uniform(0.5, 3.0)
    await asyncio.sleep(delay)
    return f"Task-{task_id} 完成(耗时{delay:.2f}s)"

async def main():
    tasks = [asyncio.create_task(task(i)) for i in range(5)]
    
    # 按完成顺序处理
    for coro in asyncio.as_completed(tasks):
        result = await coro
        print(result)

# asyncio.run(main())

六、常见陷阱与最佳实践

6.1 避免阻塞事件循环

错误示例

Python
async def bad_example():
    time.sleep(1)  # ❌ 阻塞整个事件循环

正确示例

Python
async def good_example():
    await asyncio.sleep(1)  # ✅ 非阻塞等待

6.2 正确使用 asyncio.create_task()

Python
# ❌ 错误:协程不会被调度执行
async def wrong():
    coro = say_hello(1, "Alice")  # 只是创建了协程对象
    await asyncio.sleep(2)
    await coro  # 2 秒后才开始执行

# ✅ 正确:立即调度执行
async def right():
    task = asyncio.create_task(say_hello(1, "Alice"))  # 立即开始
    await asyncio.sleep(2)  # 同时可以做其他事
    await task

6.3 资源清理

使用异步上下文管理器确保资源正确释放:

Python
import aiohttp

async def fetch_data():
    async with aiohttp.ClientSession() as session:  # 自动关闭
        async with session.get("https://example.com") as response:
            return await response.text()
    # session 自动关闭,无需手动 cleanup

七、总结

Python 异步编程是构建高性能 I/O 密集型应用的关键技术。通过本文的学习,你应该掌握了:

  1. 核心概念:事件循环、协程、Task、await 的工作原理
  2. 基础语法:async/await 的使用方法和并发控制
  3. 实际场景:API 调用、WebSocket、数据库、任务队列等应用
  4. 优化技巧:信号量、超时处理、按完成顺序处理
  5. 最佳实践:避免阻塞、正确创建任务、资源清理

异步编程的学习曲线较陡,但一旦掌握,将大大提升你构建高性能应用的能力。建议从简单的场景开始实践,逐步深入复杂的应用。

参考资源


本文示例代码均在 Python 3.8+ 环境下测试通过。实际使用时请根据具体版本调整。

分享到:

如果这篇文章对你有帮助,欢迎请作者喝杯咖啡 ☕

加载评论中...