Python 高级教程:异步与并发编程详解

科技   2025-01-03 12:04   中国香港  

Python 高级教程:异步与并发编程详解

在现代软件开发中,高性能和响应速度是应用程序的关键要求。Python 作为一门流行的编程语言,提供了强大的异步和并发编程工具来满足这些需求。本教程将深入探讨 Python 中的异步编程和并发编程,通过实际示例帮助你掌握这些高级概念。

目录

  • 异步编程
    • asyncio 基础
    • 事件循环
    • async/await 语法
    • 任务和 Future
    • 异步上下文管理器
  • 并发编程
    • 多线程编程
    • 多进程编程
    • 线程池和进程池
    • 锁和同步原语
    • 并发模式设计

1. 异步编程

异步编程是一种编程范式,它允许程序在等待某些操作完成时继续执行其他任务,而不是被阻塞。在 Python 中,asyncio 是实现异步编程的核心库。

1.1 asyncio

协程基础

协程(Coroutine)是异步编程的核心概念。它们是可以暂停执行的特殊函数,允许其他代码在等待期间运行。在使用协程时,需要注意以下关键点:

  • 使用 async def 定义协程函数
  • 使用 await 等待协程执行完成
  • 理解 awaitasyncio.create_task() 的区别:
    • await 直接等待协程完成
    • create_task() 创建任务并允许并发执行
import asyncio
import time

async def make_coffee():
    print("开始煮咖啡...")
    # 模拟煮咖啡的过程
    await asyncio.sleep(3)
    print("咖啡准备好了!")
    return "一杯香浓的咖啡"

async def make_toast():
    print("开始烤面包...")
    # 模拟烤面包的过程
    await asyncio.sleep(2)
    print("面包烤好了!")
    return "一片金黄的吐司"

async def prepare_breakfast():
    # 创建任务实现真正的并发
    coffee_task = asyncio.create_task(make_coffee())
    toast_task = asyncio.create_task(make_toast())
    
    try:
        # 等待所有任务完成
        coffee, toast = await asyncio.gather(coffee_task, toast_task)
        print(f"早餐准备完成:{coffee}{toast}")
    except Exception as e:
        print(f"准备早餐时发生错误:{e}")
        # 取消未完成的任务
        for task in [coffee_task, toast_task]:
            if not task.done():
                task.cancel()

if __name__ == "__main__":
    asyncio.run(prepare_breakfast())
事件循环

事件循环是 asyncio 的核心,它负责协调所有异步操作的执行。以下示例展示了如何正确使用事件循环:

import asyncio
import time

async def check_weather(city):
    print(f"正在查询{city}的天气...")
    await asyncio.sleep(1)  # 模拟 API 调用
    return f"{city}的天气晴朗"

async def main():
    try:
        # 获取事件循环
        loop = asyncio.get_running_loop()
        
        # 创建多个任务
        cities = ["北京""上海""广州""深圳"]
        tasks = [loop.create_task(check_weather(city)) for city in cities]
        
        # 使用超时机制
        try:
            # 设置 5 秒超时
            results = await asyncio.wait_for(asyncio.gather(*tasks), timeout=5.0)
            for result in results:
                print(result)
        except asyncio.TimeoutError:
            print("查询超时!")
            # 取消所有未完成的任务
            for task in tasks:
                if not task.done():
                    task.cancel()
    except Exception as e:
        print(f"发生错误:{e}")

if __name__ == "__main__":
    asyncio.run(main())
异步上下文管理器

下面是一个改进的异步数据库连接示例,展示了正确的资源管理和错误处理:

import asyncio
import contextlib

class AsyncDatabase:
    def __init__(self):
        self.connected = False
    
    async def __aenter__(self):
        try:
            print("连接数据库...")
            await asyncio.sleep(1)  # 模拟连接过程
            self.connected = True
            return self
        except Exception as e:
            print(f"连接数据库失败:{e}")
            raise
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.connected:
            print("关闭数据库连接...")
            try:
                await asyncio.sleep(0.5)  # 模拟关闭过程
                self.connected = False
            except Exception as e:
                print(f"关闭数据库连接时发生错误:{e}")
                raise
    
    async def query(self, sql):
        if not self.connected:
            raise RuntimeError("数据库未连接")
        await asyncio.sleep(1)  # 模拟查询
        return f"查询结果:{sql}"

async def main():
    try:
        async with AsyncDatabase() as db:
            result = await db.query("SELECT * FROM users")
            print(result)
    except Exception as e:
        print(f"数据库操作失败:{e}")

if __name__ == "__main__":
    asyncio.run(main())

1.2 并发编程

在开始并发编程部分之前,需要理解 Python 中的 GIL(全局解释器锁):

  • GIL 确保同一时刻只有一个线程执行 Python 字节码
  • 这意味着 Python 的多线程在 CPU 密集型任务上并不能提供真正的并行性
  • 对于 I/O 密集型任务,多线程仍然是有效的
  • 对于 CPU 密集型任务,应该使用多进程
