|
Akka/Scala example source code file (ClusterDeathWatchSpec.scala)
The ClusterDeathWatchSpec.scala Akka example source code
/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.cluster

import language.postfixOps
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfter
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import akka.testkit.TestEvent._
import akka.actor.Props
import akka.actor.Actor
import akka.actor.Address
import akka.actor.RootActorPath
import akka.actor.Terminated
import akka.actor.Address
import akka.remote.RemoteActorRef
import java.util.concurrent.TimeoutException
import akka.actor.ActorSystemImpl
import akka.actor.ActorIdentity
import akka.actor.Identify
import akka.actor.ActorRef
import akka.remote.RemoteWatcher
import akka.actor.ActorSystem
import akka.cluster.MultiNodeClusterSpec.EndActor
import akka.actor.Deploy

/**
 * Multi-node configuration shared by all JVMs of this spec: declares the five
 * roles, the common cluster config and one remote deployment.
 */
object ClusterDeathWatchMultiJvmSpec extends MultiNodeConfig {
  val first = role("first")
  val second = role("second")
  val third = role("third")
  val fourth = role("fourth")
  val fifth = role("fifth")

  // Debug config is off; falls back to the cluster config named
  // `clusterConfigWithFailureDetectorPuppet` (defined in MultiNodeClusterSpec,
  // not visible here — presumably what lets the tests mark nodes unavailable).
  commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))

  // On the `fourth` node, the actor created at /hello is deployed remotely to `first`
  // (see the last test, which asserts the resulting ref's address).
  deployOn(fourth, """/hello.remote = "@first@" """)

  /** Actor with empty behavior; used as the remote-deployed "hello" actor in the last test. */
  class Hello extends Actor {
    def receive = Actor.emptyBehavior
  }
}

// One concrete spec class per node; all five run the same ClusterDeathWatchSpec body.
class ClusterDeathWatchMultiJvmNode1 extends ClusterDeathWatchSpec
class ClusterDeathWatchMultiJvmNode2 extends ClusterDeathWatchSpec
class ClusterDeathWatchMultiJvmNode3 extends ClusterDeathWatchSpec
class ClusterDeathWatchMultiJvmNode4 extends ClusterDeathWatchSpec
class ClusterDeathWatchMultiJvmNode5 extends ClusterDeathWatchSpec

/**
 * Multi-node spec for death watch of remote actors on cluster nodes.
 *
 * The tests run in order and build on each other's cluster state (second and
 * third are removed by the first test, fifth by the third, first by the last).
 * Every `enterBarrier` name must be entered by all participating nodes in the
 * same order, so the statement order inside each `runOn` block is significant.
 */
