# 自定义一个简单的线程池
## 写在前面
在学习这个的时候,我一直在想,怎样理清思路,把中间的点一个一个串起来,然后自己默写出来,所以这个笔记我改了很多次,之前整理了很多理论知识,比如为什么使用线程池?线程池的优点等等,后来都删掉了。理论知识google一下,百度一下都写得非常好,所以我这里也没必要copy一份粘贴在这里,这里就以理清思路为主。
### 搭建一个最简单的框架
这里先把最基本的代码写出来,我们先定义一个SimpleThreadPool类。
成员属性两个:
**size**: *线程池里线程的数量*
**DEFAULT_SIZE**: *默认线程池的线程数量*
成员方法:
两个构造方法,这里只是构造SimpleThreadPool这个类的size。
然后构造完成后调用init(),我们再编写一个init方法。
定义一个枚举代表我们工作线程的四个状态:
**FREE**:*可以使用状态*
**RUNNING**:*正在运行状态*
**BLOCKED**:*阻塞状态*
**DEAD**:*结束状态*
定义一个WorkerTask代表我们的工作线程,继承Thead并重写run方法,其中的状态的构造等代码暂时自行发挥即可。
```java
public class SimpleThreadPool {
private final int size;
private final static int DEFAULT_SIZE = 10;
public SimpleThreadPool() {
this(DEFAULT_SIZE);
}
public SimpleThreadPool(int size) {
this.size = size;
init();
}
private void init() {
}
/**
* 线程task状态
*/
private enum TaskState {
FREE, RUNNING, BLOCKED, DEAD
}
/**
* workTask
*/
private static class WorkerTask extends Thread {
private volatile TaskState taskState = TaskState.FREE; //默认free状态
/**
* 获取workTask的taskState状态
* @return TaskState
*/
public TaskState getTaskState() {
return this.taskState;
}
@Override
public void run() {
//TODO
}
/**
* 关闭线程重置TaskState为DEAD
*/
public void close() {
this.taskState = TaskState.DEAD;
}
}
}
```
### 完善工作线程逻辑
接下来开始一步一步完善程序:
1. WorkerTask中引入ThreadGroup
```java
public WorkerTask(ThreadGroup threadGroup,String name){
super(threadGroup,name);
}
```
2. 接下来完善run方法,因为run方法不能执行完程序就挂掉,如果执行完就挂掉了也就没意义了,所以这里使用while:
```java
@Override
public void run() {
while(this.taskState != TaskState.DEAD){
//TODO 执行任务
}
}
```
3. 接下来就要考虑我们执行的任务在哪里获取呢?这里就要用到任务队列了:
```java
private final static LinkedList<Runnable> TASK_QUEUE = new LinkedList<>();
```
完善我们的while循环:
```java
@Override
public void run() {
while (this.taskState != TaskState.DEAD) {
synchronized (TASK_QUEUE) {
// 任务队列为空,则进入阻塞状态
while (TASK_QUEUE.isEmpty()) {
try {
this.taskState = TaskState.BLOCKED;
TASK_QUEUE.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
```
注意看这个位置:
![](https://img.kancloud.cn/f9/16/f91696af27c0aa18dfc87b08fb1a0c4e_610x334.png)
如果我们在任务队列为空的情况下,打断wait()的线程是退出到while循环里的,所以我们要加一个lable:
![](https://img.kancloud.cn/ba/ef/baef3d5574b1accb862d94c31c14ace5_598x332.png)
4. 接下来我们编写**任务队列不为空**情况下的代码:
```java
@Override
public void run() {
OUTER:
while (this.taskState != TaskState.DEAD) {
synchronized (TASK_QUEUE) {
Runnable runnable;
// 任务队列为空,则进入阻塞状态
while (TASK_QUEUE.isEmpty()) {
try {
this.taskState = TaskState.BLOCKED;
TASK_QUEUE.wait();
} catch (InterruptedException e) {
System.out.println("Closed.");
break OUTER;
}
}
// 任务队列不为空,取出任务
runnable = TASK_QUEUE.removeFirst();
// 任务不为空,则执行任务
if (runnable != null) {
this.taskState = TaskState.RUNNING;
runnable.run();
this.taskState = TaskState.FREE;
}
}
}
}
```
5.这样工作线程就定义完了,接下来就要完善提交任务了,在提交任务之前,我们首先要构建。
我们在SimpleThreadPool类里面增加creatWorkeTask方法:
```java
private void creatWorkeTask(String name) {
}
```
这里我们打算用creatWorkeTask调用WorkerTask,WorkerTask的两个参数,一个线程组,一个名字,这个名字我们来通过前缀+自增的方式生成,所以:
(注意这里是volatile)
```java
private static volatile int seg=0;
```
增加名字的前缀
```JAVA
/**
* 线程名前缀
*/
private final static String THREAD_PREFIX = "SIMPLE_THREAD_POOL-";
```
增加一个ThreadGroup:
```java
/**
* 线程组
*/
private final static ThreadGroup GROUP = new ThreadGroup("Pool_Group");
```
这样就开始完善我们的creatWorkeTask方法了:
```java
private void creatWorkeTask(String name) {
WorkerTask task = new WorkerTask(GROUP,THREAD_PREFIX+(seg++));
task.start();
THREAD_QUEUE.add(task);
}
```
这里start后我们把他放在线程队列里,我们定义一个List来存放WorkerTask,以便于我们管理。
```java
/**
* 线程队列
*/
private static final List<WorkerTask> THREAD_QUEUE = new ArrayList<>();
```
6.提交任务我们就写到这里,现在开始编写init方法:
```java
private void init() {
for (int i = 0; i < size; i++) {
creatWorkeTask();
}
}
```
7.这时候我们如果调用SimpleThreadPool,他去init的时候,其实WorkerTask是wait状态,因为TASK_QUEUE是空的,这时候我们就需要一个对外开放的接口来操作TASK_QUEUE了:
(这里有个细节就是因为TASK_QUEUE在我们的工作队列里是有读操作的,所以我们这里要加锁才行)
```java
public void submit(Runnable runnable){
synchronized (TASK_QUEUE){
TASK_QUEUE.addLast(runnable);
TASK_QUEUE.notifyAll();
}
}
```
8.接下来就是激动人心的时刻,我们简单调用一下看看效果:
```java
public static void main(String[] args) {
SimpleThreadPool threadPool = new SimpleThreadPool();
IntStream.rangeClosed(0, 40)
.forEach(i -> {
threadPool.submit(() -> {
System.out.println("The Runnable" + i + " be serviced by " + Thread.currentThread().getName()+" start");
try {
Thread.sleep(1_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("The Runnable" + i + " be serviced by " + Thread.currentThread().getName()+" finished");
});
});
}
```
9.补充一点,注意看这里的代码:
![](https://img.kancloud.cn/28/03/28033994d5530de8669cce9c00d6520e_729x482.png)
调整后的代码如下:
![](https://img.kancloud.cn/9d/91/9d91ae423ce82148645098345b1bd5f2_578x467.png)
看一下运行效果:
![](https://img.kancloud.cn/5a/2d/5a2dcc3b9f098fe47c36dbe7eeaee46d_1053x349.gif)
一共10个线程,执行了40个任务,后面我们再继续完善线程池,增加拒绝策略,停止等功能。
- 微服务
- 服务器相关
- 操作系统
- 极客时间操作系统实战笔记
- 01 程序的运行过程:从代码到机器运行
- 02 几行汇编几行C:实现一个最简单的内核
- 03 黑盒之中有什么:内核结构与设计
- Rust
- 入门:Rust开发一个简单的web服务器
- Rust的引用和租借
- 函数与函数指针
- Rust中如何面向对象编程
- 构建单线程web服务器
- 在服务器中增加线程池提高吞吐
- Java
- 并发编程
- 并发基础
- 1.创建并启动线程
- 2.java线程生命周期以及start源码剖析
- 3.采用多线程模拟银行排队叫号
- 4.Runnable接口存在的必要性
- 5.策略模式在Thread和Runnable中的应用分析
- 6.Daemon线程的创建以及使用场景分析
- 7.线程ID,优先级
- 8.Thread的join方法
- 9.Thread中断Interrupt方法学习&采用优雅的方式结束线程生命周期
- 10.编写ThreadService实现暴力结束线程
- 11.线程同步问题以及synchronized的引入
- 12.同步代码块以及同步方法之间的区别和关系
- 13.通过实验分析This锁和Class锁的存在
- 14.多线程死锁分析以及案例介绍
- 15.线程间通信快速入门,使用wait和notify进行线程间的数据通信
- 16.多Product多Consumer之间的通讯导致出现程序假死的原因分析
- 17.使用notifyAll完善多线程下的生产者消费者模型
- 18.wait和sleep的本质区别
- 19.完善数据采集程序
- 20.如何实现一个自己的显式锁Lock
- 21.addShutdownHook给你的程序注入钩子
- 22.如何捕获线程运行期间的异常
- 23.ThreadGroup API介绍
- 24.线程池原理与自定义线程池一
- 25.给线程池增加拒绝策略以及停止方法
- 26.给线程池增加自动扩充,闲时自动回收线程的功能
- JVM
- C&C++
- GDB调试工具笔记
- C&C++基础
- 一个例子理解C语言数据类型的本质
- 字节顺序-大小端模式
- Php
- Php源码阅读笔记
- Swoole相关
- Swoole基础
- php的五种运行模式
- FPM模式的生命周期
- OSI网络七层图片速查
- IP/TCP/UPD/HTTP
- swoole源代码编译安装
- 安全相关
- MySql
- Mysql基础
- 1.事务与锁
- 2.事务隔离级别与IO的关系
- 3.mysql锁机制与结构
- 4.mysql结构与sql执行
- 5.mysql物理文件
- 6.mysql性能问题
- Docker&K8s
- Docker安装java8
- Redis
- 分布式部署相关
- Redis的主从复制
- Redis的哨兵
- redis-Cluster分区方案&应用场景
- redis-Cluster哈希虚拟槽&简单搭建
- redis-Cluster redis-trib.rb 搭建&原理
- redis-Cluster集群的伸缩调优
- 源码阅读笔记
- Mq
- ELK
- ElasticSearch
- Logstash
- Kibana
- 一些好玩的东西
- 一次折腾了几天的大华摄像头调试经历
- 搬砖实用代码
- python读取excel拼接sql
- mysql大批量插入数据四种方法
- composer好用的镜像源
- ab
- 环境搭建与配置
- face_recognition本地调试笔记
- 虚拟机配置静态ip
- Centos7 Init Shell
- 发布自己的Composer包
- git推送一直失败怎么办
- Beyond Compare过期解决办法
- 我的Navicat for Mysql
- 小错误解决办法
- CLoin报错CreateProcess error=216
- mysql error You must reset your password using ALTER USER statement before executing this statement.
- VM无法连接到虚拟机
- Jetbrains相关
- IntelliJ IDEA 笔记
- CLoin的配置与使用
- PhpStormDocker环境下配置Xdebug
- PhpStorm advanced metadata
- PhpStorm PHP_CodeSniffer