Python 高级教程:异步与并发编程详解
在现代软件开发中,高性能和响应速度是应用程序的关键要求。Python 作为一门流行的编程语言,提供了强大的异步和并发编程工具来满足这些需求。本教程将深入探讨 Python 中的异步编程和并发编程,通过实际示例帮助你掌握这些高级概念。
目录
异步编程 asyncio 基础 事件循环 async/await 语法 任务和 Future 异步上下文管理器 并发编程 多线程编程 多进程编程 线程池和进程池 锁和同步原语 并发模式设计
1. 异步编程
异步编程是一种编程范式,它允许程序在等待某些操作完成时继续执行其他任务,而不是被阻塞。在 Python 中,asyncio 是实现异步编程的核心库。
1.1 asyncio
协程基础
协程(Coroutine)是异步编程的核心概念。它们是可以暂停执行的特殊函数,允许其他代码在等待期间运行。在使用协程时,需要注意以下关键点:
使用 async def
定义协程函数使用 await
等待协程执行完成理解 await
和asyncio.create_task()
的区别:await
直接等待协程完成create_task()
创建任务并允许并发执行
import asyncio
import time
async def make_coffee():
print("开始煮咖啡...")
# 模拟煮咖啡的过程
await asyncio.sleep(3)
print("咖啡准备好了!")
return "一杯香浓的咖啡"
async def make_toast():
print("开始烤面包...")
# 模拟烤面包的过程
await asyncio.sleep(2)
print("面包烤好了!")
return "一片金黄的吐司"
async def prepare_breakfast():
# 创建任务实现真正的并发
coffee_task = asyncio.create_task(make_coffee())
toast_task = asyncio.create_task(make_toast())
try:
# 等待所有任务完成
coffee, toast = await asyncio.gather(coffee_task, toast_task)
print(f"早餐准备完成:{coffee}和{toast}")
except Exception as e:
print(f"准备早餐时发生错误:{e}")
# 取消未完成的任务
for task in [coffee_task, toast_task]:
if not task.done():
task.cancel()
if __name__ == "__main__":
asyncio.run(prepare_breakfast())
事件循环
事件循环是 asyncio 的核心,它负责协调所有异步操作的执行。以下示例展示了如何正确使用事件循环:
import asyncio
import time
async def check_weather(city):
print(f"正在查询{city}的天气...")
await asyncio.sleep(1) # 模拟 API 调用
return f"{city}的天气晴朗"
async def main():
try:
# 获取事件循环
loop = asyncio.get_running_loop()
# 创建多个任务
cities = ["北京", "上海", "广州", "深圳"]
tasks = [loop.create_task(check_weather(city)) for city in cities]
# 使用超时机制
try:
# 设置 5 秒超时
results = await asyncio.wait_for(asyncio.gather(*tasks), timeout=5.0)
for result in results:
print(result)
except asyncio.TimeoutError:
print("查询超时!")
# 取消所有未完成的任务
for task in tasks:
if not task.done():
task.cancel()
except Exception as e:
print(f"发生错误:{e}")
if __name__ == "__main__":
asyncio.run(main())
异步上下文管理器
下面是一个改进的异步数据库连接示例,展示了正确的资源管理和错误处理:
import asyncio
import contextlib
class AsyncDatabase:
def __init__(self):
self.connected = False
async def __aenter__(self):
try:
print("连接数据库...")
await asyncio.sleep(1) # 模拟连接过程
self.connected = True
return self
except Exception as e:
print(f"连接数据库失败:{e}")
raise
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.connected:
print("关闭数据库连接...")
try:
await asyncio.sleep(0.5) # 模拟关闭过程
self.connected = False
except Exception as e:
print(f"关闭数据库连接时发生错误:{e}")
raise
async def query(self, sql):
if not self.connected:
raise RuntimeError("数据库未连接")
await asyncio.sleep(1) # 模拟查询
return f"查询结果:{sql}"
async def main():
try:
async with AsyncDatabase() as db:
result = await db.query("SELECT * FROM users")
print(result)
except Exception as e:
print(f"数据库操作失败:{e}")
if __name__ == "__main__":
asyncio.run(main())
1.2 并发编程
在开始并发编程部分之前,需要理解 Python 中的 GIL(全局解释器锁):
GIL 确保同一时刻只有一个线程执行 Python 字节码 这意味着 Python 的多线程在 CPU 密集型任务上并不能提供真正的并行性 对于 I/O 密集型任务,多线程仍然是有效的 对于 CPU 密集型任务,应该使用多进程
多线程编程
以下是一个改进的多线程示例,包含了正确的线程管理和退出机制:
import threading
import time
from queue import Queue
import signal
import sys
class CoffeeShop:
def __init__(self):
self.orders_queue = Queue()
self.ready_orders = Queue()
self.should_stop = threading.Event()
# 设置信号处理
signal.signal(signal.SIGINT, self.handle_shutdown)
signal.signal(signal.SIGTERM, self.handle_shutdown)
def handle_shutdown(self, signum, frame):
print("\n正在关闭咖啡店...")
self.should_stop.set()
def take_order(self, order_id):
if not self.should_stop.is_set():
print(f"收到订单 #{order_id}")
self.orders_queue.put(order_id)
def make_coffee(self):
while not self.should_stop.is_set():
try:
order_id = self.orders_queue.get(timeout=1)
print(f"正在制作订单 #{order_id} 的咖啡...")
time.sleep(2) # 模拟制作咖啡的时间
self.ready_orders.put(order_id)
print(f"订单 #{order_id} 的咖啡制作完成!")
self.orders_queue.task_done()
except Queue.Empty:
continue
except Exception as e:
print(f"制作咖啡时发生错误:{e}")
def serve_coffee(self):
while not self.should_stop.is_set():
try:
order_id = self.ready_orders.get(timeout=1)
print(f"服务员正在派送订单 #{order_id}")
time.sleep(1) # 模拟派送时间
print(f"订单 #{order_id} 已送达!")
self.ready_orders.task_done()
except Queue.Empty:
continue
except Exception as e:
print(f"派送咖啡时发生错误:{e}")
def run_coffee_shop():
shop = CoffeeShop()
# 创建工作线程
threads = [
threading.Thread(target=shop.make_coffee, name="Barista"),
threading.Thread(target=shop.serve_coffee, name="Server")
]
# 将线程设置为守护线程
for thread in threads:
thread.daemon = True
thread.start()
try:
# 模拟接收订单
for i in range(5):
shop.take_order(i)
time.sleep(0.5)
# 等待所有订单处理完成
shop.orders_queue.join()
shop.ready_orders.join()
except KeyboardInterrupt:
print("\n收到中断信号,正在关闭...")
finally:
# 设置停止标志
shop.should_stop.set()
# 等待所有线程完成
for thread in threads:
thread.join(timeout=2)
print("咖啡店已关闭")
if __name__ == "__main__":
run_coffee_shop()
多进程编程
以下是一个改进的多进程示例,包含了更好的资源管理和错误处理:
import multiprocessing as mp
import time
import os
import signal
from functools import partial
def init_worker():
# 忽略子进程的 SIGINT 信号
signal.signal(signal.SIGINT, signal.SIG_IGN)
def process_image(image_name, processing_time=2):
try:
print(f"进程 {os.getpid()} 开始处理图片:{image_name}")
time.sleep(processing_time) # 模拟图片处理
return f"{image_name} 处理完成"
except Exception as e:
print(f"处理图片 {image_name} 时发生错误:{e}")
return f"{image_name} 处理失败"
def batch_process_images(images, max_workers=None):
if max_workers is None:
# 设置进程池大小上限
max_workers = min(len(images), os.cpu_count() or 1, 4)
print(f"使用 {max_workers} 个进程进行处理")
try:
# 创建进程池,设置初始化函数
with mp.Pool(processes=max_workers, initializer=init_worker) as pool:
# 使用进程池处理图片
results = pool.map_async(process_image, images)
try:
# 等待所有任务完成,设置超时
processed_results = results.get(timeout=30)
return processed_results
except mp.TimeoutError:
print("处理超时!")
pool.terminate()
return []
except KeyboardInterrupt:
print("\n检测到中断信号,正在优雅关闭...")
pool.terminate()
pool.join()
return []
except Exception as e:
print(f"批处理过程中发生错误:{e}")
return []
if __name__ == "__main__":
# 准备测试数据
test_images = [f"image_{i}.jpg" for i in range(4)]
try:
results = batch_process_images(test_images)
# 打印结果
for result in results:
print(result)
except Exception as e:
print(f"主程序发生错误:{e}")
线程安全和锁机制
以下是一个改进的银行账户示例,展示了正确的锁使用和死锁预防:
import threading
import time
from contextlib import contextmanager
class InsufficientFundsError(Exception):
pass
class BankAccount:
def __init__(self, balance):
self.balance = balance
self._lock = threading.RLock() # 使用可重入锁
@contextmanager
def _account_lock(self, timeout=1.0):
"""安全的获取锁的上下文管理器"""
if not self._lock.acquire(timeout=timeout):
raise TimeoutError("无法获取账户锁,操作超时")
try:
yield
finally:
self._lock.release()
def get_balance(self):
"""安全地获取余额"""
with self._account_lock():
return self.balance
def deposit(self, amount):
"""存款操作"""
if amount <= 0:
raise ValueError("存款金额必须大于 0")
with self._account_lock():
print(f"存款 ¥{amount}...")
# 模拟处理时间
time.sleep(0.1)
self.balance += amount
print(f"存款完成,当前余额:¥{self.balance}")
def withdraw(self, amount):
"""取款操作"""
if amount <= 0:
raise ValueError("取款金额必须大于 0")
with self._account_lock():
if self.balance >= amount:
print(f"取款 ¥{amount}...")
# 模拟处理时间
time.sleep(0.1)
self.balance -= amount
print(f"取款完成,当前余额:¥{self.balance}")
else:
raise InsufficientFundsError("余额不足!")
def transfer(self, other_account, amount):
"""
转账操作
使用固定的锁定顺序来防止死锁
"""
if self is other_account:
raise ValueError("不能转账给自己")
if amount <= 0:
raise ValueError("转账金额必须大于 0")
# 始终按照内存地址顺序获取锁,防止死锁
first_account, second_account = sorted([self, other_account],
key=lambda x: id(x))
with first_account._account_lock():
with second_account._account_lock():
if self.balance >= amount:
print(f"转账 ¥{amount}...")
self.balance -= amount
other_account.balance += amount
print(f"转账完成")
else:
raise InsufficientFundsError("余额不足!")
def test_bank_account():
# 创建测试账户
account1 = BankAccount(1000)
account2 = BankAccount(500)
# 创建多个操作线程
threads = []
# 存款操作
threads.append(threading.Thread(
target=account1.deposit,
args=(500,),
name="Deposit-1"
))
# 取款操作
threads.append(threading.Thread(
target=account1.withdraw,
args=(300,),
name="Withdraw-1"
))
# 转账操作
threads.append(threading.Thread(
target=account1.transfer,
args=(account2, 400),
name="Transfer-1-to-2"
))
# 启动所有线程
for thread in threads:
thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()
# 打印最终状态
print("\n最终状态:")
print(f"账户 1 余额:¥{account1.get_balance()}")
print(f"账户 2 余额:¥{account2.get_balance()}")
if __name__ == "__main__":
test_bank_account()
常见陷阱和调试技巧
异步编程陷阱:
避免在协程中使用阻塞操作 正确处理异步上下文管理器 注意任务取消和清理 使用 asyncio.shield()
保护关键操作
多线程陷阱:
理解 GIL 的限制 避免死锁 正确使用线程安全的数据结构 合理设置超时机制
多进程陷阱:
注意进程间通信的开销 合理设置进程池大小 正确处理进程终止 避免共享状态问题
调试技巧:
使用 logging
模块记录日志设置适当的超时机制 使用 threading.current_thread().name
跟踪线程使用 multiprocessing.current_process().name
跟踪进程
总结
本教程详细介绍了 Python 中的异步编程和并发编程的核心概念和实践应用:
异步编程(asyncio)
通过协程实现非阻塞操作 使用事件循环管理异步任务 async/await语法简化异步代码 任务和 Future 处理异步操作结果 异步上下文管理器管理资源
并发编程
多线程处理 I/O 密集型任务 多进程处理 CPU 密集型任务 线程池和进程池管理并发任务 锁机制确保数据安全 并发设计模式解决实际问题
在选择并发策略时,需要考虑以下因素:
I/O 密集型任务:选择 asyncio 或多线程 CPU 密集型任务:选择多进程 混合型任务:考虑组合使用不同策略
最佳实践:
始终进行适当的错误处理 实现优雅的关闭机制 使用超时机制防止无限等待 注意资源管理和清理 选择合适的并发策略 注意线程安全性 使用锁和其他同步原语保护共享资源