|
Akka/Scala example source code file (AggregatorSpec.scala)
The AggregatorSpec.scala Akka example source code
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.contrib.pattern
import akka.testkit.{ ImplicitSender, TestKit }
import org.scalatest.BeforeAndAfterAll
import org.scalatest.FunSuiteLike
import org.scalatest.Matchers
import scala.annotation.tailrec
//#demo-code
import scala.collection._
import scala.concurrent.duration._
import scala.math.BigDecimal.int2bigDecimal
import akka.actor._
/**
 * Sample and test code for the aggregator pattern.
 * This is based on Jamie Allen's tutorial at
 * http://jaxenter.com/tutorial-asynchronous-programming-with-akka-actors-46220.html
 *
 * `AccountType` is the closed set of account kinds whose balances can be requested.
 */
sealed trait AccountType
case object Checking extends AccountType
case object Savings extends AccountType
case object MoneyMarket extends AccountType
/**
 * Initial request handled by `AccountBalanceRetriever`: asks for the balances
 * of customer `id` for each of the given `accountTypes`.
 */
final case class GetCustomerAccountBalances(id: Long, accountTypes: Set[AccountType])
/** Request sent to each account proxy actor for the customer `id`. */
final case class GetAccountBalances(id: Long)
/**
 * Balances tagged with their account type.
 * NOTE(review): not referenced anywhere else in the visible part of this file.
 */
final case class AccountBalances(accountType: AccountType,
balance: Option[List[(Long, BigDecimal)]])
// The three reply messages below share one payload shape: an optional list of
// (Long, BigDecimal) pairs -- presumably (account id, balance); confirm with callers.
/** Reply produced by `CheckingAccountProxy`. */
final case class CheckingAccountBalances(balances: Option[List[(Long, BigDecimal)]])
/** Reply produced by `SavingsAccountProxy`. */
final case class SavingsAccountBalances(balances: Option[List[(Long, BigDecimal)]])
/** Reply produced by `MoneyMarketAccountProxy`. */
final case class MoneyMarketAccountBalances(balances: Option[List[(Long, BigDecimal)]])
/** Message the aggregating actors schedule to themselves to bound the wait for replies. */
case object TimedOut
/** Reply sent by `AccountBalanceRetriever` when its first message is not a `GetCustomerAccountBalances`. */
case object CantUnderstand
/**
 * Stub actor standing in for a savings-account service.
 * Every `GetAccountBalances` request is answered with the same two fixed balances.
 */
class SavingsAccountProxy extends Actor {

  // Canned payload; the id carried by the request is not consulted.
  private val cannedBalances: Option[List[(Long, BigDecimal)]] =
    Some(List((1L, BigDecimal(150000)), (2L, BigDecimal(29000))))

  def receive: Receive = {
    case _: GetAccountBalances ⇒
      sender() ! SavingsAccountBalances(cannedBalances)
  }
}
/**
 * Stub actor standing in for a checking-account service.
 * Every `GetAccountBalances` request is answered with a single fixed balance.
 */
class CheckingAccountProxy extends Actor {

  // Canned payload; the id carried by the request is not consulted.
  private val cannedBalances: Option[List[(Long, BigDecimal)]] =
    Some(List((3L, BigDecimal(15000))))

  def receive: Receive = {
    case _: GetAccountBalances ⇒
      sender() ! CheckingAccountBalances(cannedBalances)
  }
}
/**
 * Stub actor standing in for a money-market account service.
 * Every `GetAccountBalances` request is answered with "no balances" (`None`).
 */
class MoneyMarketAccountProxy extends Actor {

  def receive: Receive = {
    // The id carried by the request is not consulted.
    case _: GetAccountBalances ⇒
      sender() ! MoneyMarketAccountBalances(balances = None)
  }
}
/**
 * Aggregator actor serving a single `GetCustomerAccountBalances` request.
 *
 * For every requested account type it spawns the matching proxy actor, collects
 * the replies and sends them back to the requester as an immutable
 * `List[(AccountType, Option[List[(Long, BigDecimal)]])]`. The reply is sent as
 * soon as all proxies have answered, or after one second with whatever has
 * arrived by then. The actor stops itself after replying.
 *
 * Any other first message is answered with `CantUnderstand` and the actor stops.
 */
