(快速参考)

GPars 框架 - 参考文档

作者:GPars 全员

版本 1.2.1

1 简介

当今主流计算世界正在快速变化。如果您打开计算机的引擎盖,看看里面的东西,您很有可能看到一个双核处理器。或者,如果您有一台高端电脑,则可能看到一个四核处理器。我们现在都将软件运行在多处理器系统上。我们今天和明天编写的代码可能永远不会在单处理器系统上运行:并行硬件已成为标准。但软件却没有,至少现在还没有。人们仍然创建单线程代码,即使它无法利用当前和未来硬件的全部能力。一些开发人员尝试使用低级并发原语,例如线程、锁或同步块。但是,很明显,在应用程序级别使用的共享内存多线程方法带来的麻烦比它解决的麻烦还要多。低级并发处理通常很难做对,而且也不太有趣。随着硬件的这种根本性变化,软件不可避免地也必须发生巨大变化。像 map/reduce、fork/join、actor 和数据流这样更高级的并发和并行概念,为不同的问题域提供了自然的抽象,同时利用了多核硬件。

1.1 GPars 简介

认识一下 GPars - 一个针对 Java 和 Groovy 的开源并发和并行库,它为您提供了许多编写 Groovy 并发和并行代码的高级抽象(map/reduce、fork/join、异步闭包、actor、agent、数据流并发和其他概念),可以轻松地使您的 Java 和 Groovy 代码并发和/或并行。使用 GPars,您的 Java 和/或 Groovy 代码可以轻松地利用目标系统上的所有可用处理器。您可以同时运行多个计算、并行请求网络资源、安全地解决分层分治问题、执行函数式风格的 map/reduce 或数据并行集合处理,或者以 Actor 或数据流模型为基础构建应用程序。

该项目在 Apache 2 许可证 下开源。如果您正在使用 Groovy 开发商业、开源、教育或任何其他类型的软件项目,请下载二进制文件或从 Maven 存储库中集成它们,然后开始吧。编写高度并发和/或并行 Java 和 Groovy 代码的方式是畅通无阻的。尽情享受吧!

1.2 致谢

如果没有许多个人提供的巨大帮助和贡献,该项目不可能达到目前的水平,他们贡献了时间、精力和专业知识,使 GPars 成为一款可靠的产品。首先,应该提到的就是核心团队的人员
  • Václav Pech
  • Dierk Koenig
  • Alex Tkachman
  • Russel Winder
  • Paul King
  • Jon Kerridge

随着时间的推移,许多人贡献了他们的想法,提供了有用的反馈,或以某种方式帮助了 GPars。这个群体中有很多很多人,太多以至于无法一一列出,但至少让我们列出最活跃的人

  • Hamlet d'Arcy
  • Hans Dockter
  • Guillaume Laforge
  • Robert Fischer
  • Johannes Link
  • Graeme Rocher
  • Alex Miller
  • Jeff Gortatowsky
  • Jiří Kropáček

衷心感谢所有人!

2 入门

在我们开始之前,让我们先做一些假设
  1. 您了解并使用 Groovy 和 Java:否则,您就不会花宝贵的时间研究针对 Groovy 和 Java 的并发和并行库。
  2. 您绝对希望使用 Groovy 和 Java 编写利用并发和并行的代码。
  3. 如果您不使用 Groovy 编写代码,那么您准备为使用 Java 支付不可避免的冗长性税。
  4. 您的代码针对的是多核硬件。
  5. 您理解在并发和并行代码中,事情可能在任何时间、以任何顺序发生,而且很可能不止一件事同时发生。

有了这些假设,我们就开始吧。

越来越明显的是,使用 JVM 提供的线程/同步/锁级别来处理并发和并行,对于安全性和舒适性来说,级别太低了。许多高级概念,例如 actor 和数据流,已经存在了一段时间:并行计算机至少在数据中心(如果不是在桌面)上已经使用,早于多核芯片进入硬件主流。现在是时候在主流软件行业中采用这些更高级的抽象了。这就是 GPars 为 Groovy 和 Java 语言提供的功能,它允许 Groovy 和 Java 程序员使用更高级的抽象,从而使开发并发和并行软件变得更容易,错误更少。

GPars 中可用的概念可以分为三组

  1. 代码级别帮助程序 可以应用于代码库的较小部分(例如单个算法或数据结构)的构造,而无需对整个项目架构进行任何重大更改
    1. 并行集合
    2. 异步处理
    3. Fork/Join(分治)
  2. 架构级别概念 在设计项目结构时需要考虑的构造
    1. Actor
    2. 通信顺序进程 (CSP)
    3. 数据流
    4. 数据并行
  3. 共享可变状态保护 尽管目前约 95% 的共享可变状态的使用可以通过使用适当的抽象来避免,但对于剩余的 5% 的用例来说,好的抽象仍然是必要的,因为在这种情况下无法避免共享可变状态
    1. Agent
    2. 软件事务内存(GPars 尚未完全实现)

2.1 下载和安装

GPars 现在与 Groovy 一起作为标准发行。因此,如果您有 Groovy 安装,则应该已经拥有 GPars。当然,您拥有的 GPars 的确切版本将取决于 Groovy 的版本。如果您还没有 GPars,但您确实有 Groovy,那么也许您应该升级您的 Groovy!

如果您没有 Groovy 安装,但通过使用依赖项或仅仅拥有 groovy-all 工件来获取 Groovy,那么您将需要获取 GPars。另外,如果您想要使用与 Groovy 一起提供的 GPars 版本不同的版本,或者拥有一个无法升级的旧的没有 GPars 的 Groovy,那么您将需要获取 GPars。获取 GPars 的方法有

  • 从存储库中下载工件,并手动添加它以及所有传递依赖项。
  • 在 Gradle、Maven 或 Ivy(或 Gant 或 Ant)构建文件中指定依赖项。
  • 使用 Grapes(对于 Groovy 脚本特别有用)。

如果您正在构建 Grails 或 Griffon 应用程序,可以使用相应的插件来为您获取 jar 文件。

GPars 工件

如上所述,GPars 现在与 Groovy 一起作为标准发行。但是,如果您必须手动管理此依赖项,则 GPars 工件位于主 Maven 存储库以及 Codehaus 主存储库和快照存储库中。发布版本位于 Maven 和 Codehaus 主存储库中,当前开发版本 (SNAPSHOT) 位于 Codehaus 快照存储库中。要从 Gradle 或 Grapes 中使用,请使用以下规范

"org.codehaus.gpars:gpars:1.2.1"
用于发布版本,以及
"org.codehaus.gpars:gpars:1.3-SNAPSHOT"
用于开发版本。在这种情况下,您可能需要将 Codehaus 快照存储库手动添加到搜索列表中。使用 Maven,依赖项为
<dependency>
    <groupId>org.codehaus.gpars</groupId>
    <artifactId>gpars</artifactId>
    <version>1.2.0</version>
</dependency>
或者,如果使用最新的快照,则为版本 1.3-SNAPSHOT。

传递依赖项

GPars 作为库依赖于 Groovy 版本等于或大于 2.0。此外,Fork/Join 并发库,即 jsr166y(来自 JSR-166 项目 的一个工件)必须在使用 GPars 的程序的类路径上,才能进行编译和执行。该工件的发布版本位于主 Maven 和 Codehaus 存储库中。该工件的开发版本位于 Codehaus 快照存储库中。使用 Gradle 或 Grapes,您将使用以下依赖项规范

"org.codehaus.jsr166-mirror:jsr166y:1.7.0"
对于 Maven,规范将为
<dependency>
    <groupId>org.codehaus.jsr166-mirror</groupId>
    <artifactId>jsr166y</artifactId>
    <version>1.7.0</version>
</dependency>
开发版本的版本号为 1.7.0.1-SNAPSHOT。

GPars 在其自己的描述符中定义了此依赖项,因此,如果您使用 Gradle、Grails、Griffon、Maven、Ivy 或其他类型的自动依赖项解析工具,理想情况下,所有依赖项管理都应该自动完成。

有关更多详细信息,请访问 GPars 网站上的 集成 页面。

2.2 Hello World 示例

设置完成后,尝试运行以下 Groovy 脚本以测试您的设置是否按预期工作。
import static groovyx.gpars.actor.Actors.actor

/** * A demo showing two cooperating actors. The decryptor decrypts received messages * and replies them back. The console actor sends a message to decrypt, prints out * the reply and terminates both actors. The main thread waits on both actors to * finish using the join() method to prevent premature exit, since both actors use * the default actor group, which uses a daemon thread pool. * @author Dierk Koenig, Vaclav Pech */

def decryptor = actor { loop { react { message -> if (message instanceof String) reply message.reverse() else stop() } } }

def console = actor { decryptor.send 'lellarap si yvoorG' react { println 'Decrypted message: ' + it decryptor.send false } }

[decryptor, console]*.join()

运行代码时,您应该在控制台上看到一条消息“解密后的消息:Groovy 是并行的”。

GPars 主要设计用于与 Groovy 编程语言一起使用。当然,所有 Java 和 Groovy 程序都只是在 JVM 上运行的字节码,因此 GPars 可以与 Java 源代码一起使用。尽管 GPars 针对 Groovy 代码使用,但其坚实的技术基础加上良好的性能特性,使其成为 Java 程序的优秀库。事实上,GPars 的大部分是用 Java 编写的,因此使用 GPars 的 Java 应用程序不会产生任何性能损失。

有关详细信息,请参阅 Java API 部分。

要使用 Java API 通过 GPars 进行快速测试,您可以编译并运行以下 Java 代码

import groovyx.gpars.MessagingRunnable;
import groovyx.gpars.actor.DynamicDispatchActor;

public class StatelessActorDemo { public static void main(String[] args) throws InterruptedException { final MyStatelessActor actor = new MyStatelessActor(); actor.start(); actor.send("Hello"); actor.sendAndWait(10); actor.sendAndContinue(10.0, new MessagingRunnable<String>() { @Override protected void doRun(final String s) { System.out.println("Received a reply " + s); } }); } }

class MyStatelessActor extends DynamicDispatchActor { public void onMessage(final String msg) { System.out.println("Received " + msg); replyIfExists("Thank you"); }

public void onMessage(final Integer msg) { System.out.println("Received a number " + msg); replyIfExists("Thank you"); }

public void onMessage(final Object msg) { System.out.println("Received an object " + msg); replyIfExists("Thank you"); } }

请记住,您几乎肯定需要将 Groovy 工件添加到构建中,以及 GPars 工件。GPars 使用 Java 应用程序可能以 Java 速度工作,但它仍然对 Groovy 有一些编译依赖项。

2.3 代码规范

我们在代码示例中遵循某些规范。了解这些规范可能有助于您更好地阅读和理解 GPars 代码示例。
  • 在 Actor、Agent 和数据流表达式(变量和流)上,左移 运算符 << 已经重载,表示向其 发送 消息或 赋值

myActor << 'message'

myAgent << {account -> account.add('5 USD')}

myDataflowVariable << 120332

  • 在演员和代理中,默认的 call() 方法也被重载为 send 。因此,向演员或代理发送消息看起来就像一个普通的函数调用。

myActor "message"

myAgent {house -> house.repair()}

  • 在 GPars 中,rightShift 运算符 >> 具有 when bound 含义。所以

myDataflowVariable >> {value -> doSomethingWith(value)}
将调度闭包仅在 myDataflowVariable 绑定到一个值后才运行,该值作为参数。

在示例中,我们倾向于静态导入常用的工厂方法

  • GParsPool.withPool()
  • GParsPool.withExistingPool()
  • GParsExecutorsPool.withPool()
  • GParsExecutorsPool.withExistingPool()
  • Actors.actor()
  • Actors.reactor()
  • Actors.fairReactor()
  • Actors.messageHandler()
  • Actors.fairMessageHandler()
  • Agent.agent()
  • Agent.fairAgent()
  • Dataflow.task()
  • Dataflow.operator()

这更多是一个风格偏好和个人品味的问题,但我们认为静态导入使代码更紧凑易读。

2.4 在 IDE 中进行设置

将 GPars jar 文件添加到您的项目中或在 pom.xml 中定义适当的依赖项,就足以让您在 IDE 中开始使用 GPars。

GPars DSL 识别

IntelliJ IDEA,无论是免费的 社区版 还是商业版的 终极版,都将识别 GPars 领域特定语言,完成方法,例如 eachParallel()reduce()callAsync() 并验证它们。GPars 使用 GroovyDSL 机制,该机制会在 GPars jar 文件添加到项目中时立即将 DSL 教给 IntelliJ IDEA。

2.5 概念的适用性

GPars 提供了许多概念可供选择。我们正在不断构建和更新一个页面,试图帮助用户为其手头的任务选择正确的抽象。有关详细信息,请参阅 概念比较 页面。

为了简要总结建议,您可以在下面找到基本指南

  1. 您正在查看一个集合,需要 迭代 或使用众多优美的 Groovy 集合方法(如 each()collect()find() 等)进行处理。假设处理集合的每个元素都独立于其他项目,可以使用 GPars 并行集合 来推荐。
  2. 如果您有一个 长时间运行的计算,可以安全地在后台运行,请使用 GPars 中的 异步调用支持。由于 GPars 异步函数可以组合,因此您可以快速并行化复杂的函数计算,而无需显式标记独立计算。
  3. 您需要 并行化 手头的算法。您可以识别一组具有相互依赖关系的 任务。这些任务通常不需要共享数据,但某些任务可能需要等待其他任务完成才能启动。您已准备好以代码明确表达这些依赖关系。使用 GPars 数据流任务,您可以创建内部顺序任务,每个任务都可以与其他任务并发运行。数据流变量和通道为任务提供了表达其依赖关系并安全交换数据的能力。
  4. 您无法避免在算法中使用 共享可变状态。多个线程将访问共享数据,并(其中一些)修改它。传统的锁定和同步方法感觉太冒险或不熟悉。选择 代理,它将包装您的数据并序列化对它的所有访问。
  5. 您正在构建一个具有高并发需求的系统。在这里或那里调整数据结构或任务将无法满足需求。您需要从头开始构建体系结构,以考虑并发。消息传递 可能是可行的解决方案。
    1. Groovy CSP 将为您提供用于并发进程的高度确定性和可组合模型。该模型围绕 计算进程 的概念进行组织,这些计算或进程并发运行并通过同步通道进行通信。
    2. 如果您正在尝试解决复杂的数据处理问题,请考虑使用 GPars 数据流运算符 来构建数据流网络。该概念围绕使用异步通道连接到管道中的事件驱动变换进行组织。
    3. 演员活动对象 将在您需要根据面向对象范式构建通用的、高度并发和可扩展的体系结构时大放异彩。

现在,您可能对在当前项目中使用哪些概念有了更好的了解。转到用户指南中查看有关它们的更多详细信息。

2.6 新功能

新的 GPars 1.2.0 版本在先前版本的基础上引入了一些增强功能和改进,主要是在数据流领域。

查看 JIRA 发布说明

项目更改

查看 重大更改列表 以了解重大更改的列表。

异步函数

并行集合

Fork / Join

Actor

  • 使用 @DelegatesTo 对演员和演员工厂方法进行注释,以允许在静态编译的 Groovy 代码中使用演员

数据流

  • 向 Promises 添加了一个 thenForkAndJoin() 方法,以简化使用 promises 创建 fork/join 算法
  • 添加了延迟任务,以简化异步活动的创建,其执行将延迟到实际请求其结果时
  • 使用 @DelegatesTo 对运算符工厂方法进行注释,以允许在静态编译的 Groovy 代码中使用数据流运算符和选择器

代理

Stm

其他

  • 将 java.util.Timer 替换为 ScheduledExecutorService 用于超时处理
  • 在 GParsConfig 类中添加了对计时器和线程本地变量的正确关闭

重命名提示

2.7 Java API - 从 Java 中使用 GPars

使用 GPars 非常令人上瘾,我保证。一旦你上瘾,你就无法没有它进行编码。如果世界强迫你用 Java 编写代码,你仍然可以从大多数 GPars 功能中受益。

Java API 特性

GPars 的某些部分与 Java 无关,最好直接使用底层的 Java 库

  • 并行集合 - 直接使用 jsr-166y 库的并行数组
  • Fork/Join - 直接使用 jsr-166y 库的 Fork/Join 支持
  • 异步函数 - 直接使用 Java 执行程序服务

GPars 的其他部分可以从 Java 中使用,就像从 Groovy 中使用一样,尽管大多数人会错过 Groovy DSL 的功能。

Java API 中的 GPars 闭包

为了克服 Java 中缺少闭包作为语言元素的问题,并避免强迫用户通过 Java API 直接使用 Groovy 闭包,提供了一些方便的包装类来帮助您定义回调、演员主体或数据流任务。

  • groovyx.gpars.MessagingRunnable - 用于单参数回调或演员主体
  • groovyx.gpars.ReactorMessagingRunnable - 用于 ReactiveActor 主体
  • groovyx.gpars.DataflowMessagingRunnable - 用于数据流运算符的主体

这些类可以在 GPars API 预期使用 Groovy 闭包的所有地方使用。

Actor

DynamicDispatchActor 以及 ReactiveActor 类可以像在 Groovy 中一样使用

import groovyx.gpars.MessagingRunnable;
 import groovyx.gpars.actor.DynamicDispatchActor;

public class StatelessActorDemo { public static void main(String[] args) throws InterruptedException { final MyStatelessActor actor = new MyStatelessActor(); actor.start(); actor.send("Hello"); actor.sendAndWait(10); actor.sendAndContinue(10.0, new MessagingRunnable<String>() { @Override protected void doRun(final String s) { System.out.println("Received a reply " + s); } }); } }

class MyStatelessActor extends DynamicDispatchActor { public void onMessage(final String msg) { System.out.println("Received " + msg); replyIfExists("Thank you"); }

public void onMessage(final Integer msg) { System.out.println("Received a number " + msg); replyIfExists("Thank you"); }

public void onMessage(final Object msg) { System.out.println("Received an object " + msg); replyIfExists("Thank you"); } }

尽管 Groovy 和 Java GPars 的使用之间没有太多区别,但请注意,回调实例化 MessagingRunnable 类以代替 groovy 闭包。

import groovy.lang.Closure;
import groovyx.gpars.ReactorMessagingRunnable;
import groovyx.gpars.actor.Actor;
import groovyx.gpars.actor.ReactiveActor;

public class ReactorDemo { public static void main(final String[] args) throws InterruptedException { final Closure handler = new ReactorMessagingRunnable<Integer, Integer>() { @Override protected Integer doRun(final Integer integer) { return integer * 2; } }; final Actor actor = new ReactiveActor(handler); actor.start();

System.out.println("Result: " + actor.sendAndWait(1)); System.out.println("Result: " + actor.sendAndWait(2)); System.out.println("Result: " + actor.sendAndWait(3)); } }

便捷工厂方法

显然,所有用于快速构建演员的基本工厂方法都可以在您期望的地方找到。

import groovy.lang.Closure;
import groovyx.gpars.ReactorMessagingRunnable;
import groovyx.gpars.actor.Actor;
import groovyx.gpars.actor.Actors;

public class ReactorDemo { public static void main(final String[] args) throws InterruptedException { final Closure handler = new ReactorMessagingRunnable<Integer, Integer>() { @Override protected Integer doRun(final Integer integer) { return integer * 2; } }; final Actor actor = Actors.reactor(handler);

System.out.println("Result: " + actor.sendAndWait(1)); System.out.println("Result: " + actor.sendAndWait(2)); System.out.println("Result: " + actor.sendAndWait(3)); } }

Agent

import groovyx.gpars.MessagingRunnable;
 import groovyx.gpars.agent.Agent;

public class AgentDemo { public static void main(final String[] args) throws InterruptedException { final Agent counter = new Agent<Integer>(0); counter.send(10); System.out.println("Current value: " + counter.getVal()); counter.send(new MessagingRunnable<Integer>() { @Override protected void doRun(final Integer integer) { counter.updateValue(integer + 1); } }); System.out.println("Current value: " + counter.getVal()); } }

数据流并发

从 Java 中使用 DataflowVariablesDataflowQueues 都没有问题。只需避免方便的重载运算符,直接使用方法,例如 bindwhenBoundgetVal 等。您也可以继续使用数据流 任务,将 RunnableCallable 的实例传递给它们,就像 groovy Closure 一样。

import groovyx.gpars.MessagingRunnable;
import groovyx.gpars.dataflow.DataflowVariable;
import groovyx.gpars.group.DefaultPGroup;

import java.util.concurrent.Callable;

public class DataflowTaskDemo { public static void main(final String[] args) throws InterruptedException { final DefaultPGroup group = new DefaultPGroup(10);

final DataflowVariable a = new DataflowVariable();

group.task(new Runnable() { public void run() { a.bind(10); } });

final Promise result = group.task(new Callable() { public Object call() throws Exception { return (Integer)a.getVal() + 10; } });

result.whenBound(new MessagingRunnable<Integer>() { @Override protected void doRun(final Integer integer) { System.out.println("arguments = " + integer); } });

System.out.println("result = " + result.getVal()); } }

数据流运算符

下面的示例应该说明 Groovy 和 Java API 在数据流运算符方面的主要区别。

  1. 使用接受通道列表的便捷工厂方法来创建运算符或选择器
  2. 使用 DataflowMessagingRunnable 指定运算符主体
  3. 调用 getOwningProcessor() 以从主体内部获取运算符,以便例如绑定输出值

import groovyx.gpars.DataflowMessagingRunnable;
import groovyx.gpars.dataflow.Dataflow;
import groovyx.gpars.dataflow.DataflowQueue;
import groovyx.gpars.dataflow.operator.DataflowProcessor;

import java.util.Arrays; import java.util.List;

public class DataflowOperatorDemo { public static void main(final String[] args) throws InterruptedException { final DataflowQueue stream1 = new DataflowQueue(); final DataflowQueue stream2 = new DataflowQueue(); final DataflowQueue stream3 = new DataflowQueue(); final DataflowQueue stream4 = new DataflowQueue();

final DataflowProcessor op1 = Dataflow.selector(Arrays.asList(stream1), Arrays.asList(stream2), new DataflowMessagingRunnable(1) { @Override protected void doRun(final Object… objects) { getOwningProcessor().bindOutput(2*(Integer)objects[0]); } });

final List secondOperatorInput = Arrays.asList(stream2, stream3);

final DataflowProcessor op2 = Dataflow.operator(secondOperatorInput, Arrays.asList(stream4), new DataflowMessagingRunnable(2) { @Override protected void doRun(final Object… objects) { getOwningProcessor().bindOutput((Integer) objects[0] + (Integer) objects[1]); } });

stream1.bind(1); stream1.bind(2); stream1.bind(3); stream3.bind(100); stream3.bind(100); stream3.bind(100); System.out.println("Result: " + stream4.getVal()); System.out.println("Result: " + stream4.getVal()); System.out.println("Result: " + stream4.getVal()); op1.stop(); op2.stop(); } }

性能

一般来说,无论您是使用 Groovy 还是 Java,GPars 的开销都是相同的,并且往往非常低。例如,GPars 演员可以与其他 JVM 演员选项(如 Scala 演员)正面竞争。

由于 Groovy 代码通常比 Java 代码运行速度慢,这主要是因为动态方法调用,因此您可能考虑用 Java 编写代码以提高性能。通常,任务或演员主体内的数值运算或频繁的细粒度方法调用可能会从重写为 Java 中获益。

先决条件

所有 GPars 集成规则都适用于 Java 项目,就像它们适用于 Groovy 项目一样。您只需要将 groovy 分发 jar 文件包含到您的项目中,一切就都清楚了。您也可以查看示例 Java Maven 项目,以获取有关如何将 GPars 集成到基于 maven 的纯 Java 应用程序中的技巧 - 示例 Java Maven 项目

3 数据并行

专注于数据而不是进程,有助于创建健壮的并发程序。作为程序员,您定义数据以及应该应用于它的函数,然后让底层机制处理数据。通常,将创建一组并发任务,然后将它们提交给线程池进行处理。

GPars 中,GParsPoolGParsExecutorsPool 类可以让您访问低级数据并行技术。虽然 GParsPool 类依赖于 jsr-166y Fork/Join 框架,因此提供了更大的功能和更好的性能,但 GParsExecutorsPool 使用的是传统的 Java 执行程序,因此在受管理或受限制的环境中更容易设置。

GPars 低级数据并行涵盖了三个基本领域

  1. 并发处理集合
  2. 异步运行函数(闭包)
  3. 执行 Fork/Join(分治)算法

3.1 并行集合

处理数据通常涉及操作集合。列表、数组、集合、映射、迭代器、字符串以及许多其他数据类型都可以被视为项目的集合。处理此类集合的常见模式是从头到尾依次取元素,对每个项目进行操作。

例如,min() 函数应该返回集合中最小的元素。当您在数字集合上调用 min() 方法时,调用者线程将创建一个 累加器到目前为止最小的值,并将其初始化为给定类型的最小值,例如零。然后,线程将遍历集合中的元素,并将它们与 累加器 中的值进行比较。处理完所有元素后,最小值将存储在 累加器 中。

然而,这种算法虽然简单,但对于多核硬件来说 完全错误。在双核芯片上运行 min() 函数最多只能利用芯片 50% 的计算能力。在四核芯片上,它将仅为 25%。没错,该算法实际上 浪费了芯片 75% 的计算能力

树状结构被证明更适合并行处理。我们示例中的 min() 函数不需要遍历行中的所有元素并将它们的值与 累加器 进行比较。它可以做的是依赖硬件的多核特性。例如,parallel_min() 函数可以比较集合中相邻值的成对(或特定大小的元组),并将元组中最小的值提升到下一轮比较。在不同的元组中搜索最小值可以安全地并行进行,因此同一轮中的元组可以由不同的核心同时处理,而不会出现线程之间的竞争或冲突。

认识并行数组

jsr-166y 库提供了一个非常方便的抽象,称为 并行数组。GPars 以多种方式利用并行数组实现。GParsPoolGParsExecutorsPool 类提供了常见 Groovy 迭代方法的并行变体,例如 each()collect()findAll() 等。

def selfPortraits = images.findAllParallel{it.contains me}.collectParallel {it.resize()}
它还允许以更函数式的方式进行 map/reduce 集合处理。
def smallestSelfPortrait = images.parallel.filter{it.contains me}.map{it.resize()}.min{it.sizeInMB}

3.1.1 GParsPool

使用 GParsPool - 基于 JSR-166y 的并发集合处理器

GParsPool 的用法

GParsPool 类为集合和对象提供了基于并行数组(来自 JSR-166y)的并发 DSL。

使用示例

//summarize numbers concurrently
 GParsPool.withPool {
     final AtomicInteger result = new AtomicInteger(0)
     [1, 2, 3, 4, 5].eachParallel {result.addAndGet(it)}
     assert 15 == result
 }

//multiply numbers asynchronously GParsPool.withPool { final List result = [1, 2, 3, 4, 5].collectParallel {it * 2} assert ([2, 4, 6, 8, 10].equals(result)) }

传入的闭包以 ForkJoinPool 的实例作为参数,然后可以在闭包中自由使用。
//check whether all elements within a collection meet certain criteria
 GParsPool.withPool(5) {ForkJoinPool pool ->
     assert [1, 2, 3, 4, 5].everyParallel {it > 0}
     assert ![1, 2, 3, 4, 5].everyParallel {it > 1}
 }
GParsPool.withPool() 方法接受创建池的线程数和未处理异常处理程序的可选参数。
withPool(10) {...}
withPool(20, exceptionHandler) {...}

GParsPool.withExistingPool() 接受一个已存在的 ForkJoinPool 实例以供重用。DSL 仅在关联的代码块内有效,并且仅对调用了 withPool()withExistingPool() 方法的线程有效。 withPool() 方法只有在所有工作线程完成其任务并且池被销毁后才会返回,返回关联代码块的返回值。 withExistingPool() 方法不会等待池线程完成。

或者,可以静态导入 GParsPoolimport static groovyx.gpars.GParsPool.`*`,这将允许省略 GParsPool 类名。

withPool {
     assert [1, 2, 3, 4, 5].everyParallel {it > 0}
     assert ![1, 2, 3, 4, 5].everyParallel {it > 1}
 }

以下方法当前在 Groovy 中的所有对象上都受支持

  • eachParallel()
  • eachWithIndexParallel()
  • collectParallel()
  • collectManyParallel()
  • findAllParallel()
  • findAnyParallel
  • findParallel()
  • everyParallel()
  • anyParallel()
  • grepParallel()
  • groupByParallel()
  • foldParallel()
  • minParallel()
  • maxParallel()
  • sumParallel()
  • splitParallel()
  • countParallel()
  • foldParallel()

元类增强器

作为替代方案,可以使用 ParallelEnhancer 类来增强任何类或单个实例的元类,并使用并行方法。

import groovyx.gpars.ParallelEnhancer

def list = [1, 2, 3, 4, 5, 6, 7, 8, 9] ParallelEnhancer.enhanceInstance(list) println list.collectParallel {it * 2 }

def animals = ['dog', 'ant', 'cat', 'whale'] ParallelEnhancer.enhanceInstance animals println (animals.anyParallel {it ==~ /ant/} ? 'Found an ant' : 'No ants found') println (animals.everyParallel {it.contains('a')} ? 'All animals contain a' : 'Some animals can live without an a')

使用 ParallelEnhancer 类时,您无需使用 GParsPool DSL 限制在 withPool() 块中。增强后的类或实例将一直保持增强状态,直到它们被垃圾回收。

异常处理

如果在处理任何传入的闭包时抛出异常,则第一个异常将从 xxxParallel 方法中重新抛出,并且算法将在尽快停止。

GParsPool 的异常处理机制建立在 Fork/Join 框架中内置的机制之上。由于 Fork/Join 算法本质上是分层的,一旦算法的任何部分发生故障,继续计算通常没有太大好处,因为算法的某些分支将永远不会返回结果。

请记住,GParsPool 实现不保证在发生第一个未处理异常后的行为,除了停止算法并将第一个检测到的异常重新抛出给调用者之外。毕竟,这种行为与传统的顺序迭代方法所做的一致。

透明并行集合

除了添加新的 xxxParallel() 方法之外,GPars 还允许您更改原始迭代方法的语义。例如,您可能将集合传递到一个库方法中,该方法将以顺序方式处理您的集合,例如使用 collect() 方法。通过更改集合上的 collect() 方法的语义,您可以有效地并行化库顺序代码。

