Python 多线程:让你的程序开启 “并行” 加速之旅!

文摘   科技   2025-01-15 15:27   上海  
点击蓝字,关注山海摸鱼人

多线程就是在一个程序中同时运行多个任务,在 Python 中,我们可以使用 threading 模块、queue 模块、concurrent.futures 模块来实现多线程编程。

导入 threading 模块

首先,我们需要导入 threading 模块,代码如下:

import threading

创建线程的基本方法

我们可以通过创建一个类,让它继承自 threading.Thread,然后重写 run 方法,这个 run 方法里就是线程要执行的任务啦。看下面的代码:

import threading

class ShanHaiTask(threading.Thread):
    def __init__(self, task_name):
        threading.Thread.__init__(self)
        self.task_name = task_name  # 线程的名字

    def run(self):
        print(f"{self.task_name} 开始执行任务啦!")
        # 这里可以写具体的任务代码,比如计算、文件操作等
        print(f"{self.task_name} 完成任务啦!")


# 创建一个线程实例
task1 = ShanHaiTask("ShanHaiMoYuRen")
task1.start()  # 启动线程

在这个代码中,我们创建了一个名为 ShanHaiTask 的类,它继承自 threading.Thread__init__ 方法用来初始化线程的名字,而 run 方法就是线程真正执行的任务。我们创建了一个 ShanHaiMoYuRen 的线程,当我们调用 start() 方法时,线程就会开始执行 run 方法里的内容啦。

输出结果大概是这样的:

ShanHaiMoYuRen 开始执行任务啦!
ShanHaiMoYuRen 完成任务啦!

另一种创建线程的方法是使用函数和 threading.Thread 的 target 参数,像这样:

import threading

def shan_hai_task(task_name):
    print(f"{task_name} 开始执行任务啦!")
    # 这里添加具体的任务代码
    print(f"{task_name} 完成任务啦!")

task2 = threading.Thread(target=shan_hai_task, args=("ShanHaiXiaoYaoKe",))
task2.start()

这里我们定义了一个 shan_hai_task 函数,然后使用 threading.Thread 创建了一个线程,target 参数指定了线程要执行的函数,args 参数是传递给函数的参数。这样我们就创建了一个名为 ShanHaiXiaoYaoKe 的线程,当启动这个线程时,它会执行 shan_hai_task 函数。

多线程案例

计算一系列数字的平方。

import threading, time


def calculate_square(num):
    time.sleep(0.1 * num)
    print(f"计算 {num} 的平方:{num * num}")


nums = [12345]
threads = []
for num in nums:
    thread = threading.Thread(target=calculate_square, args=(num,))
    threads.append(thread)

print('运行开始')
for thread in threads:
    thread.start()
print('运行中')
for thread in threads:
    thread.join()  # 等待所有线程完成
print('运行结束')

在这个代码中,我们定义了一个 calculate_square 函数,它用来计算一个数字的平方。然后我们有一个数字列表 nums,我们为每个数字创建一个线程,让它们同时计算平方。join() 方法会让主程序等待所有线程完成,这样我们就能保证在所有计算完成后再做其他事情啦。

多线程并取得返回值

有时候,我们希望线程完成任务后能给我们返回一个结果。

方式1:使用全局变量(不推荐,但简单易懂)

我们可以使用全局变量来存储线程的结果,不过这种方法在多线程环境下可能会出现一些问题,比如多个线程同时修改这个变量可能会导致混乱哦。但让我们先看看代码:

import threading

result = {}
lock = threading.Lock()


def calculate_cube(num):
    global result
    cube = num * num * num
    with lock:
        result[num] = cube


thread1 = threading.Thread(target=calculate_cube, args=(3,))
thread2 = threading.Thread(target=calculate_cube, args=(5,))
thread1.start()
thread2.start()
thread1.join()
thread2.join()

for k, v in result.items():
    print(f"计算 {k} 的立方:{v}")

result:这是一个全局字典,用于存储计算结果。它的键是输入的数字,值是该数字的立方。最初,它是空的,等待存储计算结果。
lock:创建了一个 threading.Lock() 实例,即一个互斥锁。这个锁用于确保在多线程环境下对 result 变量的修改是线程安全的,防止多个线程同时修改 result 而导致数据不一致或丢失。

方式2:使用 queue 模块(推荐)

更安全的方法是使用 queue 模块,它可以帮助我们在多线程间安全地传递数据哦。让我们来看一个例子:

import threading
import queue


def calculate_power(num, result_queue):
    power = num ** 4
    result_queue.put(power)
    print(f"计算 {num} 的四次方:{power}")


result_queue = queue.Queue()
thread = threading.Thread(target=calculate_power, args=(2, result_queue))
thread.start()
thread.join()
final_result = result_queue.get()
print(f"最终结果:{final_result}")

在这个代码中,我们导入了 queue 模块,创建了一个 result_queue。在 calculate_power 函数中,我们计算一个数的四次方并将结果放入队列中。这样,即使多个线程同时使用这个队列,也不会出现混乱,因为队列会自动处理线程安全问题哦。

方式3:使用 concurrent.futures 模块

concurrent.futures 模块提供了一种更高级的方法,它让多线程编程变得更加简单和方便。让我们看看如何使用它:

import concurrent.futures


def calculate_sqrt(num):
    import math
    sqrt = math.sqrt(num)
    return sqrt


# 方式A
with concurrent.futures.ThreadPoolExecutor() as executor:
    numbers = [491625]
    results = executor.map(calculate_sqrt, numbers)
    for result in results:
        print(f"结果:{result}")

# 方式B
with concurrent.futures.ThreadPoolExecutor() as executor:
    numbers = [491625]
    futures = [executor.submit(calculate_sqrt, num) for num in numbers]
    for future in concurrent.futures.as_completed(futures):
        num = numbers[futures.index(future)]
        try:
            result = future.result()
            print(f"输入数字 {num} 的最终结果:{result}")
        except Exception as e:
            print(f"输入数字 {num} 发生错误: {e}")

方式A使用 executor.map() 会按照任务提交的顺序返回结果。而方式B使用 as_completed(futures) 会按照任务完成的顺序返回结果,会先看到先完成的任务的结果,而不是按照输入列表的顺序。

方式A的executor.map() 会将任务执行过程中的异常作为结果抛出,需要在 for 循环中使用 try-except 来处理异常,但是会阻塞后续结果的输出。而方式B使用 future.result() 获取结果时,异常会在调用 result() 时抛出,并且可以单独处理每个任务的异常,不影响其他任务结果的获取。

无论选择使用 executor.map() 还是 as_completed() 取决于你对结果顺序和异常处理的需求。

关于线程池的最大线程数

ThreadPoolExecutor() 可以接受一个 max_workers 参数,它决定了线程池中的最大线程数。例如:

with concurrent.futures.ThreadPoolExecutor(max_workers=2as executor:
    # 代码部分

如果不指定 max_workers,Python 会根据系统的 CPU 核心数自动选择一个合适的值。一般来说,如果任务是 I/O 密集型,较多的线程数可能更合适;如果是 CPU 密集型,线程数通常设置为 CPU 核心数或略多一点,以避免过多的上下文切换和资源竞争。

山海摸鱼人
致力于记录美好之瞬间,追寻美好之明天。
 最新文章