企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
> 本文用于系统总结 Java 并发包相关概念,只拣重点阐述,但实现细节不可避免地要展示许多代码 > openjdk-8u 源码:`hg clone http://hg.openjdk.java.net/jdk8/jdk8 openjdk8` [TOC] ## 1 Java 并发包总览 整个 `java.util.concurrent` 包,按照功能模块可以划分为: - 原子类(Atomic) - 锁(Lock)与条件(Condition) - 同步工具类 - 并发容器 - 线程池、Future、ForkJoinPool - CompletableFuture (Java 8 出现,android 不关注) ### 1.1 原子类(Atomic) Atomic类位于 `java.util.concurrent.atomic` 包: - AtomicInteger:保证一个`int` 类型加减操作的原子性,可以代替 `synchronized` 的加锁 - AtomicLong:同 AtomicInteger,类型为 `long` - AtomicBoolean:相比 volatile 修饰的 boolean 类型,可以保证例如 `if (flag == false) {flag = true;}` 的原子性 - AtomicReference:类似 AtomicBoolean,用来保证读写引用的原子性 - AtomicStampedReference:类似 AtomicReference,但是其通过版本号,可以避免 **ABA 问题** - AtomicMarkableReference:类似 AtomicStampedReference,区别在于版本号为 boolean 类型,不能完全避免ABA问题,只是降低了发生的概率 - AtomicIntegerFieldUpdater:对已存在的类,实现其成员变量(int)的原子操作 - AtomicLongFieldUpdater:对已存在的类,实现其成员变量(long)的原子操作 - AtomicReferenceFieldUpdater:对已存在的类,实现其成员变量(Object)的原子操作 - AtomicIntegerArray:支持对 int 数组中的元素执行原子操作 - AtomicLongArray:支持对 long 数组中的元素执行原子操作 - AtomicReferenceArray:支持对 Object 数组中的元素执行原子操作 - LongAddr:作用同 AtomicLong,通过分片技术(Striped64)提供了高并发场景下的性能,保证最终一致性而非强一致性 - DoubleAdder:类似 LongAddr,支持 double 类型 - LongAccumulator:类似 LongAddr,LongAddr只支持的累加操作,其支持自定义二元操作 - DoubleAccumulator:同 LongAccumulator,支持 double 类型 **涉及概念**: `乐观锁` `Compare And Set` `自旋` `原子操作` `最终一致性` `分片` ### 1.2 锁(Lock)与条件(Condition) 锁与条件位于 `java.util.concurrent.locks` 包: - LockSupport:一个用于支持线程唤醒和挂起操作的工具类 - AbstractQueuedSynchronizer:基于 Unsafe + 无锁队列实现的队列同步器(AQS),具备阻塞与唤醒线程和维护这些等待线程的无锁队列 - Condition:该接口定义了条件的基本操作 - Lock:该接口定义了锁的基本操作 - ReadWriteLock:该接口定义了读锁和写锁的获取 - ReentrantLock:基于 AQS 实现的一个可重入的独占锁(读读互斥,读写互斥,写写互斥) - ReentrantReadWriteLock:基于 AQS 实现的一个可重入的读写锁(读读不互斥、读写互斥、写写互斥) - StampedLock:基于 Unsafe + LockSupport 实现的一个可重入的读写锁(读读不互斥、读写不互斥、写写互斥) **涉及概念**: `Compare And Set` `CLH 队列` `重入锁` `公平锁与非公平锁` `线程中断` `互斥锁` `读写锁` `共享锁与独占锁` `MVCC机制` `悲观锁与乐观锁` `重排序` `内存屏障` ### 1.3 同步工具类 同步工具类位于 `java.util.concurrent` 包: - Semaphore:基于 AQS 实现的信号量,提供资源数量的并发访问控制 - CountDownLatch:基于 AQS 实现的计数器,可以使当前线程等待其他线程全部执行完毕后再执行 - CyclicBarrier:基于 ReentrantLock + Condition 实现的同步屏障,区别于 CountDownLatch,计数器可以重置,适用于更复杂的场景 - Exchanger:基于 Unsafe + LockSupport实现,用于线程间交换数据 - Phaser:基于 Unsafe + 无锁栈实现的,可用于代替 CountDownLatch 和 CyclicBarrier 的同步工具(用树结构维护) **涉及概念**: `Compare And Set` `无锁栈` `公平与非公平` ### 1.4 并发容器 并发容器位于 `java.util.concurrent` 包: - BlockingQueue:该接口定义了阻塞队列的基本操作 - ArrayBlockingQueue:基于数组 + ReentrantLock + Condition 实现的环形阻塞队列 - LinkedBlockingQueue:基于单链表 + ReentrantLock + Condition 实现的阻塞队列 - PriorityBlockingQueue:基于数组 + ReentrantLock + Condition 实现的阻塞队列(按元素优先级从小到大出队) - DelayQueue:基于 PriorityQueue + ReentrantLock + Condition 实现的阻塞队列(按延迟时间从小到大出队) - SynchronousQueue:基于 Unsafe 实现的特殊的阻塞队列(本身没有容量,直接通知等待线程,高效) - BlockingDeque:该接口定义了双端阻塞队列的基本操作 - LinkedBlockingDeque:基于双向链表 + ReentrantLock + Condition 实现的双端阻塞队列 - CopyOnWriteArrayList:基于数组 + ReentrantLock 实现的 List - CopyOnWriteArraySet:基于 CopyOnWriteArrayList 实现的 Set - ConcurrentLinkedQueue:基于单链表 + Unsafe 实现的 Queue - ConcurrentLinkedDueue:基于双向链表 + Unsafe 实现的 Deque - ConcurrentHashMap:基于数组 + 链表/红黑树 + Unsafe 实现的 HashMap - ConcurrentSkipListMap:基于跳查表 + Unsafe 实现的 TreeMap - ConcurrentSkipListSet:基于 ConcurrentSkipListMap 实现的 TreeSet ### 1.5 线程池与Future 线程池与Future位于 `java.util.concurrent` 包 ## 2 Java 并发包底层实现依赖 Java 并发包底层实现依赖于Java 内存模式、volatile 变量、CAS 算法等理论以及 Unsafe 类提供的一系列 native 方法 ### 2.1 Java 内存模型、volatile 变量与 CAS 算法 Java 内存模型与 volatitle 变量的语义,以及与CAS 的关系,参考 [\[Java内存模型\]](https://ku.baidu-int.com/knowledge/HFVrC7hq1Q/pKzJfZczuc/8BkidD8KVd/21H5m7F3V7tsFb) 一文 ### 2.2 Unsafe 类的应用 通过**反射**拿到 `sun.misc.Unsafe` 实例后,可以进行 - 读写类字段(int、long、short、boolean、byte、char、float、double、Object) - 定义普通类、匿名类、创建类实例 - 读写变量(普通读写、volatile读写、有序读写) - 比较交换(CAS)操作 - 操作堆外内存 - 操作监视器 - 设置内存屏障 - 唤醒与挂起线程 > Unsafe 实现的源码位置:hotspot/src/share/vm/prims/unsafe.cpp ## 3 AQS 框架的实现原理与应用 AQS 是 `AbstractQueuedSynchronizer` 类的简称,定义了一套多线程访问共享资源的**同步机制** **核心思想:**如果被请求的共享资源空闲,则将请求的线程设置为有效的工作线程,将共享资源锁定;如果共享资源被占用,则需要一定的挂起与唤醒机制(CLH队列变体队列)确保资源的分配 ### 3.1 AQS 实现原理 #### 3.1.1 AQS 数据结构 **AQS 维护了一个由无锁双向链表实现的阻塞队列,队列中的各个元素(**`**AQS.Node**`**)通过volatile变量(** `**AQS#state**`**)来进行状态维护** - `AQS#state` AQS 内部不操作 state 变量,由实现层定义 state 的含义与控制 state 的值 - `AQS#head` 指向双向链表头部,表示等待队列的头部,延迟初始化 - 除初始化外,它仅通过 `setHead` 进行修改(如果 `head` 存在,则会确保 `waitStatus` 不会变为 `CANCELLED`) - `AQS#tail` 指向双向链表尾部,表示等待队列的尾部,延迟初始化 - 仅通过 `enq` 方法修改以添加新的等待节点 - 队列操作: - **入队:将新的** `Node` 加到 `tail` 后面,然后对 `tail` 进行CAS操作 - **出队:**对 `head` 进行CAS操作,把 `head` 向后移一个位置 > AQS 入队的方法 ```java private Node enq(final Node node) { // AQS.enq for (;;) { Node t = tail; if (t == null) { // 必须初始化 if (compareAndSetHead(new Node())) tail = head; } else { // 将双向链表结点插入到 tail 后 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } ``` ---- `AbstractQueuedSynchronizer.Node` 表示 **AQS 队列元素**,各字段与方法含义如下: - 构造方法 - `Node()` 提供给自定义初始化逻辑或 `SHARED` 状态使用 - `Node(Thread thread, Node mode)` 提供给 `addWaiter` 方法使用 - `Node(Thread thread, int waitStatus)` 提供给 `Condition` 使用 - 成员字段 - `volatile Node prev;` 前继结点 - `volatile Node next;` 后继结点 - `volatile Thread thread;` 结点中的线程 - `volatile int waitStatus;` - 数字 `0` 表示 Node 被初始化时的默认值 - `static final int CANCELLED = 1;` 表示线程获取锁的请求已经被取消了 - `static final int SIGNAL = -1;` 表示线程正在等待释放资源 - `static final int CONDITION = -2;` 表示结点在等待队列中,线程等待唤醒 - `static final int PROPAGATE = -3;` 当前线程处于SHARED情况,才会使用 - `Node nextWaiter;` AQS 中的条件队列是是通过 nextWaiter,以单向链表的形式保存的, `SHARED` 模式不存在 Condition, `EXCLUSIVE` 模式才存在 `Condition` - `static final Node SHARED = new Node();` 共享模式,多个线程可同时执行;例如 `ReadWriteReentrantLock.readLock` `Semaphore`(state != 1时) `CountDownLatch` - `static final Node EXCLUSIVE = null;` 独占模式,只有一个线程能执行;例如 `ReadWriteReentrantLock.writeLock` `ReentrantLock` `Semaphore`(state = 1 时) - 成员方法 - `final boolean isShared()` 是否为共享结点 - `final Node predecessor()` #### 3.1.2 AQS 方法架构 - **(1) API 层(*****Main exported methods*****):只需要重写API 层方法,即可使用AQS框架,定制自定义同步器** - 自定义同步器**可重写的方法** - `protected boolean tryAcquire(int arg)` 尝试通过独占方式,获取资源;返回 true 表示成功,false 表示失败 - `protected boolean tryRelease(int arg)` 尝试通过独占方式,释放资源;返回 true 表示成功,false 表示失败 - `protected int tryAcquireShared(int arg)` 尝试通过共享方式,获取资源;返回负数表示失败,0表示成功,但没有剩余可用资源,正数表示成功,还有剩余资源 - `protected boolean tryReleaseShared(int arg)` 尝试通过共享方式,释放资源;返回 true 表示释放后允许唤醒后继结点,false 表示不允许 - `protected boolean isHeldExclusively()` 返回当前线程是否独占资源,使用 Condition 时才需实现 - 自定义同步器不可重写的方法 - `public final void acquire(int arg)` 通过独占方式,获取资源(忽略中断) - `public final void acquireInterruptibly(int arg)` 通过独占方式,获取资源(响应中断) - `public final boolean tryAcquireNanos(int arg, long nanosTimeout)` 通过独占方式,获取资源(超时中止) - `public final void acquireShared(int arg)` 通过共享方式,获取资源(忽略中断) - `public final void acquireSharedInterruptibly(int arg)` 通过共享方式,获取资源(响应中断) - `public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)` 通过共享方式,获取资源(超时中止) - `public final boolean release(int arg)` 通过独占方式,释放资源 - `public final boolean releaseShared(int arg)` 通过共享方式,释放资源 - **(2) 资源获取层(*****Utilities for various versions of acquire*****):通过自定义同步器获取与释放资源时,会进入到锁获取层** - `private void cancelAcquire(Node node)` - `private static boolean shouldParkAfterFailedAcquire(Node pred, Node node)` - `static void selfInterrupt()` - `private final boolean parkAndCheckInterrupt()` - `inal boolean acquireQueued(final Node node, int arg)` - `private void doAcquireInterruptibly(int arg)` - `private boolean doAcquireNanos(int arg, long nanosTimeout)` - `private void doAcquireShared(int arg)` - `private void doAcquireSharedInterruptibly(int arg)` - `private boolean doAcquireSharedNanos(int arg, long nanosTimeout)` - **(3) 队列检查层(*****Queue inspection methods*****):获取资源失败时,会进入到队列检查层,排队等待** - `public final boolean hasQueuedThreads()` 判断是否有线程正在等待获取,例如 ReentrantLock 用该方法来实现公平与非公平获取锁 - `public final boolean hasContended()` 询问是否有线程曾争夺过该同步器;也就是说,acquire方法是否曾经被阻止过 - `public final Thread getFirstQueuedThread()` 获取队列中的第一个线程 - `public final boolean isQueued(Thread thread)` 判断指定线程是否在队列中排队 - `final boolean apparentlyFirstQueuedIsExclusive()` 如果第一个排队线程(如果存在)以独占模式等待,则返回 true - `public final boolean hasQueuedPredecessors()` 查询是否有线程等待获取的时间长于当前线程 - **(4) 入队出队层:提供双向链表首尾结点的CAS操作** - `private Node addWaiter(Node mode)` - `private Node enq(final Node node)` - `final boolean transferForSignal(Node node)` - `final boolean transferAfterCancelledWait(Node node)` - `private void unparkSuccessor(Node node)` - `private static final boolean compareAndSetWaitStatus(Node node, int expect, int update)` - **(5) 数据提供层:** - `state` 变量的读写 ( `private volatile int state`),**可重写** - `protected final int getState()` 获取 state 变量的值 - `protected final void setState(int newState)` 设置 state 变量的值 - `protected final boolean compareAndSetState(int expect, int update)` CAS 设置 state 变量的值 - 测量和监控队列(*Instrumentation and monitoring methods*) - `public final int getQueueLength()` - `public final Collection<Thread> getQueuedThreads()` - `public final Collection<Thread> getExclusiveQueuedThreads()` - `public final Collection<Thread> getSharedQueuedThreads()` - *条件相关的内部方法(Internal support methods for Conditions)* - `final boolean isOnSyncQueue(Node node)` - `private boolean findNodeFromTail(Node node)` - `final boolean transferForSignal(Node node)` - `final boolean transferAfterCancelledWait(Node node)` - `final int fullyRelease(Node node)` - *条件相关的测量方法(Instrumentation methods for conditions)* - `public final boolean owns(ConditionObject condition)` - `public final boolean hasWaiters(ConditionObject condition)` - `public final int getWaitQueueLength(ConditionObject condition)` - `public final Collection<Thread> getWaitingThreads(ConditionObject condition)` - CAS 操作包装 - `private final boolean compareAndSetHead(Node update)` - `private final boolean compareAndSetTail(Node expect, Node update)` - `private static final boolean compareAndSetWaitStatus(Node node, int expect, int update)` - `private static final boolean compareAndSetNext(Node node, Node expect, Node update)` ### 3.2 ReentrantLock (独占锁)实现原理 ReentrantLock 的常用方法 : - lock 方法 - unlock 方法 - tryLock 方法 ---- ReentrantLock 具有以下特性: - 独占锁 - 可重入 - 支持公平与非公平(非公平可提高吞吐量) - 具备线程挂起与唤醒功能 ---- **锁实现的基本原理**与AQS的关系: - 可标记锁的状态( `AbstractQueuedSynchronizer#state`) - 可记录当前持有锁的线程( `AbstractOwnableSynchronizer#exclusiveOwnerThread`) - 支持对线程进行挂起和唤醒 ( `LockSupport#park` 和 `LockSupport#unpark` ) - 有一个维护所有阻塞线程的无锁队列( `AbstractQueuedSynchronizer.Node`) #### 3.2.1 ReentrantLock#state 变量的含义 `ReentrantLock#state` 变量的含义: - state=0,表示还没有线程获取锁 - state=1,表示有线程独占了锁 - state>1,表示锁被重入的次数 #### 3.2.2 ReentrantLock#lock 方法分析 > (1) `ReentrantLock#lock` 的公平与非公平实现 ```java static final class FairSync extends Sync { // 公平实现 final void lock() { acquire(1); // 在方法内部排队 } } static final class NonfairSync extends Sync { // 非公平实现 final void lock() { if (compareAndSetState(0, 1)) { // 直接抢锁 setExclusiveOwnerThread(Thread.currentThread()); // 抢锁成功,则独占 } else { acquire(1); // 抢锁失败,再在方法内部排队 } } } ``` > (2) `AQS#acquire` 方法分析:包括 `tryAcquire` 的公平与非公平实现、 `addWaiter`和 `acquireQueued`的实现过程 ```java public final void acquire(int arg) { if (!tryAcquire(arg) // 再次尝试拿锁,由子类实现 && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { // 把线程放入阻塞队列,阻塞该线程 selfInterrupt(); // 返回true表示被中断过,通知进行中断 } } ``` > (3) `tryAcquire`的公平与非公平实现 ```java static final class FairSync extends Sync { protected final boolean tryAcquire(int acquires) { // tryAcquire的公平实现 final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 没有线程持有锁,开始抢锁 if (!hasQueuedPredecessors() // 如果排在队列第一个 && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } } final boolean nonfairTryAcquire(int acquires) { // Sync#nonfairTryAcquire,tryAcquire的非公平实现 // ... if (c == 0) { if (compareAndSetState(0, acquires)) { // 与公平实现的唯一区别就是,此处没有 !hasQueuedPredecessors() setExclusiveOwnerThread(current); return true; } } // ... } ``` > (4) `addWaiter` 方法分析 ```java private Node addWaiter(Node mode) { // 为当前线程生成一个Node,然后把Node放入双向链表的尾部,线程还未阻塞 Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { // 先尝试快速插入到队列尾部,成功则直接返回 pred.next = node; return node; } } enq(node); // 进行队列的初始化,新建一个空的Node,不断尝试自旋,直至成功把该Node加入队列尾部 return node; } ``` > (5) `acquireQueued` 方法分析 ```java final boolean acquireQueued(final Node node, int arg) { // AQS#acquireQueued boolean failed = true; try { boolean interrupted = false; // 会记录阻塞过程中有没有其他线程向自己发送中断信号 for (;;) { final Node p = node.predecessor(); // 前一个结点 if (p == head && tryAcquire(arg)) { // 如果自己的前一个结点是head指向的空结点,即队列头部,则尝试拿锁 setHead(node); // 拿锁成功,出队列(head前移一个结点) p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) { // 调用park挂起自己 interrupted = true; } } } finally { if (failed) { cancelAcquire(node); } } } ``` #### 3.2.3 ReentrantLock#unlock 方法分析 > `unlock` 不区分公平或非公平 ```java public void unlock() { // java.util.concurrent.locks.ReentrantLock#unlock sync.release(1); } public final boolean release(int arg) { // AQS#release if (tryRelease(arg)) { // 1. 释放锁 Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); // 2. 唤醒队列后继者 return true; } return false; } protected final boolean tryRelease(int releases) { // ReentrantLock.Sync#tryRelease int c = getState() - releases; // 减少重入次数 if (Thread.currentThread() != getExclusiveOwnerThread()) // 只有锁的拥有者才能unlock throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { // 重入次数减到0时释放锁 free = true; setExclusiveOwnerThread(null); } setState(c); return free; } private void unparkSuccessor(Node node) { // AQS#unparkSuccessor int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); } ``` #### 3.2.4 ReentrantLock#lockInterruptibly 与 tryLock 的分析 (略) `ReentrantLock#lockInterruptibly` 与 `ReentrantLock#tryLock` 方法的实现只是多了层封装,不再赘述 ### 3.3 ReentrantReadWriteLock (读写锁)实现原理 ReentrantReadWriteLock 的常用方法: - readLock().lock - readLock().unlock - writeLock().lock - writeLock().unlock ---- ReentrantReadWriteLock 有以下特性 - **读写互斥、写写互斥、读读不互斥**(利用该特性,可以在**读多写少**的场景,替换独占锁,优化性能) - 可重入 - 具备公平与非公平实现 - 具备线程挂起与唤醒功能 ---- 分析过程包括: - state 变量的含义 - readLock 和 writeLock 的实现 - 公平与非公平实现 - `AQS#acquireShared` 和 `AQS#releaseShared` 的实现 #### 3.3.1 ReentrantReadWriteLock#state 变量的含义 - 用 `state` 低16位用记录写锁的重入次数,高16位记录读锁的重入次数 - `state=0` 表示没有线程持有读锁或写锁 - `state!=0` 时,要么有线程持有读锁,要么持有写锁;可以进一步通过 `sharedCount(state)` 判断是否持有读锁, `execlusiveCount(state)` 判断是否持有写锁 > `java.util.concurrent.locks.ReentrantReadWriteLock.Sync` 中对 state 变量含义的定义 ```java static final int SHARED_SHIFT = 16; static final int SHARED_UNIT = (1 << SHARED_SHIFT); static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; /** Returns the number of shared holds represented in count */ static int sharedCount(int c) { return c >>> SHARED_SHIFT; } /** Returns the number of exclusive holds represented in count */ static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } ``` #### 3.3.2 readLock() 和 writeLock() 的实现分析 - `readLock()` 和 `writeLock()` 分别是读锁和写锁的视图,返回是 `Lock` 接口的实现 - writeLock()基于 `AQS#acquire` 和 `AQS#release` 方法实现;readLock()基于 `AQS#acquireShared` 和 `AQS#acquireRelease` 实现 - 通过内部抽象类Sync实现上述AQS的模板方法,又抽象出 `readerShouldBlock()` 和 `writerShouldBlock()` 方法来扩展公平和非公平实现 ---- `AQS#acquire` 和 `AQS#release` 方法已在 `ReentrantLock` 中分析,不再赘述;下一节只分析 `AQS#acquireShared` 和 `AQS#releaseShared` 方法; ---- 先简要给出读写锁公平与非公平实现的过程: > `ReentrantReadWriteLock.NonfairSync` 和 `ReentrantReadWriteLock.FairSync` 实现分析 ```java static final class NonfairSync extends Sync { // 非公平实现,ReentrantReadWriteLock.NonfairSync final boolean writerShouldBlock() { return false; // 写线程在抢锁前永远不被阻塞,非公平 } final boolean readerShouldBlock() { return apparentlyFirstQueuedIsExclusive(); // 读线程抢锁时,如果队首元素是写线程,才阻塞 } } static final class FairSync extends Sync { // 公平实现,ReentrantReadWriteLock.FairSync final boolean writerShouldBlock() { return hasQueuedPredecessors(); // 写线程抢锁前,排队 } final boolean readerShouldBlock() { return hasQueuedPredecessors(); // 读线程抢锁前,排队 } } ``` #### 3.3.3 ReadLock#lock 方法分析 ```java public void lock() { // ReentrantReadWriteLock.ReadLock#lock sync.acquireShared(1); // AQS#acquireShared } public final void acquireShared(int arg) { // AQS#acquireShared if (tryAcquireShared(arg) < 0) // 子类实现,实际在 ReentrantReadWriteLock.Sync#tryAcquireShared 中实现 doAcquireShared(arg); } protected final int tryAcquireShared(int unused) { // ReentrantReadWriteLock.Sync.tryAcquireShared Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 // 被独占 && getExclusiveOwnerThread() != current) {// 且不是当前线程独占 return -1; // 拿不到读锁,返回值小于 0 表示失败 } // 能走到这里,就是没有被独占 int r = sharedCount(c); // 获取读线程数量 if (!readerShouldBlock() // 公平锁:排队;非公平锁:队首元素非写线程 && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { // CAS 更新读线程数,高16位+1(1左移16位+1) if (r == 0) { // 表示当前线程是第1个拿到读锁的线程 firstReader = current; // 只用于统计,不影响流程 firstReaderHoldCount = 1; } else if (firstReader == current) { // 读锁重入 firstReaderHoldCount++; // 第一个读线程的锁重入次数+1 } else { // 其他进来的读线程 HoldCounter rh = cachedHoldCounter; // 只用于统计,不影响流程 if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); } ``` #### 3.3.4 ReadLock#unlock 方法分析 ```java public void unlock() { // ReentrantReadWriteLock.ReadLock.unlock sync.releaseShared(1); } public final boolean releaseShared(int arg) { // AQS.releaseShared if (tryReleaseShared(arg)) { // 子类实现 doReleaseShared(); return true; } return false; } protected final boolean tryReleaseShared(int unused) { // ReentrantReadWriteLock.Sync.tryReleaseShared Thread current = Thread.currentThread(); if (firstReader == current) { // 当前线程在队首 // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) // 读线程重入次数减到0 firstReader = null; // 则该线程不再持有该锁 else firstReaderHoldCount--; // 重入次数减1 } else { // 其他读线程,在 readHolds 中维护 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } for (;;) { // CAS 自旋更新 state 变量 int c = getState(); int nextc = c - SHARED_UNIT; // 高16位减1 if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0; // 等于0表示被释放 } } ``` #### 3.3.5 WriteLock#lock 方法分析 ```java public void lock() { // ReentrantReadWriteLock.WriteLock.lock sync.acquire(1); } public final void acquire(int arg) { if (!tryAcquire(arg) // 再次尝试拿锁,由子类实现 && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { // 把线程放入阻塞队列,阻塞该线程 selfInterrupt(); // } } protected final boolean tryAcquire(int acquires) { // ReentrantReadWriteLock.Sync.tryAcquire Thread current = Thread.currentThread(); int c = getState(); // 用于判断是否有读写线程 int w = exclusiveCount(c); // 写线程的重入数(写线程只能有一个) if (c != 0) { // 被读线程或被写线程占用,此时必然互斥 // (Note: if c != 0 and w == 0 then shared count != 0) if (w == 0 || current != getExclusiveOwnerThread()) // 锁被读线程持有或者不是被当前线程独占,则返回 return false; // 获取写锁失败 if (w + exclusiveCount(acquires) > MAX_COUNT) // 重入数,低16位用满,抛错误 throw new Error("Maximum lock count exceeded"); // Reentrant acquire setState(c + acquires); // 更新写锁重入数 return true; } // 下面进入抢锁环节 if (writerShouldBlock() // 公平实现:队列里有其他线程,则排队;非公平实现:不被阻塞 || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); // 独占写锁 return true; } ``` #### 3.3.6 WriteLock#unlock 方法分析 ```java public void unlock() { // ReentrantReadWriteLock.WriteLock.unlock sync.release(1); } public final boolean release(int arg) { // AQS#release if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases) { // ReentrantReadWriteLock.Sync.tryRelease if (!isHeldExclusively()) // 确保unlock方法是被持有锁的线程调用 throw new IllegalMonitorStateException(); int nextc = getState() - releases; // 写锁重入数减1 boolean free = exclusiveCount(nextc) == 0; // 是否被释放 if (free) setExclusiveOwnerThread(null); // 释放独占线程 setState(nextc); // 更新状态 return free; } ``` ### 3.4 Condition (条件)实现原理 Condition 的常用方法: - await - signal ---- - 如同 `Object#wait()` 和 `Object#notify()` 方法必须和 `synchronized` 一起使用, `Condition#awiat()` 和 `Condition#signal()` 必须和 `Lock` 一起使用 - Condition 相比 `wait/notify`,避免了生产者通知生产者,消费者通知消费者的问题 - 互斥锁 `ReentrantLock.Sync#newCondition` 和 读写锁的写锁`ReentrantReadWriteLock.WriteLock#newCondition` 都使用了 `AQS#newCondition` ,其中读锁不支持 `newCondition` > Condition 的创建 ```java public Condition newCondition() { // ReentrantLock.newCondition return sync.newCondition(); } final ConditionObject newCondition() { // ReentrantLock.Sync.newCondition 或 ReentrantReadWriteLock.Sync#newCondition return new ConditionObject(); } ``` #### 3.4.1 ConditionObject.await 方法分析 ```java public final void await() throws InterruptedException { // AQS.ConditionObject.await() if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); // 1. 将当前线程加入等待队列(由于已拿到锁,因此方法内部线程安全) int savedState = fullyRelease(node); // 2. 挂起前必须先释放锁 int interruptMode = 0; while (!isOnSyncQueue(node)) { // Node 是否在 AQS 队列中 LockSupport.park(this); // 挂起 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) // 3. 重新拿锁 interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) // 4. 如果被中断唤醒,向外抛出中断异常 reportInterruptAfterWait(interruptMode); } ``` #### 3.4.2 ConditionObject.signal 方法分析 ```java public final void signal() { // AQS.ConditionObject.signal if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; // 队首 if (first != null) doSignal(first); // 真正执行唤醒 } private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } final boolean transferForSignal(Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; Node p = enq(node); // 先放到同步队列,再unpark唤醒 int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); // 唤醒 return true; } ``` ### 3.5 CountDownLaunch (计数屏障)实现原理 CountDownLaunch 的常用方法: - await:调用 await 的线程,将等待 count 被减到0 - countDown:每次调用 countDown,都会将 count 值减1 #### 3.5.1 CountDownLaunch#state 变量的含义 用 `CountDownLaunch#state` 变量表示**未 countDown 的数量**,当 `state` 为0时,调用 awiat 的线程会从挂起中唤醒 #### 3.5.2 CountDownLatch#await 方法分析 ```java public void await() throws InterruptedException { // java.util.concurrent.CountDownLatch.await() sync.acquireSharedInterruptibly(1); } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { // AQS.acquireSharedInterruptibly if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) // 在 CountDownLatch.Sync#tryAcquireShared 重写 doAcquireSharedInterruptibly(arg); } private static final class Sync extends AbstractQueuedSynchronizer { // CountDownLatch.Sync Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { // CountDownLatch.Sync#tryAcquireShared return (getState() == 0) ? 1 : -1; } // tryReleaseShared } private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // CountDownLatch#await() 实现原理 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } ``` #### 3.5.3 CountDownLatch#countDown 方法分析 ```java public void countDown() { // CountDownLatch.countDown sync.releaseShared(1); } public final boolean releaseShared(int arg) { // AQS.releaseShared if (tryReleaseShared(arg)) { // 子类实现 doReleaseShared(); return true; } return false; } protected boolean tryReleaseShared(int releases) { // CountDownLatch.Sync#tryReleaseShared for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } ``` ### 3.6 Semaphore (信号量)实现原理 Semaphore 的常用方法: - acquire:令资源数减 n - release:令资源数加 n #### 3.6.1 Semaphore#state 变量的含义 `Semaphore#state` 变量表示**资源总数**,调用 acquire 方法对 state 进行 CAS 减操作,**减到0后,线程阻塞**;调用 release 方法对 state 进行 CAS 加操作 #### 3.6.2 Semaphore#acquire 与 release 方法分析 > Semaphore 各方法源码,与锁的实现类似,不再赘述 ```java public void acquire() throws InterruptedException { // Semaphore.acquire() sync.acquireSharedInterruptibly(1); } public void acquire(int permits) throws InterruptedException { // Semaphore.acquire(int) if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); } public boolean tryAcquire() { return sync.nonfairTryAcquireShared(1) >= 0; } public void release() { sync.releaseShared(1); } public void release(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.releaseShared(permits); } ``` ## 4 无锁编程模式 ### 4.1 内存屏障(一写一读) > linux 内核的 kfifo 队列:[root/kernel/kfifo.c](https://git.kernel.org/pub/scm/linux/kernel/git/stable/linux.git/tree/kernel/kfifo.c?h=linux-2.6.38.y) ,通过 `smp_wmb()` 插入 Store 屏障,确保更新指针的操作不会重排序到修改数据之前,以及更新指针的时候,Store Cahce 被刷新,其他 CPU 可见 ```c static void kfifo_copy_in(struct __kfifo *fifo, const void *src, unsigned int len, unsigned int off) { unsigned int size = fifo->mask + 1; unsigned int esize = fifo->esize; unsigned int l; off &= fifo->mask; if (esize != 1) { off *= esize; size *= esize; len *= esize; } l = min(len, size - off); memcpy(fifo->data + off, src, l); memcpy(fifo->data, src + l, len - l); /* * make sure that the data in the fifo is up to date before * incrementing the fifo->in index counter */ smp_wmb(); } ``` > `java.util.concurrent.locks.StampedLock#validate` 方法中,通过 `sun.misc.Unsafe#loadFence` 插入内存屏障,对非 volatile 的局部变量 `stamp` ,避免调用方进行乐观读时,代码被重排序 ```c long stamp = sl.tryOptimisticRead(); double currentX = x, currentY = y; // validate(stamp) 中插入了内存屏障,这行读取x值和y值的代码不会被重排序到上一行前 if (!sl.validate(stamp)) { // 插入内存屏障,避免前面的代码被重排序 } ``` ### 4.2 volatile(一写多读) `volatile` 修饰的变量,在被修改后会将值刷新到主内存中,确保各个工作内存中读到的值是一致的 ### 4.3 无锁队列(多写多读,可在双端增删元素) 参考 AQS 使用的无锁队列 ### 4.4 无锁栈(多写多读,可在栈顶增删元素) 参考 `java.util.concurrent.Phaser` 和 `ForkJoinPool` 使用的无锁栈 ### 4.5 无锁链表(多写多读,可在中间增删元素) 参考 `java.util.concurrent.ConcurrentSkipListMap` 使用的无锁跳查表 ## 5 并发编程模式\* ### 5.1 信号量、Latch 与同步屏障 - synchronized + wait() + notify() 实现信号量 - synchronized + wait() + notify() 实现 Launch 模式 - 线程池 + 原子类实现 Launch 模式 - Semaphore、CountDownLatch、CyclicBarrier 与 Phaser 的适用场景 ### 5.2 发布订阅模式 **相关概念:** - 观察者模式(Observer)是为了实现**松耦合**,和发布订阅模式(Publish–Subscribe)通过注册中心,实现了**完全解耦** - 担保-挂起(Guarded Suspension)模式,是很多设计模式(例如生产者-消费者模式)的基础 - Balking 模式与担保-挂起模式类似,但选择的是**放弃**而不是挂起 **相关示例:** - synchronized + wait() + notify() 实现阻塞队列 - Lock + Condition 实现阻塞队列 ### 5.3 线程池模式 **线程池(** `java.util.concurrent.ThreadPoolExecutor`**)原理:** - 线程池入参的含义 - 在不断往线程池(ThreadPoolExecutor)中提交任务(Runnable)时,先由核心线程处理,核心线程数(corePoolSize)不够时,存储到任务队列(BlockingQueue),任务队列满后,将创建线程至最大线程数(maximumPoolSize)来处理任务,如果还是不够处理任务,则使用拒绝策略(RejectedExecutionHandler) - 当任务处理完后,除了核心线程,其他线程会在经过保持时间(keepAliveTime)后被销毁 - 线程池中的 `ctl` 字段含义:高3位表示线程池的运行状态(runState),低29位表示工作线程数量(workerCount) - 线程池状态(runState)迁移(有向无环图): - RUNNING(-1) 经过 shutdown() 变为 SHUTDOWN(0) - RUNNING(-1) 经过 shutdownNow() 变为 STOP(1) - SHUTDOWN(0) 经过 shutdownNow() 变为 STOP(1) - SHUTDOWN(0) 在队列和线程池为空时,变为TIDYING(2) - STOP(1) 在队列和线程池为空时,变为TIDYING(2) - TIDYING(2) 经过 terminated() 变为 TERMINATED(3) - shutdown() 与 shutdownNow() 的区别 - shutdown() 不会清空任务队列,会等所有任务执行完成;shutdownNow() 会清空任务队列 - shutdown() 只中断空闲线程,shutdownNow() 会中断所有线程 > 线程池提交任务的 `execute` 方法分析 ```java public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { // 1. 当前线程数小于核心线程数,创建核心线程,并start任务 if (addWorker(command, true)) // start 任务成功,则返回 return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { // 2. 当前线程数大于等于核心线程数,则放入队列 int recheck = ctl.get(); if (!isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) // 3. 放入队列失败,则新建空闲线程 start 任务 reject(command); // 4. 使用空闲线程 start 任务失败,使用拒绝策略 } private boolean addWorker(Runnable firstTask, boolean core) { // 新开线程 retry: for (; ; ) { int c = ctl.get(); int rs = runStateOf(c); // 判断线程池是否处于 shutdown 后续状态(如果正好是shutdown,还有任务则不退出) if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) return false; for (; ; ) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) // 线程数超过上界 return false; if (compareAndIncrementWorkerCount(c)) // workCouut 加1成功则跳出循环 break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) // runState 发生了变化,重新开始for循环 continue retry; // else CAS failed due to workerCount change; retry inner loop } } // workCount 成加1,开始添加线程 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); // 创建一个工作线程 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); // 添加线程 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); // 添加成功,则启动线程 workerStarted = true; } } } finally { if (!workerStarted) addWorkerFailed(w); } return workerStarted; } ``` > 工作线程的执行过程分析 ```java private final class Worker extends AbstractQueuedSynchronizer implements Runnable { // ThreadPoolExecutor.Worker,继承自AQS /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } // ... } final void runWorker(Worker w) { // ThreadPoolExecutor#runWorker Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { // 不断从队列中取任务执行 w.lock(); // 执行任务前先加锁,shutdown时会tryLock() if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) // 检查运行状态是否已停止 wt.interrupt(); try { beforeExecute(wt, task); // 钩子方法,默认空实现 Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); // 钩子方法,默认空实现 } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // 确保 Worker 退出时,能执行下面的退出处理逻辑 processWorkerExit(w, completedAbruptly); } } ``` ### 5.4 Future 模式 **背景:** - 同步的API调用比较耗时的时候,可以先选择获取一个立即返回的凭据(Future),调用者所在线程不用陷入未知时长的阻塞中,在未来的某个时间再根据Future获取结果 - 还可以增加回调(Callback)的机制,无需通过阻塞方法(get)获取结果,而是注入一个回调方法,以此提高系统的响应时间,充分利用CPU资源 实现模式可以参考JDK源码或 [pattern/future](https://console.cloud.baidu-int.com/devops/icode/repos/baidu/personal-code/z8g/tree/master/pattern/src/main/java/com/baidu/z8g/pattern/concurrent/future) ### 5.5 CopyOnWrite 模式 参考 `CopyOnWriteArrayList`、 `StampedLock` 等工具的实现 ### 5.6 ForkJoin 模式 Forkjoin 是 JDK 7 中提供分治算法的多线程并行计算框架: - 分治算法的实现 - 工作窃取的实现 - 并行计算的实现 ### 5.7 Thread-Per-Message 模式\* ### 5.8 Worker-Thread 模式\* ### 5.9 Active Objects 模式\* ### 5.10 消息总线(Event Bus)模式\*