|
Akka/Scala example source code file (ActorDSLSpec.scala)
The ActorDSLSpec.scala Akka example source code
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor
import language.postfixOps
import akka.testkit.{ AkkaSpec, EventFilter }
import akka.actor.ActorDSL._
import akka.event.Logging.Warning
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
import java.util.concurrent.TimeoutException
class ActorDSLDummy {
//#import
import akka.actor.ActorDSL._
import akka.actor.ActorSystem
implicit val system = ActorSystem("demo")
//#import
}
class ActorDSLSpec extends AkkaSpec {
val echo = system.actorOf(Props(new Actor {
def receive = {
case x ⇒ sender() ! x
}
}))
"An Inbox" must {
"function as implicit sender" in {
//#inbox
implicit val i = inbox()
echo ! "hello"
i.receive() should be("hello")
//#inbox
}
"support watch" in {
//#watch
val target = // some actor
//#watch
actor(new Act {})
//#watch
val i = inbox()
i watch target
//#watch
target ! PoisonPill
i receive 1.second should be(Terminated(target)(true, false))
}
"support queueing multiple queries" in {
val i = inbox()
import system.dispatcher
val res = Future.sequence(Seq(
Future { i.receive() } recover { case x ⇒ x },
Future { Thread.sleep(100); i.select() { case "world" ⇒ 1 } } recover { case x ⇒ x },
Future { Thread.sleep(200); i.select() { case "hello" ⇒ 2 } } recover { case x ⇒ x }))
Thread.sleep(1000)
res.isCompleted should be(false)
i.receiver ! 42
i.receiver ! "hello"
i.receiver ! "world"
Await.result(res, 5 second) should be(Seq(42, 1, 2))
}
"support selective receives" in {
val i = inbox()
i.receiver ! "hello"
i.receiver ! "world"
val result = i.select() {
case "world" ⇒ true
}
result should be(true)
i.receive() should be("hello")
}
"have a maximum queue size" in {
val i = inbox()
system.eventStream.subscribe(testActor, classOf[Warning])
try {
for (_ ← 1 to 1000) i.receiver ! 0
expectNoMsg(1 second)
EventFilter.warning(start = "dropping message", occurrences = 1) intercept {
i.receiver ! 42
}
expectMsgType[Warning]
i.receiver ! 42
expectNoMsg(1 second)
val gotit = for (_ ← 1 to 1000) yield i.receive()
gotit should be((1 to 1000) map (_ ⇒ 0))
intercept[TimeoutException] {
i.receive(1 second)
}
} finally {
system.eventStream.unsubscribe(testActor, classOf[Warning])
}
}
"have a default and custom timeouts" in {
val i = inbox()
within(5 seconds, 6 seconds) {
intercept[TimeoutException](i.receive())
}
within(1 second) {
intercept[TimeoutException](i.receive(100 millis))
}
}
}
"A lightweight creator" must {
"support creating regular actors" in {
//#simple-actor
val a = actor(new Act {
become {
case "hello" ⇒ sender() ! "hi"
}
})
//#simple-actor
implicit val i = inbox()
a ! "hello"
i.receive() should be("hi")
}
"support becomeStacked" in {
//#becomeStacked
val a = actor(new Act {
become { // this will replace the initial (empty) behavior
case "info" ⇒ sender() ! "A"
case "switch" ⇒
becomeStacked { // this will stack upon the "A" behavior
case "info" ⇒ sender() ! "B"
case "switch" ⇒ unbecome() // return to the "A" behavior
}
case "lobotomize" ⇒ unbecome() // OH NOES: Actor.emptyBehavior
}
})
//#becomeStacked
implicit val sender = testActor
a ! "info"
expectMsg("A")
a ! "switch"
a ! "info"
expectMsg("B")
a ! "switch"
a ! "info"
expectMsg("A")
}
"support setup/teardown" in {
//#simple-start-stop
val a = actor(new Act {
whenStarting { testActor ! "started" }
whenStopping { testActor ! "stopped" }
})
//#simple-start-stop
system stop a
expectMsg("started")
expectMsg("stopped")
}
"support restart" in {
//#failing-actor
val a = actor(new Act {
become {
case "die" ⇒ throw new Exception
}
whenFailing { case m @ (cause, msg) ⇒ testActor ! m }
whenRestarted { cause ⇒ testActor ! cause }
})
//#failing-actor
EventFilter[Exception](occurrences = 1) intercept {
a ! "die"
}
expectMsgPF() { case (x: Exception, Some("die")) ⇒ }
expectMsgPF() { case _: Exception ⇒ }
}
"support superviseWith" in {
val a = actor(new Act {
val system = null // shadow the implicit system
//#supervise-with
superviseWith(OneForOneStrategy() {
case e: Exception if e.getMessage == "hello" ⇒ Stop
case _: Exception ⇒ Resume
})
//#supervise-with
val child = actor("child")(new Act {
whenFailing { (_, _) ⇒ }
become {
case ref: ActorRef ⇒ whenStopping(ref ! "stopped")
case ex: Exception ⇒ throw ex
}
})
become {
case x ⇒ child ! x
}
})
a ! testActor
EventFilter.warning("hi", occurrences = 1) intercept {
a ! new Exception("hi")
}
expectNoMsg(1 second)
EventFilter[Exception]("hello", occurrences = 1) intercept {
a ! new Exception("hello")
}
expectMsg("stopped")
}
"supported nested declaration" in {
val system = this.system
//#nested-actor
// here we pass in the ActorRefFactory explicitly as an example
val a = actor(system, "fred")(new Act {
val b = actor("barney")(new Act {
whenStarting { context.parent ! ("hello from " + self.path) }
})
become {
case x ⇒ testActor ! x
}
})
//#nested-actor
expectMsg("hello from akka://ActorDSLSpec/user/fred/barney")
lastSender should be(a)
}
"support Stash" in {
//#act-with-stash
val a = actor(new ActWithStash {
become {
case 1 ⇒ stash()
case 2 ⇒
testActor ! 2; unstashAll(); becomeStacked {
case 1 ⇒ testActor ! 1; unbecome()
}
}
})
//#act-with-stash
a ! 1
a ! 2
expectMsg(2)
expectMsg(1)
a ! 1
a ! 2
expectMsg(2)
expectMsg(1)
}
}
}
Other Akka source code examplesHere is a short list of links related to this Akka ActorDSLSpec.scala source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.