Java线程池的实现原理及其在业务中的最佳实践

企业   2024-05-23 10:28   广东  

导读


本文讲述了Java线程池的实现原理和源码分析以及线程池在业务中的最佳实践。

一、线程池简介

1.什么是线程池?

线程池是一种用于管理和复用线程的机制。
线程池的核心思想是预先创建一定数量的线程,并把它们保存在线程池中,当有任务需要执行时,线程池会从空闲线程中取出一个线程来执行该任务。任务执行完毕后,线程不是被销毁,而是返还给线程池,可以立即或稍后被再次用来执行其他任务。这种机制可以避免因频繁创建和销毁线程而带来的性能开销,同时也能控制同时运行的线程数量,从而提高系统的性能和资源利用率。
线程池的主要组成部分包括工作线程、任务队列、线程管理器等。线程池的设计有助于优化多线程程序的性能和资源利用,同时简化了线程的管理和复用的复杂性。

2.线程池有什么好处?

  • 减少线程创建和销毁的开销,线程的创建和销毁需要消耗系统资源,线程池通过复用线程,避免了对资源的频繁操作,从而提高系统性能;

  • 控制和优化系统资源利用,线程池通过控制线程的数量,可以尽可能地压榨机器性能,提高系统资源利用率;

  • 提高响应速度,线程池可以预先创建线程且通过多线程并发处理任务,提升任务的响应速度及系统的并发性能;

二、Java线程池的实现原理

1.类继承关系

Java线程池的核心实现类是ThreadPoolExecutor,其类继承关系如图所示,其中的核心方法如下图:

ThreadPoolExecutor的部分核心方法
execute(Runnable r):没有返回值,仅仅是把一个任务提交给线程池处理
submit(Runnable r):返回值为Future类型,当任务处理完毕后,通过Future的get()方法获取返回值时候,得到的是null
submit(Runnable r,Object result):返回值为Future类型,当任务处理完毕后,通过Future的get()方法获取返回值时候,得到的是传入的第二个参数result
shutdown():关闭线程池,不接受新任务,但是等待队列中的任务处理完毕才能真正关闭
shutdownNow():立即关闭线程池,不接受新任务,也不再处理等待队列中的任务,同时中断正在执行的线程
setCorePoolSize(int corePoolSize):设置核心线程数 setKeepAliveTime(long time, TimeUnit unit):设置线程的空闲时间 setMaximumPoolSize(int maximumPoolSize):设置最大线程数 setRejectedExecutionHandler(RejectedExecutionHandler rh):设置拒绝策略 setThreadFactory(ThreadFactory tf):设置线程工厂
beforeExecute(Thread t, Runnable r):任务执行之前的钩子函数,这是一个空函数,使用者可以继承ThreadPoolExecutor后重写这个方法,实现其中的逻辑 afterExecute(Runnable r, Throwable t):任务执行之后的钩子函数,这是一个空函数,使用者可以继承ThreadPoolExecutor后重写这个方法,实现其中的逻辑

2.线程池的状态

  • RUNNING:线程池一旦被创建,就处于RUNNING状态,任务数为0,能够接收新任务,对已排队的任务进行处理。

  • SHUTDOWN:不接收新任务,但能处理已排队的任务。当调用线程池的shutdown()方法时,线程池会由RUNNING转变为SHUTDOWN状态。

  • STOP:不接收新任务,不处理已排队的任务,并且会中断正在处理的任务。当调用线程池的shutdownNow()方法时,线程池会由RUNNING或SHUTDOWN转变为STOP状态。

  • TIDYING:当线程池在SHUTDOWN状态下,任务队列为空且执行中任务为空,或者线程池在STOP状态下,线程池中执行中任务为空时,线程池会变为TIDYING状态,会执行terminated()方法。这个方法在线程池中是空实现,可以重写该方法进行相应的处理。

  • TERMINATED:线程池彻底终止。线程池在TIDYING状态执行完terminated()方法后,就会由TIDYING转变为TERMINATED状态。

