企业🤖AI Agent构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
[TOC] # 线程池的使用 ThreadPoolExecutor的构造方法如下: ```java public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } ``` 其中: * corePoolSize为核心线程池大小 * maximumPoolSize为线程池允许的最大线程数 * keepAliveTime为线程池的工作线程空闲后,保持存活的时间 * unit为线程保持存活的时间单位 * workQueue为工作队列,线程池中的工作线程都是从这个工作队列源源不断的取出任务进行执行 * threadFactory为创建新的线程时使用的工厂类 * handler为拒绝任务时的饱和策略 使用: ```java ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); executor.execute(new Runnable() { @Override public void run() { //do something } }); ``` # 源码分析 ## 线程池执行机制 ![](https://img.kancloud.cn/13/c5/13c5c8e5646465e3abd210b9fc09ccfe_826x394.png) 1、工作线程数小于核心线程数时,直接新建核心线程执行任务; 2、大于核心线程数时,将任务添加进等待队列; 3、队列满时,创建非核心线程执行任务; 4、工作线程数大于最大线程数时,拒绝任务。 ThreadPoolExecutor类的execute方法源码如下: ```java // 属性ctl是AtomicInteger类型,高3位存储线程池状态,低29位存储当前线程数量 // wokerCountOf(ctl)返回当前线程数量,runStateOf(ctl)返回当前线程池状态 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); public void execute(Runnable command) { int c = ctl.get(); // 如果工作线程数小于核心线程数 if (workerCountOf(c) < corePoolSize) { // 创建核心线程并执行任务 if (addWorker(command, true)) { return; } // 执行失败时,重新获取值 c = ctl.get(); } // 工作线程数大于等于核心线程数时 // 检查运行状态,然后将任务插入队列 if (isRunning(c) && workQueue.offer(command)) { // 将任务插入队列成功 int recheck = ctl.get(); // 再次检查状态,防止状态有变化,如果有变化,将任务移出队列并拒绝任务 if (! isRunning(recheck) && remove(command)) { // 拒绝任务 reject(command); } // 线程数为0 else if (workerCountOf(recheck) == 0) { // 创建非核心线程 addWorker(null, false); } } // 工作线程数大于等于核心线程数时 // 将任务插入队列失败,说明队列已满,创建非核心线程执行任务 else if (!addWorker(command, false)) { // 创建非核心线程失败,说明工作线程数量大于最大线程数,拒绝任务 reject(command); } } ``` 上面提到的属性ctl用来控制线程的状态,并用来表示线程池线程数量,在线程池中有以下几种状态: 1、RUNNABLE:运行状态,接受新任务,持续处理任务队列里的任务 2、SHUTDOWN:不再接受新任务,但会处理任务队列里的任务 3、STOP:不再接受新任务,不再处理任务队列里的任务,中断正在执行的任务 4、TIDYING:表示线程池正在停止运作,终止所有任务,消耗所有工作线程 5、TERMINATED:表示线程池已停止运作,所有工作线程已销毁,所有任务已被清空或执行完毕 ## 启动新线程 线程池中的工作线程以Worker作为提现,真正工作的线程为Worker的成员变量,Worker从工作队列中取出任务来执行,并能通过Worker控制任务状态。 Worker的源码如下: ```java private final class Worker extends AbstractQueuedSynchronizer implements Runnable { final Thread thread; Runnable firstTask; Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } public void run() { runWorker(this); } //... } ``` 接下来看看新建线程的代码,也就是addWorker方法: ```java /** * @param firstTask 新线程首先会执行的任务,会在执行完这个任务后再从队列中取任务执行 * @param core 是否是核心线程 * @return boolen 创建结果 */ private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 判断几种不创建线程的情况 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) { return false; } for (;;) { // 增加线程数,并跳出循环 if (compareAndIncrementWorkerCount(c)) break retry; //... } } // 开始创建并启动线程 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { //...这里实际有加锁,暂时不关心 int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 将新启动的线程添加到线程池中 workers.add(w); workerAdded = true; } if (workerAdded) { // 启动线程 t.start(); workerStarted = true; } } } return workerStarted; } ``` ## 新线程执行任务 创建线程后,会执行Worker的run方法,run方法会调用ThreadPoolExecutor的runWorker方法: ```java final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; //... try { // 循环调用getTask方法从队列取出任务执行 while (task != null || (task = getTask()) != null) { w.lock(); //... try { beforeExecute(wt, task); Throwable thrown = null; try { // 执行任务 task.run(); } //... } finally { task = null; w.completedTasks++; w.unlock(); } } } finally { // 将Worker从workers中移出 processWorkerExit(w, completedAbruptly); } } private void processWorkerExit(Worker w, boolean completedAbruptly) { //... try { completedTaskCount += w.completedTasks; workers.remove(w); } } ``` 可以看到,线程执行任务的流程为: 1、如果firstTask不为空,先执行firstTask,执行后置空 2、firstTask为空后,循环调用getTask从队列中取出Task并执行 3、一直到没有Task,退出循环 4、调用processWorkerExit将Woker从workers中移出,线程执行完毕,不再被引用,会自动销毁 下面来看看getTask方法。 getTask方法用于从阻塞队列里拿出任务: ```java private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 检查线程池和阻塞队列的状态 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } // 获取当前线程数 int wc = workerCountOf(c); // allowCoreThreadTimeOut代表是否允许核心线程退出 // wc > corePoolSize用于判断是否存在非核心线程 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 线程池线程已满 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) { // 超时 return null; } continue; } try { // timed为true,使用poll等待keepAliveTime长的时间来获取任务 // timed为false,使用take获取任务,阻塞线程,直到可以从阻塞队列拿到任务 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } } ``` 线程池里的线程从阻塞队列里那任务 1、如果存在非核心线程,假设阻塞队列里没有任务,那么非核心线程需要等到keepAliveTime后才会释放。 2、如果只有核心线程,且允许核心线程释放,那么等到keepAliveTime后才会释放。 3、只有核心线程,且不允许核心线程释放,那么会通过take阻塞队列,直到可以从队列拿到任务。 参考文章: [彻底弄懂 Java 线程池原理](https://juejin.im/post/5c33400c6fb9a049fe35503b) [Java线程池(ThreadPoolExecutor)原理分析与使用](https://blog.csdn.net/fuyuwei2015/article/details/72758179) [线程池ThreadPoolExecutor实现原理](https://juejin.im/post/5aeec0106fb9a07ab379574f) [关于线程池的这 8 个问题你都能答上来吗?](https://url.cn/5J45up8)