0%

Java[并发]ReentrantReadWriteLock实现分析

ReentrantLock是排他锁,也就是同一时间也能有一个线程进行访问,而读写锁可以同一时间有多个读线程进行访问,而写线程访问时,所有读线程和其它写线程会被阻塞。读写锁维护了一对锁,一个读锁一个写锁,通过分离读写,使并发能力得到很大提高。
需要注意的是ReentrantReadWriteLock实现的是ReadWriteLock接口而不是Lock接口。

一、ReentrantReadWriteLock的特征

​ • 公平性:支持非公平(默认)和公平的获取锁方式,吞吐量还是非公平锁优于公平锁;
​ • 重进入:该锁支持重进入。读线程获取读锁后,能够再次获取读锁;写线程在获取写锁后能再次获取写锁,同时也能获取读锁;
​ • 锁降级:遵循获取写锁、获取读锁再释放写锁的顺序,写锁能降级成为读锁。
​ • 锁中断:读写锁都支持获取锁期时被中断;
​ • 重入数:读锁和写锁的重入次数都为65535.

二、ReentrantReadWriteLock的实现

1、读写的状态(state变量)设计
读写锁的状态同样是依赖于内部同步器的状态,也就是int型的state变量,读写锁的实现是把state变量切割为两个部分,高16位代表读,低16位代表写。如下图:

java-concur

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
sync内部类的代码为:
/**

* 锁的状态变量被逻辑地划分为两个部分,即int型的32位变量state划分为
* 两个部分,低位代表独占(写)锁的计数,高位代表共享(读)锁的计数
*/
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

/** 返回代表读锁的数量 */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/**返回代表写锁的数量 */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

2、写锁的获取和释放:
写锁获取的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
protected final boolean tryAcquire(int acquires) {
/*
* 走查:
* 1.如果读的计数非0或者写的计数非0,并且线程非持有锁的线程,失败;
* 2.如果计数已经到达最大,失败(只有计数为非0时发生)
* 3.否则,如果线程是重入或等待队列允许获取lock的线程,
* 那么更新共享变量state和设置它有持有锁的线程
*/
Thread current = Thread.currentThread();
int c = getState();
//获取写锁计数
int w = exclusiveCount(c);
if (c != 0) {
// 存在读锁或当前线程不是已经获取写锁的线程
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
}
//1、写锁计数为0且需要阻塞写线程
//2、CAS c变量的值失败时,tryAcquire失败
if ((w == 0 && writerShouldBlock(current)) ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}

1)、持有锁的线程数不为0(c=getState()),如果写线程数(w=0)为0,则读线程数不为0,并且当前线程不是占有锁的线程,返回失败。
2)、如果写锁的数量>写锁的最大数量限量,抛出异常;
3)、如果写锁为0,并且需要等待(公平锁),或者CAS设置state状态变量失败,返回失败;
4)、否则成功,设置当前线程为锁的占有线程;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected final boolean tryRelease(int releases) {
int nextc = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
//如果写锁独占计数已经为0,则释放同步状态
if (exclusiveCount(nextc) == 0) {
setExclusiveOwnerThread(null);
setState(nextc);
return true;
} else {
setState(nextc);
return false;
}
}

释放的过程基本和独占锁ReentrantLock差不多。

3、读锁的获取和释放
读锁的获取代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
protected final int tryAcquireShared(int unused) {
/*
* 1.如果写锁被其它线程持有,失败;
* 2.如果达到最大计数,抛出错误;
* 3.否则,如果当前线程是写锁状态,判断是否需要阻塞,
* 如果不需要,则通过CAS状态和更新读写计数器。
* 注意:没有检查重入请求,这个在fullTryAcquireShared中进行。
* 4.如果步骤3失败,则fullTryAcquireShared中循环CAS直至成功
*/
Thread current = Thread.currentThread();
int c = getState();
//写线程持有锁,且独占线程不是当前线程
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//读线程不需要等待且CAS同步状态成功
if (!readerShouldBlock(current) &&
compareAndSetState(c, c + SHARED_UNIT)) {
//从当前线程中获取读锁的计数器,增加读锁的计数
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
cachedHoldCounter = rh = readHolds.get();
rh.count++;
return 1;
}
//CAS失败,则至成功
return fullTryAcquireShared(current);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
//获取当前读锁计数器
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
rh = readHolds.get();
for (;;) {
int c = getState();
int w = exclusiveCount(c);
//写线程持有写锁且当前线程不是独占线程
//或者读线程持有锁,且需要等待
if ((w != 0 && getExclusiveOwnerThread() != current) ||
((rh.count | w) == 0 && readerShouldBlock(current)))
return -1;
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//CAS设置同步状态
if (compareAndSetState(c, c + SHARED_UNIT)) {
cachedHoldCounter = rh; // cache for release
rh.count++;
return 1;
}
}
}

以上获取读锁的代码流程为:
1)、如果写线程持有锁(w!=0),且独占线程不是当前线程,返回失败;因为允许写线程获取写锁同时能获取读锁。
2)、如果读线程的请求锁数据达到65535(包括重入锁),则返回失败;
3)、如果读线程不需要等待(非公平锁)且CAS设置状态变量成功,则返回成功;否则进行4);
4)、步骤3失败的原因是CAS修改状态变量失败,则需要循环去设置状态变量直到成功或被锁被写进线持有。

读锁的释放过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
protected final boolean tryReleaseShared(int unused) {
//获取线程局部变量中的计数器HoldCounter
HoldCounter rh = cachedHoldCounter;
Thread current = Thread.currentThread();
if (rh == null || rh.tid != current.getId())
rh = readHolds.get();
//计数器减1
if (rh.tryDecrement() <= 0)
throw new IllegalMonitorStateException();
for (;;) {
int c = getState();
//state的高16位描述读锁的数量,SHARED_UNIT=1<<16
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}

1)、获取读锁的计数器HoldCounter,然后减1;
2)、循环CAS设置state的高16位状态,直到为0;

另外同步器Sync的实现中还实现了tryReadLock和tryWriteLock两个方法,分别对应ReadLock的tryLock和WriteLock的tryLock。
tryReadLock成功的条件:没有写入锁或写入锁是当前独占线程 ,且读线程共享锁的数据不大于65535个;
tryWriteLock的成功条件是:没有写入锁或独占线程是当前线程,且1次修改state变量成功;

三、ReentrantReadWriteLock的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class Cache {

static Map<String,Object> map = new HashMap<String,Object>();
static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
static Lock rlock = rwl.readLock();
static Lock wlock = rwl.writeLock();

public static final Object get(String key){
rlock.lock();
try{
return map.get(key);
}finally{
rlock.unlock();
}
}

public static final void put(String key,Object value){
wlock.lock();
try{
map.put(key, value);
}finally{
wlock.unlock();
}
}

public static final void clear(){
wlock.lock();
try{
map.clear();
}finally{
wlock.unlock();
}
}

}