JDK1.7ConcurrentHashMap源码分析

ConcurrentHashMap简介

ConcurrentHashMap中应用了很多多线程的知识,对于学习和应用并发知识很有帮助。
java.util.concurrent.ConcurrentHashMap是JDK1.5中新增的并发集合类的一种。
目的是用来代替之前的线程安全的Hashtable,以及在多线程访问情况下的HashMap需求。

HashMap本身不是线程安全的,需要调用时进行加锁协调,来达到多线程安全的原子性、可见性、重排序三个方面的要求。上一篇文章介绍了HashMap实现,内部通过Entry链表的数组作为数据结构,在多线程访问时可能由于没有同步,在修改next、设置数组值、resize()等等方法执行时都可能出现竞态条件, 并且没有使用happen-before机制,并不能保证一个线程的修改能够对之后其他线程可见,并且没有足够的同步来防止重排序问题。

Hashtable的主要问题在所有的方法都是使用synchronize关键字进行加锁访问的,虽然保证了数据访问的线程安全性但是同一时刻只能准许一个线程执行方法内的代码极大地限制了线程的并发能力,所以现在Hashtable已经很少使用。

ConcurrentHashMap在提供和Hashtable同样方法和相同的线程安全语义的基础上,大多数读不加锁,修改分散到多个分段锁上(类似数据库的分库分表处理),极大提升了并发能力,通过volatile、synchronized等语义保证可见性和防止重排序。

当不依赖Hashtable以实例this作为synchronized的监视器锁对象时,ConcurrentHashMap都可以替代原来使用Hastable的地方。

由于JDK1.7和JDK1.8版本实现方式差别较大,下面分别介绍两个版本中的主要实现。

JDK1.7分析

结构分析

简单来说,ConcurrentHashMap包含一个Segment数组,每个Segment又是一个HashMap, 内部又是一个HashEntry数组,每个HashEntry中包含了key, value, 指向同一个数组上下一个HashEntry上的指针。Map保存的数据保存在HashEntry的key和value中,所以ConcurrentHashMap可以理解为一个table的table,首先通过key的hash的前几个bit来定位到Segment数组的索引,再用该hash在Segment中定位到具体的HashEntry。

ConcurrentHashMap中用到了大量数组操作,值得注意的是,即使volatile修饰数组,如果没有其他操作,读取和修改数组中的变量并不能保证volatile的读和写语义。JDK1.5后提供了AtomicIntegerArray等类能够解决这一问题,但是在JDK1.7的ConcurrentHashMap中,使用的是sun.misc.Unsafe类的getObjectVolatile的方式,代码中注释的解释是’ These provide the functionality of AtomicReferenceArrays but reduce the levels of indirection. ‘。

ConcurrentHashMap中修改操作(如put、remove)是需要对对应的Segment加锁的,大多数的读不加锁,但是size()、containsValue遇到并发修改竞争时需要进行全表加锁。

和HashMap一样,ConcurrentHashMap的DEFAULT_INITIAL_CAPACITY为16,DEFAULT_LOAD_FACTOR为0.75。
另外增加了concurrencyLevel参数,concurrencyLevel默认为16,表示的是并发级别即期望的并发修改ConcurrentHashMap的线程的数量的估计,在内部用于决定分段Segment的数量。方法是找到第一个不小于concurrencyLevel的2的次方数来作为Segment数组的大小,例如默认16的话,Semgent数组长度就是16,concurrentLevel是100的话数组长度是128。
这样将initialCapacity平分到各个Segment中,再找到对应的2的次方值, 作为每个Segment中HashEntry数组的大小。

构造器

构造器中的处理如下, 按照上述concurrentLevel的说明计算出Segment数组大小,创建Segment数组。由于segments用final修饰,能够保证线程安全的发布。中间有一步是创建第一个Segment并赋值到segment[0]上,用的是Unsafe.putOrderedObject方法,putOrderedObject并不具备volatile写的内存语义,
// FIXME
使用的是#StoreStore内存屏障而不是#StoreLoad,#StoreStore比#StoreLoad快很多,它保证了屏障前的所有的stores/save都在屏障的store前完成,并不能保证stores的顺序性。
因为只有第一个Segment在构造器时创建了,所以之后的方法都需要先检查Segment是否为空,修改ConcurrentHashMap的方法使用ensureSegment返回对应索引的Segment,如果为空则利用CAS创建一个新的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// create segments and segments[0]
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}

