0%

[java并发]Fork/Join Executor框架-源码分析

知道了Fork/Join框架的原理后,对阅读源码应该困难小了很多。Fork/Join框架核心的类有3个,ForkJoinTask、ForkJoinWorkerThread和ForkJoinPool,另外还有两上ForkJoinTask的子类,RecursiveTask和RecursiveAction。

一、类图结构

fj4

从类图可以看出,ForkJoinPool是一个ExecutorService服务,ExecutorService的方法都适用于ForkJoinPool,而ForkJoinTask也是一个Future实现类,Future定义的方法在ForkJoinTask中都有实现。ForkJoinWorkerThread则是Thread的子类,是一个特殊的线程实现。RecursiveTask和RecursiveAction则分别继承了ForkJoinTask,RecursiveTask表示有结果的任务,而RecursiveAction则表示没有返回结果的任务,一般在使用时都会继承这两个子类来实现它的compute方法。

二、源码分析

ForkJoinPool、ForkJoinWorkerThread和ForkJoinTask分别代表调度者、执行者和执行单元,三者的关系是密不可分的,单独实现哪个都没有意义。ForkJoinWorkerThread是连接ForkJoinPool和ForkJoinTask的纽带,下面先来看它的实现;

1、ForkJoinWorkerThread

ForkJoinWorkerThread是Thread的子类,代表执行ForkJoinTask的工作者线程。它需要维护一个双端任务队列。因此ForkJoinWorkerThread的大部分功能都与这个队列有关。双端任务队列是通过数据来实现的,它用一个queueTop int变量代表队列的头端,queueBase int变量代表尾端。

1
2
3
4
5
6
7
ForkJoinTask<?>[] queue;
//队列头索引,只有队列的本身线程能访问
int queueTop;
//队列尾索引,多线程访问
volatile int queueBase;
private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 24; // 16M

因为queueTop只有当前的worker线程能访问,因为不需要并发控制;queueBase是多线程访问的变量,使用volatile来实现它的内存可见性。任务队列长度初始化为 1 << 13,最大不能超过1 << 24,在扩展长度时,都是以2的n次方来进进行的,主要是为了进行位算的方便性;
任务队列是个对象数组,当进行入队出队操作时就要对它进行并发更新,主要是通过Unsafe类提供的CAS操作来实现的。Unsafe的CAS操作需要计算出内存的实际偏移地址,所以在入队时可以看到一些位运算来定义内存地址:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
final void pushTask(ForkJoinTask<?> t) {
ForkJoinTask<?>[] q; int s, m;
if ((q = queue) != null) { // ignore if queue removed
//计算出数组中对象的引用的偏移位置
//s=0时,即(((0) & (m = 2<<13 - 1)) << 2) + 16=16,s=1时,为20,即每个对象引用占用4个字节
long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
//把任务加到队列中,UNSAFE.putOrderedObject采用store-store barrier,
//比volatile的store-load barrier速度更快
UNSAFE.putOrderedObject(q, u, t);
//递增
queueTop = s + 1; // or use putOrderedInt
//(s=s-queueBase) <=2,queueBase要追上queueTop时,通知线程池唤醒或创建工作线程
//queueBase要追上queueTop
if ((s -= queueBase) <= 2)
pool.signalWork();
else if (s == m)
//队列扩容
growQueue();
}
}

入队的过程实际上就是把任务添加到数据中,首先会要计算出任务要添加数组的内存地址,然后通过UNSAFE.putOrderedObject方法来插入任务;最后通知线程池signalWork来调度worker线程执行任务;如果队列已经满,则通过growQueue的方法来进行扩容。
对于出队操作,从ForkJoinPool的实现原理我们知道,它有个work-stealing机制,所以这里支持从队首出队或从队尾出队,队首出队使用队列表现得像一个栈,即LIFO; 而从队尾出现表现像队列,即FIFO;本地worker线程可以选择从队首或队尾出队,这取决于ForkJoinPool的初始化参数配置;而非本地线程只能从队尾出队。
从队首出队的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private ForkJoinTask<?> popTask() {
int m;
ForkJoinTask<?>[] q = queue;
if (q != null && (m = q.length - 1) >= 0) {
for (int s; (s = queueTop) != queueBase;) {
//算出任务所在队列索引
int i = m & --s;
//在内存中的偏移位置
long u = (i << ASHIFT) + ABASE; // raw offset
ForkJoinTask<?> t = q[i];
//被偷走了
if (t == null) // lost to stealer
break;
//把任务引用置为null
if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
queueTop = s; // or putOrderedInt
return t;
}
}
}
return null;
}

