Parallelizer给集合和对象提供了基于ParallelArray(JSR-166y)的DSL,需要jsr166y-070108.jar(Groovy的lib目录下已经包含了该文件)。
1.Parallelizer类
来看两个样例:
import java.util.concurrent.atomic.AtomicInteger
import groovyx.gpars.Parallelizer
//summarize numbers concurrently
//并发汇总列表数值。闭包中的集合具有了eachParallel()、collectParallel()
//及其它ParallelArrayUtil方法
Parallelizer.doParallel {
final AtomicInteger result = new AtomicInteger(0)
//注意:AtomicInteger.addAndGet(int delta)以原子方式将给定值与当前值相加
[1, 2, 3, 4, 5].eachParallel {result.addAndGet(it)}
assert 15 == result.get()
}
//multiply numbers asynchronously
//异步处理多个数值。
Parallelizer.doParallel {
final List result = [1, 2, 3, 4, 5].collectParallel {it * 2}
assert [2, 4, 6, 8, 10] == result
}
import groovyx.gpars.Parallelizer
import jsr166y.forkjoin.ForkJoinPool
//check whether all elements within a collection meet certain criteria
//检查是否集合中所有元素都符合(闭包中)标准
Parallelizer.doParallel(5) {ForkJoinPool pool ->
assert [1, 2, 3, 4, 5].everyParallel {it > 0}
assert ![1, 2, 3, 4, 5].everyParallel {it > 1}
}
Parallelizer类中还有一个withExistingParallelizer()方法与doParallel()方法功能类似,区别是:withExistingParallelizer()复用已有的ForkJoinPool实例,而且不会等待pool线程都完成就返回了;而doParallel()方法则创建一个新的ForkJoinPool实例,只有在工作线程都完成且pool销毁之后才返回。它们都会把该ForkJoinPool实例绑定到当前线程,激活ParallelArray DSL,然后运行所提供的闭包。闭包内ForkJoinPool是其唯一参数,闭包内的集合也具备了ParallelArrayUtil类的所有方法,目前Groovy支持的方法列表如下(详情请参考Parallelizer和ParallelArrayUtil的API文档):
- eachParallel()
- eachWithIndexParallel()
- collectParallel()
- findAllParallel()
- findParallel()
- everyParallel()
- anyParallel()
- grepParallel()
- groupByParallel()
- foldParallel()
- minParallel()
- maxParallel()
- sumParallel()
2.meta-class增强器
用ParallelEnhancer类可以为任意类或实例添加进并行方法。例如:
import groovyx.gpars.ParallelEnhancer
def list = [1, 2, 3, 4, 5, 6, 7, 8, 9]
ParallelEnhancer.enhanceInstance(list)
println list.collectParallel {it * 2 }
def animals = ['dog', 'ant', 'cat', 'whale']
ParallelEnhancer.enhanceInstance animals
println (animals.anyParallel {it ==~ /ant/} ?
'Found an ant' : 'No ants found')
println (animals.everyParallel {it.contains('a')} ?
'All animals contain a' : 'Some animals can live without an a')
运行结果:
[2, 4, 6, 8, 10, 12, 14, 16, 18] Found an ant Some animals can live without an a
3.并行透明化
我理解透明化并行应该就是:看不出是并行的并行程序吧。前面的例子可以看得很清楚,只要用了“xxxParallel”,我们就知道这是并行处理:比如eachParallel()。所谓透明就是说,看似使用的是原来的方法,但实际上是在并行处理:比如,仍使用Collection的each()方法,但达到的效果是eachParallel()。这其中比较关键的一个方法就是makeTransparent(),看下例:
import groovyx.gpars.Parallelizer
Parallelizer.doParallel {
//selectImportantNames()将并行处理name集合
assert ['ALICE', 'JASON'] == selectImportantNames(
['Joe', 'Alice', 'Dave', 'Jason'].makeTransparent())
}
/**
* 注意,下面是用集合标准collect()和findAll()方法,但却是并行处理。
*/
def selectImportantNames(names) {
names.collect {it.toUpperCase()}.findAll{it.size() > 4}
}
在上面的代码片段中,makeTransparent创建一个TransparentParallel类实例并混入调用它的对象。该类覆盖了each(),collec()等方法,因此不用像ParallelArrayUtil那样用eachParallel()、collectParallel()来表示这是在并行处理,直接用each()、collec()就好了。在同一集合上,makeTransparent与ParallelEnhancer类还可结合使用,那就意味着不管是each()还是eachParallel()都可以使用,且作用是一样的。
4.尽量避免副作用
由于eachParallel()也好、collectParallel()也好都是并行运行的,要确保闭包写操作是线程安全的。闭包中不应有内部状态、共享状态或其他副作用,否则将可能导致竞争状况和死锁。不要像如下的代码那样去写程序:
def thumbnails = []
//不要像下面这样并发访问非线程安全集合thumbnails!!!
images.eachParallel {thumbnails << it.thumbnail}
本系列的其他文章:

最新评论
2 周 3 天之前
2 周 5 天之前
4 周 5 天之前
5 周 2 天之前
5 周 3 天之前
5 周 6 天之前
6 周 4 天之前
11 周 6 天之前
13 周 5 天之前
15 周 3 天之前