|
Akka/Scala example source code file (DeathWatchSpec.scala)
The DeathWatchSpec.scala Akka example source code
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor
import language.postfixOps
import akka.dispatch.sysmsg.{ DeathWatchNotification, Failed }
import akka.pattern.ask
import akka.testkit._
import scala.concurrent.duration._
import scala.concurrent.Await
/**
 * Runnable binding of the shared `DeathWatchSpec` test cases: it supplies the
 * `AkkaSpec with ImplicitSender with DefaultTimeout` base that the trait's
 * self-type asks for and is executed through the ScalaTest JUnit runner.
 */
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class LocalDeathWatchSpec
  extends AkkaSpec
  with ImplicitSender
  with DefaultTimeout
  with DeathWatchSpec
object DeathWatchSpec {

  /**
   * Describes a relay actor that starts watching `target` while it is being
   * constructed and hands every message it receives over to `testActor`.
   *
   * A `Terminated` message is wrapped in [[WrappedTerminated]] before being
   * forwarded; any other message is forwarded unchanged.
   *
   * @param target    the actor the relay registers a watch on
   * @param testActor the destination of everything the relay receives
   * @return the `Props` from which such a relay can be created
   */
  def props(target: ActorRef, testActor: ActorRef): Props = Props(new Actor {
    context.watch(target)

    def receive: Receive = {
      // Must stay first: the catch-all case below would otherwise match Terminated too.
      case terminated: Terminated => testActor.forward(WrappedTerminated(terminated))
      case other                  => testActor.forward(other)
    }
  })

  /**
   * Envelope around a `Terminated` message. The testActor does not watch the
   * terminated actor itself, so a bare `Terminated` cannot be forwarded to it;
   * the notification travels inside this wrapper instead.
   */
  final case class WrappedTerminated(t: Terminated)

  /** Command carrying the actor reference that the receiver should start watching. */
  final case class W(ref: ActorRef)

  /** Command carrying the actor reference that the receiver should stop watching. */
  final case class U(ref: ActorRef)

  /** Envelope used to report a `Failed` system message to the testActor. */
  final case class FF(fail: Failed)

  /**
   * Pair of latches used to park an actor inside its message handler: the
   * receiver counts down `t1` and then waits on `t2`.
   */
  final case class Latches(t1: TestLatch, t2: TestLatch) extends NoSerializationVerificationNeeded
}
/**
 * Death watch test cases, written against whatever `AkkaSpec` they are mixed
 * into. Every case observes watch notifications through `testActor`.
 */
trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout =>
  import DeathWatchSpec._

  /** Actor named "watchers" that [[startWatching]] sends its `Props` to; created on first use. */
  lazy val supervisor: ActorRef =
    system.actorOf(Props(new Supervisor(SupervisorStrategy.defaultStrategy)), "watchers")

  /**
   * Sends the relay `Props` for `target` to [[supervisor]] and blocks for up to
   * three seconds for the `ActorRef` it answers with.
   *
   * @param target the actor the new relay should watch
   * @return the reference received in the supervisor's reply
   */
  def startWatching(target: ActorRef): ActorRef = {
    val reply = (supervisor ? props(target, testActor)).mapTo[ActorRef]
    Await.result(reply, 3.seconds)
  }

  "The Death Watch" must {

    // Waits up to five seconds for a wrapped Terminated that names exactly `actorRef`.
    def expectTerminationOf(actorRef: ActorRef) =
      expectMsgPF(5.seconds, actorRef + ": Stopped or Already terminated when linking") {
        case WrappedTerminated(Terminated(`actorRef`)) => true
      }

    "notify with one Terminated message when an Actor is stopped" in {
      val victim = system.actorOf(Props.empty)
      val relay = startWatching(victim)

      // Round trip through the relay before the victim is stopped.
      relay ! "hallo"
      expectMsg("hallo")

      victim ! PoisonPill
      expectTerminationOf(victim)
    }

    "notify with one Terminated message when an Actor is already dead" in {
      val victim = system.actorOf(Props.empty)

      // The stop request is sent before any watcher exists.
      victim ! PoisonPill
      startWatching(victim)

      expectTerminationOf(victim)
    }

    "notify with all monitors with one Terminated message when an Actor is stopped" in {
      val victim = system.actorOf(Props.empty)
      val monitor1 = startWatching(victim)
      val monitor2 = startWatching(victim)
      val monitor3 = startWatching(victim)

      victim ! PoisonPill

      // One wrapped notification is expected per monitor.
      (1 to 3).foreach(_ => expectTerminationOf(victim))

      Seq(monitor1, monitor2, monitor3).foreach(monitor => system.stop(monitor))
    }

    "notify with _current_ monitors with one Terminated message when an Actor is stopped" in {
      val victim = system.actorOf(Props.empty)
      val monitor1 = startWatching(victim)
      val monitor3 = startWatching(victim)

      // Watches and immediately unwatches, so it is no longer a monitor when the victim stops.
      val monitor2 = system.actorOf(Props(new Actor {
        context.watch(victim)
        context.unwatch(victim)

        def receive: Receive = {
          case "ping"                 => sender() ! "pong"
          case terminated: Terminated => testActor ! WrappedTerminated(terminated)
        }
      }).withDeploy(Deploy.local))

      // watch and unwatch are asynchronous, so wait for a reply from monitor2 before going on.
      monitor2 ! "ping"
      expectMsg("pong")

      victim ! PoisonPill

      // Only the two relays that are still watching are expected to report.
      expectTerminationOf(victim)
      expectTerminationOf(victim)

      system.stop(monitor1)
      system.stop(monitor2)
      system.stop(monitor3)
    }

    "notify with a Terminated message once when an Actor is stopped but not when restarted" in {
      filterException[ActorKilledException] {
        val restartingSupervisor = system.actorOf(Props(new Supervisor(
          OneForOneStrategy(maxNrOfRetries = 2)(List(classOf[Exception])))))
        val echoProps = Props(new Actor {
          def receive: Receive = { case msg => sender() ! msg }
        })
        val victim = Await.result((restartingSupervisor ? echoProps).mapTo[ActorRef], timeout.duration)

        startWatching(victim)

        // Two kills, after which the actor must still answer ...
        victim ! Kill
        victim ! Kill
        Await.result(victim ? "foo", timeout.duration) should be("foo")

        // ... and a third one, after which the watcher must be told about the termination.
        victim ! Kill
        expectTerminationOf(victim)
        victim.isTerminated should be(true)

        system.stop(restartingSupervisor)
      }
    }

    "fail a monitor which does not handle Terminated()" in {
      filterEvents(EventFilter[ActorKilledException](), EventFilter[DeathPactException]()) {
        // Default decisions, but every failure is first reported to the testActor,
        // with the failing child set as the sender of that report.
        val strategy = new OneForOneStrategy()(SupervisorStrategy.defaultStrategy.decider) {
          override def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]) = {
            testActor.tell(FF(Failed(child, cause, 0)), child)
            super.handleFailure(context, child, cause, stats, children)
          }
        }
        val parent = system.actorOf(Props(new Supervisor(strategy)).withDeploy(Deploy.local))

        val failed = Await.result((parent ? Props.empty).mapTo[ActorRef], timeout.duration)

        // Watches `failed` but has an empty behavior, so it cannot handle a Terminated message.
        val brotherProps = Props(new Actor {
          context.watch(failed)
          def receive: Receive = Actor.emptyBehavior
        })
        val brother = Await.result((parent ? brotherProps).mapTo[ActorRef], timeout.duration)

        startWatching(brother)
        failed ! Kill

        // Expected in this order: the kill of `failed`, the death pact failure of
        // `brother`, and finally the termination of `brother` seen by its watcher.
        val observed = receiveWhile(3.seconds, messages = 3) {
          case FF(Failed(_, _: ActorKilledException, _)) if lastSender eq failed       => 1
          case FF(Failed(_, DeathPactException(`failed`), _)) if lastSender eq brother => 2
          case WrappedTerminated(Terminated(`brother`))                                => 3
        }

        testActor.isTerminated should not be true
        observed should be(Seq(1, 2, 3))
      }
    }

    "be able to watch a child with the same name after the old died" in {
      val parent = system.actorOf(Props(new Actor {
        def receive: Receive = {
          case "NKOTB" =>
            // Create and watch a child named "kid" that stops itself on "NKOTB".
            val kid = context.watch(context.actorOf(Props(new Actor {
              def receive: Receive = { case "NKOTB" => context.stop(self) }
            }), "kid"))
            kid.forward("NKOTB")

            // Until that child's Terminated arrives, handle nothing else; then restore the behavior.
            context.become {
              case Terminated(`kid`) =>
                testActor ! "GREEN"
                context.unbecome()
            }
        }
      }).withDeploy(Deploy.local))

      parent ! "NKOTB"
      expectMsg("GREEN")

      // Second round: a child with the same name is created and watched again.
      parent ! "NKOTB"
      expectMsg("GREEN")
    }

    "only notify when watching" in {
      val unwatched = system.actorOf(Props(new Actor { def receive: Receive = Actor.emptyBehavior }))
      val notification =
        DeathWatchNotification(unwatched, existenceConfirmed = true, addressTerminated = false)

      testActor.asInstanceOf[InternalActorRef].sendSystemMessage(notification)

      // the testActor is not watching that actor and will not receive a Terminated msg
      expectNoMsg()
    }

    "discard Terminated when unwatched between sysmsg and processing" in {
      // Watches/unwatches on command and can be parked inside its handler via Latches.
      class Watcher extends Actor {
        def receive: Receive = {
          case W(ref) => context.watch(ref)
          case U(ref) => context.unwatch(ref)
          case Latches(entered: TestLatch, release: TestLatch) =>
            entered.countDown()
            Await.ready(release, 3.seconds)
        }
      }

      val watcherParked = TestLatch()
      val releaseWatcher = TestLatch()
      val watcher = system.actorOf(Props(new Watcher).withDeploy(Deploy.local), "myDearWatcher")
      val probe = TestProbe()

      watcher ! W(probe.ref)
      watcher ! Latches(watcherParked, releaseWatcher)
      Await.ready(watcherParked, 3.seconds)

      // While the watcher is parked: stop the probe, confirm it is gone, queue the unwatch.
      watch(probe.ref)
      system.stop(probe.ref)
      expectTerminated(probe.ref)
      watcher ! U(probe.ref)
      releaseWatcher.countDown()

      /*
       * Once released, the Watcher is expected to
       *  - process the DeathWatchNotification and enqueue Terminated
       *  - process the unwatch command
       *  - process the Terminated
       * Receiving that Terminated would kill it, which must not happen, so it
       * has to keep answering Identify afterwards.
       */
      watcher ! Identify(())
      expectMsg(ActorIdentity((), Some(watcher)))
      watcher ! Identify(())
      expectMsg(ActorIdentity((), Some(watcher)))
    }
  }
}
Other Akka source code examples
Here is a short list of links related to this Akka DeathWatchSpec.scala source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.