ExecutorService 并发指南

科技   2024-10-31 08:18   河北  

在软件开发不断发展的世界中,有效管理并发任务的能力至关重要。传统的线程方法可能变得繁琐且容易出错,特别是在处理大量异步操作时。这时,ExecutorService 登场了:它是Java并发框架中一个强大的抽象,旨在简化和优化异步任务执行。

本文我们将深入探讨 ExecutorService 其核心功能,探索各种线程池配置,以解决Java应用程序中的现实世界并发挑战。通过阅读、实践本文内容,你将掌握以下能力;

  • 简化异步编程: 抽象线程管理允许专注于任务的逻辑,而不是线程创建和生命周期的细节。
  • 提高可扩展性: 轻松管理线程池,使应用程序能够高效地处理不同的工作负载。
  • 增强可维护性: 通过集中线程管理实现更干净、更易读和可维护的应用程序。

ExecutorService 概览

传统上,线程被用来实现并发。每个线程就像处理器上的一个单独核心,一次专注于一个任务。然而,直接管理线程可能是一个 杂技 行为。这种做法需要担心创建线程、处理它们的生命周期(启动、停止)以及当多个线程访问共享资源(如数据库或文件系统)时可能出现的同步问题。

这就是 ExecutorService 可靠的地方。它作为管理 异步任务 的高级抽象。异步任务本质上是可以独立提交和执行的工作,而不必阻塞主执行线程。ExecutorService 负责线程池管理,让开发者专注于任务的逻辑,而不是线程的复杂性。

使用ExecutorService与原始线程管理相比的好处:

  • 简化的代码: 编写的是任务的功能代码(例如,处理图像、下载文件),而不是线程管理(创建和管理工作线程)。这导致代码更干净、更易于维护。
  • 提高的可扩展性: ExecutorService 管理一个线程池。如果一个线程忙于处理图像,另一个可以下载文件,确保程序能够高效地处理不同的工作负载。
  • 增强的错误处理: ExecutorService 提供了处理任务执行期间可能发生的错误的方法。开发者不必编写单独的代码来捕获和处理个别线程抛出的异常。
  • 资源管理: ExecutorService 控制线程池中的线程数量,防止创建太多线程,可能会压垮系统的资源。这就像有一个定义好的工作线程数量,以避免用太多任务超载处理器。

基础功能

现在我们已经理解了ExecutorService的魔力,让我们看看如何将其付诸实践。以下是如何创建一个ExecutorService 实例并提交任务进行异步执行的方法:

创建ExecutorService

Java 的 Executors 类提供了各种工厂方法来创建不同类型的 ExecutorService 实例。这里是一个常见的例子:

ExecutorService executorService = Executors.newFixedThreadPool(5);

这段代码创建了一个大小为5的固定线程池的 ExecutorService。这意味着 ExecutorService 将管理一个5个线程的池来执行您的任务。您可以根据您的需求选择其他配置,如newSingleThreadExecutor(一个线程)或newCachedThreadPool(动态调整线程池大小)。

提交任务

有两种主要方式向ExecutorService提交任务:

  • submit(Callable<T> task) 这个方法接受一个Callable对象作为输入。Callable接口扩展了Runnable,但允许开发者从任务执行中返回结果。当调用submit时,ExecutorService 安排任务执行并返回一个Future对象。
  • execute(Runnable task) 这个方法接受一个Runnable对象作为输入。Runnable接口定义了一个要异步执行的单个方法run()。与submit不同,execute不返回结果。

Future的力量:管理任务执行

当我们使用submit(Callable<T> task)时,ExecutorService 返回一个Future对象。这个Future对象作为任务最终结果的占位符。它提供了几种管理任务执行的方法:

  • get(): 这个方法阻塞调用线程,直到任务完成执行,然后返回Callable中的call()方法产生的结果。
  • isDone(): 这个方法检查任务是否已完成执行。如果任务已完成,则返回true;否则返回false
  • cancel(boolean mayInterruptIfRunning): 这个方法尝试取消任务执行。mayInterruptIfRunning参数指定是否应该中断当前运行的线程。

线程池机制

ExecutorService 的核心是一个强大的概念——线程池。它就像一群等待被工头(ExecutorService)分配任务的工人(线程)。理解线程池对于有效利用 ExecutorService 至关重要。

线程池在行动

工头(ExecutorService)有一群具有特定技能(任务类型)的工人(线程)。当建筑任务(提交)到达时,工头将它们分配给池中的可用工人。这确保了任务的有效执行,而不需要为每个单独的任务创建新的工人。

