ConcurrentHashMap

1 JDK8版本的改进

在jdk1.7版本中,ConcurrentHashMap由数组+Segment+分段锁实现,其内部分为一个个段(Segment)数组,Segment通过继承ReentrantLock来进行加锁,通过每次锁住一个Segment来降低锁的粒度而且保证了每个Segment内的操作的线程安全性,从而实现全局线程安全。

但是这么做的缺陷就是每次通过 hash 确认位置时需要 2 次才能定位到当前 key 应该落在哪个槽:

  1. 通过 hash 值和 段数组长度-1 进行位运算确认当前 key 属于哪个段,即确认其在 segments 数组的位置。

  2. 再次通过 hash 值和 table 数组(即 ConcurrentHashMap 底层存储数据的数组)长度 - 1进行位运算确认其所在桶。

为了进一步优化性能,在 jdk1.8 版本中,对 ConcurrentHashMap 做了优化,取消了分段锁的设计,取而代之的是通过 cas 操作和 synchronized 关键字来实现优化(synchronized锁住的是Node节点),而扩容的时候也利用了一种分而治之的思想来提升扩容效率,在 JDK1.8 中 ConcurrentHashMap 的存储结构和 HashMap 基本一致,如下图所示:

2 为什么key和value不允许为null

在 HashMap 中,key 和 value 都是可以为 null 的,但是在 ConcurrentHashMap 中却不允许。

作者 Doug Lea 本身对这个问题有过回答,在并发编程中,null 值容易引来歧义, 假如先调用 get(key) 返回的结果是 null,那么无法确认是因为当时这个 key 对应的 value 本身放的就是 null,还是说这个 key 值根本不存在,这会引起歧义,如果在非并发编程中,可以进一步通过调用 containsKey 方法来进行判断,但是并发编程中无法保证两个方法之间没有其他线程来修改 key 值,所以就直接禁止了 null 值的存在。

而且作者 Doug Lea 本身也认为,假如允许在集合,如 map 和 set 等存在 null 值的话,即使在非并发集合中也有一种公开允许程序中存在错误的意思,这也是 Doug Lea 和 Josh Bloch(HashMap作者之一) 在设计问题上少数不同意见之一,而 ConcurrentHashMap 是 Doug Lea 一个人开发的,所以就直接禁止了 null 值的存在。

3 ConcurrentHashMap如何保证线程的安全性

在 ConcurrentHashMap 中,采用了大量的分而治之的思想来降低锁的粒度,提升并发性能。其源码中大量使用了cas 操作来保证安全性,而不是和 HashTable 一样,不论什么方法,直接简单粗暴的使用 synchronized关键字来实现。

3.1 如何用CAS保证数组初始化的安全

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private final Node<K, V>[] initTable(){
Node<K, V>[] tab; int sc;
while((tab = table) == null || tab.length == 0){
if((sc = sizeCtl) < 0) //有线程在扩容,让出cpu
Thread.yield();
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { //将sizeCtl改为-1,表示正在扩容
try {
if((tab = table) == null || tab.length == 0){
//首次初始化之后会对sc赋值为下一次扩容的大小
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n];
table = tab = nt;
sc = n - (n >>> 2); //计算下次扩容的大小,即:当前容量的3/4
}
} finally {
sizeCtl = sc; //将sc赋值给sizeCtl
}
break;
}
}
return tab;
}

这里面有一个非常重要的变量 sizeCtl,这个变量对理解整个 ConcurrentHashMap 的原理非常重要。sizeCtl 有四个含义:

​ sizeCtl<-1 表示有 N-1 个线程正在执行扩容操作,如 -2 就表示有 2-1 个线程正在扩容。

​ sizeCtl=-1 占位符,表示当前正在初始化数组。

​ sizeCtl=0 默认状态,表示数组还没有被初始化。

​ sizeCtl>0 记录下一次需要扩容的大小。

第二个分支采用了 CAS 操作,因为 SIZECTL 默认为 0,所以这里如果可以替换成功,则当前线程可以执行初始化操作,CAS 失败,说明其他线程抢先一步把 sizeCtl 改为了 -1。扩容成功之后会把下一次扩容的阈值赋值给sc,即 sizeCtl。

3.2 put操作如何保证数组元素的可见性