class AccountBalanceRetriever extends Actor with Aggregator {

  // Implicit ExecutionContext needed by the scheduler below.
  import context.dispatcher

  //#initial-expect
  expectOnce {
    case GetCustomerAccountBalances(id, types) ⇒
      new AccountAggregator(sender(), id, types)
    case _ ⇒
      sender() ! CantUnderstand
      context.stop(self)
  }
  //#initial-expect

  /**
   * Per-request state and handlers.
   *
   * @param originalSender actor that receives the aggregated result
   * @param id             customer id forwarded to every proxy
   * @param types          account types to query; an empty set yields an empty result
   */
  class AccountAggregator(originalSender: ActorRef,
                          id: Long, types: Set[AccountType]) {

    val results =
      mutable.ArrayBuffer.empty[(AccountType, Option[List[(Long, BigDecimal)]])]

    // Scheduled before any call to collectBalances() so that it can always be
    // cancelled once the reply has gone out.
    private val timeoutTask: Cancellable =
      context.system.scheduler.scheduleOnce(1.second, self, TimedOut)

    if (types.nonEmpty)
      types foreach {
        case Checking    ⇒ fetchCheckingAccountsBalance()
        case Savings     ⇒ fetchSavingsAccountsBalance()
        case MoneyMarket ⇒ fetchMoneyMarketAccountsBalance()
      }
    else collectBalances() // Empty type list yields empty response

    //#expect-timeout
    expect {
      case TimedOut ⇒ collectBalances(force = true)
    }
    //#expect-timeout

    //#expect-balance
    /** Queries a fresh `CheckingAccountProxy` and records its single reply. */
    def fetchCheckingAccountsBalance(): Unit = {
      context.actorOf(Props[CheckingAccountProxy]) ! GetAccountBalances(id)
      expectOnce {
        case CheckingAccountBalances(balances) ⇒
          results += (Checking -> balances)
          collectBalances()
      }
    }
    //#expect-balance

    /** Queries a fresh `SavingsAccountProxy` and records its single reply. */
    def fetchSavingsAccountsBalance(): Unit = {
      context.actorOf(Props[SavingsAccountProxy]) ! GetAccountBalances(id)
      expectOnce {
        case SavingsAccountBalances(balances) ⇒
          results += (Savings -> balances)
          collectBalances()
      }
    }

    /** Queries a fresh `MoneyMarketAccountProxy` and records its single reply. */
    def fetchMoneyMarketAccountsBalance(): Unit = {
      context.actorOf(Props[MoneyMarketAccountProxy]) ! GetAccountBalances(id)
      expectOnce {
        case MoneyMarketAccountBalances(balances) ⇒
          results += (MoneyMarket -> balances)
          collectBalances()
      }
    }

    /**
     * Replies to the requester and stops the actor once every requested type
     * has answered.
     *
     * @param force reply immediately with the partial results (used on timeout)
     */
    def collectBalances(force: Boolean = false): Unit = {
      if (results.size == types.size || force) {
        // No-op when we got here because the timeout already fired; otherwise
        // avoids a stray TimedOut being sent to the stopped actor.
        timeoutTask.cancel()
        originalSender ! results.toList // Make sure it becomes immutable
        context.stop(self)
      }
    }
  }
}
//#demo-code
//#chain-sample
/** First message expected by `ChainingSample`; `name` is forwarded in the `Request` it sends out. */
final case class InitialRequest(name: String)
/** Request that `ChainingSample` sends to the actor selection "/user/request_proxies". */
final case class Request(name: String)
/** Reply to a `Request`; `ChainingSample` collects only the `value` part. */
final case class Response(name: String, value: String)
/** Reply from the evaluator; `eval` is used as a list of indices into the collected values. */
final case class EvaluationResults(name: String, eval: List[Int])
/** Final answer sent back to the original requester of an `InitialRequest`. */
final case class FinalResponse(qualifiedValues: List[String])
/**
 * An actor sample demonstrating use of unexpect and chaining.
 * This is just an example and not a complete test case.
 *
 * Flow: an `InitialRequest` triggers a `Request` to "/user/request_proxies";
 * `Response` values are collected until more than three have arrived or
 * 50 milliseconds have passed. The collected values are then sent to
 * "/user/evaluator", and the values at the indices returned in
 * `EvaluationResults` are sent back to the requester as a `FinalResponse`.
 */
