IOTXING

记录技术学习之路

0%

ConcurrentHashMap源码解读


ConcurrentHashmap

ConcurrentHashmap整体跟hashmap差不多,主要的区别是线程安全,并且不支持key和value为null
ConcurrentHashmap在插入元素的时候会使用synchronized,保证了写的线程安全。在扩容的时候,将表划分成一个个独立的区域,然后每个执行transfer的线程各自负责一个区域,在保证了线程安全的情况下能提高扩容的效率。

1、族谱

1
2
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable {

2、基础知识

concurrentHashmap中的基础变量,大部分跟hashmap中都是相同的,例如DEFAULT_CAPACITY,MAXIMUM_CAPACITY等,下面主要针对于不同的地方进行介绍

常量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//并发级别,新版本已经都不用了,主要是兼容旧版
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;

//在并发transfer的时候,需要操作的最小的索引范围,也就是所谓的segment长度
private static final int MIN_TRANSFER_STRIDE = 16;

//用来生成生成戳的一个变量,生成戳会被包含在sizeCtl中
private static int RESIZE_STAMP_BITS = 16;

//能够参与扩容的最大的线程数量,必须小于32-RESIZE_STAMP_BITS个比特位
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;

//在sizeCtl中帮忙记录大小的偏移位
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

//后面计算hash时候会用到的一些辅助常量
static final int MOVED = -1; // hash for forwarding nodes
static final int TREEBIN = -2; // hash for roots of trees
static final int RESERVED = -3; // hash for transient reservations
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash

//cpu数量
static final int NCPU = Runtime.getRuntime().availableProcessors();
变量
1
2
3
4
5
6
7
8
//记录表中的元素数量 
private transient volatile long baseCount;
//一个很重要的概念,主要用来控制初始化和扩容的(个人觉得跟ThreadPoolExecutor中的ctl很像,都是用一个int来存储状态跟数量)。如果为负值,说明当前表正在扩容或者初始化。为-1的时候表示正在初始化,-(1+扩容线程数量n
//)表示有n个线程正在帮忙扩容。如果表为空,使用初始化时候的值,默认为0,初始化之后,存储的是之后扩容得阈值
private transient volatile int sizeCtl;

//下一个需要执行transfer任务的起点
private transient volatile int transferIndex;
辅助对象
1
2
3
4
5
6
7
8
static final class ForwardingNode<K,V> extends Node<K,V> {                                  //在执行转移操作时,会被插入到索引头部的一个节点
final Node<K,V>[] nextTable; //存储新的表
ForwardingNode(Node<K,V>[] tab) { //存储当前节点的情况,表示已经转移了
super(MOVED, null, null, null);
this.nextTable = tab;
}
***
}

3、初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public ConcurrentHashMap() {
}

public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0) //根据提供的容量,获取到离指定容量最近的2的n次方
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}

//传入一个map,在初始化的时候会将传入map的所有值复制到新的map中去
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}


public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}

//自定义并发线程数
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins //如果初始容量比并发线程还少,修改最小容量为并发线程数量
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap; //修改容量
}

4、putAll

在上面能够看到,初始化的时候如果传入了一个map,那么就会新建一个map,然后根据传入的map去复制其中的值

1
2
3
4
5
public void putAll(Map<? extends K, ? extends V> m) {
tryPresize(m.size());
for (Map.Entry<? extends K, ? extends V> e : m.entrySet())
putVal(e.getKey(), e.getValue(), false);
}

其中的putval也是我们在后面执行put的时候会用的一个函数,之后会说到,这里我们先说一下tryPresize()

5、tryPresize

