Akka/Scala example source code file (RemoteNodeDeathWatchSpec.scala)
The RemoteNodeDeathWatchSpec.scala Akka example source code/** * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> */ package akka.remote import language.postfixOps import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory import akka.actor.Actor import akka.actor.ActorIdentity import akka.actor.ActorRef import akka.actor.Identify import akka.actor.PoisonPill import akka.actor.Props import akka.actor.Terminated import akka.remote.testconductor.RoleName import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.remote.testkit.STMultiNodeSpec import akka.testkit._ object RemoteNodeDeathWatchMultiJvmSpec extends MultiNodeConfig { val first = role("first") val second = role("second") val third = role("third") commonConfig(debugConfig(on = false).withFallback( ConfigFactory.parseString(""" akka.loglevel = INFO akka.remote.log-remote-lifecycle-events = off ## Use a tighter setting than the default, otherwise it takes 20s for DeathWatch to trigger akka.remote.watch-failure-detector.acceptable-heartbeat-pause = 3 s """))) final case class WatchIt(watchee: ActorRef) final case class UnwatchIt(watchee: ActorRef) case object Ack /** * Forwarding `Terminated` to non-watching testActor is not possible, * and therefore the `Terminated` message is wrapped. */ final case class WrappedTerminated(t: Terminated) class ProbeActor(testActor: ActorRef) extends Actor { def receive = { case WatchIt(watchee) ⇒ context watch watchee sender() ! Ack case UnwatchIt(watchee) ⇒ context unwatch watchee sender() ! Ack case t: Terminated ⇒ testActor forward WrappedTerminated(t) case msg ⇒ testActor forward msg } } } // Several different variations of the test class RemoteNodeDeathWatchFastMultiJvmNode1 extends RemoteNodeDeathWatchFastSpec class RemoteNodeDeathWatchFastMultiJvmNode2 extends RemoteNodeDeathWatchFastSpec class RemoteNodeDeathWatchFastMultiJvmNode3 extends RemoteNodeDeathWatchFastSpec abstract class RemoteNodeDeathWatchFastSpec extends RemoteNodeDeathWatchSpec { override def scenario = "fast" } class RemoteNodeDeathWatchSlowMultiJvmNode1 extends RemoteNodeDeathWatchSlowSpec class RemoteNodeDeathWatchSlowMultiJvmNode2 extends RemoteNodeDeathWatchSlowSpec class RemoteNodeDeathWatchSlowMultiJvmNode3 extends RemoteNodeDeathWatchSlowSpec abstract class RemoteNodeDeathWatchSlowSpec extends RemoteNodeDeathWatchSpec { override def scenario = "slow" override def sleep(): Unit = Thread.sleep(3000) } abstract class RemoteNodeDeathWatchSpec extends MultiNodeSpec(RemoteNodeDeathWatchMultiJvmSpec) with STMultiNodeSpec with ImplicitSender { import RemoteNodeDeathWatchMultiJvmSpec._ import RemoteWatcher._ def scenario: String // Possible to override to let them heartbeat for a while. def sleep(): Unit = () override def initialParticipants = roles.size muteDeadLetters(Heartbeat.getClass)() lazy val remoteWatcher: ActorRef = { system.actorSelection("/system/remote-watcher") ! Identify(None) expectMsgType[ActorIdentity].ref.get } def identify(role: RoleName, actorName: String): ActorRef = { system.actorSelection(node(role) / "user" / actorName) ! Identify(actorName) expectMsgType[ActorIdentity].ref.get } def assertCleanup(timeout: FiniteDuration = 5.seconds): Unit = { within(timeout) { awaitAssert { remoteWatcher ! Stats expectMsg(Stats.empty) } } } "RemoteNodeDeathWatch (" + scenario + ")" must { "receive Terminated when remote actor is stopped" taggedAs LongRunningTest in { runOn(first) { val watcher = system.actorOf(Props(classOf[ProbeActor], testActor), "watcher1") enterBarrier("actors-started-1") val subject = identify(second, "subject1") watcher ! WatchIt(subject) expectMsg(1 second, Ack) subject ! "hello1" enterBarrier("hello1-message-sent") enterBarrier("watch-established-1") sleep() expectMsgType[WrappedTerminated].t.actor should be(subject) } runOn(second) { val subject = system.actorOf(Props(classOf[ProbeActor], testActor), "subject1") enterBarrier("actors-started-1") enterBarrier("hello1-message-sent") expectMsg(3 seconds, "hello1") enterBarrier("watch-established-1") sleep() system.stop(subject) } runOn(third) { enterBarrier("actors-started-1") enterBarrier("hello1-message-sent") enterBarrier("watch-established-1") } enterBarrier("terminated-verified-1") // verify that things are cleaned up, and heartbeating is stopped assertCleanup() expectNoMsg(2.seconds) assertCleanup() enterBarrier("after-1") } "cleanup after watch/unwatch" taggedAs LongRunningTest in { runOn(first) { val watcher = system.actorOf(Props(classOf[ProbeActor], testActor), "watcher2") enterBarrier("actors-started-2") val subject = identify(second, "subject2") watcher ! WatchIt(subject) expectMsg(1 second, Ack) enterBarrier("watch-2") sleep() watcher ! UnwatchIt(subject) expectMsg(1 second, Ack) enterBarrier("unwatch-2") } runOn(second) { system.actorOf(Props(classOf[ProbeActor], testActor), "subject2") } runOn(second, third) { enterBarrier("actors-started-2") enterBarrier("watch-2") enterBarrier("unwatch-2") } // verify that things are cleaned up, and heartbeating is stopped assertCleanup() expectNoMsg(2.seconds) assertCleanup() enterBarrier("after-2") } "cleanup after bi-directional watch/unwatch" taggedAs LongRunningTest in { runOn(first, second) { val watcher = system.actorOf(Props(classOf[ProbeActor], testActor), "watcher3") system.actorOf(Props(classOf[ProbeActor], testActor), "subject3") enterBarrier("actors-started-3") val other = if (myself == first) second else first val subject = identify(other, "subject3") watcher ! WatchIt(subject) expectMsg(1 second, Ack) enterBarrier("watch-3") sleep() watcher ! UnwatchIt(subject) expectMsg(1 second, Ack) enterBarrier("unwatch-3") } runOn(third) { enterBarrier("actors-started-3") enterBarrier("watch-3") enterBarrier("unwatch-3") } // verify that things are cleaned up, and heartbeating is stopped assertCleanup() expectNoMsg(2.seconds) assertCleanup() enterBarrier("after-3") } "cleanup after bi-directional watch/stop/unwatch" taggedAs LongRunningTest in { runOn(first, second) { val watcher1 = system.actorOf(Props(classOf[ProbeActor], testActor), "w1") val watcher2 = system.actorOf(Props(classOf[ProbeActor], testActor), "w2") val s1 = system.actorOf(Props(classOf[ProbeActor], testActor), "s1") val s2 = system.actorOf(Props(classOf[ProbeActor], testActor), "s2") enterBarrier("actors-started-4") val other = if (myself == first) second else first val subject1 = identify(other, "s1") val subject2 = identify(other, "s2") watcher1 ! WatchIt(subject1) expectMsg(1 second, Ack) watcher2 ! WatchIt(subject2) expectMsg(1 second, Ack) enterBarrier("watch-4") sleep() watcher1 ! UnwatchIt(subject1) expectMsg(1 second, Ack) enterBarrier("unwatch-s1-4") system.stop(s1) expectNoMsg(2 seconds) enterBarrier("stop-s1-4") system.stop(s2) enterBarrier("stop-s2-4") expectMsgType[WrappedTerminated].t.actor should be(subject2) } runOn(third) { enterBarrier("actors-started-4") enterBarrier("watch-4") enterBarrier("unwatch-s1-4") enterBarrier("stop-s1-4") enterBarrier("stop-s2-4") } // verify that things are cleaned up, and heartbeating is stopped assertCleanup() expectNoMsg(2.seconds) assertCleanup() enterBarrier("after-4") } "cleanup after stop" taggedAs LongRunningTest in { runOn(first) { val p1, p2, p3 = TestProbe() val a1 = system.actorOf(Props(classOf[ProbeActor], p1.ref), "a1") val a2 = system.actorOf(Props(classOf[ProbeActor], p2.ref), "a2") val a3 = system.actorOf(Props(classOf[ProbeActor], p3.ref), "a3") enterBarrier("actors-started-5") val b1 = identify(second, "b1") val b2 = identify(second, "b2") val b3 = identify(second, "b3") a1 ! WatchIt(b1) expectMsg(1 second, Ack) a1 ! WatchIt(b2) expectMsg(1 second, Ack) a2 ! WatchIt(b2) expectMsg(1 second, Ack) a3 ! WatchIt(b3) expectMsg(1 second, Ack) sleep() a2 ! UnwatchIt(b2) expectMsg(1 second, Ack) enterBarrier("watch-established-5") sleep() a1 ! PoisonPill a2 ! PoisonPill a3 ! PoisonPill enterBarrier("stopped-5") enterBarrier("terminated-verified-5") // verify that things are cleaned up, and heartbeating is stopped assertCleanup() expectNoMsg(2.seconds) assertCleanup() } runOn(second) { val p1, p2, p3 = TestProbe() val b1 = system.actorOf(Props(classOf[ProbeActor], p1.ref), "b1") val b2 = system.actorOf(Props(classOf[ProbeActor], p2.ref), "b2") val b3 = system.actorOf(Props(classOf[ProbeActor], p3.ref), "b3") enterBarrier("actors-started-5") val a1 = identify(first, "a1") val a2 = identify(first, "a2") val a3 = identify(first, "a3") b1 ! WatchIt(a1) expectMsg(1 second, Ack) b1 ! WatchIt(a2) expectMsg(1 second, Ack) b2 ! WatchIt(a2) expectMsg(1 second, Ack) b3 ! WatchIt(a3) expectMsg(1 second, Ack) b3 ! WatchIt(a3) expectMsg(1 second, Ack) sleep() b2 ! UnwatchIt(a2) expectMsg(1 second, Ack) enterBarrier("watch-established-5") enterBarrier("stopped-5") p1.receiveN(2, 5 seconds).collect { case WrappedTerminated(t) ⇒ t.actor }.toSet should be(Set(a1, a2)) p3.expectMsgType[WrappedTerminated](5 seconds).t.actor should be(a3) p2.expectNoMsg(2 seconds) enterBarrier("terminated-verified-5") // verify that things are cleaned up, and heartbeating is stopped assertCleanup() expectNoMsg(2.seconds) p1.expectNoMsg(100 millis) p2.expectNoMsg(100 millis) p3.expectNoMsg(100 millis) assertCleanup() } runOn(third) { enterBarrier("actors-started-5") enterBarrier("watch-established-5") enterBarrier("stopped-5") enterBarrier("terminated-verified-5") } enterBarrier("after-5") } "receive Terminated when watched node crash" taggedAs LongRunningTest in { runOn(first) { val watcher = system.actorOf(Props(classOf[ProbeActor], testActor), "watcher6") val watcher2 = system.actorOf(Props(classOf[ProbeActor], system.deadLetters)) enterBarrier("actors-started-6") val subject = identify(second, "subject6") watcher ! WatchIt(subject) expectMsg(1 second, Ack) watcher2 ! WatchIt(subject) expectMsg(1 second, Ack) subject ! "hello6" // testing with this watch/unwatch of watcher2 to make sure that the unwatch doesn't // remove the first watch watcher2 ! UnwatchIt(subject) expectMsg(1 second, Ack) enterBarrier("watch-established-6") sleep() log.info("exit second") testConductor.exit(second, 0).await expectMsgType[WrappedTerminated](15 seconds).t.actor should be(subject) // verify that things are cleaned up, and heartbeating is stopped assertCleanup() expectNoMsg(2.seconds) assertCleanup() } runOn(second) { system.actorOf(Props(classOf[ProbeActor], testActor), "subject6") enterBarrier("actors-started-6") expectMsg(3 seconds, "hello6") enterBarrier("watch-established-6") } runOn(third) { enterBarrier("actors-started-6") enterBarrier("watch-established-6") } enterBarrier("after-6") } "cleanup when watching node crash" taggedAs LongRunningTest in { runOn(third) { val watcher = system.actorOf(Props(classOf[ProbeActor], testActor), "watcher7") enterBarrier("actors-started-7") val subject = identify(first, "subject7") watcher ! WatchIt(subject) expectMsg(1 second, Ack) subject ! "hello7" enterBarrier("watch-established-7") } runOn(first) { system.actorOf(Props(classOf[ProbeActor], testActor), "subject7") enterBarrier("actors-started-7") expectMsg(3 seconds, "hello7") enterBarrier("watch-established-7") sleep() log.info("exit third") testConductor.exit(third, 0).await // verify that things are cleaned up, and heartbeating is stopped assertCleanup(20 seconds) expectNoMsg(2.seconds) assertCleanup() } enterBarrier("after-7") } } } Other Akka source code examplesHere is a short list of links related to this Akka RemoteNodeDeathWatchSpec.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.