每个Segment相当于一个HashMap,内部是HashEntry对象的数组,put和get时先根据hash映射到其中一个Segment,在映射到Segment中的数组的一个位置上。映射方式是取hash的前面若干位进行取模。
例如,在ConcurrentHashMap中,Segment数组的大小是16的话,需要4个bit位进行取模。因此定义了两个final字段在构造器中进行赋值,也就是上面代码中出现的segmentShift和segmentMask

。当concurrentLevel为16时,segmentShift为32 - 4即28,segmentMask为16 - 1 = 0x1111。这样当计算出hash后,将hash无符号右移28位与上0x1111,即对Segment数组大小取模,得到具体的数组中的位置。

ConcurrentHashMap结构

首先看下ConcurrentHash的主要框架结构, Segment继承了ReentrantLock只是为了方便使用它的加锁功能,如tryLocklock等,相比synchronized关键字, ReentrantLock的锁能够在不同的方法块中加锁解锁,并且可以使用tryLock。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class ConcurrentHashMap {
final int segmentMask;
final int segmentShift;
final Segment<K,V>[] segments;
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
}
static final class Segment<K,V> extends ReentrantLock implements Serializable {
/**
* The per-segment table. Elements are accessed via
* entryAt/setEntryAt providing volatile semantics.
*/
transient volatile HashEntry<K,V>[] table;
transient int count;
transient int modCount;
transient int threshold;
final float loadFactor;
Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
this.loadFactor = lf;
this.threshold = threshold;
this.table = tab;
}
private void rehash(HashEntry<K,V> node) {
}
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
}
}
}

put方法

看一下put方法的处理
首先对key进行hash后,取模得到j, 然后找到Segment数组中对应的Segment,委托给Segment的put方法。ConcurrentHashMap中使用了很多Unsafe类的功能,用于获取访问对象的内存结构(如UNSAFE.putOrderedObject)、一些CAS方法(如UNSAFE.compareAndSwapObject),以及通过volatile语义访问设置字段(如UNSAFE.getObjectVolatile)。

1
2
3
4
5
6
7
8
9
10
11
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}

ensureSegment

当对应的Segment为空时,ensureSegment(int index)方法会尝试创建一个。其中volatile读Segment[j]为null时可能对应两种情况
// FIXME

  1. 对应的Segment还没有创建
  2. 对应的segment创建了但是还没有从写入线程的write buffer中flush。
    执行过程为
  3. 使用Unsafe进行volatile读取Semgent数组对应位置的值,如果为空,则从Segment[0]复制相关的Entry数组长度capacity、loadFactory等值。
  4. 在进行一次volatile读的recheck,如果为空,则继续创建一个Segment对象。
  5. 使用while循环和Unsafe的CAS进行原子性替换Segment数组对应索引的元素,这种乐观锁的方式避免了加锁可能导致的线程挂起,也称为lock-free。当线程t1和t2都看到Segment[j]为null时,最多只有一个能够通过UNSAFE.cas替换成功,并且如果t1成功了,t2下一个while循环的volatile读能够看到t1所做的修改,即写入的值。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    private Segment<K,V> ensureSegment(int k) {
    final Segment<K,V>[] ss = this.segments;
    long u = (k << SSHIFT) + SBASE; // raw offset
    Segment<K,V> seg;
    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
    Segment<K,V> proto = ss[0]; // use segment 0 as prototype
    int cap = proto.table.length;
    float lf = proto.loadFactor;
    int threshold = (int)(cap * lf);
    HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
    == null) { // recheck
    Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
    while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
    == null) {
    if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
    break;
    }
    }
    }
    return seg;
    }

Segment数组索引计算

