0%

Java[并发]-ConcurrentHashMap实现分析(JDK7)

oncurrentHashMap是一个支持并发操作的Map,比Hashtable的效率要高很多。Hashtable使用Synchonized来实现同步,而ConcurrentHashMap综合使用了多种并发编程的技术来实现。主要使用了“锁分段”技术来降低锁的颗粒度,同时运用volatile的内存语义,ReentrantLock的API来实现。里面很多并发编程的技巧值得学习。JDK8变化较大,但基本思想变化不大,后面再分析。
ConcurrentHashMap中包含了多个Segment对象,而Segment对象是继承ReentrantLock的一个实现,所以ConcurrentHashMap中有很多个锁。Segment包含了HashEntry数组。HashEntry对象才是存储数据的对象。ConcurrentHashMap的类图如下:

java-concurrence-map

ConcurrentHashMap的主要的方法都是通过Segment中的方法来实现的。如果修改时是在不同的Segment中进行的,那么就可以并发地修改而不需要锁。从ConcurrentHashMap中读取数据时,几乎也是无锁的,因为在Segment中维护了一个volatile的count整型变量,利用volatile的内存语义,在读一个key之前,都先读count变量,根据happen-before规则,对volatile变量的写总是发生在读之前并且volatile禁止重排序,包含编译时和运行时的处理器重排序,可以保护读写count变量部分的代码。但有些操作需要跨所有的Segment进行,那么就需要加锁,所有的Segemment循环顺序加锁和解锁,比如size()方法。

一、hash算法

作为一个Map结构,hash的算法尤其重要,需要使数据能均匀地分布在各个Segment中,以防某个Segment出现热点,性能下降。另外需要确定数据存储在哪个Segment中,这里通过段的偏移值(segmentShift)和段的掩码(segmentMask)来计算数据应该保存在哪个段中。段中的hash表的个数都为2的n次方,是为了快速定位到hash糟的位置。这里的计算都是通过位运算来实现的。先看ConcurrentHashMap中的hash方法:

1
2
3
4
5
6
7
8
9
10
private static int hash(int h) {
// Spread bits to regularize both segment and index locations,
// using variant of single-word Wang/Jenkins hash.
h += (h << 15) ^ 0xffffcd7d;
h ^= (h >>> 10);
h += (h << 3);
h ^= (h >>> 6);
h += (h << 2) + (h << 14);
return h ^ (h >>> 16);
}

通过重新hash后,数据会更加分散,分布在各个Segment中更均匀;
再看看segmentShift和segmentShift的计算方法,它与并发级别变量concurrencyLevel有关,默认为16;

1
2
3
4
5
6
7
8
9
10
11
12
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
segmentShift = 32 - sshift;
segmentMask = ssize - 1;
this.segments = Segment.newArray(ssize);

通过ConcurrentHashMap的构造方法计算出segmentShift和segmentShift值后,就可以利用这两个值来计算数据应该放在哪个Segment中。如果按默认的无参数构造函数new出来的ConcurrentHashMap,默认的concurrencyLevel为16,计算出来的segmentShift = 28,segmentMask =5,所以Segment的数量为16个。
再看看Segment中初始化的hash表的大小:

1
2
3
4
5
6
7
8
9
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = 1;
while (cap < c)
cap <<= 1;

for (int i = 0; i < this.segments.length; ++i)
this.segments[i] = new Segment<K,V>(cap, loadFactor);

如果默认的容量为16,那么计算得到cap=1,即每个Segment中的HashEntry数组长度为1。
要定位到数据应该存储在哪个Segment中,是通过segmentFor方法实现的:

1
2
3
final Segment<K,V> segmentFor(int hash) {
return segments[(hash >>> segmentShift) & segmentMask];
}

右移segmentShift位后再&上掩码segmentMask,可以保证segments数组的下标小于segmentMash。

二、ConcurrentHashMap的数据结构

ConcurrentHashMap的数据结构主要由Segment构成,而Segment则由HashEntry构成。HashEntry的结构由一些final的变量和volatile变量组成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static final class HashEntry<K,V> {
final K key;
final int hash;
volatile V value;
final HashEntry<K,V> next;

HashEntry(K key, int hash, HashEntry<K,V> next, V value) {
this.key = key;
this.hash = hash;
this.next = next;
this.value = value;
}

@SuppressWarnings("unchecked")
static final <K,V> HashEntry<K,V>[] newArray(int i) {
return new HashEntry[i];
}
}