可以看到,它操作的是queueTop的最新值,即代表最近入队的任务,计算出任务所在队列的内存偏移地址后,再通过CAS操作把任务引用置null,这里要注意的是任务可以被别的worker线程偷走的情况 ,所以需要不停遍历队列直至找到任务,如果最后找不到任务,返回null。
从队尾出队的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
final ForkJoinTask<?> deqTask() {
ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
//queueTop != (b = queueBase)队列不为空
if (queueTop != (b = queueBase) &&
(q = queue) != null && // must read q after b
//计算数组索引
(i = (q.length - 1) & b) >= 0 &&
//再次检查queueBase == b,以防止已经有线程修改queueBase
(t = q[i]) != null && queueBase == b &&
//把对应位置引用设置为null
UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE, t, null)) {
queueBase = b + 1;
return t;
}
return null;
}

可以看出,主要操作的是queueBase变量,同样需要计算出任务所以数组的内存地址,然后再进行一个re-check操作,检查queueBase变量是否发生变化,因为queueBase是volatile的,有变化都会更新;最后进行CAS设置任务引用为null,queueBase变量+1;
另外还有类似的一个locallyDeqTask()方法,只有本地线程能调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
final ForkJoinTask<?> locallyDeqTask() {
ForkJoinTask<?> t; int m, b, i;
ForkJoinTask<?>[] q = queue;
if (q != null && (m = q.length - 1) >= 0) {
while (queueTop != (b = queueBase)) {
if ((t = q[i = m & b]) != null &&
queueBase == b &&
UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE,
t, null)) {
queueBase = b + 1;
return t;
}
}
}
return null;
}

提供给ForkJoinTask的出队方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//从本地队列中获取task或者从别的队列中窃取task
final ForkJoinTask<?> pollTask() {
ForkJoinWorkerThread[] ws;
//从线程自己的队列中获取task,有则返回
ForkJoinTask<?> t = pollLocalTask();
if (t != null || (ws = pool.workers) == null)
return t;
//从别的线程窃取任务
int n = ws.length; // cheap version of FJP.scan
int steps = n << 1;
//获取种子
int r = nextSeed();
int i = 0;
//以线程数的2倍循环获取别的线程任务
while (i < steps) {
//随机种子的作用是为了在轮询其它worker线程时,更均匀
ForkJoinWorkerThread w = ws[(i++ + r) & (n - 1)];
if (w != null && w.queueBase != w.queueTop && w.queue != null) {
if ((t = w.deqTask()) != null)
return t;
i = 0;
}
}
return null;
}

线程初始化和执行:
构造方法初始化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected ForkJoinWorkerThread(ForkJoinPool pool) {
//线程的名字
super(pool.nextWorkerName());
this.pool = pool;
//在线程池中注册
int k = pool.registerWorker(this);
poolIndex = k;
eventCount = ~k & SMASK; // clear wait count
locallyFifo = pool.locallyFifo;
Thread.UncaughtExceptionHandler ueh = pool.ueh;
if (ueh != null)
setUncaughtExceptionHandler(ueh);
//设置为守护线程
setDaemon(true);
}

onStart时初始化:

1
2
3
4
5
6
7
protected void onStart() {
//初始化任务队列
queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
//生成种子
int r = pool.workerSeedGenerator.nextInt();
seed = (r == 0) ? 1 : r; // must be nonzero
}

