|
Akka/Scala example source code file (UnreachableNodeJoinsAgainSpec.scala)
The UnreachableNodeJoinsAgainSpec.scala Akka example source code
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import language.postfixOps
import scala.collection.immutable
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.actor.ActorSystem
import akka.actor.ExtendedActorSystem
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.transport.ThrottlerTransportAdapter.Direction
import akka.testkit._
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.actor.RootActorPath
import akka.cluster.MultiNodeClusterSpec.EndActor
/**
 * Role layout and shared configuration for the "unreachable node joins again" spec:
 * four roles (`first` .. `fourth`), remote lifecycle event logging switched off,
 * and the test transport enabled.
 */
object UnreachableNodeJoinsAgainMultiNodeConfig extends MultiNodeConfig {
  val first = role("first")
  val second = role("second")
  val third = role("third")
  val fourth = role("fourth")

  commonConfig {
    // Spec-specific override; it takes precedence over everything it falls back to.
    val overrides = ConfigFactory.parseString(
      """
akka.remote.log-remote-lifecycle-events = off
""")
    // Fallback chain: debug settings (debug off) first, then the shared cluster test config.
    val defaults = debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)
    overrides.withFallback(defaults)
  }

  // NOTE(review): presumably required for the testConductor.blackhole / passThrough
  // calls made by the spec in this file - confirm against the MultiNodeConfig docs.
  testTransport(on = true)
}
// Concrete instantiations of the abstract spec: four classes, matching the four roles
// declared in UnreachableNodeJoinsAgainMultiNodeConfig.
// NOTE(review): the `MultiJvmNodeN` suffix presumably follows the multi-JVM test
// naming convention so that each class is launched in its own JVM - confirm against
// the build configuration.
class UnreachableNodeJoinsAgainMultiJvmNode1 extends UnreachableNodeJoinsAgainSpec
class UnreachableNodeJoinsAgainMultiJvmNode2 extends UnreachableNodeJoinsAgainSpec
class UnreachableNodeJoinsAgainMultiJvmNode3 extends UnreachableNodeJoinsAgainSpec
class UnreachableNodeJoinsAgainMultiJvmNode4 extends UnreachableNodeJoinsAgainSpec
/**
 * Multi-node cluster scenario exercised in four sequential steps:
 *
 *  1. all roles form a cluster,
 *  2. the network between `victim` and every other role is blackholed and both sides
 *     observe the resulting unreachability,
 *  3. `master` downs the victim and the remaining nodes see it removed,
 *  4. the victim's ActorSystem is shut down and a fresh ActorSystem with the same
 *     host:port joins the cluster again through `master`.
 *
 * The steps depend on each other and on the barrier sequence, so they must run in
 * the order in which they are declared.
 */