该函数执行的时候,有两种情况,一种是putall时候,还有一种是单个bucket中的元素达到了转换树的阈值,但是表中的总元素数量还没有达到阈值,会调用该方法进行扩容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
private final void tryPresize(int size) {                                       

//给定初始容量,里面tableSizeFor另外一篇文章有说到,主要是获取到最近的一个2的n次方,这里主要是设定初始容量
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
//如果小于0,说明正在扩容,大于等于0的时候,表示可以进行扩容
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
//如果当前table为0,也就是初始化
if (tab == null || (n = tab.length) == 0) {
//选择sizeCtl和size中比较大的一个
n = (sc > c) ? sc : c;
//比较当前的sizeCtl是否一致,如果不一致,说明有线程进行了修改。在初始化的时候,该值会被置为-1,同时只会有一个线程进行该操作
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
//如果当前线程拿到了初始化的锁,并且table还没有被初始化,开始初始化数组
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
//初始化完成后,n >>> 2等于0.25n,n-(n>>>2) = 0.75n,也就是loadFactor,将sc修改为下一次扩容的阈值
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
}
}
//如果需要扩容的容量比sc还要小,说明不需要扩容。容量超过了最大容量,就跳出循环
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
//扩容操作,本来要转换成红黑数的,key数量小于设定数量,就先执行扩容操作
else if (tab == table) {

int rs = resizeStamp(n);
//如果现在已经开始扩容了
if (sc < 0) {
Node<K,V>[] nt;
//这里的rs后面会说,rs+1,表示当前正在扩容,sc >>> RESIZE_STAMP_SHIFT) != rs表示当前的n发生了变化,sc == rs + MAX_RESIZERS表示当前参与扩容的线程已经达到了最大值,(nt = nextTable) == null表示扩容已经结束了,transferIndex <= 0表示没有transfer任务需要领取了
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
//尝试获取一次transfer任务,如果获取成功,就对sc+1,
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) //尝试去增加一个参数transfer的线程,先对sizeCtl加一,然后执行transfer
transfer(tab, nt);
}
//执行到这一步的条件是sc >=0,说明还没有开始transfer,尝试成为第一个transfer线程,
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}

6、resizeStamp

1
2
3
4
5
6
7
/**
* 返回长度为n时候的扩容所需要用到的生成戳
* 在左移RESIZE_STAMP_SHIFT位之后,一定要是个负值
*/
static final int resizeStamp(int n) {
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1)); //numberOfLeadingZeros获得n前面的0的位数,将1左移15位,然后执行或运算
}

7、transfer

transfer是在扩容完后,把旧的元素移动或者复制到新的表中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {                             
int n = tab.length, stride;
// 这里计算出需要的每个transfer所需要移动的bucket的数量
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE;
// nexttTab为空,说明是第一个开始执行transfer的线程
if (nextTab == null) {
try {
@SuppressWarnings("unchecked")
//新建一个长度为之前2倍的数组,做为新数组
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
//如果发生oom,就把sizeCtl改为int的最大值。
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
//在第一次transfer的时候,需要transfer的起点位置
transferIndex = n;
}
//新表的长度
int nextn = nextTab.length;
//先插入fwn,表示转移的状态
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// 表示是否需要处理下一个元素
boolean advance = true;
// 表示当前是否已经处理完成
boolean finishing = false;
//bound表示的当前transfer的segment的终点,每次transfer都是倒序执行的
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
//判断是否有新的segment需要处理
while (advance) {
int nextIndex, nextBound;
//表示当前任务正在执行还未结束,--i >= bound,说明还没有处理到边界,如果已经处理到了,--i < bound,因为有任务还在处理,所以就不领取新的transfer任务
if (--i >= bound || finishing)
advance = false;
//没有任务可以领取了,可以退出transfer了
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
//计算下一个需要处理的segment的起点,如果下一个需要处理的起点币stride小,说明剩下的元素已经小于stride了,需要把终点设置为0
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
//设置要处理的segment的区间,bound表示终点,nextIndex为起点
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
} //如果i坐标小于0,或者i超过了表的长度,或者i+旧表长度超过新表长度,说明已经处理结束了
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) { //如果已经结束了,设置nextTable为空,避免被再次操作
nextTable = null;
table = nextTab;
// n << 1 = 2n, n>>>1 = 0.5n sizeCtl = 1.5n,而现在已经扩容两倍了,相当于sizeCtl等于新表空间的0.75
sizeCtl = (n << 1) - (n >>> 1);
return;
}
//cas操作比较sizeCtl是否相等,如果相等,进行sizeCtl-1,表示将当前线程退出扩容,因为sizeCtl控制的是控制扩容的线程的数量
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
//判断是否最后一个执行扩容的线程,如果是,需要检查一下各个标志位,
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null) //如果当前索引位置上没有元素,插入一个forwardingNode,表示已经转移过了,此时hash为MOVED
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED) //如果取到当前节点,并且上面的hash值为MOVED,说明已经处理过了
advance = true; // already processed
else {
synchronized (f) { //到这一步,就要开始实际处理了
if (tabAt(tab, i) == f) { //二次校验,判断当前元素是否发生了变化
Node<K,V> ln, hn;
if (fh >= 0) { //如果没有forwardingNode,正常情况下hash都是大于0的
int runBit = fh & n; //这个东西类似于hashMap里面的e.hash & oldCap,也就是计算需不需要移动到新的索引位置上去
Node<K,V> lastRun = f; //下面会复用其中的某一个链表,然后找到该链表的最后一个值
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) { //如果为0,说明不需要移位
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0) //循环节点,判断节点是否移位,把节点不停的加入到对应节点的头部,也就是倒序添加节点
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln); //把bucket处理完之后,把需要移位以及不需要移位的元素,都加到新表去
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd); //给旧表的对应位置加一个forwardingNode
advance = true; //当前处理完了,需要处理下一个
}
else if (f instanceof TreeBin) { //如果当前bucket是红黑树
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) { //计算
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : //如果树的深度小于设定的阈值,则将树转换为链表,如果
(hc != 0) ? new TreeBin<K,V>(lo) : t; //如果所有元素都不需要移位,直接使用上面初始化的对象,否则新建树去装载
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}

