深度解析Java线程池的异常处理

在逛同性交友网站GitHub的时候看到一个解析线程池异常处理的Issue,正好是曾经自己遇到过的问题。在此记录下来,并将其拓展到其他类型的线程池。

本文因篇幅省略了诸多AQS相关知识,可以查看博客中另一篇博文 一行一行源码分析清楚AQS 以保证清楚理解本文。

1、ThreadPoolExecutor

此部分来源于 GitHub aCoder2013

这一线程池由来已久,是抽象类 AbstractExecutorService 继承类,通过调用不同构造函数实现诸如 newFixedThreadPool 、newCachedThreadPool 线程池功能。

问题:

考虑下面这段代码,有什么区别呢?你可以猜猜会不会有异常打出呢?如果打出来的话是在哪里?:

你会发现单就执行这两句的话,结果只会打印一处异常信息,来源于 execute() 中的 obj.toString() 。

源码解析:

分析下面源码,发现其实重载的 submit() 方法将 Runnable、Callable 都封装成继承 Future 的 RunnableFuture 的实现类 FutureTask 对象(有点忽悠 XD)

接下来就会实际提交到队列中交给线程池调度处理:

那么接下来看看线程池核心的流程:

submit() 的方式

之前我们知道最终传递过去的是FutureTask,也就是说会调用这里的 Future 的 run方法,我们看看实现:

如上面分析到的,这样的话调用 FutureTask.run() 并不会直接抛出异常,所以在 ThreadPool.execute() 中捕获不到异常。但我们可以通过调用 get() 方法来捕捉异常。

submit() 解决方式

1、基本方式 try/catch,直接调用get()

2、重写 protected afterExecute(Runnable r,Throwable t ) { } 方法

想想如果我明明一开始调用的是 submit(Runnable r) ,为了捕捉异常还需刻意调用 get() 未免有点麻烦。Doug Lea 大佬已经在 JDK 文档中教我们可以重写 ThreadPoolExecutor 中的 afterExecute() 方法来实现异常捕获:

execute() 方式:

如代码SrcAnalyse2,此方法不同于submit() 会进行封装成Future ,其传递过去的就直接是Runnable,因此就会直接抛出:

那么这里的异常到底会抛出到哪里呢, 我们看看JVM具体是怎么处理的:

可以看到这里最终会去调用Thread#dispatchUncaughtException方法:

execute() 解决方式

1、基本方式,直接try/catch

2、线程重写 setUncaughtExceptionHandler() 方法

2、ForkJoinPool

ForkJoinPool也是继承AbstractExecutorService的线程池,实现了 Fork/Join 及 work-stealing 以提升多核计算效率(详细的 ForkJoin 设计原理可参考三石 道并发编程网 )。
因各版本 JDK 源码略有差异,此处仅为 JDK8 源码分析,且不具体分析线程池运行机制。

JDK源码中给了我们这样一个任务提交总结表

任务执行需求 在非Fork/Join调用 在Fork/Join计算中调用
异步执行无返回值 execute(ForkJoinTask) ForkJoinTask.fork()
等待获取返回值 invoke(ForkJoinTask) ForkJoinTask.invoke()
异步执行并获取Future submit(ForkJoinTask) ForkJoinTask.fork()
  1. 非Fork/Join 和在 Fork/Join 中是指调用提交时所在的位置,前者为外部(main)通过 ForkJoinPool 本身提交 ForkJoinTask ,后者为 ForkJoinTask 执行中再将子任务提交到 ForkJoinPool 中。
  2. ForkJoinTask fork 方法返回this,而 ForkJoinTask 本身实现 Future ,则第三种中可使用 submit(ForkJoinTask).get()ForkJoinTask.fork().get()获取返回值,将 get 方法换成 join 也同样可行,区别只在于异常处理方式。
  3. 调用 execute(Runnable)submit(Runnable)submit(Callable)将通过 ForkJoinTask 的子类进行适配。
  4. 此外还有 invokeAll(Collection <Callable>) 等方法,类似此处就不讨论。

跟踪捕获

ForkJoinPool 中三种类型的提交方法都将调用 externalPush(ForkJoinTask)externalSubmit(ForkJoinTask)将参数封装成 ForkJoinTask 并 Push 到 已存在或初始化的 WorkQueue 中

我们绕过具体的任务调度、状态转换直接从执行 ForkJoinTask 的 ForkJoinWokerThread 来跟踪执行异常的处理 (每个这样的线程对象都在 ForkJoinPool 中有一对应的 WorkQueue)

  1.  ForkJoinWorkerThread.run()

 

  1. ForkJoinPool.runWorker(WorkQueue)

 

  1. WorkQueue.runTask(ForkJoinTask)

 

  1. ForkJoinTask.doExec()

 

  1. setExceptionalCompletion(Throwable)

抛出异常

ExceptionNode[ ] 实现类似 HashTable 以记录异常及异常的 ForkJoinTask。为什么使用弱引用呢?可以想象当分出的诸多子任务中一个子任务异常,势必造成之上的父任务异常,以此循环向上将造成大量异常对象,全部存储将有损效率。所以采用弱应用,当子任务的异常在父任务执行体中因执行 get、join、invoke 而抛出并被我们上面分析到的过程捕获,异常将被记录为父任务的异常并存储,此时子任务异常已没有意义子任务也无引用,使用弱引用可借用GC帮助清除

既然已经记录下异常,那么怎样会抛出这些异常呢?在 ForkJoinTask 中提供方法获取

上面已提到 ForkJoinTask 中调用到此方法获取异常的有 get、join、invoke、invokeAll
其中的 join、invoke、invokeAll 采用如下抛出 Unchecked 异常,而JDK源码示例中也推荐在任务体中使用这几种。

而 get 因实现自 Future 而将异常封装成统一的 ExecutionException

因此只要在执行体中进行合适 Fork/Join 任务分解,并在非 Fork/Join 中给 ForkJoinPool 提交后调用提交或返回的 ForkJoinTask 中 get、join、invoke 方法(即使无返回值也可以调用来抛出),再加以 try/catch 就能捕获到任务体执行过程中的异常。

拓展

要是我通过 ForkJoinPool.execute(Runnable) 提交的任务,本身就没有 ForkJoinTask 且此方法也没有返回 ForkJoinTask ,这种情况将如何捕获异常呢?

其实此方法将 Runnable 封装为 ForkJoinTask 的子类,此子类拓展了上面提到的 internalPropagateException(Throwable) 异常传播方法,并在其中抛出异常被在上文介绍的 ForkJoinWorkerThread.run()这个方法捕获,并调用 ForkJoinWorkerThread 中 onTermination(Throwable exception) 收尾函数,最终都将调用 ForkJoinPool.deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) 并在其中抛出。

没人管了?肯定不会的。第一部分讲到的 Thread.UncaughtExceptionHandler 在这里也同样起作用,给 ForkJoinPool 构造函数传入UncaughtExceptionHandler,则所有 ForkJoinWorkerThread 都将被设置成此以处理异常(设计得就是这么周到 XD)。当然你要不嫌麻烦选择自定义 ForkJoinWorkerThreadFactory 也是没问题的

发表评论

电子邮件地址不会被公开。 必填项已用*标注

*