助力软件开发企业降本增效 PHP / java源码系统,只需一次付费,代码终身使用! 广告
# 自定义一个简单的线程池 ## 写在前面 在学习这个的时候,我一直在想,怎样理清思路,把中间的点一个一个串起来,然后自己默写出来,所以这个笔记我改了很多次,之前整理了很多理论知识,比如为什么使用线程池?线程池的优点等等,后来都删掉了。理论知识google一下,百度一下都写得非常好,所以我这里也没必要copy一份粘贴在这里,这里就以理清思路为主。 ### 搭建一个最简单的框架 这里先把最基本的代码写出来,我们先定义一个SimpleThreadPool类。 成员属性两个: **size**: *线程池里线程的数量* **DEFAULT_SIZE**: *默认线程池的线程数量* 成员方法: 两个构造方法,这里只是构造SimpleThreadPool这个类的size。 然后构造完成后调用init(),我们再编写一个init方法。 定义一个枚举代表我们工作线程的四个状态: **FREE**:*可以使用状态* **RUNNING**:*正在运行状态* **BLOCKED**:*阻塞状态* **DEAD**:*结束状态* 定义一个WorkerTask代表我们的工作线程,继承Thead并重写run方法,其中的状态的构造等代码暂时自行发挥即可。 ```java public class SimpleThreadPool { private final int size; private final static int DEFAULT_SIZE = 10; public SimpleThreadPool() { this(DEFAULT_SIZE); } public SimpleThreadPool(int size) { this.size = size; init(); } private void init() { } /** * 线程task状态 */ private enum TaskState { FREE, RUNNING, BLOCKED, DEAD } /** * workTask */ private static class WorkerTask extends Thread { private volatile TaskState taskState = TaskState.FREE; //默认free状态 /** * 获取workTask的taskState状态 * @return TaskState */ public TaskState getTaskState() { return this.taskState; } @Override public void run() { //TODO } /** * 关闭线程重置TaskState为DEAD */ public void close() { this.taskState = TaskState.DEAD; } } } ``` ### 完善工作线程逻辑 接下来开始一步一步完善程序: 1. WorkerTask中引入ThreadGroup ```java public WorkerTask(ThreadGroup threadGroup,String name){ super(threadGroup,name); } ``` 2. 接下来完善run方法,因为run方法不能执行完程序就挂掉,如果执行完就挂掉了也就没意义了,所以这里使用while: ```java @Override public void run() { while(this.taskState != TaskState.DEAD){ //TODO 执行任务 } } ``` 3. 接下来就要考虑我们执行的任务在哪里获取呢?这里就要用到任务队列了: ```java private final static LinkedList<Runnable> TASK_QUEUE = new LinkedList<>(); ``` 完善我们的while循环: ```java @Override public void run() { while (this.taskState != TaskState.DEAD) { synchronized (TASK_QUEUE) { // 任务队列为空,则进入阻塞状态 while (TASK_QUEUE.isEmpty()) { try { this.taskState = TaskState.BLOCKED; TASK_QUEUE.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } } ``` 注意看这个位置: ![](https://img.kancloud.cn/f9/16/f91696af27c0aa18dfc87b08fb1a0c4e_610x334.png) 如果我们在任务队列为空的情况下,打断wait()的线程是退出到while循环里的,所以我们要加一个lable: ![](https://img.kancloud.cn/ba/ef/baef3d5574b1accb862d94c31c14ace5_598x332.png) 4. 接下来我们编写**任务队列不为空**情况下的代码: ```java @Override public void run() { OUTER: while (this.taskState != TaskState.DEAD) { synchronized (TASK_QUEUE) { Runnable runnable; // 任务队列为空,则进入阻塞状态 while (TASK_QUEUE.isEmpty()) { try { this.taskState = TaskState.BLOCKED; TASK_QUEUE.wait(); } catch (InterruptedException e) { System.out.println("Closed."); break OUTER; } } // 任务队列不为空,取出任务 runnable = TASK_QUEUE.removeFirst(); // 任务不为空,则执行任务 if (runnable != null) { this.taskState = TaskState.RUNNING; runnable.run(); this.taskState = TaskState.FREE; } } } } ``` 5.这样工作线程就定义完了,接下来就要完善提交任务了,在提交任务之前,我们首先要构建。 我们在SimpleThreadPool类里面增加creatWorkeTask方法: ```java private void creatWorkeTask(String name) { } ``` 这里我们打算用creatWorkeTask调用WorkerTask,WorkerTask的两个参数,一个线程组,一个名字,这个名字我们来通过前缀+自增的方式生成,所以: (注意这里是volatile) ```java private static volatile int seg=0; ``` 增加名字的前缀 ```JAVA /** * 线程名前缀 */ private final static String THREAD_PREFIX = "SIMPLE_THREAD_POOL-"; ``` 增加一个ThreadGroup: ```java /** * 线程组 */ private final static ThreadGroup GROUP = new ThreadGroup("Pool_Group"); ``` 这样就开始完善我们的creatWorkeTask方法了: ```java private void creatWorkeTask(String name) { WorkerTask task = new WorkerTask(GROUP,THREAD_PREFIX+(seg++)); task.start(); THREAD_QUEUE.add(task); } ``` 这里start后我们把他放在线程队列里,我们定义一个List来存放WorkerTask,以便于我们管理。 ```java /** * 线程队列 */ private static final List<WorkerTask> THREAD_QUEUE = new ArrayList<>(); ``` 6.提交任务我们就写到这里,现在开始编写init方法: ```java private void init() { for (int i = 0; i < size; i++) { creatWorkeTask(); } } ``` 7.这时候我们如果调用SimpleThreadPool,他去init的时候,其实WorkerTask是wait状态,因为TASK_QUEUE是空的,这时候我们就需要一个对外开放的接口来操作TASK_QUEUE了: (这里有个细节就是因为TASK_QUEUE在我们的工作队列里是有读操作的,所以我们这里要加锁才行) ```java public void submit(Runnable runnable){ synchronized (TASK_QUEUE){ TASK_QUEUE.addLast(runnable); TASK_QUEUE.notifyAll(); } } ``` 8.接下来就是激动人心的时刻,我们简单调用一下看看效果: ```java public static void main(String[] args) { SimpleThreadPool threadPool = new SimpleThreadPool(); IntStream.rangeClosed(0, 40) .forEach(i -> { threadPool.submit(() -> { System.out.println("The Runnable" + i + " be serviced by " + Thread.currentThread().getName()+" start"); try { Thread.sleep(1_000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("The Runnable" + i + " be serviced by " + Thread.currentThread().getName()+" finished"); }); }); } ``` 9.补充一点,注意看这里的代码: ![](https://img.kancloud.cn/28/03/28033994d5530de8669cce9c00d6520e_729x482.png) 调整后的代码如下: ![](https://img.kancloud.cn/9d/91/9d91ae423ce82148645098345b1bd5f2_578x467.png) 看一下运行效果: ![](https://img.kancloud.cn/5a/2d/5a2dcc3b9f098fe47c36dbe7eeaee46d_1053x349.gif) 一共10个线程,执行了40个任务,后面我们再继续完善线程池,增加拒绝策略,停止等功能。