GParsPool.withPool {

//The selectImportantNames() will process the name collections concurrently assert ['ALICE', 'JASON'] == selectImportantNames(['Joe', 'Alice', 'Dave', 'Jason'].makeConcurrent()) }

/** * A function implemented using standard sequential collect() and findAll() methods. */ def selectImportantNames(names) { names.collect {it.toUpperCase()}.findAll{it.size() > 4} }

makeSequential() 方法将重置集合以返回到原始的顺序语义。

import static groovyx.gpars.GParsPool.withPool

def list = [1, 2, 3, 4, 5, 6, 7, 8, 9]

println 'Sequential: ' list.each { print it + ',' } println()

withPool {

println 'Sequential: ' list.each { print it + ',' } println()

list.makeConcurrent()

println 'Concurrent: ' list.each { print it + ',' } println()

list.makeSequential()

println 'Sequential: ' list.each { print it + ',' } println() }

println 'Sequential: ' list.each { print it + ',' } println()

asConcurrent() 方便方法将允许您指定代码块,在这些代码块中集合保持并发语义。

import static groovyx.gpars.GParsPool.withPool

def list = [1, 2, 3, 4, 5, 6, 7, 8, 9]

println 'Sequential: ' list.each { print it + ',' } println()

withPool {

println 'Sequential: ' list.each { print it + ',' } println()

list.asConcurrent { println 'Concurrent: ' list.each { print it + ',' } println() }

println 'Sequential: ' list.each { print it + ',' } println() }

println 'Sequential: ' list.each { print it + ',' } println()

透明并行化,包括 makeConcurrent()makeSequential()asConcurrent() 方法,也可以与 ParallelEnhancer 结合使用。

/**
 * A function implemented using standard sequential collect() and findAll() methods.
 */
def selectImportantNames(names) {
    names.collect {it.toUpperCase()}.findAll{it.size() > 4}
}

def names = ['Joe', 'Alice', 'Dave', 'Jason'] ParallelEnhancer.enhanceInstance(names) //The selectImportantNames() will process the name collections concurrently assert ['ALICE', 'JASON'] == selectImportantNames(names.makeConcurrent())

import groovyx.gpars.ParallelEnhancer

def list = [1, 2, 3, 4, 5, 6, 7, 8, 9]

println 'Sequential: ' list.each { print it + ',' } println()

ParallelEnhancer.enhanceInstance(list)

println 'Sequential: ' list.each { print it + ',' } println()

list.asConcurrent { println 'Concurrent: ' list.each { print it + ',' } println()

} list.makeSequential()

println 'Sequential: ' list.each { print it + ',' } println()

避免函数中的副作用

我们必须提醒您。由于提供给并行方法(例如 eachParallel()collectParallel())的闭包可能并行运行,因此您必须确保每个闭包都以线程安全的方式编写。闭包不得持有任何内部状态、共享数据或具有超出调用其单个元素边界的副作用。违反这些规则将为竞争条件和死锁打开大门,它们是现代多核程序员最严重的敌人。

不要这样做

def thumbnails = []
images.eachParallel {thumbnails << it.thumbnail}  //Concurrently accessing a not-thread-safe collection of thumbnails, don't do this!
至少,您已被警告。

因为 GParsPool 使用 Fork/Join 池(带工作窃取),即使线程可能看起来处于空闲状态,它们也可能不会应用于等待处理的任务。使用工作窃取算法,用完任务的工作线程可以从仍然忙碌的其他线程中窃取任务。

如果您使用 GParsExecutorsPool,它不使用 Fork/Join,则您会获得您天真地期望的线程分配行为。

3.1.2 GParsExecutorsPool

使用 GParsExecutorsPool - 基于 Java Executors 的并发集合处理器

GParsExecutorsPool 的用法

GParsExecutorsPool 类为集合和对象提供了基于 Java Executors 的并发 DSL。

GParsExecutorsPool 类可用作纯 JDK 基于的集合并行处理器。与 GParsPool 类不同,GParsExecutorsPool 不需要 jsr-166y jar 文件,但利用标准 JDK 执行器服务来并行化迭代处理集合或对象的闭包。但是,需要说明的是,GParsPool 的性能通常比 GParsExecutorsPool 好得多。

使用示例

//multiply numbers asynchronously
 GParsExecutorsPool.withPool {
     Collection<Future> result = [1, 2, 3, 4, 5].collectParallel{it * 10}
     assert new HashSet([10, 20, 30, 40, 50]) == new HashSet((Collection)result*.get())
 }

//multiply numbers asynchronously using an asynchronous closure GParsExecutorsPool.withPool { def closure={it * 10} def asyncClosure=closure.async() Collection<Future> result = [1, 2, 3, 4, 5].collect(asyncClosure) assert new HashSet([10, 20, 30, 40, 50]) == new HashSet((Collection)result*.get()) }

传入的闭包以 ExecutorService 的实例作为参数,然后可以在闭包中自由使用。
//find an element meeting specified criteria
 GParsExecutorsPool.withPool(5) {ExecutorService service ->
     service.submit({performLongCalculation()} as Runnable)
 }
GParsExecutorsPool.withPool() 方法接受创建池的线程数和线程工厂的可选参数。
withPool(10) {...}
withPool(20, threadFactory) {...}

GParsExecutorsPool.withExistingPool() 接受一个已存在的执行器服务实例以供重用。DSL 仅在关联的代码块内有效,并且仅对调用了 withPool()withExistingPool() 方法的线程有效。 withPool() 方法只有在所有工作线程完成其任务并且执行器服务被销毁后才会返回,返回关联代码块的返回值。 withExistingPool() 方法不会等待执行器服务线程完成。

或者,可以静态导入 GParsExecutorsPoolimport static groovyx.gpars.GParsExecutorsPool.`*`,这将允许省略 GParsExecutorsPool 类名。

withPool {
     def result = [1, 2, 3, 4, 5].findParallel{Number number -> number > 2}
     assert result in [3, 4, 5]
 }
以下方法在所有支持 Groovy 中迭代的对象上都受支持
  • eachParallel()
  • eachWithIndexParallel()
  • collectParallel()
  • findAllParallel()
  • findParallel()
  • allParallel()
  • anyParallel()
  • grepParallel()
  • groupByParallel()

元类增强器

作为替代方案,可以使用 GParsExecutorsPoolEnhancer 类来增强任何类或单个实例的元类,并使用异步方法。

import groovyx.gpars.GParsExecutorsPoolEnhancer

def list = [1, 2, 3, 4, 5, 6, 7, 8, 9] GParsExecutorsPoolEnhancer.enhanceInstance(list) println list.collectParallel {it * 2 }

def animals = ['dog', 'ant', 'cat', 'whale'] GParsExecutorsPoolEnhancer.enhanceInstance animals println (animals.anyParallel {it ==~ /ant/} ? 'Found an ant' : 'No ants found') println (animals.allParallel {it.contains('a')} ? 'All animals contain a' : 'Some animals can live without an a')

使用 GParsExecutorsPoolEnhancer 类时,您无需使用 GParsExecutorsPool DSL 限制在 withPool() 块中。增强后的类或实例将一直保持增强状态,直到它们被垃圾回收。

异常处理

如果在处理任何传入的闭包时抛出异常,则将重新抛出包装所有原始异常的 AsyncException 实例,从 xxxParallel 方法抛出。

避免函数中的副作用

我们再次需要提醒您,不要使用具有副作用的闭包来影响超出当前处理元素范围的对象,或者保持状态的闭包。不要这样做!将它们传递给任何 xxxParallel() 方法都是危险的。

3.1.3 记忆化

memoize 函数允许缓存函数的返回值。对记忆化函数的重复调用,如果使用相同的参数值,将不会调用原始函数中编码的计算,而是从内部透明缓存中检索结果值。如果计算明显比从缓存中检索缓存值慢,这将允许用户以内存换取性能。查看示例,我们将尝试扫描多个网站以查找特定内容

GPars 的记忆化功能已在 Groovy 1.8 版本中贡献,如果您在 Groovy 1.8 或更高版本上运行,建议使用 Groovy 功能。GPars 中的记忆化几乎相同,只是它使用周围的线程池并发地搜索记忆化缓存,因此在某些情况下可能会带来性能优势。

GPars 记忆化功能已重命名,以避免与 Groovy 中的记忆化功能发生将来冲突。GPars 现在使用前缀字母 g 调用方法,例如 gmemoize()。

使用示例

GParsPool.withPool {
    def urls = ['http://www.dzone.com', 'http://www.theserverside.com', 'http://www.infoq.com']
    Closure download = {url ->
        println "Downloading $url"
        url.toURL().text.toUpperCase()
    }
    Closure cachingDownload = download.gmemoize()

println 'Groovy sites today: ' + urls.findAllParallel {url -> cachingDownload(url).contains('GROOVY')} println 'Grails sites today: ' + urls.findAllParallel {url -> cachingDownload(url).contains('GRAILS')} println 'Griffon sites today: ' + urls.findAllParallel {url -> cachingDownload(url).contains('GRIFFON')} println 'Gradle sites today: ' + urls.findAllParallel {url -> cachingDownload(url).contains('GRADLE')} println 'Concurrency sites today: ' + urls.findAllParallel {url -> cachingDownload(url).contains('CONCURRENCY')} println 'GPars sites today: ' + urls.findAllParallel {url -> cachingDownload(url).contains('GPARS')} }

请注意,闭包在 GParsPool.withPool() 块中使用 memoize() 函数增强,该函数返回一个新的闭包,该闭包使用缓存包装原始闭包。在示例中,我们在代码中调用了 cachingDownload 函数,但是,每个唯一的 url 仅下载一次 - 第一次需要它时。然后缓存这些值,并可供后续调用使用。以及所有线程,无论哪个线程最初使用特定 url 的下载请求并必须处理实际计算/下载。

因此,总结一下,记忆化通过过去返回值的缓存来保护函数。但是,memoize 可以做得更多。在某些算法中,添加少量内存可能会对计算的计算复杂度产生巨大影响。让我们看一个经典的斐波那契数示例。

斐波那契示例

一个纯粹的函数式递归实现,紧密遵循斐波那契数的定义,是指数级的复杂度

Closure fib = {n -> n > 1 ? call(n - 1) + call(n - 2) : n}

尝试使用大约 30 的数字调用 fib 函数,您将看到它有多慢。

现在,稍加调整并添加记忆化缓存,算法神奇地变成了线性复杂度

Closure fib
fib = {n -> n > 1 ? fib(n - 1) + fib(n - 2) : n}.gmemoize()

我们添加的额外内存切断了除一个递归分支之外的所有分支。所有后续对相同 fib 函数的调用也将从缓存的值中受益。

此外,请参见下面,memoizeAtMost 变体如何在我们的示例中减少内存消耗,同时保留算法的线性复杂度。

可用变体

memoize

基本变体,它在记忆化函数的整个生命周期内将值保留在内部缓存中。提供所有变体中最佳的性能特征。

memoizeAtMost

允许用户为缓存的项目数量设置硬限制。一旦达到限制,所有随后添加的值将使用 LRU(最近最少使用)策略从缓存中消除最旧的值。

因此,对于我们的斐波那契数示例,我们可以安全地将缓存大小缩减为两个项目。

Closure fib
fib = {n -> n > 1 ? fib(n - 1) + fib(n - 2) : n}.memoizeAtMost(2)

对缓存大小设置上限可能有两个目的。

  1. 将缓存的内存占用量保持在定义的范围内。
  2. 保留函数所需的性能特征。过大的缓存可能需要更长的时间来检索缓存的值,而直接计算结果所花的时间更短。

memoizeAtLeast

允许内部缓存无限增长,直到 JVM 的垃圾收集器决定介入并从内存中逐出 SoftReferences(我们的实现使用)。memoizeAtLeast() 方法的单个参数值指定应保护免受 gc 逐出的最小缓存项目数。缓存永远不会缩小到指定的条目数以下。缓存确保它仅使用 LRU(最近最少使用)策略保护最近使用的项目免受逐出。

memoizeBetween

结合了 memoizeAtLeast 和 memoizeAtMost,因此允许缓存根据可用内存和 gc 活动在两个参数值之间的范围内增长和缩小,但缓存大小永远不会超过上限以保留缓存所需的性能特征。

3.2 地图减少

并行集合 Map/Reduce DSL 为 GPars 带来了更具功能性的风格。一般来说,Map/Reduce DSL 可用于与xxxParallel() 系列方法相同的目的,并且具有非常相似的语义。另一方面,如果您需要将多个方法链接起来以在多个步骤中处理单个集合,则 Map/Reduce 可以执行得快得多。
println 'Number of occurrences of the word GROOVY today: ' + urls.parallel
            .map {it.toURL().text.toUpperCase()}
            .filter {it.contains('GROOVY')}
            .map{it.split()}
            .map{it.findAll{word -> word.contains 'GROOVY'}.size()}
            .sum()

xxxParallel() 方法必须遵循其非并行对等方法的约定。因此,collectParallel() 方法必须返回一个合法项目的集合,您可以再次将其视为 Groovy 集合。在内部,并行收集方法构建了一个高效的并行结构,称为并行数组,并发地执行所需的操作,并在返回之前销毁并行数组,构建要返回给您的结果集合。对结果集合进行 let say findAllParallel() 的潜在调用将重复在后台进行并行数组实例的构建和销毁的整个过程。

使用 Map/Reduce,您只需将集合转换为并行数组并返回一次。Map/Reduce 系列方法不返回 Groovy 集合,而是可以自由地直接传递内部并行数组。在集合上调用parallel 属性将为集合构建一个并行数组,并返回一个围绕并行数组实例的薄包装器。然后,您可以链接所有必需的方法,如

  • map()
  • reduce()
  • filter()
  • size()
  • sum()
  • min()
  • max()
  • sort()
  • groupBy()
  • combine()

返回到普通的 Groovy 集合实例始终只是检索collection 属性的问题。

def myNumbers = (1..1000).parallel.filter{it % 2 == 0}.map{Math.sqrt it}.collection

避免函数中的副作用

我们再次需要提醒您。为了避免出现意外情况,请保持传递给 Map/Reduce 函数的闭包无状态且没有副作用。

可用性

此功能仅在使用基于 Fork/Join 的GParsPool 时可用,在GParsExecutorsPool 中不可用。

经典示例

一个经典的示例,受 http://github.com/thevery 启发,计算字符串中单词的出现次数

import static groovyx.gpars.GParsPool.withPool

def words = "This is just a plain text to count words in" print count(words)

def count(arg) { withPool { return arg.parallel .map{[it, 1]} .groupBy{it[0]}.getParallel() .map {it.value=it.value.size();it} .sort{-it.value}.collection } }

同一个示例,现在用更通用的combine 操作实现

def words = "This is just a plain text to count words in"
print count(words)

def count(arg) { withPool { return arg.parallel .map{[it, 1]} .combine(0) {sum, value -> sum + value}.getParallel() .sort{-it.value}.collection } }

组合

combine 操作期望其输入是一个元组列表(两个元素列表),被认为是键值对(如 [key1, value1, key2, value2, key1, value3, key3, value4 … ]),其中可能包含重复键。调用时,combine 使用提供的累加器函数合并相同键的值,并生成一个映射原始(唯一)键与其累积值的映射。例如,[a, b, c, d, a, e, c, f] 将被组合成 a : b+e, c : d+f,而值上的 '+' 操作需要由用户作为累加闭包提供。

累加器函数 参数需要指定一个函数,用于组合(累加)属于同一个键的值。还需要提供初始累加器值。由于combine 方法并行处理项目,因此将多次重复使用初始累加器值。因此,提供的值必须允许重复使用。它应该是一个可克隆的不可变的值,或者是一个闭包,每次请求时返回一个新的初始累加器。累加器函数和可重复使用的初始值的良好组合包括

accumulator = {List acc, value -> acc << value} initialValue = []
accumulator = {List acc, value -> acc << value} initialValue = {-> []}
accumulator = {int sum, int value -> acc + value} initialValue = 0
accumulator = {int sum, int value -> sum + value} initialValue = {-> 0}
accumulator = {ShoppingCart cart, Item value -> cart.addItem(value)} initialValue = {-> new ShoppingCart()}

返回值类型为映射。例如,['he', 1, 'she', 2, 'he', 2, 'me', 1, 'she, 5, 'he', 1 与提供的初始值为 0 将被组合成 'he' : 4, 'she' : 7, 'he', : 2, 'me' : 1

键将使用它们的 equals 和 hashCode 方法相互比较。考虑使用@Canonical@EqualsAndHashCode 来注释用作键的类。就像 Groovy 中的所有哈希映射一样,请确保使用的是字符串而不是 GString 作为键!

对于更复杂的场景,当您combine() 复杂对象时,这里一个好的策略是拥有一个类,该类可以用作常见用例的键,并为不常见用例应用不同的键。

import groovy.transform.ToString
import groovy.transform.TupleConstructor

import static groovyx.gpars.GParsPool.withPool

TupleConstructor ToString class PricedCar implements Cloneable { String model String color Double price

boolean equals(final o) { if (this.is(o)) return true if (getClass() != o.class) return false

final PricedCar pricedCar = (PricedCar) o

if (color != pricedCar.color) return false if (model != pricedCar.model) return false

return true }

int hashCode() { int result result = (model != null ? model.hashCode() : 0) result = 31 * result + (color != null ? color.hashCode() : 0) return result }

@Override protected Object clone() { return super.clone() } }

def cars = [new PricedCar('F550', 'blue', 2342.223), new PricedCar('F550', 'red', 234.234), new PricedCar('Da', 'white', 2222.2), new PricedCar('Da', 'white', 1111.1)]

withPool { //Combine by model def result = cars.parallel.map { [it.model, it] }.combine(new PricedCar('', 'N/A', 0.0)) {sum, value -> sum.model = value.model sum.price += value.price sum }.values()

println result

//Combine by model and color (the PricedCar's equals and hashCode)) result = cars.parallel.map { [it, it] }.combine(new PricedCar('', 'N/A', 0.0)) {sum, value -> sum.model = value.model sum.color = value.color sum.price += value.price sum }.values()

println result }

3.3 并行数组

作为替代方案,可以直接使用 JSR-166y 中定义的高效树状数据结构。任何集合或对象上的parallelArray 属性将返回一个jsr166y.forkjoin.ParallelArray 实例,该实例保存原始集合的元素,然后可以通过 jsr166y API 对其进行操作。有关 API 详细信息,请参阅 jsr166y 文档。

import groovyx.gpars.extra166y.Ops

groovyx.gpars.GParsPool.withPool { assert 15 == [1, 2, 3, 4, 5].parallelArray.reduce({a, b -> a + b} as Ops.Reducer, 0) //summarize assert 55 == [1, 2, 3, 4, 5].parallelArray.withMapping({it ** 2} as Ops.Op).reduce({a, b -> a + b} as Ops.Reducer, 0) //summarize squares assert 20 == [1, 2, 3, 4, 5].parallelArray.withFilter({it % 2 == 0} as Ops.Predicate) //summarize squares of even numbers .withMapping({it ** 2} as Ops.Op) .reduce({a, b -> a + b} as Ops.Reducer, 0)

assert 'aa:bb:cc:dd:ee' == 'abcde'.parallelArray //concatenate duplicated characters with separator .withMapping({it * 2} as Ops.Op) .reduce({a, b -> "$a:$b"} as Ops.Reducer, "")

3.4 异步调用

在后台运行长时间运行的任务属于活动,对这些任务的需求非常频繁。您的主执行线程希望初始化一些计算、下载、搜索或类似的操作,但是,可能不需要立即获得结果。GPars 为开发人员提供了工具,可以安排在后台处理异步活动,并在需要时收集结果。

GParsPool 和 GParsExecutorsPool 异步处理功能的使用

GParsPoolGParsExecutorsPool 都在此领域提供几乎相同的服务,尽管它们利用了不同的底层机制,这取决于用户选择这两个类中的哪一个。

闭包增强

以下方法已添加到GPars(Executors)Pool.withPool() 块内的闭包中。

  • async() - 创建提供的闭包的异步变体,该变体在调用时返回潜在返回值的未来。
  • callAsync() - 在单独的线程中调用闭包,提供给定的参数,返回潜在返回值的未来。

示例

GParsPool.withPool() {
    Closure longLastingCalculation = {calculate()}
    Closure fastCalculation = longLastingCalculation.async()  //create a new closure, which starts the original closure on a thread pool
    Future result=fastCalculation()                           //returns almost immediately
    //do stuff while calculation performs …
    println result.get()
}

GParsPool.withPool() {
    /**
     * The callAsync() method is an asynchronous variant of the default call() method to invoke a closure.
     * It will return a Future for the result value.
     */
    assert 6 == {it * 2}.call(3)
    assert 6 == {it * 2}.callAsync(3).get()
}

超时

callTimeoutAsync() 方法(接受 long 值或 Duration 实例)允许用户在给定时间间隔后取消计算。

{->
    while(true) {
        Thread.sleep 1000  //Simulate a bit of interesting calculation
        if (Thread.currentThread().isInterrupted()) break;  //We've been cancelled
    }
}.callTimeoutAsync(2000)

为了允许取消,异步运行的代码必须不断检查其自身线程的interrupted 标志,并在标志设置为 true 时停止计算。

执行器服务增强

ExecutorService 和 jsr166y.forkjoin.ForkJoinPool 类使用 <<(左移)运算符进行增强,以将任务提交到池中并返回结果的Future

示例

GParsExecutorsPool.withPool {ExecutorService executorService ->
    executorService << {println 'Inside parallel task'}
}

并行运行函数(闭包)

GParsPoolGParsExecutorsPool 类还提供便捷方法executeAsync()executeAsyncAndWait(),以便轻松异步运行多个闭包。

示例

GParsPool.withPool {
    assert [10, 20] == GParsPool.executeAsyncAndWait({calculateA()}, {calculateB()}         //waits for results
    assert [10, 20] == GParsPool.executeAsync({calculateA()}, {calculateB()})*.get()  //returns Futures instead and doesn't wait for results to be calculated
}

3.5 可组合的异步函数

函数应该被组合。事实上,组合无副作用函数非常容易。比组合对象容易得多,而且更可靠,例如。给定相同的输入,函数始终返回相同的结果,它们不会以意外的方式改变其行为,也不会在多个线程同时调用它们时崩溃。

Groovy 中的函数

我们可以将 Groovy 闭包视为函数。它们接受参数,执行计算并返回值。假设您不让自己闭包接触其作用域之外的任何内容,那么您的闭包将是行为良好的纯函数。您可以组合起来更好地使用的函数。
def sum = (0..100000).inject(0, {a, b -> a + b})
例如,通过将添加两个数字的函数与inject 函数组合,该函数迭代整个集合,您可以快速汇总所有项目。然后,用comparison 函数替换adding 函数将立即为您提供一个计算最大值的组合函数。
def max = myNumbers.inject(0, {a, b -> a>b?a:b})

您会发现,函数式编程之所以流行是有原因的。

我们并发了吗?

这一切都运行良好,直到您意识到您没有充分利用昂贵的硬件的全部能力。函数是纯粹的顺序。这里没有并行性。除了一个处理器核心外,所有核心都无所事事,它们处于空闲状态,完全被浪费了。
那些注意的人会建议使用前面描述的并行集合技术,他们当然是对的。对于我们这里描述的场景,我们处理集合,使用那些parallel 方法将是最佳选择。但是,我们现在正在寻找一种创建和组合异步函数的通用方法,这将不仅帮助我们进行集合处理,而且主要是在其他更通用的情况下,就像下面将要介绍的那样。
为了使事情更清楚,这里有一个组合四个函数的示例,这些函数应该检查特定的网页是否与本地文件的原始内容匹配。我们需要下载页面,加载文件,计算两者的哈希值,最后比较结果值。
Closure download = {String url ->
    url.toURL().text
}

Closure loadFile = {String fileName -> … //load the file here }

Closure hash = {s -> s.hashCode()}

Closure compare = {int first, int second -> first == second }

def result = compare(hash(download('http://www.gpars.org')), hash(loadFile('/coolStuff/gpars/website/index.html'))) println "The result of comparison: " + result

我们需要下载页面,加载文件,计算两者的哈希值,最后比较结果值。每个函数负责一项特定工作。一个下载内容,第二个加载文件,第三个计算哈希值,最后第四个进行比较。组合函数就像嵌套它们的调用一样简单。

使这一切都异步

我们代码的缺点是我们没有利用download()loadFile() 函数的独立性。我们也不允许两个哈希值同时运行。它们完全可以并行运行,但我们组合函数的方式限制了任何并行性。

显然,并非所有函数都可以同时运行。有些函数依赖于其他函数的结果。它们不能在另一个函数完成之前开始。我们需要阻塞它们,直到它们的参数可用。hash() 函数需要一个字符串来处理。compare() 函数需要两个数字进行比较。

因此,我们只能并行化一些函数,而阻塞其他函数的并行性。看起来这是一个具有挑战性的任务。

函数式世界一片光明

幸运的是,函数之间的依赖关系已经在代码中隐式地表达出来了。我们不需要重复依赖关系信息。如果一个函数接受参数,而这些参数需要先由另一个函数计算,那么我们隐式地在这里存在依赖关系。在我们的例子中,hash() 函数依赖于 loadFile() 以及 download() 函数。在我们之前的例子中,inject 函数依赖于对集合中所有元素逐个调用 addition 函数的结果。

尽管一开始可能看起来很困难,但我们的任务实际上很简单。我们只需要教会我们的函数返回其未来结果的promises。我们还需要教会其他函数接受这些promises 作为参数,以便它们在开始工作之前等待真实的值。如果我们说服函数在等待值时释放它们持有的线程,我们就会直接到达魔法发生的地方。

秉承GPars 的优良传统,我们让您说服任何函数相信其他函数的承诺变得非常简单。在闭包上调用 asyncFun() 函数,您就异步了。

withPool {
    def maxPromise = numbers.inject(0, {a, b -> a>b?a:b}.asyncFun())
    println "Look Ma, I can talk to the user while the math is being done for me!"
    println maxPromise.get()
}

inject 函数并不关心 addition 函数返回什么对象,也许它只是有点惊讶每次调用 addition 函数都返回得如此快,但并没有抱怨太多,继续迭代,最终将最终结果返回给您。

现在,这是您应该坚持自己所说并做您想让别人做的事情的时候了。不要对结果皱眉,只要接受您得到了一个承诺。一个承诺,在计算完成时尽快交付结果。从您的笔记本电脑中散发出的额外热量表明,该计算利用了您函数中的自然并行性,并尽最大努力快速将结果交付给您。

promise 是一个老式的 DataflowVariable,因此您可以查询其状态、注册通知钩子或将其作为数据流算法的输入。

withPool {
    def sumPromise = (0..100000).inject(0, {a, b -> a + b}.asyncFun())
    println "Are we done yet? " + sumPromise.bound
    sumPromise.whenBound {sum -> println sum}
}

get() 方法还具有一个带有超时参数的变体,如果您想避免无限期等待的风险。

事情会出错吗?

当然。但是您会从结果承诺 get() 方法中抛出一个异常。

try {
    sumPromise.get()
} catch (MyCalculationException e) {
    println "Guess, things are not ideal today."
}

这一切都很好,但是哪些函数可以真正组合起来呢?

没有限制。您可以将任何需要组合的顺序函数组合起来,并且也应该能够组合它们的异步变体。

回到我们最初的例子,比较文件内容和网页内容,我们只需通过对所有函数调用 asyncFun() 方法将它们全部设为异步,我们就可以出发了。

Closure download = {String url ->
        url.toURL().text
    }.asyncFun()

Closure loadFile = {String fileName -> … //load the file here }.asyncFun()

Closure hash = {s -> s.hashCode()}.asyncFun()

Closure compare = {int first, int second -> first == second }.asyncFun()

def result = compare(hash(download('http://www.gpars.org')), hash(loadFile('/coolStuff/gpars/website/index.html'))) println 'Allowed to do something else now' println "The result of comparison: " + result.get()

从异步函数内部调用异步函数

异步函数的另一个非常有价值的特性是,它们的 result promises 也可以被组合。

import static groovyx.gpars.GParsPool.withPool

withPool { Closure plus = {Integer a, Integer b -> sleep 3000 println 'Adding numbers' a + b }.asyncFun()

Closure multiply = {Integer a, Integer b -> sleep 2000 a * b }.asyncFun()

Closure measureTime = {-> sleep 3000 4 }.asyncFun()

Closure distance = {Integer initialDistance, Integer velocity, Integer time -> plus(initialDistance, multiply(velocity, time)) }.asyncFun()

Closure chattyDistance = {Integer initialDistance, Integer velocity, Integer time -> println 'All parameters are now ready - starting' println 'About to call another asynchronous function' def innerResultPromise = plus(initialDistance, multiply(velocity, time)) println 'Returning the promise for the inner calculation as my own result' return innerResultPromise }.asyncFun()

println "Distance = " + distance(100, 20, measureTime()).get() + ' m' println "ChattyDistance = " + chattyDistance(100, 20, measureTime()).get() + ' m' }

如果一个异步函数(例如示例中的 distance 函数)在其主体中调用另一个异步函数(例如 plus)并返回调用的函数的 promise,则内部函数(plus)的结果 promise 将与外部函数(distance)的结果 promise 组合。内部函数(plus)现在将将其结果绑定到外部函数(distance)的 promise 上,一旦内部函数(plus)完成计算。这种 promises 组合的能力允许函数在等待参数时以及在函数体内任何地方调用另一个异步函数时停止计算,而不会阻塞线程。

方法作为异步函数

方法可以使用 .& 运算符引用为闭包。然后,这些闭包可以使用 asyncFun 转换为可组合的异步函数,就像普通的闭包一样。

class DownloadHelper {
    String download(String url) {
        url.toURL().text
    }

int scanFor(String word, String text) { text.findAll(word).size() }

String lower(s) { s.toLowerCase() } } //now we'll make the methods asynchronous withPool { final DownloadHelper d = new DownloadHelper() Closure download = d.&download.asyncFun() Closure scanFor = d.&scanFor.asyncFun() Closure lower = d.&lower.asyncFun()

//asynchronous processing def result = scanFor('groovy', lower(download('http://www.infoq.com'))) println 'Allowed to do something else now' println result.get() }

使用注解创建异步函数

与其调用 asyncFun() 函数,不如使用 @AsyncFun 注解来注释 Closure 类型的字段。这些字段必须在原地初始化,并且包含类需要在 withPool 块中实例化。

import static groovyx.gpars.GParsPool.withPool
import groovyx.gpars.AsyncFun

class DownloadingSearch { @AsyncFun Closure download = {String url -> url.toURL().text }

@AsyncFun Closure scanFor = {String word, String text -> text.findAll(word).size() }

@AsyncFun Closure lower = {s -> s.toLowerCase()}

void scan() { def result = scanFor('groovy', lower(download('http://www.infoq.com'))) //synchronous processing println 'Allowed to do something else now' println result.get() } }

withPool { new DownloadingSearch().scan() }

替代池

AsyncFun 注解默认情况下使用来自包装 withPool 块的 GParsPool 实例。但是,您也可以显式指定池类型。

@AsyncFun(GParsExecutorsPoolUtil) def sum6 = {a, b -> a + b }

通过注解阻塞函数

AsyncFun 还允许用户指定生成的函数应该具有阻塞(true)还是非阻塞(false - 默认)语义。

@AsyncFun(blocking = true)
def sum = {a, b -> a + b }

显式和延迟池分配

当直接使用 GPars(Executors)PoolUtil.asyncFun() 函数创建异步函数时,您有两个额外的选项将线程池分配给函数。

  1. 可以在创建时将函数要使用的线程池作为附加参数显式指定。
  2. 可以从周围的范围内在调用时而不是在创建时获取隐式线程池。

当显式指定线程池时,调用不需要包装在 withPool() 块中。

Closure sPlus = {Integer a, Integer b ->
    a + b
}

Closure sMultiply = {Integer a, Integer b -> sleep 2000 a * b }

println "Synchronous result: " + sMultiply(sPlus(10, 30), 100)

final pool = new FJPool()

Closure aPlus = GParsPoolUtil.asyncFun(sPlus, pool) Closure aMultiply = GParsPoolUtil.asyncFun(sMultiply, pool)

def result = aMultiply(aPlus(10, 30), 100)

println "Time to do something else while the calculation is running" println "Asynchronous result: " + result.get()

对于延迟池分配,只有函数调用必须用 withPool() 块包围。

Closure aPlus = GParsPoolUtil.asyncFun(sPlus)
Closure aMultiply = GParsPoolUtil.asyncFun(sMultiply)

withPool { def result = aMultiply(aPlus(10, 30), 100)

println "Time to do something else while the calculation is running" println "Asynchronous result: " + result.get() }

在我们看来,这是一个非常有趣的探索领域,因此欢迎您对组合异步函数的任何评论、问题或建议,或者关于其限制的提示。

3.6 Fork-Join

Fork/Join 或分治是一种非常强大的抽象,可以解决层次问题。

抽象

当谈到层次问题时,想想快速排序、归并排序、文件系统或一般树导航等。

  • Fork/Join 算法本质上将手头的問題分成几个较小的子问题,并将相同算法递归地应用于每个子问题。
  • 一旦子问题足够小,就会直接解决。
  • 所有子问题的解决方案组合起来解决它们父问题,进而帮助解决其自身的父问题。

查看漂亮的交互式 Fork/Join 可视化演示,它将向您展示线程如何协作解决常见的划分和征服算法。

强大的JSR-166y 库为我们很好地解决了 Fork/Join 编排问题,但留下了几个粗糙的边缘,如果您没有足够注意,可能会伤害您。您仍然要处理线程、池或同步屏障。

GPars 抽象便利层

GPars 可以隐藏您处理线程、池和递归任务的复杂性,但仍可以让您利用 jsr166y 中强大的 Fork/Join 实现。

import static groovyx.gpars.GParsPool.runForkJoin
import static groovyx.gpars.GParsPool.withPool

withPool() { println """Number of files: ${ runForkJoin(new File("./src")) {file -> long count = 0 file.eachFile { if (it.isDirectory()) { println "Forking a child task for $it" forkOffChild(it) //fork a child task } else { count++ } } return count + (childrenResults.sum(0)) //use results of children tasks to calculate and store own result } }""" }

runForkJoin() 工厂方法将使用提供的递归代码以及提供的 value,构建一个层次化的 Fork/Join 计算。传递给 runForkJoin() 方法的值数量必须与闭包的预期参数数量以及传递给 forkOffChild()runChildDirectly() 方法的参数数量相匹配。

def quicksort(numbers) {
    withPool {
        runForkJoin(0, numbers) {index, list ->
            def groups = list.groupBy {it <=> list[list.size().intdiv(2)]}
            if ((list.size() < 2) || (groups.size() == 1)) {
                return [index: index, list: list.clone()]
            }
            (-1..1).each {forkOffChild(it, groups[it] ?: [])}
            return [index: index, list: childrenResults.sort {it.index}.sum {it.list}]
        }.list
    }
}

这里需要提到的一个重要部分是,forkOffChild() 不会等待子任务运行。它只是安排它在将来的某个时间执行。如果一个子任务由于抛出异常而失败,您不应该期望异常从 forkOffChild() 方法本身抛出。异常很可能在父任务从调用 forkOffChild() 方法返回后很久才会发生。

它是 getChildrenResults() 方法,它将重新抛出子任务中发生的异常回父任务。

替代方法

或者,可以直接使用嵌套 Fork/Join 工作者任务的底层机制。定制的工作者可以消除使用通用工作者时参数扩散带来的性能开销。此外,自定义工作者可以在 Java 中实现,从而进一步提高算法的性能。

public final class FileCounter extends AbstractForkJoinWorker<Long> {
    private final File file;

def FileCounter(final File file) { this.file = file }

@Override protected Long computeTask() { long count = 0; file.eachFile { if (it.isDirectory()) { println "Forking a thread for $it" forkOffChild(new FileCounter(it)) //fork a child task } else { count++ } } return count + ((childrenResults)?.sum() ?: 0) //use results of children tasks to calculate and store own result } }

withPool(1) {pool -> //feel free to experiment with the number of fork/join threads in the pool println "Number of files: ${runForkJoin(new FileCounter(new File("..")))}" }

AbstractForkJoinWorker 子类可以在 Java 或 Groovy 中编写,让您能够轻松优化执行速度,如果工作者的行性能成为瓶颈。

Fork/Join 节省您的资源

由于内部使用 TaskBarrier 类来同步线程,因此 Fork/Join 操作可以安全地用少量线程运行。当一个线程在算法内部被阻塞,等待其子问题被计算时,该线程会被静默地返回到池中,以承担任务队列中的任何可用子问题并处理它们。尽管该算法创建了与子目录一样多的任务,并且任务会等待子目录任务完成,但只需要一个线程就足以让计算继续进行,并最终计算出有效的结果。

归并排序示例

import static groovyx.gpars.GParsPool.runForkJoin
import static groovyx.gpars.GParsPool.withPool

/** * Splits a list of numbers in half */ def split(List<Integer> list) { int listSize = list.size() int middleIndex = listSize / 2 def list1 = list[0..<middleIndex] def list2 = list[middleIndex..listSize - 1] return [list1, list2] }

/** * Merges two sorted lists into one */ List<Integer> merge(List<Integer> a, List<Integer> b) { int i = 0, j = 0 final int newSize = a.size() + b.size() List<Integer> result = new ArrayList<Integer>(newSize)

while ((i < a.size()) && (j < b.size())) { if (a[i] <= b[j]) result << a[i++] else result << b[j++] }

if (i < a.size()) result.addAll(a[i..-1]) else result.addAll(b[j..-1]) return result }

final def numbers = [1, 5, 2, 4, 3, 8, 6, 7, 3, 4, 5, 2, 2, 9, 8, 7, 6, 7, 8, 1, 4, 1, 7, 5, 8, 2, 3, 9, 5, 7, 4, 3]

withPool(3) { //feel free to experiment with the number of fork/join threads in the pool println """Sorted numbers: ${ runForkJoin(numbers) {nums -> println "Thread ${Thread.currentThread().name[-1]}: Sorting $nums" switch (nums.size()) { case 0..1: return nums //store own result case 2: if (nums[0] <= nums[1]) return nums //store own result else return nums[-1..0] //store own result default: def splitList = split(nums) [splitList[0], splitList[1]].each {forkOffChild it} //fork a child task return merge(* childrenResults) //use results of children tasks to calculate and store own result } } }""" }

使用定制工作者类的归并排序示例

public final class SortWorker extends AbstractForkJoinWorker<List<Integer>> {
    private final List numbers

def SortWorker(final List<Integer> numbers) { this.numbers = numbers.asImmutable() }

/** * Splits a list of numbers in half */ def split(List<Integer> list) { int listSize = list.size() int middleIndex = listSize / 2 def list1 = list[0..<middleIndex] def list2 = list[middleIndex..listSize - 1] return [list1, list2] }

/** * Merges two sorted lists into one */ List<Integer> merge(List<Integer> a, List<Integer> b) { int i = 0, j = 0 final int newSize = a.size() + b.size() List<Integer> result = new ArrayList<Integer>(newSize)

while ((i < a.size()) && (j < b.size())) { if (a[i] <= b[j]) result << a[i++] else result << b[j++] }

if (i < a.size()) result.addAll(a[i..-1]) else result.addAll(b[j..-1]) return result }

/** * Sorts a small list or delegates to two children, if the list contains more than two elements. */ @Override protected List<Integer> computeTask() { println "Thread ${Thread.currentThread().name[-1]}: Sorting $numbers" switch (numbers.size()) { case 0..1: return numbers //store own result case 2: if (numbers[0] <= numbers[1]) return numbers //store own result else return numbers[-1..0] //store own result default: def splitList = split(numbers) [new SortWorker(splitList[0]), new SortWorker(splitList[1])].each{forkOffChild it} //fork a child task return merge(* childrenResults) //use results of children tasks to calculate and store own result } } }

final def numbers = [1, 5, 2, 4, 3, 8, 6, 7, 3, 4, 5, 2, 2, 9, 8, 7, 6, 7, 8, 1, 4, 1, 7, 5, 8, 2, 3, 9, 5, 7, 4, 3]

withPool(1) { //feel free to experiment with the number of fork/join threads in the pool println "Sorted numbers: ${runForkJoin(new SortWorker(numbers))}" }

直接运行子任务

forkOffChild() 方法有一个兄弟姐妹 - runChildDirectly() 方法,它将在当前线程中直接且立即运行子任务,而不是将子任务调度到线程池中进行异步处理。通常,您将在除最后一个子任务以外的所有子任务上调用 _forkOffChild(),最后一个子任务直接调用,没有调度开销。

Closure fib = {number ->
            if (number <= 2) {
                return 1
            }
            forkOffChild(number - 1)                            //  This task will run asynchronously, probably in a different thread
            final def result = runChildDirectly(number - 2)     //  This task is run directly within the current thread
            return (Integer) getChildrenResults().sum() + result
        }

withPool { assert 55 == runForkJoin(10, fib) }

可用性

此功能仅在使用基于 Fork/Join 的GParsPool 时可用,在GParsExecutorsPool 中不可用。

3.7 并行推测

随着处理器内核变得越来越丰富,某些算法可能会从暴力并行复制中受益。您不必事先决定如何解决问题、使用什么算法或连接到哪个位置,而是并行运行所有可能的解决方案。

并行推测

假设您需要执行一项任务,例如计算一个昂贵的函数或从文件、数据库或互联网读取数据。幸运的是,您知道几种好方法(例如函数或 URL)来实现您的目标。然而,它们并不完全相同。尽管它们返回相同的结果(就您的需求而言),但它们完成所需的时间可能各不相同,其中一些甚至可能失败(例如网络问题)。更糟糕的是,没有人会告诉你哪条路径最先给你解决方案,或者哪条路径根本没有解决方案。我应该在我的列表上运行 快速排序 还是 归并排序?哪个 URL 最好用?这项服务在其主位置可用,还是我应该使用备份位置?

GPars 推测为您提供了并行尝试所有可用备选方案的选项,从而从最快的功能路径获取结果,并静默地忽略缓慢或损坏的路径。

这就是 GParsPoolGParsExecutorsPool() 上的 speculate() 方法可以做到的。

def numbers = …
def quickSort = …
def mergeSort = …
def sortedNumbers = speculate(quickSort, mergeSort)

在这里,我们同时执行 快速排序归并排序 并发,同时获取速度更快的一个的结果。鉴于当今主流硬件上提供的并行资源,并行运行这两个函数不会对任一函数的计算速度产生重大影响,因此我们在大约与仅运行两个计算中速度更快的一个相同的时间内获得了结果。并且我们比运行速度较慢的一个更快地获得了结果。然而,我们不必事先知道两种排序算法中哪一种在我们的数据上表现更好。因此我们进行了推测。

同样,从多个速度和可靠性不同的源下载文档将如下所示

import static groovyx.gpars.GParsPool.speculate
import static groovyx.gpars.GParsPool.withPool

def alternative1 = { 'http://www.dzone.com/links/index.html'.toURL().text }

def alternative2 = { 'http://www.dzone.com/'.toURL().text }

def alternative3 = { 'http://www.dzzzzzone.com/'.toURL().text //wrong url }

def alternative4 = { 'http://dzone.com/'.toURL().text }

withPool(4) { println speculate([alternative1, alternative2, alternative3, alternative4]).contains('groovy') }

确保周围的线程池有足够的线程来并行处理所有备选方案。池的大小应该与提供的闭包数量相匹配。

使用数据流变量和流的备选方案

在不需要停止不成功的备选方案的情况下,可以使用数据流变量或流从获胜的推测中获取结果值。

有关数据流变量和流的详细信息,请参阅用户指南中的数据流并发部分。

import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.Dataflow.task

def alternative1 = { 'http://www.dzone.com/links/index.html'.toURL().text }

def alternative2 = { 'http://www.dzone.com/'.toURL().text }

def alternative3 = { 'http://www.dzzzzzone.com/'.toURL().text //will fail due to wrong url }

def alternative4 = { 'http://dzone.com/'.toURL().text }

//Pick either one of the following, both will work: final def result = new DataflowQueue() // final def result = new DataflowVariable()

[alternative1, alternative2, alternative3, alternative4].each {code -> task { try { result << code() } catch (ignore) { } //We deliberately ignore unsuccessful urls } }

println result.val.contains('groovy')

4 Groovy CSP

CSP(通信顺序进程)抽象建立在独立的可组合进程之上,这些进程以同步的方式交换消息。GPars 利用了英国肯特大学开发的JCSP 库

GPars 中 CSP 实现的作者 Jon Kerridge 在他的网站上提供了关于 GroovyCSP 使用的详尽示例:

GroovyCSP 实现利用了 JCSP,一个基于 Java 的 CSP 库,该库在 LGPL 许可下授权。Apache 2 许可证(GPars 使用的许可证)与 LGPL 之间存在一些差异。请确保您的应用程序符合 LGPL 规则,然后再在您的代码中启用 JCSP 的使用。

如果 LGPL 许可证不适合您的使用,您可能需要考虑查看本用户指南的数据流并发章节,了解关于 任务选择器运算符 的知识,这些知识可能有助于您以类似于 CSP 方法的方式解决并发问题。事实上,GPars 中实现的数据流和 CSP 概念非常接近。

默认情况下,如果您没有在构建文件中显式添加对 JCSP 的依赖关系,或者没有下载并将 JCSP jar 文件包含在您的项目中,则您的项目将适用标准的商业软件友好型 Apache 2 许可证条款。GPars 仅直接依赖于在与 Apache 2 许可证兼容的许可证下授权的软件。

CSP 模型原则

从本质上讲,CSP 模型建立在独立的并发进程之上,这些进程通过使用同步(即会合)消息传递的通道相互通信。与围绕事件处理模式的 actor 或数据流运算符不同,CSP 进程专注于它们的活动(又名步骤序列),并使用通信在整个过程中保持相互同步。

由于寻址是通过通道间接进行的,因此进程不需要了解彼此。它们通常由一组输入和输出通道以及一个主体组成。一旦 CSP 进程启动,它就会从线程池中获取一个线程并开始处理它的主体,仅在从通道读取或写入通道时暂停。一些实现(例如 GoLang)还可以从 CSP 进程中分离线程,使其在通道阻塞时处于阻塞状态。

CSP 程序是确定性的。程序输入上的相同数据将始终生成相同的输出,而与实际使用的线程调度方案无关。这在调试 CSP 程序以及分析死锁时非常有帮助。

确定性和间接寻址相结合,使 CSP 进程具有高度的可组合性。您可以通过连接它们的输入和输出通道,然后将它们包装在另一个更大的包含进程中,将小的 CSP 进程组合成更大的进程。

CSP 模型使用 备选方案 引入非确定性。一个进程可以通过称为 备选方案选择 的结构,尝试同时从多个通道读取值。在参与 选择 的任何通道中第一个可用的值将被进程读取和使用。由于通过 选择 收到的消息的顺序取决于程序运行时的不可预测条件,因此将读取的值是非确定性的。

GPars 数据流中的 CSP

GPars 提供了创建 CSP 进程所需的所有构建块。

  • CSP 进程 可以通过 GPars 任务使用 闭包RunnableCallable 来建模,以保存进程的实际实现
  • CSP 通道 应该使用 SyncDataflowQueueSyncDataflowBroadcast 类来建模
  • CSP 备选方案 通过 Select 类及其 selectprioritySelect 方法提供

进程

要启动一个进程,只需使用 task 工厂方法。

import groovyx.gpars.group.DefaultPGroup
import groovyx.gpars.scheduler.ResizeablePool

group = new DefaultPGroup(new ResizeablePool(true))

def t = group.task { println "I am a process" }

t.join()

由于每个进程在其生命周期中都会消耗一个线程,因此建议使用可调整大小的线程池,如上面的示例所示。

也可以从 Runnable 或 Callable 对象创建进程

import groovyx.gpars.group.DefaultPGroup
import groovyx.gpars.scheduler.ResizeablePool

group = new DefaultPGroup(new ResizeablePool(true))

class MyProcess implements Runnable {

@Override void run() { println "I am a process" } } def t = group.task new MyProcess()

t.join()

使用 Callable 允许通过 get() 方法返回值

import groovyx.gpars.group.DefaultPGroup
import groovyx.gpars.scheduler.ResizeablePool

import java.util.concurrent.Callable

group = new DefaultPGroup(new ResizeablePool(true))

class MyProcess implements Callable<String> {

@Override String call() { println "I am a process" return "CSP is great!" } } def t = group.task new MyProcess()

println t.get()

通道

进程通常需要通道来与其他进程以及外部世界通信

import groovy.transform.TupleConstructor
import groovyx.gpars.dataflow.DataflowReadChannel
import groovyx.gpars.dataflow.DataflowWriteChannel
import groovyx.gpars.group.DefaultPGroup
import groovyx.gpars.scheduler.ResizeablePool

import java.util.concurrent.Callable import groovyx.gpars.dataflow.SyncDataflowQueue

group = new DefaultPGroup(new ResizeablePool(true))

@TupleConstructor class Greeter implements Callable<String> { DataflowReadChannel names DataflowWriteChannel greetings

@Override String call() { while(!Thread.currentThread().isInterrupted()) { String name = names.val greetings << "Hello " + name } return "CSP is great!" } }

def a = new SyncDataflowQueue() def b = new SyncDataflowQueue()

group.task new Greeter(a, b)

a << "Joe" a << "Dave" println b.val println b.val

CSP 模型使用同步消息传递,但是,在 GPars 中,您可以考虑使用异步通道以及同步通道。您也可以在一个进程中组合这两种类型的通道。

组合

对进程进行分组只是将它们与通道连接起来的问题

group = new DefaultPGroup(new ResizeablePool(true))

@TupleConstructor class Formatter implements Callable<String> { DataflowReadChannel rawNames DataflowWriteChannel formattedNames

@Override String call() { while(!Thread.currentThread().isInterrupted()) { String name = rawNames.val formattedNames << name.toUpperCase() } } }

@TupleConstructor class Greeter implements Callable<String> { DataflowReadChannel names DataflowWriteChannel greetings

@Override String call() { while(!Thread.currentThread().isInterrupted()) { String name = names.val greetings << "Hello " + name } } }

def a = new SyncDataflowQueue() def b = new SyncDataflowQueue() def c = new SyncDataflowQueue()

group.task new Formatter(a, b) group.task new Greeter(b, c)

a << "Joe" a << "Dave" println c.val println c.val

备选方案

为了引入非确定性,GPars 提供了 Select 类及其 selectprioritySelect 方法

import groovy.transform.TupleConstructor
import groovyx.gpars.dataflow.SyncDataflowQueue
import groovyx.gpars.dataflow.DataflowReadChannel
import groovyx.gpars.dataflow.DataflowWriteChannel
import groovyx.gpars.dataflow.Select
import groovyx.gpars.group.DefaultPGroup
import groovyx.gpars.scheduler.ResizeablePool

import static groovyx.gpars.dataflow.Dataflow.select

group = new DefaultPGroup(new ResizeablePool(true))

@TupleConstructor class Receptionist implements Runnable { DataflowReadChannel emails DataflowReadChannel phoneCalls DataflowReadChannel tweets DataflowWriteChannel forwardedMessages

private final Select incomingRequests = select([phoneCalls, emails, tweets]) //prioritySelect() would give highest precedence to phone calls

@Override void run() { while(!Thread.currentThread().isInterrupted()) { String msg = incomingRequests.select() forwardedMessages << msg.toUpperCase() } } }

def a = new SyncDataflowQueue() def b = new SyncDataflowQueue() def c = new SyncDataflowQueue() def d = new SyncDataflowQueue()

group.task new Receptionist(a, b, c, d)

a << "my email" b << "my phone call" c << "my tweet"

//The values come in random order since the process uses a Select to read its input 3.times{ println d.val.value }

组件

CSP 进程可以组合成更大的实体。假设您已经有一组 CSP 进程(又名 Runnable/Callable 类),您可以将它们组合成一个更大的进程

final class Prefix implements Callable {
    private final DataflowChannel inChannel
    private final DataflowChannel outChannel
    private final def prefix

def Prefix(final inChannel, final outChannel, final prefix) { this.inChannel = inChannel; this.outChannel = outChannel; this.prefix = prefix }

public def call() { outChannel << prefix while (true) { sleep 200 outChannel << inChannel.val } } }

final class Copy implements Callable {
    private final DataflowChannel inChannel
    private final DataflowChannel outChannel1
    private final DataflowChannel outChannel2

def Copy(final inChannel, final outChannel1, final outChannel2) { this.inChannel = inChannel; this.outChannel1 = outChannel1; this.outChannel2 = outChannel2; }

public def call() { final PGroup group = Dataflow.retrieveCurrentDFPGroup() while (true) { def i = inChannel.val group.task { outChannel1 << i outChannel2 << i }.join() } } }

import groovyx.gpars.dataflow.DataflowChannel
import groovyx.gpars.dataflow.SyncDataflowQueue
import groovyx.gpars.group.DefaultPGroup

group = new DefaultPGroup(6)

def fib(DataflowChannel out) { group.task { def a = new SyncDataflowQueue() def b = new SyncDataflowQueue() def c = new SyncDataflowQueue() def d = new SyncDataflowQueue() [new Prefix(d, a, 0L), new Prefix(c, d, 1L), new Copy(a, b, out), new StatePairs(b, c)].each { group.task it} } }

final SyncDataflowQueue ch = new SyncDataflowQueue() group.task new Print('Fibonacci numbers', ch) fib(ch)

sleep 10000

5 Actor

GPars 中的 actor 支持最初受到 Scala 中的 Actors 库的启发,但现在已经远远超出了 Scala 作为标准提供的功能。

Actor 允许基于消息传递的并发模型:程序是独立活动对象的集合,它们交换消息,并且没有可变的共享状态。Actor 可以帮助开发人员避免死锁、活锁和饥饿等问题,这些问题是基于共享内存的方法的常见问题。Actor 是一种利用当今硬件的多核性质而不会出现与共享内存多线程相关的所有传统问题的方法,这就是为什么 Erlang 和 Scala 等编程语言采用了这种模型。

Ruben Vermeersch 最近撰写了一篇总结 actor 关键概念 的好文章。Actor 始终保证最多只有一个线程处理 actor 的主体,并且在幕后也保证内存每次分配给 actor 时都会同步,因此 actor 的状态可以由主体中的代码安全地修改而无需任何其他额外(同步或锁定)工作。理想情况下,actor 的代码应该从外部直接调用,因此 actor 类的所有代码只能由处理最后接收的消息的线程执行,因此 actor 的所有代码都是隐式线程安全的。如果允许其他对象直接调用 actor 的任何方法,则 actor 的代码和状态的线程安全保证将不再有效

Actor 类型

一般来说,您可以在野外找到两种类型的 actor - 具有隐式状态的 actor 和没有隐式状态的 actor。GPars 为您提供了这两种选择。无状态 actor,在GPars 中由 DynamicDispatchActorReactiveActor 类表示,不跟踪之前到达的消息。您可以将它们视为扁平的消息处理程序,它们会按顺序处理消息。任何基于状态的行为都必须由用户实现。

DefaultActor 类(以前也由 AbstractPooledActor 类)在 GPars 中表示的有状态 actor 允许用户直接处理隐式状态。在接收到消息后,actor 会进入一个新的状态,并以不同的方式处理未来的消息。举个例子,一个新启动的 actor 可能只接受某些类型的消息,例如,只有在它接收到加密密钥后才能接受用于解密的加密消息。有状态 actor 允许直接在消息处理代码的结构中编码这种依赖关系。但是,隐式状态管理会带来轻微的性能成本,这主要是由于 JVM 上缺乏对延续的支持。

Actor 线程模型

由于 actor 与系统线程分离,因此大量 actor 可以共享一个相对较小的线程池。这可以扩展到具有许多并发 actor 共享一个池化线程的情况。这种架构可以避免 JVM 的一些线程限制。一般来说,虽然 JVM 只能为您提供有限数量的线程(通常约为几千个),但 actor 的数量仅受可用内存的限制。如果 actor 没有工作要做,它就不会消耗线程。

actor 代码在等待新事件(消息)的静默期间由块处理。这可以通过 延续 自然地建模。由于 JVM 不直接支持延续,因此必须在 actor 框架中模拟它们,这对 actor 代码的组织有一定的影响。但是,在大多数情况下,好处大于困难。

import groovyx.gpars.actor.Actor
import groovyx.gpars.actor.DefaultActor

class GameMaster extends DefaultActor { int secretNum

void afterStart() { secretNum = new Random().nextInt(10) }

void act() { loop { react { int num -> if (num > secretNum) reply 'too large' else if (num < secretNum) reply 'too small' else { reply 'you win' terminate() } } } } }

class Player extends DefaultActor { String name Actor server int myNum

void act() { loop { myNum = new Random().nextInt(10) server.send myNum react { switch (it) { case 'too large': println "$name: $myNum was too large"; break case 'too small': println "$name: $myNum was too small"; break case 'you win': println "$name: I won $myNum"; terminate(); break } } } } }

def master = new GameMaster().start() def player = new Player(name: 'Player', server: master).start()

//this forces main thread to live until both actors stop [master, player]*.join()

示例作者:Jordi Campos i Miralles,巴塞罗那大学,数学系,MAiA 数学系

Actor 的使用

Gpars 提供一致的 Actor API 和 DSL。Actor 主要执行三个特定操作 - 发送消息、接收消息和创建新 actor。虽然没有被GPars 特别强制执行,但消息应该是不可变的,或者至少在发送者在消息发送后不再触碰消息时遵循 hands-off 策略。

发送消息

可以使用 send() 方法将消息发送给 actor。

def passiveActor = Actors.actor{
    loop {
        react { msg -> println "Received: $msg"; }
    }
}
passiveActor.send 'Message 1'
passiveActor << 'Message 2'    //using the << operator
passiveActor 'Message 3'       //using the implicit call() method

或者,可以使用 << 运算符或隐式 call() 方法。可以使用 sendAndWait() 方法系列来阻塞调用方,直到收到 actor 的回复。 reply 将作为返回值从 sendAndWait() 方法返回。 sendAndWait() 方法也可以在超时到期或被调用 actor 终止时返回。

def replyingActor = Actors.actor{
    loop {
        react { msg ->
            println "Received: $msg";
            reply "I've got $msg"
        }
    }
}
def reply1 = replyingActor.sendAndWait('Message 4')
def reply2 = replyingActor.sendAndWait('Message 5', 10, TimeUnit.SECONDS)
use (TimeCategory) {
    def reply3 = replyingActor.sendAndWait('Message 6', 10.seconds)
}

sendAndContinue() 方法允许调用方在提供的闭包等待 actor 的回复时继续其处理。

friend.sendAndContinue 'I need money!', {money -> pocket money}
println 'I can continue while my friend is collecting money for me'

sendAndPromise() 方法返回对最终回复的 Promise(又名 Future),因此允许调用方在 actor 处理提交的消息时继续其处理。

Promise loan = friend.sendAndPromise 'I need money!'
println 'I can continue while my friend is collecting money for me'
loan.whenBound {money -> pocket money}  //asynchronous waiting for a reply
println "Received ${loan.get()}"  //synchronous waiting for a reply

所有 send()sendAndWait()sendAndContinue() 方法如果在非活动 actor 上调用,都会抛出异常。

接收消息

非阻塞消息检索

在 actor 的代码中调用 react() 方法(可选地使用超时参数)将从 actor 的收件箱中使用下一条消息,如果立即没有消息要处理,可能会等待。

println 'Waiting for a gift'
react {gift ->
    if (myWife.likes gift) reply 'Thank you!'
}

在幕后,提供的闭包不会直接被调用,而是被安排在消息可用时由线程池中的任何线程处理。安排后,当前线程将从 actor 中分离,并释放以处理已收到消息的任何其他 actor。

为了允许从线程中分离 actor, react() 方法要求代码以特殊的延续风格编写。

Actors.actor {
    loop {
        println 'Waiting for a gift'
        react {gift ->
            if (myWife.likes gift) reply 'Thank you!'
            else {
                reply 'Try again, please'
                react {anotherGift ->
                    if (myChildren.like gift) reply 'Thank you!'
                }
                println 'Never reached'
            }
        }
        println 'Never reached'
    }
    println 'Never reached'
}

react() 方法具有特殊的语义,允许 actor 在其邮箱中没有消息可用时从线程中分离。本质上, react() 安排提供的代码(闭包)在下一条消息到达时执行并返回。提供给 react() 方法的闭包是计算应该继续的代码。因此,延续风格

由于 Actor 必须保证其主体中最多只有一个线程处于活动状态,因此在当前消息处理完成之前无法处理下一个消息。通常情况下,不需要在调用 react() 之后放置代码。某些 Actor 实现甚至强制执行此规则,但 GPars 出于性能原因不执行此规则。 loop() 方法允许在 Actor 主体中进行迭代。与典型的循环结构(如 forwhile 循环)不同,loop() 与嵌套的 react() 块协作,并确保跨后续消息检索进行循环。

发送回复

除了 Actor 本身,reply/replyIfExists 方法也定义在 AbstractPooledActor 上(在 DefaultActorDynamicDispatchActorReactiveActor 类中不可用),当接收消息时,它也定义在处理过的消息本身之上,这在一次调用中处理多个消息时非常有用。在这种情况下,在 Actor 上调用的 reply() 会向所有当前正在处理的消息(最后一个消息)的作者发送回复,而调用消息上的 reply() 只会向特定消息的作者发送回复。

请在此处查看演示

发送者属性

检索到的消息提供发送者属性来标识消息的发送者。该属性在 Actor 的闭包内可用

react {tweet ->
    if (isSpam(tweet)) ignoreTweetsFrom sender
    sender.send 'Never write me again!'
}

转发

发送消息时,可以指定不同的 Actor 作为发送者,以便潜在的回复将转发到指定的 Actor,而不是实际的发送者。

def decryptor = Actors.actor {
    react {message ->
        reply message.reverse()
//        sender.send message.reverse()    //An alternative way to send replies
    }
}

def console = Actors.actor { //This actor will print out decrypted messages, since the replies are forwarded to it react { println 'Decrypted message: ' + it } }

decryptor.send 'lellarap si yvoorG', console //Specify an actor to send replies to console.join()

创建 Actor

Actor 共享一个 线程池,当 Actor 需要对发送给它们的 消息 作出反应 时,这些线程会动态分配给 Actor。当消息处理完毕,并且 Actor 处于空闲状态等待更多消息到达时,这些线程会返回到线程池。

例如,以下是如何创建打印接收到的所有消息的 Actor。

def console = Actors.actor {
    loop {
        react {
            println it
        }
    }
}

注意 loop() 方法调用,它确保 Actor 在处理完第一条消息后不会停止。

这是一个解密服务的示例,它可以解密提交的消息,并将解密后的消息发送回发送者。

final def decryptor = Actors.actor {
    loop {
        react {String message ->
            if ('stopService' == message) {
                println 'Stopping decryptor'
                stop()
            }
            else reply message.reverse()
        }
    }
}

Actors.actor { decryptor.send 'lellarap si yvoorG' react { println 'Decrypted message: ' + it decryptor.send 'stopService' } }.join()

以下是一个 Actor 的示例,它等待最多 30 秒以接收其消息的回复。

def friend = Actors.actor {
    react {
        //this doesn't reply -> caller won't receive any answer in time
        println it
        //reply 'Hello' //uncomment this to answer conversation
        react {
            println it
        }
    }
}

def me = Actors.actor { friend.send('Hi') //wait for answer 1sec react(1000) {msg -> if (msg == Actor.TIMEOUT) { friend.send('I see, busy as usual. Never mind.') stop() } else { //continue conversation println "Thank you for $msg" } } }

me.join()

未送达的消息

有时消息无法传递到目标 Actor。当需要对未送达的消息采取特殊操作时,在 Actor 终止时,其队列中所有未处理的消息都会调用其 onDeliveryError() 方法。 onDeliveryError() 方法或定义在消息上的闭包可以,例如,将通知发送回消息的原始发送者。

final DefaultActor me
me = Actors.actor {
    def message = 1

message.metaClass.onDeliveryError = {-> //send message back to the caller me << "Could not deliver $delegate" }

def actor = Actors.actor { react { //wait 2sec in order next call in demo can be emitted Thread.sleep(2000) //stop actor after first message stop() } }

actor << message actor << message

react { //print whatever comes back println it }

}

me.join()

或者,可以指定发送者本身的 onDeliveryError() 方法。该方法既可以动态添加

final DefaultActor me
me = Actors.actor {
    def message1 = 1
    def message2 = 2

def actor = Actors.actor { react { //wait 2sec in order next call in demo can be emitted Thread.sleep(2000) //stop actor after first message stop() } }

me.metaClass.onDeliveryError = {msg -> //callback on actor inaccessibility println "Could not deliver message $msg" }

actor << message1 actor << message2

actor.join()

}

me.join()

也可以在 Actor 定义中静态添加

class MyActor extends DefaultActor {
    public void onDeliveryError(msg) {
        println "Could not deliver message $msg"
    }
    …
}

加入 Actor

Actor 提供 join() 方法,允许调用者等待 Actor 终止。还提供了一个接受超时的变体。当同时加入多个 Actor 时,Groovy 的 spread-dot 运算符非常有用。

def master = new GameMaster().start()
def player = new Player(name: 'Player', server: master).start()

[master, player]*.join()

条件和计数循环

loop() 方法允许指定条件或迭代次数,可以选择性地附加一个闭包以在循环结束后调用 - 循环结束后的代码处理器

以下 Actor 将循环三次以接收 3 条消息,然后打印接收到的消息的最大值。

final Actor actor = Actors.actor {
    def candidates = []
    def printResult = {-> println "The best offer is ${candidates.max()}"}

loop(3, printResult) { react { candidates << it } } }

actor 10 actor 30 actor 20 actor.join()

以下 Actor 将接收消息,直到收到大于 30 的值。

final Actor actor = Actors.actor {
    def candidates = []
    final Closure printResult = {-> println "Reached best offer - ${candidates.max()}"}

loop({-> candidates.max() < 30}, printResult) { react { candidates << it } } }

actor 10 actor 20 actor 25 actor 31 actor 20 actor.join()

循环结束后的代码处理器 可以使用 Actor 的 react{} 但不能使用 loop()

DefaultActor 可以设置为以公平或非公平(默认)的方式运行。根据选择的策略,Actor 要么使线程对共享相同并行组的其他 Actor 可用(公平),要么将线程保留给自己,直到消息队列为空(非公平)。一般来说,非公平 Actor 的性能比公平 Actor 好 2 到 3 倍。

使用 fairActor() 工厂方法或 Actor 的 makeFair() 方法。

自定义调度器

Actor 默认情况下利用标准 JDK 并发库。要提供自定义线程调度器,请在创建并行组(PGroup 类)时使用相应的构造函数参数。提供的调度器将在组的线程池中协调线程。

请参阅众多 Actor 演示

5.1 Actor 原则

Actor 共享一个 线程池,当 Actor 需要对发送给它们的 消息 作出反应 时,这些线程会动态分配给 Actor。当消息处理完毕,并且 Actor 处于空闲状态等待更多消息到达时,这些线程会返回到线程池。Actor 从底层线程中分离出来,因此相对较小的线程池可以为潜在的无限数量的 Actor 提供服务。Actor 数量的虚拟无限可扩展性是 基于事件的 Actor 的主要优势,它们与底层物理线程分离。

以下是一些使用 Actor 的示例。以下是如何创建打印接收到的所有消息的 Actor。

import static groovyx.gpars.actor.Actors.actor

def console = actor { loop { react { println it } }

注意 loop() 方法调用,它确保 Actor 在处理完第一条消息后不会停止。

或者,您可以扩展 DefaultActor 类并重写 act() 方法。实例化 Actor 后,您需要启动它,以便它附加到线程池并开始接受消息。 actor() 工厂方法将负责启动 Actor。

class CustomActor extends DefaultActor {
    @Override
    protected void act() {
        loop {
            react {
                println it
            }
        }
    }
}

def console=new CustomActor() console.start()

可以使用多种方法向 Actor 发送消息

console.send('Message')
console 'Message'
console.sendAndWait 'Message'                                                     //Wait for a reply
console.sendAndContinue 'Message', {reply -> println "I received reply: $reply"}  //Forward the reply to a function

创建异步服务

import static groovyx.gpars.actor.Actors.actor

final def decryptor = actor { loop { react {String message-> reply message.reverse() } } }

def console = actor { decryptor.send 'lellarap si yvoorG' react { println 'Decrypted message: ' + it } }

console.join()

如您所见,您可以使用 actor() 方法创建新的 Actor,将 Actor 主体作为闭包参数传递。在 Actor 主体中,您可以使用 loop() 进行迭代,使用 react() 接收消息,使用 reply() 向发送当前处理消息的 Actor 发送消息。当前消息的发送者也可以通过 Actor 的 sender 属性获得。当解密 Actor 在调用 react() 时,其消息队列中没有消息时,react() 方法会放弃线程并将其返回到线程池供其他 Actor 拾取。只有在 Actor 的消息队列中收到新消息后,才会将 react() 方法的闭包安排在池中进行处理。基于事件的 Actor 在内部模拟了延续 - Actor 的工作被分割成按顺序运行的块,这些块在消息在收件箱中可用时被调用。单个 Actor 的每个块都可以由线程池中的不同线程执行。

Groovy 灵活的语法与闭包相结合,使我们的库能够提供多种定义 Actor 的方式。例如,以下是一个 Actor 的示例,它等待最多 30 秒以接收其消息的回复。Actor 允许由 org.codehaus.groovy.runtime.TimeCategory 类定义的时间 DSL 用于对 react() 方法指定超时,前提是用户将调用包含在 TimeCategory 使用块中。

def friend = Actors.actor {
    react {
        //this doesn't reply -> caller won't receive any answer in time
        println it
        //reply 'Hello' //uncomment this to answer conversation
        react {
            println it
        }
    }
}

def me = Actors.actor { friend.send('Hi') //wait for answer 1sec react(1000) {msg -> if (msg == Actor.TIMEOUT) { friend.send('I see, busy as usual. Never mind.') stop() } else { //continue conversation println "Thank you for $msg" } } }

me.join()

当等待消息时超时到期时,将收到 Actor.TIMEOUT 消息。如果存在,还会调用 onTimeout() 处理程序

def friend = Actors.actor {
    react {
        //this doesn't reply -> caller won't receive any answer in time
        println it
        //reply 'Hello' //uncomment this to answer conversation
        react {
            println it
        }
    }
}

def me = Actors.actor { friend.send('Hi')

delegate.metaClass.onTimeout = {-> friend.send('I see, busy as usual. Never mind.') stop() }

//wait for answer 1sec react(1000) {msg -> if (msg != Actor.TIMEOUT) { //continue conversation println "Thank you for $msg" } } }

me.join()

注意可以使用 Groovy 元编程来动态定义 Actor 的生命周期通知方法(例如 onTimeout() )。当然,当您决定为您的 Actor 定义一个新类时,可以以通常的方式定义生命周期方法。

class MyActor extends DefaultActor {
    public void onTimeout() {
        …
    }

protected void act() { … } }

Actor 保证非线程安全代码的线程安全

Actor 保证始终最多有一个线程一次处理 Actor 的主体,并且在幕后,每次线程被分配给 Actor 时都会同步内存,因此 Actor 的状态 可以安全地修改,代码在主体中 无需任何其他额外(同步或锁定)工作

class MyCounterActor extends DefaultActor {
    private Integer counter = 0

protected void act() { loop { react { counter++ } } } }

理想情况下,Actor 的代码 绝不应该 从外部直接调用,因此 Actor 类中的所有代码都只能由处理最后一个接收到的消息的线程执行,因此所有 Actor 的代码都是 隐式线程安全 的。如果允许其他对象直接调用 Actor 的任何方法,则 Actor 代码和状态的线程安全保证 不再有效

简单的计算器

这是一个更现实的基于事件的 Actor 示例,它接收两个数字消息,将它们加起来,并将结果发送到控制台 Actor。

import groovyx.gpars.group.DefaultPGroup

//not necessary, just showing that a single-threaded pool can still handle multiple actors def group = new DefaultPGroup(1);

final def console = group.actor { loop { react { println 'Result: ' + it } } }

final def calculator = group.actor { react {a -> react {b -> console.send(a + b) } } }

calculator.send 2 calculator.send 3

calculator.join() group.shutdown()

请注意,基于事件的 Actor 需要特别注意 react() 方法。由于 基于事件的 Actor 需要将代码分割成可以按顺序分配给不同线程的独立块,并且 JVM 本身不支持 延续 ,因此这些块是人工创建的。 react() 方法创建下一个消息处理程序。一旦当前消息处理程序完成,下一个消息处理程序(延续)就会被调度。

并发归并排序示例

为了进行比较,我还包含了一个更复杂的示例,它使用 Actor 对整数列表进行并发归并排序。您可以看到,由于 Groovy 的灵活性,我们非常接近 Scala 模型,尽管我仍然错过了 Scala 模式匹配用于消息处理。

import groovyx.gpars.group.DefaultPGroup
import static groovyx.gpars.actor.Actors.actor

Closure createMessageHandler(def parentActor) { return { react {List<Integer> message -> assert message != null switch (message.size()) { case 0..1: parentActor.send(message) break case 2: if (message[0] <= message[1]) parentActor.send(message) else parentActor.send(message[-1..0]) break default: def splitList = split(message)

def child1 = actor(createMessageHandler(delegate)) def child2 = actor(createMessageHandler(delegate)) child1.send(splitList[0]) child2.send(splitList[1])

react {message1 -> react {message2 -> parentActor.send merge(message1, message2) } } } } } }

def console = new DefaultPGroup(1).actor { react { println "Sorted array:t${it}" System.exit 0 } }

def sorter = actor(createMessageHandler(console)) sorter.send([1, 5, 2, 4, 3, 8, 6, 7, 3, 9, 5, 3]) console.join()

def split(List<Integer> list) { int listSize = list.size() int middleIndex = listSize / 2 def list1 = list[0..<middleIndex] def list2 = list[middleIndex..listSize - 1] return [list1, list2] }

List<Integer> merge(List<Integer> a, List<Integer> b) { int i = 0, j = 0 final int newSize = a.size() + b.size() List<Integer> result = new ArrayList<Integer>(newSize)

while ((i < a.size()) && (j < b.size())) { if (a[i] <= b[j]) result << a[i++] else result << b[j++] }

if (i < a.size()) result.addAll(a[i..-1]) else result.addAll(b[j..-1]) return result }

由于 Actor 从池中重用线程,因此脚本将使用几乎 任何大小的线程池 工作,无论沿途创建了多少 Actor。

Actor 生命周期方法

每个 Actor 都可以定义生命周期观察方法,这些方法将在每次发生特定生命周期事件时调用。
  • afterStart() - 在 Actor 启动后立即调用。
  • afterStop(List undeliveredMessages) - 在 Actor 停止后立即调用,传入队列中所有未处理的消息。
  • onInterrupt(InterruptedException e) - 当 Actor 的线程被中断时调用。线程中断将导致 Actor 在任何情况下停止。
  • onTimeout() - 当在为当前阻塞的 react 方法指定的超时时间内没有消息发送到 Actor 时调用。
  • onException(Throwable e) - 当 Actor 的事件处理程序中发生异常时调用。Actor 将在此方法返回后停止。

您可以在您的 Actor 类中静态定义这些方法,也可以动态地将它们添加到 Actor 的元类中

class MyActor extends DefaultActor {
    public void afterStart() {
        …
    }
    public void onTimeout() {
        …
    }

protected void act() { … } }

def myActor = actor {
    delegate.metaClass.onException = {
        log.error('Exception occurred', it)
    }

… }

为了提高性能,您可以考虑使用 silentStart() 方法而不是 start() 来启动 DynamicDispatchActorReactiveActor 。调用 silentStart() 将绕过部分启动机制,因此也将避免调用 afterStart() 方法。由于其有状态的性质,DefaultActor 无法以静默方式启动。

池管理

Actor 可以被组织成组,默认情况下,始终有一个应用程序范围的池化 actor 组可用。就像使用 Actor 抽象工厂可以在默认组中创建 actor 一样,自定义组可以作为抽象工厂来创建属于这些组的新 actor 实例。

def myGroup = new DefaultPGroup()

def actor1 = myGroup.actor { … }

def actor2 = myGroup.actor { … }

actor 的 parallelGroup 属性指向它所属的组。默认情况下,它指向默认 actor 组,即 Actors.defaultActorPGroup,并且只能在 actor 启动之前更改。

class MyActor extends StaticDispatchActor<Integer> {
    private static PGroup group = new DefaultPGroup(100)

MyActor(...) { this.parallelGroup = group … } }

属于同一组的 actor 共享该组的 底层线程池。默认情况下,池包含 n + 1 个线程,其中 n 代表 JVM 检测到的 CPU 数量。可以通过设置 gpars.poolsize 系统属性或为每个 actor 组分别指定适当的构造函数参数,来 显式 设置 池大小

def myGroup = new DefaultPGroup(10)  //the pool will contain 10 threads

可以通过适当的 DefaultPGroup 类操作线程池,该类 委托 给线程池的 Pool 接口。例如,resize() 方法允许您随时更改池大小,而 resetDefaultSize() 方法将其恢复为默认值。当您需要安全地完成所有任务、销毁池并停止所有线程以便以组织的方式退出 JVM 时,可以调用 shutdown() 方法。

… (n+1 threads in the default pool after startup)

Actors.defaultActorPGroup.resize 1 //use one-thread pool

… (1 thread in the pool)

Actors.defaultActorPGroup.resetDefaultSize()

… (n+1 threads in the pool)

Actors.defaultActorPGroup.shutdown()

作为 DefaultPGroup 的替代方案,DefaultPGroup 创建了一个守护线程池,当需要非守护线程时,可以使用 NonDaemonPGroup 类。

def daemonGroup = new DefaultPGroup()

def actor1 = daemonGroup.actor { … }

def nonDaemonGroup = new NonDaemonPGroup()

def actor2 = nonDaemonGroup.actor { … }

class MyActor { def MyActor() { this.parallelGroup = nonDaemonGroup }

void act() {...} }

属于同一组的 actor 共享 底层线程池。使用池化 actor 组,您可以将 actor 分割以利用多个不同大小的线程池,从而将资源分配给系统的不同组件并调整其性能。

def coreActors = new NonDaemonPGroup(5)  //5 non-daemon threads pool
def helperActors = new DefaultPGroup(1)  //1 daemon thread pool

def priceCalculator = coreActors.actor { … }

def paymentProcessor = coreActors.actor { … }

def emailNotifier = helperActors.actor { … }

def cleanupActor = helperActors.actor { … }

//increase size of the core actor group coreActors.resize 6

//shutdown the group's pool once you no longer need the group to release resources helperActors.shutdown()

不要忘记在不再需要它们及其 actor 时关闭自定义池化 actor 组,以保留系统资源。

默认 actor 组

没有更改其 parallelGroup 属性或通过 Actors 类上的任何工厂方法创建的 actor 共享一个公共组 Actors.defaultActorPGroup。该组使用 可调整大小的线程池,其上限为 1000 个线程。这使您能够安心地让池自动根据 actor 的需求进行调整。另一方面,随着 actor 数量的增加,池可能会变得过大而效率低下。建议将 actor 分组到您自己的 PGroup 中,使用固定大小的线程池,除了最简单的应用程序之外,其他所有应用程序都应如此。

常见陷阱:应用程序终止,而 actor 未收到消息

您最有可能使用的是守护线程和池,这是默认设置,并且您的主线程已完成。在您任何、部分或所有 actor 上调用 actor.join() 将阻塞主线程,直到 actor 终止,从而保持所有 actor 运行。或者,使用 NonDaemonPGroup 的实例,并将一些 actor 分配给这些组。

def nonDaemonGroup = new NonDaemonPGroup()
def myActor = nonDaemonGroup.actor {...}

或者

def nonDaemonGroup = new NonDaemonPGroup()

class MyActor extends DefaultActor { def MyActor() { this.parallelGroup = nonDaemonGroup }

void act() {...} }

def myActor = new MyActor()

阻塞 Actor

在某些情况下,您可能更喜欢使用阻塞 actor,而不是使用事件驱动的 continuation 风格的 actor。阻塞 actor 为其整个生命周期(包括等待消息的时间)保留一个单一的池化线程。它们避免了一些线程管理开销,因为它们在启动后不会争夺线程,而且它们让您能够编写直观的代码,而无需使用 continuation 风格,因为它们只通过 receive 方法执行阻塞消息读取。显然,同时运行的阻塞 actor 数量受共享池中可用线程数量的限制。另一方面,阻塞 actor 通常比 continuation 风格的 actor 具有更好的性能,尤其是在 actor 的消息队列很少为空的情况下。

def decryptor = blockingActor {
    while (true) {
        receive {message ->
            if (message instanceof String) reply message.reverse()
            else stop()
        }
    }
}

def console = blockingActor { decryptor.send 'lellarap si yvoorG' println 'Decrypted message: ' + receive() decryptor.send false }

[decryptor, console]*.join()

阻塞 actor 增加了调整应用程序性能的选项数量。它们可能是 actor 网络中高流量位置的理想选择。

5.2 无状态 Actor

动态分派 Actor

DynamicDispatchActor 类是一种 actor,允许使用消息处理代码的替代结构。通常,DynamicDispatchActor 重复扫描消息并将到达的消息分派给 actor 上定义的 onMessage(message) 方法之一。DynamicDispatchActor 在幕后利用了 Groovy 动态方法分派机制。与 DefaultActor 子类不同,DynamicDispatchActor 不是 ReactiveActor(将在下面讨论),因此不需要在后续消息接收之间隐式地记住 actor 的状态,因此它们的性能特性要好得多,通常与其他 actor 框架(如 Scala Actors)相当。

import groovyx.gpars.actor.Actors
import groovyx.gpars.actor.DynamicDispatchActor

final class MyActor extends DynamicDispatchActor {

void onMessage(String message) { println 'Received string' }

void onMessage(Integer message) { println 'Received integer' reply 'Thanks!' }

void onMessage(Object message) { println 'Received object' sender.send 'Thanks!' }

void onMessage(List message) { println 'Received list' stop() } }

final def myActor = new MyActor().start()

Actors.actor { myActor 1 myActor '' myActor 1.0 myActor(new ArrayList()) myActor.join() }.join()

在某些情况下,通常是在不需要为 actor 保留隐式对话历史记录相关状态时,动态分派代码结构可能比使用嵌套的 loopreact 语句的传统方法更直观。

DynamicDispatchActor 类还提供了一个方便的功能,可以在 actor 构造时或稍后使用 when 处理程序动态添加消息处理程序,这些处理程序可以选择地包装在 become 方法中。

final Actor myActor = new DynamicDispatchActor().become {
    when {String msg -> println 'A String'; reply 'Thanks'}
    when {Double msg -> println 'A Double'; reply 'Thanks'}
    when {msg -> println 'A something ...'; reply 'What was that?';stop()}
}
myActor.start()
Actors.actor {
    myActor 'Hello'
    myActor 1.0d
    myActor 10 as BigDecimal
    myActor.join()
}.join()

显然,这两种方法可以结合使用。

final class MyDDA extends DynamicDispatchActor {

void onMessage(String message) { println 'Received string' }

void onMessage(Integer message) { println 'Received integer' }

void onMessage(Object message) { println 'Received object' }

void onMessage(List message) { println 'Received list' stop() } }

final def myActor = new MyDDA().become { when {BigDecimal num -> println 'Received BigDecimal'} when {Float num -> println 'Got a float'} }.start() Actors.actor { myActor 'Hello' myActor 1.0f myActor 10 as BigDecimal myActor.send([]) myActor.join() }.join()

通过 when 注册的动态消息处理程序优先于静态 onMessage 处理程序。

DynamicDispatchActor 可以设置为以公平或非公平(默认)方式运行。根据选择的策略,actor 既可以将线程提供给共享同一并行组的其他 actor(公平),也可以将线程保留给自己,直到消息队列为空(非公平)。通常,非公平 actor 的性能比公平 actor 好 2 到 3 倍。

使用 fairMessageHandler() 工厂方法或 actor 的 makeFair() 方法。

def fairActor = Actors.fairMessageHandler {...}

静态分派 Actor

虽然 DynamicDispatchActor 根据消息的运行时类型分派消息,因此为每条消息支付额外的性能代价,但 StaticDispatchActor 避免了运行时消息检查,仅根据编译时信息分派消息。

final class MyActor extends StaticDispatchActor<String> {
    void onMessage(String message) {
        println 'Received string ' + message

switch (message) { case 'hello': reply 'Hi!' break case 'stop': stop() } } }

StaticDispatchActor 的实例必须覆盖适用于 actor 的声明类型参数的 onMessage 方法。然后,使用每条接收到的消息调用 onMessage(T message) 方法。

通过帮助器工厂方法,可以更快捷地创建公平或非公平的静态分派 actor。

final actor = staticMessageHandler {String message ->
    println 'Received string ' + message

switch (message) { case 'hello': reply 'Hi!' break case 'stop': stop() } }

println 'Reply: ' + actor.sendAndWait('hello') actor 'bye' actor 'stop' actor.join()

虽然与 DynamicDispatchActor 相比,StaticDispatchActor 类仅限于一个处理程序方法,但简化的创建方式(无需任何 when 处理程序)以及显著的性能优势应该使 StaticDispatchActor 成为您在需要分派消息,但不需要根据消息运行时类型进行分派时的默认选择。例如,与使用 DynamicDispatchActor 相比,StaticDispatchActors 使数据流运算符的速度提高了四倍。

响应式 Actor

ReactiveActor 类通常通过调用 Actors.reactor()DefaultPGroup.reactor() 来构造,它允许使用更类似于事件驱动的 подход。当响应式 actor 接收消息时,构成响应式 actor 主体的代码块将使用消息作为参数运行。从代码返回的结果将作为回复发送。

final def group = new DefaultPGroup()

final def doubler = group.reactor { 2 * it }

group.actor { println 'Double of 10 = ' + doubler.sendAndWait(10) }

group.actor { println 'Double of 20 = ' + doubler.sendAndWait(20) }

group.actor { println 'Double of 30 = ' + doubler.sendAndWait(30) }

for(i in (1..10)) { println "Double of $i = ${doubler.sendAndWait(i)}" }

doubler.stop() doubler.join()

以下是一个将一批数字提交给 ReactiveActor 进行处理,然后在结果到达时逐渐打印结果的 actor 示例。

import groovyx.gpars.actor.Actor
import groovyx.gpars.actor.Actors

final def doubler = Actors.reactor { 2 * it }

Actor actor = Actors.actor { (1..10).each {doubler << it} int i = 0 loop { i += 1 if (i > 10) stop() else { react {message -> println "Double of $i = $message" } } } }

actor.join() doubler.stop() doubler.join()

本质上,响应式 actor 为一个 actor 提供了一种便捷的快捷方式,该 actor 会在循环中等待消息、处理它们并发送回结果。以下是响应式 actor 在内部的示意性表示。

public class ReactiveActor extends DefaultActor {
    Closure body

void act() { loop { react {message -> reply body(message) } } } }

ReactiveActor 可以设置为以公平或非公平(默认)方式运行。根据选择的策略,actor 既可以将线程提供给共享同一并行组的其他 actor(公平),也可以将线程保留给自己,直到消息队列为空(非公平)。通常,非公平 actor 的性能比公平 actor 好 2 到 3 倍。

使用 fairReactor() 工厂方法或 actor 的 makeFair() 方法。

def fairActor = Actors.fairReactor {...}

5.3 提示和技巧

构建 actor 的代码

扩展 DefaultActor 类时,可以在 act() 方法中调用任何 actor 的方法,并在其中使用 react()loop() 方法。
class MyDemoActor extends DefaultActor {

protected void act() { handleA() }

private void handleA() { react {a -> handleB(a) } }

private void handleB(int a) { react {b -> println a + b reply a + b } } }

final def demoActor = new MyDemoActor() demoActor.start()

Actors.actor { demoActor 10 demoActor 20 react { println "Result: $it" } }.join()

请记住,我们所有示例中的 handleA()handleB() 方法只会将提供的消息处理程序调度为对下一条到达的消息的反应的当前计算的 continuation 来运行。

或者,当使用 actor() 工厂方法时,可以通过元类以闭包的形式添加事件处理代码。

Actor demoActor = Actors.actor {
    delegate.metaClass {
        handleA = {->
            react {a ->
                 handleB(a)
            }
        }

handleB = {a -> react {b -> println a + b reply a + b } } }

handleA() }

Actors.actor { demoActor 10 demoActor 20 react { println "Result: $it" } }.join()

具有 actor 作为其委托对象的闭包也可以用来构建事件处理代码。

Closure handleB = {a ->
    react {b ->
        println a + b
        reply a + b
    }
}

Closure handleA = {-> react {a -> handleB(a) } }

Actor demoActor = Actors.actor { handleA.delegate = delegate handleB.delegate = delegate

handleA() }

Actors.actor { demoActor 10 demoActor 20 react { println "Result: $it" } }.join()

事件驱动的循环

在编写事件驱动的 actor 时,您必须牢记,对 react()loop() 方法的调用具有稍微不同的语义。当您尝试在 actor 中实现任何类型的循环时,这会成为一项挑战。另一方面,如果您利用了 react() 仅调度 continuation 并返回的事实,则可以递归调用方法,而不用担心会填满堆栈。请查看以下示例,它们分别使用三种描述的构建 actor 代码的技术。

DefaultActor 的子类

class MyLoopActor extends DefaultActor {

protected void act() { outerLoop() }

private void outerLoop() { react {a -> println 'Outer: ' + a if (a != 0) innerLoop() else println 'Done' } }

private void innerLoop() { react {b -> println 'Inner ' + b if (b == 0) outerLoop() else innerLoop() } } }

final def actor = new MyLoopActor().start() actor 10 actor 20 actor 0 actor 0 actor.join()

增强 actor 的 metaClass

Actor actor = Actors.actor {

delegate.metaClass { outerLoop = {-> react {a -> println 'Outer: ' + a if (a!=0) innerLoop() else println 'Done' } }

innerLoop = {-> react {b -> println 'Inner ' + b if (b==0) outerLoop() else innerLoop() } } }

outerLoop() }

actor 10 actor 20 actor 0 actor 0 actor.join()

使用 Groovy 闭包

Closure innerLoop

Closure outerLoop = {-> react {a -> println 'Outer: ' + a if (a!=0) innerLoop() else println 'Done' } }

innerLoop = {-> react {b -> println 'Inner ' + b if (b==0) outerLoop() else innerLoop() } }

Actor actor = Actors.actor { outerLoop.delegate = delegate innerLoop.delegate = delegate

outerLoop() }

actor 10 actor 20 actor 0 actor 0 actor.join()

此外,不要忘记可以使用 actor 的 loop() 方法来创建一个运行到 actor 终止的循环。

class MyLoopingActor extends DefaultActor {

protected void act() { loop { outerLoop() } }

private void outerLoop() { react {a -> println 'Outer: ' + a if (a!=0) innerLoop() else println 'Done for now, but will loop again' } }

private void innerLoop() { react {b -> println 'Inner ' + b if (b == 0) outerLoop() else innerLoop() } } }

final def actor = new MyLoopingActor().start() actor 10 actor 20 actor 0 actor 0 actor 10 actor.stop() actor.join()

5.4 活动对象

活动对象在 actor 之上提供了一个面向对象的界面,使您能够避免直接处理 actor 机制,例如匹配消息、等待结果和发送回复。

具有友好界面的 Actor

import groovyx.gpars.activeobject.ActiveObject
import groovyx.gpars.activeobject.ActiveMethod

@ActiveObject class Decryptor { @ActiveMethod def decrypt(String encryptedText) { return encryptedText.reverse() }

@ActiveMethod def decrypt(Integer encryptedNumber) { return -1*encryptedNumber + 142 } }

final Decryptor decryptor = new Decryptor() def part1 = decryptor.decrypt(' noitcA ni yvoorG') def part2 = decryptor.decrypt(140) def part3 = decryptor.decrypt('noitide dn')

print part1.get() print part2.get() println part3.get()

您使用 @ActiveObject 注释标记活动对象。这将确保为您的类的每个实例创建一个隐藏的 actor 实例。现在,您可以使用 @ActiveMethod 注释标记方法,表示您希望该方法由目标对象的内部 actor 异步调用。@ActiveMethod 注释的可选布尔值 blocking 参数指定调用者是否应阻塞,直到结果可用,或者调用者是否应仅接收 DataflowVariable 形式的未来结果的 promise,从而使调用者不会被阻塞等待。

默认情况下,所有活动方法都设置为 非阻塞。但是,显式声明返回类型的 method 必须配置为阻塞,否则编译器将报告错误。对于非阻塞方法,只允许 defvoidDataflowVariable 作为返回类型。

在幕后,GPars 将您的方法调用转换为 发送给内部 actor 的消息。actor 最终将通过代表调用者调用所需方法来处理该消息,并在完成后将回复发送回调用者。非阻塞方法返回结果的 promises,即 DataflowVariables

但阻塞意味着我们并不真正异步,是吗?

确实,如果您将活动方法标记为 阻塞,则调用者将被阻塞,等待结果,就像进行正常的普通方法调用一样。我们所取得的成就是从并发访问中保证了 Active 对象内部的线程安全。这正是 synchronized 关键字也能给您带来的东西。因此,正是 非阻塞 方法应该推动您选择使用活动对象。阻塞方法将提供通常的同步语义,但会为并发方法调用提供一致性保证。因此,在与非阻塞方法结合使用时,阻塞方法仍然非常有用。

import groovyx.gpars.activeobject.ActiveMethod
import groovyx.gpars.activeobject.ActiveObject
import groovyx.gpars.dataflow.DataflowVariable

@ActiveObject class Decryptor { @ActiveMethod(blocking=true) String decrypt(String encryptedText) { encryptedText.reverse() }

@ActiveMethod(blocking=true) Integer decrypt(Integer encryptedNumber) { -1*encryptedNumber + 142 } }

final Decryptor decryptor = new Decryptor() print decryptor.decrypt(' noitcA ni yvoorG') print decryptor.decrypt(140) println decryptor.decrypt('noitide dn')

非阻塞语义

现在,调用非阻塞活动方法将立即返回,只要演员已被发送一条消息。调用者现在可以做任何他想做的事,而演员则负责计算。可以使用 promise 上的 bound 属性来轮询计算的状态。在返回的 promise 上调用 get() 方法将阻塞调用者,直到有值可用。对 get() 的调用最终将返回一个值或抛出一个异常,具体取决于实际计算的结果。

get() 方法还具有一个带有超时参数的变体,如果您想避免无限期等待的风险。

注释规则

在注释对象时,需要遵循一些规则

  1. ActiveMethod 注释仅被注释为 ActiveObject 的类接受
  2. 只有实例(非静态)方法可以被注释为 ActiveMethod
  3. 您可以用非活动方法覆盖活动方法,反之亦然
  4. 活动对象的子类可以声明额外的活动方法,前提是它们本身被注释为 ActiveObject
  5. 同时使用活动方法和非活动方法可能会导致竞争条件。理想情况下,将活动对象设计为完全封装的类,所有非私有方法都标记为活动方法

继承

@ActiveObject 注释可以出现在继承层次结构中的任何类上。actor 字段将只在层次结构中最顶层的注释类中创建,子类将重用该字段。

import groovyx.gpars.activeobject.ActiveObject
import groovyx.gpars.activeobject.ActiveMethod
import groovyx.gpars.dataflow.DataflowVariable

@ActiveObject class A { @ActiveMethod def fooA(value) { … } }

class B extends A { }

@ActiveObject class C extends B { @ActiveMethod def fooC(value1, value2) { … } }

在我们的示例中,actor 字段将被生成到 A 类中。 C 类必须用 @ActiveObject 注释,因为它在 fooC() 方法上拥有 @ActiveMethod 注释,而 B 类不需要注释,因为它没有活动方法。

就像演员可以围绕线程池分组一样,活动对象也可以配置为使用来自特定并行组的线程。

@ActiveObject("group1")
class MyActiveObject {
    …
}

@ActiveObject 注释的 value 参数指定了将内部演员绑定到的并行组的名称。只有来自指定组的线程才会被用来运行该类实例的内部演员。但是,在创建属于该组的任何活动对象实例之前,需要创建和注册这些组。如果未明确指定,活动对象将使用默认的演员组 - Actors.defaultActorPGroup

final DefaultPGroup group = new DefaultPGroup(10)
ActiveObjectRegistry.instance.register("group1", group)

内部演员的替代名称

您可能很少会遇到与活动对象的内部演员字段的默认名称发生名称冲突的情况。如果您需要更改默认名称 internalActiveObjectActor,请使用 @ActiveObject 注释的 actorName 参数。

@ActiveObject(actorName = "alternativeActorName")
class MyActiveObject {
    …
}

内部演员的替代名称及其目标组不能在子类中覆盖。确保您只在继承层次结构中最顶层的活动对象中指定这些值。显然,最顶层的活动对象仍然可以子类化其他类,只是它的任何祖先都不能是活动对象。

5.5 经典示例

关于演员使用的几个示例

示例

  • 埃拉托色尼筛法
  • 睡着的理发师
  • 哲学家进餐
  • 单词排序
  • 负载均衡器

埃拉托色尼筛法

问题描述

import groovyx.gpars.actor.DynamicDispatchActor

/** * Demonstrates concurrent implementation of the Sieve of Eratosthenes using actors * * In principle, the algorithm consists of concurrently run chained filters, * each of which detects whether the current number can be divided by a single prime number. * (generate nums 1, 2, 3, 4, 5, ...) -> (filter by mod 2) -> (filter by mod 3) -> (filter by mod 5) -> (filter by mod 7) -> (filter by mod 11) -> (caution! Primes falling out here) * The chain is built (grows) on the fly, whenever a new prime is found. */

int requestedPrimeNumberBoundary = 1000

final def firstFilter = new FilterActor(2).start()

/** * Generating candidate numbers and sending them to the actor chain */ (2..requestedPrimeNumberBoundary).each { firstFilter it } firstFilter.sendAndWait 'Poison'

/** * Filter out numbers that can be divided by a single prime number */ final class FilterActor extends DynamicDispatchActor { private final int myPrime private def follower

def FilterActor(final myPrime) { this.myPrime = myPrime; }

/** * Try to divide the received number with the prime. If the number cannot be divided, send it along the chain. * If there's no-one to send it to, I'm the last in the chain, the number is a prime and so I will create and chain * a new actor responsible for filtering by this newly found prime number. */ def onMessage(int value) { if (value % myPrime != 0) { if (follower) follower value else { println "Found $value" follower = new FilterActor(value).start() } } }

/** * Stop the actor on poisson reception */ def onMessage(def poisson) { if (follower) { def sender = sender follower.sendAndContinue(poisson, {this.stop(); sender?.send('Done')}) //Pass the poisson along and stop after a reply } else { //I am the last in the chain stop() reply 'Done' } } }

睡着的理发师

问题描述

import groovyx.gpars.group.DefaultPGroup
import groovyx.gpars.actor.DefaultActor
import groovyx.gpars.group.DefaultPGroup
import groovyx.gpars.actor.Actor

final def group = new DefaultPGroup()

final def barber = group.actor { final def random = new Random() loop { react {message -> switch (message) { case Enter: message.customer.send new Start() println "Barber: Processing customer ${message.customer.name}" doTheWork(random) message.customer.send new Done() reply new Next() break case Wait: println "Barber: No customers. Going to have a sleep" break } } } }

private def doTheWork(Random random) { Thread.sleep(random.nextInt(10) * 1000) }

final Actor waitingRoom

waitingRoom = group.actor { final int capacity = 5 final List<Customer> waitingCustomers = [] boolean barberAsleep = true

loop { react {message -> switch (message) { case Enter: if (waitingCustomers.size() == capacity) { reply new Full() } else { waitingCustomers << message.customer if (barberAsleep) { assert waitingCustomers.size() == 1 barberAsleep = false waitingRoom.send new Next() } else reply new Wait() } break case Next: if (waitingCustomers.size()>0) { def customer = waitingCustomers.remove(0) barber.send new Enter(customer:customer) } else { barber.send new Wait() barberAsleep = true } } } }

}

class Customer extends DefaultActor { String name Actor localBarbers

void act() { localBarbers << new Enter(customer:this) loop { react {message -> switch (message) { case Full: println "Customer: $name: The waiting room is full. I am leaving." stop() break case Wait: println "Customer: $name: I will wait." break case Start: println "Customer: $name: I am now being served." break case Done: println "Customer: $name: I have been served." stop(); break

} } } } }

class Enter { Customer customer } class Full {} class Wait {} class Next {} class Start {} class Done {}

def customers = [] customers << new Customer(name:'Joe', localBarbers:waitingRoom).start() customers << new Customer(name:'Dave', localBarbers:waitingRoom).start() customers << new Customer(name:'Alice', localBarbers:waitingRoom).start()

sleep 15000 customers << new Customer(name: 'James', localBarbers: waitingRoom).start() sleep 5000 customers*.join() barber.stop() waitingRoom.stop()

哲学家进餐

问题描述

import groovyx.gpars.actor.DefaultActor
import groovyx.gpars.actor.Actors

Actors.defaultActorPGroup.resize 5

final class Philosopher extends DefaultActor { private Random random = new Random()

String name def forks = []

void act() { assert 2 == forks.size() loop { think() forks*.send new Take() def messages = [] react {a -> messages << [a, sender] react {b -> messages << [b, sender] if ([a, b].any {Rejected.isCase it}) { println "$name: tOops, can't get my forks! Giving up." final def accepted = messages.find {Accepted.isCase it[0]} if (accepted!=null) accepted[1].send new Finished() } else { eat() reply new Finished() } } } } }

void think() { println "$name: tI'm thinking" Thread.sleep random.nextInt(5000) println "$name: tI'm done thinking" }

void eat() { println "$name: tI'm EATING" Thread.sleep random.nextInt(2000) println "$name: tI'm done EATING" } }

final class Fork extends DefaultActor {

String name boolean available = true

void act() { loop { react {message -> switch (message) { case Take: if (available) { available = false reply new Accepted() } else reply new Rejected() break case Finished: assert !available available = true break default: throw new IllegalStateException("Cannot process the message: $message") } } } } }

final class Take {} final class Accepted {} final class Rejected {} final class Finished {}

def forks = [ new Fork(name:'Fork 1'), new Fork(name:'Fork 2'), new Fork(name:'Fork 3'), new Fork(name:'Fork 4'), new Fork(name:'Fork 5') ]

def philosophers = [ new Philosopher(name:'Joe', forks:[forks[0], forks[1]]), new Philosopher(name:'Dave', forks:[forks[1], forks[2]]), new Philosopher(name:'Alice', forks:[forks[2], forks[3]]), new Philosopher(name:'James', forks:[forks[3], forks[4]]), new Philosopher(name:'Phil', forks:[forks[4], forks[0]]), ]

forks*.start() philosophers*.start()

sleep 10000 forks*.stop() philosophers*.stop()

单词排序

给定一个文件夹名称,脚本将对文件夹中所有文件中的单词进行排序。 SortMaster 演员创建给定数量的 WordSortActors,将要排序单词的文件分配给它们,并收集结果。

受 Michael Galpin 的 Scala Concurrency 博客文章启发

//Messages
private final class FileToSort { String fileName }
private final class SortResult { String fileName; List<String> words }

//Worker actor class WordSortActor extends DefaultActor {

private List<String> sortedWords(String fileName) { parseFile(fileName).sort {it.toLowerCase()} }

private List<String> parseFile(String fileName) { List<String> words = [] new File(fileName).splitEachLine(' ') {words.addAll(it)} return words }

void act() { loop { react {message -> switch (message) { case FileToSort: println "Sorting file=${message.fileName} on thread ${Thread.currentThread().name}" reply new SortResult(fileName: message.fileName, words: sortedWords(message.fileName)) } } } } }

//Master actor final class SortMaster extends DefaultActor {

String docRoot = '/' int numActors = 1

List<List<String>> sorted = [] private CountDownLatch startupLatch = new CountDownLatch(1) private CountDownLatch doneLatch

private void beginSorting() { int cnt = sendTasksToWorkers() doneLatch = new CountDownLatch(cnt) }

private List createWorkers() { return (1..numActors).collect {new WordSortActor().start()} }

private int sendTasksToWorkers() { List<Actor> workers = createWorkers() int cnt = 0 new File(docRoot).eachFile { workers[cnt % numActors] << new FileToSort(fileName: it) cnt += 1 } return cnt }

public void waitUntilDone() { startupLatch.await() doneLatch.await() }

void act() { beginSorting() startupLatch.countDown() loop { react { switch (it) { case SortResult: sorted << it.words doneLatch.countDown() println "Received results for file=${it.fileName}" } } } } }

//start the actors to sort words def master = new SortMaster(docRoot: 'c:/tmp/Logs/', numActors: 5).start() master.waitUntilDone() println 'Done'

File file = new File("c:/tmp/Logs/sorted_words.txt") file.withPrintWriter { printer -> master.sorted.each { printer.println it } }

负载均衡器

演示了在可调整的工人集合之间进行工作平衡。负载均衡器接收任务,并将它们排队到一个临时任务队列中。当一个工人完成他的任务时,他会向负载均衡器请求一个新任务。

如果负载均衡器在任务队列中没有可用的任务,则会停止该工人。如果任务队列中的任务数量超过某个限制,则会创建一个新的工人来增加工人池的大小。

import groovyx.gpars.actor.Actor
import groovyx.gpars.actor.DefaultActor

/** * Demonstrates work balancing among adaptable set of workers. * The load balancer receives tasks and queues them in a temporary task queue. * When a worker finishes his assignment, it asks the load balancer for a new task. * If the load balancer doesn't have any tasks available in the task queue, the worker is stopped. * If the number of tasks in the task queue exceeds certain limit, a new worker is created * to increase size of the worker pool. */

final class LoadBalancer extends DefaultActor { int workers = 0 List taskQueue = [] private static final QUEUE_SIZE_TRIGGER = 10

void act() { loop { react { message -> switch (message) { case NeedMoreWork: if (taskQueue.size() == 0) { println 'No more tasks in the task queue. Terminating the worker.' reply DemoWorker.EXIT workers -= 1 } else reply taskQueue.remove(0) break case WorkToDo: taskQueue << message if ((workers == 0) || (taskQueue.size() >= QUEUE_SIZE_TRIGGER)) { println 'Need more workers. Starting one.' workers += 1 new DemoWorker(this).start() } } println "Active workers=${workers}tTasks in queue=${taskQueue.size()}" } } } }

final class DemoWorker extends DefaultActor { final static Object EXIT = new Object() private static final Random random = new Random()

Actor balancer

def DemoWorker(balancer) { this.balancer = balancer }

void act() { loop { this.balancer << new NeedMoreWork() react { switch (it) { case WorkToDo: processMessage(it) break case EXIT: terminate() } } }

}

private void processMessage(message) { synchronized (random) { Thread.sleep random.nextInt(5000) } } } final class WorkToDo {} final class NeedMoreWork {}

final Actor balancer = new LoadBalancer().start()

//produce tasks for (i in 1..20) { Thread.sleep 100 balancer << new WorkToDo() }

//produce tasks in a parallel thread Thread.start { for (i in 1..10) { Thread.sleep 1000 balancer << new WorkToDo() } }

Thread.sleep 35000 //let the queues get empty balancer << new WorkToDo() balancer << new WorkToDo() Thread.sleep 10000

balancer.stop() balancer.join()

6 代理

Agent 类是一个线程安全的非阻塞共享可变状态包装器实现,灵感来自 Clojure 中的代理。

当您使用架构消除对共享可变状态的需要时,许多并发问题就会消失。实际上,演员、CSP 或数据流并发等概念完全避免或隔离了可变状态。但是,在某些情况下,共享可变数据是不可避免的,或者使设计更自然、更易于理解。例如,考虑一个典型的电子商务应用程序中的购物车,其中多个 AJAX 请求可能并发地向购物车发出读或写请求。

简介

在 Clojure 编程语言中,您可以找到代理的概念,其目的是保护需要跨线程共享的可变数据。代理隐藏数据并保护它免受直接访问。客户端只能向代理发送命令(函数)。命令将被序列化并依次针对数据进行处理。由于命令是按顺序执行的,因此命令不需要关心并发,并且可以假设数据在运行时都是它们自己的。虽然实现方式不同,但 GPars 代理,称为 Agent,在本质上与演员的行为相似。它们接受消息并异步处理它们。但是,消息必须是命令(函数或 Groovy 闭包),并且将在代理内部执行。接收后,接收到的函数将在代理的内部状态上运行,函数的返回值被认为是代理的新的内部状态。

从本质上讲,代理通过只允许单个 代理管理线程 对它们进行修改来保护可变值。可变值 无法从外部直接访问,而是必须 向代理发送请求,代理保证按顺序代表调用者处理请求。代理保证所有请求的顺序执行,因此保证值的 consistency。

示意图

agent = new Agent(0)  //created a new Agent wrapping an integer with initial value 0
agent.send {increment()}  //asynchronous send operation, sending the increment() function
…
//after some delay to process the message the internal Agent's state has been updated
…
assert agent.val== 1
要包装整数,我们当然可以使用 Java 平台上的 AtomicXXX 类型,但是当状态是一个更复杂的对象时,我们需要更多支持。

概念

GPars 提供了一个 Agent 类,这是一个专门的线程安全的非阻塞实现,灵感来自 Clojure 中的代理。

Agent 包装对可变状态的引用,该引用保存在单个字段中,并接受代码(闭包/命令)作为消息,这些消息可以像使用任何其他演员一样使用 '<<' 运算符、send() 方法或隐式 call() 方法发送到 Agent。在接收闭包/命令后的某个时间点,闭包将在内部可变字段上调用,并可以对它进行更改。保证闭包在没有来自其他线程的干扰的情况下运行,因此可以自由地更改保存在内部 <i>data</i> 字段中的 Agent 的内部状态。

整个更新过程是“发布即忘”类型的,因为一旦将消息(闭包)发送到 Agent,调用者线程就可以去执行其他操作,并稍后返回来使用 Agent.val 或 Agent.valAsync(closure) 检查当前值。

基本规则

  • 执行时,提交的命令获取代理的状态作为参数。
  • 提交的命令/闭包可以调用代理状态上的任何方法。
  • 也可以用新的对象替换状态对象,这可以通过使用 updateValue() 方法来完成。
  • 提交的闭包的 返回值没有特殊含义,将被忽略。
  • 如果发送到 Agent 的消息 不是闭包,则它被认为是内部引用字段的 新值
  • Agentval 属性将等待代理队列中所有前面的命令都被消费,然后安全地返回 Agent 的值。
  • valAsync() 方法将执行相同的操作, 不会阻塞调用者。
  • instantVal 属性将返回代理内部状态的即时快照。
  • 所有 Agent 实例共享一个默认的守护线程池。设置 Agent 实例的 threadPool 属性将允许它使用不同的线程池。
  • 命令抛出的异常可以使用 errors 属性收集。

示例

共享成员列表

Agent 包装了一个成员列表,这些成员已添加到容器中。要添加新成员,必须向 jugMembers Agent 发送消息(添加成员的命令)。

import groovyx.gpars.agent.Agent
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

/** * Create a new Agent wrapping a list of strings */ def jugMembers = new Agent<List<String>>(['Me']) //add Me

jugMembers.send {it.add 'James'} //add James

final Thread t1 = Thread.start { jugMembers.send {it.add 'Joe'} //add Joe }

final Thread t2 = Thread.start { jugMembers << {it.add 'Dave'} //add Dave jugMembers {it.add 'Alice'} //add Alice (using the implicit call() method) }

[t1, t2]*.join() println jugMembers.val jugMembers.valAsync {println "Current members: $it"}

jugMembers.await()

共享会议,统计注册人数

Conference 类允许注册和取消注册,但是这些方法只能从发送到 conference Agent 的命令中调用。

import groovyx.gpars.agent.Agent

/** * Conference stores number of registrations and allows parties to register and unregister. * It inherits from the Agent class and adds the register() and unregister() private methods, * which callers may use it the commands they submit to the Conference. */ class Conference extends Agent<Long> { def Conference() { super(0) } private def register(long num) { data += num } private def unregister(long num) { data -= num } }

final Agent conference = new Conference() //new Conference created

/** * Three external parties will try to register/unregister concurrently */

final Thread t1 = Thread.start { conference << {register(10L)} //send a command to register 10 attendees }

final Thread t2 = Thread.start { conference << {register(5L)} //send a command to register 5 attendees }

final Thread t3 = Thread.start { conference << {unregister(3L)} //send a command to unregister 3 attendees }

[t1, t2, t3]*.join()

assert 12L == conference.val

工厂方法

Agent 实例也可以使用 Agent.agent() 工厂方法创建。

def jugMembers = Agent.agent ['Me']  //add Me

监听器和验证器

代理允许用户添加监听器和验证器。监听器会在内部状态发生变化时收到通知,而验证器则有机会通过抛出异常来拒绝即将发生的更改。

final Agent counter = new Agent()

counter.addListener {oldValue, newValue -> println "Changing value from $oldValue to $newValue"} counter.addListener {agent, oldValue, newValue -> println "Agent $agent changing value from $oldValue to $newValue"}

counter.addValidator {oldValue, newValue -> if (oldValue > newValue) throw new IllegalArgumentException('Things can only go up in Groovy')} counter.addValidator {agent, oldValue, newValue -> if (oldValue == newValue) throw new IllegalArgumentException('Things never stay the same for $agent')}

counter 10 counter 11 counter {updateValue 12} counter 10 //Will be rejected counter {updateValue it - 1} //Will be rejected counter {updateValue it} //Will be rejected counter {updateValue 11} //Will be rejected counter 12 //Will be rejected counter 20 counter.await()

监听器和验证器本质上都是闭包,接受两个或三个参数。从验证器中抛出的异常将在代理内部记录,可以使用 hasErrors() 方法测试,也可以通过 errors 属性检索。

assert counter.hasErrors()
assert counter.errors.size() == 5

验证器注意事项

由于 Groovy 在数据类型和不变性方面不太严格,因此代理用户应该注意潜在的问题。如果提交的代码直接修改状态,则验证器将无法在验证规则违反的情况下撤消更改。有两种可能的解决方案可用

  1. 确保您永远不会更改代表当前代理状态的提供的对象
  2. 在代理上使用自定义复制策略,允许代理创建内部状态的副本

在这两种情况下,您都需要调用 updateValue() 来正确设置和验证新状态。

问题以及这两个解决方案如下所示

//Create an agent storing names, rejecting 'Joe'
final Closure rejectJoeValidator = {oldValue, newValue -> if ('Joe' in newValue) throw new IllegalArgumentException('Joe is not allowed to enter our list.')}

Agent agent = new Agent([]) agent.addValidator rejectJoeValidator

agent {it << 'Dave'} //Accepted agent {it << 'Joe'} //Erroneously accepted, since by-passes the validation mechanism println agent.val

//Solution 1 - never alter the supplied state object agent = new Agent([]) agent.addValidator rejectJoeValidator

agent {updateValue(['Dave', * it])} //Accepted agent {updateValue(['Joe', * it])} //Rejected println agent.val

//Solution 2 - use custom copy strategy on the agent agent = new Agent([], {it.clone()}) agent.addValidator rejectJoeValidator

agent {updateValue it << 'Dave'} //Accepted agent {updateValue it << 'Joe'} //Rejected, since 'it' is now just a copy of the internal agent's state println agent.val

分组

默认情况下,所有 Agent 实例都属于同一个组,共享其守护线程池。

自定义组也可以创建 Agent 实例。这些实例将属于创建它们的组,并将共享一个线程池。要创建属于一个组的 Agent 实例,请在该组上调用 agent() 工厂方法。这样,您可以组织和调整代理的性能。

final def group = new NonDaemonPGroup(5)  //create a group around a thread pool
def jugMembers = group.agent(['Me'])  //add Me

代理的默认线程池包含守护线程。确保您的自定义线程池也使用守护线程,这可以通过使用 DefaultPGroup 或向线程池构造函数提供您自己的线程工厂来实现,或者如果您的线程池使用非守护线程,例如当使用 NonDaemonPGroup 组类时,确保您通过调用其 shutdown() 方法显式地关闭组或线程池,否则您的应用程序将无法退出。

直接池替换

或者,通过在 Agent 实例上调用 attachToThreadPool() 方法,可以为其指定一个自定义线程池。

def jugMembers = new Agent<List<String>>(['Me'])  //add Me

final ExecutorService pool = Executors.newFixedThreadPool(10) jugMembers.attachToThreadPool(new DefaultPool(pool))

请记住,与演员一样,单个 Agent 实例(又名代理)一次只能使用一个线程。

购物车示例

import groovyx.gpars.agent.Agent

class ShoppingCart { private def cartState = new Agent([:]) //----------------- public methods below here ---------------------------------- public void addItem(String product, int quantity) { cartState << {it[product] = quantity} //the << operator sends //a message to the Agent } public void removeItem(String product) { cartState << {it.remove(product)} } public Object listContent() { return cartState.val } public void clearItems() { cartState << performClear }

public void increaseQuantity(String product, int quantityChange) { cartState << this.&changeQuantity.curry(product, quantityChange) } //----------------- private methods below here --------------------------------- private void changeQuantity(String product, int quantityChange, Map items) { items[product] = (items[product] ?: 0) + quantityChange } private Closure performClear = { it.clear() } } //----------------- script code below here ------------------------------------- final ShoppingCart cart = new ShoppingCart() cart.addItem 'Pilsner', 10 cart.addItem 'Budweisser', 5 cart.addItem 'Staropramen', 20

cart.removeItem 'Budweisser' cart.addItem 'Budweisser', 15

println "Contents ${cart.listContent()}"

cart.increaseQuantity 'Budweisser', 3 println "Contents ${cart.listContent()}"

cart.clearItems() println "Contents ${cart.listContent()}"

您可能在代码中注意到两种实现策略。
  1. 公共方法可以在内部将所需的代码发送到 Agent,而不是直接执行相同的功能。

因此,像这样的顺序代码

public void addItem(String product, int quantity) {
    cartState[product]=quantity

}

变成

public void addItem(String product, int quantity) {
    cartState << {it[product] = quantity}
}
2. 公共方法可以发送对内部私有方法或闭包的引用,这些引用包含要执行的所需功能。
public void clearItems() {
    cartState << performClear
}

private Closure performClear = { it.clear() }

可能需要柯里化,如果闭包除了当前内部状态实例之外还接受其他参数。请参见 increaseQuantity 方法。

打印机服务示例

另一个示例 - 一个由多个线程共享的非线程安全的打印机服务。打印机需要在打印之前设置文档和质量属性,因此如果保护不当,显然存在竞态条件的可能性。调用者不想阻塞,直到打印机可用,而 Actor 的即发即弃特性优雅地解决了这个问题。

import groovyx.gpars.agent.Agent

/** * A non-thread-safe service that slowly prints documents on at a time */ class PrinterService { String document String quality

public void printDocument() { println "Printing $document in $quality quality" Thread.sleep 5000 println "Done printing $document" } }

def printer = new Agent<PrinterService>(new PrinterService())

final Thread thread1 = Thread.start { for (num in (1..3)) { final String text = "document $num" printer << {printerService -> printerService.document = text printerService.quality = 'High' printerService.printDocument() } Thread.sleep 200 } println 'Thread 1 is ready to do something else. All print tasks have been submitted' }

final Thread thread2 = Thread.start { for (num in (1..4)) { final String text = "picture $num" printer << {printerService -> printerService.document = text printerService.quality = 'Medium' printerService.printDocument() } Thread.sleep 500 } println 'Thread 2 is ready to do something else. All print tasks have been submitted' }

[thread1, thread2]*.join() printer.await()

有关最新更新,请参阅相应的演示。

读取值

为了更紧密地遵循 Clojure 的哲学,Agent 类赋予读取比写入更高的优先级。通过使用 instantVal 属性,您的读取请求将绕过 Agent 的传入消息队列并返回内部状态的当前快照。 val 属性将在消息队列中等待处理,就像非阻塞变体 valAsync(Clojure cl) 一样,它将使用内部状态作为参数调用提供的闭包。

您必须记住,instantVal 属性可能会返回,尽管正确,但会返回随机查找的结果,因为 Agent 在执行 instantVal 时内部状态是非确定性的,并且取决于在线程调度程序执行 instantVal 的主体之前处理的消息。

await() 方法允许您等待处理之前提交给 Agent 的所有消息,因此会阻塞调用线程。

状态复制策略

为了避免泄露内部状态,Agent 类允许将复制策略指定为第二个构造函数参数。使用指定的复制策略,内部状态将由复制策略闭包处理,并且复制策略值的输出值将返回给调用者,而不是实际的内部状态。这适用于 instantValval 以及 valAsync()

错误处理

从提交的命令中抛出的异常将存储在代理内部,并且可以从 errors 属性中获取。该属性在读取后会清除。

def jugMembers = new Agent<List>()
    assert jugMembers.errors.empty

jugMembers.send {throw new IllegalStateException('test1')} jugMembers.send {throw new IllegalArgumentException('test2')} jugMembers.await()

List errors = jugMembers.errors assert 2 == errors.size() assert errors[0] instanceof IllegalStateException assert 'test1' == errors[0].message assert errors[1] instanceof IllegalArgumentException assert 'test2' == errors[1].message

assert jugMembers.errors.empty

公平代理和非公平代理

代理可以是公平的,也可以是非公平的。公平代理在处理每条消息后放弃线程,非公平代理保持线程,直到其消息队列为空。因此,非公平代理的性能往往优于公平代理。所有 Agent 实例的默认设置是 非公平,但是通过调用其 makeFair() 方法,可以使该实例变为公平。

def jugMembers = new Agent<List>(['Me'])  //add Me
    jugMembers.makeFair()

7 数据流

数据流并发提供了一种替代的并发模型,该模型本质上是安全且健壮的。

简介

查看使用 GPars 在 Groovy 中编写的示例,该示例对三个并发运行的任务执行的计算结果求和。

import static groovyx.gpars.dataflow.Dataflow.task

final def x = new DataflowVariable() final def y = new DataflowVariable() final def z = new DataflowVariable()

task { z << x.val + y.val }

task { x << 10 }

task { y << 5 }

println "Result: ${z.val}"

或者使用 Dataflows 类重写相同的算法。

import static groovyx.gpars.dataflow.Dataflow.task

final def df = new Dataflows()

task { df.z = df.x + df.y }

task { df.x = 10 }

task { df.y = 5 }

println "Result: ${df.z}"

我们启动了三个逻辑任务,它们可以并行运行并执行其特定活动。任务需要交换数据,它们使用 数据流变量 来实现。将数据流变量视为一次性通道,安全可靠地将数据从生产者传输到消费者。

数据流变量具有非常直接的语义。当一个任务需要从 DataflowVariable (通过 val 属性)读取值时,它将阻塞,直到另一个任务或线程(使用 '<<' 运算符)设置该值。每个 DataflowVariable 在其生命周期内只能 设置一次。请注意,您不必担心任务或线程及其对共享变量的访问顺序和同步。这些值会在您无需干预的情况下神奇地跨任务在适当的时间传递。数据在您无需干预或关注的情况下,在任务/线程之间无缝流动。

实现细节:示例中的三个任务 不一定需要映射到三个物理线程。任务表示所谓的“绿色”或“逻辑”线程,并且可以在后台映射到任意数量的物理线程。实际映射取决于调度程序,但数据流算法的结果不依赖于实际调度。

数据流变量的 bind 操作会静默地接受重新绑定到一个值,该值等于已绑定的值。调用 bindUnique 以拒绝对已绑定变量的相等值。

优点

以下列出了使用数据流并发(由 Jonas Bonér)获得的好处。

  • 没有竞争条件
  • 没有死锁
  • 确定性的死锁
  • 完全确定性的程序
  • 优美的代码。

这听起来不错,不是吗?

概念

数据流编程

引用维基百科

操作(在数据流程序中)由具有输入和输出的“黑盒”组成,所有这些都始终明确定义。它们在所有输入变为有效后立即运行,而不是在程序遇到它们时运行。传统程序本质上是一系列语句,指示“现在做这个,现在做那个”,而数据流程序更像是装配线上的一系列工人,他们将在材料到达后立即执行他们分配的任务。这就是数据流语言本质上是并行的原因;操作没有隐藏状态需要跟踪,并且所有操作都同时“准备好”。

原则

使用数据流并发,您可以安全地跨任务共享变量。这些变量(在 Groovy 中是 DataflowVariable 类的实例)在其生命周期内只能 分配(使用 '<<' 运算符)一个值。另一方面,变量的值可以多次读取(在 Groovy 中通过 val 属性),甚至在分配值之前也可以读取。在这种情况下,读取任务将暂停,直到另一个任务设置该值。因此,您可以简单地使用数据流变量为每个任务顺序编写代码,底层机制将确保您以线程安全的方式获得所需的所有值。

简而言之,您通常使用数据流变量执行三个操作。

  • 创建数据流变量
  • 等待变量绑定(读取它)
  • 绑定变量(写入它)

以下是您的程序必须遵循的三个基本规则。

  • 当程序遇到未绑定变量时,它将等待一个值。
  • 一旦绑定数据流变量,就不能更改其值。
  • 数据流变量使创建并发流代理变得容易。

数据流队列和广播

在您查看使用 数据流变量任务运算符的示例之前,您应该了解一些关于流和队列的知识,以便全面了解数据流并发。除了数据流变量之外,还有 DataflowQueuesDataflowBroadcast 的概念,您可以在代码中利用它们。您可以将它们视为线程安全缓冲区或队列,用于并发任务或线程之间的消息传递。查看典型的生产者-消费者演示。

import static groovyx.gpars.dataflow.Dataflow.task

def words = ['Groovy', 'fantastic', 'concurrency', 'fun', 'enjoy', 'safe', 'GPars', 'data', 'flow'] final def buffer = new DataflowQueue()

task { for (word in words) { buffer << word.toUpperCase() //add to the buffer } }

task { while(true) println buffer.val //read from the buffer in a loop }

DataflowBroadcastsDataflowQueues 都像 DataflowVariables 一样,实现了 DataflowChannel 接口,该接口具有通用方法,允许用户向它们写入数据并从它们读取值。当您开始使用它们将 任务运算符选择器 连接在一起时,能够通过 DataflowChannel 接口以相同的方式处理这两种类型非常有用。

DataflowChannel 接口组合了两个接口,每个接口都服务于其目的。
  • DataflowReadChannel 包含从通道读取值所需的所有方法 - getVal()、getValAsync()、whenBound() 等。
  • DataflowWriteChannel 包含将值写入通道所需的所有方法 - bind()、<<。

您可能更喜欢使用这些专用接口,而不是使用通用的 DataflowChannel 接口,以便更好地表达预期的用法。

有关通道接口的更多详细信息,请参阅 API 文档

点对点通信

DataflowQueue 类可以被视为点对点(1 对 1,多对 1)通信通道。它允许一个或多个生产者向一个读取器发送消息。如果多个读取器从同一个 DataflowQueue 读取,则它们将各自消费不同的消息。或者换句话说,每条消息只被一个读取器消费。您可以轻松地想象一个简单的负载均衡方案,该方案围绕一个共享的 DataflowQueue 建立,当您的算法的消费者部分需要扩展时,读取器会动态添加。这也是连接任务或运算符时有用的默认选择。

发布-订阅通信

DataflowBroadcast 类提供发布-订阅(1 对多,多对多)通信模型。一个或多个生产者写入消息,而所有注册的读取器都将接收所有消息。因此,每条消息都会在消息写入通道时被所有具有有效订阅的读取器消费。读取器通过调用 createReadChannel() 方法进行订阅。

DataflowWriteChannel broadcastStream = new DataflowBroadcast()
DataflowReadChannel stream1 = broadcastStream.createReadChannel()
DataflowReadChannel stream2 = broadcastStream.createReadChannel()
broadcastStream << 'Message1'
broadcastStream << 'Message2'
broadcastStream << 'Message3'
assert stream1.val == stream2.val
assert stream1.val == stream2.val
assert stream1.val == stream2.val

在内部, DataflowBroadcast 使用 DataflowStream 类来实现消息传递。

DataflowStream

DataflowStream 类表示一个确定性的数据流通道。它基于函数式队列的概念构建,因此为消息传递提供了无锁线程安全实现。从本质上讲,您可以将 DataflowStream 视为一个 1 对多通信通道,因为当一个读取器消费一条消息时,其他读取器仍然能够读取该消息。此外,所有消息都以相同的顺序到达所有读取器。由于 DataflowStream 是作为函数式队列实现的,因此其 API 要求用户自己遍历流中的值。另一方面, DataflowStream 提供了方便的方法用于值过滤或转换,并具有有趣的性能特征。

DataflowStream 类与其他通信元素不同,它没有实现 DataflowChannel 接口,因为其使用语义有所不同。使用 DataflowStreamReadAdapterDataflowStreamWriteAdapter 类将 DataflowChannel 类的实例包装在 DataflowReadChannelDataflowWriteChannel 实现中。

import groovyx.gpars.dataflow.stream.DataflowStream
import groovyx.gpars.group.DefaultPGroup
import groovyx.gpars.scheduler.ResizeablePool

/** * Demonstrates concurrent implementation of the Sieve of Eratosthenes using dataflow tasks * * In principle, the algorithm consists of a concurrently run chained filters, * each of which detects whether the current number can be divided by a single prime number. * (generate nums 1, 2, 3, 4, 5, ...) -> (filter by mod 2) -> (filter by mod 3) -> (filter by mod 5) -> (filter by mod 7) -> (filter by mod 11) -> (caution! Primes falling out here) * The chain is built (grows) on the fly, whenever a new prime is found */

/** * We need a resizeable thread pool, since tasks consume threads while waiting blocked for values at DataflowQueue.val */ group = new DefaultPGroup(new ResizeablePool(true))

final int requestedPrimeNumberCount = 100

/** * Generating candidate numbers */ final DataflowStream candidates = new DataflowStream() group.task { candidates.generate(2, {it + 1}, {it < 1000}) }

/** * Chain a new filter for a particular prime number to the end of the Sieve * @param inChannel The current end channel to consume * @param prime The prime number to divide future prime candidates with * @return A new channel ending the whole chain */ def filter(DataflowStream inChannel, int prime) { inChannel.filter { number -> group.task { number % prime != 0 } } }

/** * Consume Sieve output and add additional filters for all found primes */ def currentOutput = candidates requestedPrimeNumberCount.times { int prime = currentOutput.first println "Found: $prime" currentOutput = filter(currentOutput, prime) }

为了方便起见,以及为了能够将 DataflowStream 与其他数据流结构(例如运算符)一起使用,您可以使用 DataflowReadAdapter 对其进行包装以进行读取访问,或使用 DataflowWriteAdapter 对其进行包装以进行写入访问。 DataflowStream 类专为单线程生产者和消费者设计。如果多个线程应该读取或写入流中的值,则必须从外部对其访问流进行序列化,或者应该使用适配器。

DataflowStream 适配器

由于 DataflowStream 的 API 以及其使用语义与 Dataflow(Read/Write)Channel 定义的 API 和语义非常不同,因此必须使用适配器才能允许 DataflowStreams 与其他数据流元素一起使用。 DataflowStreamReadAdapter 类将使用必要的读取值方法包装 DataflowStream ,而 DataflowStreamWriteAdapter 类将围绕包装的 DataflowStream 提供写入方法。

需要说明的是,DataflowStreamWriteAdapter 是线程安全的,允许多个线程通过适配器向包装的 DataflowStream 添加值。另一方面,DataflowStreamReadAdapter 被设计为由单个线程使用。

为了最小化开销并与 DataflowStream 语义保持一致,DataflowStreamReadAdapter 类不是线程安全的,应该只在单个线程中使用。如果多个线程需要从 DataflowStream 读取数据,它们应该分别创建自己的包装 DataflowStreamReadAdapter

由于适配器的存在,DataflowStream 可以用于操作符或选择器之间的通信,这些操作符或选择器期望 Dataflow(Read/Write)Channels

import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.dataflow.stream.DataflowStream
import groovyx.gpars.dataflow.stream.DataflowStreamReadAdapter
import groovyx.gpars.dataflow.stream.DataflowStreamWriteAdapter
import static groovyx.gpars.dataflow.Dataflow.selector
import static groovyx.gpars.dataflow.Dataflow.operator

/** * Demonstrates the use of DataflowStreamAdapters to allow dataflow operators to use DataflowStreams */

final DataflowStream a = new DataflowStream() final DataflowStream b = new DataflowStream() def aw = new DataflowStreamWriteAdapter(a) def bw = new DataflowStreamWriteAdapter(b) def ar = new DataflowStreamReadAdapter(a) def br = new DataflowStreamReadAdapter(b)

def result = new DataflowQueue()

def op1 = operator(ar, bw) { bindOutput it } def op2 = selector([br], [result]) { result << it }

aw << 1 aw << 2 aw << 3 assert([1, 2, 3] == [result.val, result.val, result.val]) op1.stop() op2.stop() op1.join() op2.join()

此外,从多个 DataflowChannels 中选择值的能力只能通过围绕 DataflowStream 的适配器使用。

import groovyx.gpars.dataflow.Select
import groovyx.gpars.dataflow.stream.DataflowStream
import groovyx.gpars.dataflow.stream.DataflowStreamReadAdapter
import groovyx.gpars.dataflow.stream.DataflowStreamWriteAdapter
import static groovyx.gpars.dataflow.Dataflow.select
import static groovyx.gpars.dataflow.Dataflow.task

/** * Demonstrates the use of DataflowStreamAdapters to allow dataflow select to select on DataflowStreams */

final DataflowStream a = new DataflowStream() final DataflowStream b = new DataflowStream() def aw = new DataflowStreamWriteAdapter(a) def bw = new DataflowStreamWriteAdapter(b) def ar = new DataflowStreamReadAdapter(a) def br = new DataflowStreamReadAdapter(b)

final Select<?> select = select(ar, br) task { aw << 1 aw << 2 aw << 3 } assert 1 == select().value assert 2 == select().value assert 3 == select().value task { bw << 4 aw << 5 bw << 6 } def result = (1..3).collect{select()}.sort{it.value} assert result*.value == [4, 5, 6] assert result*.index == [1, 0, 1]

如果您不需要任何功能队列 DataflowStream-special 功能,例如生成、过滤或映射,您可以考虑使用 DataflowBroadcast 类,它通过 DataflowChannel 接口提供 发布-订阅 通信模型。

绑定处理程序

def a = new DataflowVariable()
a >> {println "The variable has just been bound to $it"}
a.whenBound {println "Just to confirm that the variable has been really set to $it"}
...

绑定处理程序可以使用 >> 运算符和 then()whenBound() 方法注册到所有数据流通道(变量、队列或广播)上。它们将在值绑定到变量后立即运行。

数据流队列和广播也支持 wheneverBound 方法来注册闭包或消息处理程序,以便在每次将值绑定到它们时运行。

def queue = new DataflowQueue()
queue.wheneverBound {println "A value $it arrived to the queue"}

显然,没有任何东西能阻止您为单个承诺拥有更多这样的处理程序:一旦承诺具有具体的值,它们都将并行触发。

Promise bookingPromise = task {
    final data = collectData()
    return broker.makeBooking(data)
}
…
bookingPromise.whenBound {booking -> printAgenda booking}
bookingPromise.whenBound {booking -> sendMeAnEmailTo booking}
bookingPromise.whenBound {booking -> updateTheCalendar booking}

数据流变量和广播是实现 并行推测 的几种可能方法之一。有关详细信息,请查看用户指南的 并行集合 部分中的 并行推测

绑定处理程序分组

当您需要等待多个 DataflowVariables/Promises 被绑定时,您可以从调用 whenAllBound() 函数中受益,该函数在 Dataflow 类和 PGroup 实例上都可用。

final group = new NonDaemonPGroup()

//Calling asynchronous services and receiving back promises for the reservations Promise flightReservation = flightBookingService('PRG <-> BRU') Promise hotelReservation = hotelBookingService('BRU:Feb 24 2009 - Feb 29 2009') Promise taxiReservation = taxiBookingService('BRU:Feb 24 2009 10:31')

//when all reservations have been made we need to build an agenda for our trip Promise agenda = group.whenAllBound(flightReservation, hotelReservation, taxiReservation) {flight, hotel, taxi -> "Agenda: $flight | $hotel | $taxi" }

//since this is a demo, we will only print the agenda and block till it is ready println agenda.val

如果您无法事先指定 whenAllBound() 处理程序接受的参数数量,请使用一个带有 List 类型参数的闭包。

Promise module1 = task {
    compile(module1Sources)
}
Promise module2 = task {
    compile(module2Sources)
}
//We don't know the number of modules that will be jarred together, so use a List
final jarCompiledModules = {List modules -> ...}

whenAllBound([module1, module2], jarCompiledModules)

绑定处理程序链接

所有数据流通道还支持 then() 方法来注册一个处理程序(回调),该处理程序应在值可用时调用。与 whenBound() 不同,then() 方法允许链接,让您可以选择异步地在函数之间传递结果值。

请注意,Groovy 允许我们省略 then() 方法链中的一些

final DataflowVariable variable = new DataflowVariable()
final DataflowVariable result = new DataflowVariable()

variable.then {it * 2} then {it + 1} then {result << it} variable << 4 assert 9 == result.val

这可以很好地与 异步函数 相结合

final DataflowVariable variable = new DataflowVariable()
final DataflowVariable result = new DataflowVariable()

final doubler = {it * 2} final adder = {it + 1}

variable.then doubler then adder then {result << it}

Thread.start {variable << 4} assert 9 == result.val

ActiveObjects

@ActiveObject
class ActiveDemoCalculator {
    @ActiveMethod
    def doubler(int value) {
        value * 2
    }

@ActiveMethod def adder(int value) { value + 1 } }

final DataflowVariable result = new DataflowVariable() final calculator = new ActiveDemoCalculator(); calculator.doubler(4).then {calculator.adder it}.then {result << it} assert 9 == result.val

当从 whenBound() 处理程序内部调用其他异步服务时,链接可以节省相当多的代码。异步服务,如 异步函数主动方法 ,为其结果返回 Promises 。为了获得实际结果,您的处理程序要么必须阻塞以等待值被绑定,这将使当前线程处于非生产状态,
variable.whenBound {value ->
    Promise promise = asyncFunction(value)
    println promise.get()
}
或者,它将注册另一个(嵌套的) whenBound() 处理程序,这将导致不必要的复杂代码。
variable.whenBound {value ->
    asyncFunction(value).whenBound {
        println it
    }
}
为了说明,请比较以下两个代码片段,一个使用 whenBound(),另一个使用 then() 链接。它们在功能和行为方面都是等效的。

final DataflowVariable variable = new DataflowVariable()

final doubler = {it * 2} final inc = {it + 1}

//Using whenBound() variable.whenBound {value -> task { doubler(value) }.whenBound {doubledValue -> task { inc(doubledValue) }.whenBound {incrementedValue -> println incrementedValue } } }

//Using then() chaining variable.then doubler then inc then this.&println

Thread.start {variable << 4}

链接 Promises 可以优雅地解决这两个问题。
variable >> asyncFunction >> {println it}

右移 ( >> ) 运算符已被重载以调用 then(),因此可以以相同的方式链接。

final DataflowVariable variable = new DataflowVariable()
final DataflowVariable result = new DataflowVariable()

final doubler = {it * 2} final adder = {it + 1}

variable >> doubler >> adder >> {result << it}

Thread.start {variable << 4}

assert 9 == result.val

Promise 链接的错误处理

异步操作显然可能会抛出异常。能够轻松且省力地处理它们非常重要。GPars 承诺可以隐式地将异步计算中的异常跨承诺链传播。

  1. Promises 传播结果值以及异常。阻塞 get() 方法重新抛出绑定到 Promise 的异常,因此调用者可以处理它。
  2. 对于异步通知,whenBound() 处理程序闭包将异常作为参数传递。
  3. then() 方法接受两个参数 - 一个 值处理程序 和一个可选的 错误处理程序。它们将根据结果是常规值还是异常而被调用。如果未指定 errorHandler,则异常将重新抛出到由 then() 返回的 Promise 中。
  4. then() 完全相同的行为适用于 whenAllBound() 方法,该方法监听多个 Promises 以便绑定。

Promise<Integer> initial = new DataflowVariable<Integer>()
    Promise<String> result = initial.then {it * 2} then {100 / it}                  //Will throw exception for 0
            .then {println "Logging the value $it as it passes by"; return it}      //Since no error handler is defined, exceptions will be ignored
                                                                                    //and silently re-thrown to the next handler in the chain
            .then({"The result for $num is $it"}, {"Error detected for $num: $it"}) //Here the exception is caught
    initial << 0
    println result.get()

ErrorHandler 是一个闭包,它接受 Throwable 的实例作为其唯一的(可选)参数,并返回一个应绑定到 then() 方法调用(返回的 Promise)结果的值。如果从错误处理程序内部抛出异常,则它将作为错误绑定到结果 Promise。

promise.then({it+1})                                                         //Implicitly re-throws potential exceptions bound to promise
promise.then({it+1}, {e -> throw e})                                         //Explicitly re-throws potential exceptions bound to promise
promise.then({it+1}, {e -> throw new RuntimeException('Error occurred', e})  //Explicitly re-throws a new exception wrapping a potential exception bound to promise

就像使用 Java 中带有 try-catch 语句的常规异常处理一样,GPars 承诺的这种行为使异步调用能够自由地在最方便的地方处理异常。您可以随意忽略代码中的异常,并假设事情正常工作,但异常不会意外地被吞没。

task {
    'gpars.codehaus.org'.toURL().text  //should throw MalformedURLException
}
.then {page -> page.toUpperCase()}
.then {page -> page.contains('GROOVY')}
.then({mentionsGroovy -> println "Groovy found: $mentionsGroovy"}, {error -> println "Error: $error"}).join()

处理具体异常类型

您也可以更具体地说明处理的异常类型。

url.then(download)
    .then(calculateHash, {MalformedURLException e -> return 0})
    .then(formatResult)
    .then(printResult, printError)
    .then(sendNotificationEmail);
`

客户站点异常处理

您也可以完全不处理异常,而是让客户端(使用者)处理它。

`
Promise<Object> result = url.then(download).then(calculateHash).then(formatResult).then(printResult);
try {
    result.get()
} catch (Exception e) {
    //handle exceptions here
}
`

整合

通过结合 whenAllBound()then(或 >>),您可以轻松地以方便的方式创建大型异步场景。

withPool {
    Closure download = {String url ->
        sleep 3000  //Simulate a web read
        'web content'
    }.asyncFun()

Closure loadFile = {String fileName -> 'file content' //simulate a local file read }.asyncFun()

Closure hash = {s -> s.hashCode()}

Closure compare = {int first, int second -> first == second }

Closure errorHandler = {println "Error detected: $it"}

def all = whenAllBound([ download('http://www.gpars.org') >> hash, loadFile('/coolStuff/gpars/website/index.html') >> hash ], compare).then({println it}, errorHandler) all.join() //optionally block until the calculation is all done

请注意,只有初始操作(函数)需要是异步的。管道中更下游的函数将由承诺异步调用,即使它们是同步的。

使用 Promises 实现 fork/join 模式

Promises 非常灵活,可以作为许多不同场景的实现工具。以下是 Promises 的一项额外的便捷功能。_thenForkAndJoin() 方法在当前承诺绑定后触发多个活动,并返回一个承诺,该承诺仅在所有活动完成后绑定。让我们看看它如何融入到整体画面中。

  • then() - 允许活动链接,以便一个活动在另一个活动完成后执行。
  • whenAllBound() - 允许连接多个活动,以便只有在所有活动完成后才开始新活动。
  • task() - 允许创建(fork)多个异步活动。
  • thenForkAndJoin() - fork 多个活动并加入它们的简写。

因此,使用 thenForkAndJoin(),您只需创建多个活动,这些活动应该由一个共享的(触发)承诺触发。

promise.thenForkAndJoin(task1, task2, task3).then{...}

一旦所有活动都返回结果,它们将被收集到一个列表中,并绑定到由 thenForkAndJoin() 返回的承诺中。

task {
    2
}.thenForkAndJoin({ it ** 2 }, { it**3 }, { it**4 }, { it**5 }).then({ println it}).join()

延迟数据流任务和变量

有时您可能希望将数据流变量的特性与其延迟初始化相结合。

Closure<String> download = {url ->
    println "Downloading"
    url.toURL().text
}

def pageContent = new LazyDataflowVariable(download.curry("http://gpars.codehaus.org"))

LazyDataflowVariable 的实例在构造时指定了一个初始化程序,该初始化程序只有在有人通过阻塞 get() 方法或使用任何非阻塞回调方法(如 then())请求其值时才会触发。由于 LazyDataflowVariables 保留了普通 DataflowVariables 的所有优点,因此您可以再次轻松地将它们与其他 延迟普通 数据流变量链接。

示例

这需要一个更实际的例子。从 http://blog.jcoglan.com/2013/03/30/callbacks-are-imperative-promises-are-functional-nodes-biggest-missed-opportunity/ 中汲取灵感,以下代码片段展示了使用 LazyDataflowVariables 将相互依赖的组件延迟地且异步地加载到内存中。这些组件(模块)将按照其依赖关系的顺序加载,如果可能,将并发加载。每个模块只会被加载一次,无论依赖它的模块数量是多少。由于惰性,只有那些被间接需要的模块才会被加载。我们的示例使用了一个简单的“菱形”依赖关系方案。

  • D 依赖于 B 和 C
  • C 依赖于 A
  • B 依赖于 A

加载 D 时,A 将首先被加载。B 和 C 在 A 加载完成后会并发加载。D 在 B 和 C 都加载完成后开始加载。

def moduleA = new LazyDataflowVariable({->
    println "Loading moduleA into memory"
    sleep 3000
    println "Loaded moduleA into memory"
    return "moduleA"
})

def moduleB = new LazyDataflowVariable({-> moduleA.then { println "->Loading moduleB into memory, since moduleA is ready" sleep 3000 println " Loaded moduleB into memory" return "moduleB" } })

def moduleC = new LazyDataflowVariable({-> moduleA.then { println "->Loading moduleC into memory, since moduleA is ready" sleep 3000 println " Loaded moduleC into memory" return "moduleC" } })

def moduleD = new LazyDataflowVariable({-> whenAllBound(moduleB, moduleC) { b, c -> println "-->Loading moduleD into memory, since moduleB and moduleC are ready" sleep 3000 println " Loaded moduleD into memory" return "moduleD" } })

println "Nothing loaded so far" println "===================================================================" println "Load module: " + moduleD.get() println "===================================================================" println "All requested modules loaded"

使任务延迟

lazyTask() 方法与 task() 方法一起提供,为用户提供面向任务的延迟活动的抽象。一个 延迟任务 返回一个 LazyDataflowVariable 的实例(一个 Promise),其初始化程序设置为提供的闭包。只要有人请求该值,任务就会异步开始,并最终将一个值传递到 LazyDataflowVariable 中。

import groovyx.gpars.dataflow.Dataflow

def pageContent = Dataflow.lazyTask { println "Downloading" "http://gpars.codehaus.org".toURL().text }

println "No-one has asked for the value just yet. Bound = ${pageContent.bound}" sleep 1000 println "Now going to ask for a value" println pageContent.get().size() println "Repetitive requests will receive the already calculated value. No additional downloading." println pageContent.get().size()

数据流表达式

看看下面的魔法。

def initialDistance = new DataflowVariable()
def acceleration = new DataflowVariable()
def time = new DataflowVariable()

task { initialDistance << 100 acceleration << 2 time << 10 }

def result = initialDistance + acceleration*0.5*time**2 println 'Total distance ' + result.val

我们使用 DataflowVariables 来表示计算加速物体总距离的数学方程中的几个参数。但是,在方程本身中,我们直接使用 DataflowVariables。我们没有引用它们表示的值,但我们仍然能够正确地进行数学运算。这表明 DataflowVariables 可以非常灵活。

例如,您可以在它们上面调用方法,这些方法将被分派到绑定的值。

def name = new DataflowVariable()
task {
    name << '  adam   '
}
println name.toUpperCase().trim().val

您可以将其他 DataflowVariables 作为参数传递给这些方法,实际值将自动传递。

def title = new DataflowVariable()
def searchPhrase = new DataflowVariable()
task {
    title << ' Groovy in Action 2nd edition   '
}

task { searchPhrase << '2nd' }

println title.trim().contains(searchPhrase).val

您还可以使用 DataflowVariable 直接查询绑定值的属性。

def book = new DataflowVariable()
def searchPhrase = new DataflowVariable()
task {
    book << [
             title:'Groovy in Action 2nd edition   ',
             author:'Dierk Koenig',
             publisher:'Manning']
}

task { searchPhrase << '2nd' }

book.title.trim().contains(searchPhrase).whenBound {println it} //Asynchronous waiting

println book.title.trim().contains(searchPhrase).val //Synchronous waiting

请注意,结果仍然是一个 DataflowVariable(确切地说是 DataflowExpression),您可以同步和异步地从它获取实际值。

绑定错误通知

DataflowVariables 提供了在每次绑定操作失败时向注册的监听器发送通知的能力。 getBindErrorManager() 方法允许添加和删除监听器。如果尝试绑定已绑定数据流变量的值(通过 bind()、bindSafely()、bindUnique() 或 leftShift())或错误(通过 bindError())失败,监听器将收到通知。

final DataflowVariable variable = new DataflowVariable()

variable.getBindErrorManager().addBindErrorListener(new BindErrorListener() { @Override void onBindError(final Object oldValue, final Object failedValue, final boolean uniqueBind) { println "Bind failed!" }

@Override void onBindError(final Object oldValue, final Throwable failedError) { println "Binding an error failed!" }

@Override public void onBindError(final Throwable oldError, final Object failedValue, final boolean uniqueBind) { println "Bind failed!" }

@Override public void onBindError(final Throwable oldError, final Throwable failedError) { println "Binding an error failed!" }

})

这使您可以自定义对尝试绑定已绑定数据流变量的反应。例如,使用 bindSafely(),您不会收到绑定异常发送到调用者,而是注册的 BindErrorListener 会收到通知。

进一步阅读

Jonas Bonér 的 Scala 数据流库

Jonas Bonér 的 JVM 并发演示幻灯片

Ruby 的数据流并发库。

7.1 任务

**数据流任务**提供了一种易于理解的抽象概念,用于表示相互独立的逻辑任务或线程,它们可以并发运行,并仅通过数据流变量、队列、广播和流来交换数据。数据流任务及其易于表达的相互依赖关系和固有的顺序执行体,也可以用作 UML **活动图**的实际实现。

查看示例。

一个简单的混搭示例

在这个示例中,我们分别在独立的任务中下载三个热门网站的首页,而在另一个任务中,我们过滤掉今天谈论 Groovy 的网站并生成输出。输出任务通过三个数据流变量自动与三个下载任务同步,通过这些变量,每个网站的内容被传递到输出任务。

import static groovyx.gpars.GParsPool.withPool
import groovyx.gpars.dataflow.DataflowVariable
import static groovyx.gpars.dataflow.Dataflow.task

/** * A simple mashup sample, downloads content of three websites * and checks how many of them refer to Groovy. */

def dzone = new DataflowVariable() def jroller = new DataflowVariable() def theserverside = new DataflowVariable()

task { println 'Started downloading from DZone' dzone << 'http://www.dzone.com'.toURL().text println 'Done downloading from DZone' }

task { println 'Started downloading from JRoller' jroller << 'http://www.jroller.com'.toURL().text println 'Done downloading from JRoller' }

task { println 'Started downloading from TheServerSide' theserverside << 'http://www.theserverside.com'.toURL().text println 'Done downloading from TheServerSide' }

task { withPool { println "Number of Groovy sites today: " + ([dzone, jroller, theserverside].findAllParallel { it.val.toUpperCase().contains 'GROOVY' }).size() } }.join()

对任务进行分组

数据流任务可以被组织成组,以便进行性能微调。组提供了一个方便的 **task()** 工厂方法来创建附加到组的任务。使用组可以将任务或操作符组织在不同的线程池周围(封装在组内)。虽然 Dataflow.task() 命令在默认线程池(java.util.concurrent.Executor,固定大小=#cpu+1,守护线程)上调度任务,但你可能更愿意定义自己的线程池来运行你的任务。

import groovyx.gpars.group.DefaultPGroup

def group = new DefaultPGroup()

group.with { task { … }

task { … } }

数据流任务的默认线程池包含守护线程,这意味着你的应用程序将在主线程完成时立即退出,不会等待所有任务完成。当对任务进行分组时,确保你的自定义线程池也使用守护线程,这可以通过使用 DefaultPGroup 或者在线程池构造函数中提供自己的线程工厂来实现,或者,如果你的线程池使用非守护线程(例如,使用 NonDaemonPGroup 组类时),确保你通过调用其 shutdown() 方法显式地关闭组或线程池,否则你的应用程序将不会退出。

你可以使用 _Dataflow.usingGroup() 方法选择性地覆盖代码块中用于任务、操作符、回调和其他数据流元素的默认组。

Dataflow.usingGroup(group) {
    task {
        'http://gpars.codehaus.org'.toURL().text  //should throw MalformedURLException
    }
    .then {page -> page.toUpperCase()}
    .then {page -> page.contains('GROOVY')}
    .then({mentionsGroovy -> println "Groovy found: $mentionsGroovy"}, {error -> println "Error: $error"}).join()
}

你可以始终通过指定来覆盖默认组。

Dataflow.usingGroup(group) {
    anotherGroup.task {
        'http://gpars.codehaus.org'.toURL().text  //should throw MalformedURLException
    }
    .then(anotherGroup) {page -> page.toUpperCase()}
    .then(anotherGroup) {page -> page.contains('GROOVY')}.then(anotherGroup) {println Dataflow.retrieveCurrentDFPGroup();it}
    .then(anotherGroup, {mentionsGroovy -> println "Groovy found: $mentionsGroovy"}, {error -> println "Error: $error"}).join()
}

带方法的混搭变体

为了避免给你关于构建数据流代码的错误印象,下面是对混搭示例的重写,使用一个 **downloadPage()** 方法在一个单独的任务中执行实际下载并返回一个 DataflowVariable 实例,以便主应用程序线程最终可以获取下载的内容。数据流变量显然可以作为参数或返回值传递。

package groovyx.gpars.samples.dataflow

import static groovyx.gpars.GParsExecutorsPool.withPool import groovyx.gpars.dataflow.DataflowVariable import static groovyx.gpars.dataflow.Dataflow.task

/** * A simple mashup sample, downloads content of three websites and checks how many of them refer to Groovy. */ final List urls = ['http://www.dzone.com', 'http://www.jroller.com', 'http://www.theserverside.com']

task { def pages = urls.collect { downloadPage(it) } withPool { println "Number of Groovy sites today: " + (pages.findAllParallel { it.val.toUpperCase().contains 'GROOVY' }).size() } }.join()

def downloadPage(def url) { def page = new DataflowVariable() task { println "Started downloading from $url" page << url.toURL().text println "Done downloading from $url" } return page }

一个物理计算示例

数据流程序自然地随着处理器数量的增加而扩展。在一定程度上,处理器越多,程序运行速度越快。例如,看看下面的脚本,它计算一个简单物理实验的参数并输出结果。每个任务执行其计算的一部分,可能依赖于其他任务计算的值,其结果也可能被其他任务需要。使用数据流并发,你可以根据需要将工作分解到任务中或重新排序任务,数据流机制将确保计算正确完成。

import groovyx.gpars.dataflow.DataflowVariable
import static groovyx.gpars.dataflow.Dataflow.task

final def mass = new DataflowVariable() final def radius = new DataflowVariable() final def volume = new DataflowVariable() final def density = new DataflowVariable() final def acceleration = new DataflowVariable() final def time = new DataflowVariable() final def velocity = new DataflowVariable() final def decelerationForce = new DataflowVariable() final def deceleration = new DataflowVariable() final def distance = new DataflowVariable()

def t = task { println """ Calculating distance required to stop a moving ball. ==================================================== The ball has a radius of ${radius.val} meters and is made of a material with ${density.val} kg/m3 density, which means that the ball has a volume of ${volume.val} m3 and a mass of ${mass.val} kg. The ball has been accelerating with ${acceleration.val} m/s2 from 0 for ${time.val} seconds and so reached a velocity of ${velocity.val} m/s.

Given our ability to push the ball backwards with a force of ${decelerationForce.val} N (Newton), we can cause a deceleration of ${deceleration.val} m/s2 and so stop the ball at a distance of ${distance.val} m.

======================================================================================================================= This example has been calculated asynchronously in multiple tasks using GPars Dataflow concurrency in Groovy. Author: ${author.val} """

System.exit 0 }

task { mass << volume.val * density.val }

task { volume << Math.PI * (radius.val ** 3) }

task { radius << 2.5 density << 998.2071 //water acceleration << 9.80665 //free fall decelerationForce << 900 }

task { println 'Enter your name:' def name = new InputStreamReader(System.in).readLine() author << (name?.trim()?.size()>0 ? name : 'anonymous') }

task { time << 10 velocity << acceleration.val * time.val }

task { deceleration << decelerationForce.val / mass.val }

task { distance << deceleration.val * ((velocity.val/deceleration.val) ** 2) * 0.5 }

t.join()

注意:我尽力使所有物理计算都正确。你可以随意更改值,看看你需要多长的距离才能让滚动的球停止。

确定性的死锁

如果你碰巧在依赖关系中引入了死锁,死锁将在每次运行代码时出现。不允许随机性。这是数据流并发的优势之一。无论实际的线程调度方案如何,如果在测试中没有死锁,那么在生产中也不会出现死锁。

task {
    println a.val
    b << 'Hi there'
}

task { println b.val a << 'Hello man' }

数据流映射

作为一种方便的快捷方式,**Dataflows** 类可以帮助你减少编写代码以利用数据流变量的数量。

def df = new Dataflows()
df.x = 'value1'
assert df.x == 'value1'

Dataflow.task {df.y = 'value2}

assert df.y == 'value2'

将 Dataflows 视为一个映射,其中数据流变量作为键存储其绑定的值作为相应的映射值。读取值(例如 df.x)和绑定值(例如 df.x = 'value')的语义与普通数据流变量的语义相同(分别为 x.val 和 x << 'value')。

混合 **Dataflows** 和 Groovy **with** 块

当位于 Dataflows 实例的 **with** 块内部时,可以访问 Dataflows 实例内部存储的数据流变量,而无需在它们前面添加 Dataflows 实例标识符。

new Dataflows().with {
    x = 'value1'
    assert x == 'value1'

Dataflow.task {y = 'value2}

assert y == 'value2' }

从任务返回一个值

通常,数据流任务通过数据流变量进行通信。除此之外,任务还可以通过数据流变量返回值。当调用 **task()** 工厂方法时,你会得到一个 Promise 实例(实现为 DataflowVariable),通过它可以监听任务的返回值,就像使用任何其他 Promise 或 DataflowVariable 一样。

final Promise t1 = task {
        return 10
    }
    final Promise t2 = task {
        return 20
    }
    def results = [t1, t2]*.val
    println 'Both sub-tasks finished and returned values: ' + results

显然,也可以在不阻塞调用者的前提下使用 **whenBound()** 方法获取该值。

def task = task {
    println 'The task is running and calculating the return value'
    30
}
task >> {value -> println "The task finished and returned $value"}

h2. 加入任务

在任务的结果数据流变量上使用 **join()** 操作,可以阻塞,直到任务完成。

task {
     final Promise t1 = task {
         println 'First sub-task running.'
     }
     final Promise t2 = task {
         println 'Second sub-task running'
     }
     [t1, t2]*.join()
     println 'Both sub-tasks finished'
 }.join()

7.2 选择器

通常需要从多个数据流通道(变量、队列、广播或流)中获取一个值。**Select** 类适合这种情况。**Select** 可以扫描多个数据流通道并从所有输入通道中选择一个当前可读取值的通道。从该通道读取的值将与原始通道的索引一起返回给调用者。选择通道是随机的,或者基于通道优先级,在这种情况下,**Select** 构造函数中位置索引较低的通道具有更高的优先级。

从多个通道选择一个值

import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.dataflow.DataflowVariable
import static groovyx.gpars.dataflow.Dataflow.select
import static groovyx.gpars.dataflow.Dataflow.task

/** * Shows a basic use of Select, which monitors a set of input channels for values and makes these values * available on its output irrespective of their original input channel. * Note that dataflow variables and queues can be combined for Select. * * You might also consider checking out the prioritySelect method, which prioritizes values by the index of their input channel */ def a = new DataflowVariable() def b = new DataflowVariable() def c = new DataflowQueue()

task { sleep 3000 a << 10 }

task { sleep 1000 b << 20 }

task { sleep 5000 c << 30 }

def select = select([a, b, c]) println "The fastest result is ${select().value}"

注意,从 **select()** 返回的类型是 **SelectResult**,它包含值以及原始通道索引。

有多种方法可以从 Select 读取值

def sel = select(a, b, c, d)
def result = sel.select()                                       //Random selection
def result = sel()                                              //Random selection (a short-hand variant)
def result = sel.select([true, true, false, true])              //Random selection with guards specified
def result = sel([true, true, false, true])                     //Random selection with guards specified (a short-hand variant)
def result = sel.prioritySelect()                               //Priority selection
def result = sel.prioritySelect([true, true, false, true])      //Priority selection with guards specifies

默认情况下,**Select** 将阻塞调用者,直到可读取的值可用。替代的 **selectToPromise()** 和 **prioritySelectToPromise()** 方法提供了一种获取未来将被选择的值的承诺的方法。通过返回的 Promise,你可以注册一个回调,以便在选择下一个值时异步调用它。

def sel = select(a, b, c, d)
Promise result = sel.selectToPromise()                                       //Random selection
Promise result = sel.selectToPromise([true, true, false, true])              //Random selection with guards specified
Promise result = sel.prioritySelectToPromise()                               //Priority selection
Promise result = sel.prioritySelectToPromise([true, true, false, true])      //Priority selection with guards specifies

或者,**Select** 允许将值发送到提供的 **MessageStream**(例如,一个演员)而不阻塞调用者。

def handler = actor {...}
def sel = select(a, b, c, d)

sel.select(handler) //Random selection sel(handler) //Random selection (a short-hand variant) sel.select(handler, [true, true, false, true]) //Random selection with guards specified sel(handler, [true, true, false, true]) //Random selection with guards specified (a short-hand variant) sel.prioritySelect(handler) //Priority selection sel.prioritySelect(handler, [true, true, false, true]) //Priority selection with guards specifies

保护

保护允许调用者从选择中省略一些输入通道。保护被指定为传递给 **select()** 或 **prioritySelect()** 方法的布尔标志列表。

def sel = select(leaders, seniors, experts, juniors)
def teamLead = sel([true, true, false, false]).value        //Only 'leaders' and 'seniors' qualify for becoming a teamLead here

保护的典型用途是使 Select 灵活地适应用户状态的变化。

import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.Dataflow.select
import static groovyx.gpars.dataflow.Dataflow.task

/** * Demonstrates the ability to enable/disable channels during a value selection on a select by providing boolean guards. */ final DataflowQueue operations = new DataflowQueue() final DataflowQueue numbers = new DataflowQueue()

def t = task { final def select = select(operations, numbers) 3.times { def instruction = select([true, false]).value def num1 = select([false, true]).value def num2 = select([false, true]).value final def formula = "$num1 $instruction $num2" println "$formula = ${new GroovyShell().evaluate(formula)}" } }

task { operations << '+' operations << '+' operations << '*' }

task { numbers << 10 numbers << 20 numbers << 30 numbers << 40 numbers << 50 numbers << 60 }

t.join()

优先级选择

当某些通道在选择时应该优先于其他通道时,应该使用 prioritySelect 方法。

/**
 * Shows a basic use of Priority Select, which monitors a set of input channels for values and makes these values
 * available on its output irrespective of their original input channel.
 * Note that dataflow variables, queues and broadcasts can be combined for Select.
 * Unlike plain select method call, the prioritySelect call gives precedence to input channels with lower index.
 * Available messages from high priority channels will be served before messages from lower-priority channels.
 * Messages received through a single input channel will have their mutual order preserved.
 *
 */
def critical = new DataflowVariable()
def ordinary = new DataflowQueue()
def whoCares = new DataflowQueue()

task { ordinary << 'All working fine' whoCares << 'I feel a bit tired' ordinary << 'We are on target' }

task { ordinary << 'I have just started my work. Busy. Will come back later...' sleep 5000 ordinary << 'I am done for now' }

task { whoCares << 'Huh, what is that noise' ordinary << 'Here I am to do some clean-up work' whoCares << 'I wonder whether unplugging this cable will eliminate that nasty sound.' critical << 'The server room goes on UPS!' whoCares << 'The sound has disappeared' }

def select = select([critical, ordinary, whoCares]) println 'Starting to monitor our IT department' sleep 3000 10.times {println "Received: ${select.prioritySelect().value}"}

收集异步计算的结果

异步活动,无论它们是 **数据流任务**、**活动对象的方法**还是 **异步函数**,都返回 **Promises**。**Promises** 实现 **SelectableChannel** 接口,因此可以与其他 **Promises** 以及 **读取通道** 一起传递到 **selects** 中进行选择。与 Java 的 **CompletionService** 类似,GPars **Select** 使你能够在每个异步活动可用时获得其结果。此外,你可以使用 **Select** 来获取多个并行运行的计算的第一个/最快结果。

import groovyx.gpars.dataflow.Promise
import groovyx.gpars.dataflow.Select
import groovyx.gpars.group.DefaultPGroup
/**
 * Demonstrates the use of dataflow tasks and selects to pick the fastest result of concurrently run calculations.
 */

final group = new DefaultPGroup() group.with { Promise p1 = task { sleep(1000) 10 * 10 + 1 } Promise p2 = task { sleep(1000) 5 * 20 + 2 } Promise p3 = task { sleep(1000) 1 * 100 + 3 }

final alt = new Select(group, p1, p2, p3) def result = alt.select() println "Result: " + result }

超时

**Select.createTimeout()** 方法将创建一个 DataflowVariable,它将在给定时间段后绑定到一个值。这可以在 **Selects** 中使用,以便它们在经过所需的延迟后解除阻塞,如果其他通道在那一刻之前没有传递值。只需将 **超时通道** 作为另一个输入通道传递给 **Select**。

import groovyx.gpars.dataflow.Promise
import groovyx.gpars.dataflow.Select
import groovyx.gpars.group.DefaultPGroup
/**
 * Demonstrates the use of dataflow tasks and selects to pick the fastest result of concurrently run calculations.
 */

final group = new DefaultPGroup() group.with { Promise p1 = task { sleep(1000) 10 * 10 + 1 } Promise p2 = task { sleep(1000) 5 * 20 + 2 } Promise p3 = task { sleep(1000) 1 * 100 + 3 }

final timeoutChannel = Select.createTimeout(500)

final alt = new Select(group, p1, p2, p3, timeoutChannel) def result = alt.select() println "Result: " + result }

取消

如果你需要在计算出值或超时后取消其他任务,最好的方法是设置一个标志,让任务定期监控该标志。在 **DataflowVariables** 或 **Tasks** 中有意地没有构建取消机制。

import groovyx.gpars.dataflow.Promise
import groovyx.gpars.dataflow.Select
import groovyx.gpars.group.DefaultPGroup

import java.util.concurrent.atomic.AtomicBoolean

/** * Demonstrates the use of dataflow tasks and selects to pick the fastest result of concurrently run calculations. * It shows a waz to cancel the slower tasks once a result is known */

final group = new DefaultPGroup() final done = new AtomicBoolean()

group.with { Promise p1 = task { sleep(1000) if (done.get()) return 10 * 10 + 1 } Promise p2 = task { sleep(1000) if (done.get()) return 5 * 20 + 2 } Promise p3 = task { sleep(1000) if (done.get()) return 1 * 100 + 3 }

final alt = new Select(group, p1, p2, p3, Select.createTimeout(500)) def result = alt.select() done.set(true) println "Result: " + result }

7.3 操作符

数据流操作符和选择器提供了一个完整的 Dataflow 实现,其中包含所有常见的仪式。

概念

完整的 Dataflow 并发建立在连接操作符和选择器的通道概念的基础上,这些通道消耗来自输入通道的值,将其转换为新值,并将新值输出到其输出通道。**操作符** 等待 **所有** 输入通道都有可读取的值,然后才开始处理它们,而 **选择器** 则由 **任何** 输入通道上的可用值触发。

operator(inputs: [a, b, c], outputs: [d]) {x, y, z ->
    …
    bindOutput 0, x + y + z
}

/**
 * CACHE
 *
 * Caches sites' contents. Accepts requests for url content, outputs the content. Outputs requests for download
 * if the site is not in cache yet.
 */
operator(inputs: [urlRequests], outputs: [downloadRequests, sites]) {request ->

if (!request.content) { println "[Cache] Retrieving ${request.site}" def content = cache[request.site] if (content) { println "[Cache] Found in cache" bindOutput 1, [site: request.site, word:request.word, content: content] } else { def downloads = pendingDownloads[request.site] if (downloads != null) { println "[Cache] Awaiting download" downloads << request } else { pendingDownloads[request.site] = [] println "[Cache] Asking for download" bindOutput 0, request } } } else { println "[Cache] Caching ${request.site}" cache[request.site] = request.content bindOutput 1, request def downloads = pendingDownloads[request.site] if (downloads != null) { for (downloadRequest in downloads) { println "[Cache] Waking up" bindOutput 1, [site: downloadRequest.site, word:downloadRequest.word, content: request.content] } pendingDownloads.remove(request.site) } } }

如果从操作符体中抛出未捕获的异常,标准错误处理将向标准错误输出打印错误消息并终止操作符。要更改行为,你可以注册自己的事件监听器。

def listener = new DataflowEventAdapter() {
    @Override
    boolean onException(final DataflowProcessor processor, final Throwable e) {
        logChannel << e
        return false   //Indicate whether to terminate the operator or not
    }
}

op = group.operator(inputs: [a, b], outputs: [c], listeners: [listener]) {x, y -> … } See the Operator lifecycle section for more details.

操作符类型

有专门的版本的操作符用于特定目的。

  • operator - 基本的通用操作符
  • selector - 由其任何输入通道中可用值触发的操作符
  • prioritySelector - 选择器更喜欢从索引较低的输入通道传递消息,而不是从索引较高的输入通道传递消息
  • splitter - 一个将输入值复制到所有输出通道的单输入操作符

将操作符连接在一起

操作符通常组合成网络,其中一些操作符消耗其他操作符的输出。

operator(inputs:[a, b], outputs:[c, d]) {...}
splitter(c, [e, f])
selector(inputs:[e, d]: outputs:[]) {...}

你也可以通过操作符本身来引用输出通道。

def op1 = operator(inputs:[a, b], outputs:[c, d]) {...}
def sp1 = splitter(op1.outputs[0], [e, f])                            //takes the first output of op1
selector(inputs:[sp1.outputs[0], op1.outputs[1]]: outputs:[]) {...}   //takes the first output of sp1 and the second output of op1

对操作符进行分组

数据流操作符可以被组织成组,以便进行性能微调。组提供了一个方便的 **operator()** 工厂方法来创建附加到组的操作符。

import groovyx.gpars.group.DefaultPGroup

def group = new DefaultPGroup()

group.with { operator(inputs: [a, b, c], outputs: [d]) {x, y, z -> … bindOutput 0, x + y + z } }

数据流操作符的默认线程池包含守护线程,这意味着你的应用程序将在主线程完成时立即退出,不会等待所有任务完成。当对操作符进行分组时,确保你的自定义线程池也使用守护线程,这可以通过使用 DefaultPGroup 或者在线程池构造函数中提供自己的线程工厂来实现,或者,如果你的线程池使用非守护线程(例如,使用 NonDaemonPGroup 组类时),确保你通过调用其 shutdown() 方法显式地关闭组或线程池,否则你的应用程序将不会退出。

你可以使用 _Dataflow.usingGroup() 方法选择性地覆盖代码块中用于任务、操作符、回调和其他数据流元素的默认组。

Dataflow.usingGroup(group) {
    operator(inputs: [a, b, c], outputs: [d]) {x, y, z ->
        …
        bindOutput 0, x + y + z
    }
}

你可以始终通过指定来覆盖默认组。

Dataflow.usingGroup(group) {
    anotherGroup.operator(inputs: [a, b, c], outputs: [d]) {x, y, z ->
        …
        bindOutput 0, x + y + z
    }
}

构建操作符

操作符的构造属性,例如 **inputs**、**outputs**、**stateObject** 或 **maxForks**,一旦操作符构建完成,就不能再修改。当你在最终构建操作符之前将通道和值逐渐收集到列表中时,你可能会发现 **groovyx.gpars.dataflow.ProcessingNode** 类很有用。

import groovyx.gpars.dataflow.Dataflow
import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.ProcessingNode.node

/** * Shows how to build operators using the ProcessingNode class */

final DataflowQueue aValues = new DataflowQueue() final DataflowQueue bValues = new DataflowQueue() final DataflowQueue results = new DataflowQueue()

//Create a config and gradually set the required properties - channels, code, etc. def adderConfig = node {valueA, valueB -> bindOutput valueA + valueB } adderConfig.inputs << aValues adderConfig.inputs << bValues adderConfig.outputs << results

//Build the operator final adder = adderConfig.operator(Dataflow.DATA_FLOW_GROUP)

//Now the operator is running and processing the data aValues << 10 aValues << 20 bValues << 1 bValues << 2

assert [11, 22] == (1..2).collect { results.val }

操作符中的状态

虽然操作符通常可以不保留后续调用之间的状态,但 GPars 允许操作符根据开发者的意愿维护状态。一种明显的方法是利用 Groovy 闭包功能来关闭其上下文。

int counter = 0
operator(inputs: [a], outputs: [b]) {value ->
    counter += 1
}

另一种方法是,它允许你避免在操作符定义之外声明状态对象,即在构造时将状态对象作为 **stateObject** 参数传递给操作符。

operator(inputs: [a], outputs: [b], stateObject: [counter: 0]) {value ->
    stateObject.counter += 1
}

并行化操作符

默认情况下,操作符的执行体一次由一个线程处理。虽然这是一个安全的设置,允许操作符的执行体以非线程安全的方式编写,但一旦操作符变得“热门”,并且数据开始在操作符的输入队列中累积,你可能需要考虑允许多个线程同时运行操作符的执行体。请记住,在这种情况下,你需要避免或保护共享资源免受多线程访问。要允许多个线程同时运行操作符的执行体,请在创建操作符时传递一个额外的 **maxForks** 参数。

def op = operator(inputs: [a, b, c], outputs: [d, e], maxForks: 2) {x, y, z ->
    bindOutput 0, x + y + z
    bindOutput 1, x * y * z
}

参数 maxForks 的值指示并发运行运算符的线程的最大数量。仅允许正数,默认值为 1。

请始终确保为运算符提供服务的 group 包含足够的线程来支持所有请求的分叉。使用组允许您围绕不同的线程池(包装在组内)组织任务或运算符。虽然 Dataflow.task() 命令在默认线程池(java.util.concurrent.Executor,固定大小 = #cpu+1,守护线程)上调度任务,但您可能更愿意能够定义自己的线程池来运行您的任务。

def group = new DefaultPGroup(10)
group.operator((inputs: [a, b, c], outputs: [d, e], maxForks: 5) {x, y, z -> ...}

默认组使用可调整大小的线程池,因此永远不会耗尽线程。

同步输出

在通过将 maxForks 的值设置为大于 1 来启用运算符的内部并行化时,重要的是要记住,如果没有运算符主体中的显式或隐式同步,可能会发生竞争条件。特别是请记住,写入多个输出通道的值不能保证以相同的顺序原子地写入所有通道

operator(inputs:[inputChannel], outputs:[a, b], maxForks:5) {msg ->
    bindOutput 0, msg
    bindOutput 1, msg
}
inputChannel << 1
inputChannel << 2
inputChannel << 3
inputChannel << 4
inputChannel << 5
可能导致输出通道的值混合,例如
a -> 1, 3, 2, 4, 5
b -> 2, 1, 3, 5, 4

显式同步是绑定所有输出通道并保护运算符非线程本地状态的一种方法

def lock = new Object()
operator(inputs:[inputChannel], outputs:[a, b], maxForks:5) {msg ->
    doStuffThatIsThreadSafe()

synchronized(lock) { doSomethingThatMustNotBeAccessedByMultipleThreadsAtTheSameTime() bindOutput 0, msg bindOutput 1, 2*msg } }

显然,您需要在这里权衡利弊,因为同步可能会违背将 maxForks 设置为大于 1 的目的。

要以原子步骤设置所有运算符输出通道的值,您也可以考虑调用 bindAllOutputsAtomically 方法,传入一个值以写入所有输出通道,或者调用 bindAllOutputsAtomically 方法,该方法接受多个值,每个值都将写入具有相同位置索引的输出通道。

operator(inputs:[inputChannel], outputs:[a, b], maxForks:5) {msg ->
    doStuffThatIsThreadSafe()
        bindAllOutputValuesAtomically msg, 2*msg
    }
}

使用 bindAllOutputsbindAllOutputValues 方法在使用内部并行化时不会保证跨所有输出通道写入的原子性。如果保留多个输出通道中消息的顺序不是问题,bindAllOutputs 以及 bindAllOutputValues 将比原子变体提供更好的性能。

运算符生命周期

Dataflow 运算符和选择器在其生命周期中触发多个事件,这使感兴趣方能够获得通知并可能改变运算符的行为。 DataflowEventListener 接口提供了一些回调方法

public interface DataflowEventListener {
    /**
     * Invoked immediately after the operator starts by a pooled thread before the first message is obtained
     *
     * @param processor The reporting dataflow operator/selector
     */
    void afterStart(DataflowProcessor processor);

/** * Invoked immediately after the operator terminates * * @param processor The reporting dataflow operator/selector */ void afterStop(DataflowProcessor processor);

/** * Invoked if an exception occurs. * If any of the listeners returns true, the operator will terminate. * Exceptions outside of the operator's body or listeners' messageSentOut() handlers will terminate the operator irrespective of the listeners' votes. * * @param processor The reporting dataflow operator/selector * @param e The thrown exception * @return True, if the operator should terminate in response to the exception, false otherwise. */ boolean onException(DataflowProcessor processor, Throwable e);

/** * Invoked when a message becomes available in an input channel. * * @param processor The reporting dataflow operator/selector * @param channel The input channel holding the message * @param index The index of the input channel within the operator * @param message The incoming message * @return The original message or a message that should be used instead */ Object messageArrived(DataflowProcessor processor, DataflowReadChannel<Object> channel, int index, Object message);

/** * Invoked when a control message (instances of ControlMessage) becomes available in an input channel. * * @param processor The reporting dataflow operator/selector * @param channel The input channel holding the message * @param index The index of the input channel within the operator * @param message The incoming message * @return The original message or a message that should be used instead */ Object controlMessageArrived(DataflowProcessor processor, DataflowReadChannel<Object> channel, int index, Object message);

/** * Invoked when a message is being bound to an output channel. * * @param processor The reporting dataflow operator/selector * @param channel The output channel to send the message to * @param index The index of the output channel within the operator * @param message The message to send * @return The original message or a message that should be used instead */ Object messageSentOut(DataflowProcessor processor, DataflowWriteChannel<Object> channel, int index, Object message);

/** * Invoked when all messages required to trigger the operator become available in the input channels. * * @param processor The reporting dataflow operator/selector * @param messages The incoming messages * @return The original list of messages or a modified/new list of messages that should be used instead */ List<Object> beforeRun(DataflowProcessor processor, List<Object> messages);

/** * Invoked when the operator completes a single run * * @param processor The reporting dataflow operator/selector * @param messages The incoming messages that have been processed */ void afterRun(DataflowProcessor processor, List<Object> messages);

/** * Invoked when the fireCustomEvent() method is triggered manually on a dataflow operator/selector * * @param processor The reporting dataflow operator/selector * @param data The custom piece of data provided as part of the event * @return A value to return from the fireCustomEvent() method to the caller (event initiator) */ Object customEvent(DataflowProcessor processor, Object data); }

通过 DataflowEventAdapter 类提供了一个默认实现。

监听器提供了一种处理异常的方法,这些异常发生在运算符内部。监听器通常可以记录此类异常、通知监督实体、生成备用输出或执行恢复情况所需的任何步骤。如果没有注册监听器,或者如果任何监听器返回 true,运算符将终止,保留 afterStop() 的契约。在实际运算符主体之外发生的异常,即在主体触发之前的参数准备阶段或在主体完成之后的清理和通道订阅阶段,始终导致运算符终止。

运算符和选择器上可用的 fireCustomEvent() 方法可用于在运算符主体和感兴趣的监听器之间来回通信

final listener = new DataflowEventAdapter() {
    @Override
    Object customEvent(DataflowProcessor processor, Object data) {
        println "Log: Getting quite high on the scale $data"
        return 100  //The value to use instead
    }
}

op = group.operator(inputs: [a, b], outputs: [c], listeners: [listener]) {x, y -> final sum = x + y if (sum > 100) bindOutput(fireCustomEvent(sum)) //Reporting that the sum is too high, binding the lowered value that comes back else bindOutput sum }

选择器

选择器的主体应该是消耗一个或两个参数的闭包。

selector (inputs : [a, b, c], outputs : [d, e]) {value ->
    ....
}

两个参数的闭包将获得一个值加上当前正在处理的值的输入通道的索引。这允许选择器区分通过不同输入通道的值。

selector (inputs : [a, b, c], outputs : [d, e]) {value, index ->
    ....
}

优先级选择器

当需要在输入通道之间保留优先级时,应使用 DataflowPrioritySelector

prioritySelector(inputs : [a, b, c], outputs : [d, e]) {value, index ->
    …
}

优先级选择器将始终优先考虑来自位置索引较低的通道的值,而不是来自位置索引较高的通道的值。

连接选择器

没有指定主体闭包的选择器将复制所有传入的值到其所有输出通道。

def join = selector (inputs : [programmers, analysis, managers], outputs : [employees, colleagues])

内部并行化

允许内部选择器并行化的 maxForks 属性也可用。

selector (inputs : [a, b, c], outputs : [d, e], maxForks : 5) {value ->
    ....
}

保护

Selects 一样,Selectors 也允许用户临时将单个输入通道包括在选择中或从选择中排除。 guards 输入属性可用于设置所有输入通道的初始掩码,然后选择器的主体中可以使用 setGuardssetGuard 方法。

import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.Dataflow.selector
import static groovyx.gpars.dataflow.Dataflow.task

/** * Demonstrates the ability to enable/disable channels during a value selection on a select by providing boolean guards. */ final DataflowQueue operations = new DataflowQueue() final DataflowQueue numbers = new DataflowQueue()

def instruction def nums = []

selector(inputs: [operations, numbers], outputs: [], guards: [true, false]) {value, index -> //initial guards is set here if (index == 0) { instruction = value setGuard(0, false) //setGuard() used here setGuard(1, true) } else nums << value if (nums.size() == 2) { setGuards([true, false]) //setGuards() used here final def formula = "${nums[0]} $instruction ${nums[1]}" println "$formula = ${new GroovyShell().evaluate(formula)}" nums.clear() } }

task { operations << '+' operations << '+' operations << '*' }

task { numbers << 10 numbers << 20 numbers << 30 numbers << 40 numbers << 50 numbers << 60 }

避免将 guards 和大于 1 的 maxForks 结合使用。虽然 Selector 是线程安全的,不会以任何方式损坏,但防护措施可能不会按照您的预期设置。并发运行选择器主体的多个线程将倾向于覆盖彼此对 guards 属性的设置。

7.4 关闭 Dataflow 网络

关闭数据流处理器(运算符和选择器)网络有时可能是一项非平凡的任务,尤其是如果您需要一种通用的机制,该机制不会留下任何未处理的消息。

Dataflow 运算符和选择器可以通过三种方式终止

  1. 通过调用需要终止的所有运算符上的 terminate() 方法
  2. 通过发送泊松消息
  3. 通过设置活动监视器网络,该网络将在所有消息处理完毕后关闭网络

查看 GPars 提供的方式的详细信息。

关闭线程池

如果您使用自定义 PGroup 来维护数据流网络的线程池,则不应忘记在网络终止后关闭池。否则,线程池将消耗系统资源,并且,如果使用非守护线程,它将阻止 JVM 退出。

紧急关闭

您可以对任何运算符/选择器调用 terminate() 以立即将其关闭。假设您跟踪所有处理器,也许通过将它们添加到列表中,停止网络的最快方法是

allMyProcessors*.terminate()

但是,这应该被视为紧急退出,因为无法保证处理的消息或完成的工作。运算符将简单地立即终止,留下未完成的工作并放弃输入通道中的消息。当然,挂钩到运算符/选择器的生命周期事件监听器将调用其 afterStop() 事件处理程序,例如释放资源或向日志输出注释。

def op1 = operator(inputs: [a, b, c], outputs: [d, e]) {x, y, z -> }

def op2 = selector(inputs: [d], outputs: [f, out]) { }

def op3 = prioritySelector(inputs: [e, f], outputs: [b]) {value, index -> }

[op1, op2, op3]*.terminate() //Terminate all operators by calling the terminate() method on them op1.join() op2.join() op3.join()

通过 System.exit() 关闭整个 JVM 也将明显关闭数据流网络,但是,在这种情况下不会调用任何生命周期监听器。

温和地停止运算符

运算符反复处理传入的消息。在不冒丢失任何消息的风险的情况下停止运算符的唯一安全时刻是运算符在完成消息处理后即将在其传入管道中查找更多消息。这正是 terminateAfterNextRun() 方法所做的。它将在处理下一组消息后安排运算符关闭。

未处理的消息将保留在输入通道中,这使您能够稍后处理它们,也许使用不同的运算符/选择器或以其他方式处理。使用 terminateAfterNextRun(),您将不会丢失任何输入消息。当您使用一组运算符/选择器来平衡来自通道的消息时,这可能特别方便。一旦工作负载减少,terminateAfterNextRun() 方法可用于安全地减少负载平衡运算符池。

检测关闭

对于需要阻塞到运算符终止的人,运算符和选举人提供了一个方便的 join() 方法。

allMyProcessors*.join()
这是等待整个数据流网络关闭的最简单方法,无论使用哪种关闭方法。

毒药

PoisonPill 是使用专用消息停止接收它的实体的策略的常用术语。GPars 提供了 PoisonPill 类,它对运算符和选择器具有完全相同的效应。由于 PoisonPill 是一个 ControlMessage,因此对运算符主体不可见,自定义代码不需要以任何方式处理它。 DataflowEventListeners 可以通过 controlMessageArrived() 处理程序方法对 ControlMessages 做出反应。

def op1 = operator(inputs: [a, b, c], outputs: [d, e]) {x, y, z -> }

def op2 = selector(inputs: [d], outputs: [f, out]) { }

def op3 = prioritySelector(inputs: [e, f], outputs: [b]) {value, index -> }

a << PoisonPill.instance //Send the poisson

op1.join() op2.join() op3.join()

收到泊松后,运算符将在完成当前计算并确保泊松被发送到其所有输出通道后立即终止,以便泊松可以传播到连接的运算符。此外,虽然运算符通常会等待所有输入都具有值,但在 PoisonPills 的情况下,只要其任何输入上出现 PoisonPill,运算符就会立即终止。从其他通道获得的值将丢失。如果应该处理这些消息,则可以将其视为网络设计中的错误。它们需要一个适当的值作为其对等体,而不是一个 PoisonPill,以便正常处理。

另一方面,选择器将耐心等待从所有输入通道接收 PoisonPill,然后再将其发送到输出通道。这种行为防止包含 涉及选择器的反馈循环 的网络使用 PoisonPill 关闭。选择器永远不会从来自选择器后面的通道接收 PoisonPill。应为这种网络使用不同的关闭策略。

鉴于运算符网络的潜在多样性和其异步性质,一个好的终止策略是运算符和选择器应始终仅终止自身。从外部终止它们的所有方式(无论是通过调用 terminate() 方法还是通过将泊松发送到流下游)都可能导致消息在管道中的某个地方丢失,当读取运算符在完全处理等待其输入通道中的消息之前终止时。

立即毒药

特别是对于在收到毒药后立即关闭的选择器,已引入 立即毒药 的概念。由于正常的非立即毒药只是关闭输入通道,让选择器保持活动状态,直到至少一个输入通道保持打开状态,因此立即毒药会立即关闭选择器。显然,一旦选择器读取了立即毒药,来自其他选择器输入通道的未处理消息将不会由选择器处理。

使用立即毒药,您可以安全地关闭涉及反馈循环的选择器的网络。

def op1 = selector(inputs: [a, b, c], outputs: [d, e]) {value, index -> }
def op2 = selector(inputs: [d], outputs: [f, out]) { }
def op3 = prioritySelector(inputs: [e, f], outputs: [b]) {value, index -> }

a << PoisonPill.immediateInstance

[op1, op2, op3]*.join()

带计数的毒药

当将毒药发送到运算符网络时,您可能需要在所有运算符或指定数量的运算符停止时收到通知。 CountingPoisonPill 类正是为此目的而服务的

operator(inputs: [a, b, c], outputs: [d, e]) {x, y, z -> }
selector(inputs: [d], outputs: [f, out]) { }
prioritySelector(inputs: [e, f], outputs: [b]) {value, index -> }

//Send the poisson indicating the number of operators than need to be terminated before we can continue final pill = new CountingPoisonPill(3) a << pill

//Wait for all operators to terminate pill.join() //At least 3 operators should be terminated by now

CountingPoisonPill 类的 termination 属性是一个普通的 Promise<Boolean>,因此具有许多方便的属性。

//Send the poisson indicating the number of operators than need to be terminated before we can continue
final pill = new CountingPoisonPill(3)
pill.termination.whenBound {println "Reporting asynchronously that the network has been stopped"}
a << pill

if (pill.termination.bound) println "Wow, that was quick. We are done already!" else println "Things are being slow today. The network is still running."

//Wait for all operators to terminate assert pill.termination.get() //At least 3 operators should be terminated by now

还提供了一个 CountingPoisonPill 的立即变体 - ImmediateCountingPoisonPill
def op1 = selector(inputs: [a, b, c], outputs: [d, e]) {value, index -> }
def op2 = selector(inputs: [d], outputs: [f, out]) { }
def op3 = prioritySelector(inputs: [e, f], outputs: [b]) {value, index -> }

final pill = new ImmediateCountingPoisonPill(3) a << pill pill.join()

ImmediateCountingPoisonPill 将安全而快速地关闭数据流网络,即使选择器涉及反馈循环,而正常的非立即毒药则无法做到。

毒药策略

为了正确地使用 PoisonPill 关闭网络,您必须确定发送 PoisonPill 的适当通道集。 PoisonPill 将以通常的方式通过通道和处理器在网络中传播。 通常,发送 PoisonPill 的正确通道是充当网络 数据源 的通道。 对于一般情况或复杂网络来说,这可能难以实现。 另一方面,对于消息流方向明显且普遍的网络,PoisonPill 提供了一种非常直接的方式来优雅地关闭整个网络。

使用多个操作符从共享通道(队列)读取消息的负载均衡架构也会阻止 Poison 关闭正常工作,因为只有一个读取操作符可以读取 Poison 消息。 您可以考虑使用 派生操作符,方法是将 maxForks 属性设置为大于 1 的值。 另一种方法是手动将消息流拆分为多个通道,每个通道都由一个原始操作符使用。

终止提示和技巧

请注意,GPars 任务 返回一个 DataflowVariable,该变量一旦任务完成就会绑定到一个值。 下面的“终止器”操作符利用了 DataflowVariablesDataflowReadChannel 接口实现的事实,因此可以被操作符使用。 一旦两个任务都完成,操作符就会在 q 通道上发送一个 PoisonPill 来停止消费者,只要它处理完所有数据。

import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.group.NonDaemonPGroup

def group = new NonDaemonPGroup()

final DataflowQueue q = new DataflowQueue()

// final destination def customs = group.operator(inputs: [q], outputs: []) { value -> println "Customs received $value" }

// big producer def green = group.task { (1..100).each { q << 'green channel ' + it sleep 10 } }

// little producer def red = group.task { (1..10).each { q << 'red channel ' + it sleep 15 } }

def terminator = group.operator(inputs: [green, red], outputs: []) { t1, t2 -> q << PoisonPill.instance }

customs.join() group.shutdown()

将 PoisonPill 保留在给定网络中

如果您的网络通过通道将值传递给其外部的实体,您可能需要在网络边界停止 PoisonPill 消息。 这可以通过在每个这样的通道上放置一个单输入单输出过滤操作符来轻松实现。

operator(networkLeavingChannel, otherNetworkEnteringChannel) {value ->
    if (!(value instanceOf PoisonPill)) bindOutput it
}

Pipeline DSL 也可能在这里有所帮助

networkLeavingChannel.filter { !(it instanceOf PoisonPill) } into otherNetworkEnteringChannel

查看 Pipeline DSL 部分以了解有关管道的更多信息。

优雅关闭

GPars 提供了一种通用的方法来关闭数据流网络。 与前面提到的机制不同,这种方法将使网络继续运行,直到所有消息都得到处理,然后优雅地关闭所有操作符,并让您知道何时发生这种情况。 但是,您需要付出适度的性能损失。 这是不可避免的,因为我们需要跟踪网络内部发生的事情。

import groovyx.gpars.dataflow.DataflowBroadcast
import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.dataflow.operator.component.GracefulShutdownListener
import groovyx.gpars.dataflow.operator.component.GracefulShutdownMonitor
import groovyx.gpars.group.DefaultPGroup
import groovyx.gpars.group.PGroup

PGroup group = new DefaultPGroup(10) final a = new DataflowQueue() final b = new DataflowQueue() final c = new DataflowQueue() final d = new DataflowQueue<Object>() final e = new DataflowBroadcast<Object>() final f = new DataflowQueue<Object>() final result = new DataflowQueue<Object>()

final monitor = new GracefulShutdownMonitor(100);

def op1 = group.operator(inputs: [a, b], outputs: [c], listeners: [new GracefulShutdownListener(monitor)]) {x, y -> sleep 5 bindOutput x + y } def op2 = group.operator(inputs: [c], outputs: [d, e], listeners: [new GracefulShutdownListener(monitor)]) {x -> sleep 10 bindAllOutputs 2*x } def op3 = group.operator(inputs: [d], outputs: [f], listeners: [new GracefulShutdownListener(monitor)]) {x -> sleep 5 bindOutput x + 40 } def op4 = group.operator(inputs: [e.createReadChannel(), f], outputs: [result], listeners: [new GracefulShutdownListener(monitor)]) {x, y -> sleep 5 bindOutput x + y }

100.times{a << 10} 100.times{b << 20}

final shutdownPromise = monitor.shutdownNetwork()

100.times{assert 160 == result.val}

shutdownPromise.get() [op1, op2, op3, op4]*.join()

group.shutdown()

首先,我们需要一个 GracefulShutdownMonitor 实例,它将协调关闭过程。 它依赖于附加到所有操作符/选择器的 GracefulShutdownListener 实例。 这些侦听器观察它们各自的处理器及其输入通道,并向共享的 GracefulShutdownMonitor 报告。 一旦在 GracefulShutdownMonitor 上调用 shutdownNetwork(),它将定期检查报告的活动,查询操作符的状态以及其输入通道中的消息数量。

请确保在启动关闭后,没有新的消息进入数据流网络,因为这可能会导致网络永远无法终止。 只有在所有数据生产者停止向监控网络发送更多消息后,才应该启动关闭过程。

shutdownNetwork() 方法返回一个 Promise,以便您可以使用它来执行通常的技巧 - 使用 get() 方法阻塞等待网络终止,使用 whenBound() 方法注册回调,或者通过 then() 方法使其触发一系列活动。

优雅关闭的限制
  1. 为了使 GracefulShutdownListener 正确工作,其 messageArrived() 事件处理程序必须看到通过输入通道到达的原始值。 由于某些事件侦听器可能会在消息通过侦听器时更改消息,因此建议先将 GracefulShutdownListener 添加到每个数据流处理器的侦听器列表中。
  2. 此外,对于那些罕见的具有侦听器的操作符,优雅关闭将不起作用,这些侦听器会将控制消息转换为 controlMessageArrived() 事件处理程序中的普通值消息。
  3. 第三也是最后,使用多个操作符从共享通道(队列)读取消息的负载均衡架构也会阻止优雅关闭正常工作。 您可以考虑使用 派生操作符,方法是将 maxForks 属性设置为大于 1 的值。 另一种方法是手动将消息流拆分为多个通道,每个通道都由一个原始操作符使用。

7.5 应用框架

数据流操作符和选择器可以成功地用于构建针对自然适合流模型的问题的高级领域特定框架。

在 GPars 数据流之上构建流框架

GPars 数据流可以被视为底层的语言级基础设施。 操作符、选择器、通道和事件侦听器在语言级别非常有用,例如,可以与参与者或并行集合结合使用。 每当需要异步处理来自一个或多个通道的事件时,数据流操作符或小型数据流网络都是非常合适的。 与任务不同,操作符是轻量级的,并在没有消息要处理时释放线程。 与参与者不同,操作符通过通道间接寻址,可以轻松地将来自多个通道的消息组合成一个操作。

或者,操作符可以被视为连续函数,这些函数立即并重复地将其输入值转换为输出。 我们认为,一个支持并发性的通用编程语言应该提供这种类型的抽象。

同时,数据流元素可以轻松地用作构建块来构建领域特定工作流式框架。 这些框架可以提供更高级别的抽象,专门针对单个问题域,这对于通用语言级库来说是不合适的。 然后,每个更高级别的概念都映射到(可能多个)GPars 概念。

例如,解决数据挖掘问题的网络可能包含多个数据源、数据清理节点、分类节点、报告节点等。 另一方面,图像处理网络可能需要专门用于图像压缩和格式转换的节点。 同样,用于数据加密、mp3 编码、工作流管理以及许多其他将受益于基于数据流的解决方案的领域,将会有许多方面的差异 - 网络中的节点类型、事件的类型和频率、负载均衡方案、潜在的限制分支、对可视化、调试和日志记录的需求、用户定义网络和与之交互的方式以及许多其他方面。

更高级别的特定于应用程序的框架应该努力提供最适合给定领域的抽象,并隐藏 GPars 的复杂性。 例如,用户在屏幕上操作的网络的可视化图形通常不应显示参与网络的所有通道。 调试或日志记录通道很少对解决方案的核心做出贡献,是第一个考虑排除的合适候选者。 同样,用于协调负载平衡或优雅关闭等方面的通道和生命周期事件侦听器可能不会暴露给用户,尽管它们将是生成的和执行的网络的一部分。 同样,领域特定模型中的单个通道实际上将转换为多个通道,这些通道可能带有一个或多个连接它们的日志记录/转换/过滤操作符。 与节点关联的函数很可能将被包装在一些额外的基础结构代码中以形成操作符的主体。

GPars 为您提供了最终用户可能被特定于应用程序的框架完全抽象化的底层组件。 这使得 GPars 与领域无关且通用,但在实现级别仍然有用。

7.6 Pipeline DSL

用于构建操作符管道的 DSL

构建数据流网络可以进一步简化。 GPars 为构建(主要是线性)操作符管道提供了方便的快捷方式。

def toUpperCase = {s -> s.toUpperCase()}

final encrypt = new DataflowQueue() final DataflowReadChannel encrypted = encrypt | toUpperCase | {it.reverse()} | {'###encrypted###' + it + '###'}

encrypt << "I need to keep this message secret!" encrypt << "GPars can build linear operator pipelines really easily"

println encrypted.val println encrypted.val

这可以节省您直接创建、连接和操作所有将形成管道的通道和操作符。 pipe 操作符允许您将一个函数/操作符/进程的输出连接到另一个函数/操作符/进程的输入。 就像在命令行上链接系统进程一样。

pipe 操作符是更通用的 chainWith() 方法的便捷简写

def toUpperCase = {s -> s.toUpperCase()}

final encrypt = new DataflowQueue() final DataflowReadChannel encrypted = encrypt.chainWith toUpperCase chainWith {it.reverse()} chainWith {'###encrypted###' + it + '###'}

encrypt << "I need to keep this message secret!" encrypt << "GPars can build linear operator pipelines really easily"

println encrypted.val println encrypted.val

将管道与直线操作符结合使用

由于每个操作符管道都具有入口通道和出口通道,因此可以将管道连接到更复杂的操作符网络中。 只有您的想象力才能限制您在同一个网络定义中混合管道、通道和操作符的能力。

def toUpperCase = {s -> s.toUpperCase()}
def save = {text ->
    //Just pretending to be saving the text to disk, database or whatever
    println 'Saving ' + text
}

final toEncrypt = new DataflowQueue() final DataflowReadChannel encrypted = toEncrypt.chainWith toUpperCase chainWith {it.reverse()} chainWith {'###encrypted###' + it + '###'}

final DataflowQueue fork1 = new DataflowQueue() final DataflowQueue fork2 = new DataflowQueue() splitter(encrypted, [fork1, fork2]) //Split the data flow

fork1.chainWith save //Hook in the save operation

//Hook in a sneaky decryption pipeline final DataflowReadChannel decrypted = fork2.chainWith {it[15..-4]} chainWith {it.reverse()} chainWith {it.toLowerCase()} .chainWith {'Groovy leaks! Check out a decrypted secret message: ' + it}

toEncrypt << "I need to keep this message secret!" toEncrypt << "GPars can build operator pipelines really easy"

println decrypted.val println decrypted.val

通道的类型在整个管道中保持不变。 例如,如果您从同步通道开始链接,则管道中的所有通道都将是同步的。 在这种情况下,显然,整个链都会阻塞,包括写入头部的通道的写入者,直到有人从管道尾部读取数据。
final SyncDataflowQueue queue = new SyncDataflowQueue()
final result = queue.chainWith {it * 2}.chainWith {it + 1} chainWith {it * 100}

Thread.start { 5.times { println result.val } }

queue << 1 queue << 2 queue << 3 queue << 4 queue << 5

加入管道

可以使用 into() 方法连接两个管道(或通道)

final encrypt = new DataflowQueue()
final DataflowWriteChannel messagesToSave = new DataflowQueue()
encrypt.chainWith toUpperCase chainWith {it.reverse()} into messagesToSave

task { encrypt << "I need to keep this message secret!" encrypt << "GPars can build operator pipelines really easy" }

task { 2.times { println "Saving " + messagesToSave.val } }

encryption 管道的输出直接连接到 saving 管道的输入(在本例中为单个通道)。

分叉数据流

当需要将管道/通道的输出复制到多个后续管道/通道时, split() 方法将有助于您

final encrypt = new DataflowQueue()
final DataflowWriteChannel messagesToSave = new DataflowQueue()
final DataflowWriteChannel messagesToLog = new DataflowQueue()

encrypt.chainWith toUpperCase chainWith {it.reverse()}.split(messagesToSave, messagesToLog)

插入管道

split() 类似, tap() 方法允许您将数据流分叉到多个通道。 但是,在某些情况下,插入会更方便一些,因为它将两个新分支中的一个视为管道的后继者。

queue.chainWith {it * 2}.tap(logChannel).chainWith{it + 1}.tap(logChannel).into(PrintChannel)

合并通道

合并允许您将多个读取通道作为单个数据流操作符的输入连接起来。 作为第二个参数传递的函数需要接受与正在合并的通道数量一样多的参数 - 每个参数都将保存相应通道的值。

maleChannel.merge(femaleChannel) {m, f -> m.marry(f)}.into(mortgageCandidatesChannel)

分离

分离合并 相反。 提供的闭包返回一个值列表,每个值都将输出到具有相应位置索引的输出通道中。

queue1.separate([queue2, queue3, queue4]) {a -> [a-1, a, a+1]}

选择

binaryChoice()choice() 方法允许您根据闭包的返回值,将值发送到两个(或多个)输出通道中的一个。

queue1.binaryChoice(queue2, queue3) {a -> a > 0}
queue1.choice([queue2, queue3, queue4]) {a -> a % 3}

过滤

filter() 方法允许使用布尔谓词过滤管道中的数据。

final DataflowQueue queue1 = new DataflowQueue()
        final DataflowQueue queue2 = new DataflowQueue()

final odd = {num -> num % 2 != 0 }

queue1.filter(odd) into queue2 (1..5).each {queue1 << it} assert 1 == queue2.val assert 3 == queue2.val assert 5 == queue2.val

空值

如果链接的函数返回一个 null 值,它通常会作为有效值沿着管道传递。 为了指示操作符不应将任何值进一步传递到管道中,必须返回一个 NullObject.nullObject 实例。

final DataflowQueue queue1 = new DataflowQueue()
        final DataflowQueue queue2 = new DataflowQueue()

final odd = {num -> if (num == 5) return null //null values are normally passed on if (num % 2 != 0) return num else return NullObject.nullObject //this value gets blocked }

queue1.chainWith odd into queue2 (1..5).each {queue1 << it} assert 1 == queue2.val assert 3 == queue2.val assert null == queue2.val

自定义线程池

所有 Pipeline DSL 方法都允许指定自定义线程池或 PGroups

channel | {it * 2}

channel.chainWith(closure) channel.chainWith(pool) {it * 2} channel.chainWith(group) {it * 2}

channel.into(otherChannel) channel.into(pool, otherChannel) channel.into(group, otherChannel)

channel.split(otherChannel1, otherChannel2) channel.split(otherChannels) channel.split(pool, otherChannel1, otherChannel2) channel.split(pool, otherChannels) channel.split(group, otherChannel1, otherChannel2) channel.split(group, otherChannels)

channel.tap(otherChannel) channel.tap(pool, otherChannel) channel.tap(group, otherChannel)

channel.merge(otherChannel) channel.merge(otherChannels) channel.merge(pool, otherChannel) channel.merge(pool, otherChannels) channel.merge(group, otherChannel) channel.merge(group, otherChannels)

channel.filter( otherChannel) channel.filter(pool, otherChannel) channel.filter(group, otherChannel)

channel.binaryChoice( trueBranch, falseBranch) channel.binaryChoice(pool, trueBranch, falseBranch) channel.binaryChoice(group, trueBranch, falseBranch)

channel.choice( branches) channel.choice(pool, branches) channel.choice(group, branches)

channel.separate( outputs) channel.separate(pool, outputs) channel.separate(group, outputs)

覆盖默认 PGroup

为了避免有必要为每个 Pipeline DSL 方法单独指定 PGroup,您可以覆盖默认 Dataflow PGroup 的值。

Dataflow.usingGroup(group) {
    channel.choice(branches)
}
//Is identical to
channel.choice(group, branches)

Dataflow.usingGroup() 方法将给定代码块的默认数据流 PGroup 的值重置为指定的值。

管道构建器

Pipeline 类提供了一个直观的操作符管道构建器。 与直接链接通道相比,使用 Pipeline 类的最大好处是可以轻松地将自定义线程池/组应用于构造的链中的所有操作符。 可用的方法和重载运算符与直接在通道上可用的方法和运算符相同。

import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.dataflow.operator.Pipeline
import groovyx.gpars.scheduler.DefaultPool
import groovyx.gpars.scheduler.Pool

final DataflowQueue queue = new DataflowQueue() final DataflowQueue result1 = new DataflowQueue() final DataflowQueue result2 = new DataflowQueue() final Pool pool = new DefaultPool(false, 2)

final negate = {-it}

final Pipeline pipeline = new Pipeline(pool, queue)

pipeline | {it * 2} | {it + 1} | negate pipeline.split(result1, result2)

queue << 1 queue << 2 queue << 3

assert -3 == result1.val assert -5 == result1.val assert -7 == result1.val

assert -3 == result2.val assert -5 == result2.val assert -7 == result2.val

pool.shutdown()

通过 Pipeline DSL 传递构造参数

您可能经常需要将额外的初始化参数传递给操作符,例如要附加的侦听器或maxForks的值。就像直接构建操作符一样,Pipeline DSL 方法接受一个可选的参数映射以传递。

new Pipeline(group, queue1).merge([maxForks: 4, listeners: [listener]], queue2) {a, b -> a + b}.into queue3

7.7 实现

GPars 中的数据流并发基于与 actor 支持相同的原则。所有数据流任务共享一个线程池,因此通过Dataflow.task()工厂方法创建的线程数量不需要与系统所需的物理线程数量相对应。PGroup.task()工厂方法可用于将创建的任务附加到组。由于每个组都定义了自己的线程池,因此您可以像处理 actor 一样轻松地将任务组织在不同的线程池周围。

组合 actor 和数据流并发

好消息是,您可以根据您的具体问题以任何您认为合适的方式组合 actor 和数据流并发。您可以自由地从 actor 中使用数据流变量。

final DataflowVariable a = new DataflowVariable()

final Actor doubler = Actors.actor { react {message-> a << 2 * message } }

final Actor fakingDoubler = actor { react { doubler.send it //send a number to the doubler println "Result ${a.val}" //wait for the result to be bound to 'a' } }

fakingDoubler << 10

在示例中,您可以看到“fakingDoubler”使用消息和DataflowVariabledoubler actor 通信。

使用普通 Java 线程

DataflowVariable以及DataflowQueue类显然可以从应用程序的任何线程中使用,而不仅仅是从Dataflow.task()创建的任务中使用。考虑以下示例

import groovyx.gpars.dataflow.DataflowVariable

final DataflowVariable a = new DataflowVariable<String>() final DataflowVariable b = new DataflowVariable<String>()

Thread.start { println "Received: $a.val" Thread.sleep 2000 b << 'Thank you' }

Thread.start { Thread.sleep 2000 a << 'An important message from the second thread' println "Reply: $b.val" }

我们正在创建两个普通的java.lang.Thread实例,它们使用两个数据流变量交换数据。显然,actor 生命周期的方法、发送/反应功能或线程池在这种情况下不起作用。

7.8 同步变量和通道

在使用异步数据流通道时,除了读取器必须等待值可用以供使用之外,通信方保持完全独立。写入者不等待他们的消息被使用。读取器在它们到来并询问时立即获取值。另一方面,同步通道可以将写入者与读取器以及多个读取器之间进行同步。当您需要提高确定性水平时,这特别有用。异步通信强加的写入者到读取者的部分排序通过使用同步通信来补充读取者到写入者的部分排序。换句话说,您可以保证读取器在从同步通道读取值之前所做的任何操作都在写入者在写入该值之后所做的任何操作之前。此外,使用同步通信的写入者永远不会领先于读取者太远,这简化了对系统的推理,并减少了管理数据生产速度以避免系统过载的需要。

同步数据流队列

SyncDataflowQueue类应该用于点对点(1:1 或 n:1)通信。写入队列的每条消息将被正好一个读取器使用。写入者被阻塞,直到他们的消息被使用,读取者被阻塞,直到有值可供他们读取。

import groovyx.gpars.dataflow.SyncDataflowQueue
import groovyx.gpars.group.NonDaemonPGroup

/** * Shows how synchronous dataflow queues can be used to throttle fast producer when serving data to a slow consumer. * Unlike when using asynchronous channels, synchronous channels block both the writer and the readers until all parties are ready to exchange messages. */

def group = new NonDaemonPGroup()

final SyncDataflowQueue channel = new SyncDataflowQueue()

def producer = group.task { (1..30).each { channel << it println "Just sent $it" } channel << -1 }

def consumer = group.task { while (true) { sleep 500 //simulating a slow consumer final Object msg = channel.val if (msg == -1) return println "Received $msg" } }

consumer.join()

group.shutdown()

同步数据流广播

SyncDataflowBroadcast类应该用于发布-订阅(1:n 或 n:m)通信。写入广播的每条消息将被所有订阅的读取器使用。写入者被阻塞,直到他们的消息被所有读取器使用,读取者被阻塞,直到有值可供他们读取,并且所有其他订阅的读取器也请求该消息。使用SyncDataflowBroadcast,您将获得所有读取器同时处理同一消息,并在获得下一个消息之前相互等待。

import groovyx.gpars.dataflow.SyncDataflowBroadcast
import groovyx.gpars.group.NonDaemonPGroup

/** * Shows how synchronous dataflow broadcasts can be used to throttle fast producer when serving data to slow consumers. * Unlike when using asynchronous channels, synchronous channels block both the writer and the readers until all parties are ready to exchange messages. */

def group = new NonDaemonPGroup()

final SyncDataflowBroadcast channel = new SyncDataflowBroadcast()

def subscription1 = channel.createReadChannel() def fastConsumer = group.task { while (true) { sleep 10 //simulating a fast consumer final Object msg = subscription1.val if (msg == -1) return println "Fast consumer received $msg" } }

def subscription2 = channel.createReadChannel() def slowConsumer = group.task { while (true) { sleep 500 //simulating a slow consumer final Object msg = subscription2.val if (msg == -1) return println "Slow consumer received $msg" } }

def producer = group.task { (1..30).each { println "Sending $it" channel << it println "Sent $it" } channel << -1 }

[fastConsumer, slowConsumer]*.join()

group.shutdown()

同步数据流变量

DataflowVariable不同,DataflowVariable是异步的,并且只阻塞读取器直到值绑定到变量,SyncDataflowVariable类提供了一种一次性数据交换机制,该机制会阻塞写入者和所有读取者,直到达到指定的等待方数量。

import groovyx.gpars.dataflow.SyncDataflowVariable
import groovyx.gpars.group.NonDaemonPGroup

final NonDaemonPGroup group = new NonDaemonPGroup()

final SyncDataflowVariable value = new SyncDataflowVariable(2) //two readers required to exchange the message

def writer = group.task { println "Writer about to write a value" value << 'Hello' println "Writer has written the value" }

def reader = group.task { println "Reader about to read a value" println "Reader has read the value: ${value.val}" }

def slowReader = group.task { sleep 5000 println "Slow reader about to read a value" println "Slow reader has read the value: ${value.val}" }

[reader, slowReader]*.join()

group.shutdown()

7.9 看板流

API:KanbanFlow | KanbanLink | KanbanTray | ProcessingNode

看板流

一个KanbanFlow是一个组合对象,它使用数据流抽象来定义多个并发生产者和消费者操作符之间的依赖关系。

生产者和消费者之间的每个链接由一个KanbanLink定义。

在每个 KanbanLink 内部,生产者和消费者之间的通信遵循KanbanFlow 模式(推荐阅读)中描述的模式。它们使用类型为KanbanTray的对象将产品发送到下游,并向生产者发出进一步产品的请求信号。

下图显示了一个KanbanLink,它有一个生产者、一个消费者和五个编号为 0 到 4 的托盘。托盘号 0 已用于将产品从生产者传递给消费者,已被消费者清空,现在被发送回生产者的输入队列。托盘 1 和 2 等待等待消费的产品,托盘 3 和 4 等待被生产者使用。

一个KanbanFlow对象将生产者链接到消费者,从而创建KanbanLink对象。在此活动过程中,可以构建第二个链接,其中生产者是以前创建的链接中充当消费者的同一个对象,这样这两个链接就会连接起来形成一条链。

这是一个只有一个链接的KanbanFlow的示例,例如一个生产者和一个消费者。生产者总是将数字 1 发送到下游,而消费者则打印这个数字。

import static groovyx.gpars.dataflow.ProcessingNode.node
import groovyx.gpars.dataflow.KanbanFlow

def producer = node { down -> down 1 } def consumer = node { up -> println up.take() }

new KanbanFlow().with { link producer to consumer start() // run for a while stop() }

为了将产品放入托盘并将托盘发送到下游,可以使用send()方法、<<运算符或将托盘用作方法对象。以下行是等效的

node { down -> down.send 1 }
node { down -> down << 1 }
node { down -> down 1 }

从输入托盘中使用take()方法获取产品时,空的托盘会自动释放。

您应该只调用一次take()

如果您希望不使用空托盘将产品发送到下游(通常是在ProcessingNode充当过滤器时的情况),则必须释放托盘以使其处于活动状态。否则,系统中的托盘数量会减少。您可以通过调用release()方法或使用~运算符(想想“把它抖掉”)来释放托盘。以下行是等效的

node { down -> down.release() }
node { down -> ~down }

如果您调用任何take()send()方法,托盘将自动释放。

各种链接结构

除了线性链之外,KanbanFlow还可以将单个生产者链接到多个消费者(树)或多个生产者链接到单个消费者(收集器)或上述任何组合,从而形成有向无环图 (DAG)。

KanbanFlowTest类有很多此类结构的示例,包括生产者将工作委托给多个消费者的场景

  • 一个工作窃取策略,其中所有消费者都从下游获取他们的选择,
  • 一个主从策略,其中生产者从可用的消费者中选择,以及
  • 一个广播策略,其中生产者将所有产品发送给所有消费者。

循环默认情况下是被禁止的,但如果启用,它们可以作为所谓的生成器使用。生产者甚至可以成为自己的消费者,在每个循环中增加产品价值。生成器本身保持无状态,因为价值仅作为托盘上的产品存储。这种生成器可用于例如延迟序列或作为后续流的“心跳”。

生成器“循环”的方法同样可以应用于收集器,其中收集器不维护任何内部状态,而是将集合发送给自己,并在每次调用时添加产品。

一般来说,一个ProcessingNode可以链接到自身,以便将状态导出到它发送到自身的托盘/产品。对产品的访问然后是由设计线程安全的

组合看板流

就像KanbanLink对象可以链接在一起形成KanbanFlow一样,流本身也可以再次组合,以从现有的较小流形成新的较大流。

def firstFlow = new KanbanFlow()
def producer  = node(counter)
def consumer  = node(repeater)
firstFlow.link(producer).to(consumer)

def secondFlow = new KanbanFlow() def producer2 = node(repeater) def consumer2 = node(reporter) secondFlow.link(producer2).to(consumer2)

flow = firstFlow + secondFlow

flow.start()

自定义并发特性

看板系统中的并发量由托盘数量(有时称为WIP = 在制品)决定。如果流中没有托盘,则系统不会执行任何操作。

  • 只有一个托盘时,系统仅限于顺序执行。
  • 有更多托盘时,并发开始。
  • 当托盘数量超过可用的处理单元数量时,系统开始浪费资源。

托盘数量可以通过多种方式控制。它们通常在启动流时设置。

flow.start(0) // start without trays
flow.start(1) // start with one tray per link in the flow
flow.start()  // start with the optimal number of trays

除了托盘之外,KanbanFlow也可能受其底层ThreadPool的限制。例如,大小为 1 的池不允许太多并发。

KanbanFlows使用一个默认池,该池的尺寸由可用的核心数量决定。这可以通过设置pooledGroup属性来定制。

测试
KanbanFlowTest
演示
DemoKanbanFlow
DemoKanbanFlowBroadcast
DemoKanbanFlowCycle
DemoKanbanLazyPrimeSequenceLoops

7.10 经典示例

使用数据流任务实现的埃拉托斯特尼筛法

import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.Dataflow.task

/** * Demonstrates concurrent implementation of the Sieve of Eratosthenes using dataflow tasks */

final int requestedPrimeNumberCount = 1000

final DataflowQueue initialChannel = new DataflowQueue()

/** * Generating candidate numbers */ task { (2..10000).each { initialChannel << it } }

/** * Chain a new filter for a particular prime number to the end of the Sieve * @param inChannel The current end channel to consume * @param prime The prime number to divide future prime candidates with * @return A new channel ending the whole chain */ def filter(inChannel, int prime) { def outChannel = new DataflowQueue()

task { while (true) { def number = inChannel.val if (number % prime != 0) { outChannel << number } } } return outChannel }

/** * Consume Sieve output and add additional filters for all found primes */ def currentOutput = initialChannel requestedPrimeNumberCount.times { int prime = currentOutput.val println "Found: $prime" currentOutput = filter(currentOutput, prime) }

使用数据流任务和操作符组合实现的埃拉托斯特尼筛法

import groovyx.gpars.dataflow.DataflowQueue
       import static groovyx.gpars.dataflow.Dataflow.operator
       import static groovyx.gpars.dataflow.Dataflow.task

/** * Demonstrates concurrent implementation of the Sieve of Eratosthenes using dataflow tasks and operators */

final int requestedPrimeNumberCount = 100

final DataflowQueue initialChannel = new DataflowQueue()

/** * Generating candidate numbers */ task { (2..1000).each { initialChannel << it } }

/** * Chain a new filter for a particular prime number to the end of the Sieve * @param inChannel The current end channel to consume * @param prime The prime number to divide future prime candidates with * @return A new channel ending the whole chain */ def filter(inChannel, int prime) { def outChannel = new DataflowQueue()

operator([inputs: [inChannel], outputs: [outChannel]]) { if (it % prime != 0) { bindOutput it } } return outChannel }

/** * Consume Sieve output and add additional filters for all found primes */ def currentOutput = initialChannel requestedPrimeNumberCount.times { int prime = currentOutput.val println "Found: $prime" currentOutput = filter(currentOutput, prime) }

8 STM

软件事务内存 (STM) 为开发人员提供了用于访问内存中数据的交易语义。当多个线程共享内存中的数据时,通过将代码块标记为事务 (atomic) 开发人员将数据一致性的责任委托给 Stm 引擎。GPars 利用 Multiverse Stm 引擎。在Multiverse 网站上查看有关事务引擎的更多详细信息

原子地运行一段代码

使用 Stm 时,开发人员将代码组织成事务。事务是一段代码,该代码以原子方式执行 - 要么运行所有代码,要么根本不运行。事务代码使用的数据保持一致,无论事务是否正常完成或突然中止。在事务内部运行时,代码被赋予一种隔离的幻觉,与其他并发运行的事务隔离,因此,直到事务提交之前,一个事务对数据的更改在其他事务中不可见。这为我们提供了数据库事务的ACID特性的ACI部分。数据库中非常典型的持久性事务方面,通常不是 Stm 的强制要求。

GPars 允许开发人员通过使用atomic闭包来指定事务边界。

import groovyx.gpars.stm.GParsStm
import org.multiverse.api.references.TxnInteger
import static org.multiverse.api.StmUtils.newTxnInteger

public class Account { private final TxnInteger amount = newTxnInteger(0);

public void transfer(final int a) { GParsStm.atomic { amount.increment(a); } }

public int getCurrentAmount() { GParsStm.atomicWithInt { amount.get(); } } }

有几种类型的atomic闭包,每种闭包用于不同类型的返回值

  • atomic - 返回 Object
  • atomicWithInt - 返回 int
  • atomicWithLong - 返回 long
  • atomicWithBoolean - 返回 boolean
  • atomicWithDouble - 返回 double
  • atomicWithVoid - 没有返回值

Multiverse 默认使用乐观锁策略,并自动回滚和重试发生冲突的事务。因此,开发人员应在事务代码中避免不可逆的操作(例如写入控制台、发送电子邮件、发射导弹等)。为了提高灵活性,可以通过自定义 原子块 来调整默认的 Multiverse 设置。

自定义事务属性

通常,您可能希望为某些事务属性指定不同的值(例如只读事务、锁策略、隔离级别等)。 createAtomicBlock 方法将创建一个使用提供的配置值的新 AtomicBlock

import groovyx.gpars.stm.GParsStm
import org.multiverse.api.AtomicBlock
import org.multiverse.api.PropagationLevel

final TxnExecutor block = GParsStm.createTxnExecutor(maxRetries: 3000, familyName: 'Custom', PropagationLevel: PropagationLevel.Requires, interruptible: false) assert GParsStm.atomicWithBoolean(block) { true }

然后,可以使用自定义的 AtomicBlock 来创建遵循指定设置的事务。 AtomicBlock 实例是线程安全的,可以在线程和事务之间自由重用。

使用 Transaction 对象

原子闭包将当前 Transaction 作为参数提供。然后,可以利用表示事务的 Txn 对象来手动控制事务。以下示例说明了这一点,其中我们使用 retry() 方法来阻塞当前事务,直到计数器达到所需的值。

import groovyx.gpars.stm.GParsStm
import org.multiverse.api.PropagationLevel
import org.multiverse.api.TxnExecutor

import static org.multiverse.api.StmUtils.newTxnInteger

final TxnExecutor block = GParsStm.createTxnExecutor(maxRetries: 3000, familyName: 'Custom', PropagationLevel: PropagationLevel.Requires, interruptible: false)

def counter = newTxnInteger(0) final int max = 100 Thread.start { while (counter.atomicGet() < max) { counter.atomicIncrementAndGet(1) sleep 10 } } assert max + 1 == GParsStm.atomicWithInt(block) { tx -> if (counter.get() == max) return counter.get() + 1 tx.retry() }

数据结构

您可能已经注意到,在上面的代码示例中,我们使用了专用数据结构来保存值。事实是,普通的 Java 类不支持事务,因此不能直接使用,因为 Multiverse 无法在并发事务之间安全地共享它们、提交它们或回滚它们。我们需要使用了解事务的数据

  • TxnIntRef
  • TxnLongRef
  • TxnBooleanRef
  • TxnDoubleRef
  • TxnRef

您通常可以通过 org.multiverse.api.StmUtils 类工厂方法来创建这些结构。

更多信息

我们决定不重复 Multiverse 网站上已经提供的信息。请访问 Multiverse 网站 并将其用作您与 GPars 的 Stm 进一步冒险的参考。

9 Google App Engine 集成

GPars 可以运行在 Google App Engine (GAE) 上。它可以成为 Groovy 和 Java GAE 应用程序的一部分,也可以被插入 Gaelyk。小型 GPars App Engine 集成库 提供了将 GAE 服务连接到 GPars 的所有必要基础设施。虽然您将运行在 GAE 线程上并利用 GAE 定时器服务,但高层抽象保持不变。在一些限制下,您仍然可以使用 GPars 演员、数据流、代理、并行集合和其他方便的概念。

有关在 GAE 上使用 GPars 的详细说明,请参阅 GPars App Engine 库 文档。

10 提示

GPars 通用提示

分组

像代理、演员或数据流任务和操作符这样的高层并发概念可以围绕共享线程池进行分组。 PGroup 类及其子类代表围绕线程池的方便的 GPars 包装器。使用组的工厂方法创建的对象将共享组的线程池。

def group1 = new DefaultPGroup()
def group2 = new NonDaemonPGroup()

group1.with { task {...} task {...} def op = operator(...) {...} def actor = actor{...} def anotherActor = group2.actor{...} //will belong to group2 def agent = safe(0) }

在自定义组的线程池时,请考虑使用现有的 GPars 实现 - DefaultPoolResizeablePool 类。或者,您也可以创建自己的 groovyx.gpars.scheduler.Pool 接口实现,以传递给 DefaultPGroupNonDaemonPGroup 构造函数。

Java API

GPars 的大多数功能都可以从 Java 和 Groovy 中使用。查看用户指南的 2.6 Java API - 从 Java 使用 GPars 部分,并尝试基于 maven 的独立 Java 演示应用程序 。将 GPars 带到任何地方!

10.1 性能

您在 Groovy 中的代码可以与用 Java、Scala 或任何其他编程语言编写的代码一样快。这并不奇怪,因为 GPars 从技术上来说是一个用 Java 制作的坚固美味的蛋糕,上面覆盖着一层 Groovy DSL 奶油。

然而,与 Java 不同的是,在 GPars 中,以及在其他 DSL 友好语言中,您很可能免费体验到一种有用的代码加速,这种加速来自应用程序设计得更好、更清晰。使用并发 DSL 进行编码将使您的代码库更小,代码使用并发原语作为语言结构。因此,构建健壮的并发应用程序、识别潜在的瓶颈或错误并消除它们变得更加容易。

虽然本用户指南全面介绍了如何使用 Groovy 和 GPars 来创建美观且健壮的并发代码,但让我们利用本章来重点介绍一些可以带来有趣性能提升的地方,这些地方可能需要进行一些代码调整或小的设计折衷。

并行集合

用于并行集合处理的方法,如 eachParallel()collectParallel() 等,在幕后使用 并行数组 ,这是一种高效的树状数据结构。每次调用任何并行集合方法时,都必须从原始集合构建此数据结构。因此,在链接并行方法调用时,您可能需要考虑使用 map/reduce API 或者直接使用 并行数组 API,以避免 并行数组 创建开销。

GParsPool.withPool {
    people.findAllParallel{it.isMale()}.collectParallel{it.name}.any{it == 'Joe'}
    people.parallel.filter{it.isMale()}.map{it.name}.filter{it == 'Joe'}.size() > 0
    people.parallelArray.withFilter({it.isMale()} as Predicate).withMapping({it.name} as Mapper).any{it == 'Joe'} != null
}

在许多情况下,将池大小从默认值更改为其他值可能会带来性能优势。特别是如果您的任务执行 IO 操作(如文件或数据库访问、网络等),增加池中的线程数量可能会提高性能。

GParsPool.withPool(50) {
    …
}

由于您提供给并行集合处理方法的闭包将被频繁地并行执行,因此您可以通过将其转换为 Java 来获得更小的性能提升。

Actor

GPars 演员很快。 DynamicDispatchActorsReactiveActorsDefaultActors 快大约两倍,因为它们不需要在后续消息到达之间维护隐式状态。实际上, DefaultActors 的性能与 Scala 中的演员相当,您几乎不会听到它们很慢。

如果您追求最高性能,一个良好的开端是在您的演员代码中识别以下模式

actor {
    loop {
        react {msg ->
            switch(msg) {
                case String:…
                case Integer:…
            }
        }
    }
}
并用 DynamicDispatchActor 替换它们
messageHandler {
    when{String msg -> ...}
    when{Integer msg -> ...}
}

调用 loopreact 方法的成本相当高。

DynamicDispatchActorReactiveActor 定义为类,而不是使用 messageHandlerreactor 工厂方法,也会让您获得一些速度提升

class MyHandler extends DynamicDispatchActor {
    public void handleMessage(String msg) {
        …
    }

public void handleMessage(Integer msg) { … } }

现在,将 MyHandler 类移入 Java 将从 GPars 中挤出最后一点性能。

池调整

GPars 允许您将演员围绕线程池进行分组,让您自由地以任何方式组织演员。始终值得尝试演员池的大小和类型。 FJPool 通常比 DefaultPool 提供更好的特性,但似乎对池中的线程数量更敏感。有时使用 ResizeablePoolResizeableFJPool 可以通过自动消除不需要的线程来提高性能。

def attackerGroup = new DefaultPGroup(new ResizeableFJPool(10))
def defenderGroup = new DefaultPGroup(new DefaultPool(5))

def attacker = attackerGroup.actor {...} def defender = defenderGroup.messageHandler {...} ...

Agent

GPars 代理 处理消息的速度甚至比演员更快。将代理明智地围绕线程池进行分组并调整池的大小和类型的建议同样适用于代理和演员。对于代理,您也可以从提交用 Java 编写的闭包作为消息中受益。

分享您的经验

我们听到的关于 GPars 在野外使用的消息越多,我们就越能为未来调整它。请告诉我们您如何使用 GPars 及其性能。向我们发送您的基准测试、性能比较或分析报告,以帮助我们为您调整 GPars。

10.2 集成到托管环境中

托管环境(例如 Google App Engine)对线程施加了额外的限制。为了让 GPars 更好地与这些环境集成,可以自定义默认的线程工厂和定时器工厂。 GPars_Config 类提供了静态初始化方法,允许第三方注册他们自己的 PoolFactoryTimerFactory 接口实现,这些实现将用于为演员、数据流和 PGroup 创建默认池和定时器。

public final class GParsConfig {
    private static volatile PoolFactory poolFactory;
    private static volatile TimerFactory timerFactory;

public static void setPoolFactory(final PoolFactory pool)

public static PoolFactory getPoolFactory()

public static Pool retrieveDefaultPool()

public static void setTimerFactory(final TimerFactory timerFactory)

public static TimerFactory getTimerFactory()

public static GeneralTimer retrieveDefaultTimer(final String name, final boolean daemon)

public static void shutdown() }

自定义工厂应在应用程序启动后立即注册,以便演员和数据流能够为其默认组使用它们。

关闭

GParsConfig.shutdown() 方法可以在托管环境中用于正确关闭所有异步运行的定时器并释放所有线程局部变量的内存。在调用此方法后,GPars 库将不再提供声明的服务。

兼容性

在托管环境中运行 GPars 时,可能会出现一些其他兼容性问题。最明显的一个可能是 GAE 中缺少 ForkJoinThreadPool(也称为 jsr-166y)支持。因此,Fork/Join 和 GParsPool 等功能可能无法在某些服务上使用。但是,即使使用托管的非 Java SE 线程池,GParsExecutorsPool、数据流、演员、代理和 Stm 也应该正常工作。

11 结论

这真是一个疯狂的旅程,不是吗?现在,在阅读完用户指南后,您肯定已经准备好构建快速、健壮且可靠的并发应用程序。您已经看到,您可以从许多概念中选择,每个概念都有其自己的适用领域。能够选择正确的概念来应用于给定问题,并将其与系统中的其他部分结合起来,是成为一名成功开发人员的关键。如果您认为您可以使用 GPars 来做到这一点,那么用户指南的任务就完成了。

现在,开始使用 GPars 并享受乐趣吧!