oncurrentHashMap是一个支持并发操作的Map,比Hashtable的效率要高很多。Hashtable使用Synchonized来实现同步,而ConcurrentHashMap综合使用了多种并发编程的技术来实现。主要使用了“锁分段”技术来降低锁的颗粒度,同时运用volatile的内存语义,ReentrantLock的API来实现。里面很多并发编程的技巧值得学习。JDK8变化较大,但基本思想变化不大,后面再分析。 ConcurrentHashMap中包含了多个Segment对象,而Segment对象是继承ReentrantLock的一个实现,所以ConcurrentHashMap中有很多个锁。Segment包含了HashEntry数组。HashEntry对象才是存储数据的对象。ConcurrentHashMap的类图如下:
ConcurrentHashMap的主要的方法都是通过Segment中的方法来实现的。如果修改时是在不同的Segment中进行的,那么就可以并发地修改而不需要锁。从ConcurrentHashMap中读取数据时,几乎也是无锁的,因为在Segment中维护了一个volatile的count整型变量,利用volatile的内存语义,在读一个key之前,都先读count变量,根据happen-before规则,对volatile变量的写总是发生在读之前并且volatile禁止重排序,包含编译时和运行时的处理器重排序,可以保护读写count变量部分的代码。但有些操作需要跨所有的Segment进行,那么就需要加锁,所有的Segemment循环顺序加锁和解锁,比如size()方法。
一、hash算法 作为一个Map结构,hash的算法尤其重要,需要使数据能均匀地分布在各个Segment中,以防某个Segment出现热点,性能下降。另外需要确定数据存储在哪个Segment中,这里通过段的偏移值(segmentShift)和段的掩码(segmentMask)来计算数据应该保存在哪个段中。段中的hash表的个数都为2的n次方,是为了快速定位到hash糟的位置。这里的计算都是通过位运算来实现的。先看ConcurrentHashMap中的hash方法:
1 2 3 4 5 6 7 8 9 10 private static int hash (int h) { h += (h << 15 ) ^ 0xffffcd7d ; h ^= (h >>> 10 ); h += (h << 3 ); h ^= (h >>> 6 ); h += (h << 2 ) + (h << 14 ); return h ^ (h >>> 16 ); }
通过重新hash后,数据会更加分散,分布在各个Segment中更均匀; 再看看segmentShift和segmentShift的计算方法,它与并发级别变量concurrencyLevel有关,默认为16;
1 2 3 4 5 6 7 8 9 10 11 12 if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; int sshift = 0 ; int ssize = 1 ; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1 ; } segmentShift = 32 - sshift; segmentMask = ssize - 1 ; this .segments = Segment.newArray(ssize);
通过ConcurrentHashMap的构造方法计算出segmentShift和segmentShift值后,就可以利用这两个值来计算数据应该放在哪个Segment中。如果按默认的无参数构造函数new出来的ConcurrentHashMap,默认的concurrencyLevel为16,计算出来的segmentShift = 28,segmentMask =5,所以Segment的数量为16个。 再看看Segment中初始化的hash表的大小:
1 2 3 4 5 6 7 8 9 int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = 1 ; while (cap < c) cap <<= 1 ; for (int i = 0 ; i < this .segments.length; ++i) this .segments[i] = new Segment<K,V>(cap, loadFactor);
如果默认的容量为16,那么计算得到cap=1,即每个Segment中的HashEntry数组长度为1。 要定位到数据应该存储在哪个Segment中,是通过segmentFor方法实现的:
1 2 3 final Segment<K,V> segmentFor (int hash) { return segments[(hash >>> segmentShift) & segmentMask]; }
右移segmentShift位后再&上掩码segmentMask,可以保证segments数组的下标小于segmentMash。
二、ConcurrentHashMap的数据结构 ConcurrentHashMap的数据结构主要由Segment构成,而Segment则由HashEntry构成。HashEntry的结构由一些final的变量和volatile变量组成:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 static final class HashEntry <K ,V > { final K key; final int hash; volatile V value; final HashEntry<K,V> next; HashEntry(K key, int hash, HashEntry<K,V> next, V value) { this .key = key; this .hash = hash; this .next = next; this .value = value; } @SuppressWarnings("unchecked") static final <K,V> HashEntry<K,V>[] newArray(int i) { return new HashEntry[i]; } }
可以看到,其中的value是volatile的,可以保证它的修改内存可见性。next引用,使HashEntry可以形成一个链表结构。
1 2 3 4 5 6 7 8 9 10 11 12 13 static final class Segment <K ,V > extends ReentrantLock implements Serializable { transient volatile int count; transient int modCount; transient int threshold; transient volatile HashEntry<K,V>[] table; final float loadFactor;
Segment则为volatile的count变量,volatile的HashEntry数组变量,modCount变量、扩展闽值threshold和负载因子loadFactor组成。Segment还有map进行各种操作的方法,ConcurrentHashMap的各种操作方法基本是通过Segment实现的。count变量的作用就是利用volatile变量的内存可见性,对读写进行保护,是实现无锁的重要技巧。Map的读取操作之前都要先读count,保证写操作已经完成.
三、ConcurrentHashMap中各种操作的实现 1、put方法实现 实现的代码如下:
1 2 3 4 5 6 public V put (K key, V value) { if (value == null ) throw new NullPointerException(); int hash = hash(key.hashCode()); return segmentFor(hash).put(key, hash, value, false ); }
Segment中的put方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 V put (K key, int hash, V value, boolean onlyIfAbsent) { lock(); try { int c = count; if (c++ > threshold) rehash(); HashEntry<K,V>[] tab = table; int index = hash & (tab.length - 1 ); HashEntry<K,V> first = tab[index]; HashEntry<K,V> e = first; while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; V oldValue; if (e != null ) { oldValue = e.value; if (!onlyIfAbsent) e.value = value; } else { oldValue = null ; ++modCount; tab[index] = new HashEntry<K,V>(key, hash, first, value); count = c; } return oldValue; } finally { unlock(); } }
从代码中可以看出实现的流程如下: 1)、先计算key的hascode,再通过hash方法重新hash后,调用segment对象的put方法来实现; 2)、因为put涉及对HashEntry数组进行修改,所以先加锁。如果元素的数量已经大于闽值,则需要调用rehash方法进行扩容; 3)、判断是否存在相同的结点,如果存在则替换这个节点的值;否则创建一个新的HashEntry结点,加到table的首部位置 ;因为HashEntry的key,hash,next,是final类型的变量 ,因此不能把节点添加到链表的中间或结尾位置 ,这可以保证,访问节点时,这个节点的后面链接不会改变,添加的节点只能添加到Hash链表的首节点。 4)、更新modCount和volatile变量count,更新count需要放在最后一步,保证前面的修改生效;
2、get方法 get方法的相关代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public V get (Object key) { int hash = hash(key.hashCode()); return segmentFor(hash).get(key, hash); } V get (Object key, int hash) { if (count != 0 ) { HashEntry<K,V> e = getFirst(hash); while (e != null ) { if (e.hash == hash && key.equals(e.key)) { V v = e.value; if (v != null ) return v; return readValueUnderLock(e); } e = e.next; } } return null ; } HashEntry<K,V> getFirst (int hash) { HashEntry<K,V>[] tab = table; return tab[hash & (tab.length - 1 )]; } V readValueUnderLock (HashEntry<K,V> e) { lock(); try { return e.value; } finally { unlock(); } }
实现流程如下: 1)、先读取volatile变量count,保证HashEntry数组的修改可见; 2)、根据hash值获取HashEntry对象,遍历table查找,如果对象是要找的对象并且不为空,则直接返回; 3)、如果读到null值则通过readValueUnderLock方法再读;读到null值的原因是在put操作tab[index] = new HashEntry<K,V>(key, hash, first, value)在构造HashEntry时,HashEntry的构造方法中对value的赋值和tab[index]的赋值可能出现重排序,这就导致结点的值为null。这种情况很罕见,ConcurrentHashMap会在持有锁的情况下,再读一次,保证不会读到null值。
3、remove方法 remove方法的相关代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public V remove (Object key) { int hash = hash(key.hashCode()); return segmentFor(hash).remove(key, hash, null ); } V remove (Object key, int hash, Object value) { lock(); try { int c = count - 1 ; HashEntry<K,V>[] tab = table; int index = hash & (tab.length - 1 ); HashEntry<K,V> first = tab[index]; HashEntry<K,V> e = first; while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; V oldValue = null ; if (e != null ) { V v = e.value; if (value == null || value.equals(v)) { oldValue = v; ++modCount; HashEntry<K,V> newFirst = e.next; for (HashEntry<K,V> p = first; p != e; p = p.next){ newFirst = new HashEntry<K,V>(p.key, p.hash, newFirst, p.value); } tab[index] = newFirst; count = c; } } return oldValue; } finally { unlock(); } }
从代码可以看出,remove的实现流程如下:1)、定位到对应的Segment,调用segment的remove方法; 2)、首先加锁,计算到hash表的首节点,从首节点遍历,直到找到要删除的节点; 3)、定位到要删除节点的下一节点,删除节点前面的节点都重新复制,生成的hash表的顺序刚好和原来相反; 4)、删除节点的后面节点的hash表保持不变;最后写volatile的count值。
4、size()方法的实现 size方法的相关代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 public int size () { final Segment<K,V>[] segments = this .segments; long sum = 0 ; long check = 0 ; int [] mc = new int [segments.length]; for (int k = 0 ; k < RETRIES_BEFORE_LOCK; ++k) { check = 0 ; sum = 0 ; int mcsum = 0 ; for (int i = 0 ; i < segments.length; ++i) { sum += segments[i].count; mcsum += mc[i] = segments[i].modCount; } if (mcsum != 0 ) { for (int i = 0 ; i < segments.length; ++i) { check += segments[i].count; if (mc[i] != segments[i].modCount) { check = -1 ; break ; } } } if (check == sum) break ; } if (check != sum) { sum = 0 ; for (int i = 0 ; i < segments.length; ++i) segments[i].lock(); for (int i = 0 ; i < segments.length; ++i) sum += segments[i].count; for (int i = 0 ; i < segments.length; ++i) segments[i].unlock(); } if (sum > Integer.MAX_VALUE) return Integer.MAX_VALUE; else return (int )sum; }
size方法的实现流程如下: 1)、通过计算segment的count,和modCount变量,尝试(默认2次循环)通过比较,得到map的长度; 2)、如果1)失败,则通过循环顺序给segment加锁,计算出map的长度。
总结: 从ConcurrentHashMap的实现可以看出,实现过程都尽量避免加锁操作: 1)、通过锁分段技术,减少锁的范围; 2)、通过volatile变量的同步语义,来实现读操作基本无锁; 3)、通过final变量的同步语义,来维护hash表更新; 4)、在跨segment实现时,通过re-check的方式避免加锁;如size()方法,containtsValue方法