ConcurrentHashMap 中存储数据采用的 Node 数组是采用了 volatile 来修饰的,但是这只能保证数组的引用在不同线程之间是可用的,并不能保证数组内部的元素在各个线程之间也是可见的,所以判定某一个桶是否有元素,并不能直接通过下标来访问。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = sprerad(key.hashCode()); //获取hash值
int binCount = 0; //记录链表长度
for (Node<K, V>[] tab = table;;) { //自旋
Node<K,V> f; int n, i, fh;
if(tab == null || (n = tab.length) == 0) //数组没有初始化
tab = initTable(); //初始化数组
//tabAt为null表示当前位置没有元素,采用Unsafe方法获取元素而不是直接tab[i]
//volatile修饰数组只针对数组的引用保证可见,对内部元素则不一定
else if ((f = tabAt(tab, i = (n-1) & hash)) == null){
//cas操作去put,如果cas失败说明其他线程抢先去操作,继续自旋(for循环)
if(casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
break;
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else { //当前下标已经有元素
V oldVal = null;
//这里就相当于每次只锁住一个下标,锁的粒度更细了。所以最多可以支持的并发数比1.7版本的分段设计更多了
synchronized (f) { //锁住f节点,即当前链表或者红黑树的头节点
if (tabAt(tab, i) == f) {
if (fh >= 0) { //这里要判断是否大于等于0是因为如果当前线程已经被转移了,hash值会被改为负数
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if(e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))){
oldVal = e.val;
if(!onlyIfAbsent)
e.val = value;
break;
}
...
}
...
}
...
}
...
}
...
}
}
addCount(1L, binCount);
return null;
}
1
2
3
4
//tableAt 方法实际上就是一个 CAS 操作
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i){
return (Node<K,V>) U.getObjectVolatile(tab, ((long) i << ASHIFT) + ABASE);
}

3.3 精妙的计算方式

在 HashMap 中,调用 put 方法之后会通过 ++size 的方式来存储当前集合中元素的个数,但是在并发模式下,这种操作是不安全的,所以不能通过这种方式。直接通过 CAS 操作来修改 size 是可行的,但是假如同时有非常多的线程要修改 size 操作,那么只会有一个线程能够替换成功,其他线程只能不断的尝试 CAS,这会影响到 ConcurrentHashMap 集合的性能,所以作者就想到了一个分而治之的思想来完成计数。作者定义了一个数组来计数,而且这个用来计数的数组也能扩容,每次线程需要计数的时候,都通过随机的方式获取一个数组下标的位置进行操作,这样就可以尽可能的降低了锁的粒度,最后获取 size 时,则通过遍历数组来实现计数:

1
2
3
4
5
6
7
//用来计数的数组,大小为2的N次幂,默认为2
private transient volatile ConuterCell[] counterCells;

@sun.misc.Contended static fianl class CounterCell { //数组中的对象
volatile long value; //存储元素个数
CounterCell(long x) { value = x; }
}