3.线程池的执行流程

4.问题思考

  • 线程池的核心线程可以回收吗?
问题分析与解答
ThreadPoolExecutor默认不回收核心线程,但是提供了allowCoreThreadTimeOut(boolean value)方法,当参数为true时,可以在达到线程空闲时间后,回收核心线程,在业务代码中,如果线程池是周期性的使用,可以考虑将该参数设置为true;
  • 线程池在提交任务前,可以提前创建线程吗?
问题分析与解答
ThreadPoolExecutor提供了两个方法:
prestartCoreThread():启动一个线程,等待任务,如果核心线程数已达到,这个方法返回false,否则返回true;
prestartAllCoreThreads():启动所有的核心线程,返回启动成功的核心线程数 ;
通过这种设置,可以在提交任务前,完成核心线程的创建,从而实现线程池预热的效果;

三、源码分析

1.execute(Runnable command)

首先会获取ctl,ctl由32位组成,高3位记录了线程池的状态,低29位记录了线程池中工作线程的数量;拿到ctl后判断当前工作线程数是否小于核心线程数,小于则创建核心线程执行任务,否则则尝试将任务添加到任务队列中,如果添加任务队列失败,尝试创建非核心线程执行任务。

2.addWorker(Runnable firstTask, boolean core)

在双层死循环中,依然是获取ctl,校验当前线程池的状态,校验通过后,会在内层死循环中尝试cas增加工作线程数量,只有增加成功才能跳出外层for循环,真正开始创建线程。

创建线程需要加锁保证并发安全,线程池中使用Worker类封装Thread对象,在线程池运行中或shutdown状态均可创建线程并执行阻塞队列中的任务。

线程创建成功并添加到线程池后,会调用start()方法,启动线程,执行任务。

3.runWorker(Worker w)

在runWorker()方法中,首先会以Worker中封装的任务作为第一个任务,并在while()循环中不断从阻塞队列中获取任务执行,实现线程的复用,这里有一个细节,源码是直接抛出了task.run()方法执行的异常,并没有进行捕获。

4.getTask()

校验线程池状态,如果是非运行状态,且不是shutdown且阻塞队列不为空时对工作线程数-1,并返回null,注意这里只是对工作线程数-1,并没有真正的销毁线程,销毁线程的逻辑收敛在processWorkerExit();

根据是否需要超时控制,提供两个阻塞方法获取阻塞队列中的任务。



5.processWorkerExit(w, completedAbruptly)

当线程执行异常或者获取不到阻塞任务时,会进入该方法。

四、线程池在业务中的最佳实践

1.如何选择合适的线程池参数

    public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,                              TimeUnit unit,                              BlockingQueue<Runnable> workQueue,                              ThreadFactory threadFactory,                              RejectedExecutionHandler handler) 
线程池的核心参数
1.corePoolSize:  核心线程数
2.maximumPoolSize: 最大线程数
3.keepAliveTime: 线程的空闲时间
4.unit: 空闲时间的单位(秒、分、小时等等)
5.workQueue: 等待队列
6.threadFactory: 线程工厂
7.handler: 拒绝策略
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程直接处理该任务(可能为主线程Main),保证每个任务执行完毕
推荐使用自定义的线程工厂,重写创建线程的方法,支持自定义创建线程的名称、优先级等属性,方便排查问题;
问题思考
  • 如何选择合适的线程池参数?
