0%

Java[并发]AQS实现源码分析

AQS主要是通过同步队列,独占式同步状态获取、释放和共享式同步状态获取、释放及超时状态获取同步等核心数据结构和模板方法。

一、同步队列(CLH lock queue)的结构

​ 主要由Node内部类来实现构成同步队列的节点,Node本身维护了几种状态:
​ • CANCELLED : 代表线程已经取消
​ • SIGNAL:代表后继节点需要唤醒
​ • CONDITION:代表线程在condition queue中等待某一条件
​ • PROPAGATE:代表后继节点会传播唤醒的操作,在共享模式中使用
​ • INITIAL:初始状态
代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/** 代表线程已经取消*/
static final int CANCELLED = 1;
/** 代表后继节点需要唤醒 */
static final int SIGNAL = -1;
/** 代表线程在condition queue中等待某一条件 */
static final int CONDITION = -2;
/**代表后继节点会传播唤醒的操作,在共享模式中使用*/
static final int PROPAGATE = -3;
//等待状态,上面的几种状态
volatile int waitStatus;
//前驱节点
volatile Node prev;
//后继节点
volatile Node next;
//获取同步状态的线程
volatile Thread thread;
//等待队列中的后继节点
Node nextWaiter;

同步队列的结构如下图:

javaconcurrrent

同步队列的出队操作:

1
2
3
4
5
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}

将头节点移除即可。
同步队列的入队操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
//优化:只有当compareAndSetTail失败才会进行enq(node)方法循环
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
//进入自旋
for (;;) {
Node t = tail;
if (t == null) { // 没有头节点时,初始化
Node h = new Node(); //设置一个虚拟的头节点
h.next = node;
node.prev = h;
if (compareAndSetHead(h)) {
tail = node;
return h;
}
}
else {
//把尾节点作为插入节点的头节点
node.prev = t;
//设置成为尾节点
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

二、独占式同步状态的获取和释放

独占式同步状态的获取源码分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public final void acquire(int arg) {
//独占式获取,tryAcquire由子类实现不会阻塞线程,如果返回true,则继续
//如果返回false则加入阻塞队列阻塞线程,并等待前继节点释放锁
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//acquireQueued返回true,说明当前线程被唤醒后获取到锁
//重置中断状态为true
selfInterrupt();
}
final boolean acquireQueued(final Node node, int arg) {
try {
//等待前继节点释放,自旋re-check
boolean interrupted = false;
for (;;) {
//获取前继节点
final Node p = node.predecessor();
//前继节点是head,说明当前节点是next,则尝试获取锁
if (p == head && tryAcquire(arg)) {
//前继节点出列,node成为head
setHead(node);
p.next = null; // help GC
return interrupted;
}
//p不是前继节点且tryAcquire获取锁失败,
//那么阻塞当前线程并等待前继唤醒,阻塞之前再重试一下
//并设置前继的waitStatus为SIGNAL
//线程阻塞在parkAndCheckInterrupt方法中,
//parkAndCheckInterrupt返回可能是前继unpark或线程被中断
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//说明当前线程是被中断唤醒的,线程被中断会继续走到if判断,也就是忽略中断
//除非中断后碰巧acquire成功,那么需要重新设置线程的中断状态selfInterrupted
interrupted = true;
}
} catch (RuntimeException ex) {
cancelAcquire(node);
throw ex;
}
}

如果tryAcquire成功则马上返回,失败则acquireQueued加入同步队列,在这过程中线程可能被反复地阻塞和唤醒直到tryAcquire成功,这是因为线程可能被中断。而acquireQueued方法保证忽略中断,只有tryAcquire成功才返回。其中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park
*/
//前继节点已经设置好unpark了,所以后继可以安全的park
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
//跳过被取消的节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//0或者PROPAGATE(CONDITION状态用在ConditionObject,这里不会是CONDITION这个值)
//waitStatus=0(初始化)或PROPAGATE,会先重试确定无法acquire到再park
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//更新前继节点状态为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
//park并且设置线程的interrupted
LockSupport.park(this);
return Thread.interrupted();
}

shouldParkAfterFailedAcquire方法的主要作用是:1)确定后继是否需要park;2)略过被取消的节点;3)设置前驱节点的waitStatus为SIGNAL。parkAndCheckInterrupt作用就是阻塞线程,线程被唤醒只可能是被unpark,被中断或伪唤醒。被中断acquire方法返回前会通过selfInterrupted设置interrupted状态,伪唤醒会通过for循环re-check