addCount计数方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private final void addCount(long x, int check){
CounterCell[] as; long b, s;
/** CounterCell数组是否为空,如果不为空,则尝试通过cas操作将baseCount加上x,如果发现BASECOUNT和baseCount不相等,
* 则说明当前存在锁竞争,所以需要改用CountCell来计数,(baseCount只有在没有线程竞争时用来计数)
*/
if((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true; //true表示没有线程竞争
//CounterCell计数数组为空,则直接调用fullAddCount
if (as == null || (m = as.length - 1) < 0 ||
//Random在线程并发的时候会有性能问题以及可能会产生相同的随机数,性能也没有ThreadLocalRandom.getProbe好
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
//通过CAS操作会修改当前数组下标中对应的计数,如果修改失败则表示有线程竞争
!(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
//内部包含CounterCell的扩容,初始化等操作
fullAddCount(x, uncontended);
return;
}
...
}
}

首先会判断 CounterCell 数组是不是为空,这里的 CAS 操作是将 BASECOUNT 和 baseCount 进行比较,如果相等,则说明当前没有其他线程过来修改 baseCount(即 CAS 操作成功),此时则不需要使用 CounterCell 数组,而直接采用 baseCount 来计数。

假如 CounterCell 为空且 CAS 失败,那么就会通过调用 fullAddCount 方法来对 CounterCell 数组进行初始化。

fullAddCount方法

这个方法也很长,里面包含了对 CounterCell 数组的初始化和赋值等操作。

  1. 初始化CountCell数组:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
else if (cellsBusy == 0 && counterCells == as && 
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)){
boolean init = false;
try { // Initialize table
if (counterCells == as) {
CounterCell[] rs = new CounterCell[2]; //默认构造一个长度为2的数组
rs[h & 1] = new CounterCell(x); //计算下标(h是一个随机数,x表示添加元素的数量)
counterCells = rs;
init = true; //初始化完成标志
}
} finally {
cellsBusy = 0; //恢复cellsBusy标识
}
}

这里面有一个比较重要的变量 cellsBusy,默认是 0,表示当前没有线程在初始化或者扩容,所以这里判断如果 cellsBusy==0,而 as 其实在前面就是把全局变量 CounterCell 数组的赋值,这里之所以再判断一次就是再确认有没有其他线程修改过全局数组 CounterCell,所以条件满足的话就会通过 CAS 操作修改 cellsBusy 为 1,表示当前自己在初始化了,其他线程就不能同时进来初始化操作了。

最后可以看到,默认是一个长度为 2 的数组,也就是采用了 2 个数组位置进行存储当前 ConcurrentHashMap 的元素数量。

  1. CountCell如何赋值

初始化完成之后,如果再次调用 put 方法,那么就会进入 fullAddCount 方法的另一个分支:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
//CounterCell不为空,则表示已经初始化
if ((as = counterCells) != null && (n = as.length) > 0) {
//获得当前使用CounterCell,数组的下标
if ((a = as[(n - 1) & h]) == null) { //当前位置还没有设置数量
//cellsBusy=0表示CounterCell没有初始化
if (cellsBusy == 0) {
CounterCell r = new CounterCell(x);
//cellBusy由0改为1表示有线程正在操作CounterCell数组
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try {
//计算下标,并将CounterCell对象放到CounterCell[]数组对应下标
CounterCell[] rs; int m, j;
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true; //是否创建成功标记设置为true
}
} finally {
cellsBusy = 0;
}
}
}
}
}
}

这里面首先判断了 CounterCell 数组不为空,然后会再次判断数组中的元素是不是为空,因为如果元素为空,就需要初始化一个 CounterCell 对象放到数组,而如果元素不为空,则只需要 CAS 操作替换元素中的数量即可。所以这里面的逻辑也很清晰,初始化 CounterCell 对象的时候也需要将 cellBusy 由 0 改成 1。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//如果当前位置的元素不为空,则通过cas操作加上数量
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
//两个不相等代表有其他线程创建了新的CounterCell数组
//或者当前CounterCell数组大小已经大于等于CPU数量(保证并发数不会操作CPU数量)
else if (counterCells != as || n >= NCPU)
//设置当前线程的循环失败不进行扩容
collide = false;
else if (!collide)
collide = true; //恢复collide状态,下次循环可继续扩容
//如果进入这个分支,则说明当前竞争压力过大,需要对CounterCell数组进行扩容
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
//再次判断是否相等,相等表示当前没有其他线程扩容CounterCell数组
if (counterCells == as) {
CounterCell[] rs = new CounterCell[n << 1]; //扩容大小为扩大2倍
for(int i = 0; i < n; ++i) //迁移数据
rs[i] = as[i];
coountCells = rs;
}
} finally {
cellsBusy = 0;
}
}

主要看上述代码最后一个分支,一旦会进入这个分支,就说明前面所有分支都不满足,即:

​ 当前 CounterCell 数组已经初始化完成。

​ 当前通过 hash 计算出来的 CounterCell 数组下标中的元素不为 null。

​ 直接通过 CAS 操作修改 CounterCell 数组中指定下标位置中对象的数量失败,说明有其他线程在竞争修改同一个数组下标中的元素。

​ 当前操作不满足不允许扩容的条件。

​ 当前没有其他线程创建了新的 CounterCell 数组,且当前 CounterCell 数组的大小仍然小于 CPU 数量。

所以接下来就需要对 CounterCell 数组也进行扩容,这个扩容的方式和 ConcurrentHashMap 的扩容一样,也是将原有容量乘以 2,所以其实 CounterCell 数组的容量也是满足 2 的 N 次幂。

3.4 ConcurrentHashMap扩容

接下来回到 addCount 方法,因为这个方法在添加元素数量的同时,也会判断当前 ConcurrentHashMap 的大小是否达到了扩容的阈值,如果达到,需要扩容。ConcurrentHashMap 扩容也支持多线程同时进行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n); //获取扩容戳
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}

