Fork/join主要用于问题分解处理,分而治之。
Fork/join算法将问题划分成多个小的子问题,对每个子问题运用同样的算法,当子问题足够小时,问题就可以直接得到解决。所有子问题都解决了,结合起来父问题也就得到了解决。
JSR-166y类库对Fork/join支持相当不错,但有些问题如果不注意,还会遇到麻烦。而且还得自己处理threads、pools及synchronization barriers(同步障)。GPars隐藏了这些东西,让你用起Fork/join来更加方便。下例是统计一指定目录及其子目录下文件个数的算法:
import groovyx.gpars.AbstractForkJoinWorker
import static groovyx.gpars.Parallelizer.*
public final class FileCounter extends AbstractForkJoinWorker<Long> {
private final File file;
def FileCounter(final File file) {
this.file = file
}
protected void compute() {
long count = 0;
file.eachFile {
if (it.isDirectory()) {
println "Forking a thread for $it"
//fork一个子任务
forkOffChild(new FileCounter(it))
} else {
count++
}
}
//用子任务结果计算并保存本任务结果
setResult(count + ((childrenResults)?.sum() ?: 0))
}
}
doParallel(1) { pool -> //1个线程也能搞定
println """Number of files:
${orchestrate(
new FileCounter(
new File("...目录名...")
)
)}"""
}
上例中Parallelizer.orchestrate()方法根据传递进的AbstractForkJoinWorker类型的参数(根任务)创建一个ForkJoinOrchestrator并运行该AbstractForkJoinWorker,等待结果返回。childrenResults是AbstractForkJoinWorker的属性,等待子任务结果返回,返回值为List类型。
Fork/Join操作之所以能安全地由成若干小线程运行,要归功于内部TaskBarrier类对线程的同步。当算法中一个线程被阻塞等待其子问题得以完成时,该线程会被归还到池内,以用于任务队列中其他可运行子问题。尽管算法创建了许多任务(由于子目录很多),而且都要等其子目录任务完成,但最少只需一个线程就足以保持运算进行。
GPars指南中还给出了一个比较复杂的Mergesort的例子,有兴趣的读者可以参考。
本系列的其他文章:

最新评论
2 周 3 天之前
2 周 5 天之前
4 周 5 天之前
5 周 2 天之前
5 周 3 天之前
5 周 6 天之前
6 周 4 天之前
11 周 6 天之前
13 周 5 天之前
15 周 3 天之前