合规国际互联网加速 OSASE为企业客户提供高速稳定SD-WAN国际加速解决方案。 广告
# AQS 队列同步器AbstractQueuedSynchronizer,用来构建锁或者其他同步组件的基础框架,内部使用一个变量state来表示同步状态,同时使用一个FIFO队列来完成线程的排队工作。 锁或者其他同步组件一般都会定义一个静态内部类,该静态内部类会继承AQS,同时**重写**AQS中的方法,重写AQS中的方法时需要用到下面三个方法来获取同步状态。 - **getState()** 获取state属性的内容。 - **setState(int newState)** 设置state属性的内容。 - **compareAndSetState(int expect, int update)** 使用CAS设置当前状态,保证状态设置的原子性。 > 总结:如何自定义一个锁或者同步组件? > 创建静态内部类继承AQS,重写AQS中的**可重写**的方法,在里面使用AQS提供的如上三个方法来获取、修改同步状态。最后调用AQS中的模板方法来进行操作,模板方法中会调用重写的方法。 > **即使用者调用模板方法,模板方法调用重写方法,重写方法调用如上三个方法。** :-: ![](https://img.kancloud.cn/ec/d8/ecd87535e99d81e7fc1de29eb7bf6e84_1092x589.png) &nbsp; ## 可重写的方法 1. ``` protected boolean tryAcquire(int arg); ``` **独占式获取**同步状态,查询当前状态并根据具体条件设置同步状态。 2. ``` protected boolean tryRelease(int arg); ``` **独占式释放**同步状态,等待的线程有机会获取同步状态。 3. ``` protected int tryAcquireShared(int arg); ``` **共享式获取**同步状态,返回大于等于0的值表示获取成功,反之获取失败。 4. ``` protected boolean tryReleaseShared(int arg); ``` **共享式释放**同步状态。 5. ``` protected boolean isHeldExclusively(); ``` 表示是否被当前线程占用。 &nbsp; ## 模板方法 1. 独占式获取同步状态 ``` void acquire(int arg); ``` 当前线程获取成功则会返回,否则进入同步队列**等待**,调用重写方法中的**tryAcquire**。 2. 独占式获取同步状态,响应中断 ``` void acquireInterruptible(int arg); ``` 如果当前线程被中断,则会抛出InterruptedException。 3. 超时获取同步状态 ``` boolean tryAcquireNanos(int arg, long nanos); ``` 在acquireInterruptible的基础上设置超时时间,如果超时时间还没有获取到同步状态,会返回false,否则返回true。 4. 共享获取同步状态 ``` void acquireShared(int arg); ``` 5. 共享获取同步状态,响应中断 ``` void acquireSharedInterruptible(int arg); ``` 6. 共享获取同步状态,响应中断,添加超时时间 ``` boolean tryAcquireSharedNanos(int arg, long nanos); ``` 7. 独占式释放同步状态 ``` boolean release(int arg); ``` 同步队列中的第一个节点将会被唤醒。 8. 共享式释放同步状态 ``` boolean releaseShared(int arg); ``` 9. 获取等待在同步队列上的线程集合 ``` Collection<Thread> getQueuedThreads(); ``` 总之:模板可以分为三类:独占式获取与释放同步状态、共享式获取与释放同步状态、查询同步队列线程等待情况。获取又有分为`中断`、`超时`。 &nbsp; ## 自定义同步组件 ~~~ /** * 自定义不可重入锁 */ public class UnReetrantLock implements Lock { // 同步锁 public static class Sync extends AbstractQueuedSynchronizer { /** * 尝试独占式获取锁 * * @param arg * @return */ @Override protected boolean tryAcquire(int arg) { if (compareAndSetState(0, 1)) { // 调用CAS,当state为0时获取成功,底层由C++提供 // 加上锁了 // 设置持有锁的线程 setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } /** * 尝试释放锁 * * @param arg * @return */ @Override protected boolean tryRelease(int arg) { // 不会有其他线程竞争、直接设置就行了 setExclusiveOwnerThread(null); setState(0); // 保证前面的内容对其他线程可见 return false; } @Override protected boolean isHeldExclusively() { return getState() == 1; } public Condition newCondition() { return new ConditionObject(); } } private Sync sync = new Sync(); @Override public void lock() { sync.acquire(1); } @Override public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } @Override public boolean tryLock() { return sync.tryAcquire(1); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(time)); } @Override public void unlock() { sync.release(1); } @Override public Condition newCondition() { return sync.newCondition(); } } ~~~ &nbsp; ## AQS实现 **底层数据结构:同步队列** AQS中使用一个双向链表来保存等待同步状态的线程,链表的节点用其内部自定义的Node表示,Node类源码: ~~~ static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; // 同步状态 volatile int waitStatus; volatile Node prev; volatile Node next; // 线程引用 volatile Thread thread; Node nextWaiter; } ~~~ waitStatus有五个状态: - cancelled = 1:同步队列中的线程等待超时或者中断时的状态,后续不会再改变。 - signal = -1:节点获取同步状态,一般是队头节点,后续节点处于等待状态。 - condition = -2:节点在等待队列中(注意不是同步队列),线程等待Condition,当Condition调用了signal()之后,该节点会从**等待队列**转移到**同步队列**。 - propagate = -3: - initial = 0:初始状态。 同步队列采用尾插法的方式,同时会使用CAS保证尾插的时候是线程安全的。其结构如下: :-: ![](https://img.kancloud.cn/88/ff/88ff8ad5fad5ceddd726eac9d92662b6_892x350.png) 其中队头是获取同步状态成功的节点,当首节点的线程释放同步状态的时候,会唤醒后继的节点,后继节点会成为首节点。(这个过程不用CAS,没有竞争的情况。) **acquire方法流程** ~~~ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } ~~~ :-: ![](https://img.kancloud.cn/86/0e/860eee5226ec9a94eedf5c3a9d220312_887x722.png) 同步队列中的节点不断的在自旋判断其**前驱节点是不是头节点**,如果是则尝试获取同步状态,否则会阻塞节点中的线程。 **acquireShared方法流程** ~~~ public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } ~~~ :-: ![](https://img.kancloud.cn/4d/d8/4dd803aff2d53d8b8b27f923533ba95e_925x699.png) ## ReetantLock ReentrantLock,支持重入锁和公平与非公平锁。 &nbsp; ### ReentrantLock实现可重入 重入锁:支持线程反复的获取锁资源而不会自己阻塞自己,有两个问题要实现: 1. 线程再次获取锁,判断是否是当前线程获取锁。 2. 锁的最终释放,需要计数锁被重入几次,计数器最终释放为0时才表示锁的最终释放。 :-: ![](https://img.kancloud.cn/f0/c6/f0c68a8e0dde268ff6346d568ba131bf_932x506.png) 例如非公平锁每次再尝试获取锁的时候都会判断是不是同个线程,如果是的话增加计数器的值。释放锁时等到计数器的值为0时才将占有锁的线程设置为null。 :-: ![](https://img.kancloud.cn/ff/fd/fffd95b0ff268f37137bdb4768104bc2_920x363.png) &nbsp; ### 公平锁与非公平锁 公平锁:获取锁的线程按照绝对的时间顺序,FIFO。 非公平锁:只要CAS设置同步状态成功,就获取锁,不会按照FIFO顺序。 ReentrantLock的构造方法中传入true时可以创建公平锁: ~~~ public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } ~~~ 公平锁在tryAcquire的时候会判断当前线程是否有前驱节点,有的话则会等待前驱节点释放之后在获取尝试获取锁。 公平锁的tryAcquire: :-: ![](https://img.kancloud.cn/24/e2/24e231a5f7ab6f03340aa1a89c823e8d_978x611.png) hasQueuePredecessors方法用来判断是否有前驱节点 非公平锁的tryAcquire: :-: ![](https://img.kancloud.cn/20/ce/20ced0fe4146742378f9716402b714a7_868x514.png) > 问:如何实现公平锁? > 构造函数的参数传入true,在重写的tryAcquire方法中判断当前线程是否有前驱线程,有的话尝试获取同步状态失败,以此来达到公平的效果。 对比: 公平锁虽然会按照FIFO原则,但是会进行大量的线程切换,非公平锁虽然可能会造成其他线程饥饿,但是可以极大提高吞吐量。 &nbsp; :-: ![](https://img.kancloud.cn/a2/a4/a2a482a54328499aab8f34ae642e3f8f_1452x306.png)