这里 check 是传进来的链表长度,>=0 才开始检查是否需要扩容,紧挨之后是一个 while 循环,主要是满足两个条件:

  1. sizeCtl在初始化的时候会被赋值为下一次扩容的大小(扩容之后也会),所以 >=sizeCtl 表示的就是是否达到扩容阈值。

  2. table 不为 null 且当前数组长度小于最大值 2 的 30 次方。

扩容戳有什么用:当满足扩容条件之后,首先会先调用一个方法来获取扩容戳,这个扩容戳比较有意思,要理解扩容戳,必须从二进制的角度来分析。resizeStamp 方法就一句话,其中 RESIZE_STAMP_BITS 是一个默认值 16。

1
2
3
static final int resizeStamp(int n) {
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}

(1) Integer.numberOfLeadingZeros(n) 这个方法,实际上这个方法就是做一件事,那就是获取当前数据转成二进制后的最高非 0 位前的 0 的个数。举个例子:

1
就以16为准,16转成二进制是10000,最高非0位是在第5位,因为int类型是32位,所以他前面还有27位,而且都是0,那么这个方法得到的结果就是271的前面还有270)。

(2)1 << (RESIZE_STAMP_BITS - 1) 在当前版本就是 1<<15,也就是得到一个二进制数 1000000000000000,这里也是要做一件事,把这个 1 移动到第 16 位。最后这两个数通过 | 操作一定得到的结果就是第 16 位是 1,因为 int 是 32 位,最多也就是 32 个 0,而且因为 n 的默认大小是 16(ConcurrentHashMap 默认大小),所以实际上最多也就是 27(11011)个 0,执行 | 运算最多也就是影响低 5 位的结果。

27 转成二进制为 0000000000000000000000000011011,然后和 00000000000000001000000000000000 执行 | 运算,最终得到的而结果就是 00000000000000010000000000011011,注意:这里之所以要保证第 16 位为 1,是为了保证 sizeCtl 变量为负数,因为前面提到,这个变量为负数才代表当前有线程在扩容。

首次扩容为什么计数要 +2 而不是 +1

首次扩容一定不会走前面两个条件,而是走的最后一个条件,这个条件通过 CAS 操作将 rs 左移了 16(RESIZE_STAMP_SHIFT)位,然后加上一个 2,为什么是加 2 呢?

要回答这个问题先回答另一个问题,上面通过方法获得的扩容戳 rs 究竟有什么用?实际上这个扩容戳代表了两个含义:

  1. 高 16 为代表当前扩容的标记,可以理解为一个纪元。

  2. 低 16 代表了扩容的线程数。

因为 rs 最终是要赋值给 sizeCtl 的,而 sizeCtl 负数才代表扩容,而将 rs 左移 16 位就刚好使得最高位为 1,此时低 16 位全部是 0,而因为低 16 位要记录扩容线程数,所以应该 +1,但是这里是 +2,原因是 sizeCtl 中 -1 这个数值已经被使用了,用来代替当前有线程准备扩容,所以如果直接 +1 是会和标志位发生冲突。所以继续回到上图中的第二个if语句,就是正常继续 +1 了,只有初始化第一次记录扩容线程数的时候才需要 +2。

扩容条件

看上代码中第一个if语句,这里面有 5 个条件,代表是满足这 5 个条件中的任意一个,则不进行扩容:

  1. (sc >>> RESIZE_STAMP_SHIFT) != rs 这个条件实际上有 bug,在 JDK12 中已经换掉。

  2. sc == rs + 1 表示最后一个扩容线程正在执行首位工作,也代表扩容即将结束。

  3. sc == rs + MAX_RESIZERS 表示当前已经达到最大扩容线程数,所以不能继续让线程加入扩容。

  4. 扩容完成之后会把 nextTable(扩容的新数组) 设为 null。

  5. transferIndex <= 0 表示当前可供扩容的下标已经全部分配完毕,也代表了当前线程扩容结束。

多并发下如何实现扩容

在 ConcurrentHashMap 中采用的是分段扩容法,即每个线程负责一段,默认最小是 16,也就是说如果 ConcurrentHashMap 中只有 16 个槽位,那么就只会有一个线程参与扩容。如果大于 16 则根据当前 CPU 数来进行分配,最大参与扩容线程数不会超过 CPU 数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab){
int n = tab.length, stride;
//如果n/8在除以NCPU得到的数小于16,则只会有一个线程进行扩容
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; //subdivide range默认每个线程最低位数为16
//nextTab是扩容后的新数组,为空表示新数组还没有被初始化
if (nextTab == null){
try {
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; //构建一个新数组
nextTab = nt;
} catch (Throwable ex) {
sizeCtl = Integer.MAX_VALUE; //扩容失败后将sizeCtl设为最大值,也就是不再触发扩容
return;
}
nextTable = nextTab;
transferIndex = n; //transferIndex表示转移数据的下标,默认记录为旧数组大小
}
int nextn = nextTab.length;
}

