GPars Logo

要将此文档下载为 PDF - 点击此处

Actor

概念

Actor 是独立隔离的活动对象,它们之间不共享数据,只通过消息传递进行通信。避免共享可变状态可以使开发人员免受许多典型的并发问题,例如死锁或竞争条件。每个actor 的主体(代码)由线程池中的随机线程执行,因此actor 可以并发且独立地执行。由于Actor 可以共享一个相对较小的线程池,因此它们可以避免 JVM 的线程限制,即使在应用程序包含数千个actor 的情况下,也不需要过多的系统资源。


Actor 通常在其常规任务之上执行三种基本类型的操作

  • 创建一个新的actor

  • 向另一个actor 发送消息

  • 接收消息

Actor 可以作为特定actor 类的子类创建,也可以使用工厂方法将actor 的主体作为闭包参数提供。有各种方法可以发送消息,可以使用>> 运算符或任何send()sendAndWait()sendAndContinua() 方法。

接收消息可以在阻塞或非阻塞方式下执行,当物理线程返回到池中直到消息可用时。


Actor 可以编排成各种算法,可能利用类似于企业消息传递系统中已知的架构模式。

生命周期

使用工厂方法创建Actor

创建一个Actor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Actors.actor {
    println "actor1 has started"
    delegate.metaClass {
        afterStop = {List undeliveredMessages ->
            println "actor1 has stopped"
        }
        onInterrupt = {InterruptedException e ->
            println "actor1 has been interrupted"
        }
        onTimeout = {->
            println "actor1 has timed out"
        }
        onException = {Exception e ->
            println "actor1 threw an exception"
        }
    }
    println("Running actor1")
    ...
}

子类化DefaultActor

子类化一个Actor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class PooledLifeCycleSampleActor extends DefaultActor {
    protected void act() {
        println("Running actor2")
        ...
    }
    private void afterStart() {
        println "actor2 has started"
    }
    private void afterStop(List undeliveredMessages) {
        println "actor2 has stopped"
    }
    private void onInterrupt(InterruptedException e) {
        println "actor2 has been interrupted"
    }
    private void onTimeout() {
        println "actor2 has timed out"
    }
    private void onException(Exception e) {
        println "actor2 threw an exception"
    }
}

用法

使用工厂方法创建Actor

来自工厂的Actor
1
2
3
4
5
6
7
8
9
10
import static groovyx.gpars.actor.Actors.actor

def console = actor {
    loop {
        react {
            println it
        }
    }
    ...
}

子类化DefaultActor

子类化一个DefaultActor
1
2
3
4
5
6
7
8
9
10
11
12
class CustomActor extends DefaultActor {
    @Override protected void act() {
        loop {
            react {
                println it
            }
        }
    }
}

def console=new CustomActor()
console.start()

发送消息

Actor 的消息
1
2
3
4
console.send('Message')
console << 'Message'
console.sendAndContinue 'Message', {reply -> println "I received reply: $reply"}
console.sendAndWait 'Message'

超时

如何处理计时问题
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import static groovyx.gpars.actor.Actors.actor

def me = actor {
    friend.send('Hi')
    react(30.seconds) {msg ->
        if (msg == Actor.TIMEOUT) {
            friend.send('I see, busy as usual. Never mind.')
            stop()
        } else {
            //continue conversation
        }
    }
}
me.join()

当等待消息时超时到期时,Actor.TIMEOUT 消息将到达。如果actor 上存在onTimeout() 处理程序,该处理程序也会被调用。

Actor 超时时会发生什么
1
2
3
4
5
6
7
8
9
10
11
12
13
import static groovyx.gpars.actor.Actors.actor

def me = actor {
    delegate.metaClass.onTimeout = {->
        friend.send('I see, busy as usual. Never mind.')
        stop()
    }
    friend.send('Hi')
    react(30.seconds) {
        // Continue conversation.
    }
}
me.join()

Actor

