Actor
概念
Actor 是独立隔离的活动对象,它们之间不共享数据,只通过消息传递进行通信。避免共享可变状态可以使开发人员免受许多典型的并发问题,例如死锁或竞争条件。每个actor 的主体(代码)由线程池中的随机线程执行,因此actor 可以并发且独立地执行。由于Actor 可以共享一个相对较小的线程池,因此它们可以避免 JVM 的线程限制,即使在应用程序包含数千个actor 的情况下,也不需要过多的系统资源。
Actor 通常在其常规任务之上执行三种基本类型的操作
-
创建一个新的actor
-
向另一个actor 发送消息
-
接收消息
Actor 可以作为特定actor 类的子类创建,也可以使用工厂方法将actor 的主体作为闭包参数提供。有各种方法可以发送消息,可以使用>> 运算符或任何send()、sendAndWait() 或sendAndContinua() 方法。
接收消息可以在阻塞或非阻塞方式下执行,当物理线程返回到池中直到消息可用时。
Actor 可以编排成各种算法,可能利用类似于企业消息传递系统中已知的架构模式。 |
生命周期
使用工厂方法创建Actor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Actors.actor {
println "actor1 has started"
delegate.metaClass {
afterStop = {List undeliveredMessages ->
println "actor1 has stopped"
}
onInterrupt = {InterruptedException e ->
println "actor1 has been interrupted"
}
onTimeout = {->
println "actor1 has timed out"
}
onException = {Exception e ->
println "actor1 threw an exception"
}
}
println("Running actor1")
...
}
子类化DefaultActor 类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class PooledLifeCycleSampleActor extends DefaultActor {
protected void act() {
println("Running actor2")
...
}
private void afterStart() {
println "actor2 has started"
}
private void afterStop(List undeliveredMessages) {
println "actor2 has stopped"
}
private void onInterrupt(InterruptedException e) {
println "actor2 has been interrupted"
}
private void onTimeout() {
println "actor2 has timed out"
}
private void onException(Exception e) {
println "actor2 threw an exception"
}
}
用法
使用工厂方法创建Actor
1
2
3
4
5
6
7
8
9
10
import static groovyx.gpars.actor.Actors.actor
def console = actor {
loop {
react {
println it
}
}
...
}
子类化DefaultActor 类
1
2
3
4
5
6
7
8
9
10
11
12
class CustomActor extends DefaultActor {
@Override protected void act() {
loop {
react {
println it
}
}
}
}
def console=new CustomActor()
console.start()
发送消息
1
2
3
4
console.send('Message')
console << 'Message'
console.sendAndContinue 'Message', {reply -> println "I received reply: $reply"}
console.sendAndWait 'Message'
超时
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import static groovyx.gpars.actor.Actors.actor
def me = actor {
friend.send('Hi')
react(30.seconds) {msg ->
if (msg == Actor.TIMEOUT) {
friend.send('I see, busy as usual. Never mind.')
stop()
} else {
//continue conversation
}
}
}
me.join()
当等待消息时超时到期时,Actor.TIMEOUT 消息将到达。如果actor 上存在onTimeout() 处理程序,该处理程序也会被调用。
1
2
3
4
5
6
7
8
9
10
11
12
13
import static groovyx.gpars.actor.Actors.actor
def me = actor {
delegate.metaClass.onTimeout = {->
friend.send('I see, busy as usual. Never mind.')
stop()
}
friend.send('Hi')
react(30.seconds) {
// Continue conversation.
}
}
me.join()
Actor 组
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def coreActors = new NonDaemonPGroup(5) //5 non-daemon threads pool
def helperActors = new DefaultPGroup(1) //1 daemon thread pool
def priceCalculator = coreActors.actor {
...
}
def paymentProcessor = coreActors.actor {
...
}
def emailNotifier = helperActors.actor {
...
}
def cleanupActor = helperActors.actor {
...
}
// Increase size of the core actor group.
coreActors.resize 6
// Shutdown the group's pool once you no longer need the group to release resources.
helperActors.shutdown()
DynamicDispatchActor
1
2
3
4
5
6
final Actor actor = new DynamicDispatchActor({
when {String msg -> println 'A String'; reply 'Thanks'}
when {Double msg -> println 'A Double'; reply 'Thanks'}
when {msg -> println 'A something ...'; reply 'What was that?'}
})
actor.start()
Reactor
import groovyx.gpars.actor.Actors final def doubler = Actors.reactor { 2 * it }.start()
Agent
概念
在Clojure 编程语言中,你可以找到一个Agent 的概念,它本质上就像接受代码(函数)作为消息的 actor。接收后,接收到的函数会针对Agent 的内部状态运行,函数的返回值被认为是Agent 的新内部状态。本质上,agent 通过只允许一个agent 管理的线程 对它们进行修改来保护可变值。可变值不能直接从外部访问,而是必须将请求发送到agent,agent 保证按顺序代表调用者处理请求。Agent 保证所有请求的顺序执行,因此值的 consistency。
用法
Agent 实现了一个类似 Clojure 的Agent 概念
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import groovyx.gpars.agent.Agent
def jugMembers = new Agent<List>(['Me']) // Add Me.
jugMembers.send {it.add 'James'} // Add James.
final Thread t1 = Thread.start{
jugMembers {it.add 'Jo'} // Add Jo --- using the implicit call() method to send the function.
}
final Thread t2 = Thread.start{
jugMembers << {it.add 'Dave'} // Add Dave.
jugMembers << {it.add 'Alice'} // Add Alice.
}
[t1, t2]*.join()
println jugMembers.val
jugMembers.valAsync {println "Current members: $it"}
System.in.read()
jugMembers.stop()
通信 Seq. Procs
概念
CSP(通信顺序进程)并发概念提供了一个消息传递模型,该模型具有同步会合式通信。
它主要因其高度的确定性和组合并行进程的能力而受到重视。
GPars GroovyCSP 封装了来自坎特伯雷大学的 JCSP 库,并建立在Jon Kerridge 的工作基础之上。请在这里查看他的作品。
有关CSP 并发模型的更多信息,请查看用户指南中的CSP 部分或参考以下链接
-
CSP 定义:Wiki CSP
-
Google 的Go 编程语言,带有CSP 风格的并发:Go with Google
用法
GroovyCSP
看看这个 CSP 风格并发的 Groovy API 示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import groovyx.gpars.csp.PAR
import org.jcsp.lang.CSProcess
import org.jcsp.lang.Channel
import org.jcsp.lang.ChannelOutput
import org.jcsp.lang.One2OneChannel
import groovyx.gpars.csp.plugAndPlay.GPrefix
import groovyx.gpars.csp.plugAndPlay.GPCopy
import groovyx.gpars.csp.plugAndPlay.GPairs
import groovyx.gpars.csp.plugAndPlay.GPrint
class FibonacciV2Process implements CSProcess {
ChannelOutput outChannel
void run() {
One2OneChannel a = Channel.createOne2One()
One2OneChannel b = Channel.createOne2One()
One2OneChannel c = Channel.createOne2One()
One2OneChannel d = Channel.createOne2One()
new PAR([
new GPrefix(prefixValue: 0, inChannel: d.in(), outChannel: a.out()),
new GPrefix(prefixValue: 1, inChannel: c.in(), outChannel: d.out()),
new GPCopy(inChannel: a.in(), outChannel0: b.out(), outChannel1: outChannel),
new GPairs(inChannel: b.in(), outChannel: c.out()),
]).run()
}
}
One2OneChannel N2P = Channel.createOne2One()
new PAR([
new FibonacciV2Process(outChannel: N2P.out()),
new GPrint(inChannel: N2P.in(), heading: "Fibonacci Numbers")
]).run()
数据流并发
概念
数据流并发 提供了一种替代的并发模型,它本质上是安全且健壮的。它强调数据及其通过进程的流动,而不是实际处理数据的进程。数据流 算法使开发人员免于处理死锁、竞争条件,并使死锁变得确定性,从而 100% 可重现。如果你在测试中没有得到死锁,那么你在生产中就不会得到死锁。
数据流变量
一个单赋值多读变量,提供线程之间安全的线程数据交换。
数据流类
一个数据流变量 的虚拟无限映射,具有按需创建策略。
数据流流
一个线程安全的无界确定性阻塞流,具有与数据流变量 兼容的接口。
数据流队列
一个线程安全的无界阻塞队列,具有与数据流变量 兼容的接口。
数据流任务
一个轻量级的执行线程,它从线程池中分配一个物理线程来执行任务主体。任务通常应该使用*数据流变量* 和 *流* 来交换数据。
数据流运算符
更彻底的数据流并发算法 的基石。这些算法通常定义一些运算符,并将它们与通道连接起来,通道由数据流流、队列 或变量 表示。
每个运算符指定其输入和输出通道,以便与其他运算符通信。重复地,只要特定运算符的所有输入通道都包含数据,运算符的主体就会被执行,产生的输出就会被发送到输出通道中。
用法
数据流变量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import static groovyx.gpars.dataflow.Dataflow.task
final def x = new DataflowVariable()
final def y = new DataflowVariable()
final def z = new DataflowVariable()
task{
z << x.val + y.val
println "Result: ${z.val}"
}
task{
x << 10
}
task{
y << 5
}
数据流
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import static groovyx.gpars.dataflow.Dataflow.task
final def df = new Dataflows()
task{
df.z = df.x + df.y
println "Result: ${df.z}"
}
task{
df.x = 10
}
task{
df.y = 5
}
数据流队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import static groovyx.gpars.dataflow.Dataflow.task
def words = ['Groovy', 'fantastic', 'concurrency', 'fun', 'enjoy', 'safe', 'GPars', 'data', 'flow']
final def buffer = new DataflowQueue()
task{
for (word in words) {
buffer << word.toUpperCase() // Add to the buffer.
}
}
task{
while(true) println buffer.val // Read from the buffer in a loop.
}
绑定处理程序
1
2
3
4
def a = new DataflowVariable()
a >> {println "The variable has just been bound to $it"}
a.whenBound{println "Just to confirm that the variable has been really set to $it"}
数据流运算符
1
2
3
4
operator(inputs: [a, b, c], outputs: [d]) {x, y, z ->
...
bindOutput 0, x + y + z
}
Fork/Join
概念
Fork/Join 或分而治之 是一种非常强大的抽象,用于解决层次化问题。在谈论层次化问题时,想想快速排序、归并排序、文件系统或一般的树导航等等。
-
Fork / Join 算法本质上将手头的問題拆分成几个更小的子问题,并递归地对每个子问题应用相同的算法。
-
一旦子问题足够小,就可以直接解决。
-
所有子问题的解决方案组合起来,以解决其父问题,进而帮助解决其自身的父问题。
用法
使用Fork-Join 构建器
随意在池中尝试不同的 fork/join 线程数量 |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
withPool(1){pool ->
println """Number of files: ${
runForkJoin(new File("./src")) {file ->
long count = 0
file.eachFile {
if (it.isDirectory()) {
println "Forking a child task for $it"
// Fork a child task.
forkOffChild(it)
} else {
count++
}
}
// Use results of children tasks to calculate and store own result.
return count + (childrenResults.sum(0))
}
}"""
}
扩展AbstractForkJoinWorker 类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public final class FileCounter extends AbstractForkJoinWorker<Long> {
private final File file;
def FileCounter(final File file) {
this.file = file
}
protected void compute() {
long count = 0;
file.eachFile{
if (it.isDirectory()) {
println "Forking a thread for $it"
// Fork a child task.
forkOffChild(new FileCounter(it))
}
else {
count++
}
}
// Use results of children tasks to calculate and store own result.
setResult(count + ((childrenResults)?.sum() ?: 0))
}
}
withPool(1){pool -> // Feel free to experiment with the number of fork/join threads in the pool.
println "Number of files: ${orchestrate(new FileCounter(new File("..")))}"
}
Fork/Join 池
概念
处理数据通常涉及操作集合。列表、数组、集合、映射、迭代器、字符串和许多其他数据类型可以被视为项目的集合。处理此类集合的常见模式是按顺序逐个获取元素,并对每一行中的每个项目执行一个操作。
例如,考虑min() 函数,该函数应该返回集合中最小的元素。当你对数字集合调用min() 方法时,调用方线程将创建一个累加器或迄今为止最小的值,并将其初始化为给定类型的最小值,例如零。然后,线程将遍历集合中的元素,并将它们与累加器中的值进行比较。一旦处理完所有元素,最小值就会存储在累加器中。
这种算法实际上浪费了芯片 75% 的计算能力 |
这种算法虽然简单,但在多核硬件上完全错误。在双核芯片上运行min() 函数最多只能利用芯片 50% 的计算能力。在四核芯片上,它将只有 25%。正确的是,这种算法实际上浪费了芯片 75% 的计算能力。
这里应该使用树状结构
事实证明,树状结构更适合并行处理。我们示例中的min() 函数不需要按顺序遍历所有元素并将它们的值与累加器进行比较。相反,它可以依赖于硬件的多核特性。
例如,一个parallel_min() 函数可以比较集合中相邻值的配对(或特定大小的元组),并将元组中最小的值提升到下一轮比较。
在不同的元组中搜索最小值可以安全地并行发生,因此同一轮中的元组可以由不同的内核同时处理,而不会出现线程之间的竞争或争用。
用法
并行集合处理
以下方法目前在Groovy 中的所有对象上都受支持
-
eachParallel()
-
eachWithIndexParallel()
-
collectParallel()
-
findAllParallel()
-
findParallel()
-
everyParallel()
-
anyParallel()
-
grepParallel()
-
groupByParallel()
-
foldParallel()
-
minParallel()
-
maxParallel()
-
sumParallel()
1
2
3
4
5
6
7
8
9
10
11
ForkJoinPool.withPool{
final AtomicInteger result = new AtomicInteger(0)
[1, 2, 3, 4, 5].eachParallel{result.addAndGet(it)}
assert 15 == result
}
// Multiply numbers asynchronously.
ForkJoinPool.withPool{
final List result = [1, 2, 3, 4, 5].collectParallel{it * 2}
assert ([2, 4, 6, 8, 10].equals(result))
}
元类增强器
1
2
3
4
5
6
7
import groovyx.gpars.ParallelEnhancer
def list = [1, 2, 3, 4, 5, 6, 7, 8, 9]
ParallelEnhancer.enhanceInstance(list)
println list.collectParallel{it * 2 }
透明并行集合
1
2
3
4
5
6
7
8
9
10
11
ForkJoinPool.withPool{
assert ['ALICE', 'JASON'] == selectImportantNames(['Joe', 'Alice', 'Dave', 'Jason'].makeConcurrent())
}
/**
* A function implemented using standard sequential collect() and findAll() methods.
*/
def selectImportantNames(names) {
names.collect{it.toUpperCase()}.findAll{it.size() > 4}
}
Map/Reduce
可用方法
-
map()
-
reduce()
-
filter()
-
size()
-
sum()
-
min()
-
max()
collection 属性将返回包装在 Groovy 集合实例中的所有元素。
1
2
3
4
5
6
println 'Number of occurrences of the word GROOVY today: ' + urls.parallel
.map{it.toURL().text.toUpperCase()}
.filter{it.contains('GROOVY')}
.map{it.split()}
.map{it.findAll{word -> word.contains 'GROOVY'}.size()}
.sum()
软件事务内存
概念
软件事务内存 或STM,为开发人员提供用于访问内存中数据的交易语义。当多个线程共享内存中的数据时,通过将代码块标记为交易(原子),开发人员将数据一致性的责任委托给 Stm 引擎。GPars 利用Multiverse STM 引擎。
原子闭包
GPars 允许开发人员将他们的并发代码构建成原子块(闭包),然后将它们作为单个单元执行,保留交易 ACI(原子性、一致性、隔离)属性。
用法
以原子方式运行一段代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import groovyx.gpars.stm.GParsStm
import org.multiverse.api.references.TxnInteger
import static org.multiverse.api.StmUtils.newTxnInteger
public class Account {
private final TxnInteger amount = newTxnInteger(0);
public void transfer(final int a) {
GParsStm.atomic {
amount.increment(a);
}
}
public int getCurrentAmount() {
GParsStm.atomicWithInt {
amount.get();
}
}
}
自定义交易属性
1
2
3
4
5
6
7
8
9
import groovyx.gpars.stm.GParsStm
import org.multiverse.api.AtomicBlock
import org.multiverse.api.PropagationLevel
final TxnExecutor block = GParsStm.createTxnExecutor(maxRetries: 3000, familyName: 'Custom', PropagationLevel: PropagationLevel.Requires, interruptible: false)
assert GParsStm.atomicWithBoolean(block) {
true
}
线程池
概念
在多核系统上,你可以从让一些任务在后台异步运行中受益,从而卸载你的主要执行线程。ThreadPool 类允许你轻松地在后台启动要异步执行的任务,并在稍后收集结果。
用法
ThreadPool 的使用 - 基于 Java Executors 的并发集合处理器
闭包增强
1
2
3
4
5
6
7
8
9
10
11
12
GParsExecutorsPool.withPool() {
Closure longLastingCalculation = {calculate()}
// Create a new closure, which starts the original closure on a thread pool.
Closure fastCalculation = longLastingCalculation.async()
// Returns almost immediately.
Future result=fastCalculation()
// Do stuff while calculation performs...
println result.get()
}
1
2
3
4
5
6
7
8
GParsExecutorsPool.withPool() {
/**
* The callAsync() method is an asynchronous variant of the default call() method
* to invoke a closure. It will return a Future for the result value.
*/
assert 6 == {it * 2}.call(3).get()
assert 6 == {it * 2}.callAsync(3).get()
}
Executor 服务增强
1
2
3
GParsExecutorsPool.withPool {ExecutorService executorService ->
executorService << {println 'Inside parallel task'}
}
异步函数处理
1
2
3
4
5
6
7
8
GParsExecutorsPool.withPool {
// Waits for results.
assert [10, 20] == AsyncInvokerUtil.doInParallel({calculateA()}, {calculateB()})
// Returns a Future and doesn't wait for results to be calculated.
assert [10, 20] == AsyncInvokerUtil.executeAsync({calculateA()}, {calculateB()})*.get()
}