## 29 一手交钱,一手交货—Exchanger详解
> 世上无难事,只要肯登攀
> ——毛泽东
Exchanger 从字面来看是一个交换器,它用来在线程间交换数据。如果两个线程并行处理,但在某个时刻需要互相交换自己已经处理完的中间数据,然后才能继续往下执行。这个时候就可以使用 Exchanger。
两个线程不用谋面,但是通过 Exchanger 就能实现数据交换,这是如何做到的呢?我们可以认为 Exchanger 有两个槽位,线程可以向其中放入自己想要交换的数据,然后阻塞在此,等待其它线程填充另外的槽位。等两个槽位都填充数据后,Exchanger 会把槽位调换位置,让线程取到另外线程放入的数据。如下图所示:
![图片描述](https://img.mukewang.com/5dd5fb5f00016d5b16000815.jpg)
前面的描述只是为了帮助理解,其实Exchanger 真实的设计更为复杂。下面我们先通过一个简单的例子来看看如何使用 Exchanger。
## 1、Exchanger 的使用
我们设想如下场景:如果今天上午你需要给你女朋友快递一部手机,但是突然你要去一个紧急的会议。于是你把快递的东西留给你的同事。当快递取件员上门时,他会把快递底单给你同事,你同事则会把快递给取件员。然后你的同事再把快递底单给你。
这个场景里,你和快递员就是两个不同的线程。而你的同事是 Exchanger,负责你和快递员间进行物品交换。我们看看代码应该如何编写:
~~~java
public class ExchangerClient {
public static void main(String[] args) throws InterruptedException {
Exchanger<String> workmate = new Exchanger<>();
Thread michael = new Thread(() -> {
String threadName = Thread.currentThread().getName();
try {
System.out.println(threadName+": I'm Michael and want to delivery a phone to my friend");
System.out.println(threadName+": There is an emergency meeting");
System.out.println(threadName+": I give the phone to my workmate.");
System.out.println(threadName+": He will help me to delivery the phone and return the express document to me");
String expressDocument = workmate.exchange("---phone---");
System.out.println(threadName+": I got the express document"+ expressDocument);
} catch (InterruptedException e) {
e.printStackTrace();
}
},"Michael");
Thread deliveryman = new Thread(() -> {
String threadName = Thread.currentThread().getName();
try {
System.out.println(threadName+": I'm a deliveryman");
System.out.println(threadName+": I'm going to get Michael's express from his workmate and give the express document to his workmate");
String expressGoods = workmate.exchange("---phone express document----");
System.out.println(threadName+": I got the goods of "+expressGoods);
} catch (InterruptedException e) {
e.printStackTrace();
}
},"Deliveryman");
michael.start();
TimeUnit.MILLISECONDS.sleep(100);
deliveryman.start();
}
}
~~~
Michael 通过 Exchanger 的实例 workmate,把 phone 交给了快递员。而快递员也通过 workmate 把快递单据给了 Michael。我们看一下输出:
~~~
Michael: I'm Michael and want to delivery a phone to my friend
Michael: There is an emergency meeting
Michael: I give the phone to my workmate.
Michael: He will help me to delivery and return the express document to me
Deliveryman: I'm a deliveryman
Deliveryman: I'm going to get Michael's express from his workmate and give the express document to his workmate
Deliveryman: I got the goods of ---phone---
Michael: I got the express document---phone express document----
~~~
可以看出 Michael 在调用 workmate.exchange("—phone—") 后被阻塞。而当 deliveryman 调用 workmate.exchange("—phone express document——") 后,两个线程才继续往下走,并且进行了数据交换。 Michael 发出了货物 “—phone—”,得到了 “—phone express document----”。而 deliveryman 则得到了 “—phone—”,发出了 “—phone express document——”。
两个线程通过 Exchanger 实现了数据的交换。
## 2、多于两个线程使用 Exchanger
Exchanger 支持两个线程数据互换,那么你有没有想过多于两个线程使用 Exchanger 做数据交换会怎么样呢?我们把上买呢例子改一下,假如另外一个同事 Green 也想通过 workmate 协助发送快递,那么会怎么样?
~~~java
public static void main(String[] args) throws InterruptedException {
Exchanger<String> workmate = new Exchanger<>();
Thread michael = new Thread(() -> {
String threadName = Thread.currentThread().getName();
try {
System.out.println(threadName+": I'm Michael and want to delivery a phone to my friend");
System.out.println(threadName+": There is an emergency meeting");
System.out.println(threadName+": I give the phone to my workmate.");
System.out.println(threadName+": He will help me to delivery the phone and return the express document to me");
String expressDocument = workmate.exchange("---phone---");
System.out.println(threadName+": I got the express document"+ expressDocument);
} catch (InterruptedException e) {
e.printStackTrace();
}
},"Michael");
Thread green = new Thread(() -> {
String threadName = Thread.currentThread().getName();
try {
System.out.println(threadName+": I'm green and want to delivery a macbook to my friend");
System.out.println(threadName+": There is an emergency meeting");
System.out.println(threadName+": I give the macbook to my workmate.");
System.out.println(threadName+": He will help me to delivery the macbook and return the express document to me");
String expressDocument = workmate.exchange("---macbook---");
System.out.println(threadName+": I got the express document"+ expressDocument);
} catch (InterruptedException e) {
e.printStackTrace();
}
},"Green");
Thread deliveryman = new Thread(() -> {
String threadName = Thread.currentThread().getName();
try {
System.out.println(threadName+": I'm a deliveryman");
System.out.println(threadName+": I'm going to get Michael's express from his workmate and give the express document to his workmate");
String expressGoods = workmate.exchange("---phone express document----");
System.out.println(threadName+": I got the goods of "+expressGoods);
} catch (InterruptedException e) {
e.printStackTrace();
}
},"Deliveryman");
michael.start();
TimeUnit.MILLISECONDS.sleep(100);
green.start();
TimeUnit.MILLISECONDS.sleep(100);
deliveryman.start();
}
~~~
我们加入 Green 线程,并且在 Michael 线程启动后,先启动 Green 线程。输出如下:
~~~
Michael: I'm Michael and want to delivery a phone to my friend
Michael: There is an emergency meeting
Michael: I give the phone to my workmate.
Michael: He will help me to delivery the phone and return the express document to me
Green: I'm green and want to delivery a macbook to my friend
Green: There is an emergency meeting
Green: I give the macbook to my workmate.
Green: He will help me to delivery the macbook and return the express document to me
Green: I got the express document---phone---
Michael: I got the express document---macbook---
Deliveryman: I'm a deliveryman
Deliveryman: I'm going to get Michael's express from his workmate and give the express document to his workmate
~~~
可见 Michael 和 Green 进行了数据交换,双方都没有和 Deliveryman 做数据交换。结果是双方的快递都没送到快递员手中,而快递员由于没有收到快递,所以一直被阻塞住了。
根据以上实验结果我们总结一下:
1、Exchanger 支持多个线程做数据交换;
2、多个线程使用同一个 Exchanger 做数据交换时,结果随机,只要凑满一对,就会进行交换。
## 3、Exchanger实现分析
Exchanger 虽然使用起来很简单,但是其源码还是比较复杂的。下面我们来分析它实现的原理。
Exchanger 类中的注解很详细地阐述了其实现原理,并且用一段伪代码描述了它的原理:
~~~java
for (;;) {
if (slot is empty) { // offer
place item in a Node;
if (can CAS slot from empty to node) {
wait for release;
return matching item in node;
}
}
else if (can CAS slot from node to empty) { // release
get the item in node;
set matching item in node;
release waiting thread;
}
// else retry on CAS failure
}
~~~
可以看到 Exhcanger 使用 slot 作为容器,保存数据 item。当 A 线程进入时,如果发现 slot 是空的,则把 item 放入 node,然后放入 slot。A 线程此时开始阻塞。B 线程进入后发现 slot 不为空,则从 slot 中取出node,把 slot 置空。此线程从 node 中取出数据,并且把自己的数据放入 node 中,然后通知阻塞的线程恢复,阻塞线程恢复后从 node 中取出数据。从而实现了数据的交换。
Java5 的时候,采用的就是这种算法。一般来说没有什么问题。但是当大量并发时,会存在激烈的竞争。于是在Java6 开始,加入 slot 数组,让不同线程使用不同的 slot,提升并发的性能。不过在没有并发的时候还是使用一个 slot 来做交换。
我们看下核心的 exchange 方法代码:
~~~java
public V exchange(V x) throws InterruptedException {
Object v;
Object item = (x == null) ? NULL_ITEM : x; // translate null args
if ((arena != null ||
(v = slotExchange(item, false, 0L)) == null) &&
((Thread.interrupted() || // disambiguates null return
(v = arenaExchange(item, false, 0L)) == null)))
throw new InterruptedException();
return (v == NULL_ITEM) ? null : (V)v;
}
~~~
arena 为 slot 数组。可以看到 arena 为 null 时,还是使用的 slot 交换算法 slotExchange。如果 arena 不为 null进入 if 的 && 的第二个条件判断。在判断中执行了 arenaExchange,此时通过 arena 也就是 slot 数组完成交换。
具体的算法不再详述,和上面伪代码的思想是一致的。
## 4、总结
Exchanger 在我们实际开发中使用不多,但是我们也应该了解其用法和原理,以便在合适场景出现时,我们能识别出应该采用 Exchanger。在 Exchanger 类的注解中,可以看到它适用于基因算法。因为不同基因的搭配测试就应该是随机进行交换的。此外还适用于管道的设计(pipeline design),让数据在管道中做交换。
}
- 前言
- 第1章 Java并发简介
- 01 开篇词:多线程为什么是你必需要掌握的知识
- 02 绝对不仅仅是为了面试—我们为什么需要学习多线程
- 03 多线程开发如此简单—Java中如何编写多线程程序
- 04 人多力量未必大—并发可能会遇到的问题
- 第2章 Java中如何编写多线程
- 05 看若兄弟,实如父子—Thread和Runnable详解
- 06 线程什么时候开始真正执行?—线程的状态详解
- 07 深入Thread类—线程API精讲
- 08 集体协作,什么最重要?沟通!—线程的等待和通知
- 09 使用多线程实现分工、解耦、缓冲—生产者、消费者实战
- 第3章 并发的问题和原因详解
- 10 有福同享,有难同当—原子性
- 11 眼见不实—可见性
- 12 什么?还有这种操作!—有序性
- 13 问题的根源—Java内存模型简介
- 14 僵持不下—死锁详解
- 第4章 如何解决并发问题
- 15 原子性轻量级实现—深入理解Atomic与CAS
- 16 让你眼见为实—volatile详解
- 17 资源有限,请排队等候—Synchronized使用、原理及缺陷
- 18 线程作用域内共享变量—深入解析ThreadLocal
- 第5章 线程池
- 19 自己动手丰衣足食—简单线程池实现
- 20 其实不用造轮子—Executor框架详解
- 第6章 主要并发工具类
- 21 更高级的锁—深入解析Lock
- 22 到底哪把锁更适合你?—synchronized与ReentrantLock对比
- 23 按需上锁—ReadWriteLock详解
- 24 经典并发容器,多线程面试必备—深入解析ConcurrentHashMap上
- 25 经典并发容器,多线程面试必备—深入解析ConcurrentHashMap下
- 26不让我进门,我就在门口一直等!—BlockingQueue和ArrayBlockingQueue
- 27 倒数计时开始,三、二、一—CountDownLatch详解
- 28 人齐了,一起行动—CyclicBarrier详解
- 29 一手交钱,一手交货—Exchanger详解
- 30 限量供应,不好意思您来晚了—Semaphore详解
- 第7章 高级并发工具类及并发设计模式
- 31 凭票取餐—Future模式详解
- 32 请按到场顺序发言—Completion Service详解
- 33 分阶段执行你的任务-学习使用Phaser运行多阶段任务
- 34 谁都不能偷懒-通过 CompletableFuture 组装你的异步计算单元
- 35拆分你的任务—学习使用Fork/Join框架
- 36 为多线程们安排一位经理—Master/Slave模式详解
- 第8章 总结
- 37 结束语