|
Akka/Scala example source code file (TransitionSpec.scala)
The TransitionSpec.scala Akka example source code/** * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> */ package akka.cluster import language.implicitConversions import com.typesafe.config.ConfigFactory import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ import akka.actor.Address import akka.remote.testconductor.RoleName import MemberStatus._ import InternalClusterAction._ object TransitionMultiJvmSpec extends MultiNodeConfig { val first = role("first") val second = role("second") val third = role("third") commonConfig(debugConfig(on = false). withFallback(ConfigFactory.parseString(""" akka.cluster.periodic-tasks-initial-delay = 300 s # turn off all periodic tasks akka.cluster.publish-stats-interval = 0 s # always, when it happens """)). withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) } class TransitionMultiJvmNode1 extends TransitionSpec class TransitionMultiJvmNode2 extends TransitionSpec class TransitionMultiJvmNode3 extends TransitionSpec abstract class TransitionSpec extends MultiNodeSpec(TransitionMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender { import TransitionMultiJvmSpec._ muteMarkingAsUnreachable() // sorted in the order used by the cluster def leader(roles: RoleName*) = roles.sorted.head def nonLeader(roles: RoleName*) = roles.toSeq.sorted.tail def memberStatus(address: Address): MemberStatus = { val statusOption = (clusterView.members ++ clusterView.unreachableMembers).collectFirst { case m if m.address == address ⇒ m.status } statusOption.getOrElse(Removed) } def memberAddresses: Set[Address] = clusterView.members.map(_.address) def members: Set[RoleName] = memberAddresses.flatMap(roleName(_)) def seenLatestGossip: Set[RoleName] = clusterView.seenBy flatMap roleName def awaitSeen(addresses: Address*): Unit = awaitAssert { (seenLatestGossip map address) should be(addresses.toSet) } def awaitMembers(addresses: Address*): Unit = awaitAssert { clusterView.refreshCurrentState() memberAddresses should be(addresses.toSet) } def awaitMemberStatus(address: Address, status: MemberStatus): Unit = awaitAssert { clusterView.refreshCurrentState() memberStatus(address) should be(status) } def leaderActions(): Unit = cluster.clusterCore ! LeaderActionsTick def reapUnreachable(): Unit = cluster.clusterCore ! ReapUnreachableTick // DSL sugar for `role1 gossipTo role2` implicit def roleExtras(role: RoleName): RoleWrapper = new RoleWrapper(role) var gossipBarrierCounter = 0 class RoleWrapper(fromRole: RoleName) { def gossipTo(toRole: RoleName): Unit = { gossipBarrierCounter += 1 runOn(toRole) { val oldCount = clusterView.latestStats.gossipStats.receivedGossipCount enterBarrier("before-gossip-" + gossipBarrierCounter) awaitCond { clusterView.latestStats.gossipStats.receivedGossipCount != oldCount // received gossip } // gossip chat will synchronize the views awaitCond((Set(fromRole, toRole) -- seenLatestGossip).isEmpty) enterBarrier("after-gossip-" + gossipBarrierCounter) } runOn(fromRole) { enterBarrier("before-gossip-" + gossipBarrierCounter) // send gossip cluster.clusterCore ! InternalClusterAction.SendGossipTo(toRole) // gossip chat will synchronize the views awaitCond((Set(fromRole, toRole) -- seenLatestGossip).isEmpty) enterBarrier("after-gossip-" + gossipBarrierCounter) } runOn(roles.filterNot(r ⇒ r == fromRole || r == toRole): _*) { enterBarrier("before-gossip-" + gossipBarrierCounter) enterBarrier("after-gossip-" + gossipBarrierCounter) } } } "A Cluster" must { "start nodes as singleton clusters" taggedAs LongRunningTest in { runOn(first) { cluster join myself awaitMemberStatus(myself, Joining) leaderActions() awaitMemberStatus(myself, Up) awaitCond(clusterView.isSingletonCluster) } enterBarrier("after-1") } "perform correct transitions when second joining first" taggedAs LongRunningTest in { runOn(second) { cluster.join(first) } runOn(first, second) { // gossip chat from the join will synchronize the views awaitMembers(first, second) awaitMemberStatus(first, Up) awaitMemberStatus(second, Joining) awaitAssert(seenLatestGossip should be(Set(first, second))) } enterBarrier("convergence-joining-2") runOn(first) { leaderActions() awaitMemberStatus(first, Up) awaitMemberStatus(second, Up) } enterBarrier("leader-actions-2") first gossipTo second runOn(first, second) { // gossip chat will synchronize the views awaitMemberStatus(second, Up) awaitAssert(seenLatestGossip should be(Set(first, second))) awaitMemberStatus(first, Up) } enterBarrier("after-2") } "perform correct transitions when third joins second" taggedAs LongRunningTest in { runOn(third) { cluster.join(second) } runOn(second, third) { // gossip chat from the join will synchronize the views awaitAssert(seenLatestGossip should be(Set(second, third))) } enterBarrier("third-joined-second") second gossipTo first runOn(first, second) { // gossip chat will synchronize the views awaitMembers(first, second, third) awaitMemberStatus(third, Joining) awaitMemberStatus(second, Up) awaitAssert(seenLatestGossip should be(Set(first, second, third))) } first gossipTo third runOn(first, second, third) { awaitMembers(first, second, third) awaitMemberStatus(first, Up) awaitMemberStatus(second, Up) awaitMemberStatus(third, Joining) awaitAssert(seenLatestGossip should be(Set(first, second, third))) } enterBarrier("convergence-joining-3") val leader12 = leader(first, second) val (other1, other2) = { val tmp = roles.filterNot(_ == leader12); (tmp.head, tmp.tail.head) } runOn(leader12) { leaderActions() awaitMemberStatus(first, Up) awaitMemberStatus(second, Up) awaitMemberStatus(third, Up) } enterBarrier("leader-actions-3") // leader gossipTo first non-leader leader12 gossipTo other1 runOn(other1) { awaitMemberStatus(third, Up) awaitAssert(seenLatestGossip should be(Set(leader12, myself))) } // first non-leader gossipTo the other non-leader other1 gossipTo other2 runOn(other1) { // send gossip cluster.clusterCore ! InternalClusterAction.SendGossipTo(other2) } runOn(other2) { awaitMemberStatus(third, Up) awaitAssert(seenLatestGossip should be(Set(first, second, third))) } // first non-leader gossipTo the leader other1 gossipTo leader12 runOn(first, second, third) { awaitMemberStatus(first, Up) awaitMemberStatus(second, Up) awaitMemberStatus(third, Up) awaitAssert(seenLatestGossip should be(Set(first, second, third))) } enterBarrier("after-3") } "perform correct transitions when second becomes unavailble" taggedAs LongRunningTest in { runOn(third) { markNodeAsUnavailable(second) reapUnreachable() awaitAssert(clusterView.unreachableMembers.map(_.address) should contain(address(second))) awaitAssert(seenLatestGossip should be(Set(third))) } enterBarrier("after-second-unavailble") third gossipTo first runOn(first, third) { awaitAssert(clusterView.unreachableMembers.map(_.address) should contain(address(second))) } runOn(first) { cluster.down(second) } enterBarrier("after-second-down") first gossipTo third runOn(first, third) { awaitAssert(clusterView.unreachableMembers.map(_.address) should contain(address(second))) awaitMemberStatus(second, Down) awaitAssert(seenLatestGossip should be(Set(first, third))) } enterBarrier("after-6") } } } Other Akka source code examplesHere is a short list of links related to this Akka TransitionSpec.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.