企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
[TOC] ### Executor Executor 管理多个异步任务的执行,而无需程序员显式地管理线程的生命周期。这里的异步是指多个任务的执行互不干扰,不需要进行同步操作。 主要有三种 Executor: * CachedThreadPool:一个任务创建一个线程; * FixedThreadPool:所有任务只能使用固定大小的线程; * SingleThreadExecutor:相当于大小为 1 的 FixedThreadPool。 ~~~java public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < 5; i++) { executorService.execute(new MyRunnable()); } executorService.shutdown(); } ~~~ **为什么引入Executor线程池框架?** new Thread() 的缺点 * 每次 new Thread() 耗费性能 * 调用 new Thread() 创建的线程缺乏管理,被称为野线程,而且可以无限制创建,之间相互竞争,会导致过多占用系统资源导致系统瘫痪。 * 不利于扩展,比如如定时执行、定期执行、线程中断 采用线程池的优点 * 重用存在的线程,减少对象创建、消亡的开销,性能佳 * 可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞 * 提供定时执行、定期执行、单线程、并发数控制等功能 <br> ## **线程池实现原理** Java里面线程池的顶级接口是**Executor**,但是严格意义上讲Executor并不是一个线程池,而 只是一个执行线程的工具。真正的线程池接口是**ExecutorService**。 > 蘑菇街面试,设计一个线程池 :-: ![](https://box.kancloud.cn/0ed46a8dce0dc55a3b1501819c4ba1e3_1304x480.jpg) ### 并发队列 **入队** 非阻塞队列:当队列中满了时候,放入数据,数据丢失 阻塞队列:当队列满了的时候,进行等待,什么时候队列中有出队的数据,那么第11个再放进去 **出队** 非阻塞队列:如果现在队列中没有元素,取元素,得到的是null 阻塞队列:等待,什么时候放进去,再取出来 线程池使用的是阻塞队列 ### 线程池概念 线程是稀缺资源,如果被无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,合理的使用线程池对线程进行统一分配、调优和监控,有以下好处: 1. 降低资源消耗; 2. 提高响应速度; 3. 提高线程的可管理性。 Java1.5 中引入的 Executor 框架把任务的提交和执行进行解耦,只需要定义好任务,然后提交给线程池,而不用关心该任务是如何执行、被哪个线程执行,以及什么时候执行。 ### Executor类图 [![](https://github.com/frank-lam/fullstack-tutorial/raw/master/notes/JavaArchitecture/assets/820628cf179f4952812da4e8ca5de672.png)](https://github.com/frank-lam/fullstack-tutorial/blob/master/notes/JavaArchitecture/assets/820628cf179f4952812da4e8ca5de672.png) ### 线程池工作原理 线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;如果当前线程数为 corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;如果阻塞队列满了,那就创建新的线程执行当前任务;直到线程池中的线程数达到 maxPoolSize,这时再有任务来,只能执行 reject() 处理该任务。 ## 常用的几个线程池 ### 初始化线程池 **newFixedThreadPool()** 说明:**初始化一个指定线程数的线程池**,其中 corePoolSize == maxiPoolSize,使用 **LinkedBlockingQuene** 作为阻塞队列 特点:即使当线程池没有可执行任务时,也不会释放线程。 **newCachedThreadPool()** 说明:**初始化一个可以缓存线程的线程池**,默认缓存60s,线程池的线程数可达到 Integer.MAX\_VALUE,即 2147483647,内部使用 SynchronousQueue 作为阻塞队列; 特点:在没有任务执行时,当线程的空闲时间超过 keepAliveTime,会自动释放线程资源;当提交新任务时,如果没有空闲线程,则创建新线程执行任务,会导致一定的系统开销; 因此,使用时要注意控制并发的任务数,防止因创建大量的线程导致而降低性能。 **newSingleThreadExecutor()** 说明:**初始化只有一个线程的线程池**,内部使用 LinkedBlockingQueue 作为阻塞队列。 特点:如果该线程异常结束,会重新创建一个新的线程继续执行任务,唯一的线程可以保证所提交任务的顺序执行 **newScheduledThreadPool()** 特点:初始化的线程池可以在指定的时间内周期性的执行所提交的任务,在实际的业务场景中可以使用该线程池定期的同步数据。 ### FixThreadPool 固定线程池 FixThreadPool :可重用固定线程数的线程池。 ~~~ public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor( nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); } ~~~ **执行机制 :** * 若当前运行的线程数小于 `corePoolSize`,来新任务时,就创建新的线程来执行任务; * 当前运行的线程数等于 `corePoolSize` 后,如果再来新任务的话,会将任务加到 `LinkedBlockingQueue`; * 线程池中的线程执行完手头的工作后,会在循环中反复从 `LinkedBlockingQueue` 中获取任务来执行。 FixThreadPool 使用的是无界队列 `LinkedBlockingQueue`(队列容量为 Integer.MAX\_VALUE),而它会给线程池带来如下**影响 :** * 当线程池中的线程数达到 `corePoolSize` 后,新任务将在无界队列中等待,因此线程池中的线程数不会超过 `corePoolSize`; * 由于使用的是一个无界队列,所以 `maximumPoolSize` 将是一个无效参数,因为不可能存在任务队列满的情况,所以 FixedThreadPool 的 `corePoolSize`、`maximumPoolSize` 被设置为同一个值,且 `keepAliveTime` 将是一个无效参数; * 运行中的 FixedThreadPool(指未执行 `shutdown()` 或 `shutdownNow()` 的)不会拒绝任务,因此在任务较多的时候可能会导致 OOM。 ### SingleThreadExecutor 单一线程池 SingleThreadExecutor 是只有一个线程的线程池。 ~~~ public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor( 1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); } ~~~ 除了池中只有一个线程外,其他和 FixThreadPool 是基本一致的。 ### CachedThreadPool 缓存线程池 CachedThreadPool 是一个会根据需要创建新线程的线程池,但会在先前构建的线程可用时重用它。 ~~~ public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor( 0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); } ~~~ 其 `corePoolSize` 被设置为 0,`maximumPoolSize` 被设置为 Integer.MAX.VALUE,也就是无界的。虽然是无界,但由于该线程池还存在一个销毁机制,即如果一个线程 60 秒内未被使用过,则该线程就会被销毁,这样就节省了很多资源。 但是,如果主线程提交任务的速度高于 `maximunPool` 中线程处理任务的速度,CachedThreadPool 将会源源不断地创建新的线程,从而依然可能导致 CPU 耗尽或内存溢出。 执行机制 : * 首先执行 `offer` 操作,提交任务到任务队列。若当前 maximumPool 中有空闲线程正在执行 `poll` 操作,且主线程的 `offer` 与空闲线程的 `poll` 配对成功时,主线程将把任务交给空闲线程执行,此时视作 `execute()` 方法执行完成;否则,将执行下面的步骤。 * 当初始 `maximum` 为空,或 `maximumPool` 中没有空闲线程时,将没有线程执行 `poll` 操作。此时,CachedThreadPool 会创建新线程执行任务,`execute()` 方法执行完成。 ### ScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。延迟执行示例代码如下: ~~~ ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5); scheduledThreadPool.schedule(new Runnable() { @Override public void run() { System.out.println("delay 3 seconds"); } }, 3, TimeUnit.SECONDS); ~~~ 表示延迟3秒执行。 ~~~ scheduledThreadPool.scheduleAtFixedRate(new Runnable() { @Override public void run() { System.out.println("delay 1 seconds, and excute every 3 seconds"); } }, 1, 3, TimeUnit.SECONDS); ~~~ 表示延迟1秒后每3秒执行一次。 ScheduledExecutorService比Timer更安全,功能更强大 #### 原理 ![](https://user-gold-cdn.xitu.io/2017/12/18/16069204d8d7c9e3?w=464&h=325&f=jpeg&s=23453) * 它接收SchduledFutureTask类型的任务,有两种提交任务的方式:scheduledAtFixedRate, scheduledWithFixedDelay * 它采用DelayQueue存储等待的任务 DelayQueue内部封装了一个PriorityQueue,它会根据time的先后时间排序,若time相同则根据sequenceNumber排序; DelayQueue也是一个无界队列; 工作线程会从DelayQueue取已经到期的任务去执行; 执行结束后重新设置任务的到期时间,再次放回DelayQueue #### 初始化方法 ~~~java // 使用Executors静态方法进行初始化 ExecutorService service = Executors.newSingleThreadExecutor(); // 常用方法 service.execute(new Thread()); service.submit(new Thread()); service.shutDown(); service.shutDownNow(); ~~~ ### 常用方法 #### execute与submit的区别 1. 接收的参数不一样 2. submit有返回值,而execute没有 用到返回值的例子,比如说我有很多个做 validation 的 task,我希望所有的 task 执行完,然后每个 task 告诉我它的执行结果,是成功还是失败,如果是失败,原因是什么。然后我就可以把所有失败的原因综合起来发给调用者。 3. submit方便Exception处理 如果你在你的 task 里会抛出 checked 或者 unchecked exception,而你又希望外面的调用者能够感知这些 exception 并做出及时的处理,那么就需要用到 submit,通过捕获 Future.get 抛出的异常。 #### shutDown与shutDownNow的区别 当线程池调用该方法时,线程池的状态则立刻变成 SHUTDOWN 状态。此时,则不能再往线程池中添加任何任务,否则将会抛出 RejectedExecutionException 异常。但是,此时线程池不会立刻退出,直到添加到线程池中的任务都已经处理完成,才会退出。 ### 内部实现 ~~~java public ThreadPoolExecutor( int corePoolSize, // 核心线程数 int maximumPoolSize, // 最大线程数 long keepAliveTime, // 线程存活时间(在 corePore<*<maxPoolSize 情况下有用) TimeUnit unit, // 存活时间的时间单位 BlockingQueue<Runnable> workQueue // 阻塞队列(用来保存等待被执行的任务) ThreadFactory threadFactory, // 线程工厂,主要用来创建线程; RejectedExecutionHandler handler // 当拒绝处理任务时的策略 ){ this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } ~~~ 关于 workQueue 参数,有四种队列可供选择: * ArrayBlockingQueue:基于数组结构的有界阻塞队列,按 FIFO 排序任务; * LinkedBlockingQuene:基于链表结构的阻塞队列,按 FIFO 排序任务; * SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于 ArrayBlockingQuene; * PriorityBlockingQuene:具有优先级的无界阻塞队列; 关于 handler 参数,线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了 4 种策略: * ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 * ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。 * ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程) * ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务 当然也可以根据应用场景实现 RejectedExecutionHandler 接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。 ### 线程池的状态 ~~~java private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); ~~~ 其中 AtomicInteger 变量 ctl 的功能非常强大:利用低 29 位表示线程池中线程数,通过高 3 位表示线程池的运行状态: * **RUNNING**:-1 << COUNT\_BITS,即高 3 位为 111,该状态的线程池会接收新任务,并处理阻塞队列中的任务; * **SHUTDOWN**: 0 << COUNT\_BITS,即高 3 位为 000,该状态的线程池不会接收新任务,但会处理阻塞队列中的任务; * **STOP**: 1 << COUNT\_BITS,即高 3 位为 001,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务; * **TIDYING**: 2 << COUNT\_BITS,即高 3 位为 010,该状态表示线程池对线程进行整理优化; * **TERMINATED**: 3 << COUNT\_BITS,即高 3 位为 011,该状态表示线程池停止工作; ### 线程池其他常用方法 如果执行了线程池的 prestartAllCoreThreads() 方法,线程池会提前创建并启动所有核心线程。 ThreadPoolExecutor 提供了动态调整线程池容量大小的方法:setCorePoolSize() 和 setMaximumPoolSize()。 ### 为什么推荐使用 ThreadPoolExecutor 来创建线程或者说为什么不推荐直接用Executors来创建线程池? 规约一 :线程资源必须通过线程池提供,不允许在应用中自行显示创建线程。 > 使用线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源开销,解决资源不足的问题。如果不使用线程池,有可能会造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。 规约二 :强制线程池不允许使用 `Executors` 去创建,而是通过 `ThreadPoolExecutor` 构造函数的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 > Executors 返回线程池对象的弊端如下: > > `FixedThreadPool` 和 `SingleThreadExecutor` : 允许请求的**队列长度**为 Integer.MAX\_VALUE,可能会**堆积大量请求**,从而导致 OOM。 > > `CachedThreadPool` 和 `ScheduledThreadPool` : 允许创建的**线程数量**为 Integer.MAX\_VALUE,可能会**创建大量线程**,从而导致 OOM。 ## 如何拟定线程池的大小? ### 上下文切换 多线程变编程中一般线程的个数都大于 CPU 核心的个数,而一个 CPU 核心在任意时刻只能被一个线程使用。为了让这些线程都能得到有效执行,CPU 采取的策略是为每个线程分配时间片并轮转的形式。当一个线程的时间片用完的时候就会重新处于就绪状态让给其他线程使用,这个过程就属于一次上下文切换。 概括来说就是,当前任务在执行完 CPU 时间片切换到另一个任务之前,会先保存自己的状态,以便下次再切换回这个任务时,可以直接加载到上次的状态。任务从保存到再加载的过程就是一次上下文切换。 上下文切换通常是计算密集型的。也就是说,它需要相当可观的处理器时间,在每秒几十上百次的切换中,每次切换都需要纳秒量级的时间。所以,上下文切换对系统来说意味着消耗大量的 CPU 时间,事实上,可能是操作系统中时间消耗最大的操作。 > Linux 相比与其他操作系统(包括其他类 Unix 系统)有许多,其中有一项就是,其上下文切换和模式切换的时间消耗非常少。 > ### 简单的拟定判断 **CPU 密集型任务(N+1):** 这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。 **I/O 密集型任务(2N):** 这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。 美团技术团队深入解析线程池原理:https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html