一组Actor 叫做什么?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def coreActors = new NonDaemonPGroup(5)  //5 non-daemon threads pool
def helperActors = new DefaultPGroup(1)  //1 daemon thread pool
def priceCalculator = coreActors.actor {
...
}
def paymentProcessor = coreActors.actor {
...
}
def emailNotifier = helperActors.actor {
...
}
def cleanupActor = helperActors.actor {
...
}

// Increase size of the core actor group.
coreActors.resize 6

// Shutdown the group's pool once you no longer need the group to release resources.
helperActors.shutdown()

DynamicDispatchActor

动态调度
1
2
3
4
5
6
final Actor actor = new DynamicDispatchActor({
    when {String msg -> println 'A String'; reply 'Thanks'}
    when {Double msg -> println 'A Double'; reply 'Thanks'}
    when {msg -> println 'A something ...'; reply 'What was that?'}
})
actor.start()

Reactor

Actor 反应时
import groovyx.gpars.actor.Actors

final def doubler = Actors.reactor {
    2 * it
}.start()


Agent

概念

Clojure 编程语言中,你可以找到一个Agent 的概念,它本质上就像接受代码(函数)作为消息的 actor。接收后,接收到的函数会针对Agent 的内部状态运行,函数的返回值被认为是Agent 的新内部状态。本质上,agent 通过只允许一个agent 管理的线程 对它们进行修改来保护可变值。可变值不能直接从外部访问,而是必须将请求发送到agentagent 保证按顺序代表调用者处理请求。Agent 保证所有请求的顺序执行,因此值的 consistency。


用法

Agent 实现了一个类似 Clojure 的Agent 概念

一个Agent 示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import groovyx.gpars.agent.Agent

def jugMembers = new Agent<List>(['Me'])  // Add Me.
jugMembers.send {it.add 'James'}  // Add James.

final Thread t1 = Thread.start{
    jugMembers {it.add 'Jo'}  // Add Jo --- using the implicit call() method to send the function.
}

final Thread t2 = Thread.start{
    jugMembers << {it.add 'Dave'}  // Add Dave.
    jugMembers << {it.add 'Alice'}  // Add Alice.
}

[t1, t2]*.join()

println jugMembers.val
jugMembers.valAsync {println "Current members: $it"}
System.in.read()
jugMembers.stop()


通信 Seq. Procs

概念

CSP通信顺序进程)并发概念提供了一个消息传递模型,该模型具有同步会合式通信。

它主要因其高度的确定性和组合并行进程的能力而受到重视。

GPars GroovyCSP 封装了来自坎特伯雷大学的 JCSP 库,并建立在Jon Kerridge 的工作基础之上。请在这里查看他的作品。

有关CSP 并发模型的更多信息,请查看用户指南中的CSP 部分或参考以下链接


用法

GroovyCSP

看看这个 CSP 风格并发的 Groovy API 示例

你如何编写这个?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import groovyx.gpars.csp.PAR

import org.jcsp.lang.CSProcess
import org.jcsp.lang.Channel
import org.jcsp.lang.ChannelOutput
import org.jcsp.lang.One2OneChannel

import groovyx.gpars.csp.plugAndPlay.GPrefix
import groovyx.gpars.csp.plugAndPlay.GPCopy
import groovyx.gpars.csp.plugAndPlay.GPairs
import groovyx.gpars.csp.plugAndPlay.GPrint

class FibonacciV2Process implements CSProcess {
    ChannelOutput outChannel

    void run() {
        One2OneChannel a = Channel.createOne2One()
        One2OneChannel b = Channel.createOne2One()
        One2OneChannel c = Channel.createOne2One()
        One2OneChannel d = Channel.createOne2One()
        new PAR([
            new GPrefix(prefixValue: 0, inChannel: d.in(), outChannel: a.out()),
            new GPrefix(prefixValue: 1, inChannel: c.in(), outChannel: d.out()),
            new GPCopy(inChannel: a.in(), outChannel0: b.out(), outChannel1: outChannel),
            new GPairs(inChannel: b.in(), outChannel: c.out()),
        ]).run()
    }
}