多线程编程

以下是一个改进的多线程示例,包含了正确的线程管理和退出机制:

import threading
import time
from queue import Queue
import signal
import sys

class CoffeeShop:
    def __init__(self):
        self.orders_queue = Queue()
        self.ready_orders = Queue()
        self.should_stop = threading.Event()
        
        # 设置信号处理
        signal.signal(signal.SIGINT, self.handle_shutdown)
        signal.signal(signal.SIGTERM, self.handle_shutdown)
    
    def handle_shutdown(self, signum, frame):
        print("\n正在关闭咖啡店...")
        self.should_stop.set()
    
    def take_order(self, order_id):
        if not self.should_stop.is_set():
            print(f"收到订单 #{order_id}")
            self.orders_queue.put(order_id)
    
    def make_coffee(self):
        while not self.should_stop.is_set():
            try:
                order_id = self.orders_queue.get(timeout=1)
                print(f"正在制作订单 #{order_id} 的咖啡...")
                time.sleep(2)  # 模拟制作咖啡的时间
                self.ready_orders.put(order_id)
                print(f"订单 #{order_id} 的咖啡制作完成!")
                self.orders_queue.task_done()
            except Queue.Empty:
                continue
            except Exception as e:
                print(f"制作咖啡时发生错误:{e}")
    
    def serve_coffee(self):
        while not self.should_stop.is_set():
            try:
                order_id = self.ready_orders.get(timeout=1)
                print(f"服务员正在派送订单 #{order_id}")
                time.sleep(1)  # 模拟派送时间
                print(f"订单 #{order_id} 已送达!")
                self.ready_orders.task_done()
            except Queue.Empty:
                continue
            except Exception as e:
                print(f"派送咖啡时发生错误:{e}")

def run_coffee_shop():
    shop = CoffeeShop()
    
    # 创建工作线程
    threads = [
        threading.Thread(target=shop.make_coffee, name="Barista"),
        threading.Thread(target=shop.serve_coffee, name="Server")
    ]
    
    # 将线程设置为守护线程
    for thread in threads:
        thread.daemon = True
        thread.start()
    
    try:
        # 模拟接收订单
        for i in range(5):
            shop.take_order(i)
            time.sleep(0.5)
        
        # 等待所有订单处理完成
        shop.orders_queue.join()
        shop.ready_orders.join()
    except KeyboardInterrupt:
        print("\n收到中断信号,正在关闭...")
    finally:
        # 设置停止标志
        shop.should_stop.set()
        
        # 等待所有线程完成
        for thread in threads:
            thread.join(timeout=2)
        
        print("咖啡店已关闭")

if __name__ == "__main__":
    run_coffee_shop()
多进程编程

以下是一个改进的多进程示例,包含了更好的资源管理和错误处理:

import multiprocessing as mp
import time
import os
import signal
from functools import partial

def init_worker():
    # 忽略子进程的 SIGINT 信号
    signal.signal(signal.SIGINT, signal.SIG_IGN)

def process_image(image_name, processing_time=2):
    try:
        print(f"进程 {os.getpid()} 开始处理图片:{image_name}")
        time.sleep(processing_time)  # 模拟图片处理
        return f"{image_name} 处理完成"
    except Exception as e:
        print(f"处理图片 {image_name} 时发生错误:{e}")
        return f"{image_name} 处理失败"

def batch_process_images(images, max_workers=None):
    if max_workers is None:
        # 设置进程池大小上限
        max_workers = min(len(images), os.cpu_count() or 14)
    
    print(f"使用 {max_workers} 个进程进行处理")
    
    try:
        # 创建进程池,设置初始化函数
        with mp.Pool(processes=max_workers, initializer=init_worker) as pool:
            # 使用进程池处理图片
            results = pool.map_async(process_image, images)
            
            try:
                # 等待所有任务完成,设置超时
                processed_results = results.get(timeout=30)
                return processed_results
            except mp.TimeoutError:
                print("处理超时!")
                pool.terminate()
                return []
            
    except KeyboardInterrupt:
        print("\n检测到中断信号,正在优雅关闭...")
        pool.terminate()
        pool.join()
        return []
    except Exception as e:
        print(f"批处理过程中发生错误:{e}")
        return []

if __name__ == "__main__":
    # 准备测试数据
    test_images = [f"image_{i}.jpg" for i in range(4)]
    
    try:
        results = batch_process_images(test_images)
        
        # 打印结果
        for result in results:
            print(result)
    except Exception as e:
        print(f"主程序发生错误:{e}")
线程安全和锁机制

以下是一个改进的银行账户示例,展示了正确的锁使用和死锁预防:

import threading
import time
from contextlib import contextmanager

class InsufficientFundsError(Exception):
    pass

