Asynchronizer是基于Java Executor的后台异步处理器。groovyx.gpars.Asynchronizer类让我们很容易在后台启动异步执行的任务,之后再收集结果。
1.增强闭包
在Asynchronizer.doParallel()代码块中,闭包具有了下列方法:
- async() - 创建所提供闭包的变种,新创建的闭包被调用时是在线程池上异步执行的。返回java.util.concurrent.Future(表示异步计算的结果。它提供了检查计算是否完成的方法,以等待计算的完成,并检索计算的结果。计算完成后只能使用 get 方法来检索结果,如有必要,计算完成前可以阻塞此方法。详细信息请参见java.util.concurrent.Future的API文档)类型的值。
- callAsync() - 在一个单独线程上调用闭包,异步执行。返回java.util.concurrent.Future类型的值。
还是例子比较好理解,下面两个示意性的例子分别说明了async()和callAsync()的使用方法:
import groovyx.gpars.Asynchronizer
import java.util.concurrent.Future
def calculate = {
……
}
Asynchronizer.doParallel() {
Closure longLastingCalculation = {calculate()}
//创建新异步闭包
Closure fastCalculation = longLastingCalculation.async()
//异步执行,立即返回
Future result=fastCalculation()
//在异步计算处理时,这里可以做些其他事情
println result.get()
}
import groovyx.gpars.Asynchronizer
Asynchronizer.doParallel() {
/**
* callAsync()方法是call()方法的异步变种,将返回Future
*/
assert 6 == {it * 2}.call(3)
assert 6 == {it * 2}.callAsync(3).get()
}
2.增强ExecutorService
在Asynchronizer.doParallel()代码块中,java.util.concurrent.ExecutorService类型得以增强,可以用<<操作向ExecutorService提交返回Future结果的任务。其封装了该类型的submit()方法。样例:
import groovyx.gpars.Asynchronizer
import java.util.concurrent.ExecutorService
Asynchronizer.doParallel {ExecutorService executorService ->
executorService << {println 'Inside parallel task'}
}
3.并行执行闭包
Asynchronizer类还提供了doInParallel()、executeInParallel()和startInParallel()方法,可以方便地并行或异步执行多个闭包:
Asynchronizer.doParallel {
//下面一句并行执行两个闭包,并等待执行完毕返回结果
assertEquals( [10, 20],
AsyncInvokerUtil.doInParallel({calculateA()}, {calculateB()}) )
//下面一句异步执行两个闭包,但每个闭包返回的是Future类型,而无需等待结果
assertEquals( [10, 20],
AsyncInvokerUtil.executeAsync({calculateA()}, {calculateB()})*.get() )
}
4.并行集合处理
Asynchronizer也可用于基于纯JDK的并行集合处理。但与Parallelizer不同之处在于,Asynchronizer不需要jsr-166y.jar文件,而是利用了标准JDK的executor service,通过并行化闭包来并行处理一个集合或一个对象。这种方式需要记录状态,因此这种并发集合类的工作用Parallelizer来做还是要好一些。使用样例:
import groovyx.gpars.Asynchronizer
import java.util.concurrent.Future
//异步计算乘积
Asynchronizer.doParallel {
Collection result = [1, 2, 3, 4, 5].collectParallel{it * 10}
assert [10, 20, 30, 40, 50] == result
}
//使用异步闭包计算乘积
Asynchronizer.doParallel {
def closure={it * 10}
def asyncClosure=closure.async()
Collection result = [1, 2, 3, 4, 5].collect(asyncClosure)
assert new HashSet([10, 20, 30, 40, 50]) ==
new HashSet((Collection)result*.get())
}
可以让传入的闭包带ExecutorService参数,这样在闭包内就可以自由使用ExecutorService了。如下例所示,其中doParallel()的参数表示所创建的线程池中线程的数量:
import groovyx.gpars.Asynchronizer
import java.util.concurrent.ExecutorService
Asynchronizer.doParallel(5) {ExecutorService service ->
service.submit({performLongCalculation()} as Runnable)
}
如果要复用已有的ExecutorService实例,可以用Asynchonizer.withExistingAsynchronizer()。doParallel()方法在所有线程都完成任务后才返回,并且销毁ExecutorService实例;withExistingAsynchronizer()方法则不会等ExecutorService线程完成即返回。注意,上述DSL必须在doParallel()或withExistingAsynchronizer()代码块中才有作用,目前支持的方法如下(所有在代码块中的对象均具备这些方法):
- eachParallel()
- eachWithIndexParallel()
- collectParallel()
- findAllParallel()
- findParallel()
- everyParallel()
- anyParallel()
- grepParallel()
- groupByParallel()
5.Meta-class增强器
与ParallelEnhancer类似,AsyncEnhancer类可以给任意类增加上述异步方法(与GPars(2):并发集合之Parallelizer中的例子相似):
import groovyx.gpars.AsyncEnhancer
def list = [1, 2, 3, 4, 5, 6, 7, 8, 9]
AsyncEnhancer.enhanceInstance(list)
println list.collectParallel {it * 2 }
def animals = ['dog', 'ant', 'cat', 'whale']
AsyncEnhancer.enhanceInstance animals
println (animals.anyParallel {it ==~ /ant/} ?
'Found an ant' : 'No ants found')
println (animals.everyParallel {it.contains('a')} ?
'All animals contain a' : 'Some animals can live without an a')
本系列的其他文章:

最新评论
2 周 3 天之前
2 周 5 天之前
4 周 5 天之前
5 周 2 天之前
5 周 3 天之前
5 周 6 天之前
6 周 4 天之前
11 周 6 天之前
13 周 5 天之前
15 周 3 天之前