One2OneChannel N2P = Channel.createOne2One()

new PAR([
    new FibonacciV2Process(outChannel: N2P.out()),
    new GPrint(inChannel: N2P.in(), heading: "Fibonacci Numbers")
]).run()


数据流并发

概念

数据流并发 提供了一种替代的并发模型,它本质上是安全且健壮的。它强调数据及其通过进程的流动,而不是实际处理数据的进程。数据流 算法使开发人员免于处理死锁、竞争条件,并使死锁变得确定性,从而 100% 可重现。如果你在测试中没有得到死锁,那么你在生产中就不会得到死锁。

数据流变量

一个单赋值多读变量,提供线程之间安全的线程数据交换。

数据流类

一个数据流变量 的虚拟无限映射,具有按需创建策略。

数据流流

一个线程安全的无界确定性阻塞流,具有与数据流变量 兼容的接口。

数据流队列

一个线程安全的无界阻塞队列,具有与数据流变量 兼容的接口。

数据流任务

一个轻量级的执行线程,它从线程池中分配一个物理线程来执行任务主体。任务通常应该使用*数据流变量* 和 *流* 来交换数据。

数据流运算符

更彻底的数据流并发算法 的基石。这些算法通常定义一些运算符,并将它们与通道连接起来,通道由数据流流、队列变量 表示。

每个运算符指定其输入和输出通道,以便与其他运算符通信。重复地,只要特定运算符的所有输入通道都包含数据,运算符的主体就会被执行,产生的输出就会被发送到输出通道中。


用法

数据流变量

一个示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import static groovyx.gpars.dataflow.Dataflow.task

final def x = new DataflowVariable()
final def y = new DataflowVariable()
final def z = new DataflowVariable()
task{
    z << x.val + y.val
    println "Result: ${z.val}"
}

task{
    x << 10
}

task{
    y << 5
}

数据流

一个示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import static groovyx.gpars.dataflow.Dataflow.task
final def df = new Dataflows()

task{
    df.z = df.x + df.y
    println "Result: ${df.z}"
}

task{
    df.x = 10
}

task{
    df.y = 5
}

数据流队列

一个示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import static groovyx.gpars.dataflow.Dataflow.task

def words = ['Groovy', 'fantastic', 'concurrency', 'fun', 'enjoy', 'safe', 'GPars', 'data', 'flow']
final def buffer = new DataflowQueue()

task{
    for (word in words) {
        buffer << word.toUpperCase()  // Add to the buffer.
    }
}

task{
    while(true) println buffer.val  // Read from the buffer in a loop.
}

绑定处理程序

一个示例
1
2
3
4
def a = new DataflowVariable()
a >> {println "The variable has just been bound to $it"}

a.whenBound{println "Just to confirm that the variable has been really set to $it"}

数据流运算符

一个示例
1
2
3
4
operator(inputs: [a, b, c], outputs: [d]) {x, y, z ->
    ...
    bindOutput 0, x + y + z
}


Fork/Join

概念

Fork/Join分而治之 是一种非常强大的抽象,用于解决层次化问题。在谈论层次化问题时,想想快速排序、归并排序、文件系统或一般的树导航等等。

  • Fork / Join 算法本质上将手头的問題拆分成几个更小的子问题,并递归地对每个子问题应用相同的算法。

  • 一旦子问题足够小,就可以直接解决。

  • 所有子问题的解决方案组合起来,以解决其父问题,进而帮助解决其自身的父问题。


用法

使用Fork-Join 构建器

随意在池中尝试不同的 fork/join 线程数量
一个示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
withPool(1){pool ->

    println """Number of files: ${

        runForkJoin(new File("./src")) {file ->
            long count = 0

            file.eachFile {
                if (it.isDirectory()) {
                    println "Forking a child task for $it"
                    // Fork a child task.
                    forkOffChild(it)
                } else {
                    count++
                }
            }

            // Use results of children tasks to calculate and store own result.
            return count + (childrenResults.sum(0))
        }
    }"""
}

