GPars(4):Asynchronizer

Asynchronizer是基于Java Executor的后台异步处理器。groovyx.gpars.Asynchronizer类让我们很容易在后台启动异步执行的任务,之后再收集结果。

1.增强闭包

在Asynchronizer.doParallel()代码块中,闭包具有了下列方法:

  • async() - 创建所提供闭包的变种,新创建的闭包被调用时是在线程池上异步执行的。返回java.util.concurrent.Future(表示异步计算的结果。它提供了检查计算是否完成的方法,以等待计算的完成,并检索计算的结果。计算完成后只能使用 get 方法来检索结果,如有必要,计算完成前可以阻塞此方法。详细信息请参见java.util.concurrent.Future的API文档)类型的值。
  • callAsync() - 在一个单独线程上调用闭包,异步执行。返回java.util.concurrent.Future类型的值。

 还是例子比较好理解,下面两个示意性的例子分别说明了async()和callAsync()的使用方法:

import groovyx.gpars.Asynchronizer
import java.util.concurrent.Future

def calculate = {
  ……
}
Asynchronizer.doParallel() {
    Closure longLastingCalculation = {calculate()}
    //创建新异步闭包
    Closure fastCalculation = longLastingCalculation.async() 
    //异步执行,立即返回 
    Future result=fastCalculation()                           
    //在异步计算处理时,这里可以做些其他事情
    println result.get()
}
import groovyx.gpars.Asynchronizer

Asynchronizer.doParallel() {
    /**
     * callAsync()方法是call()方法的异步变种,将返回Future
     */
    assert 6 == {it * 2}.call(3)
    assert 6 == {it * 2}.callAsync(3).get()
}

2.增强ExecutorService

在Asynchronizer.doParallel()代码块中,java.util.concurrent.ExecutorService类型得以增强,可以用<<操作向ExecutorService提交返回Future结果的任务。其封装了该类型的submit()方法。样例:

import groovyx.gpars.Asynchronizer
import java.util.concurrent.ExecutorService

Asynchronizer.doParallel {ExecutorService executorService ->
    executorService << {println 'Inside parallel task'}
}

3.并行执行闭包

Asynchronizer类还提供了doInParallel()、executeInParallel()和startInParallel()方法,可以方便地并行或异步执行多个闭包:

Asynchronizer.doParallel {
    //下面一句并行执行两个闭包,并等待执行完毕返回结果
    assertEquals( [10, 20], 
      AsyncInvokerUtil.doInParallel({calculateA()}, {calculateB()}) )     
    //下面一句异步执行两个闭包,但每个闭包返回的是Future类型,而无需等待结果         
    assertEquals( [10, 20], 
      AsyncInvokerUtil.executeAsync({calculateA()}, {calculateB()})*.get() ) 
}

4.并行集合处理

Asynchronizer也可用于基于纯JDK的并行集合处理。但与Parallelizer不同之处在于,Asynchronizer不需要jsr-166y.jar文件,而是利用了标准JDK的executor service,通过并行化闭包来并行处理一个集合或一个对象。这种方式需要记录状态,因此这种并发集合类的工作用Parallelizer来做还是要好一些。使用样例:

import groovyx.gpars.Asynchronizer
import java.util.concurrent.Future
 //异步计算乘积
 Asynchronizer.doParallel {
     Collection result = [1, 2, 3, 4, 5].collectParallel{it * 10}
     assert [10, 20, 30, 40, 50] == result
 } 
 //使用异步闭包计算乘积
 Asynchronizer.doParallel {
     def closure={it * 10}
     def asyncClosure=closure.async() 
     Collection result = [1, 2, 3, 4, 5].collect(asyncClosure)
     assert new HashSet([10, 20, 30, 40, 50]) == 
            new HashSet((Collection)result*.get()) 
 } 

可以让传入的闭包带ExecutorService参数,这样在闭包内就可以自由使用ExecutorService了。如下例所示,其中doParallel()的参数表示所创建的线程池中线程的数量:

import groovyx.gpars.Asynchronizer
import java.util.concurrent.ExecutorService

 Asynchronizer.doParallel(5) {ExecutorService service ->
     service.submit({performLongCalculation()} as Runnable)
 }
 

如果要复用已有的ExecutorService实例,可以用Asynchonizer.withExistingAsynchronizer()。doParallel()方法在所有线程都完成任务后才返回,并且销毁ExecutorService实例;withExistingAsynchronizer()方法则不会等ExecutorService线程完成即返回。注意,上述DSL必须在doParallel()或withExistingAsynchronizer()代码块中才有作用,目前支持的方法如下(所有在代码块中的对象均具备这些方法):

  • eachParallel()
  • eachWithIndexParallel()
  • collectParallel()
  • findAllParallel()
  • findParallel()
  • everyParallel()
  • anyParallel()
  • grepParallel()
  • groupByParallel()

5.Meta-class增强器

与ParallelEnhancer类似,AsyncEnhancer类可以给任意类增加上述异步方法(与GPars(2):并发集合之Parallelizer中的例子相似):

import groovyx.gpars.AsyncEnhancer

def list = [1, 2, 3, 4, 5, 6, 7, 8, 9]
AsyncEnhancer.enhanceInstance(list)
println list.collectParallel {it * 2 }
def animals = ['dog', 'ant', 'cat', 'whale']
AsyncEnhancer.enhanceInstance animals
println (animals.anyParallel {it ==~ /ant/} ? 
         'Found an ant' : 'No ants found')
println (animals.everyParallel {it.contains('a')} ? 
         'All animals contain a' : 'Some animals can live without an a')

本系列的其他文章:

 

By songwei - Posted on 04 三月 2010