扩容空间和 HashMap 一样,每次扩容都是将原空间大小左移一位,即扩大为之前的两倍。注意这里的 transferIndex 代表的就是推进下标,默认为旧数组的大小。

扩容时的数据迁移如何保证安全性

初始化好了新的数组,接下来就是要准备确认边界。也就是要确认当前线程负责的槽位,确认好之后会从大到小开始往前推进,比如线程一负责 1-16,那么对应的数组边界就是 0-15,然后会从最后一位 15 开始迁移数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
int nextn = nextTabl.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false;
for(int i = 0,bound = 0;;){
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
}

这里面有三个变量比较关键:

  1. fwd 节点,这个代表的是占位节点,最关键的就是这个节点的 hash 值为 -1,所以一旦发现某一个节点中的 hash 值为 -1 就可以知道当前节点已经被迁移了。

  2. advance:代表是否可以继续推进下一个槽位,只有当前槽位数据被迁移完成之后才可以设置为 true

  3. finishing:是否已经完成数据迁移。

知道了这几个变量,再看看上面的代码,第一次一定会进入 while 循环,因为默认 advance 为 true,第一次进入循环的目的为了确认边界,因为边界值还没有确认,所以会直接走到最后一个分支,通过 CAS 操作确认边界。确认边界这里直接表述很难理解,通过一个例子来说明:

1
假设说最开始的空间为16,那么扩容后的空间就是32,此时transferIndex为旧数组大小16,而在第二个if判断中,transferIndex赋值给了nextIndex,所以nextIndex16,而stride代表的是每个线程负责的槽位数,最小就是16,所以stride也是16,所以nextBound= nextIndex > stride ? nextIndex - stride : 0 皆可以得到:nextBound=0和i=15了,也就是当前线程负责0-15的数组下标,且从0开始推进,确认边界后立刻将advance设置为false,也就是会跳出while循环,从而执行下面的数据迁移部分逻辑。

因为 nextBound=0,所以 CAS 操作实际上也是把 transferIndex 变成了 0,表示当前扩容的数组下标已经全部分配完毕,这也是前面不满足扩容的第 5 个条件。

数据迁移时,会使用 synchronized 关键字对当前节点进行加锁,也就是说锁的粒度精确到了每一个节点,可以说大大提升了效率。加锁之后的数据迁移和 HashMap 基本一致,也是通过区分高低位两种情况来完成迁移。

当前节点完成数据迁移之后,advance 变量会被设置为 true,也就是说可以继续往前推进节点了,所以会重新进入上面的 while 循环的前面两个分支,把下标 i 往前推进之后再次把 advance 设置为 false,然后重复操作,直到下标推进到 0 完成数据迁移。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) { //finishing=true表示完成迁移
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1); //设置下一次扩容的大小为新数组空间大小的3/4
return;
}
//首次进入这里,通过cas将sizeCtl-1表示扩容的线程数-1,因为自己已经完成扩容
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit再次循环检查一遍
}
}
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd); //数组迁移完成后把当前节点设置成fwd
else if ((fh = f.hash) == MOVED) //检测当前节点是否已被迁移,如果是的话将advance设置为true,继续推进下标
advance = true; // already processed
else {
synchronized (f) { //这部分是负责具体迁移的代码和HashMap差不多

}
}

while 循环彻底结束之后,会进入到下面这个 if 判断,当前线程自己完成了迁移之后,会将扩容线程数进行递减,递减之后会再次通过一个条件判断,这个条件其实就是前面进入扩容前条件的反推,如果成立说明扩容已经完成,扩容完成之后会将 nextTable 设置为 null,所以上面不满足扩容的第 4 个条件就是在这里设置的。

3.5 总结

在整个 ConcurrentHashMap 中,整个思想就是降低锁的粒度,减少锁的竞争,所以采用了大量的分而治之的思想,比如多线程同时进行扩容,以及通过一个数组来实现 size 的计数等。


ConcurrentHashMap
http://www.zivjie.cn/2023/03/12/java基础/多线程/ConcurrentHashMap/
作者
Francis
发布于
2023年3月12日
许可协议