使用 Asyncio 实现 Python 自由线程

文摘   2024-10-18 11:12   上海  

随着 Python 3.13 即将发布,我想看看 Python 即将推出的最大变化。我认为迄今为止最令人兴奋的特性是来自  PEP-703[1]  的自由线程 Python。

由于我加入得比较晚,已经有很多文章在讨论这个话题了。我看到了 Simon Wilson 的一篇优秀 文章[2] ,成功演示了纯 Python 函数的并行性。在此基础上,我想探索一下除了使用 ThreadPoolExecutor.map 之外的线程同步方法。

在 Python 3.13 之前,由于 GIL 的存在,线程被用于 IO 密集型任务,Asyncio 也用于 IO(显而易见...),我们可以使用  asyncio.to_thread[3]  来包装线程。例如,

await asyncio.to_thread(io_bound_task, "first_arg", optional="optional")

我们能否将其用于 CPU 密集型任务?这里有一段直接引用自 Asyncio 文档的内容:

注意:由于 GIL 的存在,asyncio.to_thread() 通常只能用于使 IO 密集型函数变为非阻塞。然而,对于释放 GIL 的扩展模块或没有 GIL 的替代 Python 实现,asyncio.to_thread() 也可以用于 CPU 密集型函数。唯一阻碍我们的是 GIL,所以 CPU 密集型任务应该不成问题。尽管考虑到 Async_IO_ 这个名字,这仍然感觉有点傻。

我用 AsyncIO 修改更新了 Simon 的测试脚本:

import argparse
import os
import sys
import time
from asyncio import get_running_loop, run, to_thread, TaskGroup
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager

@contextmanager
def timer():
    start = time.time()
    yield
    print(f"Elapsed time: {time.time() - start}")

def cpu_bound_task(n):
    """A CPU-bound task that computes the sum of squares up to n."""
    return sum(i * i for i in range(n))

async def main():
    parser = argparse.ArgumentParser(description="Run a CPU-bound task with threads")
    parser.add_argument("--threads", type=int, default=4, help="Number of threads")
    parser.add_argument("--tasks", type=int, default=10, help="Number of tasks")
    parser.add_argument(
        "--size", type=int, default=5000000, help="Task size (n for sum of squares)"
    )
    args = parser.parse_args()

    get_running_loop().set_default_executor(ThreadPoolExecutor(max_workers=args.threads))

    with timer():
        async with TaskGroup() as tg:
            for _ in range(args.tasks):
                tg.create_task(to_thread(cpu_bound_task, args.size))

if __name__ == "__main__":
    print("Parallel with Asyncio")
    print(f"GIL {sys._is_gil_enabled()}")  # type: ignore
    run(main())

我在我的 M3 Macbook Pro 上分别在有 GIL 和无 GIL 的情况下运行了它:

➜ python parallel_asyncio.py
Parallel with Asyncio
GIL False
Elapsed time: 0.5552260875701904

在没有自由线程的情况下:

➜  python parallel_asyncio.py
Parallel with Asyncio
GIL True
Elapsed time: 1.6787209510803223

结果如预期一样,当我们使用 AsyncIO 并发运行代码时,我们观察到了预期的并行执行带来的速度提升。

但为什么要这样做?

通常在讨论 Asyncio 时,人们总是关注其性能或性能不足。虽然性能当然很重要,但能够推理并发性是最大的好处。

我个人认为,TaskGroup 的加入使得 Asyncio 并发任务变得相当容易理解,我们可以用它来同步线程任务的结果。

根据你对 AsyncIO 的熟悉程度,它实际上可能是启动线程最简单的方法。这有点类似于 golang 中 go 例程的便利性。

这种方式还可以混合 IO 密集型异步任务和 CPU 密集型任务。比如像这样:

async with TaskGroup() as tg:
    io_task_future = tg.create_task(fetch(url))
    tg.create_task(to_thread(cpu_bound_task))
    tg.create_task(to_thread(cpu_bound_task))

await to_thread(compute_results, await io_task_future)

具体示例

现在还很难判断这是否是我们想要的,我认为有一些更具体的例子会让我们更好地理解。如果我想到什么,我会尝试跟进这个话题。



参考链接

  1. PEP-703: https://peps.python.org/pep-0703/
  2. 文章: https://til.simonwillison.net/python/trying-free-threaded-python
  3. asyncio.to_thread: https://docs.python.org/3/library/asyncio-task.html#asyncio.to_thread

幻想发生器
图解技术本质
 最新文章