|
Akka/Scala example source code file (SupervisorSpec.scala)
The SupervisorSpec.scala Akka example source code/** * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> */ package akka.actor import language.postfixOps import org.scalatest.BeforeAndAfterEach import scala.concurrent.duration._ import akka.{ Die, Ping } import akka.testkit.TestEvent._ import akka.testkit._ import java.util.concurrent.atomic.AtomicInteger import scala.concurrent.Await import akka.pattern.ask object SupervisorSpec { val Timeout = 5.seconds case object DieReply // ===================================================== // Message logs // ===================================================== val PingMessage = "ping" val PongMessage = "pong" val ExceptionMessage = "Expected exception; to test fault-tolerance" // ===================================================== // Actors // ===================================================== class PingPongActor(sendTo: ActorRef) extends Actor { def receive = { case Ping ⇒ sendTo ! PingMessage if (sender() != sendTo) sender() ! PongMessage case Die ⇒ throw new RuntimeException(ExceptionMessage) case DieReply ⇒ val e = new RuntimeException(ExceptionMessage) sender() ! Status.Failure(e) throw e } override def postRestart(reason: Throwable) { sendTo ! reason.getMessage } } class Master(sendTo: ActorRef) extends Actor { val temp = context.watch(context.actorOf(Props(new PingPongActor(sendTo)))) var s: ActorRef = _ override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 0)(List(classOf[Exception])) def receive = { case Die ⇒ temp forward Die case Terminated(`temp`) ⇒ sendTo ! "terminated" case Status.Failure(_) ⇒ /*Ignore*/ } } } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class SupervisorSpec extends AkkaSpec("akka.actor.serialize-messages = off") with BeforeAndAfterEach with ImplicitSender with DefaultTimeout { import SupervisorSpec._ val DilatedTimeout = Timeout.dilated // ===================================================== // Creating actors and supervisors // ===================================================== private def child(supervisor: ActorRef, props: Props): ActorRef = Await.result((supervisor ? props).mapTo[ActorRef], timeout.duration) def temporaryActorAllForOne = { val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(maxNrOfRetries = 0)(List(classOf[Exception]))))) val temporaryActor = child(supervisor, Props(new PingPongActor(testActor))) (temporaryActor, supervisor) } def singleActorAllForOne = { val supervisor = system.actorOf(Props(new Supervisor( AllForOneStrategy(maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)(List(classOf[Exception]))))) val pingpong = child(supervisor, Props(new PingPongActor(testActor))) (pingpong, supervisor) } def singleActorOneForOne = { val supervisor = system.actorOf(Props(new Supervisor( OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)(List(classOf[Exception]))))) val pingpong = child(supervisor, Props(new PingPongActor(testActor))) (pingpong, supervisor) } def multipleActorsAllForOne = { val supervisor = system.actorOf(Props(new Supervisor( AllForOneStrategy(maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)(List(classOf[Exception]))))) val pingpong1, pingpong2, pingpong3 = child(supervisor, Props(new PingPongActor(testActor))) (pingpong1, pingpong2, pingpong3, supervisor) } def multipleActorsOneForOne = { val supervisor = system.actorOf(Props(new Supervisor( OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)(List(classOf[Exception]))))) val pingpong1, pingpong2, pingpong3 = child(supervisor, Props(new PingPongActor(testActor))) (pingpong1, pingpong2, pingpong3, supervisor) } def nestedSupervisorsAllForOne = { val topSupervisor = system.actorOf(Props(new Supervisor( AllForOneStrategy(maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)(List(classOf[Exception]))))) val pingpong1 = child(topSupervisor, Props(new PingPongActor(testActor))) val middleSupervisor = child(topSupervisor, Props(new Supervisor( AllForOneStrategy(maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)(Nil)))) val pingpong2, pingpong3 = child(middleSupervisor, Props(new PingPongActor(testActor))) (pingpong1, pingpong2, pingpong3, topSupervisor) } override def atStartup() { system.eventStream.publish(Mute(EventFilter[RuntimeException](ExceptionMessage))) } override def beforeEach() = { } def ping(pingPongActor: ActorRef) = { Await.result(pingPongActor.?(Ping)(DilatedTimeout), DilatedTimeout) should be(PongMessage) expectMsg(Timeout, PingMessage) } def kill(pingPongActor: ActorRef) = { val result = (pingPongActor.?(DieReply)(DilatedTimeout)) expectMsg(Timeout, ExceptionMessage) intercept[RuntimeException] { Await.result(result, DilatedTimeout) } } "A supervisor" must { "not restart child more times than permitted" in { val master = system.actorOf(Props(new Master(testActor))) master ! Die expectMsg(3 seconds, "terminated") expectNoMsg(1 second) } "restart properly when same instance is returned" in { val restarts = 3 //max number of restarts lazy val childInstance = new Actor { var preRestarts = 0 var postRestarts = 0 var preStarts = 0 var postStops = 0 override def preRestart(reason: Throwable, message: Option[Any]) { preRestarts += 1; testActor ! ("preRestart" + preRestarts) } override def postRestart(reason: Throwable) { postRestarts += 1; testActor ! ("postRestart" + postRestarts) } override def preStart() { preStarts += 1; testActor ! ("preStart" + preStarts) } override def postStop() { postStops += 1; testActor ! ("postStop" + postStops) } def receive = { case "crash" ⇒ { testActor ! "crashed"; throw new RuntimeException("Expected") } case "ping" ⇒ sender() ! "pong" } } val master = system.actorOf(Props(new Actor { override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = restarts)(List(classOf[Exception])) val child = context.actorOf(Props(childInstance)) def receive = { case msg ⇒ child forward msg } })) expectMsg("preStart1") master ! "ping" expectMsg("pong") filterEvents(EventFilter[RuntimeException]("Expected", occurrences = restarts + 1)) { (1 to restarts) foreach { i ⇒ master ! "crash" expectMsg("crashed") expectMsg("preRestart" + i) expectMsg("postRestart" + i) master ! "ping" expectMsg("pong") } master ! "crash" expectMsg("crashed") expectMsg("postStop1") } expectNoMsg(1 second) } "not restart temporary actor" in { val (temporaryActor, _) = temporaryActorAllForOne intercept[RuntimeException] { Await.result(temporaryActor.?(DieReply)(DilatedTimeout), DilatedTimeout) } expectNoMsg(1 second) } "start server for nested supervisor hierarchy" in { val (actor1, _, _, _) = nestedSupervisorsAllForOne ping(actor1) expectNoMsg(1 second) } "kill single actor OneForOne" in { val (actor, _) = singleActorOneForOne kill(actor) } "call-kill-call single actor OneForOne" in { val (actor, supervisor) = singleActorOneForOne ping(actor) kill(actor) ping(actor) } "kill single actor AllForOne" in { val (actor, supervisor) = singleActorAllForOne kill(actor) } "call-kill-call single actor AllForOne" in { val (actor, supervisor) = singleActorAllForOne ping(actor) kill(actor) ping(actor) } "kill multiple actors OneForOne 1" in { val (actor1, actor2, actor3, supervisor) = multipleActorsOneForOne kill(actor1) } "kill multiple actors OneForOne 2" in { val (actor1, actor2, actor3, supervisor) = multipleActorsOneForOne kill(actor3) } "call-kill-call multiple actors OneForOne" in { val (actor1, actor2, actor3, supervisor) = multipleActorsOneForOne ping(actor1) ping(actor2) ping(actor3) kill(actor2) ping(actor1) ping(actor2) ping(actor3) } "kill multiple actors AllForOne" in { val (actor1, actor2, actor3, supervisor) = multipleActorsAllForOne kill(actor2) // and two more exception messages expectMsg(Timeout, ExceptionMessage) expectMsg(Timeout, ExceptionMessage) } "call-kill-call multiple actors AllForOne" in { val (actor1, actor2, actor3, supervisor) = multipleActorsAllForOne ping(actor1) ping(actor2) ping(actor3) kill(actor2) // and two more exception messages expectMsg(Timeout, ExceptionMessage) expectMsg(Timeout, ExceptionMessage) ping(actor1) ping(actor2) ping(actor3) } "one-way kill single actor OneForOne" in { val (actor, _) = singleActorOneForOne actor ! Die expectMsg(Timeout, ExceptionMessage) } "one-way call-kill-call single actor OneForOne" in { val (actor, _) = singleActorOneForOne actor ! Ping actor ! Die actor ! Ping expectMsg(Timeout, PingMessage) expectMsg(Timeout, ExceptionMessage) expectMsg(Timeout, PingMessage) } "restart killed actors in nested superviser hierarchy" in { val (actor1, actor2, actor3, _) = nestedSupervisorsAllForOne ping(actor1) ping(actor2) ping(actor3) kill(actor2) // and two more exception messages expectMsg(Timeout, ExceptionMessage) expectMsg(Timeout, ExceptionMessage) ping(actor1) ping(actor2) ping(actor3) } "attempt restart when exception during restart" in { val inits = new AtomicInteger(0) val supervisor = system.actorOf(Props(new Supervisor( OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 10 seconds)(classOf[Exception] :: Nil)))) val dyingProps = Props(new Actor { val init = inits.getAndIncrement() if (init % 3 == 1) throw new IllegalStateException("Don't wanna!") override def preRestart(cause: Throwable, msg: Option[Any]) { if (init % 3 == 0) throw new IllegalStateException("Don't wanna!") } def receive = { case Ping ⇒ sender() ! PongMessage case DieReply ⇒ val e = new RuntimeException("Expected") sender() ! Status.Failure(e) throw e } }) supervisor ! dyingProps val dyingActor = expectMsgType[ActorRef] filterEvents( EventFilter[RuntimeException]("Expected", occurrences = 1), EventFilter[PreRestartException]("Don't wanna!", occurrences = 1), EventFilter[PostRestartException]("Don't wanna!", occurrences = 1)) { intercept[RuntimeException] { Await.result(dyingActor.?(DieReply)(DilatedTimeout), DilatedTimeout) } } dyingActor ! Ping expectMsg(PongMessage) inits.get should be(3) system.stop(supervisor) } "not lose system messages when a NonFatal exception occurs when processing a system message" in { val parent = system.actorOf(Props(new Actor { override val supervisorStrategy = OneForOneStrategy()({ case e: IllegalStateException if e.getMessage == "OHNOES" ⇒ throw e case _ ⇒ SupervisorStrategy.Restart }) val child = context.watch(context.actorOf(Props(new Actor { override def postRestart(reason: Throwable): Unit = testActor ! "child restarted" def receive = { case l: TestLatch ⇒ { Await.ready(l, 5 seconds); throw new IllegalStateException("OHNOES") } case "test" ⇒ sender() ! "child green" } }), "child")) override def postRestart(reason: Throwable): Unit = testActor ! "parent restarted" // Overriding to disable auto-unwatch override def preRestart(reason: Throwable, msg: Option[Any]): Unit = { context.children foreach context.stop postStop() } def receive = { case Terminated(a) if a.path == child.path ⇒ testActor ! "child terminated" case l: TestLatch ⇒ child ! l case "test" ⇒ sender() ! "green" case "testchild" ⇒ child forward "test" case "testchildAndAck" ⇒ child forward "test"; sender() ! "ack" } })) val latch = TestLatch() parent ! latch parent ! "testchildAndAck" expectMsg("ack") filterEvents( EventFilter[IllegalStateException]("OHNOES", occurrences = 1), EventFilter.warning(pattern = "dead.*test", occurrences = 1)) { latch.countDown() } expectMsg("parent restarted") expectMsg("child terminated") parent ! "test" expectMsg("green") parent ! "testchild" expectMsg("child green") } } } Other Akka source code examplesHere is a short list of links related to this Akka SupervisorSpec.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.