选择正确的配置

Executors类提供了各种工厂方法,用于创建具有不同线程池配置的 ExecutorService 实例:

  • newFixedThreadPool(int nThreads) 这个方法创建了一个大小为nThreads的固定线程池的ExecutorService。这适用于工作负载可预测的场景。固定池大小确保了一致的并发级别,但如果工作负载超过了可用线程,任务可能会排队等待可用的工人。
  • newSingleThreadExecutor() 这个方法创建了一个池中只有一个线程的ExecutorService。这适用于需要严格顺序执行或彼此依赖的任务。然而,它限制了并发性,并且可能不适用于处理多个独立任务。
  • newCachedThreadPool() 这个方法创建了一个动态调整大小的线程池的ExecutorService。池大小可以根据需要增长以处理传入的任务。然而,这种灵活性带来了潜在的缺点:

平衡性能和资源

线程池的大小和配置显著影响应用程序的性能和资源利用。以下是如何找到平衡的方法:

  • 线程池大小: 更大的线程池允许更多的并发任务执行,但它也消耗更多的资源。选择一个与平均工作负载相符的大小,以避免资源耗尽或未充分利用。
  • 排队行为: 当线程池已满且没有工人可用时,任务可能会排队等待稍后执行。Executors类不直接控制排队行为。然而,一些底层实现可能使用有界队列(如果队列已满,则拒绝任务)或无界队列(即使队列已满,任务也会继续添加,可能导致 OutOfMemoryError 异常)。

ExecutorService 高级功能

现在我们已经探讨了 ExecutorService 的核心功能,让我们深入了解一些高级主题,以获得全面、深刻的理解:

优雅关闭

ExecutorService 不应该被突然遗弃。以下是两个关键方法,用于优雅地关闭它:

  • shutdown() 这个方法向ExecutorService发出信号,表示不应再提交新任务。队列中的任务或当前正在执行的任务将被允许完成,然后ExecutorService才会终止。这就像告知工头 ExecutorService 停止接受新的建筑工程任务,但允许正在进行的项目完成。
  • shutdownNow() 这个方法尝试停止所有当前正在执行的任务,并防止任何新任务被提交。这就像工头紧急停止所有建筑活动。然而,要小心——突然停止任务可能导致工作不完整或数据不一致。

拒绝策略

当我们向一个已满的线程池的 ExecutorService 提交任务时,如果线程池无法接受新的任务,这些任务会被提交到线程池的阻塞队列中。如果阻塞队列也已满,就会触发一个名为 RejectedExecutionHandler 的机制。默认情况下,ThreadPoolExecutor 使用的拒绝策略是 AbortPolicy,即抛出 RejectedExecutionException 异常,导致任务被拒绝。

