[TOC]
# 线程池的使用
ThreadPoolExecutor的构造方法如下:
```java
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
```
其中:
* corePoolSize为核心线程池大小
* maximumPoolSize为线程池允许的最大线程数
* keepAliveTime为线程池的工作线程空闲后,保持存活的时间
* unit为线程保持存活的时间单位
* workQueue为工作队列,线程池中的工作线程都是从这个工作队列源源不断的取出任务进行执行
* threadFactory为创建新的线程时使用的工厂类
* handler为拒绝任务时的饱和策略
使用:
```java
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
executor.execute(new Runnable() {
@Override
public void run() {
//do something
}
});
```
# 源码分析
## 线程池执行机制
![](https://img.kancloud.cn/13/c5/13c5c8e5646465e3abd210b9fc09ccfe_826x394.png)
1、工作线程数小于核心线程数时,直接新建核心线程执行任务;
2、大于核心线程数时,将任务添加进等待队列;
3、队列满时,创建非核心线程执行任务;
4、工作线程数大于最大线程数时,拒绝任务。
ThreadPoolExecutor类的execute方法源码如下:
```java
// 属性ctl是AtomicInteger类型,高3位存储线程池状态,低29位存储当前线程数量
// wokerCountOf(ctl)返回当前线程数量,runStateOf(ctl)返回当前线程池状态
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
public void execute(Runnable command) {
int c = ctl.get();
// 如果工作线程数小于核心线程数
if (workerCountOf(c) < corePoolSize) {
// 创建核心线程并执行任务
if (addWorker(command, true)) {
return;
}
// 执行失败时,重新获取值
c = ctl.get();
}
// 工作线程数大于等于核心线程数时
// 检查运行状态,然后将任务插入队列
if (isRunning(c) && workQueue.offer(command)) {
// 将任务插入队列成功
int recheck = ctl.get();
// 再次检查状态,防止状态有变化,如果有变化,将任务移出队列并拒绝任务
if (! isRunning(recheck) && remove(command)) {
// 拒绝任务
reject(command);
}
// 线程数为0
else if (workerCountOf(recheck) == 0) {
// 创建非核心线程
addWorker(null, false);
}
}
// 工作线程数大于等于核心线程数时
// 将任务插入队列失败,说明队列已满,创建非核心线程执行任务
else if (!addWorker(command, false)) {
// 创建非核心线程失败,说明工作线程数量大于最大线程数,拒绝任务
reject(command);
}
}
```
上面提到的属性ctl用来控制线程的状态,并用来表示线程池线程数量,在线程池中有以下几种状态:
1、RUNNABLE:运行状态,接受新任务,持续处理任务队列里的任务
2、SHUTDOWN:不再接受新任务,但会处理任务队列里的任务
3、STOP:不再接受新任务,不再处理任务队列里的任务,中断正在执行的任务
4、TIDYING:表示线程池正在停止运作,终止所有任务,消耗所有工作线程
5、TERMINATED:表示线程池已停止运作,所有工作线程已销毁,所有任务已被清空或执行完毕
## 启动新线程
线程池中的工作线程以Worker作为提现,真正工作的线程为Worker的成员变量,Worker从工作队列中取出任务来执行,并能通过Worker控制任务状态。
Worker的源码如下:
```java
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
//...
}
```
接下来看看新建线程的代码,也就是addWorker方法:
```java
/**
* @param firstTask 新线程首先会执行的任务,会在执行完这个任务后再从队列中取任务执行
* @param core 是否是核心线程
* @return boolen 创建结果
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 判断几种不创建线程的情况
if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) {
return false;
}
for (;;) {
// 增加线程数,并跳出循环
if (compareAndIncrementWorkerCount(c))
break retry;
//...
}
}
// 开始创建并启动线程
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//...这里实际有加锁,暂时不关心
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
// 将新启动的线程添加到线程池中
workers.add(w);
workerAdded = true;
}
if (workerAdded) {
// 启动线程
t.start();
workerStarted = true;
}
}
}
return workerStarted;
}
```
## 新线程执行任务
创建线程后,会执行Worker的run方法,run方法会调用ThreadPoolExecutor的runWorker方法:
```java
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
//...
try {
// 循环调用getTask方法从队列取出任务执行
while (task != null || (task = getTask()) != null) {
w.lock();
//...
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务
task.run();
}
//...
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
} finally {
// 将Worker从workers中移出
processWorkerExit(w, completedAbruptly);
}
}
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//...
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
}
}
```
可以看到,线程执行任务的流程为:
1、如果firstTask不为空,先执行firstTask,执行后置空
2、firstTask为空后,循环调用getTask从队列中取出Task并执行
3、一直到没有Task,退出循环
4、调用processWorkerExit将Woker从workers中移出,线程执行完毕,不再被引用,会自动销毁
下面来看看getTask方法。
getTask方法用于从阻塞队列里拿出任务:
```java
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 检查线程池和阻塞队列的状态
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
// 获取当前线程数
int wc = workerCountOf(c);
// allowCoreThreadTimeOut代表是否允许核心线程退出
// wc > corePoolSize用于判断是否存在非核心线程
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 线程池线程已满
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c)) {
// 超时
return null;
}
continue;
}
try {
// timed为true,使用poll等待keepAliveTime长的时间来获取任务
// timed为false,使用take获取任务,阻塞线程,直到可以从阻塞队列拿到任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
```
线程池里的线程从阻塞队列里那任务
1、如果存在非核心线程,假设阻塞队列里没有任务,那么非核心线程需要等到keepAliveTime后才会释放。
2、如果只有核心线程,且允许核心线程释放,那么等到keepAliveTime后才会释放。
3、只有核心线程,且不允许核心线程释放,那么会通过take阻塞队列,直到可以从队列拿到任务。
参考文章:
[彻底弄懂 Java 线程池原理](https://juejin.im/post/5c33400c6fb9a049fe35503b)
[Java线程池(ThreadPoolExecutor)原理分析与使用](https://blog.csdn.net/fuyuwei2015/article/details/72758179)
[线程池ThreadPoolExecutor实现原理](https://juejin.im/post/5aeec0106fb9a07ab379574f)
[关于线程池的这 8 个问题你都能答上来吗?](https://url.cn/5J45up8)
- 导读
- Java知识
- Java基本程序设计结构
- 【基础知识】Java基础
- 【源码分析】Okio
- 【源码分析】深入理解i++和++i
- 【专题分析】JVM与GC
- 【面试清单】Java基本程序设计结构
- 对象与类
- 【基础知识】对象与类
- 【专题分析】Java类加载过程
- 【面试清单】对象与类
- 泛型
- 【基础知识】泛型
- 【面试清单】泛型
- 集合
- 【基础知识】集合
- 【源码分析】SparseArray
- 【面试清单】集合
- 多线程
- 【基础知识】多线程
- 【源码分析】ThreadPoolExecutor源码分析
- 【专题分析】volatile关键字
- 【面试清单】多线程
- Java新特性
- 【专题分析】Lambda表达式
- 【专题分析】注解
- 【面试清单】Java新特性
- Effective Java笔记
- Android知识
- Activity
- 【基础知识】Activity
- 【专题分析】运行时权限
- 【专题分析】使用Intent打开三方应用
- 【源码分析】Activity的工作过程
- 【面试清单】Activity
- 架构组件
- 【专题分析】MVC、MVP与MVVM
- 【专题分析】数据绑定
- 【面试清单】架构组件
- 界面
- 【专题分析】自定义View
- 【专题分析】ImageView的ScaleType属性
- 【专题分析】ConstraintLayout 使用
- 【专题分析】搞懂点九图
- 【专题分析】Adapter
- 【源码分析】LayoutInflater
- 【源码分析】ViewStub
- 【源码分析】View三大流程
- 【源码分析】触摸事件分发机制
- 【源码分析】按键事件分发机制
- 【源码分析】Android窗口机制
- 【面试清单】界面
- 动画和过渡
- 【基础知识】动画和过渡
- 【面试清单】动画和过渡
- 图片和图形
- 【专题分析】图片加载
- 【面试清单】图片和图形
- 后台任务
- 应用数据和文件
- 基于网络的内容
- 多线程与多进程
- 【基础知识】多线程与多进程
- 【源码分析】Handler
- 【源码分析】AsyncTask
- 【专题分析】Service
- 【源码分析】Parcelable
- 【专题分析】Binder
- 【源码分析】Messenger
- 【面试清单】多线程与多进程
- 应用优化
- 【专题分析】布局优化
- 【专题分析】绘制优化
- 【专题分析】内存优化
- 【专题分析】启动优化
- 【专题分析】电池优化
- 【专题分析】包大小优化
- 【面试清单】应用优化
- Android新特性
- 【专题分析】状态栏、ActionBar和导航栏
- 【专题分析】应用图标、通知栏适配
- 【专题分析】Android新版本重要变更
- 【专题分析】唯一标识符的最佳做法
- 开源库源码分析
- 【源码分析】BaseRecyclerViewAdapterHelper
- 【源码分析】ButterKnife
- 【源码分析】Dagger2
- 【源码分析】EventBus3(一)
- 【源码分析】EventBus3(二)
- 【源码分析】Glide
- 【源码分析】OkHttp
- 【源码分析】Retrofit
- 其他知识
- Flutter
- 原生开发与跨平台开发
- 整体归纳
- 状态及状态管理
- 零碎知识点
- 添加Flutter到现有应用
- Git知识
- Git命令
- .gitignore文件
- 设计模式
- 创建型模式
- 结构型模式
- 行为型模式
- RxJava
- 基础
- Linux知识
- 环境变量
- Linux命令
- ADB命令
- 算法
- 常见数据结构及实现
- 数组
- 排序算法
- 链表
- 二叉树
- 栈和队列
- 算法时间复杂度
- 常见算法思想
- 其他技术
- 正则表达式
- 编码格式
- HTTP与HTTPS
- 【面试清单】其他知识
- 开发归纳
- Android零碎问题
- 其他零碎问题
- 开发思路