【从入门到放弃-Java】并发编程-JUC-ConcurrentHashMap

前言

上文【从入门到放弃-Java】并发编程-锁-synchronized中,我们介绍了可以使用内置锁synchronized同步类或代码块,达到线程安全的目的。

jdk帮我们把常用的一些模块封装成同步容器,如Vector、Hashtable、Collections.synchronizedXxx等。实现方式主要是将常用的容器类加了Synchronized同步。但我们知道,synchronized的频繁使用及竞争较为激烈时,对性能的影响比较大。

jdk1.5之后为我们提供了多种并发容器类,来提升同步容器的性能,这些类主要在java.util.concurrent包(简称juc,包内还有很多其它的并发工具类)中。我们本文先来学习下最常用的并发容器-ConcurrentHashMap。

ConcurrentHashMap

put

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
/**
 * Maps the specified key to the specified value in this table.
 * Neither the key nor the value can be null.
 *
 * <p>The value can be retrieved by calling the {@code get} method
 * with a key that is equal to the original key.
 *
 * @param key key with which the specified value is to be associated
 * @param value value to be associated with the specified key
 * @return the previous value associated with {@code key}, or
 * {@code null} if there was no mapping for {@code key}
 * @throws NullPointerException if the specified key or value is null
 */
public V put(K key, V value) {
    // Unconditional put: onlyIfAbsent == false, so an existing mapping is
    // replaced. putVal rejects null key/value with NullPointerException.
    V previous = putVal(key, value, false);
    return previous;
}

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
//Fail fast: neither the key nor the value may be null
if (key == null || value == null) throw new NullPointerException();

//Spread the high bits of the hash to the low bits and force the sign bit to 0, reducing collisions
int hash = spread(key.hashCode());
int binCount = 0;

//Node implements Map.Entry and stores key/value (neither may be null); the table length is always a power of two
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh; K fk; V fv;
//The table is lazily initialized on the first insertion
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//Atomically read the bin at index (n - 1) & hash; if the bin is empty, try to insert the new node
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//CAS the new node into the empty bin
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
break; // no lock when adding to empty bin
}
//A ForwardingNode (hash == MOVED) means another thread is resizing the table
else if ((fh = f.hash) == MOVED)
//Join and help the resize in progress before retrying the insertion
tab = helpTransfer(tab, f);
//For putIfAbsent: if the first node already holds the key, return its value without locking
else if (onlyIfAbsent // check first node without acquiring lock
&& fh == hash
&& ((fk = f.key) == key || (fk != null && key.equals(fk)))
&& (fv = f.val) != null)
return fv;
else {
V oldVal = null;
/**
 * In JDK 1.8 ConcurrentHashMap synchronizes on the bin's head Node being
 * modified, replacing the segment-locking scheme of JDK 1.7 and earlier:
 * 1. 1.7 used array + Segment + segment locks: the map is divided into several
 *    HashMap-like Segments, each an array of Entry lists. A ReentrantLock is
 *    taken on the accessed Segment only, so other Segments stay usable. The
 *    drawback is that a lookup needs two hashes: one to find the Segment, one
 *    to find the head of the Entry list.
 * 2. 1.8 uses array + linked list / red-black tree. Node replaces Segment,
 *    synchronized via CAS plus per-bin synchronized. When a bin's list grows
 *    past a threshold (default 8) it is converted to a red-black tree to
 *    speed up lookups.
 */
//Lock only the head node of this bin
synchronized (f) {
//Re-check that the bin head has not been replaced by another thread
if (tabAt(tab, i) == f) {
//hash >= 0: the bin is still a linked list
if (fh >= 0) {
binCount = 1;
//Walk the list looking for the key, counting nodes in binCount
for (Node<K,V> e = f;; ++binCount) {
K ek;
//Key found: remember the old value and overwrite unless onlyIfAbsent
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
//Advance to the next node and keep comparing keys; if the key is absent, append a new node at the tail
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value);
break;
}
}
}
//The bin is a red-black tree
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
//Insert (or find) the key in the tree
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
else if (f instanceof ReservationNode)
throw new IllegalStateException("Recursive update");
}
}
//If the list reached TREEIFY_THRESHOLD (8), either grow the table or convert this bin to a tree
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}

/**
 * Copies all of the mappings from the specified map to this one.
 * These mappings replace any mappings that this map had for any of the
 * keys currently in the specified map.
 *
 * @param m mappings to be stored in this map
 */
public void putAll(Map<? extends K, ? extends V> m) {
    // Grow the table up front so the bulk copy does not trigger repeated resizes.
    tryPresize(m.size());
    // Insert every entry, overwriting existing mappings (onlyIfAbsent == false).
    m.forEach((k, v) -> putVal(k, v, false));
}

