alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (MultiNodeClusterSpec.scala)

This example Akka source code file (MultiNodeClusterSpec.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actor, actorsystem, address, akka, boolean, cluster, concurrent, end, endack, node, option, rolename, test, testing, time, unit

The MultiNodeClusterSpec.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.cluster

import language.implicitConversions
import org.scalatest.{ Suite, Outcome, Canceled }
import org.scalatest.exceptions.TestCanceledException
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import akka.remote.testconductor.RoleName
import akka.remote.testkit.{ STMultiNodeSpec, MultiNodeSpec }
import akka.testkit._
import akka.testkit.TestEvent._
import akka.actor.{ ActorSystem, Address }
import akka.event.Logging.ErrorLevel
import scala.concurrent.duration._
import scala.collection.immutable
import java.util.concurrent.ConcurrentHashMap
import akka.remote.DefaultFailureDetectorRegistry
import akka.actor.ActorRef
import akka.actor.Actor
import akka.actor.RootActorPath

object MultiNodeClusterSpec {

  def clusterConfigWithFailureDetectorPuppet: Config =
    ConfigFactory.parseString("akka.cluster.failure-detector.implementation-class = akka.cluster.FailureDetectorPuppet").
      withFallback(clusterConfig)

  def clusterConfig(failureDetectorPuppet: Boolean): Config =
    if (failureDetectorPuppet) clusterConfigWithFailureDetectorPuppet else clusterConfig

  def clusterConfig: Config = ConfigFactory.parseString("""
    akka.actor.provider = akka.cluster.ClusterActorRefProvider
    akka.cluster {
      jmx.enabled                         = off
      gossip-interval                     = 200 ms
      leader-actions-interval             = 200 ms
      unreachable-nodes-reaper-interval   = 500 ms
      periodic-tasks-initial-delay        = 300 ms
      publish-stats-interval              = 0 s # always, when it happens
      failure-detector.heartbeat-interval = 500 ms
    }
    akka.loglevel = INFO
    akka.log-dead-letters = off
    akka.log-dead-letters-during-shutdown = off
    akka.remote.log-remote-lifecycle-events = off
    akka.loggers = ["akka.testkit.TestEventListener"]
    akka.test {
      single-expect-default = 5 s
    }
    """)

  // sometimes we need to coordinate test shutdown with messages instead of barriers
  object EndActor {
    case object SendEnd
    case object End
    case object EndAck
  }

  class EndActor(testActor: ActorRef, target: Option[Address]) extends Actor {
    import EndActor._
    def receive = {
      case SendEnd ⇒
        target foreach { t ⇒
          context.actorSelection(RootActorPath(t) / self.path.elements) ! End
        }
      case End ⇒
        testActor forward End
        sender() ! EndAck
      case EndAck ⇒
        testActor forward EndAck
    }
  }
}

trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoroner { self: MultiNodeSpec ⇒

  override def initialParticipants = roles.size

  private val cachedAddresses = new ConcurrentHashMap[RoleName, Address]

  override def atStartup(): Unit = {
    startCoroner()
    muteLog()
  }

  override def afterTermination(): Unit = {
    stopCoroner()
  }

  override def expectedTestDuration = 60.seconds

  def muteLog(sys: ActorSystem = system): Unit = {
    if (!sys.log.isDebugEnabled) {
      Seq(".*Metrics collection has started successfully.*",
        ".*Metrics will be retreived from MBeans.*",
        ".*Cluster Node.* - registered cluster JMX MBean.*",
        ".*Cluster Node.* - is starting up.*",
        ".*Shutting down cluster Node.*",
        ".*Cluster node successfully shut down.*",
        ".*Using a dedicated scheduler for cluster.*") foreach { s ⇒
          sys.eventStream.publish(Mute(EventFilter.info(pattern = s)))
        }

      muteDeadLetters(
        classOf[ClusterHeartbeatSender.Heartbeat],
        classOf[ClusterHeartbeatSender.HeartbeatRsp],
        classOf[GossipEnvelope],
        classOf[GossipStatus],
        classOf[MetricsGossipEnvelope],
        classOf[ClusterEvent.ClusterMetricsChanged],
        classOf[InternalClusterAction.Tick],
        classOf[akka.actor.PoisonPill],
        classOf[akka.dispatch.sysmsg.DeathWatchNotification],
        classOf[akka.remote.transport.AssociationHandle.Disassociated],
        //        akka.remote.transport.AssociationHandle.Disassociated.getClass,
        classOf[akka.remote.transport.ActorTransportAdapter.DisassociateUnderlying],
        //        akka.remote.transport.ActorTransportAdapter.DisassociateUnderlying.getClass,
        classOf[akka.remote.transport.AssociationHandle.InboundPayload])(sys)

    }
  }

  def muteMarkingAsUnreachable(sys: ActorSystem = system): Unit =
    if (!sys.log.isDebugEnabled)
      sys.eventStream.publish(Mute(EventFilter.error(pattern = ".*Marking.* as UNREACHABLE.*")))

  def muteMarkingAsReachable(sys: ActorSystem = system): Unit =
    if (!sys.log.isDebugEnabled)
      sys.eventStream.publish(Mute(EventFilter.info(pattern = ".*Marking.* as REACHABLE.*")))

  override def afterAll(): Unit = {
    if (!log.isDebugEnabled) {
      muteDeadLetters()()
      system.eventStream.setLogLevel(ErrorLevel)
    }
    super.afterAll()
  }

  /**
   * Lookup the Address for the role.
   *
   * Implicit conversion from RoleName to Address.
   *
   * It is cached, which has the implication that stopping
   * and then restarting a role (jvm) with another address is not
   * supported.
   */
  implicit def address(role: RoleName): Address = {
    cachedAddresses.get(role) match {
      case null ⇒
        val address = node(role).address
        cachedAddresses.put(role, address)
        address
      case address ⇒ address
    }
  }

  // Cluster tests are written so that if previous step (test method) failed
  // it will most likely not be possible to run next step. This ensures
  // fail fast of steps after the first failure.
  private var failed = false
  override protected def withFixture(test: NoArgTest): Outcome =
    if (failed) {
      Canceled(new TestCanceledException("Previous step failed", 0))
    } else {
      val out = super.withFixture(test)
      if (!out.isSucceeded)
        failed = true
      out
    }

  def clusterView: ClusterReadView = cluster.readView

  /**
   * Get the cluster node to use.
   */
  def cluster: Cluster = Cluster(system)

  /**
   * Use this method for the initial startup of the cluster node.
   */
  def startClusterNode(): Unit = {
    if (clusterView.members.isEmpty) {
      cluster join myself
      awaitAssert(clusterView.members.map(_.address) should contain(address(myself)))
    } else
      clusterView.self
  }

  /**
   * Initialize the cluster of the specified member
   * nodes (roles) and wait until all joined and `Up`.
   * First node will be started first  and others will join
   * the first.
   */
  def awaitClusterUp(roles: RoleName*): Unit = {
    runOn(roles.head) {
      // make sure that the node-to-join is started before other join
      startClusterNode()
    }
    enterBarrier(roles.head.name + "-started")
    if (roles.tail.contains(myself)) {
      cluster.join(roles.head)
    }
    if (roles.contains(myself)) {
      awaitMembersUp(numberOfMembers = roles.length)
    }
    enterBarrier(roles.map(_.name).mkString("-") + "-joined")
  }

  /**
   * Join the specific node within the given period by sending repeated join
   * requests at periodic intervals until we succeed.
   */
  def joinWithin(joinNode: RoleName, max: Duration = remainingOrDefault, interval: Duration = 1.second): Unit = {
    def memberInState(member: Address, status: Seq[MemberStatus]): Boolean =
      clusterView.members.exists { m ⇒ (m.address == member) && status.contains(m.status) }

    cluster join joinNode
    awaitCond({
      clusterView.refreshCurrentState()
      if (memberInState(joinNode, List(MemberStatus.up)) &&
        memberInState(myself, List(MemberStatus.Joining, MemberStatus.Up)))
        true
      else {
        cluster join joinNode
        false
      }
    }, max, interval)
  }

  /**
   * Assert that the member addresses match the expected addresses in the
   * sort order used by the cluster.
   */
  def assertMembers(gotMembers: Iterable[Member], expectedAddresses: Address*): Unit = {
    import Member.addressOrdering
    val members = gotMembers.toIndexedSeq
    members.size should be(expectedAddresses.length)
    expectedAddresses.sorted.zipWithIndex.foreach { case (a, i) ⇒ members(i).address should be(a) }
  }

  /**
   * Note that this can only be used for a cluster with all members
   * in Up status, i.e. use `awaitMembersUp` before using this method.
   * The reason for that is that the cluster leader is preferably a
   * member with status Up or Leaving and that information can't
   * be determined from the `RoleName`.
   */
  def assertLeader(nodesInCluster: RoleName*): Unit =
    if (nodesInCluster.contains(myself)) assertLeaderIn(nodesInCluster.to[immutable.Seq])

  /**
   * Assert that the cluster has elected the correct leader
   * out of all nodes in the cluster. First
   * member in the cluster ring is expected leader.
   *
   * Note that this can only be used for a cluster with all members
   * in Up status, i.e. use `awaitMembersUp` before using this method.
   * The reason for that is that the cluster leader is preferably a
   * member with status Up or Leaving and that information can't
   * be determined from the `RoleName`.
   */
  def assertLeaderIn(nodesInCluster: immutable.Seq[RoleName]): Unit =
    if (nodesInCluster.contains(myself)) {
      nodesInCluster.length should not be (0)
      val expectedLeader = roleOfLeader(nodesInCluster)
      val leader = clusterView.leader
      val isLeader = leader == Some(clusterView.selfAddress)
      assert(isLeader == isNode(expectedLeader),
        "expectedLeader [%s], got leader [%s], members [%s]".format(expectedLeader, leader, clusterView.members))
      clusterView.status should (be(MemberStatus.Up) or be(MemberStatus.Leaving))
    }

  /**
   * Wait until the expected number of members has status Up has been reached.
   * Also asserts that nodes in the 'canNotBePartOfMemberRing' are *not* part of the cluster ring.
   */
  def awaitMembersUp(
    numberOfMembers: Int,
    canNotBePartOfMemberRing: Set[Address] = Set.empty,
    timeout: FiniteDuration = 25.seconds): Unit = {
    within(timeout) {
      if (!canNotBePartOfMemberRing.isEmpty) // don't run this on an empty set
        awaitAssert(canNotBePartOfMemberRing foreach (a ⇒ clusterView.members.map(_.address) should not contain (a)))
      awaitAssert(clusterView.members.size should be(numberOfMembers))
      awaitAssert(clusterView.members.map(_.status) should be(Set(MemberStatus.Up)))
      // clusterView.leader is updated by LeaderChanged, await that to be updated also
      val expectedLeader = clusterView.members.headOption.map(_.address)
      awaitAssert(clusterView.leader should be(expectedLeader))
    }
  }

  def awaitAllReachable(): Unit =
    awaitAssert(clusterView.unreachableMembers should be(Set.empty))

  /**
   * Wait until the specified nodes have seen the same gossip overview.
   */
  def awaitSeenSameState(addresses: Address*): Unit =
    awaitAssert((addresses.toSet -- clusterView.seenBy) should be(Set.empty))

  /**
   * Leader according to the address ordering of the roles.
   * Note that this can only be used for a cluster with all members
   * in Up status, i.e. use `awaitMembersUp` before using this method.
   * The reason for that is that the cluster leader is preferably a
   * member with status Up or Leaving and that information can't
   * be determined from the `RoleName`.
   */
  def roleOfLeader(nodesInCluster: immutable.Seq[RoleName] = roles): RoleName = {
    nodesInCluster.length should not be (0)
    nodesInCluster.sorted.head
  }

  /**
   * Sort the roles in the address order used by the cluster node ring.
   */
  implicit val clusterOrdering: Ordering[RoleName] = new Ordering[RoleName] {
    import Member.addressOrdering
    def compare(x: RoleName, y: RoleName) = addressOrdering.compare(address(x), address(y))
  }

  def roleName(addr: Address): Option[RoleName] = roles.find(address(_) == addr)

  /**
   * Marks a node as available in the failure detector if
   * [[akka.cluster.FailureDetectorPuppet]] is used as
   * failure detector.
   */
  def markNodeAsAvailable(address: Address): Unit =
    failureDetectorPuppet(address) foreach (_.markNodeAsAvailable())

  /**
   * Marks a node as unavailable in the failure detector if
   * [[akka.cluster.FailureDetectorPuppet]] is used as
   * failure detector.
   */
  def markNodeAsUnavailable(address: Address): Unit = {
    if (isFailureDetectorPuppet) {
      // before marking it as unavailble there should be at least one heartbeat
      // to create the FailureDetectorPuppet in the FailureDetectorRegistry
      cluster.failureDetector.heartbeat(address)
      failureDetectorPuppet(address) foreach (_.markNodeAsUnavailable())
    }
  }

  private def isFailureDetectorPuppet: Boolean =
    cluster.settings.FailureDetectorImplementationClass == classOf[FailureDetectorPuppet].getName

  private def failureDetectorPuppet(address: Address): Option[FailureDetectorPuppet] =
    cluster.failureDetector match {
      case reg: DefaultFailureDetectorRegistry[Address] ⇒
        reg.failureDetector(address) collect { case p: FailureDetectorPuppet ⇒ p }
      case _ ⇒ None
    }

}

Other Akka source code examples

Here is a short list of links related to this Akka MultiNodeClusterSpec.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.