看一下SSHIFT,SBASE是怎么计算出来的。在ConcurrentHashMap类的静态初始化块中。计算了hashSeed、segmentShift、segmentMask、segments等字段在ConcurrentHashMap对象内存布局中的offset,用于之后的UNSAFE.putObjectVolatile等方法。以开启了UseCompressedOops的64位虚拟机为例,
UNSAFE.arrayBaseOffset获取数组内容开始的offset, 算上8bytes markword、 4bytes 类型指针、4 byte数组长度是16。UNSAFE.arrayIndexScale获取数组内元素所占长度大小,引用类型占4bytes,所以UNSAFE.arrayBaseOffset(Segment[].class)为4。
ConcurrentHashMap继承的父类中定义了keySet和values,加上对象头中的8bytes MarkWord和4bytes对象类型指针,所以第一个字段的offset为8 + 4 + 4 + 4 = 20,所以
HASHSEED_OFFSET, SEGMASK_OFFSET, SEGSHIFT_OFFSET, SEGMENTS_OFFSET依次是20, 24, 28, 32。SSHIFT和TSHIFT分别是Segment数组和HashEntry数组中每个元素所占空间的大小,因为都是对象引用所以都是4,用于使用(Segment<K,V>)UNSAFE.getObject (segments, (j << SSHIFT) + SBASE))这样的方式进行数组访问。
chm_fieldoffset
chm_jol

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long SBASE;
private static final int SSHIFT;
private static final long TBASE;
private static final int TSHIFT;
private static final long HASHSEED_OFFSET;
private static final long SEGSHIFT_OFFSET;
private static final long SEGMASK_OFFSET;
private static final long SEGMENTS_OFFSET;
static {
int ss, ts;
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class tc = HashEntry[].class;
Class sc = Segment[].class;
TBASE = UNSAFE.arrayBaseOffset(tc);
SBASE = UNSAFE.arrayBaseOffset(sc);
ts = UNSAFE.arrayIndexScale(tc);
ss = UNSAFE.arrayIndexScale(sc);
HASHSEED_OFFSET = UNSAFE.objectFieldOffset(
ConcurrentHashMap.class.getDeclaredField("hashSeed"));
SEGSHIFT_OFFSET = UNSAFE.objectFieldOffset(
ConcurrentHashMap.class.getDeclaredField("segmentShift"));
SEGMASK_OFFSET = UNSAFE.objectFieldOffset(
ConcurrentHashMap.class.getDeclaredField("segmentMask"));
SEGMENTS_OFFSET = UNSAFE.objectFieldOffset(
ConcurrentHashMap.class.getDeclaredField("segments"));
} catch (Exception e) {
throw new Error(e);
}
if ((ss & (ss-1)) != 0 || (ts & (ts-1)) != 0)
throw new Error("data type scale not a power of two");
SSHIFT = 31 - Integer.numberOfLeadingZeros(ss);
TSHIFT = 31 - Integer.numberOfLeadingZeros(ts);
}

Segment.put

上面的一些过程了解后,可以看到最终put交给了Segment的put方法, 每个Segment有相当于一个HashMap,是由HashEntry数组组成的,HashEntry又通过next指针构一个链表,put所做的操作就是寻找对应的key是否存在,如果存在替换,否则创建一个。put操作里面需要进行加锁,因为Segment继承于ReentrantLock,就拥有了和synchronized相同的锁功能,但是有提供了tryLock这样的非阻塞方法。

  1. 调用tryLock尝试加锁,如果加锁成功,则node为null,否则scanAndLockForPut自旋等待并且寻找对应key的节点是否存在
  2. 获取到锁后,找到对应的HashEntry first, 向后遍历寻找是否有和要找的key equals的HashEntry。
  3. 如果找到了,则按照是否是putIfAbsent判断是否替换旧的value值
  4. 如果没有找到(循环到null),则创建一个节点或使用scanAndLockForPut中创建的节点作为新的头结点,判断是否需要rehash, 并设置到HashEntry数组对应的位置。
  5. 最后解锁,返回key原来对应的value值。

scanAndLockForPut中的处理为不断尝试获取锁的过程中遍历链表,如果最终没有找到和key equals的HashEntry则创建一个HashEntry并在获取到锁后返回它,如果自旋超过一定次数则使用lock进行加锁等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}