abstract class UnreachableNodeJoinsAgainSpec
  extends MultiNodeSpec(UnreachableNodeJoinsAgainMultiNodeConfig)
  with MultiNodeClusterSpec {

  import UnreachableNodeJoinsAgainMultiNodeConfig._

  // NOTE(review): defined outside this file; presumably silences the log noise that the
  // deliberately provoked unreachability would otherwise produce - confirm.
  muteMarkingAsUnreachable()

  /**
   * Returns `roles` without `role`.
   *
   * @param role  the role to leave out
   * @param roles the roles to filter; defaults to all roles of this spec
   */
  def allBut(role: RoleName, roles: immutable.Seq[RoleName] = roles): immutable.Seq[RoleName] = {
    roles.filterNot(_ == role)
  }

  /** The node that issues the `down` command and that the fresh system joins through. */
  lazy val master = second

  /** The node that gets cut off, downed and finally restarted. */
  lazy val victim = fourth

  /** Number of `endBarrier()` calls made so far on this node. */
  var endBarrierNumber = 0

  /** Enters the next numbered barrier (`after_1`, `after_2`, ...). */
  def endBarrier(): Unit = {
    endBarrierNumber += 1
    enterBarrier("after_" + endBarrierNumber)
  }

  "A cluster of " + roles.size + " members" must {

    "reach initial convergence" taggedAs LongRunningTest in {
      awaitClusterUp(roles: _*)
      endBarrier()
    }

    "mark a node as UNREACHABLE when we pull the network" taggedAs LongRunningTest in {
      // let them send at least one heartbeat to each other after the gossip convergence
      // because for new joining nodes we remove them from the failure detector when
      // receive gossip
      Thread.sleep(2.seconds.dilated.toMillis)

      runOn(first) {
        // pull network for victim node from all nodes
        allBut(victim).foreach { roleName ⇒
          testConductor.blackhole(victim, roleName, Direction.Both).await
        }
      }

      enterBarrier("unplug_victim")

      val allButVictim = allBut(victim, roles)

      runOn(victim) {
        allButVictim.foreach(markNodeAsUnavailable(_))
        within(30 seconds) {
          // victim becomes all alone
          awaitAssert {
            clusterView.unreachableMembers.size should be(roles.size - 1)
          }
          clusterView.unreachableMembers.map(_.address) should be((allButVictim map address).toSet)
        }
      }

      runOn(allButVictim: _*) {
        markNodeAsUnavailable(victim)
        within(30 seconds) {
          // victim becomes unreachable
          awaitAssert {
            clusterView.unreachableMembers.size should be(1)
          }
          awaitSeenSameState(allButVictim map address: _*)
          // still one unreachable
          clusterView.unreachableMembers.size should be(1)
          clusterView.unreachableMembers.head.address should be(node(victim).address)
          // unreachable is only an observation at this point; the member status is still Up
          clusterView.unreachableMembers.head.status should be(MemberStatus.Up)
        }
      }

      endBarrier()
    }

    "mark the node as DOWN" taggedAs LongRunningTest in {
      runOn(master) {
        cluster down victim
      }

      val allButVictim = allBut(victim, roles)
      runOn(allButVictim: _*) {
        // eventually removed
        awaitMembersUp(roles.size - 1, Set(victim))
        awaitAssert(clusterView.unreachableMembers should be(Set.empty), 15 seconds)
        awaitAssert(clusterView.members.map(_.address) should be((allButVictim map address).toSet))
      }

      endBarrier()
    }

    "allow fresh node with same host:port to join again when the network is plugged back in" taggedAs LongRunningTest in {
      val expectedNumberOfMembers = roles.size

      // victim actor system will be shutdown, not part of testConductor any more
      // so we can't use barriers to synchronize with it
      val masterAddress = address(master)
      runOn(master) {
        // counterpart of the "end" actor the victim creates in its fresh system below
        system.actorOf(Props(classOf[EndActor], testActor, None), "end")
      }
      enterBarrier("end-actor-created")

      runOn(first) {
        // put the network back in
        allBut(victim).foreach { roleName ⇒
          testConductor.passThrough(victim, roleName, Direction.Both).await
        }
      }

      enterBarrier("plug_in_victim")

      runOn(first) {
        // will shutdown ActorSystem of victim
        testConductor.shutdown(victim)
      }

      runOn(victim) {
        // capture the address before the old system is gone; the fresh system reuses it
        val victimAddress = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
        system.awaitTermination(10 seconds)
        // create new ActorSystem with same host:port
        val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s"""
akka.remote.netty.tcp {
hostname = ${victimAddress.host.get}
port = ${victimAddress.port.get}
}
""").withFallback(system.settings.config))

        try {
          Cluster(freshSystem).join(masterAddress)
          within(15 seconds) {
            awaitAssert(Cluster(freshSystem).readView.members.map(_.address) should contain(victimAddress))
            awaitAssert(Cluster(freshSystem).readView.members.size should be(expectedNumberOfMembers))
            awaitAssert(Cluster(freshSystem).readView.members.map(_.status) should be(Set(MemberStatus.Up)))
          }

          // signal to master node that victim is done
          val endProbe = TestProbe()(freshSystem)
          val endActor = freshSystem.actorOf(Props(classOf[EndActor], endProbe.ref, Some(masterAddress)), "end")
          endActor ! EndActor.SendEnd
          endProbe.expectMsg(EndActor.EndAck)

        } finally {
          // always release the host:port, also when one of the assertions above fails
          shutdown(freshSystem)
        }
        // no barrier here, because it is not part of testConductor roles any more
      }

      runOn(allBut(victim): _*) {
        awaitMembersUp(expectedNumberOfMembers)
        // don't end the test until the freshSystem is done
        runOn(master) {
          expectMsg(20 seconds, EndActor.End)
        }
        endBarrier()
      }
    }
  }
}
Other Akka source code examples. Here is a short list of links related to this Akka UnreachableNodeJoinsAgainSpec.scala source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.