随着 Python 3.13 即将发布,我想看看 Python 即将推出的最大变化。我认为迄今为止最令人兴奋的特性是来自 PEP-703[1] 的自由线程 Python。
由于我加入得比较晚,已经有很多文章在讨论这个话题了。我看到了 Simon Wilson 的一篇优秀 文章[2] ,成功演示了纯 Python 函数的并行性。在此基础上,我想探索一下除了使用 ThreadPoolExecutor.map
之外的线程同步方法。
在 Python 3.13 之前,由于 GIL 的存在,线程被用于 IO 密集型任务,Asyncio 也用于 IO(显而易见...),我们可以使用 asyncio.to_thread
[3] 来包装线程。例如,
await asyncio.to_thread(io_bound_task, "first_arg", optional="optional")
我们能否将其用于 CPU 密集型任务?这里有一段直接引用自 Asyncio 文档的内容:
注意:由于 GIL 的存在,asyncio.to_thread() 通常只能用于使 IO 密集型函数变为非阻塞。然而,对于释放 GIL 的扩展模块或没有 GIL 的替代 Python 实现,asyncio.to_thread() 也可以用于 CPU 密集型函数。唯一阻碍我们的是 GIL,所以 CPU 密集型任务应该不成问题。尽管考虑到 Async_IO_ 这个名字,这仍然感觉有点傻。
我用 AsyncIO 修改更新了 Simon 的测试脚本:
import argparse import os import sys import time from asyncio import get_running_loop, run, to_thread, TaskGroup from concurrent.futures import ThreadPoolExecutor from contextlib import contextmanager @contextmanager def timer(): start = time.time() yield print(f"Elapsed time: {time.time() - start}") def cpu_bound_task(n): """A CPU-bound task that computes the sum of squares up to n.""" return sum(i * i for i in range(n)) async def main(): parser = argparse.ArgumentParser(description="Run a CPU-bound task with threads") parser.add_argument("--threads", type=int, default=4, help="Number of threads") parser.add_argument("--tasks", type=int, default=10, help="Number of tasks") parser.add_argument( "--size", type=int, default=5000000, help="Task size (n for sum of squares)" ) args = parser.parse_args() get_running_loop().set_default_executor(ThreadPoolExecutor(max_workers=args.threads)) with timer(): async with TaskGroup() as tg: for _ in range(args.tasks): tg.create_task(to_thread(cpu_bound_task, args.size)) if __name__ == "__main__": print("Parallel with Asyncio") print(f"GIL {sys._is_gil_enabled()}") # type: ignore run(main())
我在我的 M3 Macbook Pro 上分别在有 GIL 和无 GIL 的情况下运行了它:
➜ python parallel_asyncio.py
Parallel with Asyncio
GIL False
Elapsed time: 0.5552260875701904
在没有自由线程的情况下:
➜ python parallel_asyncio.py
Parallel with Asyncio
GIL True
Elapsed time: 1.6787209510803223
结果如预期一样,当我们使用 AsyncIO 并发运行代码时,我们观察到了预期的并行执行带来的速度提升。
但为什么要这样做?
通常在讨论 Asyncio 时,人们总是关注其性能或性能不足。虽然性能当然很重要,但能够推理并发性是最大的好处。
我个人认为,TaskGroup
的加入使得 Asyncio 并发任务变得相当容易理解,我们可以用它来同步线程任务的结果。
根据你对 AsyncIO 的熟悉程度,它实际上可能是启动线程最简单的方法。这有点类似于 golang 中 go 例程的便利性。
这种方式还可以混合 IO 密集型异步任务和 CPU 密集型任务。比如像这样:
async with TaskGroup() as tg: io_task_future = tg.create_task(fetch(url)) tg.create_task(to_thread(cpu_bound_task)) tg.create_task(to_thread(cpu_bound_task)) await to_thread(compute_results, await io_task_future)
具体示例
现在还很难判断这是否是我们想要的,我认为有一些更具体的例子会让我们更好地理解。如果我想到什么,我会尝试跟进这个话题。
参考链接
- PEP-703: https://peps.python.org/pep-0703/
- 文章: https://til.simonwillison.net/python/trying-free-threaded-python
asyncio.to_thread
: https://docs.python.org/3/library/asyncio-task.html#asyncio.to_thread