GPars(2):并发集合之Parallelizer

Parallelizer给集合和对象提供了基于ParallelArray(JSR-166y)的DSL,需要jsr166y-070108.jar(Groovy的lib目录下已经包含了该文件)。

1.Parallelizer类

来看两个样例:

 import java.util.concurrent.atomic.AtomicInteger
 import groovyx.gpars.Parallelizer
 //summarize numbers concurrently
 //并发汇总列表数值。闭包中的集合具有了eachParallel()、collectParallel()
 //及其它ParallelArrayUtil方法 
 Parallelizer.doParallel {
     final AtomicInteger result = new AtomicInteger(0)
     //注意:AtomicInteger.addAndGet(int delta)以原子方式将给定值与当前值相加
     [1, 2, 3, 4, 5].eachParallel {result.addAndGet(it)}
     assert 15 == result.get()
 } 
 //multiply numbers asynchronously
 //异步处理多个数值。
 Parallelizer.doParallel {
     final List result = [1, 2, 3, 4, 5].collectParallel {it * 2}
     assert [2, 4, 6, 8, 10] == result
 }
 import groovyx.gpars.Parallelizer
 import jsr166y.forkjoin.ForkJoinPool
 //check whether all elements within a collection meet certain criteria
 //检查是否集合中所有元素都符合(闭包中)标准
 Parallelizer.doParallel(5) {ForkJoinPool pool ->
     assert [1, 2, 3, 4, 5].everyParallel {it > 0}
     assert ![1, 2, 3, 4, 5].everyParallel {it > 1}
 }

Parallelizer类中还有一个withExistingParallelizer()方法与doParallel()方法功能类似,区别是:withExistingParallelizer()复用已有的ForkJoinPool实例,而且不会等待pool线程都完成就返回了;而doParallel()方法则创建一个新的ForkJoinPool实例,只有在工作线程都完成且pool销毁之后才返回。它们都会把该ForkJoinPool实例绑定到当前线程,激活ParallelArray DSL,然后运行所提供的闭包。闭包内ForkJoinPool是其唯一参数,闭包内的集合也具备了ParallelArrayUtil类的所有方法,目前Groovy支持的方法列表如下(详情请参考Parallelizer和ParallelArrayUtil的API文档):

  • eachParallel()
  • eachWithIndexParallel()
  • collectParallel()
  • findAllParallel()
  • findParallel()
  • everyParallel()
  • anyParallel()
  • grepParallel()
  • groupByParallel()
  • foldParallel()
  • minParallel()
  • maxParallel()
  • sumParallel()

2.meta-class增强器

用ParallelEnhancer类可以为任意类或实例添加进并行方法。例如:

import groovyx.gpars.ParallelEnhancer
def list = [1, 2, 3, 4, 5, 6, 7, 8, 9]
ParallelEnhancer.enhanceInstance(list)
println list.collectParallel {it * 2 }
def animals = ['dog', 'ant', 'cat', 'whale']
ParallelEnhancer.enhanceInstance animals
println (animals.anyParallel {it ==~ /ant/} ? 
           'Found an ant' : 'No ants found')
println (animals.everyParallel {it.contains('a')} ? 
           'All animals contain a' : 'Some animals can live without an a')

运行结果:

[2, 4, 6, 8, 10, 12, 14, 16, 18]
Found an ant
Some animals can live without an a

 

3.并行透明化

我理解透明化并行应该就是:看不出是并行的并行程序吧。前面的例子可以看得很清楚,只要用了“xxxParallel”,我们就知道这是并行处理:比如eachParallel()。所谓透明就是说,看似使用的是原来的方法,但实际上是在并行处理:比如,仍使用Collection的each()方法,但达到的效果是eachParallel()。这其中比较关键的一个方法就是makeTransparent(),看下例:

import groovyx.gpars.Parallelizer
Parallelizer.doParallel {    
    //selectImportantNames()将并行处理name集合
    assert ['ALICE', 'JASON'] == selectImportantNames(
            ['Joe', 'Alice', 'Dave', 'Jason'].makeTransparent())
}
/**
 * 注意,下面是用集合标准collect()和findAll()方法,但却是并行处理。
 */
def selectImportantNames(names) {
    names.collect {it.toUpperCase()}.findAll{it.size() > 4}
}

在上面的代码片段中,makeTransparent创建一个TransparentParallel类实例并混入调用它的对象。该类覆盖了each(),collec()等方法,因此不用像ParallelArrayUtil那样用eachParallel()、collectParallel()来表示这是在并行处理,直接用each()、collec()就好了。在同一集合上,makeTransparent与ParallelEnhancer类还可结合使用,那就意味着不管是each()还是eachParallel()都可以使用,且作用是一样的。

4.尽量避免副作用

由于eachParallel()也好、collectParallel()也好都是并行运行的,要确保闭包写操作是线程安全的。闭包中不应有内部状态、共享状态或其他副作用,否则将可能导致竞争状况和死锁。不要像如下的代码那样去写程序:

def thumbnails = []
//不要像下面这样并发访问非线程安全集合thumbnails!!!
images.eachParallel {thumbnails << it.thumbnail}  

本系列的其他文章:

By songwei - Posted on 02 三月 2010