种子的作用主要是在线程池扫描worker线程时,能够均匀地扫描。
执行线程任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
public void run() {
Throwable exception = null;
try {
//初始化
onStart();
//开始工作
pool.work(this);
} catch (Throwable ex) {
exception = ex;
} finally {
onTermination(exception);
}
}

给ForkJoinTask提供的joinTask方法,用于合并任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
final int joinTask(ForkJoinTask<?> joinMe) {
//获取当前任务,并把join的任务作为当前任务
ForkJoinTask<?> prevJoin = currentJoin;
currentJoin = joinMe;
for (int s, retries = MAX_HELP;;) {
//joinMe任务已经完成或取消、异常,把currentJoin引用重置回去
if ((s = joinMe.status) < 0) {
currentJoin = prevJoin;
return s;
}
if (retries > 0) {
if (queueTop != queueBase) {
//判断任务是否在自己的worker线程队列的队首
//有则执行
if (!localHelpJoinTask(joinMe))
retries = 0; // cannot help
}
//retries达到8次
else if (retries == MAX_HELP >>> 1) {
--retries; // check uncommon case
//如果要执行的任务在某个worker线程的队尾,则偷过来执行
if (tryDeqAndExec(joinMe) >= 0)
//status>0表示任务没执行结束,则让出当前线程控制权
Thread.yield(); // for politeness
}
else
//helpJoinTask方法检查当前任务是不是被某个Worker线程偷走了
//并且是这个线程最新偷走的任务(currentSteal),如果是的话,
//当前线程帮助执行这个任务,这个过程成功则返回true
retries = helpJoinTask(joinMe) ? MAX_HELP : retries - 1;
}
else {
retries = MAX_HELP; // restart if not done
//阻塞等待
pool.tryAwaitJoin(joinMe);
}
}
}

以上代码主要执行以下逻辑:
如果任务已经完成,则返回任务状态;
如果任务在自己任务队列的首部,则执行它;
如果任务在任务的尾部,则执行 ;
以上两种情况都不满足,扫描别的worker线程的任务队列,发现的话就执行它;
最后如果都不满足则调用线程池的tryAwaitJoin方法陷入阻塞等待;

给线程池扫描时提供的execTask方法,用于执行任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
final void execTask(ForkJoinTask<?> t) {
currentSteal = t;
//死循环执行
for (;;) {
//首先执行偷来的任务
if (t != null)
t.doExec();
//接着执行自己任务队列中的任务,直到队列中没有任务
if (queueTop == queueBase)
break;
//根据locallyFifo设置,决定是FIFO方法获取任务还是LIFO方式获取
t = locallyFifo ? locallyDeqTask() : popTask();
}
//更新偷任务的计数
++stealCount;
currentSteal = null;
}

2、ForkJoinTask

ForkJoinTask代表一个可分割和合并的任务,它是Future的实现类,所以它可以代表一个异步计算任务,具有get/cancel/isDone/isCancelled一些方法。ForkJoinTask通过一个volatile的status整型变量来代表任务状态,它定义的任务状态有以下4种:

1
2
3
4
5
volatile int status; // accessed directly by pool and workers
private static final int NORMAL = -1;
private static final int CANCELLED = -2;
private static final int EXCEPTIONAL = -3;
private static final int SIGNAL = 1;

小于0的状态都代表任务完成态,分别代表正常完成、被取消和异常状态,SIGNAL代表任务处于阻塞等待通知状态,另外还有0代表初始状态。
ForkJoinTask最主要的两个方法就是fork和join了,对任务进行分割和合并:
fork方法:

1
2
3
4
5
6
public final ForkJoinTask<V> fork() {
//把任务添加到当前线程的双端队列
((ForkJoinWorkerThread) Thread.currentThread())
.pushTask(this);
return this;
}

fork方法比较简单,只是把任务推到当前worker线程的任务队列中。

join方法:

1
2
3
4
5
6
7
8
public final V join() {
//返回非正常的结果
if (doJoin() != NORMAL)
return reportResult();
else
//返回正常的结果
return getRawResult();
}

