alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (ClusterDeathWatchSpec.scala)

This example Akka source code file (ClusterDeathWatchSpec.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actor, actoridentity, akka, clusterdeathwatchmultijvmnode1, clusterdeathwatchspec, concurrent, identify, longrunningtest, multinodeclusterspec, remote, rootactorpath, some, terminated, test, testing, time

The ClusterDeathWatchSpec.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.cluster

import language.postfixOps
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfter
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import akka.testkit.TestEvent._
import akka.actor.Props
import akka.actor.Actor
import akka.actor.Address
import akka.actor.RootActorPath
import akka.actor.Terminated
import akka.actor.Address
import akka.remote.RemoteActorRef
import java.util.concurrent.TimeoutException
import akka.actor.ActorSystemImpl
import akka.actor.ActorIdentity
import akka.actor.Identify
import akka.actor.ActorRef
import akka.remote.RemoteWatcher
import akka.actor.ActorSystem
import akka.cluster.MultiNodeClusterSpec.EndActor
import akka.actor.Deploy

/**
 * Multi-node configuration used by [[ClusterDeathWatchSpec]].
 *
 * Declares the five roles `first` .. `fifth`, installs the configuration common to all of them
 * and adds a deployment entry for the `/hello` actor on `fourth`.
 */
object ClusterDeathWatchMultiJvmSpec extends MultiNodeConfig {
  // Roles are declared in this fixed order: first, second, third, fourth, fifth.
  val first = role("first")
  val second = role("second")
  val third = role("third")
  val fourth = role("fourth")
  val fifth = role("fifth")

  // Debug configuration switched off, falling back to the cluster configuration
  // that uses the failure detector puppet.
  commonConfig {
    val debugOff = debugConfig(on = false)
    debugOff.withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)
  }

  // Deployment setting that only applies on `fourth`.
  // NOTE(review): "@first@" is presumably substituted with the address of `first` by the
  // multi-node testkit, so that /hello is deployed remotely there - confirm.
  deployOn(fourth, """/hello.remote = "@first@" """)

  /** Actor with an empty behavior; it handles no messages. */
  class Hello extends Actor {
    override def receive = Actor.emptyBehavior
  }

}

// Concrete spec classes. Each one only extends ClusterDeathWatchSpec and adds nothing of its own.
// NOTE(review): presumably one class per JVM/role, following the "<Name>MultiJvm<Node>" naming
// convention of the multi-JVM test setup - confirm against the build configuration.
class ClusterDeathWatchMultiJvmNode1 extends ClusterDeathWatchSpec
class ClusterDeathWatchMultiJvmNode2 extends ClusterDeathWatchSpec
class ClusterDeathWatchMultiJvmNode3 extends ClusterDeathWatchSpec
class ClusterDeathWatchMultiJvmNode4 extends ClusterDeathWatchSpec
class ClusterDeathWatchMultiJvmNode5 extends ClusterDeathWatchSpec

/**
 * Multi-node spec for death watch of remote actors between cluster nodes.
 *
 * The tests cover, in order:
 *  - `Terminated` is delivered when the node of a watched actor is downed and removed,
 *  - (ignored) `Terminated` for a watched path that does not exist,
 *  - a watch that was started before the watched node joined the cluster keeps working
 *    after the node has become a member,
 *  - an actor system that uses a remote deployed actor can shut down after the node
 *    hosting that actor was downed.
 *
 * The tests share cluster state and run in declaration order: the third test relies on
 * `second` and `third` having been removed by the first test. Nodes are synchronized with
 * named barriers, so every node that still takes part must enter the same barriers in the
 * same order.
 */