为了处理被拒绝的任务,我们可以选择不同的策略:

  1. AbortPolicy(默认策略):
  • 直接抛出 RejectedExecutionException 异常,阻止任务被提交。
  • 适用于需要立即发现问题并采取措施的场景。
  • CallerRunsPolicy:
    • 调用者线程会执行被拒绝的任务,这样可以降低新的任务被提交的速度。
    • 适用于希望继续执行被拒绝任务但不需要严格的执行顺序的情况。
  • DiscardPolicy:
    • 直接丢弃被拒绝的任务,不抛出异常。
    • 适用于可以接受部分任务被丢弃的情况,但这种策略可能导致任务丢失而不易察觉。
  • DiscardOldestPolicy:
    • 丢弃阻塞队列中最旧的任务,然后重新尝试提交当前任务。
    • 适用于更关注最新任务而愿意舍弃旧任务的场景。
  • 自定义处理策略:
    • 通过实现 RejectedExecutionHandler 接口,开发者可以定义自己的拒绝处理逻辑。例如,可以记录日志、报警,甚至将任务重新加入队列。

    选择适合的拒绝策略,能帮助你更好地控制线程池在高负载情况下的行为,确保系统的稳定性。

    多任务协调

    ExecutorService 提供了超出简单任务提交的功能:

    • invokeAll(Collection<? extends Callable<T>> tasks) 这个方法允许提交一系列 Callable 任务,并返回包含结果的 List<Future>。它阻塞调用线程直到所有任务完成。这适用于等待一组任务的完成并检索它们各自的结果。
    • invokeAny(Collection<? extends Callable<T>> tasks) 这个方法提交一系列 Callable 任务,但只等待第一个完成。它返回已完成任务的结果,如果所有任务都失败则抛出异常。这在您只需要一个组中任何一个任务的结果时非常有用,无论哪个任务首先完成。

    ExecutorService 实践

    ExecutorService 在许多需要异步处理以提高性能和响应性的场景中表现出色。让我们通过一些代码示例来探索一些常见用例:

    网络请求

    当需要从多个API并发地获取数据以提升Web应用程序的感知性能时,ExecutorService 可以发挥重要作用。通过它,我们可以高效地管理线程池,提交多个并行任务,从而在最短的时间内获取所有API的响应。这种方式不仅提升了数据获取速度,还减少了单个API请求的等待时间,从而显著改善用户体验。

    以下是一个使用 ExecutorService 并发地从多个API获取数据的示例:

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.List;
    import java.util.ArrayList;

    public class ConcurrentApiRequests {
        public static void main(String[] args) {
            // 创建一个固定大小的线程池
            ExecutorService executor = Executors.newFixedThreadPool(3);

            // 假设我们有多个API的URL列表
            List<String> apiUrls = List.of(
                "https://funtester.com/posts/1",
                "https://funtester.com/posts/2",
                "https://funtester.com/posts/3"
            );

            // 存储每个任务的Future对象
            List<Future<String>> futures = new ArrayList<>();

            for (String url : apiUrls) {
                Callable<String> task = () -> {
                    // 模拟发送HTTP请求(使用 HttpClient 或 OkHttp 等工具)
                    // 这里为了简化,直接返回URL
                    return fetchDataFromApi(url);
                };

                // 提交任务给线程池
                Future<String> future = executor.submit(task);
                futures.add(future);
            }

            // 处理所有的Future对象,获取结果
            for (Future<String> future : futures) {
                try {
                    // 获取任务执行的结果
                    String result = future.get();
                    System.out.println("API Response: " + result);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            // 关闭线程池
            executor.shutdown();
            try {
                // 等待所有任务完成
                if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                    executor.shutdownNow(); // 强制关闭
                }
            } catch (InterruptedException e) {
                executor.shutdownNow();
            }
        }

        // 模拟API数据获取
        private static String fetchDataFromApi(String url) throws Exception {
            // 这里可以实际调用HttpClient或其他HTTP库发送请求并获取数据
            // 为了简化,直接返回URL作为结果
            Thread.sleep(1000); // 模拟网络延迟
            return "Data from " + url;
        }
    }

    关键步骤说明:

    1. 线程池创建: 使用 Executors.newFixedThreadPool(3) 创建一个固定大小为3的线程池,允许最多3个任务并发执行。
    2. 提交任务: 将每个API请求封装为一个 Callable 任务,并提交给 ExecutorService,返回一个 Future 对象。Future 用于异步获取任务的执行结果。
    3. 处理任务结果: 通过遍历 Future 列表,调用 future.get() 获取每个任务的结果。此操作是阻塞的,但因为任务是并发执行的,整体性能大大提升。
    4. 线程池关闭: 调用 executor.shutdown() 关闭线程池,并使用 awaitTermination 等待所有任务完成。如果等待超时,则调用 shutdownNow 强制关闭。

    通过 ExecutorService 并发地从多个API获取数据,可以显著提升Web应用程序的感知性能,让用户感受到更快的响应速度和更流畅的交互体验。在实际应用中,根据API的特性和系统资源合理调整线程池配置,是获得最佳性能的关键。

    图像处理

    在需要对一批上传的图像进行后台处理(如调整图像大小)时,ExecutorService 是一个非常有效的工具。它可以异步处理这些任务,而不会阻塞主线程,从而保持应用程序的响应性。

    以下是一个使用 ExecutorService 来异步调整一批图像大小的示例:

    import java.awt.Graphics2D;
    import java.awt.Image;
    import java.awt.image.BufferedImage;
    import java.io.File;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import javax.imageio.ImageIO;

    public class ImageResizer {
        public static void main(String[] args) {
            // 创建一个固定大小的线程池
            ExecutorService executor = Executors.newFixedThreadPool(4);

            // 模拟上传的图像文件列表
            List<File> imageFiles = List.of(
                new File("path/to/image1.jpg"),
                new File("path/to/image2.jpg"),
                new File("path/to/image3.jpg")
            );

            // 存储每个任务的Future对象
            List<Future<File>> futures = new ArrayList<>();

            for (File imageFile : imageFiles) {
                Callable<File> task = () -> resizeImage(imageFile, 200200); // 将图像调整为200x200的大小

                // 提交任务给线程池
                Future<File> future = executor.submit(task);
                futures.add(future);
            }


        // 图像大小调整方法
        private static File resizeImage(File inputFile, int width, int height) throws IOException {
            // 读取原始图像
            BufferedImage originalImage = ImageIO.read(inputFile);

            // 创建一个缩放后的图像
            Image scaledImage = originalImage.getScaledInstance(width, height, Image.SCALE_SMOOTH);
            BufferedImage resizedImage = new BufferedImage(width, height, BufferedImage.TYPE_INT_RGB);

            // 在新的BufferedImage中绘制缩放后的图像
            Graphics2D g2d = resizedImage.createGraphics();
            g2d.drawImage(scaledImage, 00null);
            g2d.dispose();

            // 保存调整大小后的图像
            File outputFile = new File("resized_" + inputFile.getName());
            ImageIO.write(resizedImage, "jpg", outputFile);

            return outputFile;
        }
    }

    代码步骤说明:

    1. 线程池创建: 使用 Executors.newFixedThreadPool(4) 创建一个固定大小为4的线程池,允许最多4个图像处理任务并发执行。
    2. 提交任务: 将每个图像的大小调整操作封装为一个 Callable<File> 任务,并提交给 ExecutorService。每个任务返回一个 Future<File>,用于异步获取处理结果。
    3. 图像大小调整: 在 resizeImage 方法中,通过 Image.getScaledInstance 方法调整图像大小,并使用 Graphics2D 将缩放后的图像绘制到新的 BufferedImage 上,然后将其保存为新的文件。
    4. 处理任务结果: 通过遍历 Future 列表,调用 future.get() 获取每个任务的结果。此操作阻塞当前线程直到任务完成,但由于任务是并发执行的,整个过程依然很高效。
    5. 线程池关闭: 调用 executor.shutdown() 关闭线程池,确保所有任务完成后再结束应用。

    这种方法适用于Web应用、桌面应用或服务器后台任务,例如用户上传多张图片时,应用可以迅速响应用户上传操作,而在后台调整图片大小,以便稍后用于展示或存储。

    后台任务

    在应用程序中,某些任务可能需要在后台执行,例如发送电子邮件、记录数据、处理文件等。这些任务通常需要一定的时间完成,而如果在主线程中执行这些任务,可能会导致应用程序的UI变得不响应。为了保持UI的流畅性和用户体验,使用 ExecutorService 来异步处理这些后台任务是非常有效的。

    下面是一个简单的示例,展示了如何使用 ExecutorService 来异步发送电子邮件和记录数据,而不阻塞主线程:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class BackgroundTaskExample {
        public static void main(String[] args) {
            // 创建一个单线程的ExecutorService,用于处理后台任务
            ExecutorService executor = Executors.newSingleThreadExecutor();

            // 模拟UI操作,例如用户点击按钮后触发的任务
            System.out.println("UI is responsive. Performing background tasks...");

            // 异步发送电子邮件
            executor.submit(() -> sendEmail("user@example.com""Welcome""Thank you for signing up!"));

            // 异步记录数据
            executor.submit(() -> logData("User signed up with email user@example.com"));

            // 主线程继续执行其他操作
            System.out.println("UI can continue to interact with the user...");

            // 关闭ExecutorService
            executor.shutdown();
        }

        // 模拟发送电子邮件的方法
        private static void sendEmail(String recipient, String subject, String body) {
            try {
                // 模拟发送电子邮件的延迟
                Thread.sleep(2000);
                System.out.println("Email sent to " + recipient + " with subject: " + subject);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.err.println("Email sending was interrupted");
            }
        }

        // 模拟记录数据的方法
        private static void logData(String data) {
            try {
                // 模拟记录数据的延迟
                Thread.sleep(1000);
                System.out.println("Data logged: " + data);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.err.println("Data logging was interrupted");
            }
        }
    }

    代码说明:

    1. 线程池创建: 使用 Executors.newSingleThreadExecutor() 创建一个单线程的 ExecutorService。这种类型的线程池适合处理多个任务但确保任务按顺序执行。
    2. 异步任务提交: 使用 executor.submit() 提交任务。这些任务在后台异步执行,主线程无需等待任务完成即可继续处理其他操作。
    3. 任务实现: 任务的实现可以是任何需要耗时操作的代码,例如发送电子邮件或记录数据。为了模拟这些操作的延迟,使用了 Thread.sleep()
    4. 主线程的执行: 在任务执行的同时,主线程依然保持对UI的控制权,可以继续响应用户操作。通过这种方式,应用程序的响应性得以保持。
    5. 线程池关闭: 在所有任务提交完毕后,调用 executor.shutdown() 关闭 ExecutorService。这种关闭方式允许已经提交的任务继续执行,而不会接受新任务。

    实际应用场景:

    • 发送通知: 当用户完成某些操作后,应用程序可能需要发送确认邮件或短信。这些操作可以异步进行,不阻碍用户的后续操作。
    • 数据处理: 用户提交数据后,应用可以立即响应用户,而将数据处理的任务(如保存到数据库、生成报告等)交给后台线程执行。
    • 日志记录: 应用程序可以在后台记录重要操作日志,而不会影响前台的用户交互。

    通过 ExecutorService,我们可以有效地将这些后台任务与主线程分离,实现应用程序的高效运行和良好的用户体验。

    经验分享

    最佳实践

    选择正确的 ExecutorService 配置对优化性能和资源至关重要。以下是一些最佳实践:

    • 分析您的工作负载: 了解任务的性质(如 CPU 绑定或 I/O 绑定)和预期的并发任务数量至关重要。CPU 绑定任务需要接近 CPU 核心数的线程池,而 I/O 绑定任务可以使用更多线程。分析工作负载可以帮助确定适当的线程池大小,优化性能和资源使用。
    • 深思熟虑地扩展: 初始时选择较小的线程池规模,根据实际情况逐步增加线程数。这种做法可以避免因线程过多导致的资源耗尽问题。逐步扩展能更好地适应工作负载的变化,并确保系统稳定性。
    • 考虑固定与缓存: 对于可预测的工作负载,固定线程池(FixedThreadPool)提供稳定的性能。对于高度可变的工作负载,可选择缓存线程池(CachedThreadPool),它会动态调整线程数量,但需注意无界队列可能导致资源耗尽。
    • 处理被拒绝的任务: 定义适当的拒绝策略来优雅地处理线程池满的情况。可以记录被拒绝的任务、稍后重试,或抛出异常供应用程序处理。设置自定义拒绝处理器能提高系统的可靠性和灵活性。

    避免的陷阱

    • 资源泄漏: 使用 ExecutorService 后,务必调用 shutdown()shutdownNow() 方法来关闭它。否则,线程池中的空闲线程和其他资源可能会持续占用内存,导致资源泄漏和性能下降。确保在不再需要线程池时进行正确关闭,以维护系统资源的健康。
    • 线程饥饿: 在使用缓存线程池时,频繁的短暂任务可能导致线程池不断创建和销毁线程。这种行为会消耗大量资源,并可能使长期运行的任务无法获得足够的 CPU 时间。为长时间运行的任务考虑使用固定线程池,这样可以保持线程池的稳定性和任务处理的公平性。
    • 未检查的异常: 异步任务在执行过程中可能会抛出异常。如果不进行适当的异常处理,可能导致任务失败并影响应用程序的稳定性。确保在提交任务时实现异常处理机制,捕获并记录异常,防止应用程序因未处理的异常崩溃。通过 Futureget() 方法获取任务结果时也要处理 ExecutionException

    监控和管理

    • JMX(Java 管理扩展): JMX 提供了强大的工具来监控和管理线程池的性能。通过 JMX,可以实时查看线程池的核心指标,如活动线程数、任务队列大小和任务完成时间。这有助于即时了解线程池的运行状况,并做出必要的调整,保持系统性能的稳定性。
    • 自定义监控: 实施自定义监控解决方案可以更精确地跟踪线程池的性能指标。通过定制监控脚本或工具,可以获取特定的指标数据,帮助识别系统瓶颈或资源耗尽问题。例如,可以监控任务提交速率、线程池使用情况和任务处理延迟等,以优化线程池配置和系统资源利用。
    • 分析工具: 使用像 JProfiler 或 YourKit 这样的分析工具,可以深入分析线程行为,识别潜在问题。这些工具能够提供线程活动视图、上下文切换情况和锁竞争分析,帮助发现如线程饥饿、线程过度上下文切换等性能瓶颈。通过这些分析,能够有效优化线程池配置,提高系统性能。
    FunTester
    FunTester 原创精华


    FunTester
    万粉千文|百无一用
     最新文章