|
Akka/Scala example source code file (DeathWatchSpec.scala)
The DeathWatchSpec.scala Akka example source code/** * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> */ package akka.actor import language.postfixOps import akka.dispatch.sysmsg.{ DeathWatchNotification, Failed } import akka.pattern.ask import akka.testkit._ import scala.concurrent.duration._ import scala.concurrent.Await @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class LocalDeathWatchSpec extends AkkaSpec with ImplicitSender with DefaultTimeout with DeathWatchSpec object DeathWatchSpec { def props(target: ActorRef, testActor: ActorRef) = Props(new Actor { context.watch(target) def receive = { case t: Terminated ⇒ testActor forward WrappedTerminated(t) case x ⇒ testActor forward x } }) /** * Forwarding `Terminated` to non-watching testActor is not possible, * and therefore the `Terminated` message is wrapped. */ final case class WrappedTerminated(t: Terminated) final case class W(ref: ActorRef) final case class U(ref: ActorRef) final case class FF(fail: Failed) final case class Latches(t1: TestLatch, t2: TestLatch) extends NoSerializationVerificationNeeded } trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout ⇒ import DeathWatchSpec._ lazy val supervisor = system.actorOf(Props(new Supervisor(SupervisorStrategy.defaultStrategy)), "watchers") def startWatching(target: ActorRef) = Await.result((supervisor ? props(target, testActor)).mapTo[ActorRef], 3 seconds) "The Death Watch" must { def expectTerminationOf(actorRef: ActorRef) = expectMsgPF(5 seconds, actorRef + ": Stopped or Already terminated when linking") { case WrappedTerminated(Terminated(`actorRef`)) ⇒ true } "notify with one Terminated message when an Actor is stopped" in { val terminal = system.actorOf(Props.empty) startWatching(terminal) ! "hallo" expectMsg("hallo") terminal ! PoisonPill expectTerminationOf(terminal) } "notify with one Terminated message when an Actor is already dead" in { val terminal = system.actorOf(Props.empty) terminal ! PoisonPill startWatching(terminal) expectTerminationOf(terminal) } "notify with all monitors with one Terminated message when an Actor is stopped" in { val terminal = system.actorOf(Props.empty) val monitor1, monitor2, monitor3 = startWatching(terminal) terminal ! PoisonPill expectTerminationOf(terminal) expectTerminationOf(terminal) expectTerminationOf(terminal) system.stop(monitor1) system.stop(monitor2) system.stop(monitor3) } "notify with _current_ monitors with one Terminated message when an Actor is stopped" in { val terminal = system.actorOf(Props.empty) val monitor1, monitor3 = startWatching(terminal) val monitor2 = system.actorOf(Props(new Actor { context.watch(terminal) context.unwatch(terminal) def receive = { case "ping" ⇒ sender() ! "pong" case t: Terminated ⇒ testActor ! WrappedTerminated(t) } }).withDeploy(Deploy.local)) monitor2 ! "ping" expectMsg("pong") //Needs to be here since watch and unwatch are asynchronous terminal ! PoisonPill expectTerminationOf(terminal) expectTerminationOf(terminal) system.stop(monitor1) system.stop(monitor2) system.stop(monitor3) } "notify with a Terminated message once when an Actor is stopped but not when restarted" in { filterException[ActorKilledException] { val supervisor = system.actorOf(Props(new Supervisor( OneForOneStrategy(maxNrOfRetries = 2)(List(classOf[Exception]))))) val terminalProps = Props(new Actor { def receive = { case x ⇒ sender() ! x } }) val terminal = Await.result((supervisor ? terminalProps).mapTo[ActorRef], timeout.duration) val monitor = startWatching(terminal) terminal ! Kill terminal ! Kill Await.result(terminal ? "foo", timeout.duration) should be("foo") terminal ! Kill expectTerminationOf(terminal) terminal.isTerminated should be(true) system.stop(supervisor) } } "fail a monitor which does not handle Terminated()" in { filterEvents(EventFilter[ActorKilledException](), EventFilter[DeathPactException]()) { val strategy = new OneForOneStrategy()(SupervisorStrategy.defaultStrategy.decider) { override def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]) = { testActor.tell(FF(Failed(child, cause, 0)), child) super.handleFailure(context, child, cause, stats, children) } } val supervisor = system.actorOf(Props(new Supervisor(strategy)).withDeploy(Deploy.local)) val failed = Await.result((supervisor ? Props.empty).mapTo[ActorRef], timeout.duration) val brother = Await.result((supervisor ? Props(new Actor { context.watch(failed) def receive = Actor.emptyBehavior })).mapTo[ActorRef], timeout.duration) startWatching(brother) failed ! Kill val result = receiveWhile(3 seconds, messages = 3) { case FF(Failed(_, _: ActorKilledException, _)) if lastSender eq failed ⇒ 1 case FF(Failed(_, DeathPactException(`failed`), _)) if lastSender eq brother ⇒ 2 case WrappedTerminated(Terminated(`brother`)) ⇒ 3 } testActor.isTerminated should not be true result should be(Seq(1, 2, 3)) } } "be able to watch a child with the same name after the old died" in { val parent = system.actorOf(Props(new Actor { def receive = { case "NKOTB" ⇒ val currentKid = context.watch(context.actorOf(Props(new Actor { def receive = { case "NKOTB" ⇒ context stop self } }), "kid")) currentKid forward "NKOTB" context become { case Terminated(`currentKid`) ⇒ testActor ! "GREEN" context unbecome } } }).withDeploy(Deploy.local)) parent ! "NKOTB" expectMsg("GREEN") parent ! "NKOTB" expectMsg("GREEN") } "only notify when watching" in { val subject = system.actorOf(Props(new Actor { def receive = Actor.emptyBehavior })) testActor.asInstanceOf[InternalActorRef] .sendSystemMessage(DeathWatchNotification(subject, existenceConfirmed = true, addressTerminated = false)) // the testActor is not watching subject and will not receive a Terminated msg expectNoMsg } "discard Terminated when unwatched between sysmsg and processing" in { class Watcher extends Actor { def receive = { case W(ref) ⇒ context watch ref case U(ref) ⇒ context unwatch ref case Latches(t1: TestLatch, t2: TestLatch) ⇒ t1.countDown() Await.ready(t2, 3.seconds) } } val t1, t2 = TestLatch() val w = system.actorOf(Props(new Watcher).withDeploy(Deploy.local), "myDearWatcher") val p = TestProbe() w ! W(p.ref) w ! Latches(t1, t2) Await.ready(t1, 3.seconds) watch(p.ref) system stop p.ref expectTerminated(p.ref) w ! U(p.ref) t2.countDown() /* * now the Watcher will * - process the DeathWatchNotification and enqueue Terminated * - process the unwatch command * - process the Terminated * If it receives the Terminated it will die, which in fact it should not */ w ! Identify(()) expectMsg(ActorIdentity((), Some(w))) w ! Identify(()) expectMsg(ActorIdentity((), Some(w))) } } } Other Akka source code examplesHere is a short list of links related to this Akka DeathWatchSpec.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.