问题分析与解答
1)根据任务场景选择
CPU 密集型任务(N+1): 这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1。比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间;
I/O 密集型任务(2N): 这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N;
2)根据线程池用途选择
用途一:快速响应用户请求
比如说用户查询商品详情页,会涉及查询商品关联的一系列信息如价格、优惠、库存、基础信息等,站在用户体验的角度,希望商详页的响应时间越短越好,此时可以考虑使用线程池并发地查询价格、优惠、库存等信息,再聚合结果返回,降低接口总rt。这种线程池用途追求的是最快响应速度,所以可以考虑不设置队列去缓冲并发任务,而是尽可能设置更大的corePoolSize和maxPoolSize;
用途二:快速处理批量任务
比如说项目中在对接渠道同步商品供给时,需要查询大量的商品数据并同步给渠道,此时可以考虑使用线程池快速处理批量任务。这种线程池用途关注的是如何使用有限的机器资源,尽可能地在单位时间内处理更多的任务,提升系统吞吐量,所以需要设置阻塞队列缓冲任务,并根据任务场景调整合适的corePoolSize;

2.如何正确地创建线程池对象

使用Executors创建特定的线程池,线程池参数比较固定,不推荐使用。
Executors是一个java.util.concurrent包中的工具类,可以方便的为我们创建几种特定参数的线程池。
  • FixedThreadPool:具有固定线程数量的线程池,无界阻塞队列;

  • CachedThreadPool:线程数量可以动态伸缩的线程池,最大线程数为Integer.MAX_VALUE

  • SingleThreadPool:单个线程的线程,核心线程数和最大线程数都是1,无界阻塞队列
...
推荐使用饿汉式的单例模式创建线程池对象,支持灵活的参数配置,在类加载阶段即完成线程池对象的创建,且只会实例化一个对象,再封装统一的获取线程池对象的方法,暴露给业务代码使用,参考代码:
public class TestThreadPool {
/** * 线程池 */ private static ExecutorService executor = initDefaultExecutor();
/** * 统一的获取线程池对象方法 */ public static ExecutorService getExecutor() { return executor; }
private static final int DEFAULT_THREAD_SIZE = 16; private static final int DEFAULT_QUEUE_SIZE = 10240;
private static ExecutorService initDefaultExecutor() { return new ThreadPoolExecutor(DEFAULT_THREAD_SIZE, DEFAULT_THREAD_SIZE, 300, TimeUnit.SECONDS, new ArrayBlockingQueue<>(DEFAULT_QUEUE_SIZE), new DefaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy()); }}
问题思考
  • 局部变量定义的线程池对象在方法结束后可以被垃圾回收吗?
public static void main(String[] args) {    test1();    test2();}
public static void test1(){
Object obj = new Object(); System.out.println("方法一执行完成");}
public static void test2(){
ExecutorService executorService = Executors.newFixedThreadPool(10); executorService.execute(new Runnable() { @Override public void run() { System.out.println("方法二执行完成"); } });}

问题分析与解答

先说结论,上图代码,obj是定义在test1()方法体内的局部变量,正常来说局部变量会保存在栈中,随着方法的结束,栈帧出栈,栈帧中的局部变量也会销毁,此时没有任何变量指向堆内存中的new Object()对象,所以堆中的new Object()对象可以被垃圾回收;executorService同样也是定义在方法体中的局部变量,但在方法结束后,线程池中还存在活跃的线程,根据GC Roots可达性分析原理,可作为GC Roots的对象有:
  • 虚拟机栈(栈帧中的本地变量表)中引用的对象;

  • 方法区中的类静态属性引用的对象;

  • 方法区中常量引用的对象;

  • 本地方法栈中JNI(即一般说的Native方法)引用的对象;

  • 正在运行的线程;
