企业🤖AI Agent构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
[TOC] > ### `ConcurrentHashMap` * 1.8版本的`ConcurrentHashMap`相比之前的版本主要做了两处改进: * 使用`CAS`代替分段锁。 * 红黑树,这一点和`HashMap`是一致的。 * `ConcurrentHashMap`不可以插入`value`为`null`的键值,`HashMap`可以。 <br/> > ### `put` * table便是其数据的存放载体: `transient volatile Node<K,V>[] table;` ``` public V put(K key, V value) { return putVal(key, value, false); } final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0; //volatile读 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) //初始化 tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // bin为空,tabAt是一次volatile读,casTabAt为CAS操作设置节点 } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { //节点添加 V oldVal = null; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); return null; } ``` <br/> > ### `initTable()`初始化 * `sizeCtl`,`ConcurrentHashMap`初始化的扩容操作中一个至关重要的控制变量 * `private transient volatile int sizeCtl;` * 0: 初始值。 * -1: 正在进行初始化。 * 负值(小于-1): 表示正在进行扩容,因为ConcurrentHashMap支持多线程并行扩容。 * 正数: 表示下一次触发扩容的临界值大小,即当前值 \* 0.75(负载因子)。 * `ConcurrentHashMap`只允许一个线程进行初始化操作,当其它线程竞争失败(`sizeCtl < 0`)时便会进行自旋,直到竞争成功(初始化)线程完成初始化,那么此时`table`便不再为`null`,也就退出了while循环。 * 对`SIZECTL`字段`CAS`更新的成功便标志者线程赢得了竞争,可以进行初始化工作了,剩下的就是一个数组的构造过程。 ``` private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; //volatile读 while ((tab = table) == null || tab.length == 0) { //volatile读 if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; //sizeCtl设为当前大小的3 / 4 sc = n - (n >>> 2); } } finally { sizeCtl = sc; } break; } } return tab; } ``` <br/> > ### `tryPresize(int size) `扩容 * `ConcurrentHashMap`支持多线程并行扩容,具体来说,是支持多线程将节点从老的数组拷贝到新的数组,而新数组创建仍是一个线程完成。 * 扩容竞争成功的线程为`transfer`方法的`nextTab`参数传入null,这将导致新数组的创建。竞争失败的线程将会判断当前节点转移工作是否已经完成,如果已经完成,那么意味着扩容的完成,退出即可,如果没有完成,那么此线程将会进行辅助转移。 ``` private final void tryPresize(int size) { int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1); int sc; //volatile读,没有正在进行初始化或扩容的操作 while ((sc = sizeCtl) >= 0) { Node<K,V>[] tab = table; int n; //这里实际上进行了初始化工作 if (tab == null || (n = tab.length) == 0) { n = (sc > c) ? sc : c; if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if (table == tab) { Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; } } } //已达到最大值,无法再进行扩容 else if (c <= sc || n >= MAXIMUM_CAPACITY) break; else if (tab == table) { int rs = resizeStamp(n); if (sc < 0) { //竞争失败 Node<K,V>[] nt; //判断是否已经完成 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } //竞争成功 else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); } } } ``` <br/> > ### ` transfer(Node<K,V>[] tab, Node<K,V>[] nextTab)` 转移 * 每个线程针对一个分片来进行转移操作,所谓的一个分片其实就是bin数组的一段。默认的最小分片大小为16,如果所在机器 只有一个CPU核心,那么就取16,否则取(数组大小 / 8 / CPU核心数)与16的较大者。 ``` private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; //1. 分片 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range //nextTab初始化,CAS保证了只会有一个线程执行这里的代码 if (nextTab == null) { try { Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true; boolean finishing = false; // to ensure sweep before committing nextTab for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; while (advance) { int nextIndex, nextBound; //分片的最大下标i实际上就是在这里完成减一的,因为从下面可以看出,每处理完一个桶位便将advance设为true //从而便又进入了内层循环,但是注意,当最后一次(即bound)处理完成时,i会被再次减一,从而导致进入下面的 //分支再次读取transferIndex,这就说明了转移线程会在转移完一个分片后继续尝试剩余的分片(桶位) if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) { //所有bin均转移完毕 i = -1; advance = false; } //申请分片 else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { //bound表示此分片的截止(最小)下标 bound = nextBound; //i表示此分片的最大下标 i = nextIndex - 1; //advance意为前进,跳出内层循环 advance = false; } } if (i < 0 || i >= n || i + n >= nextn) { //进入到这里就意味着所有的桶位都已被处理完毕或是被包含在某个转移线程的申请分片中(即待转移) int sc; if (finishing) { //进行收尾工作,此工作一定是由最后一个分片申请线程进行的,这里用volatile写将nextTable置为null //,table指向新数组 nextTable = null; table = nextTab; //sizeCtl设为新数组大小的3 / 4 sizeCtl = (n << 1) - (n >>> 1); return; } //转移线程开始转移之前会将sizeCtl自增,转移完成之后自减,所以判断转移是否已经完成的方式便是sizeCtl是 //否等于初始值 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) //还有其它线程尚未转移完成,直接退出,将收尾工作交给最后完成的那个线程 return; //进行到这里就说明当前线程为最后一个完成的线程,有意思的是这里又将advance置为true且i置为n(原) //数组的大小,作用就是最后再全部扫描一遍所有的桶位,看是否还有漏网之鱼 finishing = advance = true; i = n; } } else if ((f = tabAt(tab, i)) == null) //2. advance = casTabAt(tab, i, null, fwd); else if ((fh = f.hash) == MOVED) advance = true; // already processed else { synchronized (f) { //3. 转移算法 //双重检查 if (tabAt(tab, i) == f) { Node<K,V> ln, hn; if (fh >= 0) { //runBit代表了当前桶位是否需要移动 int runBit = fh & n; Node<K,V> lastRun = f; //这里是找出最后一个和头结点的移动属性相同的 for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } //构造无需移动和需要移动的链表 for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } //设置到新数组 setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); //将原数组的当前桶位设为MOVED,即已处理完(转移) setTabAt(tab, i, fwd); advance = true; } else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } } } ``` <br/> > ### `get(Object key)` ``` public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) //命中头结点 return e.val; } else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; while ((e = e.next) != null) { //遍历当前桶位的节点链表 if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; } ``` <br/> <br/> *** 参考: [ConcurrentHashMap](https://github.com/seaswalker/JDK/blob/master/note/ConcurrentHashMap/concurrenthashmap.md)