企业🤖AI Agent构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
## ConcurrentHashMap [TOC] ### 1\. 概述     众所周知,哈希表是中非常高效,复杂度为 O(1) 的数据结构,在 Java 开发中,我们最常见到最频繁使用的就是 HashMap 和 HashTable,但是在线程竞争激烈的并发场景中使用都不够合理。   **HashMap**:先说 HashMap,HashMap 是**线程不安全**的,在并发环境下,可能会形成**环状链表**(扩容时可能造成),导致 get 操作时,cpu 空转,所以,在并发环境中使 用HashMap 是非常危险的。   **HashTable**: HashTable 和 HashMap的实现原理几乎一样,差别无非是:(1)HashTable不允许key和value为null;(2)HashTable是线程安全的。   但是 HashTable 线程安全的策略实现代价却太大了,简单粗暴,get/put 所有相关操作都是 synchronized 的,这相当于给整个哈希表加了一把大锁,多线程访问时候,只要有一个线程访问或操作该对象,那其他线程只能阻塞,相当于将所有的操作串行化,在竞争激烈的并发场景中性能就会非常差。 :-: ![](https://box.kancloud.cn/99faac49b46b4788292dd84f40db5dc3_1692x1264.png)   HashTable 性能差主要是由于所有操作需要竞争同一把锁,而如果容器中有多把锁,每一把锁锁一段数据,这样在多线程访问时不同段的数据时,就不会存在锁竞争了,这样便可以有效地提高并发效率。这就是ConcurrentHashMap 所采用的 "**分段锁**" 思想。 ![](https://box.kancloud.cn/4f2df7892cd59278878cf803ad0d3d03_1798x944.png) ### 2\. 存储结构 ConcurrentHashMap 采用了非常精妙的"分段锁"策略,ConcurrentHashMap 的主干是个 Segment 数组。 ~~~java final Segment<K,V>[] segments; ~~~   Segment 继承了 ReentrantLock,所以它就是一种可重入锁(ReentrantLock)。在 ConcurrentHashMap,一个 Segment 就是一个子哈希表,Segment 里维护了一个 HashEntry 数组,并发环境下,对于不同 Segment 的数据进行操作是不用考虑锁竞争的。(就按默认的 ConcurrentLeve 为16来讲,理论上就允许 16 个线程并发执行,有木有很酷)   **所以,对于同一个 Segment 的操作才需考虑线程同步,不同的 Segment 则无需考虑。** Segment 类似于 HashMap,一个 Segment 维护着一个 HashEntry 数组 ~~~java transient volatile HashEntry<K,V>[] table; ~~~ HashEntry 是目前我们提到的最小的逻辑处理单元了。一个 ConcurrentHashMap 维护一个 Segment 数组,一个 Segment 维护一个 HashEntry 数组。 ~~~java static final class HashEntry<K,V> { final int hash; final K key; volatile V value; volatile HashEntry<K,V> next; } ~~~   ConcurrentHashMap 和 HashMap 实现上类似,最主要的差别是 ConcurrentHashMap 采用了分段锁(Segment),每个分段锁维护着几个桶(HashEntry),多个线程可以同时访问不同分段锁上的桶,从而使其并发度更高(并发度就是 Segment 的个数)。 Segment 继承自**ReentrantLock**。 ~~~java static final class Segment<K,V> extends ReentrantLock implements Serializable { private static final long serialVersionUID = 2249069246763182397L; static final int MAX_SCAN_RETRIES = Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1; transient volatile HashEntry<K,V>[] table; transient int count; transient int modCount; transient int threshold; final float loadFactor; } ~~~ ~~~java final Segment<K,V>[] segments; ~~~ 默认的并发级别为 16,也就是说默认创建 16 个 Segment。 ~~~java static final int DEFAULT_CONCURRENCY_LEVEL = 16; ~~~ ### 2\. size 操作 每个 Segment 维护了一个 count 变量来统计该 Segment 中的键值对个数。 ~~~java /** * The number of elements. Accessed only either within locks * or among other volatile reads that maintain visibility. */ transient int count; ~~~ 在执行 size 操作时,需要遍历所有 Segment 然后把 count 累计起来。 ConcurrentHashMap 在执行 size 操作时先尝试不加锁,如果连续两次不加锁操作得到的结果一致,那么可以认为这个结果是正确的。 尝试次数使用 RETRIES\_BEFORE\_LOCK 定义,该值为 2,retries 初始值为 -1,因此尝试次数为 3。 如果尝试的次数超过 3 次,就需要对每个 Segment 加锁。 ~~~java /** * Number of unsynchronized retries in size and containsValue * methods before resorting to locking. This is used to avoid * unbounded retries if tables undergo continuous modification * which would make it impossible to obtain an accurate result. */ static final int RETRIES_BEFORE_LOCK = 2; public int size() { // Try a few times to get accurate count. On failure due to // continuous async changes in table, resort to locking. final Segment<K,V>[] segments = this.segments; int size; boolean overflow; // true if size overflows 32 bits long sum; // sum of modCounts long last = 0L; // previous sum int retries = -1; // first iteration isn't retry try { for (;;) { // 超过尝试次数,则对每个 Segment 加锁 if (retries++ == RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) ensureSegment(j).lock(); // force creation } sum = 0L; size = 0; overflow = false; for (int j = 0; j < segments.length; ++j) { Segment<K,V> seg = segmentAt(segments, j); if (seg != null) { sum += seg.modCount; int c = seg.count; if (c < 0 || (size += c) < 0) overflow = true; } } // 连续两次得到的结果一致,则认为这个结果是正确的 if (sum == last) break; last = sum; } } finally { if (retries > RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) segmentAt(segments, j).unlock(); } } return overflow ? Integer.MAX_VALUE : size; } ~~~ ### 3\. 同步方式 Segment 继承自 ReentrantLock,所以我们可以很方便的对每一个 Segment 上锁。 对于读操作,获取 Key 所在的 Segment 时,需要保证可见性。具体实现上可以使用 volatile 关键字,也可使用锁。但使用锁开销太大,而使用 volatile 时每次写操作都会让所有 CPU 内缓存无效,也有一定开销。ConcurrentHashMap 使用如下方法保证可见性,取得最新的 Segment。 ~~~java Segment<K,V> s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u) ~~~ 获取 Segment 中的 HashEntry 时也使用了类似方法 ~~~java HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE) ~~~ 对于写操作,并不要求同时获取所有 Segment 的锁,因为那样相当于锁住了整个 Map。它会先获取该 Key-Value 对所在的 Segment 的锁,获取成功后就可以像操作一个普通的 HashMap 一样操作该 Segment,并保证该Segment 的安全性。 同时由于其它 Segment 的锁并未被获取,因此理论上可支持 concurrencyLevel(等于 Segment 的个数)个线程安全的并发读写。 获取锁时,并不直接使用 lock 来获取,因为该方法获取锁失败时会挂起。事实上,它使用了自旋锁,如果 tryLock 获取锁失败,说明锁被其它线程占用,此时通过循环再次以 tryLock 的方式申请锁。如果在循环过程中该 Key 所对应的链表头被修改,则重置 retry 次数。如果 retry 次数超过一定值,则使用 lock 方法申请锁。 这里使用自旋锁是因为自旋锁的效率比较高,但是它消耗 CPU 资源比较多,因此在自旋次数超过阈值时切换为互斥锁。 ### 4\. JDK 1.8 的改动 * JDK 1.7 使用分段锁机制来实现并发更新操作,核心类为 Segment,它继承自重入锁 ReentrantLock,并发程度与 Segment 数量相等。 * JDK 1.8 使用了 CAS 操作来支持更高的并发度,在 CAS 操作失败时使用内置锁 synchronized。 * 并且 JDK 1.8 的实现也在链表过长时会转换为红黑树。 参考资料: * [ConcurrentHashMap演进从Java7到Java8](http://www.jasongj.com/java/concurrenthashmap/) * [ConcurrentHashMap实现原理及源码分析 - dreamcatcher-cx - 博客园](https://www.cnblogs.com/chengxiao/p/6842045.html)