## 32 请按到场顺序发言—Completion Service详解
> 时间像海绵里的水,只要你愿意挤,总还是有的。
> ——鲁迅
讲解 CompletionService 之前,我们先回忆一下 ExcutorSevice。ExcutorService 实现了通过线程池来并发执行任务。其中有一种方式是通过线程池执行 Callable 任务,然后通过 Future 获取异步执行的结果,如下面的代码:
~~~java
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(5);
Callable callable1 = () -> {
Thread.sleep(10000);
return "任务1完成";
};
Callable callable2 = () -> {
Thread.sleep(5000);
return "任务2完成";
};
Future future1 = executor.submit(callable1);
Future future2 = executor.submit(callable2);
System.out.println(future1.get());
System.out.println(future2.get());
}
~~~
任务一执行需要 10 秒,任务二执行只需要 5 秒。但是当执行到 future1.get () 时,主线程会被阻塞。等待 10 秒后第一个任务执行完才会去获取第二个任务。然后执行和第二个任务相关的打印操作。大家有没有看出问题?任务 2 明明在 5 秒前就已经执行完成,却不能立刻打印。主线程阻塞在任务一结果的获取上。这样程序执行的效率并不高。如果任务完成后能够立刻被取得执行结果,然后执行后面的逻辑,效率就会有显著的提升。今天我们要讲解的 CompletionService 就是用来做这个事情的。CompletionService 可以按照执行完成结果的到场顺序,被主线程获取到,从而继续执行后面逻辑。
## 1、了解 CompletionService
了解一个类最好、最快的方法就是阅读源代码的注解。而大多数人通常的做法却是去百度或者 google。这样有两个弊端,一是效率并不一定高,可能搜出来很多无用的内容。二是看到的文章并不权威,甚至可能是错的。有的同学可能觉得英文阅读费劲,其实作为开发人员,英语阅读已经是必备技能。这就如同你要熟知 IDE 的快捷键一样,所以如果觉得英文阅读困难,可以刻意练习。其实多读一些技术文档,会发现用词基本都是类似的。
扯的有点远,我们收回来,先看看源代码中对 CompletionService 的解释:
对异步任务执行和执行结果消费解藕。生产者提交任务执行。消费者则获取完成的任务,然后按照完成任务的顺序对任务结果进行处理。
官方的解释是不是十分简洁明了?
# 2、使用 CompletionService
下面我们使用 CompletionService 实现一个吃苹果的程序。首先我声明一个流,里面是一些水果,每个水果会对应一个洗干净的任务。然后主线程拿到洗干净的水果再一个个吃掉。代码如下:
~~~java
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService pool = Executors.newFixedThreadPool(5);
CompletionService<String> service = new ExecutorCompletionService<String>(pool);
Stream.of("苹果", "梨", "葡萄", "桃")
.forEach(fruit -> service.submit(() -> {
if(fruit.equals("苹果")){
TimeUnit.SECONDS.sleep(6);
}else if(fruit.equals("梨")){
TimeUnit.SECONDS.sleep(1);
}else if(fruit.equals("葡萄")){
TimeUnit.SECONDS.sleep(10);
}else if(fruit.equals("桃")){
TimeUnit.SECONDS.sleep(3);
}
return "洗干净的"+fruit;
})
);
String result;
while((result=service.take().get())!=null){
System.out.println("吃掉"+result);
}
}
~~~
可以看到有四种水果。会为每个水果启一个洗水果的任务。每种水果洗的时间不同,其中葡萄最不好洗要 10 秒,而梨最好洗,只需要 1 秒。等待水果洗好后,主线程通过 service.take () 取得执行完成的 Future,然后从里面 get 出返回值,把洗干净的水果吃掉。
我们可以看到输出如下:
~~~
吃掉洗干净的梨
吃掉洗干净的桃
吃掉洗干净的苹果
吃掉洗干净的葡萄
~~~
可以看到哪个水果先洗干净就会先被吃掉。这也证明了 service.take () 的顺序是任务的完成顺序,而不是任务提交的顺序。
通过 CompletionService 我们就可以一端生产,另一端按照完成的顺序进行消费。这避免提交大量任务时,不知道哪个任务先完成,从而在调用 Future 的 get 方法时产生阻塞。使用 CompletionService,永远都是完成一个返回一个,然后消费一个。这样你的程序才更为高效。
主线程收到返回后,可以再继续使用 CompletionService 来异步执行下一步的逻辑,这和非阻塞的编程方式异曲同工。
## 3、CompletionService 源码分析
### 3.1 CompletionService 构造方法
我们先看如何初始化 CompletionService:
~~~java
ExecutorService pool = Executors.newFixedThreadPool(5);
CompletionService<String> service = new ExecutorCompletionService<String>(pool);
~~~
首先初始化 ExecutorService,在构造 ExecutorCompletionService 时作为参数传入。其实 CompletionService 对任务的执行其实就是借助于 ExecutorService 来完成的。接下来我们进入它的构造函数:
~~~java
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
~~~
构造函数中构造了三个属性:
~~~java
private final Executor executor;
private final AbstractExecutorService aes;
private final BlockingQueue<Future<V>> completionQueue;
~~~
executor 就是你传入的 ExecutorService,用来执行任务。
aes 的作用是创建新的 task。它的初始化过程比较有意思,判断了是否为 AbstractExecutorService 的实例。至于为什么这么做,我们后面再详细讲解。
completionQueue 是一个存放 Future 的阻塞队列,并且是无界的。这意味着如果源头不断的产生 Future,但是没有去消费,就会造成内存泄漏。
executor 执行完成的 Future 会被放入 completionQueue 中,take 方法将会从
completionQueue 中取得最新的 future 对象(最近执行完的 task 的结果)。
### 3.2 CompletionService 的 submit 方法
~~~java
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
executor.execute(new QueueingFuture(f));
return f;
}
~~~
首先将 Callable 类型的 task 转为 RunnableFuture 类型。RunnableFuture 是个接口,FutureTask 是其一种实现。
然后通过 new QueueingFuture (f),再将 RunnableFuture 包装为 QueueingFuture 类型的对象。QueueingFuture 的作用就是在 Future 完成时,加入到 completionQueue 中。
我们先看 newTaskFor 的源码:
~~~java
private RunnableFuture<V> newTaskFor(Callable<V> task) {
if (aes == null)
return new FutureTask<V>(task);
else
return aes.newTaskFor(task);
}
~~~
如果 aes 为空,那么直接 new FutureTask。如果不为空则调用 aes 的 newTaskFor 方法。什么情况 aes 会为空呢?我们再看下 aes 初始化的代码:
~~~java
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
~~~
当传入的 executor 为 AbstractExecutorService 类型时,那么 aes 不为空。否则 aes 为空。这两处逻辑处理是相关联的,这么做的原因如下:
1、如果 executor 是 AbstractExecutorService 的子类,有可能会重写 newTaskFor 方法,所以这里优先使用 executo r 的方法来创建 Task,这样后面通过 executor 执行 task 才能正确。比如 ForkJoinPool 就对 newTaskFor 方法进行了重写;
2、如果 executor 不是继承自 AbstractExecutorService。那么它可能并没有 newTaskFor 方法。所以需要 CompletionService 自己来创建 FutureTask。
这样看来 aes 的存在,只是为了尽量使用 executor 提供的 newTaskFor 方法来创建 task,以使后面 excute 方法能够正常运行。
接下来我们分析 QueueingFuture 方法:
~~~java
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}
~~~
QueueingFuture 是内部静态类,并且是 FutureTask 的子类。他只是重写了 done 方法。大家回忆上一节对 Future 的分析,应该还记得 done 方法在任务执行结果返回后被调用,但是留给子类来实。这里就用上了这个特性。done 方法里面做的就是把 task 加入阻塞队列中。这意味着,先完成的 task 会先把自己的 Future 放入队列中。那么当然也会被 take 方法先取到。而由于是阻塞队列,所以 take 方法取不到 task 时,就会阻塞。但由于能被 take 到的 task 肯定已经有了返回值,所以调用 task 的 get 方法时就不会再次阻塞了。也就是说 client 代码中的下面一行只会在 take 时发生阻塞:
~~~java
while((result=service.take().get())!=null){
System.out.println("吃掉"+result);
}
~~~
executor 执行任务的代码就不用再次分析了,这在之前学习 Executor 的时候已经详细分析过了。submit 方法分析完后我们再来看看 take 方法。
### 3.3 CompletionService 的 take 方法
相比较 submit 方法,take 方法就更为简单了,如下:
~~~java
public Future<V> take() throws InterruptedException {
return completionQueue.take();
}
~~~
只有一行代码,就是从 completionQueue 中取得 Futrue 对象。由于 completionQueue 是阻塞队列,当没有 Future 时,就会阻塞在此。而 completionQueue 中保存 Future 的顺序是完成顺序。
## 4、 总结
CompletionService 给我们提供了一种非阻塞的异步执行方式。让程序更为高效。他的实现非常的简单和巧妙,值得我们借鉴。其实我们学习到这里,不知道你是否有这种体会,这些工具实际上就是我们之前学习内容的组合运用,如果前面你掌握的很牢固,学习起来一点也不费劲。如果前面就似懂非懂,那么就会越看越糊涂。其实我们在学习上至少有一半的时间都是在打基础,但这个过程必不可少,并且受益更为深远。
- 前言
- 第1章 Java并发简介
- 01 开篇词:多线程为什么是你必需要掌握的知识
- 02 绝对不仅仅是为了面试—我们为什么需要学习多线程
- 03 多线程开发如此简单—Java中如何编写多线程程序
- 04 人多力量未必大—并发可能会遇到的问题
- 第2章 Java中如何编写多线程
- 05 看若兄弟,实如父子—Thread和Runnable详解
- 06 线程什么时候开始真正执行?—线程的状态详解
- 07 深入Thread类—线程API精讲
- 08 集体协作,什么最重要?沟通!—线程的等待和通知
- 09 使用多线程实现分工、解耦、缓冲—生产者、消费者实战
- 第3章 并发的问题和原因详解
- 10 有福同享,有难同当—原子性
- 11 眼见不实—可见性
- 12 什么?还有这种操作!—有序性
- 13 问题的根源—Java内存模型简介
- 14 僵持不下—死锁详解
- 第4章 如何解决并发问题
- 15 原子性轻量级实现—深入理解Atomic与CAS
- 16 让你眼见为实—volatile详解
- 17 资源有限,请排队等候—Synchronized使用、原理及缺陷
- 18 线程作用域内共享变量—深入解析ThreadLocal
- 第5章 线程池
- 19 自己动手丰衣足食—简单线程池实现
- 20 其实不用造轮子—Executor框架详解
- 第6章 主要并发工具类
- 21 更高级的锁—深入解析Lock
- 22 到底哪把锁更适合你?—synchronized与ReentrantLock对比
- 23 按需上锁—ReadWriteLock详解
- 24 经典并发容器,多线程面试必备—深入解析ConcurrentHashMap上
- 25 经典并发容器,多线程面试必备—深入解析ConcurrentHashMap下
- 26不让我进门,我就在门口一直等!—BlockingQueue和ArrayBlockingQueue
- 27 倒数计时开始,三、二、一—CountDownLatch详解
- 28 人齐了,一起行动—CyclicBarrier详解
- 29 一手交钱,一手交货—Exchanger详解
- 30 限量供应,不好意思您来晚了—Semaphore详解
- 第7章 高级并发工具类及并发设计模式
- 31 凭票取餐—Future模式详解
- 32 请按到场顺序发言—Completion Service详解
- 33 分阶段执行你的任务-学习使用Phaser运行多阶段任务
- 34 谁都不能偷懒-通过 CompletableFuture 组装你的异步计算单元
- 35拆分你的任务—学习使用Fork/Join框架
- 36 为多线程们安排一位经理—Master/Slave模式详解
- 第8章 总结
- 37 结束语