get

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
 * Returns the value to which the specified key is mapped,
 * or {@code null} if this map contains no mapping for the key.
 *
 * <p>More formally, if this map contains a mapping from a key
 * {@code k} to a value {@code v} such that {@code key.equals(k)},
 * then this method returns {@code v}; otherwise it returns
 * {@code null}. (There can be at most one such mapping.)
 *
 * @throws NullPointerException if the specified key is null
 */
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
//Spread the key's hash code exactly the same way putVal does
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
//Fast path: the first node's hash matches the key's hash, so check its key directly
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
//A negative hash marks a special node (e.g. TreeBin or ForwardingNode), not a plain list
else if (eh < 0)
//Delegate to the node's find, which searches the tree (or the new table during a resize)
return (p = e.find(h, key)) != null ? p.val : null;
//Otherwise scan the remaining linked-list nodes until the key is found or the list ends
//(NOTE(review): the original author wondered whether Node::find alone could replace this loop — unverified)
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}

treeifyBin

扩容或将结构转为红黑树

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//Replaces the linked-list bin at the given index with a red-black tree, or grows the table if it is still small.
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n;
if (tab != null) {
//If the table is smaller than MIN_TREEIFY_CAPACITY (64), grow it instead of treeifying
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
//resize (double the table) rather than convert the bin to a tree
tryPresize(n << 1);
//Otherwise, if the bin is a normal linked list (hash >= 0), convert it to a red-black tree
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
//Build a doubly-linked list of TreeNodes; the TreeBin constructor below builds the actual tree
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}

tryPresize

扩容操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
* Tries to presize table to accommodate the given number of elements.
*
* @param size number of elements (doesn't need to be perfectly accurate)
*/
private final void tryPresize(int size) {
//size在传入前已经翻倍,这里会再次调整,变为为:大于(1.5 * oldSize + 1)的2的幂,且小于MAXIMUM_CAPACITY的大小
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
//当sizeCtl小于等于0时。说明已有线程在初始化或者rehash了
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
如果table是空,即未初始化的话,进行初始化。
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
//sc = n - n / 4 = 0.75,在final中,将sizeCtl设置为当前大小的0.75倍。大于这个阙值时,会再次进行扩容。
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
}
}
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
//如果还未开始迁移
else if (tab == table) {
int rs = resizeStamp(n);
if (U.compareAndSetInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
// 开始迁移
transfer(tab, null);
}
}
}

transfer

将Node迁移至新的table中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
/**
 * Moves and/or copies the nodes in each bin to new table. See
 * above for explanation.
 */
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
//Per-thread migration step: n on a single core, otherwise (n >>> 3) / NCPU, never below MIN_TRANSFER_STRIDE (16)
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
//Lazily create the destination table at double the current size
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
//A ForwardingNode (key/value null, hash == MOVED, nextTable pointing at nextTab) marks each migrated bin
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
//advance == true means the current bin is finished and the next one may be claimed
boolean advance = true;
//finishing flips to true once the whole table has been migrated
boolean finishing = false; // to ensure sweep before committing nextTab
//i is the current bin index (walked downwards); bound is the lower limit of this thread's claimed range
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
//Still inside the claimed range (or finishing): process bin i next
if (--i >= bound || finishing)
advance = false;
//transferIndex <= 0: every range has already been claimed by some thread
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSetInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
//This thread has no more bins to migrate
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
//Last thread done: publish nextTab as table and set sizeCtl to 1.5n (0.75 of the doubled size)
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSetInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
//Not equal: other threads are still migrating, so this one simply exits
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
//This was the last helper: mark finishing and rescan the table once before committing
finishing = advance = true;
i = n; // recheck before commit
}
}
//Empty bin: just install the forwarding node
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
//Bin already migrated by another thread: move on
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
//Lock the bin head while splitting it
synchronized (f) {
//Confirm no other thread changed this bin in the meantime
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
//Linked-list bin
if (fh >= 0) {
//Split nodes by the bit (hash & n): bit 0 stays at index i, bit 1 moves to index i + n
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
//lastRun starts the tail run whose (hash & n) bits all agree; that run is reused as-is, avoiding node copies
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
//Clone the remaining nodes onto the low (bit 0) or high (bit 1) list
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//Install the two lists at i and i + n in the new table, then mark the old bin as moved
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
//Red-black-tree bin
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
//Split the tree's nodes into low/high lists by the same (hash & n) bit
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
//Untreeify a half that shrank to <= UNTREEIFY_THRESHOLD; reuse the original tree if the other half is empty
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof ReservationNode)
throw new IllegalStateException("Recursive update");
}
}
}
}
}

总结

本文分析了ConcurrentHashMap是如何实现线程安全并提升性能的、如何扩容、JDK1.7和1.8实现方式的区别等

  • 分Node加synchronized锁,不影响其它Node的读写
  • Node节点hash冲突的元素数量少于8时,使用链表结构,大于等于8时,转换为红黑树结构提升查找性能
  • 扩容时,会将table的长度扩大为大于(1.5 * oldSize + 1)的2的幂大小,并将每个Node根据log(2)(n)位是0或1,分为两个Node,放在新table的i和i+n的位置
  • JDK1.8将原ConcurrentHashMap使用数组+Segment+ReentrantLock的方式改为数组+Node+CAS+synchronized的方式。减少了hash次数并采用CAS和红黑树等多种优化提升性能