> 本文用于系统总结 Java 并发包相关概念,只拣重点阐述,但实现细节不可避免地要展示许多代码
> openjdk-8u 源码:`hg clone http://hg.openjdk.java.net/jdk8/jdk8 openjdk8`
[TOC]
## 1 Java 并发包总览
整个 `java.util.concurrent` 包,按照功能模块可以划分为:
- 原子类(Atomic)
- 锁(Lock)与条件(Condition)
- 同步工具类
- 并发容器
- 线程池、Future、ForkJoinPool
- CompletableFuture (Java 8 出现,android 不关注)
### 1.1 原子类(Atomic)
Atomic类位于 `java.util.concurrent.atomic` 包:
- AtomicInteger:保证一个`int` 类型加减操作的原子性,可以代替 `synchronized` 的加锁
- AtomicLong:同 AtomicInteger,类型为 `long`
- AtomicBoolean:相比 volatile 修饰的 boolean 类型,可以保证例如 `if (flag == false) {flag = true;}` 的原子性
- AtomicReference:类似 AtomicBoolean,用来保证读写引用的原子性
- AtomicStampedReference:类似 AtomicReference,但是其通过版本号,可以避免 **ABA 问题**
- AtomicMarkableReference:类似 AtomicStampedReference,区别在于版本号为 boolean 类型,不能完全避免ABA问题,只是降低了发生的概率
- AtomicIntegerFieldUpdater:对已存在的类,实现其成员变量(int)的原子操作
- AtomicLongFieldUpdater:对已存在的类,实现其成员变量(long)的原子操作
- AtomicReferenceFieldUpdater:对已存在的类,实现其成员变量(Object)的原子操作
- AtomicIntegerArray:支持对 int 数组中的元素执行原子操作
- AtomicLongArray:支持对 long 数组中的元素执行原子操作
- AtomicReferenceArray:支持对 Object 数组中的元素执行原子操作
- LongAddr:作用同 AtomicLong,通过分片技术(Striped64)提供了高并发场景下的性能,保证最终一致性而非强一致性
- DoubleAdder:类似 LongAddr,支持 double 类型
- LongAccumulator:类似 LongAddr,LongAddr只支持的累加操作,其支持自定义二元操作
- DoubleAccumulator:同 LongAccumulator,支持 double 类型
**涉及概念**: `乐观锁` `Compare And Set` `自旋` `原子操作` `最终一致性` `分片`
### 1.2 锁(Lock)与条件(Condition)
锁与条件位于 `java.util.concurrent.locks` 包:
- LockSupport:一个用于支持线程唤醒和挂起操作的工具类
- AbstractQueuedSynchronizer:基于 Unsafe + 无锁队列实现的队列同步器(AQS),具备阻塞与唤醒线程和维护这些等待线程的无锁队列
- Condition:该接口定义了条件的基本操作
- Lock:该接口定义了锁的基本操作
- ReadWriteLock:该接口定义了读锁和写锁的获取
- ReentrantLock:基于 AQS 实现的一个可重入的独占锁(读读互斥,读写互斥,写写互斥)
- ReentrantReadWriteLock:基于 AQS 实现的一个可重入的读写锁(读读不互斥、读写互斥、写写互斥)
- StampedLock:基于 Unsafe + LockSupport 实现的一个可重入的读写锁(读读不互斥、读写不互斥、写写互斥)
**涉及概念**: `Compare And Set` `CLH 队列` `重入锁` `公平锁与非公平锁` `线程中断` `互斥锁` `读写锁` `共享锁与独占锁` `MVCC机制` `悲观锁与乐观锁` `重排序` `内存屏障`
### 1.3 同步工具类
同步工具类位于 `java.util.concurrent` 包:
- Semaphore:基于 AQS 实现的信号量,提供资源数量的并发访问控制
- CountDownLatch:基于 AQS 实现的计数器,可以使当前线程等待其他线程全部执行完毕后再执行
- CyclicBarrier:基于 ReentrantLock + Condition 实现的同步屏障,区别于 CountDownLatch,计数器可以重置,适用于更复杂的场景
- Exchanger:基于 Unsafe + LockSupport实现,用于线程间交换数据
- Phaser:基于 Unsafe + 无锁栈实现的,可用于代替 CountDownLatch 和 CyclicBarrier 的同步工具(用树结构维护)
**涉及概念**: `Compare And Set` `无锁栈` `公平与非公平`
### 1.4 并发容器
并发容器位于 `java.util.concurrent` 包:
- BlockingQueue:该接口定义了阻塞队列的基本操作
- ArrayBlockingQueue:基于数组 + ReentrantLock + Condition 实现的环形阻塞队列
- LinkedBlockingQueue:基于单链表 + ReentrantLock + Condition 实现的阻塞队列
- PriorityBlockingQueue:基于数组 + ReentrantLock + Condition 实现的阻塞队列(按元素优先级从小到大出队)
- DelayQueue:基于 PriorityQueue + ReentrantLock + Condition 实现的阻塞队列(按延迟时间从小到大出队)
- SynchronousQueue:基于 Unsafe 实现的特殊的阻塞队列(本身没有容量,直接通知等待线程,高效)
- BlockingDeque:该接口定义了双端阻塞队列的基本操作
- LinkedBlockingDeque:基于双向链表 + ReentrantLock + Condition 实现的双端阻塞队列
- CopyOnWriteArrayList:基于数组 + ReentrantLock 实现的 List
- CopyOnWriteArraySet:基于 CopyOnWriteArrayList 实现的 Set
- ConcurrentLinkedQueue:基于单链表 + Unsafe 实现的 Queue
- ConcurrentLinkedDueue:基于双向链表 + Unsafe 实现的 Deque
- ConcurrentHashMap:基于数组 + 链表/红黑树 + Unsafe 实现的 HashMap
- ConcurrentSkipListMap:基于跳查表 + Unsafe 实现的 TreeMap
- ConcurrentSkipListSet:基于 ConcurrentSkipListMap 实现的 TreeSet
### 1.5 线程池与Future
线程池与Future位于 `java.util.concurrent` 包
## 2 Java 并发包底层实现依赖
Java 并发包底层实现依赖于Java 内存模式、volatile 变量、CAS 算法等理论以及 Unsafe 类提供的一系列 native 方法
### 2.1 Java 内存模型、volatile 变量与 CAS 算法
Java 内存模型与 volatitle 变量的语义,以及与CAS 的关系,参考 [\[Java内存模型\]](https://ku.baidu-int.com/knowledge/HFVrC7hq1Q/pKzJfZczuc/8BkidD8KVd/21H5m7F3V7tsFb) 一文
### 2.2 Unsafe 类的应用
通过**反射**拿到 `sun.misc.Unsafe` 实例后,可以进行
- 读写类字段(int、long、short、boolean、byte、char、float、double、Object)
- 定义普通类、匿名类、创建类实例
- 读写变量(普通读写、volatile读写、有序读写)
- 比较交换(CAS)操作
- 操作堆外内存
- 操作监视器
- 设置内存屏障
- 唤醒与挂起线程
> Unsafe 实现的源码位置:hotspot/src/share/vm/prims/unsafe.cpp
## 3 AQS 框架的实现原理与应用
AQS 是 `AbstractQueuedSynchronizer` 类的简称,定义了一套多线程访问共享资源的**同步机制**
**核心思想:**如果被请求的共享资源空闲,则将请求的线程设置为有效的工作线程,将共享资源锁定;如果共享资源被占用,则需要一定的挂起与唤醒机制(CLH队列变体队列)确保资源的分配
### 3.1 AQS 实现原理
#### 3.1.1 AQS 数据结构
**AQS 维护了一个由无锁双向链表实现的阻塞队列,队列中的各个元素(**`**AQS.Node**`**)通过volatile变量(** `**AQS#state**`**)来进行状态维护**
- `AQS#state` AQS 内部不操作 state 变量,由实现层定义 state 的含义与控制 state 的值
- `AQS#head` 指向双向链表头部,表示等待队列的头部,延迟初始化
- 除初始化外,它仅通过 `setHead` 进行修改(如果 `head` 存在,则会确保 `waitStatus` 不会变为 `CANCELLED`)
- `AQS#tail` 指向双向链表尾部,表示等待队列的尾部,延迟初始化
- 仅通过 `enq` 方法修改以添加新的等待节点
- 队列操作:
- **入队:将新的** `Node` 加到 `tail` 后面,然后对 `tail` 进行CAS操作
- **出队:**对 `head` 进行CAS操作,把 `head` 向后移一个位置
> AQS 入队的方法
```java
private Node enq(final Node node) { // AQS.enq
for (;;) {
Node t = tail;
if (t == null) { // 必须初始化
if (compareAndSetHead(new Node()))
tail = head;
} else { // 将双向链表结点插入到 tail 后
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
```
----
`AbstractQueuedSynchronizer.Node` 表示 **AQS 队列元素**,各字段与方法含义如下:
- 构造方法
- `Node()` 提供给自定义初始化逻辑或 `SHARED` 状态使用
- `Node(Thread thread, Node mode)` 提供给 `addWaiter` 方法使用
- `Node(Thread thread, int waitStatus)` 提供给 `Condition` 使用
- 成员字段
- `volatile Node prev;` 前继结点
- `volatile Node next;` 后继结点
- `volatile Thread thread;` 结点中的线程
- `volatile int waitStatus;`
- 数字 `0` 表示 Node 被初始化时的默认值
- `static final int CANCELLED = 1;` 表示线程获取锁的请求已经被取消了
- `static final int SIGNAL = -1;` 表示线程正在等待释放资源
- `static final int CONDITION = -2;` 表示结点在等待队列中,线程等待唤醒
- `static final int PROPAGATE = -3;` 当前线程处于SHARED情况,才会使用
- `Node nextWaiter;` AQS 中的条件队列是是通过 nextWaiter,以单向链表的形式保存的, `SHARED` 模式不存在 Condition, `EXCLUSIVE` 模式才存在 `Condition`
- `static final Node SHARED = new Node();` 共享模式,多个线程可同时执行;例如 `ReadWriteReentrantLock.readLock` `Semaphore`(state != 1时) `CountDownLatch`
- `static final Node EXCLUSIVE = null;` 独占模式,只有一个线程能执行;例如 `ReadWriteReentrantLock.writeLock` `ReentrantLock` `Semaphore`(state = 1 时)
- 成员方法
- `final boolean isShared()` 是否为共享结点
- `final Node predecessor()`
#### 3.1.2 AQS 方法架构
- **(1) API 层(*****Main exported methods*****):只需要重写API 层方法,即可使用AQS框架,定制自定义同步器**
- 自定义同步器**可重写的方法**
- `protected boolean tryAcquire(int arg)` 尝试通过独占方式,获取资源;返回 true 表示成功,false 表示失败
- `protected boolean tryRelease(int arg)` 尝试通过独占方式,释放资源;返回 true 表示成功,false 表示失败
- `protected int tryAcquireShared(int arg)` 尝试通过共享方式,获取资源;返回负数表示失败,0表示成功,但没有剩余可用资源,正数表示成功,还有剩余资源
- `protected boolean tryReleaseShared(int arg)` 尝试通过共享方式,释放资源;返回 true 表示释放后允许唤醒后继结点,false 表示不允许
- `protected boolean isHeldExclusively()` 返回当前线程是否独占资源,使用 Condition 时才需实现
- 自定义同步器不可重写的方法
- `public final void acquire(int arg)` 通过独占方式,获取资源(忽略中断)
- `public final void acquireInterruptibly(int arg)` 通过独占方式,获取资源(响应中断)
- `public final boolean tryAcquireNanos(int arg, long nanosTimeout)` 通过独占方式,获取资源(超时中止)
- `public final void acquireShared(int arg)` 通过共享方式,获取资源(忽略中断)
- `public final void acquireSharedInterruptibly(int arg)` 通过共享方式,获取资源(响应中断)
- `public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)` 通过共享方式,获取资源(超时中止)
- `public final boolean release(int arg)` 通过独占方式,释放资源
- `public final boolean releaseShared(int arg)` 通过共享方式,释放资源
- **(2) 资源获取层(*****Utilities for various versions of acquire*****):通过自定义同步器获取与释放资源时,会进入到锁获取层**
- `private void cancelAcquire(Node node)`
- `private static boolean shouldParkAfterFailedAcquire(Node pred, Node node)`
- `static void selfInterrupt()`
- `private final boolean parkAndCheckInterrupt()`
- `inal boolean acquireQueued(final Node node, int arg)`
- `private void doAcquireInterruptibly(int arg)`
- `private boolean doAcquireNanos(int arg, long nanosTimeout)`
- `private void doAcquireShared(int arg)`
- `private void doAcquireSharedInterruptibly(int arg)`
- `private boolean doAcquireSharedNanos(int arg, long nanosTimeout)`
- **(3) 队列检查层(*****Queue inspection methods*****):获取资源失败时,会进入到队列检查层,排队等待**
- `public final boolean hasQueuedThreads()` 判断是否有线程正在等待获取,例如 ReentrantLock 用该方法来实现公平与非公平获取锁
- `public final boolean hasContended()` 询问是否有线程曾争夺过该同步器;也就是说,acquire方法是否曾经被阻止过
- `public final Thread getFirstQueuedThread()` 获取队列中的第一个线程
- `public final boolean isQueued(Thread thread)` 判断指定线程是否在队列中排队
- `final boolean apparentlyFirstQueuedIsExclusive()` 如果第一个排队线程(如果存在)以独占模式等待,则返回 true
- `public final boolean hasQueuedPredecessors()` 查询是否有线程等待获取的时间长于当前线程
- **(4) 入队出队层:提供双向链表首尾结点的CAS操作**
- `private Node addWaiter(Node mode)`
- `private Node enq(final Node node)`
- `final boolean transferForSignal(Node node)`
- `final boolean transferAfterCancelledWait(Node node)`
- `private void unparkSuccessor(Node node)`
- `private static final boolean compareAndSetWaitStatus(Node node, int expect, int update)`
- **(5) 数据提供层:**
- `state` 变量的读写 ( `private volatile int state`),**可重写**
- `protected final int getState()` 获取 state 变量的值
- `protected final void setState(int newState)` 设置 state 变量的值
- `protected final boolean compareAndSetState(int expect, int update)` CAS 设置 state 变量的值
- 测量和监控队列(*Instrumentation and monitoring methods*)
- `public final int getQueueLength()`
- `public final Collection<Thread> getQueuedThreads()`
- `public final Collection<Thread> getExclusiveQueuedThreads()`
- `public final Collection<Thread> getSharedQueuedThreads()`
- *条件相关的内部方法(Internal support methods for Conditions)*
- `final boolean isOnSyncQueue(Node node)`
- `private boolean findNodeFromTail(Node node)`
- `final boolean transferForSignal(Node node)`
- `final boolean transferAfterCancelledWait(Node node)`
- `final int fullyRelease(Node node)`
- *条件相关的测量方法(Instrumentation methods for conditions)*
- `public final boolean owns(ConditionObject condition)`
- `public final boolean hasWaiters(ConditionObject condition)`
- `public final int getWaitQueueLength(ConditionObject condition)`
- `public final Collection<Thread> getWaitingThreads(ConditionObject condition)`
- CAS 操作包装
- `private final boolean compareAndSetHead(Node update)`
- `private final boolean compareAndSetTail(Node expect, Node update)`
- `private static final boolean compareAndSetWaitStatus(Node node, int expect, int update)`
- `private static final boolean compareAndSetNext(Node node, Node expect, Node update)`
### 3.2 ReentrantLock (独占锁)实现原理
ReentrantLock 的常用方法 :
- lock 方法
- unlock 方法
- tryLock 方法
----
ReentrantLock 具有以下特性:
- 独占锁
- 可重入
- 支持公平与非公平(非公平可提高吞吐量)
- 具备线程挂起与唤醒功能
----
**锁实现的基本原理**与AQS的关系:
- 可标记锁的状态( `AbstractQueuedSynchronizer#state`)
- 可记录当前持有锁的线程( `AbstractOwnableSynchronizer#exclusiveOwnerThread`)
- 支持对线程进行挂起和唤醒 ( `LockSupport#park` 和 `LockSupport#unpark` )
- 有一个维护所有阻塞线程的无锁队列( `AbstractQueuedSynchronizer.Node`)
#### 3.2.1 ReentrantLock#state 变量的含义
`ReentrantLock#state` 变量的含义:
- state=0,表示还没有线程获取锁
- state=1,表示有线程独占了锁
- state>1,表示锁被重入的次数
#### 3.2.2 ReentrantLock#lock 方法分析
> (1) `ReentrantLock#lock` 的公平与非公平实现
```java
static final class FairSync extends Sync { // 公平实现
final void lock() {
acquire(1); // 在方法内部排队
}
}
static final class NonfairSync extends Sync { // 非公平实现
final void lock() {
if (compareAndSetState(0, 1)) { // 直接抢锁
setExclusiveOwnerThread(Thread.currentThread()); // 抢锁成功,则独占
} else {
acquire(1); // 抢锁失败,再在方法内部排队
}
}
}
```
> (2) `AQS#acquire` 方法分析:包括 `tryAcquire` 的公平与非公平实现、 `addWaiter`和 `acquireQueued`的实现过程
```java
public final void acquire(int arg) {
if (!tryAcquire(arg) // 再次尝试拿锁,由子类实现
&& acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { // 把线程放入阻塞队列,阻塞该线程 selfInterrupt(); // 返回true表示被中断过,通知进行中断
}
}
```
> (3) `tryAcquire`的公平与非公平实现
```java
static final class FairSync extends Sync {
protected final boolean tryAcquire(int acquires) { // tryAcquire的公平实现
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { // 没有线程持有锁,开始抢锁
if (!hasQueuedPredecessors() // 如果排在队列第一个
&& compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
final boolean nonfairTryAcquire(int acquires) { // Sync#nonfairTryAcquire,tryAcquire的非公平实现
// ...
if (c == 0) {
if (compareAndSetState(0, acquires)) { // 与公平实现的唯一区别就是,此处没有 !hasQueuedPredecessors()
setExclusiveOwnerThread(current);
return true;
}
}
// ...
}
```
> (4) `addWaiter` 方法分析
```java
private Node addWaiter(Node mode) { // 为当前线程生成一个Node,然后把Node放入双向链表的尾部,线程还未阻塞
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) { // 先尝试快速插入到队列尾部,成功则直接返回
pred.next = node;
return node;
}
}
enq(node); // 进行队列的初始化,新建一个空的Node,不断尝试自旋,直至成功把该Node加入队列尾部
return node;
}
```
> (5) `acquireQueued` 方法分析
```java
final boolean acquireQueued(final Node node, int arg) { // AQS#acquireQueued
boolean failed = true;
try {
boolean interrupted = false; // 会记录阻塞过程中有没有其他线程向自己发送中断信号
for (;;) {
final Node p = node.predecessor(); // 前一个结点
if (p == head && tryAcquire(arg)) { // 如果自己的前一个结点是head指向的空结点,即队列头部,则尝试拿锁
setHead(node); // 拿锁成功,出队列(head前移一个结点)
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node)
&& parkAndCheckInterrupt()) { // 调用park挂起自己
interrupted = true;
}
}
} finally {
if (failed) {
cancelAcquire(node);
}
}
}
```
#### 3.2.3 ReentrantLock#unlock 方法分析
> `unlock` 不区分公平或非公平
```java
public void unlock() { // java.util.concurrent.locks.ReentrantLock#unlock
sync.release(1);
}
public final boolean release(int arg) { // AQS#release
if (tryRelease(arg)) { // 1. 释放锁
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // 2. 唤醒队列后继者
return true;
}
return false;
}
protected final boolean tryRelease(int releases) { // ReentrantLock.Sync#tryRelease
int c = getState() - releases; // 减少重入次数
if (Thread.currentThread() != getExclusiveOwnerThread()) // 只有锁的拥有者才能unlock
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { // 重入次数减到0时释放锁
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
private void unparkSuccessor(Node node) { // AQS#unparkSuccessor
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
```
#### 3.2.4 ReentrantLock#lockInterruptibly 与 tryLock 的分析 (略)
`ReentrantLock#lockInterruptibly` 与 `ReentrantLock#tryLock` 方法的实现只是多了层封装,不再赘述
### 3.3 ReentrantReadWriteLock (读写锁)实现原理
ReentrantReadWriteLock 的常用方法:
- readLock().lock
- readLock().unlock
- writeLock().lock
- writeLock().unlock
----
ReentrantReadWriteLock 有以下特性
- **读写互斥、写写互斥、读读不互斥**(利用该特性,可以在**读多写少**的场景,替换独占锁,优化性能)
- 可重入
- 具备公平与非公平实现
- 具备线程挂起与唤醒功能
----
分析过程包括:
- state 变量的含义
- readLock 和 writeLock 的实现
- 公平与非公平实现
- `AQS#acquireShared` 和 `AQS#releaseShared` 的实现
#### 3.3.1 ReentrantReadWriteLock#state 变量的含义
- 用 `state` 低16位用记录写锁的重入次数,高16位记录读锁的重入次数
- `state=0` 表示没有线程持有读锁或写锁
- `state!=0` 时,要么有线程持有读锁,要么持有写锁;可以进一步通过 `sharedCount(state)` 判断是否持有读锁, `execlusiveCount(state)` 判断是否持有写锁
> `java.util.concurrent.locks.ReentrantReadWriteLock.Sync` 中对 state 变量含义的定义
```java
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** Returns the number of shared holds represented in count */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
```
#### 3.3.2 readLock() 和 writeLock() 的实现分析
- `readLock()` 和 `writeLock()` 分别是读锁和写锁的视图,返回是 `Lock` 接口的实现
- writeLock()基于 `AQS#acquire` 和 `AQS#release` 方法实现;readLock()基于 `AQS#acquireShared` 和 `AQS#acquireRelease` 实现
- 通过内部抽象类Sync实现上述AQS的模板方法,又抽象出 `readerShouldBlock()` 和 `writerShouldBlock()` 方法来扩展公平和非公平实现
----
`AQS#acquire` 和 `AQS#release` 方法已在 `ReentrantLock` 中分析,不再赘述;下一节只分析 `AQS#acquireShared` 和 `AQS#releaseShared` 方法;
----
先简要给出读写锁公平与非公平实现的过程:
> `ReentrantReadWriteLock.NonfairSync` 和 `ReentrantReadWriteLock.FairSync` 实现分析
```java
static final class NonfairSync extends Sync { // 非公平实现,ReentrantReadWriteLock.NonfairSync
final boolean writerShouldBlock() {
return false; // 写线程在抢锁前永远不被阻塞,非公平
}
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive(); // 读线程抢锁时,如果队首元素是写线程,才阻塞
}
}
static final class FairSync extends Sync { // 公平实现,ReentrantReadWriteLock.FairSync
final boolean writerShouldBlock() {
return hasQueuedPredecessors(); // 写线程抢锁前,排队
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors(); // 读线程抢锁前,排队
}
}
```
#### 3.3.3 ReadLock#lock 方法分析
```java
public void lock() { // ReentrantReadWriteLock.ReadLock#lock
sync.acquireShared(1); // AQS#acquireShared
}
public final void acquireShared(int arg) { // AQS#acquireShared
if (tryAcquireShared(arg) < 0) // 子类实现,实际在 ReentrantReadWriteLock.Sync#tryAcquireShared 中实现
doAcquireShared(arg);
}
protected final int tryAcquireShared(int unused) { // ReentrantReadWriteLock.Sync.tryAcquireShared
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 // 被独占
&& getExclusiveOwnerThread() != current) {// 且不是当前线程独占
return -1; // 拿不到读锁,返回值小于 0 表示失败
} // 能走到这里,就是没有被独占
int r = sharedCount(c); // 获取读线程数量
if (!readerShouldBlock() // 公平锁:排队;非公平锁:队首元素非写线程
&& r < MAX_COUNT
&& compareAndSetState(c, c + SHARED_UNIT)) { // CAS 更新读线程数,高16位+1(1左移16位+1)
if (r == 0) { // 表示当前线程是第1个拿到读锁的线程
firstReader = current; // 只用于统计,不影响流程
firstReaderHoldCount = 1;
} else if (firstReader == current) { // 读锁重入
firstReaderHoldCount++; // 第一个读线程的锁重入次数+1
} else { // 其他进来的读线程
HoldCounter rh = cachedHoldCounter; // 只用于统计,不影响流程
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
```
#### 3.3.4 ReadLock#unlock 方法分析
```java
public void unlock() { // ReentrantReadWriteLock.ReadLock.unlock
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) { // AQS.releaseShared
if (tryReleaseShared(arg)) { // 子类实现
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int unused) { // ReentrantReadWriteLock.Sync.tryReleaseShared
Thread current = Thread.currentThread();
if (firstReader == current) { // 当前线程在队首
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1) // 读线程重入次数减到0
firstReader = null; // 则该线程不再持有该锁
else
firstReaderHoldCount--; // 重入次数减1
} else { // 其他读线程,在 readHolds 中维护
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) { // CAS 自旋更新 state 变量
int c = getState();
int nextc = c - SHARED_UNIT; // 高16位减1
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0; // 等于0表示被释放
}
}
```
#### 3.3.5 WriteLock#lock 方法分析
```java
public void lock() { // ReentrantReadWriteLock.WriteLock.lock
sync.acquire(1);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) // 再次尝试拿锁,由子类实现
&& acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { // 把线程放入阻塞队列,阻塞该线程
selfInterrupt(); //
}
}
protected final boolean tryAcquire(int acquires) { // ReentrantReadWriteLock.Sync.tryAcquire
Thread current = Thread.currentThread();
int c = getState(); // 用于判断是否有读写线程
int w = exclusiveCount(c); // 写线程的重入数(写线程只能有一个)
if (c != 0) { // 被读线程或被写线程占用,此时必然互斥
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread()) // 锁被读线程持有或者不是被当前线程独占,则返回
return false; // 获取写锁失败
if (w + exclusiveCount(acquires) > MAX_COUNT) // 重入数,低16位用满,抛错误
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires); // 更新写锁重入数
return true;
}
// 下面进入抢锁环节
if (writerShouldBlock() // 公平实现:队列里有其他线程,则排队;非公平实现:不被阻塞
|| !compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current); // 独占写锁
return true;
}
```
#### 3.3.6 WriteLock#unlock 方法分析
```java
public void unlock() { // ReentrantReadWriteLock.WriteLock.unlock
sync.release(1);
}
public final boolean release(int arg) { // AQS#release
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) { // ReentrantReadWriteLock.Sync.tryRelease
if (!isHeldExclusively()) // 确保unlock方法是被持有锁的线程调用
throw new IllegalMonitorStateException();
int nextc = getState() - releases; // 写锁重入数减1
boolean free = exclusiveCount(nextc) == 0; // 是否被释放
if (free)
setExclusiveOwnerThread(null); // 释放独占线程
setState(nextc); // 更新状态
return free;
}
```
### 3.4 Condition (条件)实现原理
Condition 的常用方法:
- await
- signal
----
- 如同 `Object#wait()` 和 `Object#notify()` 方法必须和 `synchronized` 一起使用, `Condition#awiat()` 和 `Condition#signal()` 必须和 `Lock` 一起使用
- Condition 相比 `wait/notify`,避免了生产者通知生产者,消费者通知消费者的问题
- 互斥锁 `ReentrantLock.Sync#newCondition` 和 读写锁的写锁`ReentrantReadWriteLock.WriteLock#newCondition` 都使用了 `AQS#newCondition` ,其中读锁不支持 `newCondition`
> Condition 的创建
```java
public Condition newCondition() { // ReentrantLock.newCondition
return sync.newCondition();
}
final ConditionObject newCondition() { // ReentrantLock.Sync.newCondition 或 ReentrantReadWriteLock.Sync#newCondition
return new ConditionObject();
}
```
#### 3.4.1 ConditionObject.await 方法分析
```java
public final void await() throws InterruptedException { // AQS.ConditionObject.await()
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter(); // 1. 将当前线程加入等待队列(由于已拿到锁,因此方法内部线程安全)
int savedState = fullyRelease(node); // 2. 挂起前必须先释放锁
int interruptMode = 0;
while (!isOnSyncQueue(node)) { // Node 是否在 AQS 队列中
LockSupport.park(this); // 挂起
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) // 3. 重新拿锁
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0) // 4. 如果被中断唤醒,向外抛出中断异常
reportInterruptAfterWait(interruptMode);
}
```
#### 3.4.2 ConditionObject.signal 方法分析
```java
public final void signal() { // AQS.ConditionObject.signal
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter; // 队首
if (first != null)
doSignal(first); // 真正执行唤醒
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node); // 先放到同步队列,再unpark唤醒
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread); // 唤醒
return true;
}
```
### 3.5 CountDownLaunch (计数屏障)实现原理
CountDownLaunch 的常用方法:
- await:调用 await 的线程,将等待 count 被减到0
- countDown:每次调用 countDown,都会将 count 值减1
#### 3.5.1 CountDownLaunch#state 变量的含义
用 `CountDownLaunch#state` 变量表示**未 countDown 的数量**,当 `state` 为0时,调用 awiat 的线程会从挂起中唤醒
#### 3.5.2 CountDownLatch#await 方法分析
```java
public void await() throws InterruptedException { // java.util.concurrent.CountDownLatch.await()
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { // AQS.acquireSharedInterruptibly
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) // 在 CountDownLatch.Sync#tryAcquireShared 重写
doAcquireSharedInterruptibly(arg);
}
private static final class Sync extends AbstractQueuedSynchronizer { // CountDownLatch.Sync
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) { // CountDownLatch.Sync#tryAcquireShared
return (getState() == 0) ? 1 : -1;
}
// tryReleaseShared
}
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // CountDownLatch#await() 实现原理
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
```
#### 3.5.3 CountDownLatch#countDown 方法分析
```java
public void countDown() { // CountDownLatch.countDown
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) { // AQS.releaseShared
if (tryReleaseShared(arg)) { // 子类实现
doReleaseShared();
return true;
}
return false;
}
protected boolean tryReleaseShared(int releases) { // CountDownLatch.Sync#tryReleaseShared
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
```
### 3.6 Semaphore (信号量)实现原理
Semaphore 的常用方法:
- acquire:令资源数减 n
- release:令资源数加 n
#### 3.6.1 Semaphore#state 变量的含义
`Semaphore#state` 变量表示**资源总数**,调用 acquire 方法对 state 进行 CAS 减操作,**减到0后,线程阻塞**;调用 release 方法对 state 进行 CAS 加操作
#### 3.6.2 Semaphore#acquire 与 release 方法分析
> Semaphore 各方法源码,与锁的实现类似,不再赘述
```java
public void acquire() throws InterruptedException { // Semaphore.acquire()
sync.acquireSharedInterruptibly(1);
}
public void acquire(int permits) throws InterruptedException { // Semaphore.acquire(int)
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
public void release() {
sync.releaseShared(1);
}
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
```
## 4 无锁编程模式
### 4.1 内存屏障(一写一读)
> linux 内核的 kfifo 队列:[root/kernel/kfifo.c](https://git.kernel.org/pub/scm/linux/kernel/git/stable/linux.git/tree/kernel/kfifo.c?h=linux-2.6.38.y) ,通过 `smp_wmb()` 插入 Store 屏障,确保更新指针的操作不会重排序到修改数据之前,以及更新指针的时候,Store Cahce 被刷新,其他 CPU 可见
```c
static void kfifo_copy_in(struct __kfifo *fifo, const void *src,
unsigned int len, unsigned int off)
{
unsigned int size = fifo->mask + 1;
unsigned int esize = fifo->esize;
unsigned int l;
off &= fifo->mask;
if (esize != 1) {
off *= esize;
size *= esize;
len *= esize;
}
l = min(len, size - off);
memcpy(fifo->data + off, src, l);
memcpy(fifo->data, src + l, len - l);
/*
* make sure that the data in the fifo is up to date before
* incrementing the fifo->in index counter
*/
smp_wmb();
}
```
> `java.util.concurrent.locks.StampedLock#validate` 方法中,通过 `sun.misc.Unsafe#loadFence` 插入内存屏障,对非 volatile 的局部变量 `stamp` ,避免调用方进行乐观读时,代码被重排序
```c
long stamp = sl.tryOptimisticRead();
double currentX = x, currentY = y; // validate(stamp) 中插入了内存屏障,这行读取x值和y值的代码不会被重排序到上一行前
if (!sl.validate(stamp)) { // 插入内存屏障,避免前面的代码被重排序
}
```
### 4.2 volatile(一写多读)
`volatile` 修饰的变量,在被修改后会将值刷新到主内存中,确保各个工作内存中读到的值是一致的
### 4.3 无锁队列(多写多读,可在双端增删元素)
参考 AQS 使用的无锁队列
### 4.4 无锁栈(多写多读,可在栈顶增删元素)
参考 `java.util.concurrent.Phaser` 和 `ForkJoinPool` 使用的无锁栈
### 4.5 无锁链表(多写多读,可在中间增删元素)
参考 `java.util.concurrent.ConcurrentSkipListMap` 使用的无锁跳查表
## 5 并发编程模式\*
### 5.1 信号量、Latch 与同步屏障
- synchronized + wait() + notify() 实现信号量
- synchronized + wait() + notify() 实现 Launch 模式
- 线程池 + 原子类实现 Launch 模式
- Semaphore、CountDownLatch、CyclicBarrier 与 Phaser 的适用场景
### 5.2 发布订阅模式
**相关概念:**
- 观察者模式(Observer)是为了实现**松耦合**,和发布订阅模式(Publish–Subscribe)通过注册中心,实现了**完全解耦**
- 担保-挂起(Guarded Suspension)模式,是很多设计模式(例如生产者-消费者模式)的基础
- Balking 模式与担保-挂起模式类似,但选择的是**放弃**而不是挂起
**相关示例:**
- synchronized + wait() + notify() 实现阻塞队列
- Lock + Condition 实现阻塞队列
### 5.3 线程池模式
**线程池(** `java.util.concurrent.ThreadPoolExecutor`**)原理:**
- 线程池入参的含义
- 在不断往线程池(ThreadPoolExecutor)中提交任务(Runnable)时,先由核心线程处理,核心线程数(corePoolSize)不够时,存储到任务队列(BlockingQueue),任务队列满后,将创建线程至最大线程数(maximumPoolSize)来处理任务,如果还是不够处理任务,则使用拒绝策略(RejectedExecutionHandler)
- 当任务处理完后,除了核心线程,其他线程会在经过保持时间(keepAliveTime)后被销毁
- 线程池中的 `ctl` 字段含义:高3位表示线程池的运行状态(runState),低29位表示工作线程数量(workerCount)
- 线程池状态(runState)迁移(有向无环图):
- RUNNING(-1) 经过 shutdown() 变为 SHUTDOWN(0)
- RUNNING(-1) 经过 shutdownNow() 变为 STOP(1)
- SHUTDOWN(0) 经过 shutdownNow() 变为 STOP(1)
- SHUTDOWN(0) 在队列和线程池为空时,变为TIDYING(2)
- STOP(1) 在队列和线程池为空时,变为TIDYING(2)
- TIDYING(2) 经过 terminated() 变为 TERMINATED(3)
- shutdown() 与 shutdownNow() 的区别
- shutdown() 不会清空任务队列,会等所有任务执行完成;shutdownNow() 会清空任务队列
- shutdown() 只中断空闲线程,shutdownNow() 会中断所有线程
> 线程池提交任务的 `execute` 方法分析
```java
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { // 1. 当前线程数小于核心线程数,创建核心线程,并start任务
if (addWorker(command, true)) // start 任务成功,则返回
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { // 2. 当前线程数大于等于核心线程数,则放入队列
int recheck = ctl.get();
if (!isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
} else if (!addWorker(command, false)) // 3. 放入队列失败,则新建空闲线程 start 任务
reject(command); // 4. 使用空闲线程 start 任务失败,使用拒绝策略
}
private boolean addWorker(Runnable firstTask, boolean core) { // 新开线程
retry:
for (; ; ) {
int c = ctl.get();
int rs = runStateOf(c);
// 判断线程池是否处于 shutdown 后续状态(如果正好是shutdown,还有任务则不退出)
if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
return false;
for (; ; ) {
int wc = workerCountOf(c);
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) // 线程数超过上界
return false;
if (compareAndIncrementWorkerCount(c)) // workCouut 加1成功则跳出循环
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs) // runState 发生了变化,重新开始for循环
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// workCount 成加1,开始添加线程
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask); // 创建一个工作线程
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w); // 添加线程
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); // 添加成功,则启动线程
workerStarted = true;
}
}
} finally {
if (!workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
```
> 工作线程的执行过程分析
```java
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { // ThreadPoolExecutor.Worker,继承自AQS
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// ...
}
final void runWorker(Worker w) { // ThreadPoolExecutor#runWorker
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) { // 不断从队列中取任务执行
w.lock(); // 执行任务前先加锁,shutdown时会tryLock()
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) // 检查运行状态是否已停止
wt.interrupt();
try {
beforeExecute(wt, task); // 钩子方法,默认空实现
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
afterExecute(task, thrown); // 钩子方法,默认空实现
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally { // 确保 Worker 退出时,能执行下面的退出处理逻辑
processWorkerExit(w, completedAbruptly);
}
}
```
### 5.4 Future 模式
**背景:**
- 同步的API调用比较耗时的时候,可以先选择获取一个立即返回的凭据(Future),调用者所在线程不用陷入未知时长的阻塞中,在未来的某个时间再根据Future获取结果
- 还可以增加回调(Callback)的机制,无需通过阻塞方法(get)获取结果,而是注入一个回调方法,以此提高系统的响应时间,充分利用CPU资源
实现模式可以参考JDK源码或 [pattern/future](https://console.cloud.baidu-int.com/devops/icode/repos/baidu/personal-code/z8g/tree/master/pattern/src/main/java/com/baidu/z8g/pattern/concurrent/future)
### 5.5 CopyOnWrite 模式
参考 `CopyOnWriteArrayList`、 `StampedLock` 等工具的实现
### 5.6 ForkJoin 模式
Forkjoin 是 JDK 7 中提供分治算法的多线程并行计算框架:
- 分治算法的实现
- 工作窃取的实现
- 并行计算的实现
### 5.7 Thread-Per-Message 模式\*
### 5.8 Worker-Thread 模式\*
### 5.9 Active Objects 模式\*
### 5.10 消息总线(Event Bus)模式\*
- 空白目录
- 精简版Spring的实现
- 0 前言
- 1 注册和获取bean
- 2 抽象工厂实例化bean
- 3 注入bean属性
- 4 通过XML配置beanFactory
- 5 将bean注入到bean
- 6 加入应用程序上下文
- 7 JDK动态代理实现的方法拦截器
- 8 加入切入点和aspectj
- 9 自动创建AOP代理
- Redis原理
- 1 Redis简介与构建
- 1.1 什么是Redis
- 1.2 构建Redis
- 1.3 源码结构
- 2 Redis数据结构与对象
- 2.1 简单动态字符串
- 2.1.1 sds的结构
- 2.1.2 sds与C字符串的区别
- 2.1.3 sds主要操作的API
- 2.2 双向链表
- 2.2.1 adlist的结构
- 2.2.2 adlist和listNode的API
- 2.3 字典
- 2.3.1 字典的结构
- 2.3.2 哈希算法
- 2.3.3 解决键冲突
- 2.3.4 rehash
- 2.3.5 字典的API
- 2.4 跳跃表
- 2.4.1 跳跃表的结构
- 2.4.2 跳跃表的API
- 2.5 整数集合
- 2.5.1 整数集合的结构
- 2.5.2 整数集合的API
- 2.6 压缩列表
- 2.6.1 压缩列表的结构
- 2.6.2 压缩列表结点的结构
- 2.6.3 连锁更新
- 2.6.4 压缩列表API
- 2.7 对象
- 2.7.1 类型
- 2.7.2 编码和底层实现
- 2.7.3 字符串对象
- 2.7.4 列表对象
- 2.7.5 哈希对象
- 2.7.6 集合对象
- 2.7.7 有序集合对象
- 2.7.8 类型检查与命令多态
- 2.7.9 内存回收
- 2.7.10 对象共享
- 2.7.11 对象空转时长
- 3 单机数据库的实现
- 3.1 数据库
- 3.1.1 服务端中的数据库
- 3.1.2 切换数据库
- 3.1.3 数据库键空间
- 3.1.4 过期键的处理
- 3.1.5 数据库通知
- 3.2 RDB持久化
- 操作系统
- 2021-01-08 Linux I/O 操作
- 2021-03-01 Linux 进程控制
- 2021-03-01 Linux 进程通信
- 2021-06-11 Linux 性能优化
- 2021-06-18 性能指标
- 2022-05-05 Android 系统源码阅读笔记
- Java基础
- 2020-07-18 Java 前端编译与优化
- 2020-07-28 Java 虚拟机类加载机制
- 2020-09-11 Java 语法规则
- 2020-09-28 Java 虚拟机字节码执行引擎
- 2020-11-09 class 文件结构
- 2020-12-08 Java 内存模型
- 2021-09-06 Java 并发包
- 代码性能
- 2020-12-03 Java 字符串代码性能
- 2021-01-02 ASM 运行时增强技术
- 理解Unsafe
- Java 8
- 1 行为参数化
- 1.1 行为参数化的实现原理
- 1.2 Java 8中的行为参数化
- 1.3 行为参数化 - 排序
- 1.4 行为参数化 - 线程
- 1.5 泛型实现的行为参数化
- 1.6 小结
- 2 Lambda表达式
- 2.1 Lambda表达式的组成
- 2.2 函数式接口
- 2.2.1 Predicate
- 2.2.2 Consumer
- 2.2.3 Function
- 2.2.4 函数式接口列表
- 2.3 方法引用
- 2.3.1 方法引用的类别
- 2.3.2 构造函数引用
- 2.4 复合方法
- 2.4.1 Comparator复合
- 2.4.2 Predicate复合
- 2.4.3 Function复合
- 3 流处理
- 3.1 流简介
- 3.1.1 流的定义
- 3.1.2 流的特点
- 3.2 流操作
- 3.2.1 中间操作
- 3.2.2 终端操作
- 3.3.3 构建流
- 3.3 流API
- 3.3.1 flatMap的用法
- 3.3.2 reduce的用法
- 3.4 collect操作
- 3.4.1 collect示例
- 3.4.2 Collector接口