所以在test2()方法执行结束后,线程池中的线程会进入getTask()的阻塞状态,但依然是活跃线程,此时堆中的线程池对象依然GC Roots可达,所以不会被垃圾回收;
为了证明上述结论,我们只需要证明运行中的线程对象持有线程池对象的引用即可。在源码解析中我们知道线程池中的线程都是用Worker对象封装的,所以我们只需要找到Worker和ThreadPoolExecutor的引用关系,在源码中,Worker是ThreadPoolExecutor类中的内部类,
private final class Worker        extends AbstractQueuedSynchronizer        implements Runnable
但并没有看到Worker类直接引用了外部线程池对象,难道线程池对象GC Roots不可达,可以被垃圾回收?但如果线程池被垃圾回收了,其中的线程却依然存活,这非常矛盾,想想就很奇怪。
其实,这个问题涉及到Java的内部类和外部类的引用关系,可以看下面这个demo:
public class Outer {
private String name;
private Inner inner;
public int outerMethod() { return 1; }
/** * 非静态内部类 */ class Inner {
private void innerMethod() { //在非静态内部类中可以直接调用外部类的方法 outerMethod(); }
private String address; }}
在非静态内部类中,可以不实例化外部类对象,直接调用外部类的方法,原因就是在Java中,非静态内部类会持有外部类的引用,可以使用javac反编译验证这个结论,反编译生成的Outer$Inner.class文件中,外部类会作为非静态内部类构造方法的参数,也就是说非静态内部类会持有外部类的引用,
class Outer$Inner {    private String address;
Outer$Inner(Outer var1) { this.this$0 = var1; }
private void innerMethod() { this.this$0.outerMethod(); }}
为什么强调是非静态内部类,因为静态内部类并不会持有外部类的引用,
public class Outer {
private String name;
private Inner inner;
public int outerMethod() { return 1; }
/** * 静态内部类 */ static class Inner {
private String address; }}
javac反编译后的Outer$Inner.class文件并没有引用外部类
class Outer$Inner {    private String address;
Outer$Inner() { }}

这个问题带来两个启发:

1)不要在代码中使用局部变量定义线程池对象,这样不仅会导致频繁创建线程池对象,违背了线程复用的设计原则,还有可能造成局部变量的线程池对象无法及时垃圾回收的内存泄漏问题;
2)业务代码中,优先定义静态内部类而不是非静态内部类,可以有效防止内存泄露的风险;

3.相互依赖的子任务避免使用同一线程池

