(快速参考)

6 Agents - 参考文档

作者:GPars 全员

版本 1.2.1

6 Agents

Agent 类,这是一个线程安全、非阻塞的共享可变状态包装器实现,灵感来自 Clojure 中的 Agent。

在架构中消除对共享可变状态的需求时,许多并发问题都会消失。事实上,Actor、CSP 或数据流并发等概念完全避免或隔离了可变状态。然而,在某些情况下,共享可变数据是不可避免的,或者会使设计更自然、更易于理解。例如,在一个典型的电子商务应用程序中,购物车可能会在多个 AJAX 请求并发访问时出现读或写请求。

简介

在 Clojure 编程语言中,您可以找到一个 Agent 的概念,其目的是保护需要在多个线程之间共享的可变数据。Agent 会隐藏数据并防止直接访问。客户端只能向 Agent 发送命令(函数)。这些命令将被序列化并依次针对数据进行处理。由于命令是串行执行的,因此命令不需要关心并发,并且可以假定在运行时数据都是它们的。尽管实现方式不同,但 GPars Agents(称为 Agent)在本质上与 Actor 的行为类似。它们接受消息并异步处理它们。但是,这些消息必须是命令(函数或 Groovy 闭包),并在 Agent 内部执行。在接收到函数后,它会针对 Agent 的内部状态运行,函数的返回值被认为是 Agent 的新内部状态。

本质上,Agent 通过只允许一个 Agent 管理的线程修改可变值来保护这些值。可变值 无法从外部直接访问,而是需要 向 Agent 发送请求,Agent 保证代表调用者顺序处理这些请求。Agent 保证所有请求的顺序执行,从而保证值的始终性。

示意图

agent = new Agent(0)  //created a new Agent wrapping an integer with initial value 0
agent.send {increment()}  //asynchronous send operation, sending the increment() function
…
//after some delay to process the message the internal Agent's state has been updated
…
assert agent.val== 1
为了包装整数,我们当然可以在 Java 平台上使用 AtomicXXX 类型,但是当状态是更复杂的对象时,我们需要更多支持。

概念

GPars 提供了一个 Agent 类,它是一个特殊用途的线程安全、非阻塞实现,灵感来自 Clojure 中的 Agent。

Agent 包装了对可变状态的引用(保存在单个字段中),并将代码(闭包/命令)作为消息接受,这些消息可以像发送给任何其他 Actor 一样发送给 Agent,使用'<<'运算符、send() 方法或隐式的 call() 方法。在接收到闭包/命令后的一段时间内,闭包会针对内部可变字段调用,并可以对它进行更改。闭包保证在没有其他线程干预的情况下运行,因此可以自由地更改保存在内部<i>data</i>字段中的 Agent 的内部状态。

整个更新过程是“发出并忘记”类型的,因为一旦消息(闭包)被发送到 Agent,调用者线程就可以继续执行其他操作,并在稍后返回检查 Agent.val 或 Agent.valAsync(closure) 的当前值。

基本规则

  • 执行提交的命令时,它们会获得 Agent 的状态作为参数。
  • 提交的命令/闭包可以调用 Agent 状态上的任何方法。
  • 用新对象替换状态对象也是可能的,这可以通过 updateValue() 方法完成。
  • 提交的闭包的 返回值没有特殊含义,会被忽略。
  • 如果发送给 Agent 的消息 不是闭包,则它被认为是内部引用字段的 新值
  • 一个 Agentval 属性将等待 Agent 队列中的所有先前命令被消耗,然后安全地返回 Agent 的值。
  • valAsync() 方法将执行相同的操作 而不阻塞调用者。
  • instantVal 属性将返回 Agent 内部状态的即时快照。
  • 所有 Agent 实例共享一个默认的守护线程池。设置 Agent 实例的 threadPool 属性将允许它使用不同的线程池。
  • 命令抛出的异常可以使用 errors 属性收集。

示例

共享成员列表

Agent 包装了成员列表,这些成员已添加到容器中。要添加新的成员,必须向 jugMembers Agent 发送消息(添加成员的命令)。

import groovyx.gpars.agent.Agent
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

/** * Create a new Agent wrapping a list of strings */ def jugMembers = new Agent<List<String>>(['Me']) //add Me

jugMembers.send {it.add 'James'} //add James