调用核心的doJoin方法后,如果返回非正常结果,比如任务被取消,执行过程中发生异常等,则调用reportResult方法处理,否则调用getRawResult方法处理。
核心的doJoin方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private int doJoin() {
Thread t; ForkJoinWorkerThread w; int s; boolean completed;
//当前线程为ForkJoinWorkerThread
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
if ((s = status) < 0)
return s;
//任务是不是在队列头
if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {
try {
//如果当前任务在队列头,直接由当前线程执行,执行任务,由子类实现
completed = exec();
} catch (Throwable rex) {
return setExceptionalCompletion(rex);
}
//如果任务完成,设置任务状态
if (completed)
return setCompletion(NORMAL);
}
//任务不在队列头,调用joinTask
return w.joinTask(this);
}
else
//外部线程等待任务结束 的情况
return externalAwaitDone();
}

doJoin方法首先判断执行的线程是不是worker线程,是的话尝试从它的任务队列首尾出队一个任务,成功则执行该任务,否则调用worker线程的joinTask方法;如果不是worker线程进行join任务,则调用externalAwaitDone进行等待;
另外ForkJoinTask类还提供invoke/invokeAll等方法用于执行任务;

三、ForkJoinPool

ForkJoinPool的主要工作是对worker线程进行调度,是3个类中实现最复杂的一个,其中为了维护池的状态和worker线程组并发更新,用了大量的位运算。ForkJoinPool也是执行work-stealing算法的主要场所。ForkJoinPool通过一个volatile的64位的long变量ctl来表示池的状态,把它按位切割成了几个部分,每个部分代表了不同的含义:

1
2
3
4
5
6
7
8
9
10
11
12
/**
* AC 16位 (48-63) :正在运行的worker线程数据减去系统的并发数(正在等待资源的线程数)
* TC 16位 (32-47) :所有worker线程数据减去系统的并发数
* ST 1位(31):1表示线程池正在关闭
* EC 15位(16-30): 第一个等待线程的等待数
* ID 16位(0-15):Treiber栈(存储等待线程)顶的worker线程在线程队列中的索引
* 当ac为负数,表示没有足够活动的workers;
* 当tc为负数,表示没有足够total workers;
* 当id为负数,表示至少有一个等待的worker;
* 当e为负数,表示池已经终止。
*/
volatile long ctl;

为了控制worker数组的并发更新,定义了32位的scanGuard,同样按拉把它切成了3个部分:

1
2
3
4
5
//更新worker数组的保护变量,16位为标识位SG_UNIT,当为1时表示锁住;
//当锁住时,其它线程无法更新worker数组
//0-15为代表worker线程数组的数量,为2的n次方-1
volatile int scanGuard;
private static final int SG_UNIT = 1 << 16;

ForkJoinPool本身也维护着一个任务队列,与worker线程类似,主要是用来放置外部线程提交的任务,通过lock来实现并发控制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* Array serving as submission queue. Initialized upon construction.
*/
private ForkJoinTask<?>[] submissionQueue;

/**
* Lock protecting submissions array for addSubmission
*/
private final ReentrantLock submissionLock;

/**
* Condition for awaitTermination, using submissionLock for
* convenience.
*/
private final Condition termination;

下面分析ForkJoinPool的主要方法:
任务提交:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
if (task == null)
throw new NullPointerException();
forkOrSubmit(task);
return task;
}
private <T> void forkOrSubmit(ForkJoinTask<T> task) {
ForkJoinWorkerThread w;
Thread t = Thread.currentThread();
if (shutdown)
throw new RejectedExecutionException();
//提交任务的线程为ForkJoinWorkerThread并且是属于当前池的线程
if ((t instanceof ForkJoinWorkerThread) &&
(w = (ForkJoinWorkerThread)t).pool == this)
//把任务提交到worker线的任务队列
w.pushTask(task);
else
//添加到池中的任务队列
addSubmission(task);
}