abstract class ClusterDeathWatchSpec
  extends MultiNodeSpec(ClusterDeathWatchMultiJvmSpec)
  with MultiNodeClusterSpec with ImplicitSender {

  import ClusterDeathWatchMultiJvmSpec._

  /**
   * Runs the inherited startup and then, unless debug logging is enabled,
   * mutes the "marking as unreachable" log output and any logged
   * `java.net.UnknownHostException`.
   */
  override def atStartup(): Unit = {
    super.atStartup()
    if (!log.isDebugEnabled) {
      muteMarkingAsUnreachable()
      system.eventStream.publish(Mute(EventFilter[java.net.UnknownHostException]()))
    }
  }

  /**
   * The local actor at "/system/remote-watcher", resolved once on first access
   * via `Identify`. The `.get` throws if the identity reply carries no ref.
   */
  lazy val remoteWatcher: ActorRef = {
    system.actorSelection("/system/remote-watcher") ! Identify(None)
    expectMsgType[ActorIdentity].ref.get
  }

  "An actor watching a remote actor in the cluster" must {
    "receive Terminated when watched node becomes Down/Removed" taggedAs LongRunningTest in within(20 seconds) {
      // Only first..fourth form the cluster here; fifth just takes part in the barriers.
      awaitClusterUp(first, second, third, fourth)
      enterBarrier("cluster-up")

      runOn(first) {
        enterBarrier("subjected-started")

        val path2 = RootActorPath(second) / "user" / "subject"
        val path3 = RootActorPath(third) / "user" / "subject"
        // Counted down once per established watch (second's and third's subject).
        val watchEstablished = TestLatch(2)
        // Observer: identifies both subjects, watches them, and forwards the path
        // of every terminated actor to testActor.
        system.actorOf(Props(new Actor {
          context.actorSelection(path2) ! Identify(path2)
          context.actorSelection(path3) ! Identify(path3)

          def receive = {
            case ActorIdentity(`path2`, Some(ref)) ⇒
              context.watch(ref)
              watchEstablished.countDown
            case ActorIdentity(`path3`, Some(ref)) ⇒
              context.watch(ref)
              watchEstablished.countDown
            case Terminated(actor) ⇒ testActor ! actor.path
          }
        }).withDeploy(Deploy.local), name = "observer1")

        watchEstablished.await
        enterBarrier("watch-established")
        // After this barrier `third` downs `second` (see the runOn(third) block below),
        // so the observer must report path2 ...
        expectMsg(path2)
        // ... and nothing else yet: third is only taken down further below.
        expectNoMsg(2 seconds)
        enterBarrier("second-terminated")

        markNodeAsUnavailable(third)
        awaitAssert(clusterView.unreachableMembers.map(_.address) should contain(address(third)))
        cluster.down(third)
        // removed
        awaitAssert(clusterView.members.map(_.address) should not contain (address(third)))
        awaitAssert(clusterView.unreachableMembers.map(_.address) should not contain (address(third)))
        expectMsg(path3)
        enterBarrier("third-terminated")
      }

      runOn(second, third, fourth) {
        // The watched "subject" actor; it has empty behavior and only exists to be watched.
        system.actorOf(Props(new Actor { def receive = Actor.emptyBehavior }).withDeploy(Deploy.local), name = "subject")
        enterBarrier("subjected-started")
        enterBarrier("watch-established")
        runOn(third) {
          markNodeAsUnavailable(second)
          awaitAssert(clusterView.unreachableMembers.map(_.address) should contain(address(second)))
          cluster.down(second)
          // removed
          awaitAssert(clusterView.members.map(_.address) should not contain (address(second)))
          awaitAssert(clusterView.unreachableMembers.map(_.address) should not contain (address(second)))
        }
        enterBarrier("second-terminated")
        enterBarrier("third-terminated")
      }

      // fifth does no work in this test but must enter the same barriers as the other nodes.
      runOn(fifth) {
        enterBarrier("subjected-started")
        enterBarrier("watch-established")
        enterBarrier("second-terminated")
        enterBarrier("third-terminated")
      }

      enterBarrier("after-1")
    }

    // Registered with `ignore`, so this test body is not executed.
    "receive Terminated when watched path doesn't exist" taggedAs LongRunningTest ignore {
      Thread.sleep(5000)
      runOn(first) {
        val path = RootActorPath(second) / "user" / "non-existing"
        // Observer that watches a path with no actor behind it and forwards the
        // terminated actor's path to testActor.
        system.actorOf(Props(new Actor {
          context.watch(context.actorFor(path))

          def receive = {
            case t: Terminated ⇒ testActor ! t.actor.path
          }
        }).withDeploy(Deploy.local), name = "observer3")

        expectMsg(path)
      }

      enterBarrier("after-2")
    }

    "be able to watch actor before node joins cluster, ClusterRemoteWatcher takes over from RemoteWatcher" taggedAs LongRunningTest in within(20 seconds) {
      runOn(fifth) {
        system.actorOf(Props(new Actor { def receive = Actor.emptyBehavior }).withDeploy(Deploy.local), name = "subject5")
      }
      enterBarrier("subjected-started")

      runOn(first) {
        system.actorSelection(RootActorPath(fifth) / "user" / "subject5") ! Identify("subject5")
        val subject5 = expectMsgType[ActorIdentity].ref.get
        watch(subject5)

        // fifth is not cluster member, so the watch is handled by the RemoteWatcher
        awaitAssert {
          remoteWatcher ! RemoteWatcher.Stats
          expectMsgType[RemoteWatcher.Stats].watchingRefs should contain((subject5, testActor))
        }
      }
      enterBarrier("remote-watch")

      // second and third are already removed
      awaitClusterUp(first, fourth, fifth)

      runOn(first) {
        // fifth is member, so the watch is handled by the ClusterRemoteWatcher,
        // and cleaned up from RemoteWatcher
        awaitAssert {
          remoteWatcher ! RemoteWatcher.Stats
          // Compared by watchee name rather than by ref.
          expectMsgType[RemoteWatcher.Stats].watchingRefs.map {
            case (watchee, watcher) ⇒ watchee.path.name
          } should not contain ("subject5")
        }
      }

      enterBarrier("cluster-watch")

      runOn(fourth) {
        markNodeAsUnavailable(fifth)
        awaitAssert(clusterView.unreachableMembers.map(_.address) should contain(address(fifth)))
        cluster.down(fifth)
        // removed
        awaitAssert(clusterView.unreachableMembers.map(_.address) should not contain (address(fifth)))
        awaitAssert(clusterView.members.map(_.address) should not contain (address(fifth)))
      }

      enterBarrier("fifth-terminated")
      runOn(first) {
        // The watch registered through testActor (`watch(subject5)` above) must now fire.
        expectMsgType[Terminated].actor.path.name should be("subject5")
      }

      enterBarrier("after-3")
    }

    "be able to shutdown system when using remote deployed actor on node that crash" taggedAs LongRunningTest in within(20 seconds) {
      // fourth actor system will be shutdown, not part of testConductor any more
      // so we can't use barriers to synchronize with it
      // Captured before any node is taken down; used below to address `first` from the new EndSystem.
      val firstAddress = address(first)
      runOn(first) {
        // EndActor is defined in MultiNodeClusterSpec (not visible here); presumably it
        // relays the End signal from fourth to testActor — TODO confirm.
        system.actorOf(Props(classOf[EndActor], testActor, None), "end")
      }
      enterBarrier("end-actor-created")

      runOn(fourth) {
        val hello = system.actorOf(Props[Hello], "hello")
        // Because of the `deployOn(fourth, ...)` config, the actor must actually live on `first`.
        hello.isInstanceOf[RemoteActorRef] should be(true)
        hello.path.address should be(address(first))
        watch(hello)
        enterBarrier("hello-deployed")

        markNodeAsUnavailable(first)
        awaitAssert(clusterView.unreachableMembers.map(_.address) should contain(address(first)))
        cluster.down(first)
        // removed
        awaitAssert(clusterView.unreachableMembers.map(_.address) should not contain (address(first)))
        awaitAssert(clusterView.members.map(_.address) should not contain (address(first)))

        expectTerminated(hello)

        enterBarrier("first-unavailable")

        // Wait for this node's own actor system to terminate within the remaining time;
        // on timeout, fail with the system's actor tree for diagnosis.
        val timeout = remainingOrDefault
        try system.awaitTermination(timeout) catch {
          case _: TimeoutException ⇒
            fail("Failed to stop [%s] within [%s] \n%s".format(system.name, timeout,
              system.asInstanceOf[ActorSystemImpl].printTree))
        }

        // signal to the first node that fourth is done
        val endSystem = ActorSystem("EndSystem", system.settings.config)
        try {
          val endProbe = TestProbe()(endSystem)
          val endActor = endSystem.actorOf(Props(classOf[EndActor], endProbe.ref, Some(firstAddress)), "end")
          endActor ! EndActor.SendEnd
          endProbe.expectMsg(EndActor.EndAck)

        } finally {
          // Always shut the temporary system down, even when the expectation above fails.
          shutdown(endSystem, 10 seconds)
        }
        // no barrier here, because it is not part of testConductor roles any more

      }

      runOn(first, second, third, fifth) {
        enterBarrier("hello-deployed")
        enterBarrier("first-unavailable")

        // don't end the test until the fourth is done
        runOn(first) {
          // fourth system will be shutdown, remove to not participate in barriers any more
          testConductor.shutdown(fourth).await
          expectMsg(EndActor.End)
        }

        enterBarrier("after-4")
      }

    }

  }
}
Other Akka source code examplesHere is a short list of links related to this Akka ClusterDeathWatchSpec.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.