企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
### **JUC LinkedBlockingQueue** java.util.concurrent.LinkedBlockingQueue是一个基于单向链表,范围任意的(其实是有界的),FIFO阻塞队列。访问与移除操作是在队头进行,添加操作是在队尾进行,并分别使用不同的锁进行保护,只有在可能涉及多个节点的操作才同时对两个锁进行加锁, 队列是否为空,是否已满任然是通过元素数量的计算器,(count)进行判断,由于可以同时在队头,队尾并发地进行访问,添加操作,所以这个计数器必须是线程安全的,这里使用了一个原子类,AtomicInteger,这就决定了它的容量范围是1-integer.MAX_VALUE.。 ### **源码解析** ``` //内部构造方法 public LinkedBlockingQueue(){ this(Integer.MAX_VALUE); } ``` 实例化时如果未给初始控制值,则按照Integer最大值(2的31次方-1)去走 当实例化时给了一个初始化空间值。 ~~~ //内部构造方法 public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); } ~~~ 当capacity的值等于或者小于0时,则抛出IllegalArgumentException,最后链表的队头和队尾都初始化为空。 ## **在说第三个构造函数时,先看一下ReentrantLock是什么东西,跟synchronized有什么区别** 1. synchronized是独占锁,加锁和解锁的过程是自动进行,易于操作,但不够灵活。 ReentrantLock也是独占锁,加锁和解锁的过程是手动进行,不易于操作,但非常灵活 2. synchronized是可重入锁,以为加锁和解锁是自动进行,不必担心最后是否释放锁; ReentrantLock也可重入锁,但加锁和解锁是手动进行,且次数需一样,否则其他线程无法得到锁。 3. Synchronized不可响应中断,一个线程获取不到锁就一直等着; ReentrantLock可以响应中断 使用: ``` private static final Lock lock = new ReentrantLock(); public static void main(String[] args) { new Thread( ()-> test(),"线程A").start(); new Thread( ()-> test(),"线程B").start(); } public static void test(){ try { lock.lock(); System.out.println(Thread.currentThread().getName()+"获取了锁"); TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println(Thread.currentThread().getName()+"释放了锁"); lock.unlock(); } } ``` 执行结果是: ``` 线程A获取了锁 线程A释放了锁 线程B获取了锁 线程B释放了锁 ``` 看一下把test方法加上synchronized 注释掉lock.lock()和lock.unlock() 其执行结果是一致的,那不同点在哪呢?那就是公平锁的实现。 ``` private static final Lock lock = new ReentrantLock(true); public static void main(String[] args) { new Thread( ()-> test(),"线程A").start(); new Thread( ()-> test(),"线程B").start(); new Thread( ()-> test(),"线程C").start(); } public static void test(){ for(int i =0;i<2;i++){ try { lock.lock(); System.out.println(Thread.currentThread().getName()+"获取了锁"); TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println(Thread.currentThread().getName()+"释放了锁"); lock.unlock(); } } } ``` 在初始化时加上了公平锁策略,构造参数值为true 上面代码执行结果如下列: 左边是使用ReentranyLock锁,右边是使用synchronized ![](https://img.kancloud.cn/fb/57/fb579bdb7d6d8caf82801429f101ba00_669x467.png) 使用ReentranLock公平锁,会按照顺序执行。 非公平锁: &nbsp;&nbsp;&nbsp;&nbsp;非公平锁那就随机的获取,看cpu时间片轮到哪个线程,哪个线程就能获取锁,和上面公平锁的区别很简单,就在于实例化时构造参数为false或者true,默认为false。 响应中断: &nbsp;&nbsp;&nbsp;&nbsp;响应中断就是一个线程获取不到锁,不会一直等下去,Reentranlock会给予一个中断回应。看下面代码: ``` public class aaa { static final Lock lock = new ReentrantLock(); static final Lock lock1 = new ReentrantLock(); public static void main(String[] args) { Thread thread = new Thread(new ThreadA(lock,lock1)); Thread thread1 = new Thread(new ThreadA(lock,lock1)); thread.start(); thread1.start(); thread.interrupt(); } } class ThreadA implements Runnable{ Lock firstLock; Lock sencondLock; public ThreadA(Lock firstLock,Lock sencondLock){ this.firstLock = firstLock; this.sencondLock = sencondLock; } @Override public void run() { try { firstLock.lockInterruptibly(); TimeUnit.SECONDS.sleep(5); sencondLock.lockInterruptibly(); } catch (InterruptedException e) { e.printStackTrace(); }finally { firstLock.unlock(); sencondLock.unlock(); System.out.println(Thread.currentThread().getName()+"获取到了资源,正常结束"); } } } ``` 定义了两个锁,lock和lock1,然后使用两个线程thread和thread1造成是说场景。正常情况下,这两个线程相互等待获取资源而处于死循环状态。但是thread中断,另外一个线程就可以获取到资源,正常执行, 限时等待: &nbsp;&nbsp;&nbsp;&nbsp;通过tryLock方法来实现,可以选择传入时间参数,表示等待指定的时间,无参则表示立即返回锁申请的结果,true表示获取锁成功,false表示获取锁失败,可以用这种发放解决死锁的问题。 ### **回到原文** ``` //LinkedBlockingQueue第三构造方法 public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { putLock.unlock(); } } ``` 传入一个集合,创建一个容量为这个集合的LinkedBlockingQueue,并且按照集合迭代器顺序添加。 这里使用ReentranLock锁来实现同步并且线程安全。 初始化给了Integer的最大值。并创建了一个锁,并锁当前方法 迭代传入的集合,当集合内的元素为null时 抛出一个空指针异常。当计数类的数值等于数组内的元素的个数时,会抛出Queue Full 异常。<br/><br/> ``` public int size() { return count.get(); } ``` 获取当前已使用的容量。是通过原子类AtomicInteger的get方法获取<br/><br/> ~~~ public int remainingCapacity() { return capacity - count.get(); } ~~~ 获取剩余容量,是通过最大的容量减已使用的容量<br/><br/> ~~~ public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); } ~~~ 在队列的尾部添加元素 未写完。。。。。。。。(2020年03月24日16:50)