Akka/Scala example source code file: MultiNodeClusterSpec.scala

The following is the MultiNodeClusterSpec.scala source code file from the Akka project:
/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.cluster
import language.implicitConversions
import org.scalatest.{ Suite, Outcome, Canceled }
import org.scalatest.exceptions.TestCanceledException
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import akka.remote.testconductor.RoleName
import akka.remote.testkit.{ STMultiNodeSpec, MultiNodeSpec }
import akka.testkit._
import akka.testkit.TestEvent._
import akka.actor.{ ActorSystem, Address }
import akka.event.Logging.ErrorLevel
import scala.concurrent.duration._
import scala.collection.immutable
import java.util.concurrent.ConcurrentHashMap
import akka.remote.DefaultFailureDetectorRegistry
import akka.actor.ActorRef
import akka.actor.Actor
import akka.actor.RootActorPath
object MultiNodeClusterSpec {
  def clusterConfigWithFailureDetectorPuppet: Config =
    ConfigFactory.parseString(
      "akka.cluster.failure-detector.implementation-class = akka.cluster.FailureDetectorPuppet").
      withFallback(clusterConfig)

  def clusterConfig(failureDetectorPuppet: Boolean): Config =
    if (failureDetectorPuppet) clusterConfigWithFailureDetectorPuppet else clusterConfig

  def clusterConfig: Config = ConfigFactory.parseString("""
    akka.actor.provider = akka.cluster.ClusterActorRefProvider
    akka.cluster {
      jmx.enabled                         = off
      gossip-interval                     = 200 ms
      leader-actions-interval             = 200 ms
      unreachable-nodes-reaper-interval   = 500 ms
      periodic-tasks-initial-delay        = 300 ms
      publish-stats-interval              = 0 s # always, when it happens
      failure-detector.heartbeat-interval = 500 ms
    }
    akka.loglevel = INFO
    akka.log-dead-letters = off
    akka.log-dead-letters-during-shutdown = off
    akka.remote.log-remote-lifecycle-events = off
    akka.loggers = ["akka.testkit.TestEventListener"]
    akka.test {
      single-expect-default = 5 s
    }
    """)
  // sometimes we need to coordinate test shutdown with messages instead of barriers
  object EndActor {
    case object SendEnd
    case object End
    case object EndAck
  }

  class EndActor(testActor: ActorRef, target: Option[Address]) extends Actor {
    import EndActor._
    def receive = {
      case SendEnd ⇒
        target foreach { t ⇒
          context.actorSelection(RootActorPath(t) / self.path.elements) ! End
        }
      case End ⇒
        testActor forward End
        sender() ! EndAck
      case EndAck ⇒
        testActor forward EndAck
    }
  }
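
  // Illustrative sketch, not part of the original file: EndActor supports
  // shutdown coordination across nodes when barriers can no longer be used,
  // e.g. after one side has terminated. A surviving node might do something
  // like this (targetAddress and the probe are hypothetical):
  //
  //   val endProbe = TestProbe()
  //   val endActor = system.actorOf(Props(classOf[EndActor], endProbe.ref, Some(targetAddress)), "end")
  //   endActor ! EndActor.SendEnd
  //   endProbe.expectMsg(EndActor.EndAck)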
}
trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoroner { self: MultiNodeSpec ⇒
  override def initialParticipants = roles.size

  private val cachedAddresses = new ConcurrentHashMap[RoleName, Address]

  override def atStartup(): Unit = {
    startCoroner()
    muteLog()
  }

  override def afterTermination(): Unit = {
    stopCoroner()
  }

  override def expectedTestDuration = 60.seconds

  def muteLog(sys: ActorSystem = system): Unit = {
    if (!sys.log.isDebugEnabled) {
      Seq(".*Metrics collection has started successfully.*",
        ".*Metrics will be retreived from MBeans.*", // (sic) pattern matches the log text as emitted
        ".*Cluster Node.* - registered cluster JMX MBean.*",
        ".*Cluster Node.* - is starting up.*",
        ".*Shutting down cluster Node.*",
        ".*Cluster node successfully shut down.*",
        ".*Using a dedicated scheduler for cluster.*") foreach { s ⇒
          sys.eventStream.publish(Mute(EventFilter.info(pattern = s)))
        }

      muteDeadLetters(
        classOf[ClusterHeartbeatSender.Heartbeat],
        classOf[ClusterHeartbeatSender.HeartbeatRsp],
        classOf[GossipEnvelope],
        classOf[GossipStatus],
        classOf[MetricsGossipEnvelope],
        classOf[ClusterEvent.ClusterMetricsChanged],
        classOf[InternalClusterAction.Tick],
        classOf[akka.actor.PoisonPill],
        classOf[akka.dispatch.sysmsg.DeathWatchNotification],
        classOf[akka.remote.transport.AssociationHandle.Disassociated],
        // akka.remote.transport.AssociationHandle.Disassociated.getClass,
        classOf[akka.remote.transport.ActorTransportAdapter.DisassociateUnderlying],
        // akka.remote.transport.ActorTransportAdapter.DisassociateUnderlying.getClass,
        classOf[akka.remote.transport.AssociationHandle.InboundPayload])(sys)
    }
  }
  def muteMarkingAsUnreachable(sys: ActorSystem = system): Unit =
    if (!sys.log.isDebugEnabled)
      sys.eventStream.publish(Mute(EventFilter.error(pattern = ".*Marking.* as UNREACHABLE.*")))

  def muteMarkingAsReachable(sys: ActorSystem = system): Unit =
    if (!sys.log.isDebugEnabled)
      sys.eventStream.publish(Mute(EventFilter.info(pattern = ".*Marking.* as REACHABLE.*")))

  override def afterAll(): Unit = {
    if (!log.isDebugEnabled) {
      muteDeadLetters()()
      system.eventStream.setLogLevel(ErrorLevel)
    }
    super.afterAll()
  }
  /**
   * Lookup the Address for the role.
   *
   * Implicit conversion from RoleName to Address.
   *
   * It is cached, which has the implication that stopping
   * and then restarting a role (jvm) with another address is not
   * supported.
   */
  implicit def address(role: RoleName): Address = {
    cachedAddresses.get(role) match {
      case null ⇒
        val address = node(role).address
        cachedAddresses.put(role, address)
        address
      case address ⇒ address
    }
  }
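
  // Thanks to this implicit conversion, a RoleName can be passed wherever an
  // Address is expected, e.g. cluster.join(second) rather than
  // cluster.join(address(second)).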
  // Cluster tests are written so that if a previous step (test method) failed,
  // it will most likely not be possible to run the next step. This ensures that
  // all steps after the first failure fail fast.
  private var failed = false
  override protected def withFixture(test: NoArgTest): Outcome =
    if (failed) {
      Canceled(new TestCanceledException("Previous step failed", 0))
    } else {
      val out = super.withFixture(test)
      if (!out.isSucceeded)
        failed = true
      out
    }
  def clusterView: ClusterReadView = cluster.readView

  /**
   * Get the cluster node to use.
   */
  def cluster: Cluster = Cluster(system)

  /**
   * Use this method for the initial startup of the cluster node.
   */
  def startClusterNode(): Unit = {
    if (clusterView.members.isEmpty) {
      cluster join myself
      awaitAssert(clusterView.members.map(_.address) should contain(address(myself)))
    } else
      clusterView.self
  }
  /**
   * Initialize the cluster of the specified member
   * nodes (roles) and wait until all have joined and are `Up`.
   * The first node is started first and the others join it.
   */
  def awaitClusterUp(roles: RoleName*): Unit = {
    runOn(roles.head) {
      // make sure that the node-to-join is started before the others join
      startClusterNode()
    }
    enterBarrier(roles.head.name + "-started")
    if (roles.tail.contains(myself)) {
      cluster.join(roles.head)
    }
    if (roles.contains(myself)) {
      awaitMembersUp(numberOfMembers = roles.length)
    }
    enterBarrier(roles.map(_.name).mkString("-") + "-joined")
  }
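
  // Illustrative sketch, not part of the original file: a typical first test
  // step brings the whole cluster up and then synchronizes on a barrier
  // (the role names are hypothetical):
  //
  //   "A sample cluster" must {
  //     "reach Up on all nodes" in {
  //       awaitClusterUp(first, second, third)
  //       enterBarrier("after-1")
  //     }
  //   }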
  /**
   * Join the specific node within the given period by sending repeated join
   * requests at periodic intervals until we succeed.
   */
  def joinWithin(joinNode: RoleName, max: Duration = remainingOrDefault, interval: Duration = 1.second): Unit = {
    def memberInState(member: Address, status: Seq[MemberStatus]): Boolean =
      clusterView.members.exists { m ⇒ (m.address == member) && status.contains(m.status) }

    cluster join joinNode
    awaitCond({
      clusterView.refreshCurrentState()
      if (memberInState(joinNode, List(MemberStatus.Up)) &&
        memberInState(myself, List(MemberStatus.Joining, MemberStatus.Up)))
        true
      else {
        cluster join joinNode
        false
      }
    }, max, interval)
  }
  /**
   * Assert that the member addresses match the expected addresses in the
   * sort order used by the cluster.
   */
  def assertMembers(gotMembers: Iterable[Member], expectedAddresses: Address*): Unit = {
    import Member.addressOrdering
    val members = gotMembers.toIndexedSeq
    members.size should be(expectedAddresses.length)
    expectedAddresses.sorted.zipWithIndex.foreach { case (a, i) ⇒ members(i).address should be(a) }
  }
  /**
   * Note that this can only be used for a cluster with all members
   * in Up status, i.e. use `awaitMembersUp` before using this method.
   * The reason for that is that the cluster leader is preferably a
   * member with status Up or Leaving and that information can't
   * be determined from the `RoleName`.
   */
  def assertLeader(nodesInCluster: RoleName*): Unit =
    if (nodesInCluster.contains(myself)) assertLeaderIn(nodesInCluster.to[immutable.Seq])

  /**
   * Assert that the cluster has elected the correct leader
   * out of all nodes in the cluster. The first member in the
   * cluster ring is the expected leader.
   *
   * Note that this can only be used for a cluster with all members
   * in Up status, i.e. use `awaitMembersUp` before using this method.
   * The reason for that is that the cluster leader is preferably a
   * member with status Up or Leaving and that information can't
   * be determined from the `RoleName`.
   */
  def assertLeaderIn(nodesInCluster: immutable.Seq[RoleName]): Unit =
    if (nodesInCluster.contains(myself)) {
      nodesInCluster.length should not be (0)
      val expectedLeader = roleOfLeader(nodesInCluster)
      val leader = clusterView.leader
      val isLeader = leader == Some(clusterView.selfAddress)
      assert(isLeader == isNode(expectedLeader),
        "expectedLeader [%s], got leader [%s], members [%s]".format(expectedLeader, leader, clusterView.members))
      clusterView.status should (be(MemberStatus.Up) or be(MemberStatus.Leaving))
    }
  /**
   * Wait until the expected number of members with status Up has been reached.
   * Also asserts that nodes in the `canNotBePartOfMemberRing` set are *not* part of the cluster ring.
   */
  def awaitMembersUp(
    numberOfMembers: Int,
    canNotBePartOfMemberRing: Set[Address] = Set.empty,
    timeout: FiniteDuration = 25.seconds): Unit = {
    within(timeout) {
      if (!canNotBePartOfMemberRing.isEmpty) // don't run this on an empty set
        awaitAssert(canNotBePartOfMemberRing foreach (a ⇒ clusterView.members.map(_.address) should not contain (a)))
      awaitAssert(clusterView.members.size should be(numberOfMembers))
      awaitAssert(clusterView.members.map(_.status) should be(Set(MemberStatus.Up)))
      // clusterView.leader is updated by LeaderChanged, await that to be updated also
      val expectedLeader = clusterView.members.headOption.map(_.address)
      awaitAssert(clusterView.leader should be(expectedLeader))
    }
  }
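
  // Illustrative sketch, not part of the original file: after downing one node,
  // the surviving nodes might verify both the new member count and that the
  // downed address never re-enters the ring (thirdAddress is hypothetical):
  //
  //   awaitMembersUp(numberOfMembers = 2, canNotBePartOfMemberRing = Set(thirdAddress))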
  def awaitAllReachable(): Unit =
    awaitAssert(clusterView.unreachableMembers should be(Set.empty))

  /**
   * Wait until the specified nodes have seen the same gossip overview.
   */
  def awaitSeenSameState(addresses: Address*): Unit =
    awaitAssert((addresses.toSet -- clusterView.seenBy) should be(Set.empty))

  /**
   * Leader according to the address ordering of the roles.
   * Note that this can only be used for a cluster with all members
   * in Up status, i.e. use `awaitMembersUp` before using this method.
   * The reason for that is that the cluster leader is preferably a
   * member with status Up or Leaving and that information can't
   * be determined from the `RoleName`.
   */
  def roleOfLeader(nodesInCluster: immutable.Seq[RoleName] = roles): RoleName = {
    nodesInCluster.length should not be (0)
    nodesInCluster.sorted.head
  }
  /**
   * Sort the roles in the address order used by the cluster node ring.
   */
  implicit val clusterOrdering: Ordering[RoleName] = new Ordering[RoleName] {
    import Member.addressOrdering
    def compare(x: RoleName, y: RoleName) = addressOrdering.compare(address(x), address(y))
  }

  def roleName(addr: Address): Option[RoleName] = roles.find(address(_) == addr)
  /**
   * Marks a node as available in the failure detector if
   * [[akka.cluster.FailureDetectorPuppet]] is used as
   * failure detector.
   */
  def markNodeAsAvailable(address: Address): Unit =
    failureDetectorPuppet(address) foreach (_.markNodeAsAvailable())

  /**
   * Marks a node as unavailable in the failure detector if
   * [[akka.cluster.FailureDetectorPuppet]] is used as
   * failure detector.
   */
  def markNodeAsUnavailable(address: Address): Unit = {
    if (isFailureDetectorPuppet) {
      // before marking it as unavailable there should be at least one heartbeat
      // to create the FailureDetectorPuppet in the FailureDetectorRegistry
      cluster.failureDetector.heartbeat(address)
      failureDetectorPuppet(address) foreach (_.markNodeAsUnavailable())
    }
  }
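
  // Illustrative sketch, not part of the original file: with the puppet failure
  // detector configured (see clusterConfigWithFailureDetectorPuppet), a test can
  // simulate a crashed node without stopping its JVM (second is a hypothetical role):
  //
  //   markNodeAsUnavailable(second)
  //   awaitAssert(clusterView.unreachableMembers.map(_.address) should contain(address(second)))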
  private def isFailureDetectorPuppet: Boolean =
    cluster.settings.FailureDetectorImplementationClass == classOf[FailureDetectorPuppet].getName

  private def failureDetectorPuppet(address: Address): Option[FailureDetectorPuppet] =
    cluster.failureDetector match {
      case reg: DefaultFailureDetectorRegistry[Address] ⇒
        reg.failureDetector(address) collect { case p: FailureDetectorPuppet ⇒ p }
      case _ ⇒ None
    }
}
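
To show how the pieces fit together, here is a minimal sketch (not from the Akka sources; all class names, role names, and test text are hypothetical) of a concrete multi-node test built on this trait:

import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec }

// shared configuration for all nodes, reusing the cluster test settings above
object SampleClusterSpecConfig extends MultiNodeConfig {
  val first = role("first")
  val second = role("second")
  commonConfig(MultiNodeClusterSpec.clusterConfig)
}

// one concrete class per role, as the multi-node test kit requires
class SampleClusterSpecMultiJvmNode1 extends SampleClusterSpec
class SampleClusterSpecMultiJvmNode2 extends SampleClusterSpec

abstract class SampleClusterSpec extends MultiNodeSpec(SampleClusterSpecConfig)
  with MultiNodeClusterSpec {

  import SampleClusterSpecConfig._

  "A sample two-node cluster" must {
    "reach Up on both nodes" in {
      awaitClusterUp(first, second) // first starts the cluster, second joins it
      enterBarrier("after-1")
    }
  }
}

When run with the sbt-multi-jvm plugin, each SampleClusterSpecMultiJvmNodeN class is started in its own JVM and mapped to the corresponding role.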