class ChainingSample extends Actor with Aggregator {

  expectOnce {
    case InitialRequest(name) ⇒ new MultipleResponseHandler(sender(), name)
  }

  /**
   * Per-request state and handlers.
   *
   * @param originalSender actor that receives the `FinalResponse`
   * @param propName       name forwarded in the outgoing `Request`
   */
  class MultipleResponseHandler(originalSender: ActorRef, propName: String) {

    import context.dispatcher
    import collection.mutable.ArrayBuffer

    val values = ArrayBuffer.empty[String]

    context.actorSelection("/user/request_proxies") ! Request(propName)
    // Kept so the timer can be cancelled when enough responses arrive early.
    private val timeoutTask: Cancellable =
      context.system.scheduler.scheduleOnce(50.milliseconds, self, TimedOut)

    //#unexpect-sample
    val handle = expect {
      case Response(name, value) ⇒
        values += value
        if (values.size > 3) processList()
      case TimedOut ⇒ processList()
    }

    /** Stops collecting responses and moves on to the evaluation step. */
    def processList(): Unit = {
      unexpect(handle)
      // No-op if the timeout is what brought us here.
      timeoutTask.cancel()

      if (values.nonEmpty) {
        context.actorSelection("/user/evaluator") ! values.toList
        expectOnce {
          case EvaluationResults(name, eval) ⇒ processFinal(eval)
        }
      } else processFinal(List.empty[Int])
    }
    //#unexpect-sample

    /**
     * Sends the final answer and stops the actor.
     *
     * @param eval indices into `values`; an index outside the buffer makes the
     *             lookup throw
     */
    def processFinal(eval: List[Int]): Unit = {
      // Select only the entries coming back from eval
      // (the buffer is applied as an index ⇒ element function).
      originalSender ! FinalResponse(eval map values)
      context.stop(self)
    }
  }
}
//#chain-sample
/**
 * Integration tests for `AccountBalanceRetriever`: sends a request and checks
 * that the aggregated reply is a `List` with one element per requested type.
 */
class AggregatorSpec extends TestKit(ActorSystem("test"))
  with ImplicitSender with FunSuiteLike with Matchers with BeforeAndAfterAll {

  // Shut the actor system down so its threads do not outlive the suite.
  override protected def afterAll(): Unit = TestKit.shutdownActorSystem(system)

  /**
   * Requests the balances for `types` and asserts on the size of the reply.
   *
   * @param types        account types to request
   * @param expectedSize number of entries the reply list must contain
   */
  private def expectBalances(types: Set[AccountType], expectedSize: Int): Unit = {
    system.actorOf(Props[AccountBalanceRetriever]) ! GetCustomerAccountBalances(1, types)
    receiveOne(10.seconds) match {
      // receiveOne yields null when nothing arrives in time; report that
      // explicitly instead of failing with a NullPointerException below.
      case null ⇒
        fail("Expect List, got no reply within 10 seconds")
      case result: List[_] ⇒
        result should have size expectedSize
      case result ⇒
        fail(s"Expect List, got ${result.getClass}")
    }
  }

  test("Test request 1 account type") {
    expectBalances(Set(Savings), expectedSize = 1)
  }

  test("Test request 3 account types") {
    expectBalances(Set(Checking, Savings, MoneyMarket), expectedSize = 3)
  }
}
/** Minimal payload stored in the `WorkList` under test; entries are told apart by `id`. */
final case class TestEntry(id: Int)
/**
 * Tests for `WorkList`.
 *
 * The tests share the `workList` / `workList2` instances and build on each
 * other's state, so they rely on running in declaration order.
 */
class WorkListSpec extends FunSuiteLike {

  val workList = WorkList.empty[TestEntry]
  // Created eagerly instead of starting out as null; the very same instances
  // are added to the list and later passed to `remove`.
  val entry2 = TestEntry(2)
  val entry4 = TestEntry(4)

  /** Runs `workList.process` with a function that accepts only the entry with the given id. */
  private def processEntryWithId(id: Int): Boolean =
    workList process {
      case TestEntry(`id`) ⇒ true
      case _               ⇒ false
    }

