企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
[TOC] ## 常见线程池 ### newFixedThreadPool ~~~ public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } ~~~ 线程池的线程数量达corePoolSize后,即使线程池没有可执行任务时,也不会释放线程。 FixedThreadPool的工作队列为无界队列LinkedBlockingQueue(队列容量为Integer.MAX\_VALUE), 这会导致以下问题:  * 线程池里的线程数量不超过corePoolSize,这导致了maximumPoolSize和keepAliveTime将会是个无用参数  * 由于使用了无界队列, 所以FixedThreadPool永远不会拒绝, 即饱和策略失效 ### newSingleThreadExecutor ~~~ public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } ~~~ 线程池中只有一个线程,如果该线程异常结束,会重新创建一个新的线程继续执行任务,唯一的线程可以保证所提交任务的顺序执行. 由于使用了无界队列, 所以SingleThreadPool永远不会拒绝, 即饱和策略失效 ### newCachedThreadPool ~~~ public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } ~~~ 1. 线程池的线程数可达到Integer.MAX\_VALUE,即2147483647,内部使用SynchronousQueue作为阻塞队列; 2. 和newFixedThreadPool创建的线程池不同,newCachedThreadPool在没有任务执行时,当线程的空闲时间超过keepAliveTime,会自动释放线程资源,当提交新任务时,如果没有空闲线程,则创建新线程执行任务,会导致一定的系统开销; 执行过程与前两种稍微不同:  (1) 主线程调用SynchronousQueue的offer()方法放入task, 倘若此时线程池中有空闲的线程尝试读取 SynchronousQueue的task, 即调用了SynchronousQueue的poll(), 那么主线程将该task交给空闲线程. 否则执行(2)  (2) 当线程池为空或者没有空闲的线程, 则创建新的线程执行任务.  (3) 执行完任务的线程倘若在60s内仍空闲, 则会被终止. 因此长时间空闲的CachedThreadPool不会持有任何线程资源. ## 构造函数三个参数 ### BlockingQueue workQueue 用来保存等待被执行的任务的阻塞队列. 在JDK中提供了如下阻塞队列:  1. ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;  2. LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene;  3. SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene;  4. priorityBlockingQuene:具有优先级的无界阻塞队列; LinkedBlockingQueue比ArrayBlockingQueue在插入删除节点性能方面更优,但是二者在put(), take()任务的时均需要加锁,SynchronousQueue使用无锁算法,根据节点的状态判断执行,而不需要用到锁,其核心是Transfer.transfer(). ### ThreadFactory threadFactory 创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名。默认为DefaultThreadFactory ### RejectedExecutionHandler handler 线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:  1. AbortPolicy:直接抛出异常,默认策略; 2. CallerRunsPolicy:用调用者所在的线程来执行任务; 3. DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务; 4. DiscardPolicy:直接丢弃任务;  当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。 ## 内部状态以及变量ctl ~~~ private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; } ~~~ 其中AtomicInteger变量ctl的功能非常强大:利用低29位表示线程池中线程数,通过高3位表示线程池的运行状态:  1、RUNNING:-1 << COUNT\_BITS,即高3位为111,该状态的线程池会接收新任务,并处理阻塞队列中的任务;  2、SHUTDOWN: 0 << COUNT\_BITS,即高3位为000,该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;  3、STOP : 1 << COUNT\_BITS,即高3位为001,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务;  4、TIDYING : 2 << COUNT\_BITS,即高3位为010, 所有的任务都已经终止;  5、TERMINATED: 3 << COUNT\_BITS,即高3位为011, terminated()方法已经执行完成  ## execute –> addWorker –>runworker (getTask)  ~~~ public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); } ~~~ 1. 如果运行的线程少于corepoolsize,请尝试用给定的命令作为第一个线程启动新线程任务。对AddWorker的调用自动检查运行状态和WorkerCount,这样可以防止错误的警报不应该的时候线程,返回false。 2. 如果一个任务可以成功地排队,那么我们仍然需要再次检查是否应该添加一个线程(因为自上次检查以来已有的线程已经停止了),或者自进入此方法以来,池已关闭。所以我们重新检查状态,必要时回滚排队(如果停止),或者启动新线程(如果没有)。 3. 如果我们不能将任务排队,那么我们尝试添加一个新线程。如果失败了,我们就知道我们被关闭或者饱和了,所以拒绝这个任务。 //todo