alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (UnreachableNodeJoinsAgainSpec.scala)

This example Akka source code file (UnreachableNodeJoinsAgainSpec.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

a, actor, akka, concurrent, longrunningtest, multinodeclusterspec, multinodespec, none, rolename, test, testing, testprobe, time, unreachable, unreachablenodejoinsagainmultijvmnode1, unreachablenodejoinsagainmultijvmnode2, unreachablenodejoinsagainmultijvmnode3, unreachablenodejoinsagainspec

The UnreachableNodeJoinsAgainSpec.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.cluster

import language.postfixOps
import scala.collection.immutable
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.actor.ActorSystem
import akka.actor.ExtendedActorSystem
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.transport.ThrottlerTransportAdapter.Direction
import akka.testkit._
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.actor.RootActorPath
import akka.cluster.MultiNodeClusterSpec.EndActor

object UnreachableNodeJoinsAgainMultiNodeConfig extends MultiNodeConfig {
  val first = role("first")
  val second = role("second")
  val third = role("third")
  val fourth = role("fourth")

  commonConfig(ConfigFactory.parseString(
    """
      akka.remote.log-remote-lifecycle-events = off
    """).withFallback(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)))

  testTransport(on = true)

}

class UnreachableNodeJoinsAgainMultiJvmNode1 extends UnreachableNodeJoinsAgainSpec
class UnreachableNodeJoinsAgainMultiJvmNode2 extends UnreachableNodeJoinsAgainSpec
class UnreachableNodeJoinsAgainMultiJvmNode3 extends UnreachableNodeJoinsAgainSpec
class UnreachableNodeJoinsAgainMultiJvmNode4 extends UnreachableNodeJoinsAgainSpec

abstract class UnreachableNodeJoinsAgainSpec
  extends MultiNodeSpec(UnreachableNodeJoinsAgainMultiNodeConfig)
  with MultiNodeClusterSpec {

  import UnreachableNodeJoinsAgainMultiNodeConfig._

  muteMarkingAsUnreachable()

  def allBut(role: RoleName, roles: immutable.Seq[RoleName] = roles): immutable.Seq[RoleName] = {
    roles.filterNot(_ == role)
  }

  lazy val master = second
  lazy val victim = fourth

  var endBarrierNumber = 0
  def endBarrier(): Unit = {
    endBarrierNumber += 1
    enterBarrier("after_" + endBarrierNumber)
  }

  "A cluster of " + roles.size + " members" must {

    "reach initial convergence" taggedAs LongRunningTest in {
      awaitClusterUp(roles: _*)
      endBarrier()
    }

    "mark a node as UNREACHABLE when we pull the network" taggedAs LongRunningTest in {
      // let them send at least one heartbeat to each other after the gossip convergence
      // because for new joining nodes we remove them from the failure detector when
      // receive gossip
      Thread.sleep(2.seconds.dilated.toMillis)

      runOn(first) {
        // pull network for victim node from all nodes
        allBut(victim).foreach { roleName ⇒
          testConductor.blackhole(victim, roleName, Direction.Both).await
        }
      }

      enterBarrier("unplug_victim")

      val allButVictim = allBut(victim, roles)
      runOn(victim) {
        allButVictim.foreach(markNodeAsUnavailable(_))
        within(30 seconds) {
          // victim becomes all alone
          awaitAssert {
            val members = clusterView.members
            clusterView.unreachableMembers.size should be(roles.size - 1)
          }
          clusterView.unreachableMembers.map(_.address) should be((allButVictim map address).toSet)
        }
      }

      runOn(allButVictim: _*) {
        markNodeAsUnavailable(victim)
        within(30 seconds) {
          // victim becomes unreachable
          awaitAssert {
            val members = clusterView.members
            clusterView.unreachableMembers.size should be(1)
          }
          awaitSeenSameState(allButVictim map address: _*)
          // still one unreachable
          clusterView.unreachableMembers.size should be(1)
          clusterView.unreachableMembers.head.address should be(node(victim).address)
          clusterView.unreachableMembers.head.status should be(MemberStatus.Up)
        }
      }

      endBarrier()
    }

    "mark the node as DOWN" taggedAs LongRunningTest in {
      runOn(master) {
        cluster down victim
      }

      val allButVictim = allBut(victim, roles)
      runOn(allButVictim: _*) {
        // eventually removed
        awaitMembersUp(roles.size - 1, Set(victim))
        awaitAssert(clusterView.unreachableMembers should be(Set.empty), 15 seconds)
        awaitAssert(clusterView.members.map(_.address) should be((allButVictim map address).toSet))

      }

      endBarrier()
    }

    "allow fresh node with same host:port to join again when the network is plugged back in" taggedAs LongRunningTest in {
      val expectedNumberOfMembers = roles.size

      // victim actor system will be shutdown, not part of testConductor any more
      // so we can't use barriers to synchronize with it
      val masterAddress = address(master)
      runOn(master) {
        system.actorOf(Props(classOf[EndActor], testActor, None), "end")
      }
      enterBarrier("end-actor-created")

      runOn(first) {
        // put the network back in
        allBut(victim).foreach { roleName ⇒
          testConductor.passThrough(victim, roleName, Direction.Both).await
        }
      }

      enterBarrier("plug_in_victim")

      runOn(first) {
        // will shutdown ActorSystem of victim
        testConductor.shutdown(victim)
      }

      runOn(victim) {
        val victimAddress = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
        system.awaitTermination(10 seconds)
        // create new ActorSystem with same host:port
        val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s"""
            akka.remote.netty.tcp {
              hostname = ${victimAddress.host.get}
              port = ${victimAddress.port.get}
            }
            """).withFallback(system.settings.config))

        try {
          Cluster(freshSystem).join(masterAddress)
          within(15 seconds) {
            awaitAssert(Cluster(freshSystem).readView.members.map(_.address) should contain(victimAddress))
            awaitAssert(Cluster(freshSystem).readView.members.size should be(expectedNumberOfMembers))
            awaitAssert(Cluster(freshSystem).readView.members.map(_.status) should be(Set(MemberStatus.Up)))
          }

          // signal to master node that victim is done
          val endProbe = TestProbe()(freshSystem)
          val endActor = freshSystem.actorOf(Props(classOf[EndActor], endProbe.ref, Some(masterAddress)), "end")
          endActor ! EndActor.SendEnd
          endProbe.expectMsg(EndActor.EndAck)

        } finally {
          shutdown(freshSystem)
        }
        // no barrier here, because it is not part of testConductor roles any more
      }

      runOn(allBut(victim): _*) {
        awaitMembersUp(expectedNumberOfMembers)
        // don't end the test until the freshSystem is done
        runOn(master) {
          expectMsg(20 seconds, EndActor.End)
        }
        endBarrier()
      }

    }
  }
}

Other Akka source code examples

Here is a short list of links related to this Akka UnreachableNodeJoinsAgainSpec.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.