  test("Processing empty WorkList") {
    // Nothing has been added yet, so there is nothing to process
    assert(!processEntryWithId(9))
  }

  test("Insert temp entries") {
    assert(workList.head === workList.tail)

    val entry0 = TestEntry(0)
    workList.add(entry0, permanent = false)

    assert(workList.head.next != null)
    assert(workList.tail === workList.head.next)
    assert(workList.tail.ref.get === entry0)

    val entry1 = TestEntry(1)
    workList.add(entry1, permanent = false)

    assert(workList.head.next != workList.tail)
    assert(workList.head.next.ref.get === entry0)
    assert(workList.tail.ref.get === entry1)

    workList.add(entry2, permanent = false)

    assert(workList.tail.ref.get === entry2)

    val entry3 = TestEntry(3)
    workList.add(entry3, permanent = false)

    assert(workList.tail.ref.get === entry3)
  }

  test("Process temp entries") {
    // ProcessAndRemove something in the middle
    assert(processEntryWithId(2))

    // ProcessAndRemove the head
    assert(processEntryWithId(0))

    // ProcessAndRemove the tail
    assert(processEntryWithId(3))
  }

  test("Re-insert permanent entry") {
    workList.add(entry4, permanent = true)
    assert(workList.tail.ref.get === entry4)
  }

  test("Process permanent entry") {
    assert(processEntryWithId(4))
  }

  test("Remove permanent entry") {
    val removed = workList remove entry4
    assert(removed)
  }

  test("Remove temp entry already processed") {
    val removed = workList remove entry2
    assert(!removed)
  }

  test("Process non-matching entries") {
    // Entry 2 was processed earlier; entry 5 was never added
    assert(!processEntryWithId(2))
    assert(!processEntryWithId(5))
  }

  test("Append two lists") {
    workList.removeAll()
    0 to 4 foreach { id ⇒ workList.add(TestEntry(id), permanent = false) }

    val l2 = new WorkList[TestEntry]
    5 to 9 foreach { id ⇒ l2.add(TestEntry(id), permanent = true) }

    workList addAll l2

    // Walks the linked entries, checking ids are consecutive from `id`;
    // returns the id following the last entry.
    @tailrec
    def checkEntries(id: Int, entry: WorkList.Entry[TestEntry]): Int = {
      if (entry == null) id
      else {
        assert(entry.ref.get.id === id)
        checkEntries(id + 1, entry.next)
      }
    }

    assert(checkEntries(0, workList.head.next) === 10)
  }

  test("Clear list") {
    workList.removeAll()
    assert(workList.head.next === null)
    assert(workList.tail === workList.head)
  }

  val workList2 = WorkList.empty[PartialFunction[Any, Unit]]

  // Removes itself from workList2 while being processed (reentrant delete).
  val fn1: PartialFunction[Any, Unit] = {
    case _: String ⇒
      val result1 = workList2 remove fn1
      assert(result1 === true, "First remove must return true")
      val result2 = workList2 remove fn1
      assert(result2 === false, "Second remove must return false")
  }

  // Adds fn1 to workList2 while being processed (reentrant insert).
  val fn2: PartialFunction[Any, Unit] = {
    case _: String ⇒
      workList2.add(fn1, permanent = true)
  }

  /**
   * Processes workList2 by applying each function to "Foo"; a function counts
   * as having processed the message only if it is defined for it.
   */
  private def processWorkList2WithFoo(): Boolean =
    workList2 process { fn ⇒
      fn.andThen(_ ⇒ true).applyOrElse("Foo", (_: Any) ⇒ false)
    }

  test("Reentrant insert") {
    workList2.add(fn2, permanent = false)
    assert(workList2.head.next != null)
    assert(workList2.tail == workList2.head.next)

    // Processing the inserted fn2, which re-entrantly adds fn1
    processWorkList2WithFoo()
  }

  test("Reentrant delete") {
    // Processing fn1 (added by fn2 in the previous test), which should delete itself
    processWorkList2WithFoo()
  }
}
Other Akka source code examples

Here is a short list of links related to this Akka AggregatorSpec.scala source code file:
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.