## 20 其实不用造轮子—Executor框架详解
> 人的一生可能燃烧也可能腐朽,我不能腐朽,我愿意燃烧起来!
> ——奥斯特洛夫斯基
上一节我们动手实现了一个非常简单的线程池。其实 JDK 已经为我们准备了功能丰富的线程池工具。本章我们就来学习一下 JDK 中的线程池—Executor 框架。
## 1、Executor 框架的使用
我们首先来看看 Executor 框架是如何实用的。看如下代码:
~~~java
public class Client {
public static Executor executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) {
Stream.iterate(1, item -> item + 1).limit(20).forEach(item -> {
executor.execute(() -> {
System.out.println(Thread.currentThread().getName() + " hello!");
});
}
);
}
}
~~~
可以看到在使用上和我们自己实现的线程池几乎一模一样。只不过在声明 Executor 的时候,没有直接 new 对象。而是通过Executors的静态方法 newFixedThreadPool 来创建 Executor。
而执行任务的方式则是和我们自己实现的一模一样。都是调用 executor 方法,传入 Runnable 接口的实现,也就是运行的逻辑。那么它的内部实现是否也和我们实现的一样呢?先别急,我们一点点展开来分析。
## 2、Executor 框架设计简介
我们先来看下Executor框架的继承关系:
![图片描述](https://img.mukewang.com/5dba76b6000189b606480516.jpg)
**1、Executor**
可以看到最顶层是 Executor 的接口。这个接口很简单,只有一个 execute 方法。此接口的目的是为了把任务提交和任务执行解耦。
**2、ExecutorService**
这还是一个接口,继承自 Executor,它扩展了 Executor 接口,定义了更多线程池相关的操作。
**3、AbstractExecutorService**
提供了 ExecutorService 的部分默认实现。
**4、ThreadPoolExecutor**
实际上我们使用的线程池的实现是 ThreadPoolExecutor。它实现了线程池工作的完整机制。也是我们接下来分析的重点对象。
**5、ForkJoinPool**
实现 Fork/Join 模式的线程池,后面会有小节专门讲解。本节不做深入分析。
**6、ScheduledExecutorService**
这个接口扩展了ExecutorService,定义个延迟执行和周期性执行任务的方法。
**7、ScheduledThreadPoolExecutor**
此接口则是在继承 ThreadPoolExecutor 的基础上实现 ScheduledExecutorService 接口,提供定时和周期执行任务的特性。
**Executors**
Executor 框架还提供 Executors 对象。注意看这个对象比 Executor 接口后面对了个 s,要区分开,不要搞混。Executors 是一个工厂及工具类。提供了例如 newFixedThreadPool(10) 的方法,来创建各种不同的 Executor。
## 3、Executor 框架源码分析
Executor 设计的类和实现比较多。本节对 Executor 框架的源码分析以 ThreadPoolExecutor 作为主线,其它的内容也会有所提及,不过请同学们抓住重点,别偏离了主线。
### 3.1 Executor
代码如下:
~~~java
public interface Executor {
void execute(Runnable command);
}
~~~
很简单,只是为了把提交任务解耦出来。
### 3.2 ExecutorService
ExecutorService 定义了线程池管理和更多执行任务的方法,如下:
![图片描述](https://img.mukewang.com/5dba76d70001cff905560264.jpg)
挑选几个重点的说一下:
**shutdown 方法**
终止 executorService,不再执行任务新的任务,已经执行的任务会被执行完。
**shutdownNow 方法**
不等待正在执行的任务完成,强行关闭。不过此方法并不保证正在执行的任务能被强行终止。返回从来没有被执行的任务列表。
**submit 方法**
对 execute 方法的扩展,会返回一个 Future 对象,持有任务执行结果。
**invokeAll 方法**
执行一组任务,所有任务都返回或者 timeout 的时候,invokeAll 方法返回执行结果列表。该方法一旦返回结果,没有完成的任务则被取消。
**invokeAny 方法**
执行一组任务,任意一个任务有返回时,invokeAny 返回该任务的执行结果。其余没有完成的任务则被取消。
### 3.3 AbstractExecutorService
提供了 newTaskFor 方法对 Runnable 进行包装:
~~~java
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
~~~
它对 submit 的实现,就是过 newTaskFor 方法,代码如下:
~~~java
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
~~~
这里用到的 RunnableFuture,就是为了这个功能而生,它实现了 Runbale 接口及 Future 接口。所以它可以被传入execute方法,从而添加进任务列表。此外它还保存了执行的结果,并被返回。
### 3.4 构造ThreadPoolExecutor
下面才是本节的重头戏,对 ThreadPoolExecutor 的源代码分析。我们从 ThreadPoolExecutor 的创建开始。
~~~java
Executors.newFixedThreadPool(10)
~~~
可以看到是通过 Executors 的工厂方法来创建的,Executor 提供了多种工厂方法创建 ThreadPool。其实根本是调用 ThreadPoolExecutor 构造方法时传入参数不同。我们以 newFixedThreadPool 方法为例,看一下代码:
~~~java
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
~~~
ThreadPoolExecutor 构造方法如下:
~~~java
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
~~~
现在我们可以翻一下 newFixedThreadPool 定义了一个什么样的线程池:
核心线程数量为 n,最大线程数量也为 n 的线程池。线程池中线程永远存活。线程池创建线程使用 defaultTHreadFactory。当无法创建线程时,使用 defaultHandler。
**corePoolSize**即线程池的核心线程数量,其实也是最小线程数量。不设置 allowCoreThreadTimeOut 的情况下,核心线程数量范围内的线程一直存活。
**maximumPoolSize**即线程池的最大线程数量。受限于线程池的 CAPACITY。线程池的 CAPACITY 为 2 的 29 次方 -1。这是由于线程池把线程数量和状态保存在一个整形原子变量中。状态保存在高位,占据了两位,所以线程池中线程数量最多到 2 的 29 次方 -1。
**workQueue**是一个阻塞的 queue,用来保存线程池要执行的所有任务。
**Executors.defaultThreadFactory()**,我们看下源代码,发现其最终返回了一个 DefaultThreadFactory。代码如下:
~~~java
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
~~~
其实就是规范了生成的 Thread。避免调用 new Thread 创建,导致创建出来的Thread可能存在差异。在Executor中,对线程的创建都是通过 ThreadFactory,禁止使用 new Thread 来创建。
ThreadPoolExecutor 中还有个重要的属性:
~~~java
/**
* Set containing all worker threads in pool. Accessed only when
* holding mainLock.
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
~~~
通过注释可以看出,这个 HashSet 中存的是 Thread。而 Worker 其实就是对 Thread 的进一步封装。
我们再回过头来,看一下 ThreadPoolExecutor 的构造函数中做了什么事情:
~~~java
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
~~~
可以看到只是对属性的赋值,并没有启动任何线程。这样做是很好的设计,因为没有任何任务添加时就启动线程,是对系统资源的浪费。
通过以上分析,我们对 ThreadPoolExecutor 的结构应该比较清晰了,其实核心和我们自己实现的线程池是一样的。ThreadPoolExecutor 也有一个任务的列表 workQueue,还有一个线程的列表 worker
那么按照我们自己实现的逻辑,线程池应该是通过启动线程轮询从 workQueue 中获取任务执行来实现线程池的运转。结下来我们看看猜想是否正确。
### 3.5 启动 ThreadPoolExecutor
既然在创建 ThreadPoolExecutor 时并没有启动线程池,那么线程池是何时被启动的呢?我猜应该是添加第一个任务的时候,也就是调用 execute 方法时。我们来看看 execute 方法的代码:
~~~java
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
~~~
源代码中有一段关键的注释我没有贴进来,下面我先把这段关键的注释翻译讲解下:
分三步做处理:
* 1、如果运行的线程数量小于 corePoolSize,那么尝试创建新的线程,并把传入的 command 作为它的第一个 task 来执行。调用 addWorker 会自动检查 runState 和 workCount,以此来防止在不应该添加线程时添加线程的错误警告;
* 2、即使 task 可以被成功加入队列,我们仍旧需要再次确认我们是否应该添加 thread(因为最后一次检查之后可能有线程已经死掉了)还是线程池在进入此方法后已经停掉了。所以我们会再次检查状态,如果有必要的话,可以回滚队列。或者当没有线程时,开启新的 thread;
* 3、如果无法将 task 加入 queue,那么可以尝试添加新的 thread。如果添加失败,这是因为线程池被关闭或者已经饱和了,所以拒绝这个 task。
以上是原文的翻译。结合代码,其实就是如下三步:
* 1、线程数量不足 corePoolSize时,添加新线程作为 core thread 执行 command;
* 2、将 command 加入 workQueue,然后再次检查线程池状态。如果不是 isRunning,则移除 command 并且reject command。如果线程数量已经为 0,那么则再次 addWorker;
* 3、如果无法将 task 加入 workQueue,则尝试 addWorker。但不作为 core thread。如果添加失败,则 reject command(由于没有加入 workQueue,所以不需要从 queue 中移除 command)。
可以看到 execute 流程的核心方法为**addWorker**。我们继续分析 addWorker方法。
**addWork**中主要执行如下逻辑:
1、更新 worker 的数量,代码如下:
~~~java
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
~~~
retry 是一个标记,和循环配合使用,continue retry 的时候,会跳到 retry 的地方再次执行。如果 break retry,则跳出整个循环体。前文提到过,ThreadPoolExecutor 把状态和线程池数量两个属性存在了一个 Atomic 变量中,就是这里用到的 ctl。源码中先检查了状态,然后格局创建线程类型的不同,进行数量的校验。在通过 CAS方 式更新状 ctl,成功的话则跳出循环。否则再次取得线程池状态,如果和最初已经不一致,那么从头开始执行。如果状态并未改变则继续更新 worker 的数量。流程参考下图:
![图片描述](https://img.mukewang.com/5dba77580001323211010534.jpg)
2、添加 worker 到 workers 的 set 中。并且启动 worker 中持有的线程。代码如下:
~~~java
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
~~~
可以看到添加 work 时需要先获得锁,这样确保多线程并发安全。如果添加 worker 成功,那么调用 worker 中线程的 start 方法启动线程。如果启动失败则调用 addWorkerFailed 方法进行回滚。过程比较简单,这里就不再提流程图了。
分析到这里,我们先进行下总结。
* 1、ThreadPoolExecutor 在初始化后并没有启动和创建任何线程;
* 2、在调用 execute 方法时才会调用 addWorker 创建线程,并且把 command 加入到 workQueue(如果已经拥有超过 core 数量的线程,则不会再调用 addWorker 创建线程);
* 3、addWorker 方法中会创建新的 worker,并启动其持有的线程来执行任务。
第二步中,如果线程数量已经达到 corePoolSize,则只会把 command 加入到 workQueue 中,那么加入到 workQueue 中的 command 是如何被执行的呢?我们下面来分析 Worker 的源代码。
### 3.6 Worker
Worker 封装了线程,是 executor 中的工作单元。worker 继承自 AbstractQueuedSynchronizer,并实现 Runnable。
worker 中的属性如下:
~~~java
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
~~~
如果存在 firstTask,那么 worker 中线程启动时,会先执行 firstTask。
构造方法如下:
~~~java
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
~~~
可以看到通过 ThreadFactory 创建线程,并没有直接 new。原因上文已经将结果。此处还需要特别注意的是,创建 thread 时把 worker 自己作为 Runnable 的实现传入了 thread 中。那么 addWork 时调用的 t.start(),实际上运行的是 t 所属 worker的run 方法。worker 的 run 方法如下:
~~~java
public void run() {
runWorker(this);
}
~~~
实际运行的是 ThreadPoolExecutor 的 runWorker 方法,代码如下:
~~~java
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
~~~
主流程如下:
* 1、先取出 worker 中的 firstTask,并清空;
* 2、如果没有 firstTask,则调用 getTask 方法,从 workQueue 中获取task;
* 3、获取锁;
* 4、执行 beforeExecute。这里是空方法,如有需要在子类实现;
* 5、执行 task.run;
* 6、执行 afterExecute。这里是空方法,如有需要在子类实现;
* 7、清空 task,completedTasks++,释放锁;
* 8、当有异常或者没有 task 可执行时,会进入外层 finnaly 代码块。调用 processWorkerExit 退出当前 worker。从 works 中移除本 worker 后,如果 worker 数量小于 corePoolSize,则创建新的 worker,以维持 corePoolSize 大小的线程数。
这行代码 while (task != null || (task = getTask()) != null) ,确保了 worker 不停地从 workQueue 中取得 task 执行。getTask 方法会从 BlockingQueue workQueue 中 poll 或者 take 其中的 task 出来。
到这里关于 executor 如何创建并启动线程执行 task 的过程已经分析清楚了。其实和我们自己实现的线程池的核心思想一致,都是通过维护一定数量的线程,并且不断从任务队列取得任务执行来实现线程池的运转。但是 Executor 框架考虑得更为全面,健壮性也要好很多。我们在实际开发中不要自己再去设计线程池,请直接使用 executor。
## 4、总结
本节的内容相对比较多,源代码阅读也比较枯燥。我们在阅读源代码时一定抓住核心流程·,从高层级逻辑开始自顶向下分析和阅读。不要过多纠缠于细节,等到大体能够读懂时,再去看感兴趣的细节实现。否则很容易在层层嵌套的源代码中迷失了方向,陷入某个细节不能自拔。其实关于 ThreadPoolExecutor 还有些方法,本节没有给出分析,比如 shutdown 和 shutdownNow,大家可以尝试自己分析下。
- 前言
- 第1章 Java并发简介
- 01 开篇词:多线程为什么是你必需要掌握的知识
- 02 绝对不仅仅是为了面试—我们为什么需要学习多线程
- 03 多线程开发如此简单—Java中如何编写多线程程序
- 04 人多力量未必大—并发可能会遇到的问题
- 第2章 Java中如何编写多线程
- 05 看若兄弟,实如父子—Thread和Runnable详解
- 06 线程什么时候开始真正执行?—线程的状态详解
- 07 深入Thread类—线程API精讲
- 08 集体协作,什么最重要?沟通!—线程的等待和通知
- 09 使用多线程实现分工、解耦、缓冲—生产者、消费者实战
- 第3章 并发的问题和原因详解
- 10 有福同享,有难同当—原子性
- 11 眼见不实—可见性
- 12 什么?还有这种操作!—有序性
- 13 问题的根源—Java内存模型简介
- 14 僵持不下—死锁详解
- 第4章 如何解决并发问题
- 15 原子性轻量级实现—深入理解Atomic与CAS
- 16 让你眼见为实—volatile详解
- 17 资源有限,请排队等候—Synchronized使用、原理及缺陷
- 18 线程作用域内共享变量—深入解析ThreadLocal
- 第5章 线程池
- 19 自己动手丰衣足食—简单线程池实现
- 20 其实不用造轮子—Executor框架详解
- 第6章 主要并发工具类
- 21 更高级的锁—深入解析Lock
- 22 到底哪把锁更适合你?—synchronized与ReentrantLock对比
- 23 按需上锁—ReadWriteLock详解
- 24 经典并发容器,多线程面试必备—深入解析ConcurrentHashMap上
- 25 经典并发容器,多线程面试必备—深入解析ConcurrentHashMap下
- 26不让我进门,我就在门口一直等!—BlockingQueue和ArrayBlockingQueue
- 27 倒数计时开始,三、二、一—CountDownLatch详解
- 28 人齐了,一起行动—CyclicBarrier详解
- 29 一手交钱,一手交货—Exchanger详解
- 30 限量供应,不好意思您来晚了—Semaphore详解
- 第7章 高级并发工具类及并发设计模式
- 31 凭票取餐—Future模式详解
- 32 请按到场顺序发言—Completion Service详解
- 33 分阶段执行你的任务-学习使用Phaser运行多阶段任务
- 34 谁都不能偷懒-通过 CompletableFuture 组装你的异步计算单元
- 35 拆分你的任务—学习使用Fork/Join框架
- 36 为多线程们安排一位经理—Master/Slave模式详解
- 第8章 总结
- 37 结束语