8、helpTransfer

该函数的用处较多,用途是为了帮助进行扩容过程中的transfer,在putVal的时候,也会执行该函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) && //如果表不为空,并且f是一个forwardingNode(说明正在移位),并且新的表并不为空,则开始帮助transfer
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
//根据table长度获得一个生成戳
int rs = resizeStamp(tab.length);
//如果在执行的时候,nextTable和table还没有被别的更改并且sc<0,也就是处于扩容状态
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
// sc == rs + 1表示当前正在扩容 sc >>> RESIZE_STAMP_SHIFT) != rs表示当前的sc计算之后得到的rs不一样,也就是说n发生了变化 transferIndex <=0 说明没有需要帮助转移的任务了 sc == rs + MAX_RESIZERS表示当前已经达到最大的转移线程了
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
//在sc里面加一,相当于领取一个任务,然后开始transfer
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}

9、initTable

在第一次put的时候,如果表还没有初始化,会调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0) //初始化map,如果没有拿到锁,就先自旋等待
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2); //设定阈值为0.75n
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}

10、putVal

实际的插入函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
final V putVal(K key, V value, boolean onlyIfAbsent) {
//校验key或者value是否为空,跟hashmap的区别的,hashmap的key可以为空,concurrentHashmap的key和value都不能为空
if (key == null || value == null) throw new NullPointerException();
//计算key的hashCode,跟hashmap里面的计算规则一样
int hash = spread(key.hashCode());
int binCount = 0;
//对表循环操作,找到要操作的点
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//如果表还没有初始化,调用initTable对表进行初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//如果要插入的索引位置还没有元素
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 使用cas操作,将新节点插入到索引位置去,该操作没有锁
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//如果当前索引正在被移位,也就是说正在扩容,则帮助进行移动
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
//开始正常的插入操作
V oldVal = null;
//使用同步操作,来获取最新的f
synchronized (f) {
//如果当前索引上面的值没有发生变化,因为在这期间可能有线程对f进行了操作,就会导致索引上面的对象发生了变化
if (tabAt(tab, i) == f) {
//fh 为f.hash,如果f.hash>0,说明至少有一个元素在上面
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
//如果key相同,hash相同,那么就替换原有的value值,然后把旧值返回去
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
//如果e.next == null说明是bucket的最后一个节点了,就把新的节点加入到最后一个节点后面
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//如果节点是红黑树
else if (f instanceof TreeBin) {
Node<K,V> p;
//因为已经是树了,而binCount主要是拿来做转换成树的操作的,所以随便给个值就好了
binCount = 2;
//向树中添加节点,如果原来有相同key的节点存在,替换旧值,并返回旧值
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
//如果bucket的容量大于0
if (binCount != 0) {
//如果达到了转换成树的阈值。执行转换操作
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
//如果put之前的值不为空,返回旧值
if (oldVal != null)
return oldVal;
break;
}
}
}
//计数器增加
addCount(1L, binCount);
return null;
}