ConcurrentLinkedQueue是采用循环CAS来实现的非阻塞线程安全的队列,它是基于链接节点的无界队列,采用”wait-free”算法,即循环CAS来实现。ConcurrentLinkedQueue队列不能存储Null元素。
一、ConcurrentLinkedQueue的结构
ConcurrentLinkedQueue中包含head和tail两个节点,分别指向首节点和尾节点,而它的类型是Node,一个内部类实现。每个Node节点由next节点和它的内容item组成。节点和节点之间通过next引用链接,形成链表结构。默认head节点为null,tail指向head节点。类图如下所示:
可以看到,关键的head,tail引用都是volatile的,说明对它们的修改是内存可见的。
ConcurrentLinkedQueue具有队列常用的方法offer、poll、add、remove、contains、size、peek等
二、重点方法的实现
1、入队操作-offer的实现
入队操作主要做两件事:第一是定位出尾节点;第二是利用CAS算法将入队节点设置成尾节点的next节点,如果失败则循环重试。
入队的代码如下:
1 | public boolean offer(E e) { |
从代码可以看出,它的实现流程如下:
1).如果入队的元素为空,则抛出异常;否则,创建一个入队的节点n;
2).开始死循环,入队不成功则继续入队;
3).创建节点t指向tail引用,创建节点p作为尾节点,默认为t;
4).获取尾节点p的next节点,如果不为null则表示已经有别的线程修改了tail节点,如果循环2次以上,t节点还不等于tail节点,则返回从头循环,否则把尾节点p指向next节点继续循环;
5).如果尾节点p的next节点为null,则使用CAS方法casNext把入队节点更新为p的next节点,成功后如果发现hops变量已经大于HOPS常量,说明tail节点已经不是指向尾节点,则需要把tail指向入队的n节点,通过CAS方法casTail实现。这个过程失败也不会有影响,说明有别的线程成功更新了tail节点;
6).如果casNext失败,则需要把p指定它的后继节点,继续循环;
注意的是,入队方法永远返回true,所以不能利用它的结果来判断入队是否成功;
2、出队操作-poll的实现
出队就是把头节点的元素返回,并清空它的引用。并不是每次都更新head节点,当head节点有元素时,直接弹出元素,如果节点为空,才会更新head节点。出队操作的代码如下:
1 | public E poll() { |
从代码看出,它的实现流程如下:
1).设置h节点指向head节点,设置p节点为首节点,并默认为指向h;
2).如果辅助变量hops循环,直至成功出队;
3).获取首节点p中的元素,如果不为null,则通过CAS方法casItem,把p中的元素设置为null; 判断hops变量是否大于HOPS,如果大于,说明当前的head节点已经不是指向首节点,则需要把首节点p的下一个节点设置为head节点;
4).如果获取首节点p的中元素为null,说明已经有别的线程把首节点出队,则需要获取p的后继节点,如果后继节点也为null,说明队列已经空了,需要把p更新为h节点;,并且跳出循环;
5).如果队列不为空,则设置next节点为p节点,继续循环。
3、remove方法
remove方法可以删除队列中的某个元素的实例,它的代码如下:
1 | public boolean remove(Object o) { |
4、size方法
1 | public int size() { |
三、总结:
从ConcurrentLinkedQueue的实现可以看出,对于无锁的队列,主要是基于UNSAFE类的CAS操作来实现的,关键的head、tail节点,和Node中的元素item、next节点都是volatile变量,这样可以保证并发修改时的内存可见性。 另外值得注意的是在offer/poll方法的实现中,都使用了一个辅助变量hops来实现对volatile变量的的读写控制,减少volatile变量的CAS操作。比如入队时,并不是每次都会casTail去设置tail节点指向尾节点,而是当tail节点和尾节点的距离大于等于常量HOPS(默认为1)时,才会更新tail节点指向尾节点。本质上是通过增加volatile变量的读操作来减少对volatile变量的写操作来提升入队和出队的效率的。