|
Akka/Scala example source code file (SupervisorMiscSpec.scala)
The SupervisorMiscSpec.scala Akka example source code
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor
import language.postfixOps
import akka.testkit.{ filterEvents, EventFilter }
import scala.concurrent.Await
import akka.dispatch.{ PinnedDispatcher, Dispatchers }
import java.util.concurrent.{ TimeUnit, CountDownLatch }
import akka.testkit.AkkaSpec
import akka.testkit.DefaultTimeout
import akka.pattern.ask
import scala.concurrent.duration._
import scala.util.control.NonFatal
/**
 * Companion holding the configuration text that the `SupervisorMiscSpec` class
 * passes to its `AkkaSpec` constructor.
 */
object SupervisorMiscSpec {

  /**
   * Configuration text: switches `akka.actor.serialize-messages` off and declares the two
   * dispatcher ids the spec refers to by name, `pinned-dispatcher` and `test-dispatcher`.
   *
   * `test-dispatcher` is declared with an empty body; presumably it then picks up the
   * default dispatcher settings (confirm against the Akka reference configuration).
   *
   * The margin markers are removed by `stripMargin`, so the resulting string has no
   * leading indentation on any line.
   */
  val config: String =
    """
      |akka.actor.serialize-messages = off
      |pinned-dispatcher {
      |executor = thread-pool-executor
      |type = PinnedDispatcher
      |}
      |test-dispatcher {
      |}
      |""".stripMargin
}
/**
 * Miscellaneous supervision scenarios: restarting killed workers that run on different
 * dispatchers, creating and re-creating named children, and what a supervisor strategy
 * can observe and do while it handles a child.
 *
 * The actor system is configured from `SupervisorMiscSpec.config`; ask timeouts and
 * `Await` durations come from the `timeout` provided by `DefaultTimeout`.
 */
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class SupervisorMiscSpec extends AkkaSpec(SupervisorMiscSpec.config) with DefaultTimeout {

  "A Supervisor" must {

    // Four workers (three on "pinned-dispatcher", one on "test-dispatcher") are killed;
    // each must go through postRestart and afterwards still answer "status".
    "restart a crashing actor and its dispatcher for any dispatcher" in {
      filterEvents(EventFilter[Exception]("Kill")) {
        // One count per worker: the latch opens once all four have been restarted.
        val countDownLatch = new CountDownLatch(4)

        val supervisor = system.actorOf(Props(new Supervisor(
          OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 5.seconds)(List(classOf[Exception])))))

        val workerProps = Props(new Actor {
          override def postRestart(cause: Throwable): Unit = countDownLatch.countDown()
          def receive = {
            case "status" ⇒ this.sender() ! "OK"
            case _        ⇒ this.context.stop(self)
          }
        })

        // Asks the supervisor for a new worker bound to the given dispatcher id and waits
        // for the reply. NOTE(review): relies on the Supervisor helper (defined outside
        // this file) answering a Props message with the ActorRef of the child it created.
        def spawn(dispatcherId: String): ActorRef =
          Await.result((supervisor ? workerProps.withDispatcher(dispatcherId)).mapTo[ActorRef], timeout.duration)

        val actor1 = spawn("pinned-dispatcher")
        val actor2 = spawn("pinned-dispatcher")
        val actor3 = spawn("test-dispatcher")
        val actor4 = spawn("pinned-dispatcher")

        actor1 ! Kill
        actor2 ! Kill
        actor3 ! Kill
        actor4 ! Kill

        // await returns false on timeout; assert it so a missing restart is reported
        // here instead of being silently ignored.
        countDownLatch.await(10, TimeUnit.SECONDS) should be(true)

        // Send every "status" request first, then wait for the answers one by one.
        val pendingReplies = Seq("actor1" -> actor1, "actor2" -> actor2, "actor3" -> actor3, "actor4" -> actor4) map {
          case (id, ref) ⇒ (id, ref ? "status")
        }
        pendingReplies foreach {
          case (id, f) ⇒ (id, Await.result(f, timeout.duration)) should be((id, "OK"))
        }
      }
    }

    // The actor creates the named child "bob" directly in its constructor body and
    // rethrows any Exception it receives.
    "be able to create named children in its constructor" in {
      val a = system.actorOf(Props(new Actor {
        context.actorOf(Props.empty, "bob")
        def receive = { case x: Exception ⇒ throw x }
        override def preStart(): Unit = testActor ! "preStart"
      }))
      val m = "weird message"
      EventFilter[Exception](m, occurrences = 1) intercept {
        a ! new Exception(m)
      }
      // Two notifications are expected: presumably one from the initial start and one
      // from the restart triggered by the exception above.
      expectMsg("preStart")
      expectMsg("preStart")
      a.isTerminated should be(false)
    }

    // After the watched child "foo" has terminated, a new child with the same name must
    // be creatable; the parent reports "green" or a description of what went wrong.
    "be able to recreate child when old child is Terminated" in {
      val parent = system.actorOf(Props(new Actor {
        val kid = context.watch(context.actorOf(Props.empty, "foo"))
        def receive = {
          case Terminated(`kid`) ⇒
            try {
              val newKid = context.actorOf(Props.empty, "foo")
              val result =
                if (newKid eq kid) "Failure: context.actorOf returned the same instance!"
                else if (!kid.isTerminated) "Kid is zombie"
                else if (newKid.isTerminated) "newKid was stillborn"
                else if (kid.path != newKid.path) "The kids do not share the same path"
                else "green"
              testActor ! result
            } catch {
              // Forward the failure so the expectMsg below shows the cause.
              case NonFatal(e) ⇒ testActor ! e
            }
          case "engage" ⇒ context.stop(kid)
        }
      }))
      parent ! "engage"
      expectMsg("green")
    }

    // Re-using the name "foo" right after calling stop, without waiting for Terminated,
    // must be rejected with InvalidActorNameException ("green"); success is "red".
    "not be able to recreate child when old child is alive" in {
      val parent = system.actorOf(Props(new Actor {
        def receive = {
          case "engage" ⇒
            try {
              val kid = context.actorOf(Props.empty, "foo")
              context.stop(kid)
              context.actorOf(Props.empty, "foo")
              testActor ! "red"
            } catch {
              case _: InvalidActorNameException ⇒ testActor ! "green"
            }
        }
      }))
      parent ! "engage"
      expectMsg("green")
    }

    // The strategy's handleChildTerminated hook re-creates a child under the terminated
    // child's name and reports whether it is a different ref on the same path.
    "be able to create a similar kid in the fault handling strategy" in {
      val parent = system.actorOf(Props(new Actor {
        override val supervisorStrategy = new OneForOneStrategy()(SupervisorStrategy.defaultStrategy.decider) {
          override def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit = {
            val newKid = context.actorOf(Props.empty, child.path.name)
            testActor ! { if ((newKid ne child) && newKid.path == child.path) "green" else "red" }
          }
        }
        def receive = { case "engage" ⇒ context.stop(context.actorOf(Props.empty, "Robert")) }
      }))
      parent ! "engage"
      expectMsg("green")
      // Stopping the parent is expected to log exactly one "handleChildTerminated failed"
      // error; presumably the hook cannot create a child while the parent is shutting down.
      EventFilter[IllegalStateException]("handleChildTerminated failed", occurrences = 1) intercept {
        system.stop(parent)
      }
    }

    // The decider forwards sender() to testActor; it must be the ref of the killed child.
    "have access to the failing child’s reference in supervisorStrategy" in {
      val parent = system.actorOf(Props(new Actor {
        override val supervisorStrategy = OneForOneStrategy() {
          case _: Exception ⇒
            testActor ! sender()
            SupervisorStrategy.Stop
        }
        def receive = {
          case "doit" ⇒ context.actorOf(Props.empty, "child") ! Kill
        }
      }))
      EventFilter[ActorKilledException](occurrences = 1) intercept {
        parent ! "doit"
      }
      val p = expectMsgType[ActorRef].path
      p.parent should be(parent.path)
      p.name should be("child")
    }
  }
}
Other Akka source code examples
Here is a short list of links related to this Akka SupervisorMiscSpec.scala source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.