可以看到,其中的value是volatile的,可以保证它的修改内存可见性。next引用,使HashEntry可以形成一个链表结构。

1
2
3
4
5
6
7
8
9
10
11
12
13

static final class Segment<K,V> extends ReentrantLock implements Serializable {
/**
* segment中的元素数量.
*/
transient volatile int count;
/**
*hash表更新时记录
*/
transient int modCount;
transient int threshold;
transient volatile HashEntry<K,V>[] table;
final float loadFactor;

Segment则为volatile的count变量,volatile的HashEntry数组变量,modCount变量、扩展闽值threshold和负载因子loadFactor组成。Segment还有map进行各种操作的方法,ConcurrentHashMap的各种操作方法基本是通过Segment实现的。count变量的作用就是利用volatile变量的内存可见性,对读写进行保护,是实现无锁的重要技巧。Map的读取操作之前都要先读count,保证写操作已经完成.

java-concurrence-map2

三、ConcurrentHashMap中各种操作的实现

1、put方法实现
实现的代码如下:

1
2
3
4
5
6
public V put(K key, V value) {
if (value == null)
throw new NullPointerException();
int hash = hash(key.hashCode());
return segmentFor(hash).put(key, hash, value, false);
}

Segment中的put方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
V put(K key, int hash, V value, boolean onlyIfAbsent) {
lock();
try {
int c = count;
if (c++ > threshold) // ensure capacity
rehash();
HashEntry<K,V>[] tab = table;
int index = hash & (tab.length - 1);
HashEntry<K,V> first = tab[index];
HashEntry<K,V> e = first;
while (e != null && (e.hash != hash || !key.equals(e.key)))
e = e.next;
V oldValue;
if (e != null) {
oldValue = e.value;
if (!onlyIfAbsent)
e.value = value;
}
else {
oldValue = null;
++modCount;
tab[index] = new HashEntry<K,V>(key, hash, first, value);
count = c; // write-volatile
}
return oldValue;
} finally {
unlock();
}
}

从代码中可以看出实现的流程如下:
1)、先计算key的hascode,再通过hash方法重新hash后,调用segment对象的put方法来实现;
2)、因为put涉及对HashEntry数组进行修改,所以先加锁。如果元素的数量已经大于闽值,则需要调用rehash方法进行扩容;
3)、判断是否存在相同的结点,如果存在则替换这个节点的值;否则创建一个新的HashEntry结点,加到table的首部位置 ;因为HashEntry的key,hash,next,是final类型的变量 ,因此不能把节点添加到链表的中间或结尾位置 ,这可以保证,访问节点时,这个节点的后面链接不会改变,添加的节点只能添加到Hash链表的首节点。
4)、更新modCount和volatile变量count,更新count需要放在最后一步,保证前面的修改生效;

2、get方法
get方法的相关代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public V get(Object key) {
int hash = hash(key.hashCode());
return segmentFor(hash).get(key, hash);
}
V get(Object key, int hash) {
if (count != 0) { // read-volatile
HashEntry<K,V> e = getFirst(hash);
while (e != null) {
if (e.hash == hash && key.equals(e.key)) {
V v = e.value;
if (v != null)
return v;
return readValueUnderLock(e); // recheck
}
e = e.next;
}
}
return null;
}
HashEntry<K,V> getFirst(int hash) {
HashEntry<K,V>[] tab = table;
return tab[hash & (tab.length - 1)];
}

/**
* 在value为null时调用。只有编译器刚好在HashEntry初始化它的table声明时发生,
* 即tab[index]= new HashEntry()时,
* 对于内存模型是合法的,但是执行中不知道有没有发生;
*/
V readValueUnderLock(HashEntry<K,V> e) {
lock();
try {
return e.value;
} finally {
unlock();
}
}

