0%

Java[并发]-ConcurrentLinkedQueue实现分析

ConcurrentLinkedQueue是采用循环CAS来实现的非阻塞线程安全的队列,它是基于链接节点的无界队列,采用”wait-free”算法,即循环CAS来实现。ConcurrentLinkedQueue队列不能存储Null元素。

一、ConcurrentLinkedQueue的结构

​ ConcurrentLinkedQueue中包含head和tail两个节点,分别指向首节点和尾节点,而它的类型是Node,一个内部类实现。每个Node节点由next节点和它的内容item组成。节点和节点之间通过next引用链接,形成链表结构。默认head节点为null,tail指向head节点。类图如下所示:

java-concurrence-queue5

可以看到,关键的head,tail引用都是volatile的,说明对它们的修改是内存可见的。
ConcurrentLinkedQueue具有队列常用的方法offer、poll、add、remove、contains、size、peek等

二、重点方法的实现

1、入队操作-offer的实现

入队操作主要做两件事:第一是定位出尾节点;第二是利用CAS算法将入队节点设置成尾节点的next节点,如果失败则循环重试。
入队的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
//入队前,创建入队节点
Node<E> n = new Node<E>(e);
retry:
//死循环,入队不成功反复入队
for (;;) {
//创建一个指向tail节点的引用
Node<E> t = tail;
//p用来表示队列的尾节点,默认等于tail
Node<E> p = t;
for (int hops = 0; ; hops++) {
//获取p的下一个节点
Node<E> next = succ(p);
//如果next节点不为null,说明有线程修改了tail节点
if (next != null) {
//如果循环了2次及以上,且当前节点还不等于尾节点
if (hops > HOPS && t != tail)
continue retry;
//更新p指向next节点
p = next;
} else if (p.casNext(null, n)) {//如果p是尾节点,则设置p的next节点为入队节点
/**
* 如果tail节点有>=1个next节点,则将入队节点设置成tail节点,
* 更新失败也没关系,因为失败的话已经有别的线程更新了tail节点
*/
if (hops >= HOPS)
casTail(t, n); // Failure is OK.
//入队成功
return true;
} else {
//p有next节点,表示p的next节点是尾节点,则重新设置p节点
p = succ(p);
}
}
}
}

从代码可以看出,它的实现流程如下:
1).如果入队的元素为空,则抛出异常;否则,创建一个入队的节点n;
2).开始死循环,入队不成功则继续入队;
3).创建节点t指向tail引用,创建节点p作为尾节点,默认为t;
4).获取尾节点p的next节点,如果不为null则表示已经有别的线程修改了tail节点,如果循环2次以上,t节点还不等于tail节点,则返回从头循环,否则把尾节点p指向next节点继续循环;
5).如果尾节点p的next节点为null,则使用CAS方法casNext把入队节点更新为p的next节点,成功后如果发现hops变量已经大于HOPS常量,说明tail节点已经不是指向尾节点,则需要把tail指向入队的n节点,通过CAS方法casTail实现。这个过程失败也不会有影响,说明有别的线程成功更新了tail节点;
6).如果casNext失败,则需要把p指定它的后继节点,继续循环;
注意的是,入队方法永远返回true,所以不能利用它的结果来判断入队是否成功;

2、出队操作-poll的实现

出队就是把头节点的元素返回,并清空它的引用。并不是每次都更新head节点,当head节点有元素时,直接弹出元素,如果节点为空,才会更新head节点。出队操作的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public E poll() {
Node<E> h = head;
//p表示首节点,需要出队的节点
Node<E> p = h;
for (int hops = 0; ; hops++) {
//获取p的元素
E item = p.getItem();
//元素不为null,且CAS设置元素为null成功
if (item != null && p.casItem(item, null)) {
//如果循环次数大于HOPS
if (hops >= HOPS) {
//获取首节点的下一个设置成head节点
Node<E> q = p.getNext();
updateHead(h, (q != null) ? q : p);
}
//成功返回出队的元素
return item;
}
//首节点p的元素为空,说明已经有别的线程更新了首节点
//获取p的下一个节点
Node<E> next = succ(p);
//如果下一节点也为空,说明队列已经空了
if (next == null) {
updateHead(h, p);
break;
}
//如果首节点的下一节点不为空,把它设置成首节点
p = next;
}
return null;
}

从代码看出,它的实现流程如下:
1).设置h节点指向head节点,设置p节点为首节点,并默认为指向h;
2).如果辅助变量hops循环,直至成功出队;
3).获取首节点p中的元素,如果不为null,则通过CAS方法casItem,把p中的元素设置为null; 判断hops变量是否大于HOPS,如果大于,说明当前的head节点已经不是指向首节点,则需要把首节点p的下一个节点设置为head节点;
4).如果获取首节点p的中元素为null,说明已经有别的线程把首节点出队,则需要获取p的后继节点,如果后继节点也为null,说明队列已经空了,需要把p更新为h节点;,并且跳出循环;
5).如果队列不为空,则设置next节点为p节点,继续循环。

3、remove方法

remove方法可以删除队列中的某个元素的实例,它的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public boolean remove(Object o) {
if (o == null) return false;
Node<E> pred = null;
//遍历队列
for (Node<E> p = first(); p != null; p = succ(p)) {
//获取队列节点中的元素item
E item = p.getItem();
//如果元素不为null并且等于要移除的对象,则通CAS方法casItem设置此节点的元素为null
if (item != null &&
o.equals(item) &&
p.casItem(item, null)){
//设置成功,则获取当前节点的后继节点next
Node<E> next = succ(p);
//如果前驱节点pred不为空且后继节点不为空,
//通过前驱节点pred的casNext方法把p的next更新为p节点
//即把元素移出后,把链接从断的地方连起来
if(pred != null && next != null)
pred.casNext(p, next);
return true;
}
//如果找不到,则一直更新p为前驱节点
pred = p;
}
return false;
}

4、size方法

1
2
3
4
5
6
7
8
9
10
11
public int size() {
int count = 0;
for (Node<E> p = first(); p != null; p = succ(p)) {
if (p.getItem() != null) {
// Collections.size() spec says to max out
if (++count == Integer.MAX_VALUE)
break;
}
}
return count;
}

三、总结:

从ConcurrentLinkedQueue的实现可以看出,对于无锁的队列,主要是基于UNSAFE类的CAS操作来实现的,关键的head、tail节点,和Node中的元素item、next节点都是volatile变量,这样可以保证并发修改时的内存可见性。 另外值得注意的是在offer/poll方法的实现中,都使用了一个辅助变量hops来实现对volatile变量的的读写控制,减少volatile变量的CAS操作。比如入队时,并不是每次都会casTail去设置tail节点指向尾节点,而是当tail节点和尾节点的距离大于等于常量HOPS(默认为1)时,才会更新tail节点指向尾节点。本质上是通过增加volatile变量的读操作来减少对volatile变量的写操作来提升入队和出队的效率的。