## 35 拆分你的任务—学习使用Fork/Join框架
> 读书给人以快乐、给人以光彩、给人以才干。
> ——培根
本节我们学习 Excutor 的另外一种实现 ForkJoinPool。顾名思义,ForkJoinPool 的核心功能有两个。第一个是 Fork,拆解你的任务。第二个是 Join,合并任务的执行结果。这个场景很常见,比如我们要处理一批数据,由于数据间没有依赖性,那么我们可以把这一批数据拆解为更小的批次,多线程并行处理。最后再合并处理的结果。 Fork/Join 的核心思想就是分而治之。
## 1、ForkJoinPool 介绍
ForkJoinPool 自 Java 7 引入。它和 ThreadPoolExecutor 都继承自 AbstractExecutorService,实现了 ExecutorService 和 Executor 接口。ForkJoinPool 用来把大任务切分为小任务,如果切分完小任务还不够小(由你设置的阈值决定),那么就继续向下切分。经过切分后,最后的任务是金字塔形状,计算完成后向上汇总。如下图:
![图片描述](https://img.mukewang.com/5e01ac0f0001bcc116000994.jpg)
ForkJoinPool 处理任务的核心思想可以用如下伪代码表示:
~~~java
Result solve(Problem problem) {
if (problem is small)
directly solve problem
else {
split problem into independent parts
fork new subtasks to solve each part
join all subtasks
compose result from subresults
}
}
~~~
如果一个任务足够小,那么执行任务逻辑。如果不够小,拆分为两个独立的子任务。子任务执行后, 取得两个子任务的执行结果进行合并。
ForkJoinPool 通过 submit 执行 ForkJoinTask 类型的任务。ForkJoinTask 是抽象类,有着不同的子类实现。比较常用的是如下两种:
1、RecursiveAction,没有返回值;
2、RecurisiveTask,有返回值。
此外 submit 方法还可以执行 Callable 和 Runnable 的接口实现。
ForkJoinTask 就是我们为代码中的 problem。我来举个例子看具体如何使用。假如让你计算 1-10000 的和,
我们可以把任务拆解为 100 个,每个任务计算 100 个数字之和。代码如下.
Task 代码;
~~~java
public class Task extends RecursiveTask<Integer> {
private static final int THRESHOLD = 100;
private int from;
private int to;
public Task(int from, int to) {
super();
this.from = from;
this.to = to;
}
@Override
protected Integer compute() {
if (THRESHOLD > (to - from)) {
return IntStream.range(from, to + 1)
.reduce((a, b) -> a + b)
.getAsInt();
} else {
int forkNumber = (from + to) / 2;
Task left = new Task(from, forkNumber);
Task right = new Task(forkNumber + 1, to);
left.fork();
right.fork();
return left.join() + right.join();
}
}
}
~~~
Task 继承自 RecursiveTask。递归任务的大小力度为 100。重写的 compute 方法和文章开头的伪代码 solve 是一样的思路。先判断任务的大小是否在 THRESHOLD 之内。如果已经拆解到 THRESHOLD 内,那么进行计算。如果任务拆分还没达到 THRESHOLD,那么继续拆解任务。fork 操作会把当前任务放入线程池中来执行。最后再通过 join 取得执行结果做合并。
Client 代码:
~~~java
public class Client {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
ForkJoinTask<Integer> result = forkJoinPool.submit(new Task(1, 10000));
System.out.println("计算结果为"+result.get());
forkJoinPool.shutdown();
}
}
~~~
我们首先通过静态方法 commonPool 声明一个 ForkJoinPool。commonPool 创建的 ForkJoinPool 满足绝大多数的应用场景。然后通过 submit 方法提交我们的 Task,计算 1-10000 的和。提交 Task 后,Task 中的 compute 方法最终会被调用,通过对任务的拆解,以及对任务计算结果的合并,汇总到此处的 Task 中。通过 Task 的 get 方法获取计算结果。最后关闭线程池。
执行结果如下:
~~~
计算结果为50005000
~~~
## 2、ForkJoinPool 原理介绍
ForkJoinPool 中的每个线程都维护自己的工作队列。这是一个双端队列,既可以先进先出,也可以先进后出。简单来说就是队列两端都可以做出队操作。当每个线程产生新的任务时(比如说调用了 fork 操作),会被加入到队尾。线程工作的时候会从自己维护的工作队列的 top 做出队操作(LIFO),取得任务来执行。线程还会去其它线程任务队列窃取任务,此时是从其它队列的 base 取得任务(FIFO)。如下图所示:
![图片描述](https://img.mukewang.com/5e01abfb0001f1d315260880.jpg)
下面简单介绍几个常用方法:
1、fork 方法中会判断如果当前线程不是 ForkJoinWorkerThread,则把任务加入 submission queue。否则加入自己的工作队列中。submission queue 没有关联的线程,是所有线程都可以执行的任务队列。fork 代码如下:
~~~java
public final ForkJoinTask<V> fork() {
Thread t;
//判断本线程是否为ForkJoinWorkerThread,是的话,加入到自己的workQueue中,否则调用externalPush
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
~~~
2、join 方法中,自己任务没有执行完,则取的自己任务队列中的任务执行。如果发现自己的任务已经没有了,则会去窃取其它线程的任务来执行。Join 代码如下:
~~~java
public final V join() {
int s;
//取得doJoin后的状态,位运算后判断是否正常,不正常的话抛出异常。正常的话返回计算结果
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
~~~
主要逻辑在 doJoin 中,代码如下:
~~~java
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}
~~~
如果当前线程不是 ForkJoinWorkerThread,则调用 externalAwaitDone。如果是 ForkJoinWorkerThread 那么先通过 tryUnpush 从自己的 workQueue 的 top 位置取得当前 task,然后调用 doExec 执行。这两步成功的话返回执行结果 s,否则调用 awaitJoin。这个方法中判断本任务是否执行完成,完成直接返回,否则会尝试窃执行取别的线程的任务。
3、submit 方法中,会把任务 push 到 submission queue。
ForkJoinPool 通过任务窃取,使得任务的执行更为高效。
## 3、总结
ForkJoinPool 为我们拆分大任务再汇总小任务计算结果提供了很好的支持。它很适合执行计算密集型的任务。但是如果你的任务拆分逻辑比计算逻辑还要复杂,ForkJoinPool 并不能为你带来性能的提升,反而会起到负面作用。因此需要结合自己的场景来选择使用。
- 前言
- 第1章 Java并发简介
- 01 开篇词:多线程为什么是你必需要掌握的知识
- 02 绝对不仅仅是为了面试—我们为什么需要学习多线程
- 03 多线程开发如此简单—Java中如何编写多线程程序
- 04 人多力量未必大—并发可能会遇到的问题
- 第2章 Java中如何编写多线程
- 05 看若兄弟,实如父子—Thread和Runnable详解
- 06 线程什么时候开始真正执行?—线程的状态详解
- 07 深入Thread类—线程API精讲
- 08 集体协作,什么最重要?沟通!—线程的等待和通知
- 09 使用多线程实现分工、解耦、缓冲—生产者、消费者实战
- 第3章 并发的问题和原因详解
- 10 有福同享,有难同当—原子性
- 11 眼见不实—可见性
- 12 什么?还有这种操作!—有序性
- 13 问题的根源—Java内存模型简介
- 14 僵持不下—死锁详解
- 第4章 如何解决并发问题
- 15 原子性轻量级实现—深入理解Atomic与CAS
- 16 让你眼见为实—volatile详解
- 17 资源有限,请排队等候—Synchronized使用、原理及缺陷
- 18 线程作用域内共享变量—深入解析ThreadLocal
- 第5章 线程池
- 19 自己动手丰衣足食—简单线程池实现
- 20 其实不用造轮子—Executor框架详解
- 第6章 主要并发工具类
- 21 更高级的锁—深入解析Lock
- 22 到底哪把锁更适合你?—synchronized与ReentrantLock对比
- 23 按需上锁—ReadWriteLock详解
- 24 经典并发容器,多线程面试必备—深入解析ConcurrentHashMap上
- 25 经典并发容器,多线程面试必备—深入解析ConcurrentHashMap下
- 26不让我进门,我就在门口一直等!—BlockingQueue和ArrayBlockingQueue
- 27 倒数计时开始,三、二、一—CountDownLatch详解
- 28 人齐了,一起行动—CyclicBarrier详解
- 29 一手交钱,一手交货—Exchanger详解
- 30 限量供应,不好意思您来晚了—Semaphore详解
- 第7章 高级并发工具类及并发设计模式
- 31 凭票取餐—Future模式详解
- 32 请按到场顺序发言—Completion Service详解
- 33 分阶段执行你的任务-学习使用Phaser运行多阶段任务
- 34 谁都不能偷懒-通过 CompletableFuture 组装你的异步计算单元
- 35 拆分你的任务—学习使用Fork/Join框架
- 36 为多线程们安排一位经理—Master/Slave模式详解
- 第8章 总结
- 37 结束语