public class FartherAndSonTask {
public static ExecutorService executor= TestThreadPool.getExecutor();
public static void main(String[] args) throws Exception { FatherTask fatherTask = new FatherTask(); Future<String> future = executor.submit(fatherTask); future.get(); }
/** * 父任务,里面异步执行子任务 */ static class FatherTask implements Callable<String> { @Override public String call() throws Exception { System.out.println("开始执行父任务"); SonTask sonTask = new SonTask(); Future<String> future = executor.submit(sonTask); String s = future.get(); System.out.println("父任务已拿到子任务执行结果"); return s; } } /** * 子任务 */ static class SonTask implements Callable<String> { @Override public String call() throws Exception {
//处理一些业务逻辑 System.out.println("子任务执行完成"); return null; } }}
相互依赖的任务提交到同一线程池,父任务依赖子任务的执行结果,父任务get()执行结果时可能因为子任务还没执行完导致线程阻塞,如果提交的任务过多,线程池中的线程都被类似的父任务占用并阻塞,导致任务队列中的子任务没有线程去执行,最终出现线程饥饿的死锁现象。
优化思路:
  • 使用不同的线程池隔离有相互依赖的任务;

  • 调用future.get()方法设置超时时间,这样做可以避免线程阻塞,但是依然会出现大量的超时异常;

4.合理选择submit()和execute()方法

  • execute(Runnable r):没有返回值,仅仅是把一个任务提交给线程池处理,轻量级方法,适用于处理不需要返回结果的任务;

  • submit(Runnable r):返回值为Future类型,future可以用来检查任务是否已经完成,获取任务的结果等,适用于需要处理返回结果的任务;
比如说下面这段代码,作用是推送日历房的报价信息给到三方渠道,因为涉及到的日历房供给范围非常大,所以采用线程池技术提高报价推送的效率,同样由于整个推送任务的耗时非常长,防止任务中断,所以需要记录推送任务的执行进度,并实现“断点续传”功能,为了实现上述功能,采用submit()方法提交子任务,并阻塞拿到所有子任务的执行结果,完成推送任务进度的更新。
private void asyncSupplyPriceSync(List<Long> shidList, SupplyPriceSyncMsg msg) {
if (CollectionUtils.isEmpty(shidList)) { return; } PlatformLogUtil.logInfo("异步推送酒店报价信息供给总数:", shidList.size()); final Map<String, Future<?>> futures = Maps.newLinkedHashMap(); //分批提交线程池处理 Lists.partition(shidList, SwitchConfig.HOTEL_PRICE_ASYNC_LIST_SIZE) .forEach(subList -> { try { futures.put(UUID.randomUUID().toString(), executorService .submit(() -> batchSupplyPriceSync(subList, msg))); } catch (Exception e) { PlatformLogUtil.logFail("异步推送报价信息线程池子任务执行异常", LogListUtil.newArrayList(subList), e); } }); //阻塞,等所有子任务都处理完,才返回结果 futures.forEach((uuid, future) -> { try { future.get(); } catch (InterruptedException | ExecutionException e) { PlatformLogUtil.logFail("异步推送报价信息获取子任务执行结果异常", LogListUtil.newArrayList(e)); } }); }

5.请捕获线程池中子任务的代码异常

public class ExceptionTest {
public static ExecutorService executor = TestThreadPool.getExecutor();
public static void main(String[] args) {
executor.execute(() -> test("正常")); executor.execute(() -> test("正常")); executor.execute(() -> test("任务执行异常")); executor.execute(() -> test("正常")); executor.shutdown(); }
public static void test(String str) {
String result = "当前ThreadName为" + Thread.currentThread().getName() + ":结果" + str; if (str.equals("任务执行异常")) { throw new RuntimeException(result + "****执行异常"); } else { System.out.println(result); } }}
问题思考
  • 如果线程池中执行任务的线程异常,发生异常的线程会销毁吗?其他任务还能正常执行吗?
问题分析与解答
上述代码执行结果如图:



可以发现

1)线程池中执行任务的线程异常,并不会影响其他任务的执行,而且execute()提交任务,直接打印了异常信息。如果使用submit()提交任务,控制台会怎么输出呢?这里可以感兴趣的同学可以探索一下;
2)注意看打印的线程名称为1,2,5,3,为什么没有打印线程名称4呢?
首先可以确认线程异常后创建了新的线程5,至于线程3怎么处理,以及为什么没有打印线程4,我们可以从源码进行分析,执行task.run()方法后,如果有异常,runWorker()方法内会抛出异常,并依次进入三个finally代码块。

在processWorkerExit(w, completedAbruptly)方法内,可以看到如果运行中的线程池有线程执行异常,会调用workers.remove()移除当前线程,并调用addWorker()重新创建新的线程。

所以在任务3销毁线程再重新创建线程,和任务4创建线程这两个动作会有时序问题,具体看下图:

那么控制打印的异常信息是怎么来的呢?

在runWorker()执行完成之后,由于未捕获task.run()执行的异常,最终会由jvm回调java.lang.Thread#dispatchUncaughtException方法,处理线程池中未捕获的异常信息,最终调用java.lang.ThreadGroup#uncaughtException,可以看到,在这里支持我们自定义未捕获异常处理器UncaughtExceptionHandler,否则默认是直接打印出异常信息。

所以,在业务代码中,请捕获子任务中的异常,否则会导致线程池中的工作线程频繁销毁、创建,造成资源浪费,违背了线程复用的设计原则。


活动介绍:GIAC全球互联网架构大会于5/24~25日在深圳举行,大会中有更多来自Java、AI及大模型精彩案例,点击“阅读原文”了解更多议题。识别下图二维码还可以申请大会单日体验票福利。



参考阅读:



本文由高可用架构转载。技术原创及架构实践文章,欢迎通过公众号菜单「联系我们」进行投稿


高可用架构
高可用架构公众号。
 最新文章