扩展AbstractForkJoinWorker

一个示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public final class FileCounter extends AbstractForkJoinWorker<Long> {
    private final File file;

    def FileCounter(final File file) {
        this.file = file
    }

    protected void compute() {
        long count = 0;
        file.eachFile{
            if (it.isDirectory()) {
                println "Forking a thread for $it"
                // Fork a child task.
                forkOffChild(new FileCounter(it))
            }
            else {
                count++
            }
        }

        // Use results of children tasks to calculate and store own result.
        setResult(count + ((childrenResults)?.sum() ?: 0))
    }
}

withPool(1){pool ->  // Feel free to experiment with the number of fork/join threads in the pool.
    println "Number of files: ${orchestrate(new FileCounter(new File("..")))}"
}


Fork/Join 池

概念

处理数据通常涉及操作集合。列表、数组、集合、映射、迭代器、字符串和许多其他数据类型可以被视为项目的集合。处理此类集合的常见模式是按顺序逐个获取元素,并对每一行中的每个项目执行一个操作。

例如,考虑min() 函数,该函数应该返回集合中最小的元素。当你对数字集合调用min() 方法时,调用方线程将创建一个累加器或迄今为止最小的值,并将其初始化为给定类型的最小值,例如零。然后,线程将遍历集合中的元素,并将它们与累加器中的值进行比较。一旦处理完所有元素,最小值就会存储在累加器中。

这种算法实际上浪费了芯片 75% 的计算能力

这种算法虽然简单,但在多核硬件上完全错误。在双核芯片上运行min() 函数最多只能利用芯片 50% 的计算能力。在四核芯片上,它将只有 25%。正确的是,这种算法实际上浪费了芯片 75% 的计算能力。

这里应该使用树状结构

事实证明,树状结构更适合并行处理。我们示例中的min() 函数不需要按顺序遍历所有元素并将它们的值与累加器进行比较。相反,它可以依赖于硬件的多核特性。

例如,一个parallel_min() 函数可以比较集合中相邻值的配对(或特定大小的元组),并将元组中最小的值提升到下一轮比较。

在不同的元组中搜索最小值可以安全地并行发生,因此同一轮中的元组可以由不同的内核同时处理,而不会出现线程之间的竞争或争用。


用法

并行集合处理

以下方法目前在Groovy 中的所有对象上都受支持

  • eachParallel()

  • eachWithIndexParallel()

  • collectParallel()

  • findAllParallel()

  • findParallel()

  • everyParallel()

  • anyParallel()

  • grepParallel()

  • groupByParallel()

  • foldParallel()

  • minParallel()

  • maxParallel()

  • sumParallel()

使用此示例并发地汇总数字
1
2
3
4
5
6
7
8
9
10
11
ForkJoinPool.withPool{
    final AtomicInteger result = new AtomicInteger(0)
    [1, 2, 3, 4, 5].eachParallel{result.addAndGet(it)}
    assert 15 == result
}

// Multiply numbers asynchronously.
ForkJoinPool.withPool{
    final List result = [1, 2, 3, 4, 5].collectParallel{it * 2}
    assert ([2, 4, 6, 8, 10].equals(result))
}

元类增强器

一个示例
1
2
3
4
5
6
7
import groovyx.gpars.ParallelEnhancer

def list = [1, 2, 3, 4, 5, 6, 7, 8, 9]

ParallelEnhancer.enhanceInstance(list)

println list.collectParallel{it * 2 }

透明并行集合

selectImportantNames() 方法并发地处理名称集合。
1
2
3
4
5
6
7
8
9
10
11
ForkJoinPool.withPool{

    assert ['ALICE', 'JASON'] == selectImportantNames(['Joe', 'Alice', 'Dave', 'Jason'].makeConcurrent())
}

