6 Agents - 参考文档
作者:GPars 全员
版本 1.2.1
6 Agents
Agent 类,这是一个线程安全、非阻塞的共享可变状态包装器实现,灵感来自 Clojure 中的 Agent。在架构中消除对共享可变状态的需求时,许多并发问题都会消失。事实上,Actor、CSP 或数据流并发等概念完全避免或隔离了可变状态。然而,在某些情况下,共享可变数据是不可避免的,或者会使设计更自然、更易于理解。例如,在一个典型的电子商务应用程序中,购物车可能会在多个 AJAX 请求并发访问时出现读或写请求。
简介
在 Clojure 编程语言中,您可以找到一个 Agent 的概念,其目的是保护需要在多个线程之间共享的可变数据。Agent 会隐藏数据并防止直接访问。客户端只能向 Agent 发送命令(函数)。这些命令将被序列化并依次针对数据进行处理。由于命令是串行执行的,因此命令不需要关心并发,并且可以假定在运行时数据都是它们的。尽管实现方式不同,但 GPars Agents(称为 Agent)在本质上与 Actor 的行为类似。它们接受消息并异步处理它们。但是,这些消息必须是命令(函数或 Groovy 闭包),并在 Agent 内部执行。在接收到函数后,它会针对 Agent 的内部状态运行,函数的返回值被认为是 Agent 的新内部状态。
本质上,Agent 通过只允许一个 Agent 管理的线程修改可变值来保护这些值。可变值 无法从外部直接访问,而是需要 向 Agent 发送请求,Agent 保证代表调用者顺序处理这些请求。Agent 保证所有请求的顺序执行,从而保证值的始终性。
示意图
agent = new Agent(0) //created a new Agent wrapping an integer with initial value 0 agent.send {increment()} //asynchronous send operation, sending the increment() function … //after some delay to process the message the internal Agent's state has been updated … assert agent.val== 1
概念
GPars 提供了一个 Agent 类,它是一个特殊用途的线程安全、非阻塞实现,灵感来自 Clojure 中的 Agent。
Agent 包装了对可变状态的引用(保存在单个字段中),并将代码(闭包/命令)作为消息接受,这些消息可以像发送给任何其他 Actor 一样发送给 Agent,使用'<<'运算符、send() 方法或隐式的 call() 方法。在接收到闭包/命令后的一段时间内,闭包会针对内部可变字段调用,并可以对它进行更改。闭包保证在没有其他线程干预的情况下运行,因此可以自由地更改保存在内部<i>data</i>字段中的 Agent 的内部状态。
整个更新过程是“发出并忘记”类型的,因为一旦消息(闭包)被发送到 Agent,调用者线程就可以继续执行其他操作,并在稍后返回检查 Agent.val 或 Agent.valAsync(closure) 的当前值。
基本规则
- 执行提交的命令时,它们会获得 Agent 的状态作为参数。
- 提交的命令/闭包可以调用 Agent 状态上的任何方法。
- 用新对象替换状态对象也是可能的,这可以通过 updateValue() 方法完成。
- 提交的闭包的 返回值没有特殊含义,会被忽略。
- 如果发送给 Agent 的消息 不是闭包,则它被认为是内部引用字段的 新值。
- 一个 Agent 的 val 属性将等待 Agent 队列中的所有先前命令被消耗,然后安全地返回 Agent 的值。
- valAsync() 方法将执行相同的操作 而不阻塞调用者。
- instantVal 属性将返回 Agent 内部状态的即时快照。
- 所有 Agent 实例共享一个默认的守护线程池。设置 Agent 实例的 threadPool 属性将允许它使用不同的线程池。
- 命令抛出的异常可以使用 errors 属性收集。
示例
共享成员列表
Agent 包装了成员列表,这些成员已添加到容器中。要添加新的成员,必须向 jugMembers Agent 发送消息(添加成员的命令)。
import groovyx.gpars.agent.Agent import java.util.concurrent.ExecutorService import java.util.concurrent.Executors/** * Create a new Agent wrapping a list of strings */ def jugMembers = new Agent<List<String>>(['Me']) //add Me
jugMembers.send {it.add 'James'} //add James
final Thread t1 = Thread.start { jugMembers.send {it.add 'Joe'} //add Joe }
final Thread t2 = Thread.start { jugMembers << {it.add 'Dave'} //add Dave jugMembers {it.add 'Alice'} //add Alice (using the implicit call() method) }
[t1, t2]*.join() println jugMembers.val jugMembers.valAsync {println "Current members: $it"}
jugMembers.await()
共享会议注册人数计数
Conference 类允许注册和取消注册,但这些方法只能从发送到 conference Agent 的命令中调用。
import groovyx.gpars.agent.Agent/** * Conference stores number of registrations and allows parties to register and unregister. * It inherits from the Agent class and adds the register() and unregister() private methods, * which callers may use it the commands they submit to the Conference. */ class Conference extends Agent<Long> { def Conference() { super(0) } private def register(long num) { data += num } private def unregister(long num) { data -= num } }
final Agent conference = new Conference() //new Conference created
/** * Three external parties will try to register/unregister concurrently */
final Thread t1 = Thread.start { conference << {register(10L)} //send a command to register 10 attendees }
final Thread t2 = Thread.start { conference << {register(5L)} //send a command to register 5 attendees }
final Thread t3 = Thread.start { conference << {unregister(3L)} //send a command to unregister 3 attendees }
[t1, t2, t3]*.join()
assert 12L == conference.val
工厂方法
Agent 实例也可以使用 Agent.agent() 工厂方法创建。
def jugMembers = Agent.agent ['Me'] //add Me
监听器和验证器
Agent 允许用户添加监听器和验证器。监听器会在内部状态发生更改时收到通知,而验证器有机会通过抛出异常来拒绝即将发生的更改。
final Agent counter = new Agent()counter.addListener {oldValue, newValue -> println "Changing value from $oldValue to $newValue"} counter.addListener {agent, oldValue, newValue -> println "Agent $agent changing value from $oldValue to $newValue"}
counter.addValidator {oldValue, newValue -> if (oldValue > newValue) throw new IllegalArgumentException('Things can only go up in Groovy')} counter.addValidator {agent, oldValue, newValue -> if (oldValue == newValue) throw new IllegalArgumentException('Things never stay the same for $agent')}
counter 10 counter 11 counter {updateValue 12} counter 10 //Will be rejected counter {updateValue it - 1} //Will be rejected counter {updateValue it} //Will be rejected counter {updateValue 11} //Will be rejected counter 12 //Will be rejected counter 20 counter.await()
监听器和验证器本质上都是接受两个或三个参数的闭包。从验证器抛出的异常将在 Agent 内部记录,可以使用 hasErrors() 方法进行测试,也可以通过 errors 属性检索。
assert counter.hasErrors() assert counter.errors.size() == 5
验证器注意事项
由于 Groovy 对数据类型和不可变性的限制并不严格,因此 Agent 用户应该注意潜在的障碍。如果提交的代码直接修改了状态,则在验证规则违规的情况下,验证器将无法撤销更改。有两种可能的解决方案
- 确保您绝不更改代表当前 Agent 状态的提供对象
- 在 Agent 上使用自定义复制策略,以允许 Agent 创建内部状态的副本
在这两种情况下,您都需要调用 updateValue() 以正确设置和验证新状态。
问题以及两种解决方案都显示在下面
//Create an agent storing names, rejecting 'Joe' final Closure rejectJoeValidator = {oldValue, newValue -> if ('Joe' in newValue) throw new IllegalArgumentException('Joe is not allowed to enter our list.')}Agent agent = new Agent([]) agent.addValidator rejectJoeValidator
agent {it << 'Dave'} //Accepted agent {it << 'Joe'} //Erroneously accepted, since by-passes the validation mechanism println agent.val
//Solution 1 - never alter the supplied state object agent = new Agent([]) agent.addValidator rejectJoeValidator
agent {updateValue(['Dave', * it])} //Accepted agent {updateValue(['Joe', * it])} //Rejected println agent.val
//Solution 2 - use custom copy strategy on the agent agent = new Agent([], {it.clone()}) agent.addValidator rejectJoeValidator
agent {updateValue it << 'Dave'} //Accepted agent {updateValue it << 'Joe'} //Rejected, since 'it' is now just a copy of the internal agent's state println agent.val
分组
默认情况下,所有 Agent 实例都属于同一个组,共享其守护线程池。
自定义组也可以创建 Agent 实例。这些实例将属于创建它们的组,并将共享线程池。要创建属于某个组的 Agent 实例,请在该组上调用 agent() 工厂方法。这样,您可以组织和调整 Agent 的性能。
final def group = new NonDaemonPGroup(5) //create a group around a thread pool def jugMembers = group.agent(['Me']) //add Me
Agent 的默认线程池包含守护线程。确保您的自定义线程池也使用守护线程,这可以通过使用 DefaultPGroup 或在线程池构造函数中提供您自己的线程工厂来实现,或者如果您 的线程池使用非守护线程(例如使用 NonDaemonPGroup 组类时),请确保您明确地关闭该组或线程池,方法是调用其 shutdown() 方法,否则您的应用程序将不会退出。
直接替换池
或者,通过在 Agent 实例上调用 attachToThreadPool() 方法,可以为它指定一个自定义线程池。
def jugMembers = new Agent<List<String>>(['Me']) //add Mefinal ExecutorService pool = Executors.newFixedThreadPool(10) jugMembers.attachToThreadPool(new DefaultPool(pool))
请记住,就像 Actor 一样,单个 Agent 实例(也称为 Agent)一次只能使用一个线程。
购物车示例
import groovyx.gpars.agent.Agentclass ShoppingCart { private def cartState = new Agent([:]) //----------------- public methods below here ---------------------------------- public void addItem(String product, int quantity) { cartState << {it[product] = quantity} //the << operator sends //a message to the Agent } public void removeItem(String product) { cartState << {it.remove(product)} } public Object listContent() { return cartState.val } public void clearItems() { cartState << performClear }
public void increaseQuantity(String product, int quantityChange) { cartState << this.&changeQuantity.curry(product, quantityChange) } //----------------- private methods below here --------------------------------- private void changeQuantity(String product, int quantityChange, Map items) { items[product] = (items[product] ?: 0) + quantityChange } private Closure performClear = { it.clear() } } //----------------- script code below here ------------------------------------- final ShoppingCart cart = new ShoppingCart() cart.addItem 'Pilsner', 10 cart.addItem 'Budweisser', 5 cart.addItem 'Staropramen', 20
cart.removeItem 'Budweisser' cart.addItem 'Budweisser', 15
println "Contents ${cart.listContent()}"
cart.increaseQuantity 'Budweisser', 3 println "Contents ${cart.listContent()}"
cart.clearItems() println "Contents ${cart.listContent()}"
- 公共方法可以在内部将所需的代码发送到 Agent,而不是直接执行相同的功能
因此,像这样的顺序代码
public void addItem(String product, int quantity) { cartState[product]=quantity}
public void addItem(String product, int quantity) { cartState << {it[product] = quantity} }
public void clearItems() { cartState << performClear }private Closure performClear = { it.clear() }
打印机服务示例
另一个示例——一个由多个线程共享的非线程安全的打印机服务。打印机需要在打印之前设置文档和质量属性,因此如果没有适当的保护,可能会出现竞争条件。调用者不想阻塞到打印机可用为止,而 Actor 的“发出并忘记”特性可以非常优雅地解决这个问题。
import groovyx.gpars.agent.Agent/** * A non-thread-safe service that slowly prints documents on at a time */ class PrinterService { String document String quality
public void printDocument() { println "Printing $document in $quality quality" Thread.sleep 5000 println "Done printing $document" } }
def printer = new Agent<PrinterService>(new PrinterService())
final Thread thread1 = Thread.start { for (num in (1..3)) { final String text = "document $num" printer << {printerService -> printerService.document = text printerService.quality = 'High' printerService.printDocument() } Thread.sleep 200 } println 'Thread 1 is ready to do something else. All print tasks have been submitted' }
final Thread thread2 = Thread.start { for (num in (1..4)) { final String text = "picture $num" printer << {printerService -> printerService.document = text printerService.quality = 'Medium' printerService.printDocument() } Thread.sleep 500 } println 'Thread 2 is ready to do something else. All print tasks have been submitted' }
[thread1, thread2]*.join() printer.await()
读取值
为了紧密遵循 Clojure 的哲学,Agent 类赋予读取比写入更高的优先级。通过使用 instantVal 属性,您的读取请求将绕过 Agent 的传入消息队列,并返回内部状态的当前快照。 val 属性将像非阻塞变体 valAsync(Clojure cl) 一样在消息队列中等待处理,后者将使用内部状态作为参数调用提供的闭包。
您必须牢记, instantVal 属性可能会返回结果,尽管正确,但看起来是随机的,因为 Agent 在执行 instantVal 时 的内部状态是非确定性的,并且取决于在线程调度器执行 instantVal 的主体之前已处理的消息。
await() 方法允许您等待处理提交给 Agent 的所有消息,从而阻塞调用线程。
状态复制策略
为了避免泄露内部状态,Agent 类允许指定复制策略作为第二个构造函数参数。指定复制策略后,内部状态将由复制策略闭包处理,复制策略值的输出值将返回给调用者,而不是实际的内部状态。这适用于 instantVal、val 以及 valAsync() 。
错误处理
从提交的命令内部抛出的异常将存储在 Agent 中,可以从 errors 属性获得。该属性在读取后会被清除。
def jugMembers = new Agent<List>() assert jugMembers.errors.emptyjugMembers.send {throw new IllegalStateException('test1')} jugMembers.send {throw new IllegalArgumentException('test2')} jugMembers.await()
List errors = jugMembers.errors assert 2 == errors.size() assert errors[0] instanceof IllegalStateException assert 'test1' == errors[0].message assert errors[1] instanceof IllegalArgumentException assert 'test2' == errors[1].message
assert jugMembers.errors.empty
公平与非公平 Agent
Agent 可以是公平的也可以是非公平的。公平 Agent 在处理完每条消息后就会放弃线程,非公平 Agent 会一直保持线程,直到其消息队列为空。因此,非公平 Agent 的性能往往优于公平 Agent。所有 Agent 实例的默认设置是 非公平的,但可以通过调用其 makeFair() 方法将其变为公平的。
def jugMembers = new Agent<List>(['Me']) //add Me
jugMembers.makeFair()