final Thread t1 = Thread.start { jugMembers.send {it.add 'Joe'} //add Joe }

final Thread t2 = Thread.start { jugMembers << {it.add 'Dave'} //add Dave jugMembers {it.add 'Alice'} //add Alice (using the implicit call() method) }

[t1, t2]*.join() println jugMembers.val jugMembers.valAsync {println "Current members: $it"}

jugMembers.await()

共享会议注册人数计数

Conference 类允许注册和取消注册,但这些方法只能从发送到 conference Agent 的命令中调用。

import groovyx.gpars.agent.Agent

/** * Conference stores number of registrations and allows parties to register and unregister. * It inherits from the Agent class and adds the register() and unregister() private methods, * which callers may use it the commands they submit to the Conference. */ class Conference extends Agent<Long> { def Conference() { super(0) } private def register(long num) { data += num } private def unregister(long num) { data -= num } }

final Agent conference = new Conference() //new Conference created

/** * Three external parties will try to register/unregister concurrently */

final Thread t1 = Thread.start { conference << {register(10L)} //send a command to register 10 attendees }

final Thread t2 = Thread.start { conference << {register(5L)} //send a command to register 5 attendees }

final Thread t3 = Thread.start { conference << {unregister(3L)} //send a command to unregister 3 attendees }

[t1, t2, t3]*.join()

assert 12L == conference.val

工厂方法

Agent 实例也可以使用 Agent.agent() 工厂方法创建。

def jugMembers = Agent.agent ['Me']  //add Me

监听器和验证器

Agent 允许用户添加监听器和验证器。监听器会在内部状态发生更改时收到通知,而验证器有机会通过抛出异常来拒绝即将发生的更改。

final Agent counter = new Agent()

counter.addListener {oldValue, newValue -> println "Changing value from $oldValue to $newValue"} counter.addListener {agent, oldValue, newValue -> println "Agent $agent changing value from $oldValue to $newValue"}

counter.addValidator {oldValue, newValue -> if (oldValue > newValue) throw new IllegalArgumentException('Things can only go up in Groovy')} counter.addValidator {agent, oldValue, newValue -> if (oldValue == newValue) throw new IllegalArgumentException('Things never stay the same for $agent')}

counter 10 counter 11 counter {updateValue 12} counter 10 //Will be rejected counter {updateValue it - 1} //Will be rejected counter {updateValue it} //Will be rejected counter {updateValue 11} //Will be rejected counter 12 //Will be rejected counter 20 counter.await()

监听器和验证器本质上都是接受两个或三个参数的闭包。从验证器抛出的异常将在 Agent 内部记录,可以使用 hasErrors() 方法进行测试,也可以通过 errors 属性检索。

assert counter.hasErrors()
assert counter.errors.size() == 5

验证器注意事项

由于 Groovy 对数据类型和不可变性的限制并不严格,因此 Agent 用户应该注意潜在的障碍。如果提交的代码直接修改了状态,则在验证规则违规的情况下,验证器将无法撤销更改。有两种可能的解决方案

  1. 确保您绝不更改代表当前 Agent 状态的提供对象
  2. 在 Agent 上使用自定义复制策略,以允许 Agent 创建内部状态的副本

在这两种情况下,您都需要调用 updateValue() 以正确设置和验证新状态。

问题以及两种解决方案都显示在下面

//Create an agent storing names, rejecting 'Joe'
final Closure rejectJoeValidator = {oldValue, newValue -> if ('Joe' in newValue) throw new IllegalArgumentException('Joe is not allowed to enter our list.')}

Agent agent = new Agent([]) agent.addValidator rejectJoeValidator

agent {it << 'Dave'} //Accepted agent {it << 'Joe'} //Erroneously accepted, since by-passes the validation mechanism println agent.val

//Solution 1 - never alter the supplied state object agent = new Agent([]) agent.addValidator rejectJoeValidator

agent {updateValue(['Dave', * it])} //Accepted agent {updateValue(['Joe', * it])} //Rejected println agent.val

//Solution 2 - use custom copy strategy on the agent agent = new Agent([], {it.clone()}) agent.addValidator rejectJoeValidator

agent {updateValue it << 'Dave'} //Accepted agent {updateValue it << 'Joe'} //Rejected, since 'it' is now just a copy of the internal agent's state println agent.val

分组

默认情况下,所有 Agent 实例都属于同一个组,共享其守护线程池。

自定义组也可以创建 Agent 实例。这些实例将属于创建它们的组,并将共享线程池。要创建属于某个组的 Agent 实例,请在该组上调用 agent() 工厂方法。这样,您可以组织和调整 Agent 的性能。

final def group = new NonDaemonPGroup(5)  //create a group around a thread pool
def jugMembers = group.agent(['Me'])  //add Me

Agent 的默认线程池包含守护线程。确保您的自定义线程池也使用守护线程,这可以通过使用 DefaultPGroup 或在线程池构造函数中提供您自己的线程工厂来实现,或者如果您 的线程池使用非守护线程(例如使用 NonDaemonPGroup 组类时),请确保您明确地关闭该组或线程池,方法是调用其 shutdown() 方法,否则您的应用程序将不会退出。

直接替换池

或者,通过在 Agent 实例上调用 attachToThreadPool() 方法,可以为它指定一个自定义线程池。

def jugMembers = new Agent<List<String>>(['Me'])  //add Me

final ExecutorService pool = Executors.newFixedThreadPool(10) jugMembers.attachToThreadPool(new DefaultPool(pool))

请记住,就像 Actor 一样,单个 Agent 实例(也称为 Agent)一次只能使用一个线程。

购物车示例

import groovyx.gpars.agent.Agent

class ShoppingCart { private def cartState = new Agent([:]) //----------------- public methods below here ---------------------------------- public void addItem(String product, int quantity) { cartState << {it[product] = quantity} //the << operator sends //a message to the Agent } public void removeItem(String product) { cartState << {it.remove(product)} } public Object listContent() { return cartState.val } public void clearItems() { cartState << performClear }

public void increaseQuantity(String product, int quantityChange) { cartState << this.&changeQuantity.curry(product, quantityChange) } //----------------- private methods below here --------------------------------- private void changeQuantity(String product, int quantityChange, Map items) { items[product] = (items[product] ?: 0) + quantityChange } private Closure performClear = { it.clear() } } //----------------- script code below here ------------------------------------- final ShoppingCart cart = new ShoppingCart() cart.addItem 'Pilsner', 10 cart.addItem 'Budweisser', 5 cart.addItem 'Staropramen', 20

cart.removeItem 'Budweisser' cart.addItem 'Budweisser', 15

println "Contents ${cart.listContent()}"

cart.increaseQuantity 'Budweisser', 3 println "Contents ${cart.listContent()}"

cart.clearItems() println "Contents ${cart.listContent()}"

您可能已经注意到代码中的两种实现策略。
  1. 公共方法可以在内部将所需的代码发送到 Agent,而不是直接执行相同的功能

因此,像这样的顺序代码

public void addItem(String product, int quantity) {
    cartState[product]=quantity

}

变成了

public void addItem(String product, int quantity) {
    cartState << {it[product] = quantity}
}
2. 公共方法可以发送对内部私有方法或闭包的引用,这些方法或闭包包含要执行的所需功能
public void clearItems() {
    cartState << performClear
}

private Closure performClear = { it.clear() }

可能需要柯里化,如果闭包除了当前内部状态实例之外还需要其他参数。请参阅 increaseQuantity 方法。

打印机服务示例

另一个示例——一个由多个线程共享的非线程安全的打印机服务。打印机需要在打印之前设置文档和质量属性,因此如果没有适当的保护,可能会出现竞争条件。调用者不想阻塞到打印机可用为止,而 Actor 的“发出并忘记”特性可以非常优雅地解决这个问题。

import groovyx.gpars.agent.Agent

/** * A non-thread-safe service that slowly prints documents on at a time */ class PrinterService { String document String quality

public void printDocument() { println "Printing $document in $quality quality" Thread.sleep 5000 println "Done printing $document" } }

def printer = new Agent<PrinterService>(new PrinterService())

final Thread thread1 = Thread.start { for (num in (1..3)) { final String text = "document $num" printer << {printerService -> printerService.document = text printerService.quality = 'High' printerService.printDocument() } Thread.sleep 200 } println 'Thread 1 is ready to do something else. All print tasks have been submitted' }

final Thread thread2 = Thread.start { for (num in (1..4)) { final String text = "picture $num" printer << {printerService -> printerService.document = text printerService.quality = 'Medium' printerService.printDocument() } Thread.sleep 500 } println 'Thread 2 is ready to do something else. All print tasks have been submitted' }

[thread1, thread2]*.join() printer.await()

有关最新更新,请参阅相应的演示。

读取值

为了紧密遵循 Clojure 的哲学,Agent 类赋予读取比写入更高的优先级。通过使用 instantVal 属性,您的读取请求将绕过 Agent 的传入消息队列,并返回内部状态的当前快照。 val 属性将像非阻塞变体 valAsync(Clojure cl) 一样在消息队列中等待处理,后者将使用内部状态作为参数调用提供的闭包。

您必须牢记, instantVal 属性可能会返回结果,尽管正确,但看起来是随机的,因为 Agent 在执行 instantVal 时 的内部状态是非确定性的,并且取决于在线程调度器执行 instantVal 的主体之前已处理的消息。

await() 方法允许您等待处理提交给 Agent 的所有消息,从而阻塞调用线程。

状态复制策略

为了避免泄露内部状态,Agent 类允许指定复制策略作为第二个构造函数参数。指定复制策略后,内部状态将由复制策略闭包处理,复制策略值的输出值将返回给调用者,而不是实际的内部状态。这适用于 instantValval 以及 valAsync()

错误处理

从提交的命令内部抛出的异常将存储在 Agent 中,可以从 errors 属性获得。该属性在读取后会被清除。

def jugMembers = new Agent<List>()
    assert jugMembers.errors.empty

jugMembers.send {throw new IllegalStateException('test1')} jugMembers.send {throw new IllegalArgumentException('test2')} jugMembers.await()

List errors = jugMembers.errors assert 2 == errors.size() assert errors[0] instanceof IllegalStateException assert 'test1' == errors[0].message assert errors[1] instanceof IllegalArgumentException assert 'test2' == errors[1].message

assert jugMembers.errors.empty

公平与非公平 Agent

Agent 可以是公平的也可以是非公平的。公平 Agent 在处理完每条消息后就会放弃线程,非公平 Agent 会一直保持线程,直到其消息队列为空。因此,非公平 Agent 的性能往往优于公平 Agent。所有 Agent 实例的默认设置是 非公平的,但可以通过调用其 makeFair() 方法将其变为公平的。

def jugMembers = new Agent<List>(['Me'])  //add Me
    jugMembers.makeFair()