/**
 * A function implemented using standard sequential collect() and findAll() methods.
 */
def selectImportantNames(names) {
    names.collect{it.toUpperCase()}.findAll{it.size() > 4}
}

Map/Reduce

可用方法

  • map()

  • reduce()

  • filter()

  • size()

  • sum()

  • min()

  • max()

collection 属性将返回包装在 Groovy 集合实例中的所有元素。

一个示例
1
2
3
4
5
6
println 'Number of occurrences of the word GROOVY today: ' + urls.parallel
        .map{it.toURL().text.toUpperCase()}
        .filter{it.contains('GROOVY')}
        .map{it.split()}
        .map{it.findAll{word -> word.contains 'GROOVY'}.size()}
        .sum()


软件事务内存

概念

软件事务内存STM,为开发人员提供用于访问内存中数据的交易语义。当多个线程共享内存中的数据时,通过将代码块标记为交易(原子),开发人员将数据一致性的责任委托给 Stm 引擎。GPars 利用Multiverse STM 引擎

原子闭包

GPars 允许开发人员将他们的并发代码构建成原子块(闭包),然后将它们作为单个单元执行,保留交易 ACI(原子性、一致性、隔离)属性。


用法

以原子方式运行一段代码

一个原子示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import groovyx.gpars.stm.GParsStm
import org.multiverse.api.references.TxnInteger

import static org.multiverse.api.StmUtils.newTxnInteger

public class Account {
    private final TxnInteger amount = newTxnInteger(0);

    public void transfer(final int a) {
        GParsStm.atomic {
            amount.increment(a);
        }
    }

    public int getCurrentAmount() {
        GParsStm.atomicWithInt {
            amount.get();
        }
    }
}

自定义交易属性

一个示例
1
2
3
4
5
6
7
8
9
import groovyx.gpars.stm.GParsStm
import org.multiverse.api.AtomicBlock
import org.multiverse.api.PropagationLevel

final TxnExecutor block = GParsStm.createTxnExecutor(maxRetries: 3000, familyName: 'Custom', PropagationLevel: PropagationLevel.Requires, interruptible: false)

assert GParsStm.atomicWithBoolean(block) {
    true
}


线程池

概念

在多核系统上,你可以从让一些任务在后台异步运行中受益,从而卸载你的主要执行线程。ThreadPool 类允许你轻松地在后台启动要异步执行的任务,并在稍后收集结果。


用法

ThreadPool 的使用 - 基于 Java Executors 的并发集合处理器

闭包增强

一个示例
1
2
3
4
5
6
7
8
9
10
11
12
GParsExecutorsPool.withPool() {
    Closure longLastingCalculation = {calculate()}

    // Create a new closure, which starts the original closure on a thread pool.
    Closure fastCalculation = longLastingCalculation.async()

    // Returns almost immediately.
    Future result=fastCalculation()

    // Do stuff while calculation performs...
    println result.get()
}
另一个示例
1
2
3
4
5
6
7
8
GParsExecutorsPool.withPool() {
    /**
     * The callAsync() method is an asynchronous variant of the default call() method
     * to invoke a closure. It will return a Future for the result value.
     */
    assert 6 == {it * 2}.call(3).get()
    assert 6 == {it * 2}.callAsync(3).get()
}

Executor 服务增强

一个示例
1
2
3
GParsExecutorsPool.withPool {ExecutorService executorService ->
    executorService << {println 'Inside parallel task'}
}

异步函数处理

一个示例
1
2
3
4
5
6
7
8
GParsExecutorsPool.withPool {

    // Waits for results.
    assert [10, 20] == AsyncInvokerUtil.doInParallel({calculateA()}, {calculateB()})

    // Returns a Future and doesn't wait for results to be calculated.
    assert [10, 20] == AsyncInvokerUtil.executeAsync({calculateA()}, {calculateB()})*.get()
}