scanAndLockForPut

scanAndLockForPut是一个优化过程,没有获取到锁的线程在循环尝试获取锁的过程中遍历链表,去寻找和key相等的节点,如果没有找到则投机地创建一个,方法返回时说明已经获取到了锁,并且如果返回的HashEntry不为空说明是链表中没有对应的key,并且返回值是刚创建的key-value,可以设置到对应的HashEntry头结点。
处理过程如下

  1. entryForHash进行一次volatile读, 然后在循环中尝试tryLock
  2. tryLock不成功的情况下,如遍历链表,有一个retries变量初始时为-1, 当遍历到最后一个节点时还没有找到则创建一个HashEntry并且将retries置为0,如果找到了对应的key的Entry也将retries置为0。
  3. MAX_SCAN_RETRIES在类初始化时根据系统的CPU处理器的数量决定,如果CPU大于1则是64,否则是1。
  4. 创建好HashEntry或已经匹配到HashEntry后则进入下一个else if,其中++retries判断如果大于MAX_SCAN_RETRIES就是用lock()方法中断当前的spin lock。
  5. 如果不大于MAX_SCAN_RETRIES, 则判断retryies是否为奇数,这个其实是给单处理应用的,并且判断当前的HashEntry的头节点是否和循环开始时的相等,如果不相等说明有其他线程进行了修改,需要重新遍历链表。这里的 (retries & 1) 确认容易让人不解,因为多处理器时64的话retries也会出现2这种偶数的可能,但是最终加锁后还是会进行重新读取HashEntry链表的,所以这里如果有读取到stale数据最终是能够发现的。那这个方法的意义在于什么呢?首先通过读取HashEntry帮助将对应的数据加载到CPU缓存行中并且帮助出发JIT编译来加热代码,其次如果创建了HashEntry并且最终却是没有对应的key的话节省了创建HashEntry的时间,这两个用途都能够减少之后加锁的时间。Doug lea对(reties &1)这个用法也进行了回复http://altair.cs.oswego.edu/pipermail/concurrency-interest/2014-August/012881.html
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    HashEntry<K,V> node = null;
    int retries = -1; // negative while locating node
    while (!tryLock()) {
    HashEntry<K,V> f; // to recheck first below
    if (retries < 0) {
    if (e == null) {
    if (node == null) // speculatively create node
    node = new HashEntry<K,V>(hash, key, value, null);
    retries = 0;
    }
    else if (key.equals(e.key))
    retries = 0;
    else
    e = e.next;
    }
    else if (++retries > MAX_SCAN_RETRIES) {
    lock();
    break;
    }
    else if ((retries & 1) == 0 &&
    (f = entryForHash(this, hash)) != first) {
    e = first = f; // re-traverse if entry changed
    retries = -1;
    }
    }
    return node;
    }

rehash