首先会判断任务是否为worker线程提交,如果是则直接把任务推到它的任务队列中;否则提交到线程池的任务队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private void addSubmission(ForkJoinTask<?> t) {
//获取Lock 并加锁
final ReentrantLock lock = this.submissionLock;
lock.lock();
try {
ForkJoinTask<?>[] q; int s, m;
//把任务加到池的队列中
if ((q = submissionQueue) != null) { // ignore if queue removed
long u = (((s = queueTop) & (m = q.length-1)) << ASHIFT)+ABASE;
UNSAFE.putOrderedObject(q, u, t);
queueTop = s + 1;
//扩容
if (s - queueBase == m)
growSubmissionQueue();
}
} finally {
lock.unlock();
}
//通过worker线程开始工作
signalWork();
}

addSubmission方法通过ReentrantLock来控制添加到线程池的任务队列,因为外部线程可能存在并发提交的情况。最后通过signalWork方法通知线程唤醒worker线程或者添加新的worker线程来工作;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
final void signalWork() { 
long c; int e, u;
while ((((e = (int)(c = ctl)) | (u = (int)(c >>> 32))) &
(INT_SIGN|SHORT_SIGN)) == (INT_SIGN|SHORT_SIGN) && e >= 0) {
if (e > 0) { // release a waiting worker
int i; ForkJoinWorkerThread w; ForkJoinWorkerThread[] ws;
if ((ws = workers) == null ||
(i = ~e & SMASK) >= ws.length ||
(w = ws[i]) == null)
break;
long nc = (((long)(w.nextWait & E_MASK)) |
((long)(u + UAC_UNIT) << 32));
if (w.eventCount == e &&
UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
w.eventCount = (e + EC_UNIT) & E_MASK;
//如果线程阻塞中,则唤醒它
if (w.parked)
UNSAFE.unpark(w);
break;
}
}
else if (UNSAFE.compareAndSwapLong
(this, ctlOffset, c,
(long)(((u + UTC_UNIT) & UTC_MASK) |
((u + UAC_UNIT) & UAC_MASK)) << 32)) {
//添加新的worker线程
addWorker();
break;
}
}
}

signalWork方法先通过一系列位运算来获取池的状态进行判断,如果池中有阻塞的空闲线程则唤醒它;否则添加一个新的worker线程;signalWork在worker线程把任务推到自己的任务队列(pushTask())时也会进行调用。

addWorker方法添加了新的worker线程后会调用t.start方法启动线程,即worker的run方法会被调用,执行pool.work方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
final void work(ForkJoinWorkerThread w) {
boolean swept = false; // true on empty scans
long c;
//worker循环执行任务
//(int)(c = ctl) >= 0表示ST位不为0,即线程池不在关闭状态
while (!w.terminate && (int)(c = ctl) >= 0) {
int a; //AC // active count
//swept为false有3种情况:
//首次循环;scan返回false; tryAwaitWork=true
//a = (int)(c >> AC_SHIFT)右移48位得到活动的worker线程数
if (!swept && (a = (int)(c >> AC_SHIFT)) <= 0)
swept = scan(w, a);
else if (tryAwaitWork(w, c))//把worker放到等待queue中,等待eventCount改变
swept = false;
}
}