共享状态的获取流程如下图:

javacon

独占式同步状态的释放,源码分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public final boolean release(int arg) {
//tryRelease由子类实现,通过设置state值来达到同步的效果
if (tryRelease(arg)) {
Node h = head;
//waitStatus=0说明是初始化的空队列
if (h != null && h.waitStatus != 0)
//唤醒后继节点
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
/*

* If status is negative (i.e., possibly needing signal) try
clear in anticipation of signalling. It is OK if this

* fails or if status is changed by waiting thread.

​ int ws = node.waitStatus;
​ if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);

}

释放比较简单,在共享同步变量释放后,最后调用Locksupport.unpark方法使parkAndCheckInterrupt方法返回。

三、共享式同步状态获取和释放

先看共享式同步状态获取的源码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public final void acquireShared(int arg) {
//如果没有许可则入队等待
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
//添加到队列
final Node node = addWaiter(Node.SHARED);
try {
boolean interrupted = false;
//等待前继释放并传播
for (;;) {
final Node p = node.predecessor();
if (p == head) {
//尝试获取许可
int r = tryAcquireShared(arg);
if (r >= 0) {
//获取成功则前继出队,跟独占不同的是会继续向后继节点传播唤醒操作,
//保证剩下的线程能尽快地获取剩下的许可。
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
return;
}
}
//p不是头节点且获取不到许可
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} catch (RuntimeException ex) {
cancelAcquire(node);
throw ex;
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
/**
* 尝试唤醒后继节点,propagate>0说明许可还有能继续被线程acquires
* 或者之前的head被设置为PROPAGATE(PROPAGATE或以转换为SIGNAL)
* 或者为null。检查有些保守,在多竞争时,可能出现不必要的唤醒
*/
if (propagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
//唤醒后继节点
doReleaseShared();
}
}

doAcquiredShare方法与独占式很像,唯一区别在setHeadAndPropagate方法,这个方法将Node设置为Head后,如果发现还有许可,会继续释放给后面的节点,后面的节点会把这个动作传播下去。

共享式同步状态释放:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
//队列不为空且有后继节点
if (h != null && h != tail) {
int ws = h.waitStatus;
//不管共享还是独占模式,只要节点waitStatus为SIGNAL状态都唤醒
if (ws == Node.SIGNAL) {
//将waitStatus设置为0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//唤醒后继节点
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
//如果循环过程中head节点被修改了则重试
if (h == head) // loop if head changed
break;
}
}

四、独占式超时获取同步状态

独占式超时获取同步状态的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
long lastTime = System.nanoTime();
final Node node = addWaiter(Node.EXCLUSIVE);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return true;
}
if (nanosTimeout <= 0) {
cancelAcquire(node);
return false;
}
if (nanosTimeout > spinForTimeoutThreshold &&
shouldParkAfterFailedAcquire(p, node))
LockSupport.parkNanos(this, nanosTimeout);
long now = System.nanoTime();
//计算时间,当前now减去睡眠之前的时间lastTime得到已经睡眠的时间delta,
//然后被原有超时时间nanosTimeout减去,得到了还应该睡眠的时间
nanosTimeout -= now - lastTime;
lastTime = now;
if (Thread.interrupted())
break;
}
} catch (RuntimeException ex) {
cancelAcquire(node);
throw ex;
}
// Arrive here only if interrupted
cancelAcquire(node);
throw new InterruptedException();
}

该方法在自旋过程中,获取前驱节点时,如果成功则返回,这个和独占式是一样的,不同的地方在失败处理上。如果当前线程获取同步状态失败,则判断是否超时(当nanosTimeout 小于等于0时表示已经超时),如果没有超时,重新计划超时时间nanosTimeout ,然后当前线程等待nanosTimeout 纳秒(当已到设置的超时时间,线程会从LockSupport.parkNanos(this, nanosTimeout)方法返回),
如果nanosTimeout 小于等于spinForTimeoutThreshold (1000纳秒)时,将不会使该线程进行超时等待,而是进入快速自旋过程。
独占式超时获取同步状态的流程如下图:

javacon2