Segment的rehash是在获取锁之后进行的,所做的操作时扩大一倍Segment中数组的长度,然后将旧的数组复制过去,再将新数组替换。 由于前后有锁的获取和释放语义,不需要考虑多线程问题,按照单线程处理即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
private void rehash(HashEntry<K,V> node) {
/*
* Reclassify nodes in each list to new table. Because we
* are using power-of-two expansion, the elements from
* each bin must either stay at same index, or move with a
* power of two offset. We eliminate unnecessary node
* creation by catching cases where old nodes can be
* reused because their next fields won't change.
* Statistically, at the default threshold, only about
* one-sixth of them need cloning when a table
* doubles. The nodes they replace will be garbage
* collectable as soon as they are no longer referenced by
* any reader thread that may be in the midst of
* concurrently traversing table. Entry accesses use plain
* array indexing because they are followed by volatile
* table write.
*/
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
int newCapacity = oldCapacity << 1;
threshold = (int)(newCapacity * loadFactor);
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
int sizeMask = newCapacity - 1;
for (int i = 0; i < oldCapacity ; i++) {
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
int idx = e.hash & sizeMask;
if (next == null) // Single node on list
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
newTable[lastIdx] = lastRun;
// Clone remaining nodes
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}

get

get方法相对简单一些,通过hash先找到Segment再找到Segment中的HashEntry即可。并且这个过程没有加锁,通过volatile读语义保证可见性。如果同时有put操作增加了HashEntry,由于是在链表头部添加的(先设置HashEntry的next为原来的头节点,再将该HashEntry设置为头节点),不会对get造成影响。只有当put、remove等修改结果需要将锁release后才能保证对get可见。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;

size和containsValue

size()和containsValue都不能用key进行路由到单一Segment进行处理,所以都需要遍历整个map,并且如果遇到竞争还需要降级到所有segment加锁的方式。
下面以size()为例进行分析,

  1. 复制segment的引用到本地变量表,减少之后segment变量获取的开销(aload相对于getField, 虽然差别很小)。复制到本地变量在其他场景中还有防止对象变化导致前后不一致的问题。
  2. 循环中对各个segment的count和modCount进行相加得到对应的两个总数size和sum,并将sum记为最近一次sum值last,如果当期sum和上一次sum相等说明这两次循环间没有其他线程修改内部数据,则直接返回size,否则继续循环
  3. 如果循环次数达到RETRIES_BEFORE_LOCK(固定为2)则降级为对segment依次加锁,由于加锁顺序一致不会导致死锁。加锁后则两次读到的modCount和肯定一致了。最终在finally中进行解锁(retries超过RETRIES_BEFORE_LOCK的情况)。
  4. 值得注意的点是Segment中的modCount和count两个变量没有使用volatile修饰也没有用Unsafe的volatile读写,这里利用的是要么在加锁内要么在volatile读之间访问的来保证可见性。Accessed only either within locks or among other volatile reads that maintain visibility。例如put方法修改modCount是在Segment锁内进行的,当t2调用完put释放锁后,对modCount的修改happen-before于之后对Segment的volatile读,size方法中使用的segmentAt方法涉及到了Segment的volatile读,并且由于顺序性规则happen-before与之后的segment.modCount读取。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    public int size() {
    // Try a few times to get accurate count. On failure due to
    // continuous async changes in table, resort to locking.
    final Segment<K,V>[] segments = this.segments;
    int size;
    boolean overflow; // true if size overflows 32 bits
    long sum; // sum of modCounts
    long last = 0L; // previous sum
    int retries = -1; // first iteration isn't retry
    try {
    for (;;) {
    if (retries++ == RETRIES_BEFORE_LOCK) {
    for (int j = 0; j < segments.length; ++j)
    ensureSegment(j).lock(); // force creation
    }
    sum = 0L;
    size = 0;
    overflow = false;
    for (int j = 0; j < segments.length; ++j) {
    Segment<K,V> seg = segmentAt(segments, j);
    if (seg != null) {
    sum += seg.modCount;
    int c = seg.count;
    if (c < 0 || (size += c) < 0)
    overflow = true;
    }
    }
    if (sum == last)
    break;
    last = sum;
    }
    } finally {
    if (retries > RETRIES_BEFORE_LOCK) {
    for (int j = 0; j < segments.length; ++j)
    segmentAt(segments, j).unlock();
    }
    }
    return overflow ? Integer.MAX_VALUE : size;
    }

JDK1.8简单实现分析

JDK1.8中虽然有6千行代码,但是大部分是用来实现类似1.8中StreamAPI的功能的。

实现概述

1.8中采取的是和HashMap中类似的单数组链表策略,并且在链表长度超过一定大小时转换成红黑树。
每个数组元素称为为一个bin或bucket,里面存放的是Node。
Node代表了一个键值对,并通过next指向下一个节点。Node有一些子类,TreeNode是用平衡树组织的节点,ForwardingNode用于在resize的时候放在Node的头结点。
Node<K,V>[] table是内部数据的bin数组, table的大小。
sizeCtl用于控制table的初始化和resize。
sizeCtl为负数的时候表示正在进行初始化或者resize, -1表示正在初始化,或者-(1 + 活动的resize线程数), 当table为null的时候存储的是初始时的table的大小或者0表示默认值。
初始化完成后,它表示下一个要进行resize的值。
sizeCtl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
* The array of bins. Lazily initialized upon first insertion.
* Size is always a power of two. Accessed directly by iterators.
*/
transient volatile Node<K,V>[] table;
/**
* The next table to use; non-null only while resizing.
*/
private transient volatile Node<K,V>[] nextTable;
/**
* Base counter value, used mainly when there is no contention,
* but also as a fallback during table initialization
* races. Updated via CAS.
*/
private transient volatile long baseCount;
/**
* Table initialization and resizing control. When negative, the
* table is being initialized or resized: -1 for initialization,
* else -(1 + the number of active resizing threads). Otherwise,
* when table is null, holds the initial table size to use upon
* creation, or 0 for default. After initialization, holds the
* next element count value upon which to resize the table.
*/
private transient volatile int sizeCtl;
/**
* The next table index (plus one) to split while resizing.
*/
private transient volatile int transferIndex;
/**
* Spinlock (locked via CAS) used when resizing and/or creating CounterCells.
*/
private transient volatile int cellsBusy;
/**
* Table of counter cells. When non-null, size is a power of 2.
*/
private transient volatile CounterCell[] counterCells;
// views
private transient KeySetView<K,V> keySet;
private transient ValuesView<K,V> values;
private transient EntrySetView<K,V> entrySet;

Put操作

Put操作的处理过程,

  1. 按照key的hashCode计算最终的hash值
  2. 如果这时候table还是null或长度为0,则进行初始化。
  3. 找到table数组中对应的元素Node ,如果第一个Node为null,则通过CAS的方式替换table数组的这个值,如果CAS成功返回
  4. 如果node的hash为-1, 表示当前Node正在进行resize,则会一起helpTransfer帮助resize
  5. 其他的情况需要锁住第一个Node
  6. 如果头结点的hash大于0说明是普通的链表Node,只需要向后遍历,如果遇到key一样的Node根据是否onlyIfAbsent判断是否替换,如果遍历完成没有找到,则创建一个放在链表最后。Nodel中的value和next值都是volatile修饰的。
  7. 否则Node是TreeNode的情况时,委托给TreeBin的putTreeVal方法。
  8. 判断这个bin容器中Node的数量是否超过了需要转换成树的阈值,则将链表转换成树。
  9. 调用addCount, 其中会检查当前键值对的数量是否超过了sizeCtl的值,如果是则需要进行扩容,transfer将原来的数据移动到新的Node table中。

Put实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public V put(K key, V value) {
return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}

InitTable实现

通过CAS抢占设置sizeCtl值为-1,成功的进行Node[]表的创建,其他的需要暗示运行时、CPU让出线程调度。
创建的数组的大小总是2的次方,之后设置sizeCtl为数组大小capacity * 0.75作为判断是否进行resize的threshold。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}

treeifyBin实现

  1. 首先判断
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    private final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n, sc;
    if (tab != null) {
    if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
    tryPresize(n << 1);
    else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
    synchronized (b) {
    if (tabAt(tab, index) == b) {
    TreeNode<K,V> hd = null, tl = null;
    for (Node<K,V> e = b; e != null; e = e.next) {
    TreeNode<K,V> p =
    new TreeNode<K,V>(e.hash, e.key, e.val,
    null, null);
    if ((p.prev = tl) == null)
    hd = p;
    else
    tl.next = p;
    tl = p;
    }
    setTabAt(tab, index, new TreeBin<K,V>(hd));
    }
    }
    }
    }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
private final void tryPresize(int size) {
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
}
}
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
else if (tab == table) {
int rs = resizeStamp(n);
if (sc < 0) {
Node<K,V>[] nt;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}

addCount

addCount中首先如果CounterCell不为空或CAS一次加baseCount失败的情况, 则使用CounterCell统计键值对counter数。
如果check大于0,则查看当前的sumCount是否大于了sizeCtl,是的话需要调用transfer方法进行扩容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}

sum实现

更多参考

感觉有收获的话,请我吃个早饭吧!