work方法包含了worker线程最外层的while循环,它首先通过位运算得到池的状态和线程相关信息,然后调用scan方法扫描整个线程池的worker数组的任务队列和线程池的任务队列来执行任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
//扫描任务并执行,窃取其它线程中的task或pool中的task来执行
//如果有任务执行了,返回false,告诉worker方法循环继续,否则返回true,调用tryAwaitWork等待
private boolean scan(ForkJoinWorkerThread w, int a) {
int g = scanGuard; // mask 0 avoids useless scans if only one active
//parallelism == 1 - a表示parallelism=1,a=0
//blockedCount表示没有线程因为join被阻塞
//同时满足表示没有任务worker线程在运行中,即m=0;
//g & SMASK返回g变量0-15位的值,即worker数组的数量
int m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK;
ForkJoinWorkerThread[] ws = workers;
if (ws == null || ws.length <= m) // staleness check
return false;
//窃取其它线程中的task并执行
for (int r = w.seed, k = r, j = -(m + m); j <= m + m; ++j) {
ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
//随机获取一个worker线程
ForkJoinWorkerThread v = ws[k & m];
//(b = v.queueBase) != v.queueTop当前worker线程v任务队列不为空
// (i = (q.length - 1) & b) >= 0获取到的任务队列下标>0
if (v != null && (b = v.queueBase) != v.queueTop &&
(q = v.queue) != null && (i = (q.length - 1) & b) >= 0) {
//定位任务的内存偏移位置
long u = (i << ASHIFT) + ABASE;
//(t = q[i]) != null当前索引有任务
//v.queueBase == b任务没被偷走
//CAS设置数组任务引用为null,表示拿走任务
if ((t = q[i]) != null && v.queueBase == b &&
UNSAFE.compareAndSwapObject(q, u, t, null)) {
int d = (v.queueBase = b + 1) - v.queueTop;
//偷到了,告诉这个被偷线程是池中哪个线程偷走它的任务
v.stealHint = w.poolIndex;
//如果被偷线程的任务队列中还有任务,告诉别的线程也来偷。。
if (d != 0)
signalWork(); // propagate if nonempty
//执行偷到的任务
w.execTask(t);
}
//改变当前线程的种子
r ^= r << 13; r ^= r >>> 17; w.seed = r ^ (r << 5);
//false表示扫描到了任务,外层循环可以继续扫描
return false; // store next seed
}
else if (j < 0) { // xorshift
r ^= r << 13; r ^= r >>> 17; k = r ^= r << 5;
}
else
++k;
}
//如果扫描不到任务,但scanGuard被更新了,说明有任务的变化
//返回false让外层循环继续扫描
if (scanGuard != g) // staleness check
return false;
else { //从pool的队列中获取task并执行 // try to take submission
ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
if ((b = queueBase) != queueTop &&
(q = submissionQueue) != null &&
(i = (q.length - 1) & b) >= 0) {
long u = (i << ASHIFT) + ABASE;
if ((t = q[i]) != null && queueBase == b &&
UNSAFE.compareAndSwapObject(q, u, t, null)) {
//获取到了池中任务队列中的队尾任务并执行
queueBase = b + 1;
w.execTask(t);
}
return false;
}
return true; // all queues empty
}
}

scan方法实现非常复杂,但基本的逻辑是先随机获取一个worker线程,然后窃取它的任务来执行,如果失败则扫描线程池的任务队列。如果能找到任务执行都返回false,否则返回true。返回false会导致work方法继续进行while循环扫描,直到返回true或活动线程数<=0;此时,会调用tryAwaitWork把当前worker线程park起等待唤醒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
private boolean tryAwaitWork(ForkJoinWorkerThread w, long c) {
//
int v = w.eventCount;
//强转,丢失高位信息
//(int)c后得到等待线程的信息
w.nextWait = (int)c; // w's successor record
//正在运行的线程数量减1,即AC值减1
long nc = (long)(v & E_MASK) | ((c - AC_UNIT) & (AC_MASK|TC_MASK));
//如果ctl发生了变化
if (ctl != c || !UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
//ctl != c 表示第一个等待线程的信息发生了变化
//!UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)表示增加了的线程数正在减少
long d = ctl; // return true if lost to a deq, to force scan
//返回true,强制外层循环再次扫描
return (int)d != (int)c && ((d - c) & AC_MASK) >= 0L;
}
for (int sc = w.stealCount; sc != 0;) { // accumulate stealCount
long s = stealCount;
//把w的stealCount设置到pool的stealCount,并且设置w.stealCount=0
if (UNSAFE.compareAndSwapLong(this, stealCountOffset, s, s + sc))
sc = w.stealCount = 0;
else if (w.eventCount != v)//w的eventCount发生变化,则下次更新stealCount
return true; // update next time
}
//!shutdown || !tryTerminate(false))表示线程池没有关闭
//(int)c != 0表示有worker线程正在运行
//parallelism + (int)(nc >> AC_SHIFT) == 0表示活跃线程数为0
// blockedCount == 0表示正在join等待的线程为0
//quiescerCount==0表示quiesce线程池中的数量为0
if ((!shutdown || !tryTerminate(false)) &&
(int)c != 0 && parallelism + (int)(nc >> AC_SHIFT) == 0 &&
blockedCount == 0 && quiescerCount == 0)
//闲置等待如果没有任务则关闭线程
idleAwaitWork(w, nc, c, v); // quiescent
for (boolean rescanned = false;;) {
if (w.eventCount != v)
return true;
//尝试把当前线程从等待队列中移除
//一旦移除成功,eventCount发生变化,返回true
if (!rescanned) {
int g = scanGuard, m = g & SMASK;
ForkJoinWorkerThread[] ws = workers;
if (ws != null && m < ws.length) {
rescanned = true;
for (int i = 0; i <= m; ++i) {
ForkJoinWorkerThread u = ws[i];
if (u != null) {
if (u.queueBase != u.queueTop &&
!tryReleaseWaiter())
rescanned = false; // contended
if (w.eventCount != v)
return true;
}
}
}
if (scanGuard != g || // stale
(queueBase != queueTop && !tryReleaseWaiter()))
rescanned = false;
if (!rescanned)//让出控制权,减小冲突
Thread.yield(); // reduce contention
else//在park之前清除中断状态
Thread.interrupted(); // clear before park
}
else {
w.parked = true; // must recheck
//再次检查要不要park
if (w.eventCount != v) {
w.parked = false;
return true;
}
LockSupport.park(this);
//从park返回
rescanned = w.parked = false;
}
}
}

