|
Akka/Scala example source code file (ClusterSingletonManagerChaosSpec.scala)
The ClusterSingletonManagerChaosSpec.scala Akka example source code
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.contrib.pattern
import language.postfixOps
import scala.collection.immutable
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.Address
import akka.actor.Props
import akka.actor.PoisonPill
import akka.actor.RootActorPath
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import akka.cluster.Member
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.testkit.STMultiNodeSpec
import akka.testkit._
import akka.testkit.TestEvent._
import akka.actor.Terminated
import akka.actor.ActorSelection
object ClusterSingletonManagerChaosSpec extends MultiNodeConfig {
val controller = role("controller")
val first = role("first")
val second = role("second")
val third = role("third")
val fourth = role("fourth")
val fifth = role("fifth")
val sixth = role("sixth")
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-down-unreachable-after = 0s
"""))
case object EchoStarted
/**
* The singleton actor
*/
class Echo(testActor: ActorRef) extends Actor {
testActor ! EchoStarted
def receive = {
case _ ⇒ sender() ! self
}
}
}
class ClusterSingletonManagerChaosMultiJvmNode1 extends ClusterSingletonManagerChaosSpec
class ClusterSingletonManagerChaosMultiJvmNode2 extends ClusterSingletonManagerChaosSpec
class ClusterSingletonManagerChaosMultiJvmNode3 extends ClusterSingletonManagerChaosSpec
class ClusterSingletonManagerChaosMultiJvmNode4 extends ClusterSingletonManagerChaosSpec
class ClusterSingletonManagerChaosMultiJvmNode5 extends ClusterSingletonManagerChaosSpec
class ClusterSingletonManagerChaosMultiJvmNode6 extends ClusterSingletonManagerChaosSpec
class ClusterSingletonManagerChaosMultiJvmNode7 extends ClusterSingletonManagerChaosSpec
class ClusterSingletonManagerChaosSpec extends MultiNodeSpec(ClusterSingletonManagerChaosSpec) with STMultiNodeSpec with ImplicitSender {
import ClusterSingletonManagerChaosSpec._
override def initialParticipants = roles.size
def join(from: RoleName, to: RoleName): Unit = {
runOn(from) {
Cluster(system) join node(to).address
createSingleton()
}
}
def createSingleton(): ActorRef = {
system.actorOf(ClusterSingletonManager.props(
singletonProps = Props(classOf[Echo], testActor),
singletonName = "echo",
terminationMessage = PoisonPill,
role = None),
name = "singleton")
}
def crash(roles: RoleName*): Unit = {
runOn(controller) {
roles foreach { r ⇒
log.info("Shutdown [{}]", node(r).address)
testConductor.exit(r, 0).await
}
}
}
def echo(oldest: RoleName): ActorSelection =
system.actorSelection(RootActorPath(node(oldest).address) / "user" / "singleton" / "echo")
def awaitMemberUp(memberProbe: TestProbe, nodes: RoleName*): Unit = {
runOn(nodes.filterNot(_ == nodes.head): _*) {
memberProbe.expectMsgType[MemberUp](15.seconds).member.address should be(node(nodes.head).address)
}
runOn(nodes.head) {
memberProbe.receiveN(nodes.size, 15.seconds).collect { case MemberUp(m) ⇒ m.address }.toSet should be(
nodes.map(node(_).address).toSet)
}
enterBarrier(nodes.head.name + "-up")
}
"A ClusterSingletonManager in chaotic cluster" must {
"startup 6 node cluster" in within(60 seconds) {
val memberProbe = TestProbe()
Cluster(system).subscribe(memberProbe.ref, classOf[MemberUp])
memberProbe.expectMsgClass(classOf[CurrentClusterState])
join(first, first)
awaitMemberUp(memberProbe, first)
runOn(first) {
expectMsg(EchoStarted)
}
enterBarrier("first-started")
join(second, first)
awaitMemberUp(memberProbe, second, first)
join(third, first)
awaitMemberUp(memberProbe, third, second, first)
join(fourth, first)
awaitMemberUp(memberProbe, fourth, third, second, first)
join(fifth, first)
awaitMemberUp(memberProbe, fifth, fourth, third, second, first)
join(sixth, first)
awaitMemberUp(memberProbe, sixth, fifth, fourth, third, second, first)
runOn(controller) {
echo(first) ! "hello"
expectMsgType[ActorRef](3.seconds).path.address should be(node(first).address)
}
enterBarrier("first-verified")
}
"take over when three oldest nodes crash in 6 nodes cluster" in within(90 seconds) {
// mute logging of deadLetters during shutdown of systems
if (!log.isDebugEnabled)
system.eventStream.publish(Mute(DeadLettersFilter[Any]))
enterBarrier("logs-muted")
crash(first, second, third)
enterBarrier("after-crash")
runOn(fourth) {
expectMsg(EchoStarted)
}
enterBarrier("fourth-active")
runOn(controller) {
echo(fourth) ! "hello"
expectMsgType[ActorRef](3.seconds).path.address should be(node(fourth).address)
}
enterBarrier("fourth-verified")
}
}
}
Other Akka source code examplesHere is a short list of links related to this Akka ClusterSingletonManagerChaosSpec.scala source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.