## 26不让我进门,我就在门口一直等!—BlockingQueue和ArrayBlockingQueue
> 读书而不思考,等于吃饭而不消化。
> ——波尔克
前面两节我们对 ConcurrentHashMap 的源代码进行了分析,是不是意犹未尽?那么做好准备,本小节,我们将对阻塞队列的借口 BlockingQueue 以及它的一个实现 ArrayBlockingQueue 做源码分析。说实话,源码分析十分的枯燥和艰难,不过坚持下来,收获将会很大。
## 1、BlockingQueue 介绍
BlockingQueue 顾名思义----阻塞队列。队列大家应该都很清楚了,阻塞队列是指当队列满时,入队操作需要等待;当队列空时,出队操作需要等待。而非阻塞的方式则会直接返回false。
BlockingQueue 是一个接口。定义了出队和入队的方法。
###### 我们首先看入队的方法:
**1、boolean add(E e)**
该方法向队列添加元素e,如果可以有空间添加,那么添加成功返回 true。如果没有空间,那么直接抛出 IllegalStateException。
**2、boolean offer(E e)**
该方法向队列添加元素 e,如果可以有空间添加,那么添加成功返回 true。如果没有空间,则会返回 false。和 add 方法很像,只不过不是抛异常。
**3、boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException**
该方法向队列添加元素 e,如果可以有空间添加,那么添加成功返回 true。如果没有空间,则会等待一定时长,如果仍旧没有空间则返回 false。
**4、void put(E e) throws InterruptedException**
该方法向队列添加元素 e,如果成功立即返回,如果没有空间,则一直等待空间。等待是可被直接中断。
这四种入队的方法中前两种为非阻塞方式,后两种为阻塞方式。
###### 接下来我们再来分析出队的方法:
**1、E take() throws InterruptedException**
返回并且从队列中移除 head 元素。如果队列为空,则会阻塞等待直到元素入队。
**2、E poll(long timeout, TimeUnit unit)**
返回并且从队列中移除 head 元素。如果队列为空,则会阻塞一定的时间,等待有元素入队。如果等待时间内没有元素入队,那么返回 null。
以上两种出队方法均为阻塞方法。
###### 还有几个其它的方法,也做下介绍:
**1、boolean contains(Object o)**
如果队列中至少含有一个元素 equals 对象 o,那么返回 true。
**2、int drainTo(Collection c)和int drainTo(Collection c, int maxElements)**
第一个方法是一次性“榨干”队列,把所有元素 remove 掉,放入集合 c 中。第二个方法的区别是每次 remove 掉 maxElements 个元素,放入集合 c。
**3、int remainingCapacity()**
返回列表的剩余容量。
###### 我们通过以下表格对出入对方法进行总结:
**入队:**
| 方法名 | 是否阻塞 | 抛出异常 |
| --- | --- | --- |
| add(E e) | 否 | IllegalStateException |
| offer(E e) | 否 | 无。返回false |
| offer(E e, long timeout, TimeUnit unit) | 阻塞指定时长 | InterruptedException |
| put(E e) | 是 | InterruptedException |
**出队:**
| 方法名 | 是否阻塞 | 抛出异常 |
| --- | --- | --- |
| take() | 是 | InterruptedException |
| poll(long timeout, TimeUnit unit) | 阻塞指定时长,超时返回null | InterruptedException |
## 2、ArrayBlockingQueue源码分析
ArrayBlockingQueue 是 BlockingQueue 的一种实现。底层通过数组实现。ArrayBlockingQueue 支持公平和非公平的方式来入队和出队。公平是指当出现多个线程阻塞时,等待时间长的会先获得锁,非公平则不一定。
我们先看 ArrayBlockingQueue 的构造函数,有如下三个
1. public ArrayBlockingQueue(int capacity);
2. public ArrayBlockingQueue(int capacity, boolean fair);
3. public ArrayBlockingQueue(int capacity, boolean fair, Collection c) 。
capacity 为队列的大小,一旦初始化了无法改变。fair 指锁的类型,c 用来初始化队列时设置初始元素。
ArrayBlockingQueue 通过两个 Condition 信号量来控制出队和入队的阻塞,分别为 notEmpty 和 notFull。
接下来,我们先分析入队方法 put 的源代码。
### 2.1 put 方法源码分析
~~~java
public void put(E e) throws InterruptedException {
//检查e是否为null,如果为null,抛出NullPointerException
checkNotNull(e);
//声明显式锁
final ReentrantLock lock = this.lock;
//以lockInterruptibly方式上锁,如果其他线程打断等待的线程,那么等待的线程会立刻终止等待,抛出InterruptedException
lock.lockInterruptibly();
try {
//如果队列已经满了,那么等待
while (count == items.length)
notFull.await();
//直到消费者消费了一个元素后,通过notFull.signal()来通知等待的线程添加元素
enqueue(e);
} finally {
//释放锁
lock.unlock();
}
}
~~~
enqueue 中实现元素的入队操作,代码如下:
~~~java
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
~~~
putIndex 记录了当前可以写入的数组下标。
count 是 queue 中保存的元素总数。
这段代码中,第 4、5 行不太好理解。当 ++putIndex 和数组长度一样时,说明到了数组的最后一个元素。然后putIndex 被置为0。这是因为这里循环使用数组,由于数组长度就是队列的长度,并且出一个才能进一个,所以进到数组最后一位时,下一个等待出队的位置肯定是 >=0 。此时循环使用数组,下一个 put 的位置转到数组第一位。如果消费得比较快,消费的位置 >0,那么没问题,再次 put 时候就会写入 index=0 的位置。如果恰巧出队位置就是 0。那么下次 put 的时候,由于count == items.length,而会进入等待。直到位置 0 的元素被消费掉,那么写入 0 位置,也不会有任何问题。
这么做的好处是循环使用数组,而不需要每次消费都向前移动数组中元素的位置。
### 2.2 offer 方法和 add 方法源码分析
下面我们再来看看 offer 的源代码:
~~~java
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
~~~
区别就是使用 lock,在等待锁的期间不能被打断。另外如果 count == items.length,队列满了直接返回 false,而不是阻塞等待。
这里一并讲解 add 方法:
~~~java
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
~~~
其实 add 方法中调用的 offer 方法,只不过 add 在 offer 返回 false 时,抛出异常。
### 2.3 有等待超时参数的 offer 方法源码分析
~~~java
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
~~~
其实这个方法和 put 更像一点,put 阻塞直到有空位出现。而此方法阻塞时会有超时时间设置, notFull.awaitNanos(nanos) 返回剩余的超时时长,当 nanos <= 0,也就是说没有时长了,如果还没有等到空位,那么也会放回 false。
以上入队的方法已经讲完了,接下来我们看一下出队的方法,出队方法的区别其实和入队类似,所以这里我不再详细讲解每一个,我们看一个典型就可以了。
### 2.4 take 方法源代码分析
tabke 方法和 put 方法相对应,代码如下:
~~~csharp
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
~~~
可以看到,和put 一样都是使用的 lockInterruptibly 方法上锁,可以被中断。当 count 为0时,说明队列已经空了,无法取出元素,那么通过调用 notEmpty.await() 阻塞。直到某个入队方法添加元素后调用了notEmpty.signal(),通知该线程可以继续出队操作了。
poll() 方法不会阻塞,如果没有元素,则直接返回 null。
poll(long timeout, TimeUnit unit) ,则会阻塞一定时长,如果还是没有元素,才会返回 null。
## 3、总结
关于 ArrayBlockingQueue 的源代码就讲解到这里,可以看出源代码并不复杂,而且不同方法的实现也很类似,只有细微的差别。如果大家感兴趣,相信你轻松就能读懂源代码。另外 BlockingQueue 的实现还有很多,还有个比较常用的是 LinkedBlockingQueue,通过链表存储来实现,如果感兴趣也可以自己去看源代码。
阻塞队列用来实现生产者和消费者很好用。另外我们还应该知道在 Executor 中使用了 BlockingQueue。大家如果已经忘了,可以回过头再去看看 Executor 源码分析那一节的内容。
}
- 前言
- 第1章 Java并发简介
- 01 开篇词:多线程为什么是你必需要掌握的知识
- 02 绝对不仅仅是为了面试—我们为什么需要学习多线程
- 03 多线程开发如此简单—Java中如何编写多线程程序
- 04 人多力量未必大—并发可能会遇到的问题
- 第2章 Java中如何编写多线程
- 05 看若兄弟,实如父子—Thread和Runnable详解
- 06 线程什么时候开始真正执行?—线程的状态详解
- 07 深入Thread类—线程API精讲
- 08 集体协作,什么最重要?沟通!—线程的等待和通知
- 09 使用多线程实现分工、解耦、缓冲—生产者、消费者实战
- 第3章 并发的问题和原因详解
- 10 有福同享,有难同当—原子性
- 11 眼见不实—可见性
- 12 什么?还有这种操作!—有序性
- 13 问题的根源—Java内存模型简介
- 14 僵持不下—死锁详解
- 第4章 如何解决并发问题
- 15 原子性轻量级实现—深入理解Atomic与CAS
- 16 让你眼见为实—volatile详解
- 17 资源有限,请排队等候—Synchronized使用、原理及缺陷
- 18 线程作用域内共享变量—深入解析ThreadLocal
- 第5章 线程池
- 19 自己动手丰衣足食—简单线程池实现
- 20 其实不用造轮子—Executor框架详解
- 第6章 主要并发工具类
- 21 更高级的锁—深入解析Lock
- 22 到底哪把锁更适合你?—synchronized与ReentrantLock对比
- 23 按需上锁—ReadWriteLock详解
- 24 经典并发容器,多线程面试必备—深入解析ConcurrentHashMap上
- 25 经典并发容器,多线程面试必备—深入解析ConcurrentHashMap下
- 26不让我进门,我就在门口一直等!—BlockingQueue和ArrayBlockingQueue
- 27 倒数计时开始,三、二、一—CountDownLatch详解
- 28 人齐了,一起行动—CyclicBarrier详解
- 29 一手交钱,一手交货—Exchanger详解
- 30 限量供应,不好意思您来晚了—Semaphore详解
- 第7章 高级并发工具类及并发设计模式
- 31 凭票取餐—Future模式详解
- 32 请按到场顺序发言—Completion Service详解
- 33 分阶段执行你的任务-学习使用Phaser运行多阶段任务
- 34 谁都不能偷懒-通过 CompletableFuture 组装你的异步计算单元
- 35 拆分你的任务—学习使用Fork/Join框架
- 36 为多线程们安排一位经理—Master/Slave模式详解
- 第8章 总结
- 37 结束语