企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
[TOC] ## BlockingQueue ### 核心方法 #### 放入数据 (1)offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.(本方法不阻塞当前执行方法  的线程);         (2)offer(E o, long timeout, TimeUnit unit):可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。 (3)put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续. #### 获取数据 (1)poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null; (2)poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知道时间 超时还没有数据可取,返回失败。 (3)take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入;  (4)drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。 ## ArrayBlockingQueue 基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数组外,ArrayBlockingQueue内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。 ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue;按照实现原理来分析,ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。Doug Lea之所以没这样去做,也许是因为ArrayBlockingQueue的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。而在创建ArrayBlockingQueue时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。 ~~~ public void put(E e) throws InterruptedException { Objects.requireNonNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly();//获得锁 try { while (count == items.length) notFull.await();//队伍满了 暂时交出锁 enqueue(e); } finally { lock.unlock();//释放锁 } } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly();//获得锁 try { while (count == 0) notEmpty.await();//队伍空了 暂时交出锁 return dequeue(); } finally { lock.unlock();//获得锁 } } private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal();//唤醒take } private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal();//唤醒put return x; } ~~~ ## LinkedBlockingQueue 基于链表的阻塞队列,同ArrayListBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。而LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。 作为开发者,我们需要注意的是,如果构造一个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX\_VALUE),这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。 ArrayBlockingQueue和LinkedBlockingQueue是两个最普通也是最常用的阻塞队列,一般情况下,在处理多线程间的生产者消费者问题,使用这两个类足以。 ~~~ public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private final int capacity;//容量 默认为Integer.MAX_VALUE private final AtomicInteger count = new AtomicInteger(); transient Node<E> head; private transient Node<E> last; public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); } public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); } } ~~~ ## SynchronousQueue SynchronousQueue(后面称SQ)内部没有容量,所以不能通过peek方法获取头部元素;也不能单独插入元素,可以简单理解为它**的插入和移除是“一对”对称的操作**。为了兼容 Collection 的某些操作(例如contains),SQ 扮演了一个空集合的角色。 **当生产者插入后如果没有消费者取出,就生产者这就会一直堵塞,直到消费者取出** SQ 的一个典型应用场景是在线程池中,Executors.newCachedThreadPool() 就使用了它,这个构造使线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。 SynchronousQueue是一个队列和栈算法实现,在SynchronousQueue中双队列FIFO提供公平模式,而双栈LIFO提供的则是非公平模式。 ~~~ public class SynchronousQueueDemo { public static void main(String[] args) throws InterruptedException { final SynchronousQueue<Integer> queue = new SynchronousQueue<Integer>(); Thread putThread = new Thread(new Runnable() { @Override public void run() { System.out.println("put thread start"); try { queue.put(1); } catch (InterruptedException e) { } System.out.println("put thread end"); } }); Thread takeThread = new Thread(new Runnable() { @Override public void run() { System.out.println("take thread start"); try { System.out.println("take from putThread: " + queue.take()); } catch (InterruptedException e) { } System.out.println("take thread end"); } }); putThread.start(); Thread.sleep(1000); takeThread.start(); } ~~~ 一种输出结果如下: ~~~ put thread start take thread start take from putThread: 1 put thread end take thread end ~~~ 从结果可以看出,put线程执行queue.put(1) 后就被阻塞了,只有take线程进行了消费,put线程才可以返回。可以认为这是一种线程与线程间一对一传递消息的模型。 ## PriorityBlockingQueue  和ArrayBlockingQueue一样内部使用数组实现,插入和获取数据使用同一把锁。不同的是,不管有无指定长度,都会实现可以实现自动扩容;在构造函数需传入comparator,用于插入元素时继续排序,若没有传入comparator,则插入的元素必须实现Comparatable接口。 阻塞队列中的优先级排序是基于一个**堆排序** ~~~ private transient Object[] queue; public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); int n, cap; Object[] array; while ((n = size) >= (cap = (array = queue).length)) //扩容 tryGrow(array, cap); try { //插入并排序 Comparator<? super E> cmp = comparator; if (cmp == null) siftUpComparable(n, e, array); else siftUpUsingComparator(n, e, array, cmp); size = n + 1; notEmpty.signal(); } finally { lock.unlock(); } return true; } ~~~ ## DelayQueue DelayQueue是一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,**在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。** DelayQueue非常有用,可以运用在以下两个应用场景:  * 缓存系统的设计:使用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,就表示有缓存到期了。  * 定时任务调度:使用DelayQueue保存当天要执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,比如Timer就是使用DelayQueue实现的。 ~~~ //入队操作与PriorityBlockingQueue基本一致,这里不再叙述,需要根据延时排序 public boolean offer(E e) { // 获取全局独占锁 final ReentrantLock lock = this.lock; lock.lock(); try { // 向优先队列中插入元素 q.offer(e); // 如果队首元素是刚插入的元素,则设置leader为null,并唤醒阻塞在available上的线程 if (q.peek() == e) { leader = null; available.signal(); } return true; } finally { // 释放全局独占锁 lock.unlock(); } } ~~~ ~~~ public E take() throws InterruptedException { // 获取全局独占锁 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { // 获取队首元素 E first = q.peek(); // 队首为空,则阻塞当前线程 if (first == null) available.await(); else { // 获取队首元素的超时时间 long delay = first.getDelay(NANOSECONDS); // 已超时,直接出队 if (delay <= 0) return q.poll(); // 释放first的引用,避免内存泄漏 first = null; // don't retain ref while waiting // leader != null表明有其他线程在操作,阻塞当前线程 if (leader != null) available.await(); else { // leader指向当前线程 Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 超时阻塞 available.awaitNanos(delay); } finally { // 释放leader if (leader == thisThread) leader = null; } } } } } finally { // leader为null并且队列不为空,说明没有其他线程在等待,那就通知条件队列 if (leader == null && q.peek() != null) available.signal(); // 释放全局独占锁 lock.unlock(); } ~~~