## 36 为多线程们安排一位经理—Master/Slave模式详解
> 没有引发任何行动的思想都不是思想,而是梦想。
> —— 马丁
前文我们讲过 ForkJoinPool 是分而治之的思想。今天我们将要学习的 Master/Slave 也是同样的思想。其中 Master 负责承接一个大的任务,然后它会根据一定策略把大任务拆散为若干个小任务,然后随机分发给一组 Slave。每个 Slave 完成任务后上报自己的任务完成情况。当所有 Slave 都完成了自己的任务时,Master 也就完成了自己的任务。Master 就像是 Slave 的经理,把自己的任务分发下去,而 Slave 则在完成工作后向它汇报。
## 1、Master/Slave 模式设计
### 1.1 Master 设计
在 Master/Slave 模式中,一个 Master 持有一组 Slave 的引用。Master 对外暴露一个承接任务的方法 startTask。这是 Master 的主要方法,在内部做了如下事情:
1. 创建 slave
由于创建 Slave 线程并启动的操作比较重,所以放到提交任务的时候才真正去做;
2. 分发任务
把 Task 进行拆分,然后分发给每个 Slave;
3. 等待处理结果
轮循检查任务是否全部完成,全部完成结束轮循;
4. 返回处理结果
返回任务执行结果。
可以看到这四个方法逻辑十分的清晰。
### 1.2 Slave 设计
下面我们再看看 Slave 的设计:
Slave 继承自 Thread。内部通过阻塞队列 BlockingQueue 保存 Task。这样在取任务时候如果已经没有,则会阻塞等待。它有一个 submitTask 用来提交子任务,这个方法在 Master 分发任务时会被调用。此外还有 run 方法从 BlockingQueue 中取得任务执行。执行结束后通知 Master。
以上的设计并不是固定的模式。但 Master 接收任务,分割任务,派发任务这些功能是要有的,此外 Master 要有能力知道所有子任务都被执行完毕。而 Slave 则需要不断承接子任务,并且执行。执行完毕能够把执行结果回写给 Master。设计如下图:
![图片描述](https://img.mukewang.com/5e01ac580001105815280932.jpg)
其实说这么多,不如直接看代码。下面我们就通过一个小例子,来感受一下 Master/Slave 模式。
## 2、Master/Slave 代码示例
### 2.1 Client 代码
不知道你是否还记得本专栏开始几节反复用来举例的单词抄写的需求。本节是正文最后一篇,正好我们回到最初的例子,用 Master/Slave 方式来实现它。我们这次先看 Client 的代码:
~~~java
public class Client {
public static void main(String[] args) throws InterruptedException {
Task task = new Task(123,"internationalization");
Master master = new Master();
master.startTask(task);
master.printResult();
}
}
~~~
特别的简单,创建一个单词抄写的 Task,然后通过 Master 来执行,最后打印执行结果。
### 2.2 Task 代码
Task 代码如下,省略了 get 方法 :
~~~java
public class Task {
//要抄写的次数
private int copyCount;
//抄写的序号开始
private int from;
//抄写的序号结束
private int to;
//要抄写的单词
private String word;
public Task(int copyCount, int from, int to, String word) {
this.copyCount = copyCount;
this.word = word;
this.from = from;
this.to = to;
}
public Task(int copyCount, String word) {
this.copyCount = copyCount;
this.word = word;
this.from = 1;
this.to = copyCount;
}
}
~~~
接下来我们看看 Master 代码。
### 2.3 Master 代码
我们先来看看 Master 有哪些属性:
~~~java
//保存干活的Slave线程
private List<Slave> slaves;
//slave的数量
private static final int SLAVES_COUNT = 8;
//子任务拆分的力度
private static final int SUB_TASK_SIZE = 4;
//完成的任务数量。各个Slave线程都会更新此数量,所以使用Atomic变量
private AtomicInteger finishedTaskCount = new AtomicInteger(0);
//执行结果,key为线程名字,value为此线程完成的数量
private ConcurrentHashMap<String, Integer> results;
~~~
可以看到 Master 持有一组 slave 线程,用来为它干活。我们的任务是单词抄写,每个子任务由 SUB\_TASK\_SIZE 来控制单个小任务的抄写次数。子线程抄写完成后会更新 finishedTaskCount 和 results 做任务完成记录。
Master 对外提供了如下方法:
~~~java
//主方法,用于执行任务
public ConcurrentHashMap<String, Integer> startTask(Task task)
//子方法完成后向Master提交完成记录
public void subTaskFinished(String slaveName,int finishedSubTaskCount)
//打印执行结果
public void printResult()
~~~
这三个方法里最重要的就是 startTask,Master 主要的执行逻辑都在里面,代码如下:
~~~java
public ConcurrentHashMap<String, Integer> startTask(Task task) throws InterruptedException {
// 1 创建slave
createSlaves(this);
// 2 分发任务
splitAndAssignTask(task);
// 3 等待结果处理
checkTaskFinished(task);
// 4 返回处理结果
return results;
}
~~~
startTask 内部主要调用三个方法,最后返回执行结果。由于创建线程成本高,所以在构造 Master 时并没有创建 Slave,而是延迟到 startTask 的时候来创建。splitAndAssignTask 做的事情就是把大的 task 按照拆分逻辑拆开,分发给 slave 去执行。checkTaskFinished 会轮循检查 task 的执行情况,当全部完成时,执行下面的 return 语句。这几个方法都很重要,接下来我们一个个看。
#### 2.3.1 createSlaves 方法
~~~java
private void createSlaves(Master master) {
if(slaves.size()==0){
IntStream.range(0, this.SLAVES_COUNT).forEach(count ->
slaves.add(new Slave("slave " + count, master))
);
slaves.forEach(slave -> {
slave.start();
});
}
}
~~~
这个方法比较简单,就是创建 SLAVES\_COUNT 个 slave,然后启动起来。
#### 2.3.2 splitAndAssignTask 方法
~~~java
private void splitAndAssignTask(Task task) throws InterruptedException {
int count = task.getCopyCount();
int start = 1;
List<Task> subTasks = new ArrayList<>();
//拆分task
while (start <= count) {
int end = count + 1;
if (start + SUB_TASK_SIZE <= count) {
end = start + SUB_TASK_SIZE;
}
subTasks.add(new Task(end-start, start, end, task.getWord()));
start = end;
}
//分发subTask
for (int i = 0; i < subTasks.size(); i++) {
int slaveIndex = i % SLAVES_COUNT;
slaves.get(slaveIndex).submitTask(subTasks.get(i));
}
}
~~~
这个方法做了两件事情,一是把 task 拆分为多个 subTask。二是把 subTask 分发给 slave 去执行。subTask 中保存了要 copy 的数量,以及 copy 的 from 序号和 to 序号。当然还有要抄写的单词。
#### 2.3.3 checkTaskFinished
这个方法用来检查 task 是否全部执行完成。
~~~java
private void checkTaskFinished(Task task) throws InterruptedException {
while (true) {
if (task.getCopyCount() == finishedTaskCount.get()) {
finished();
break;
}
TimeUnit.MILLISECONDS.sleep(200);
}
}
~~~
方法中使用的轮循的方式来检查 task 的 copy 总数和已完成数量 finishedTaskCount 是否一致,如果一致则说明 task 已经全部完成,那么调用 finished 方法工作做收尾,跳出循环。
#### 2.3.4 subTaskFinished
Master 除了这几个方法还有一个方法用于子线程提交执行结果。代码如下:
~~~java
public void subTaskFinished(String slaveName,int finishedSubTaskCount) {
Integer count = results.get(slaveName);
if(count==null){
results.put(slaveName,finishedSubTaskCount);
}else{
results.put(slaveName,count+finishedSubTaskCount);
}
finishedTaskCount.getAndAdd(finishedSubTaskCount);
}
~~~
首先把执行结果放入 results,如果已经存在,则进行累计。此外更新 finishedTaskCount。
Master 的主要方法都已经介绍完毕。下面我们来看看 Slave。
### 2.4 Slave 代码
Slave 是一个工作的线程,它继承自 Thread 类,
~~~java
public class Slave extends Thread
~~~
我们先看看 Slave 的属性:
~~~java
//slave的线程名字
private String name;
//持有master引用,因为需要向master提交执行结果
private Master master;
//阻塞队列来保存task
private BlockingQueue<Task> tasks;
~~~
slave 中提供两个方法,一个是提交 task 的方法 submitTask,代码如下:
~~~java
public void submitTask(Task task) throws InterruptedException {
tasks.put(task);
}
~~~
代码很简单,只是向阻塞队列中放入 task。
Slave 执行 task 的逻辑在 run 方法中,Slave 继承自 Thread,当他启动后,run 方法就会被调用。代码如下:
~~~java
@Override
public void run() {
try {
while (true) {
Task task = tasks.take();
IntStream.range(task.getFrom(), task.getTo()).forEach(
count -> System.out.println(String.format("线程%s第%d抄写单词%s", name, count, task.getWord()))
);
master.subTaskFinished(name, task.getCopyCount());
}
} catch (InterruptedException e) {
System.out.println(String.format("线程%s被打断", name));
}
}
~~~
这段代码不断的从阻塞队列中 take 出 task。如果没有 task,就会阻塞在此。然后根据 task 内容进行输出。执行完成后调用 master 的 subTaskFinished 方法把自己的执行结果提交给 master。如果阻塞的时候被打断,则打印出日志。
## 3、执行结果分析
在 Client 的 main 方法中我们声明了一个 task = new Task (123,“internationalization”),抄写 internationalization 单词 123 次。运行后输出如下:
~~~
线程slave 1第5抄写单词internationalization
线程slave 5第21抄写单词internationalization
线程slave 4第17抄写单词internationalization
线程slave 2第9抄写单词internationalization
线程slave 5第22抄写单词internationalization
线程slave 3第13抄写单词internationalization
线程slave 0第1抄写单词internationalization
…………………
线程slave 2第107抄写单词internationalization
线程slave 0第100抄写单词internationalization
线程slave 2第108抄写单词internationalization
任务全部完成!
线程slave 4被打断
线程slave 0被打断
线程slave 6被打断
线程slave 1被打断
线程slave 7被打断
线程slave 3被打断
线程slave 5被打断
线程slave 2被打断
线程slave 0,完成了16次抄写
线程slave 7,完成了12次抄写
线程slave 5,完成了16次抄写
线程slave 6,完成了15次抄写
线程slave 3,完成了16次抄写
线程slave 4,完成了16次抄写
线程slave 1,完成了16次抄写
线程slave 2,完成了16次抄写
~~~
中间省略了一些输出。可以看到所有任务完成后 slave 线程都被打断。最后结果输出了每个线程抄写的次数,相加总和为 123。我把上面的 slave 打印日志做了统计,也是打印了 123 条。完全符合我们的预期。
## 4、总结
Master/Slave 模式是常用的多线程设计模式。一般用于大任务的拆分和分发。Master 作为门面对外暴露任务执行的接口,内部则是分发给多个 Slave 线程完成。这一切对于调用者来说是透明的。Master/Slave 模式关键点在于任务的分发和结果的汇总。它的实现方式很灵活,本文只是一种方式,也可以通过线程池来实现。子任务的计算结果也可以使用 Future。此外,分布式系统也有 Master/slave 的设计模式,可以借助 ZooKeeper 来实现。在 Akka 中使用 Actor 也能实现 Master/Slave 模式。实际使用中可以根据业务需求来自己实现。我们只需要掌握模式的核心思想,而不用拘泥于某一种具体的实现方式。
### 附完成代码
Master 代码:
~~~java
public class Master {
private List<Slave> slaves;
private static final int SLAVES_COUNT = 8;
private static final int SUB_TASK_SIZE = 4;
private AtomicInteger finishedTaskCount = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> results;
public Master() {
results = new ConcurrentHashMap<>();
slaves = new ArrayList<>();
}
public ConcurrentHashMap<String, Integer> startTask(Task task) throws InterruptedException {
// 1 创建slave
createSlaves(this);
// 2 分发任务
splitAndAssignTask(task);
// 3 等待结果处理
checkTaskFinished(task);
// 4 返回处理结果
return results;
}
private void createSlaves(Master master) {
if (slaves.size() == 0) {
IntStream.range(0, this.SLAVES_COUNT).forEach(count ->
slaves.add(new Slave("slave " + count, master))
);
slaves.forEach(slave -> {
slave.start();
});
}
}
private void splitAndAssignTask(Task task) throws InterruptedException {
int count = task.getCopyCount();
int start = 1;
List<Task> subTasks = new ArrayList<>();
while (start <= count) {
int end = count + 1;
if (start + SUB_TASK_SIZE <= count) {
end = start + SUB_TASK_SIZE;
}
subTasks.add(new Task(end - start, start, end, task.getWord()));
start = end;
}
for (int i = 0; i < subTasks.size(); i++) {
int slaveIndex = i % SLAVES_COUNT;
slaves.get(slaveIndex).submitTask(subTasks.get(i));
}
}
public void subTaskFinished(String slaveName, int finishedSubTaskCount) {
Integer count = results.get(slaveName);
if (count == null) {
results.put(slaveName, finishedSubTaskCount);
} else {
results.put(slaveName, count + finishedSubTaskCount);
}
finishedTaskCount.getAndAdd(finishedSubTaskCount);
}
private void checkTaskFinished(Task task) throws InterruptedException {
while (true) {
if (task.getCopyCount() == finishedTaskCount.get()) {
finished();
break;
}
TimeUnit.MILLISECONDS.sleep(200);
}
}
private void finished() {
System.out.println("任务全部完成!");
slaves.forEach(slave -> slave.interrupt());
slaves.clear();
}
public void printResult() {
results.forEach((key, value) ->
System.out.println(String.format("线程%s,完成了%d次抄写", key, value)));
}
}
~~~
Slave 代码:
~~~java
public class Slave extends Thread {
private String name;
private Master master;
private BlockingQueue<Task> tasks;
public Slave(String name, Master master) {
this.name = name;
this.master = master;
this.tasks = new ArrayBlockingQueue<Task>(100);
}
public void submitTask(Task task) throws InterruptedException {
tasks.put(task);
}
@Override
public void run() {
try {
while (true) {
Task task = tasks.take();
IntStream.range(task.getFrom(), task.getTo()).forEach(
count -> System.out.println(String.format("线程%s第%d抄写单词%s", name, count, task.getWord()))
);
master.subTaskFinished(name, task.getCopyCount());
}
} catch (InterruptedException e) {
System.out.println(String.format("线程%s被打断", name));
}
}
}
~~~
Task 代码:
~~~java
public class Task {
private int copyCount;
private int from;
private int to;
private String word;
public Task(int copyCount, int from, int to, String word) {
this.copyCount = copyCount;
this.word = word;
this.from = from;
this.to = to;
}
public Task(int copyCount, String word) {
this.copyCount = copyCount;
this.word = word;
this.from = 1;
this.to = copyCount;
}
public int getCopyCount() {
return copyCount;
}
public String getWord() {
return word;
}
public int getFrom() {
return from;
}
public int getTo() {
return to;
}
}
~~~
Client 代码:
~~~java
public class Client {
public static void main(String[] args) throws InterruptedException {
Task task = new Task(123,"internationalization");
Master master = new Master();
master.startTask(task);
master.printResult();
}
}
~~~
- 前言
- 第1章 Java并发简介
- 01 开篇词:多线程为什么是你必需要掌握的知识
- 02 绝对不仅仅是为了面试—我们为什么需要学习多线程
- 03 多线程开发如此简单—Java中如何编写多线程程序
- 04 人多力量未必大—并发可能会遇到的问题
- 第2章 Java中如何编写多线程
- 05 看若兄弟,实如父子—Thread和Runnable详解
- 06 线程什么时候开始真正执行?—线程的状态详解
- 07 深入Thread类—线程API精讲
- 08 集体协作,什么最重要?沟通!—线程的等待和通知
- 09 使用多线程实现分工、解耦、缓冲—生产者、消费者实战
- 第3章 并发的问题和原因详解
- 10 有福同享,有难同当—原子性
- 11 眼见不实—可见性
- 12 什么?还有这种操作!—有序性
- 13 问题的根源—Java内存模型简介
- 14 僵持不下—死锁详解
- 第4章 如何解决并发问题
- 15 原子性轻量级实现—深入理解Atomic与CAS
- 16 让你眼见为实—volatile详解
- 17 资源有限,请排队等候—Synchronized使用、原理及缺陷
- 18 线程作用域内共享变量—深入解析ThreadLocal
- 第5章 线程池
- 19 自己动手丰衣足食—简单线程池实现
- 20 其实不用造轮子—Executor框架详解
- 第6章 主要并发工具类
- 21 更高级的锁—深入解析Lock
- 22 到底哪把锁更适合你?—synchronized与ReentrantLock对比
- 23 按需上锁—ReadWriteLock详解
- 24 经典并发容器,多线程面试必备—深入解析ConcurrentHashMap上
- 25 经典并发容器,多线程面试必备—深入解析ConcurrentHashMap下
- 26不让我进门,我就在门口一直等!—BlockingQueue和ArrayBlockingQueue
- 27 倒数计时开始,三、二、一—CountDownLatch详解
- 28 人齐了,一起行动—CyclicBarrier详解
- 29 一手交钱,一手交货—Exchanger详解
- 30 限量供应,不好意思您来晚了—Semaphore详解
- 第7章 高级并发工具类及并发设计模式
- 31 凭票取餐—Future模式详解
- 32 请按到场顺序发言—Completion Service详解
- 33 分阶段执行你的任务-学习使用Phaser运行多阶段任务
- 34 谁都不能偷懒-通过 CompletableFuture 组装你的异步计算单元
- 35拆分你的任务—学习使用Fork/Join框架
- 36 为多线程们安排一位经理—Master/Slave模式详解
- 第8章 总结
- 37 结束语