class BankAccount:
    def __init__(self, balance):
        self.balance = balance
        self._lock = threading.RLock()  # 使用可重入锁
        
    @contextmanager
    def _account_lock(self, timeout=1.0):
        """安全的获取锁的上下文管理器"""
        if not self._lock.acquire(timeout=timeout):
            raise TimeoutError("无法获取账户锁,操作超时")
        try:
            yield
        finally:
            self._lock.release()
    
    def get_balance(self):
        """安全地获取余额"""
        with self._account_lock():
            return self.balance
    
    def deposit(self, amount):
        """存款操作"""
        if amount <= 0:
            raise ValueError("存款金额必须大于 0")
            
        with self._account_lock():
            print(f"存款 ¥{amount}...")
            # 模拟处理时间
            time.sleep(0.1)
            self.balance += amount
            print(f"存款完成,当前余额:¥{self.balance}")
    
    def withdraw(self, amount):
        """取款操作"""
        if amount <= 0:
            raise ValueError("取款金额必须大于 0")
            
        with self._account_lock():
            if self.balance >= amount:
                print(f"取款 ¥{amount}...")
                # 模拟处理时间
                time.sleep(0.1)
                self.balance -= amount
                print(f"取款完成,当前余额:¥{self.balance}")
            else:
                raise InsufficientFundsError("余额不足!")
    
    def transfer(self, other_account, amount):
        """
        转账操作
        使用固定的锁定顺序来防止死锁
        """

        if self is other_account:
            raise ValueError("不能转账给自己")
            
        if amount <= 0:
            raise ValueError("转账金额必须大于 0")
        
        # 始终按照内存地址顺序获取锁,防止死锁
        first_account, second_account = sorted([self, other_account], 
                                             key=lambda x: id(x))
        
        with first_account._account_lock():
            with second_account._account_lock():
                if self.balance >= amount:
                    print(f"转账 ¥{amount}...")
                    self.balance -= amount
                    other_account.balance += amount
                    print(f"转账完成")
                else:
                    raise InsufficientFundsError("余额不足!")

def test_bank_account():
    # 创建测试账户
    account1 = BankAccount(1000)
    account2 = BankAccount(500)
    
    # 创建多个操作线程
    threads = []
    
    # 存款操作
    threads.append(threading.Thread(
        target=account1.deposit, 
        args=(500,), 
        name="Deposit-1"
    ))
    
    # 取款操作
    threads.append(threading.Thread(
        target=account1.withdraw, 
        args=(300,), 
        name="Withdraw-1"
    ))
    
    # 转账操作
    threads.append(threading.Thread(
        target=account1.transfer, 
        args=(account2, 400), 
        name="Transfer-1-to-2"
    ))
    
    # 启动所有线程
    for thread in threads:
        thread.start()
    
    # 等待所有线程完成
    for thread in threads:
        thread.join()
    
    # 打印最终状态
    print("\n最终状态:")
    print(f"账户 1 余额:¥{account1.get_balance()}")
    print(f"账户 2 余额:¥{account2.get_balance()}")

if __name__ == "__main__":
    test_bank_account()

常见陷阱和调试技巧

  1. 异步编程陷阱:

  • 避免在协程中使用阻塞操作
  • 正确处理异步上下文管理器
  • 注意任务取消和清理
  • 使用 asyncio.shield() 保护关键操作
  • 多线程陷阱:

    • 理解 GIL 的限制
    • 避免死锁
    • 正确使用线程安全的数据结构
    • 合理设置超时机制
  • 多进程陷阱:

    • 注意进程间通信的开销
    • 合理设置进程池大小
    • 正确处理进程终止
    • 避免共享状态问题
  • 调试技巧:

    • 使用 logging 模块记录日志
    • 设置适当的超时机制
    • 使用 threading.current_thread().name 跟踪线程
    • 使用 multiprocessing.current_process().name 跟踪进程

    总结

    本教程详细介绍了 Python 中的异步编程和并发编程的核心概念和实践应用:

    1. 异步编程(asyncio)

    • 通过协程实现非阻塞操作
    • 使用事件循环管理异步任务
    • async/await语法简化异步代码
    • 任务和 Future 处理异步操作结果
    • 异步上下文管理器管理资源
  • 并发编程

    • 多线程处理 I/O 密集型任务
    • 多进程处理 CPU 密集型任务
    • 线程池和进程池管理并发任务
    • 锁机制确保数据安全
    • 并发设计模式解决实际问题

    在选择并发策略时,需要考虑以下因素:

    • I/O 密集型任务:选择 asyncio 或多线程
    • CPU 密集型任务:选择多进程
    • 混合型任务:考虑组合使用不同策略

    最佳实践:

    1. 始终进行适当的错误处理
    2. 实现优雅的关闭机制
    3. 使用超时机制防止无限等待
    4. 注意资源管理和清理
    5. 选择合适的并发策略
    6. 注意线程安全性
    7. 使用锁和其他同步原语保护共享资源


    机器学习算法与Python实战
    长期跟踪关注统计学、数据挖掘、机器学习算法、深度学习、人工智能技术与行业发展动态,分享Python、机器学习等技术文章。回复机器学习有惊喜资料。
     最新文章