abstract class ClusterDeathWatchSpec
  extends MultiNodeSpec(ClusterDeathWatchMultiJvmSpec)
  with MultiNodeClusterSpec with ImplicitSender {

  import ClusterDeathWatchMultiJvmSpec._

  /**
   * Runs the inherited startup logic and then, unless debug logging is enabled, calls
   * `muteMarkingAsUnreachable()` and mutes `java.net.UnknownHostException` events on the
   * event stream.
   */
  override def atStartup(): Unit = {
    super.atStartup()
    if (!log.isDebugEnabled) {
      muteMarkingAsUnreachable()
      system.eventStream.publish(Mute(EventFilter[java.net.UnknownHostException]()))
    }
  }

  /**
   * Reference to the actor at `/system/remote-watcher`, resolved on first access by sending
   * `Identify` and waiting for the `ActorIdentity` reply. The `.get` throws if the reply
   * carries no actor reference.
   */
  lazy val remoteWatcher: ActorRef = {
    system.actorSelection("/system/remote-watcher") ! Identify(None)
    expectMsgType[ActorIdentity].ref.get
  }

  "An actor watching a remote actor in the cluster" must {
    "receive Terminated when watched node becomes Down/Removed" taggedAs LongRunningTest in within(20 seconds) {
      awaitClusterUp(first, second, third, fourth)
      enterBarrier("cluster-up")

      runOn(first) {
        enterBarrier("subjected-started")

        val path2 = RootActorPath(second) / "user" / "subject"
        val path3 = RootActorPath(third) / "user" / "subject"
        // opened once both subjects have been identified and watched
        val watchEstablished = TestLatch(2)
        // observer: identifies the subjects on second and third, watches them and forwards
        // the path of every terminated actor to testActor
        system.actorOf(Props(new Actor {
          context.actorSelection(path2) ! Identify(path2)
          context.actorSelection(path3) ! Identify(path3)

          def receive = {
            // the backticks match the correlation id of the reply against the path value
            case ActorIdentity(`path2`, Some(ref)) ⇒
              context.watch(ref)
              watchEstablished.countDown
            case ActorIdentity(`path3`, Some(ref)) ⇒
              context.watch(ref)
              watchEstablished.countDown
            case Terminated(actor) ⇒ testActor ! actor.path
          }
        }).withDeploy(Deploy.local), name = "observer1")

        watchEstablished.await
        enterBarrier("watch-established")
        // second is downed by third (see the runOn(third) block below)
        expectMsg(path2)
        // nothing else, in particular no termination of the subject on third, is expected yet
        expectNoMsg(2 seconds)
        enterBarrier("second-terminated")

        // now first itself marks third unavailable, waits until it is seen as unreachable
        // and downs it
        markNodeAsUnavailable(third)
        awaitAssert(clusterView.unreachableMembers.map(_.address) should contain(address(third)))
        cluster.down(third)
        // wait until third is removed from both members and unreachable members
        awaitAssert(clusterView.members.map(_.address) should not contain (address(third)))
        awaitAssert(clusterView.unreachableMembers.map(_.address) should not contain (address(third)))
        expectMsg(path3)
        enterBarrier("third-terminated")

      }

      runOn(second, third, fourth) {
        // the subject must exist before first starts to identify it
        system.actorOf(Props(new Actor { def receive = Actor.emptyBehavior }).withDeploy(Deploy.local), name = "subject")
        enterBarrier("subjected-started")
        enterBarrier("watch-established")
        runOn(third) {
          markNodeAsUnavailable(second)
          awaitAssert(clusterView.unreachableMembers.map(_.address) should contain(address(second)))
          cluster.down(second)
          // wait until second is removed from both members and unreachable members
          awaitAssert(clusterView.members.map(_.address) should not contain (address(second)))
          awaitAssert(clusterView.unreachableMembers.map(_.address) should not contain (address(second)))
        }
        enterBarrier("second-terminated")
        enterBarrier("third-terminated")
      }

      // fifth is not part of the cluster in this test; it only takes part in the barriers
      runOn(fifth) {
        enterBarrier("subjected-started")
        enterBarrier("watch-established")
        enterBarrier("second-terminated")
        enterBarrier("third-terminated")
      }

      enterBarrier("after-1")

    }

    // ignored test: watches an actor path on second that was never created
    "receive Terminated when watched path doesn't exist" taggedAs LongRunningTest ignore {
      Thread.sleep(5000)
      runOn(first) {
        val path = RootActorPath(second) / "user" / "non-existing"
        system.actorOf(Props(new Actor {
          context.watch(context.actorFor(path))
          def receive = {
            case t: Terminated ⇒ testActor ! t.actor.path
          }
        }).withDeploy(Deploy.local), name = "observer3")

        expectMsg(path)
      }

      enterBarrier("after-2")
    }

    "be able to watch actor before node joins cluster, ClusterRemoteWatcher takes over from RemoteWatcher" taggedAs LongRunningTest in within(20 seconds) {
      runOn(fifth) {
        system.actorOf(Props(new Actor { def receive = Actor.emptyBehavior }).withDeploy(Deploy.local), name = "subject5")
      }
      enterBarrier("subjected-started")

      runOn(first) {
        system.actorSelection(RootActorPath(fifth) / "user" / "subject5") ! Identify("subject5")
        val subject5 = expectMsgType[ActorIdentity].ref.get
        watch(subject5)

        // fifth is not cluster member, so the watch is handled by the RemoteWatcher
        awaitAssert {
          remoteWatcher ! RemoteWatcher.Stats
          expectMsgType[RemoteWatcher.Stats].watchingRefs should contain((subject5, testActor))
        }
      }
      enterBarrier("remote-watch")

      // second and third are already removed
      awaitClusterUp(first, fourth, fifth)

      runOn(first) {
        // fifth is member, so the watch is handled by the ClusterRemoteWatcher,
        // and cleaned up from RemoteWatcher
        awaitAssert {
          remoteWatcher ! RemoteWatcher.Stats
          expectMsgType[RemoteWatcher.Stats].watchingRefs.map {
            case (watchee, watcher) ⇒ watchee.path.name
          } should not contain ("subject5")
        }
      }

      enterBarrier("cluster-watch")

      runOn(fourth) {
        markNodeAsUnavailable(fifth)
        awaitAssert(clusterView.unreachableMembers.map(_.address) should contain(address(fifth)))
        cluster.down(fifth)
        // wait until fifth is removed from both unreachable members and members
        awaitAssert(clusterView.unreachableMembers.map(_.address) should not contain (address(fifth)))
        awaitAssert(clusterView.members.map(_.address) should not contain (address(fifth)))
      }

      enterBarrier("fifth-terminated")
      runOn(first) {
        // the watch started before fifth joined must still produce Terminated
        expectMsgType[Terminated].actor.path.name should be("subject5")
      }

      enterBarrier("after-3")
    }

    "be able to shutdown system when using remote deployed actor on node that crash" taggedAs LongRunningTest in within(20 seconds) {
      // fourth actor system will be shutdown, not part of testConductor any more
      // so we can't use barriers to synchronize with it
      // the address of first is looked up here, before any system is shut down, for later
      // use by the EndActor on fourth
      val firstAddress = address(first)
      runOn(first) {
        system.actorOf(Props(classOf[EndActor], testActor, None), "end")
      }
      enterBarrier("end-actor-created")

      runOn(fourth) {
        // "hello" is configured with deployOn(fourth, ...) in ClusterDeathWatchMultiJvmSpec;
        // the assertions below check that it is a remote ref living on first
        val hello = system.actorOf(Props[Hello], "hello")
        hello.isInstanceOf[RemoteActorRef] should be(true)
        hello.path.address should be(address(first))
        watch(hello)
        enterBarrier("hello-deployed")

        markNodeAsUnavailable(first)
        awaitAssert(clusterView.unreachableMembers.map(_.address) should contain(address(first)))
        cluster.down(first)
        // wait until first is removed from both unreachable members and members
        awaitAssert(clusterView.unreachableMembers.map(_.address) should not contain (address(first)))
        awaitAssert(clusterView.members.map(_.address) should not contain (address(first)))

        expectTerminated(hello)

        enterBarrier("first-unavailable")

        // wait for this actor system to terminate within the remaining time; on timeout
        // fail with the actor tree of the system for diagnosis
        val timeout = remainingOrDefault
        try system.awaitTermination(timeout) catch {
          case _: TimeoutException ⇒
            fail("Failed to stop [%s] within [%s] \n%s".format(system.name, timeout,
              system.asInstanceOf[ActorSystemImpl].printTree))
        }

        // signal to the first node that fourth is done
        // a fresh actor system is started for this; it is always shut down in the finally block
        val endSystem = ActorSystem("EndSystem", system.settings.config)
        try {
          val endProbe = TestProbe()(endSystem)
          val endActor = endSystem.actorOf(Props(classOf[EndActor], endProbe.ref, Some(firstAddress)), "end")
          endActor ! EndActor.SendEnd
          endProbe.expectMsg(EndActor.EndAck)

        } finally {
          shutdown(endSystem, 10 seconds)
        }
        // no barrier here, because it is not part of testConductor roles any more

      }

      runOn(first, second, third, fifth) {
        enterBarrier("hello-deployed")
        enterBarrier("first-unavailable")

        // don't end the test until the fourth is done
        runOn(first) {
          // fourth system will be shutdown, remove to not participate in barriers any more
          testConductor.shutdown(fourth).await
          expectMsg(EndActor.End)
        }

        enterBarrier("after-4")
      }

    }

  }
}

Other Akka source code examples

Here is a short list of links related to this Akka ClusterDeathWatchSpec.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.