tryAwaitWork方法会进行多次检查,目的是最后挣扎一下,不想进入park(阻塞)状态。在worker线程进行joinTask方法中,如果合并失败,会进入tryAwaitJoin方法,也是线程池提供的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
final void tryAwaitJoin(ForkJoinTask<?> joinMe) {
int s;
//清除中断标识,因为tryAwaitDown可能产生中断异常
Thread.interrupted(); // clear interrupts before checking termination
//任务没有完成
if (joinMe.status >= 0) {
//blockCount加1,标识当前线程为阻塞
//
if (tryPreBlock()) {
//调用wait等待任务完成
joinMe.tryAwaitDone(0L);
//完成后blockCount-1,标识当前线程为活跃状态
postBlock();
}
else if ((ctl & STOP_BIT) != 0L)//线程池关闭状态,取消任务
joinMe.cancelIgnoringExceptions();
}
}

最后看看ForkJoinPool的初始化过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
Thread.UncaughtExceptionHandler handler,
boolean asyncMode) {
checkPermission();
if (factory == null)
throw new NullPointerException();
//MAX_ID=1<<15-1
if (parallelism <= 0 || parallelism > MAX_ID)
throw new IllegalArgumentException();
this.parallelism = parallelism;
this.factory = factory;
this.ueh = handler;
//worker队列使用LIFO还是FIFO
this.locallyFifo = asyncMode;
long np = (long)(-parallelism); // offset ctl counts
//初始化ctl
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
//ForkJoin Pool任务队列长度
this.submissionQueue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
// initialize workers array with room for 2*parallelism if possible
int n = parallelism << 1;
if (n >= MAX_ID)
n = MAX_ID;
else { // See Hackers Delight, sec 3.2, where n < (1 << 16)
n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8;
}
//worker线程数组初始化,2的倍数
workers = new ForkJoinWorkerThread[n + 1];
this.submissionLock = new ReentrantLock();
this.termination = submissionLock.newCondition();
StringBuilder sb = new StringBuilder("ForkJoinPool-");
sb.append(poolNumberGenerator.incrementAndGet());
sb.append("-worker-");
this.workerNamePrefix = sb.toString();
}

ForkJoinPool还提供了ExecutorService的一些通用方法,这里不再分析。

四、总结

从ForkJoinPool、ForkJoinWorkerThread和ForkJoinTask源码实现可以看到,三者之前存在互相依赖的关系,且实现比ThreadPoolExecutor更加复杂;在并发控制时较少使用lock相关的API。