0%

[java并发]Fork/Join Executor框架-原理

fork/join executor框架是jdk7conrrent包中新增的并发编程的一个利器,是Doug Lea的又一力作,fork/join框架的原理发表在论文A Java Fork/Join Framework上。fork/join框架充分了利用了现代处理器多核心的特点,通过work stealing(工作窃取)来达到充分利用CPU的目的。通过fork/join的机制,对大任务进行分割合并来处理复杂海量的计算任务。

在了解fork/join 框架的工作机制之前,需要先了两个关键的概念原理,deque-双端队列,work stealing-工作窃取。

一、Deque(双端队列)

在fork/join框架中,deque是由框架中的ForkJoinWorkerThread类实现的,没有采用阻塞队列LinkedBlockingDeque这个现成的实现类。LinkedBlockingDeque采用的是独占锁对队列两端的操作进行并发控制,一定程度上削弱了它的性能。另外一个原因是ForkJoinWorkerThread作为ForkJoinPool控制的线程,与一般的线程存在区别,ForkJoinWorkerThread中实现deque,可以更好的支持并发。
ForkJoinWorkerThread中的任务队列deque是通过数组来实现的,分别用queueTop和queueBase两个整型的引用指向队列的头和尾。而在行为上,deque又支持FIFO和LIFO两种操作模式。deque的工作模式如下图所示:

fj池

从图中可以看出,queueTop随着入队操作不断向右移,而入队操作只能是自己的worker线程来执行,所以queueTop是不需要进行并发控制的。queueBase对应出队操作,每把一个任务出队,它就向右移一个位置。因为队尾的任务可能被其它的线程窃取,所以它是volatile类型的变量,在修改时对其它线程可见。本地worker线程可以采用两个方式来获取任务。如果是LIFO的方式,那么它是通过queueTop来出队的,此时,队列右端类似一个栈,如果采用FIFO的方式,那么它是通过queueBase来出队的,此时就是传统的队列了。
双端队列deque明显提高了并发的性能,因为入队操作不需要锁来控制,其它线程窃取任务时,是对队列的另一端操作的,没有相互影响。对队列的操作都是通过Unsafe类提供的CAS方法来实现的,并且计算出了实际的内存地址。

二、work-stealing(工作窃取)

  work-stealing机制是fork/join框架的核心,通过ForkJoin线程池来对它的worker线程调度来实现工作窃取。从工作队列deque的结构来看,它就是要支持工作窃取这种模式的。它的实现基于以下算法:
• 每个worker线程维护着一个双端任务队列;
• 双端任务队列支持支持LIFO的push和pop操作,FIFO的take操作;
• 由给定线程在任务中生成的子任务会推到它自己的任务队列deque中。
• worker线程通过LIFO(最新的任务先出队)处理它自己的任务;
• 如果一个worker线程的任务队列没有任务处理,它会随机挑选(窃取)其它线程的任务队列中的任务来执行,通过FIFO(最早的任务先出队)的顺序来处理;
• 当一个worker线程遇到一个join操作,它会先处理其它的任务,如果合适的话,直到目标任务通知它任务完成;否则所有的任务都在不阻塞的情况下完成。
• 如果一个worker线程没有本地任务且窃取其它worker线程的任务失败,它会退出,让出控制权(通过yield,sleep或者优先级调整),它过一会儿会重试获取任务,除非所有的worker都处于类似的空闲状态。在这种情况下它们会阻塞直到顶层其它的任务被调用。

工作窃取的状态如下图:

fj2

三、fork/join

任务submit:

任务提交一般是由外部线程(比如说main线程)来执行的,并不是worker线程,所以第一次提交的任务不会进入任何worker线程的任务队列,而是进入了pool自己的任务队列。外部线程接着通知ForkJoinPool唤醒池中的线程或者创建worker线程来执行任务。worker线程会扫描pool的任务队列,获取任务来执行,执行任务时,如果任务需要分解,就会进行fork操作。

任务Fork:
任务是在执行过程中进行分裂的,当执行任务遇到一个fork操作时,表示该任务会进行分解,分解后的子任务会推到当前执行fork操作的worker线程的任务队列当中。只要任务分解没有达到指定的阀值,它都会无限地分解下去。

任务Join: 如果线程遇到一个任务进行join操作时,它会判断当前任务没有在自己队列的栈顶,如果在,就会执行该任务,否则继续判断任务是不是在自己的队列尾,如果在,就执行它;如果这两种情况都不满足,就会扫描其它worker线程,看看任务是不是被其它worker线程偷走了,如果扫描到,就执行它,否则只有阻塞等待任务执行完成。join过程这么复杂的原因是为了充分让线程进行工作,避免线程进入阻塞状态。

下面表示Foin/Join框架调度的一种可能状态:

fj3

根据步骤,模拟了一种执行的可能情况:
1).外部线程提交任务到ForkJoin池的队列中,并通知ForkJoin池创建新的worker线程;
2).worker1线程创建后,它会扫描线程池中的其它线程窃取任务,如果失败则扫描线程池本身的任务队列;
3).worker2线程从线程池队列扫描到任务,会马上执行它;
4).在执行任务时,发现任务需要fork成小任务,就把分割出来的子任务推到自己的本地任务队列中,并通知线程池唤醒其它线程或创建新线程;
5).worker1如果发现需要join任务,就从本地队列头或尾查找该任务;查找不到则扫描别的线程,如果都失败则阻塞等待通知,这种情况图中没有画出来;
6).worker1通知线程池创建了新的worker线程worker2,worker2启动后先去窃取别的worker线程的任务,比如从worker1窃取到了任务task1-1;
7).worker2窃取到任务task1-1后会执行,发现需要fork,则fork成了任务task1-1-1和task1-1-2,并推入自己的任务队列中,然后通知线程池唤醒线程或创建新worker线程;后面的执行基本按照这个流程不断进行下去,直到任务无法分解。

四、总结

可以看到,以Deque为数据结构为基础,通过work-stealing算法,可以充分利用CPU多核进行复杂计算,避免线程进入阻塞或闲置状态,使线程进行饱和工作。这对java的并发处理问题的能力是很大的提升。也避免了大范围使用锁带来的问题。但也有缺点,每个worker线程维护一个任务队列,在任务窃取时需要volatile控制,join时可能造成的阻塞,都是一定的资源消耗,但相对于它的性能提升,都是可以接受的。