实现流程如下:
1)、先读取volatile变量count,保证HashEntry数组的修改可见;
2)、根据hash值获取HashEntry对象,遍历table查找,如果对象是要找的对象并且不为空,则直接返回;
3)、如果读到null值则通过readValueUnderLock方法再读;读到null值的原因是在put操作tab[index] = new HashEntry<K,V>(key, hash, first, value)在构造HashEntry时,HashEntry的构造方法中对value的赋值和tab[index]的赋值可能出现重排序,这就导致结点的值为null。这种情况很罕见,ConcurrentHashMap会在持有锁的情况下,再读一次,保证不会读到null值。

3、remove方法
remove方法的相关代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public V remove(Object key) {
int hash = hash(key.hashCode());
return segmentFor(hash).remove(key, hash, null);
}
V remove(Object key, int hash, Object value) {
lock();
try {
int c = count - 1;
HashEntry<K,V>[] tab = table;
int index = hash & (tab.length - 1);
HashEntry<K,V> first = tab[index];
HashEntry<K,V> e = first;
//遍历hash链表,找到匹配的节点
while (e != null && (e.hash != hash || !key.equals(e.key)))
e = e.next;
V oldValue = null;
if (e != null) {
V v = e.value;
if (value == null || value.equals(v)) {
oldValue = v;
++modCount;
//获取要删除节点的下一节点
HashEntry<K,V> newFirst = e.next;
//在被删除节点后继的节点都保留,而所有前面的节点都复制
for (HashEntry<K,V> p = first; p != e; p = p.next){
//被删除节点的前面所有节点复制产生的链表顺序将和以前相反
//因为后面的节点最后遍历作为首节点添加到链接首部
newFirst = new HashEntry<K,V>(p.key, p.hash,
newFirst, p.value);
}
tab[index] = newFirst;
count = c; // write-volatile
}
}
return oldValue;
} finally {
unlock();
}
}

从代码可以看出,remove的实现流程如下: 1)、定位到对应的Segment,调用segment的remove方法;
2)、首先加锁,计算到hash表的首节点,从首节点遍历,直到找到要删除的节点;
3)、定位到要删除节点的下一节点,删除节点前面的节点都重新复制,生成的hash表的顺序刚好和原来相反;
4)、删除节点的后面节点的hash表保持不变;最后写volatile的count值。

4、size()方法的实现
size方法的相关代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public int size() {
final Segment<K,V>[] segments = this.segments;
long sum = 0;
long check = 0;
int[] mc = new int[segments.length];
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
//循环两次,通过计算对比,直接得到集合的长度
for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) {
check = 0;
sum = 0;
int mcsum = 0;
//计算count的和、modCount的和
for (int i = 0; i < segments.length; ++i) {
sum += segments[i].count;
mcsum += mc[i] = segments[i].modCount;
}
if (mcsum != 0) {
for (int i = 0; i < segments.length; ++i) {
check += segments[i].count;
//说明在循环时,map发生了变化。再重试1次
if (mc[i] != segments[i].modCount) {
check = -1; // force retry
break;
}
}
}
//如果相等,说明此时的长度就是count的sum;
if (check == sum)
break;
}
//如果失败,则通过对segments顺序加锁计算出map的长度
if (check != sum) { // Resort to locking all segments
sum = 0;
for (int i = 0; i < segments.length; ++i)
segments[i].lock();
for (int i = 0; i < segments.length; ++i)
sum += segments[i].count;
for (int i = 0; i < segments.length; ++i)
segments[i].unlock();
}
//如果长度大于Integer.MAX_VALUE,直接返回Integer.MAX_VALUE
if (sum > Integer.MAX_VALUE)
return Integer.MAX_VALUE;
else
return (int)sum;
}

size方法的实现流程如下:
1)、通过计算segment的count,和modCount变量,尝试(默认2次循环)通过比较,得到map的长度;
2)、如果1)失败,则通过循环顺序给segment加锁,计算出map的长度。

总结:
从ConcurrentHashMap的实现可以看出,实现过程都尽量避免加锁操作:
1)、通过锁分段技术,减少锁的范围;
2)、通过volatile变量的同步语义,来实现读操作基本无锁;
3)、通过final变量的同步语义,来维护hash表更新;
4)、在跨segment实现时,通过re-check的方式避免加锁;如size()方法,containtsValue方法