章
目
录
一、ForkJoinPool是什么?
ForkJoinPool
是 JDK1.7 开始提供的线程池。为了解决 CPU 负载不均衡的问题,如某个较大的任务,被一个线程去执行,而其他线程处于空闲状态。
其实本质上就是为了尽可能地去让每个cpu高效率的工作,以提高计算效率,但千万别盲目使用ForkJoinPool
,认为其效率一定比ThreadPoolExecutor
高。
因此我们需要明确如下几个结论:
- ForkJoinPool 不是为了替代 ThreadPoolExecutor,而是它的补充,在某些应用场景下性能比 ThreadPoolExecutor更好。
- ForkJoinPool 主要用于实现“分而治之”的算法,特别是分治之后递归调用的函数,例如 快速排序 等。
- ForkJoinPool 最适合的是计算密集型的任务,如果存在 I/O,线程间同步,sleep() 等会造成线程长时间阻塞的情况时,最好配合使用 ManagedBlocker。
二、ForkJoinPool的用法是什么?
ForkJoinPool
的用法其实比较简单,在使用之前我们先了解下涉及到的几个常用类及方法:
1)ForkJoinTask
类: 表示一个任务。
2)ForkJoinTask
的子类中有 RecursiveAction
和 RecursiveTask
。其中RecursiveAction 无返回结果,RecursiveTask 有返回结果。
3)我们需要重写 RecursiveAction
或 RecursiveTask
的 compute()
方法(核心逻辑),完成计算或者可以进行任务拆分。
4)调用 ForkJoinTask 的 fork()
的方法,可以让其他空闲的线程执行这个 ForkJoinTask;
5)调用 ForkJoinTask 的 join()
的方法,将多个小任务的结果进行汇总。
三、ForkJoinPool使用案例
1、无返回值情况
为了方便大家结合打印信息理解其执行过程,这里我们以打印1-20区间的正整数为例
1)创建任务类继承RecursiveAction
package com.panziye.demo.test; import java.util.concurrent.RecursiveAction; public class PrintTask extends RecursiveAction { private static final long serialVersionUID = 1L; // 拆分阈值 private static final int THRESHOLD = 5; // 开始值 private int start; // 结束值 private int end; // 构造 public PrintTask(int start, int end) { super(); this.start = start; this.end = end; } // 重写compute核心方法 @Override protected void compute() { //当结束值比起始值小于阈值时,直接打印该区间的值 if (end - start < THRESHOLD) { for (int i = start; i <= end; i++) { System.out.println(Thread.currentThread().getName() + ", i = " + i); } } else { // 大于阈值时,按数值区间平均拆分为两个子任务 int middle = (start + end) / 2; PrintTask leftTask = new PrintTask(start, middle); PrintTask rightTask= new PrintTask(middle + 1, end); leftTask.fork(); rightTask.fork(); } } }
2)创建测试类测试
package com.panziye.demo.test; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; public class TestForkJoinPool { public static void main(String[] args) throws Exception { // 创建ForkJoinPool ForkJoinPool pool = new ForkJoinPool(); // 提交任务,这里打印1到20 pool.submit(new PrintTask(1, 20)); // 开始2秒时如果任务未完成一直阻塞让submit的任务先执行完 pool.awaitTermination(2, TimeUnit.SECONDS); // 关闭任务 pool.shutdown(); } }
3)运行测试结果如下:
细心的同学会发现,每连续的5个数都是由同一个线程打印的,这就是超出阈值被拆分成子任务的效果。
2、有返回值情况
为了方便理解我们,我们在此计算1-20的整数和,代码实现如下:
1)创建计算求任务类,继承RecursiveTask类,并约定返回值泛型
package com.panziye.demo.test; import java.util.concurrent.RecursiveTask; public class SumTask extends RecursiveTask<Integer> { private static final long serialVersionUID = 1L; // 拆分阈值 private static final int THRESHOLD = 5; // 开始值 private int start; // 结束值 private int end; public SumTask(int start, int end) { super(); this.start = start; this.end = end; } // 重写compute核心方法 @Override protected Integer compute() { // 当结束值比起始值小于阈值时,直接计算累加值 if (end - start <= THRESHOLD) { int result = 0; for (int i = start; i <= end; i++) { result += i; } return result; } else { // 大于阈值时,按数值区间平均拆分为两个子任务,进行两个任务的累加值汇总 int middle = (start + end) / 2; SumTask leftTask = new SumTask(start, middle); SumTask rightTask= new SumTask(middle + 1, end); leftTask.fork(); rightTask.fork(); // 合并子任务结果 int sum1 = leftTask.join(); int sum2 = rightTask.join(); System.out.println("sum1="+sum1+",sum2="+sum2); return sum1 + sum2; } } }
2)实现测试类,计算结果:
package com.panziye.demo.test; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.concurrent.TimeUnit; public class TestForkJoinPool { public static void main(String[] args) throws Exception { // 创建ForkJoinPool ForkJoinPool pool = new ForkJoinPool(); // 提交任务,求和 1到20 ForkJoinTask<Integer> task = pool.submit(new SumTask(1, 20)); // 获取计算结果 int result = task.get(); System.out.println("并行计算 1-20 累加值:" + result); pool.awaitTermination(2, TimeUnit.SECONDS); pool.shutdown(); } }
3、JDK8之后推荐使用并行流计算
比如我们还是就计算1-20的整数和,使用并行流计算代码如下:
import java.util.stream.LongStream; public class TestParalle { public static void main(String[] args) throws Exception { long result = LongStream.rangeClosed(1,20).parallel().reduce(0,Long::sum); System.out.println(result); } }
输出结果还是210,并行流底层还是Fork/Join框架,只是任务拆分优化得很好。Fork/Join 并行流等当计算的数字非常大的时候,优势才能体现出来。也就是说,如果你的计算比较小,或者不是CPU密集型的任务,不太建议使用并行处理。
四、ForkJoinPool原理解释
1)ForkJoinPool 的每个工作线程都维护着一个工作队列(WorkQueue),这是一个双端队列(Deque),里面存放的对象是任务(ForkJoinTask)。每个工作线程在运行中产生新的任务(通常是因为调用了 fork())时,会放入工作队列的队尾,并且工作线程在处理自己的工作队列时,使用的是 LIFO 方式,也就是说每次从队尾取出任务来执行。
2)每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务(或是来自于刚刚提交到 pool 的任务,或是来自于其他工作线程的工作队列),窃取的任务位于其他线程的工作队列的队首,也就是说工作线程在窃取其他工作线程的任务时,使用的是 FIFO 方式。
3)在遇到 join() 时,如果需要 join 的任务尚未完成,则会先处理其他任务,并等待其完成。
4)在既没有自己的任务,也没有可以窃取的任务时,进入休眠。
五、ForkJoinPool和ThreadPoolExecutor比较
ForkJoinPool
同ThreadPoolExecutor
一样,也实现了Executor和ExecutorService接口。它使用了一个无限队列来保存需要执行的任务,而线程的数量则是通过构造函数传入,如果没有向构造函数中传入希望的线程数量,那么当前计算机可用的CPU数量会被设置为线程数量作为默认值。
ForkJoinPool主要用来使用分治法(Divide-and-Conquer Algorithm)来解决问题。典型的应用比如快速排序算法。这里的要点在于,ForkJoinPool需要使用相对少的线程来处理大量的任务。比如要对1000万个数据进行排序,那么会将这个任务分割成两个500万的排序任务和一个针对这两组500万数据的合并任务。以此类推,对于500万的数据也会做出同样的分割处理,到最后会设置一个阈值来规定当数据规模到多少时,停止这样的分割处理。比如,当元素的数量小于10时,会停止分割,转而使用插入排序对它们进行排序。
那么到最后,所有的任务加起来会有大概2000000+个。问题的关键在于,对于一个任务而言,只有当它所有的子任务完成之后,它才能够被执行。
所以当使用ThreadPoolExecutor时,使用分治法会存在问题,因为ThreadPoolExecutor中的线程无法像任务队列中再添加一个任务并且在等待该任务完成之后再继续执行。而使用ForkJoinPool时,就能够让其中的线程创建新的任务,并挂起当前的任务,此时线程就能够从队列中选择子任务执行。
ForkJoinPool的另外一个特性是它能够实现工作窃取(Work Stealing),在该线程池的每个线程中会维护一个队列来存放需要被执行的任务。当线程自身队列中的任务都执行完毕后,它会从别的线程中拿到未被执行的任务并帮助它执行。
最后,对于ForkJoinPool,当阈值不同时,对于性能也会有一定影响。因此,在使用ForkJoinPool时,对此阈值进行测试,使用一个最合适的值也有助于整体性能。