|
Akka/Scala example source code file (ClusterRoundRobinSpec.scala)
The ClusterRoundRobinSpec.scala Akka example source code/** * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> */ package akka.cluster.routing import language.postfixOps import scala.concurrent.Await import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory import akka.actor.Actor import akka.actor.ActorRef import akka.actor.Address import akka.actor.Props import akka.actor.Terminated import akka.cluster.MultiNodeClusterSpec import akka.pattern.ask import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ import akka.remote.transport.ThrottlerTransportAdapter.Direction import akka.routing.FromConfig import akka.routing.RoundRobinPool import akka.routing.ActorRefRoutee import akka.routing.ActorSelectionRoutee import akka.routing.RoutedActorRef import akka.routing.GetRoutees import akka.routing.Routees object ClusterRoundRobinMultiJvmSpec extends MultiNodeConfig { class SomeActor(routeeType: RouteeType) extends Actor { def this() = this(PoolRoutee) def receive = { case "hit" ⇒ sender() ! Reply(routeeType, self) } } final case class Reply(routeeType: RouteeType, ref: ActorRef) sealed trait RouteeType extends Serializable object PoolRoutee extends RouteeType object GroupRoutee extends RouteeType val first = role("first") val second = role("second") val third = role("third") val fourth = role("fourth") commonConfig(debugConfig(on = false). withFallback(ConfigFactory.parseString(""" akka.actor.deployment { /router1 { router = round-robin-pool nr-of-instances = 10 cluster { enabled = on max-nr-of-instances-per-node = 2 } } /router3 { router = round-robin-pool nr-of-instances = 10 cluster { enabled = on max-nr-of-instances-per-node = 1 allow-local-routees = off } } /router4 { router = round-robin-group nr-of-instances = 10 routees.paths = ["/user/myserviceA", "/user/myserviceB"] cluster.enabled = on } /router5 { router = round-robin-pool nr-of-instances = 10 cluster { enabled = on use-role = a } } } """)). withFallback(MultiNodeClusterSpec.clusterConfig)) nodeConfig(first, second)(ConfigFactory.parseString("""akka.cluster.roles =["a", "c"]""")) nodeConfig(third)(ConfigFactory.parseString("""akka.cluster.roles =["b", "c"]""")) testTransport(on = true) } class ClusterRoundRobinMultiJvmNode1 extends ClusterRoundRobinSpec class ClusterRoundRobinMultiJvmNode2 extends ClusterRoundRobinSpec class ClusterRoundRobinMultiJvmNode3 extends ClusterRoundRobinSpec class ClusterRoundRobinMultiJvmNode4 extends ClusterRoundRobinSpec abstract class ClusterRoundRobinSpec extends MultiNodeSpec(ClusterRoundRobinMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender with DefaultTimeout { import ClusterRoundRobinMultiJvmSpec._ lazy val router1 = system.actorOf(FromConfig.props(Props[SomeActor]), "router1") lazy val router2 = system.actorOf(ClusterRouterPool(RoundRobinPool(nrOfInstances = 0), ClusterRouterPoolSettings(totalInstances = 3, maxInstancesPerNode = 1, allowLocalRoutees = true, useRole = None)). props(Props[SomeActor]), "router2") lazy val router3 = system.actorOf(FromConfig.props(Props[SomeActor]), "router3") lazy val router4 = system.actorOf(FromConfig.props(), "router4") lazy val router5 = system.actorOf(RoundRobinPool(nrOfInstances = 0).props(Props[SomeActor]), "router5") def receiveReplies(routeeType: RouteeType, expectedReplies: Int): Map[Address, Int] = { val zero = Map.empty[Address, Int] ++ roles.map(address(_) -> 0) (receiveWhile(5 seconds, messages = expectedReplies) { case Reply(`routeeType`, ref) ⇒ fullAddress(ref) }).foldLeft(zero) { case (replyMap, address) ⇒ replyMap + (address -> (replyMap(address) + 1)) } } /** * Fills in self address for local ActorRef */ private def fullAddress(actorRef: ActorRef): Address = actorRef.path.address match { case Address(_, _, None, None) ⇒ cluster.selfAddress case a ⇒ a } def currentRoutees(router: ActorRef) = Await.result(router ? GetRoutees, timeout.duration).asInstanceOf[Routees].routees "A cluster router with a RoundRobin router" must { "start cluster with 2 nodes" taggedAs LongRunningTest in { awaitClusterUp(first, second) enterBarrier("after-1") } "deploy routees to the member nodes in the cluster" taggedAs LongRunningTest in { runOn(first) { router1.isInstanceOf[RoutedActorRef] should be(true) // max-nr-of-instances-per-node=2 times 2 nodes awaitAssert(currentRoutees(router1).size should be(4)) val iterationCount = 10 for (i ← 0 until iterationCount) { router1 ! "hit" } val replies = receiveReplies(PoolRoutee, iterationCount) replies(first) should be > (0) replies(second) should be > (0) replies(third) should be(0) replies(fourth) should be(0) replies.values.sum should be(iterationCount) } enterBarrier("after-2") } "lookup routees on the member nodes in the cluster" taggedAs LongRunningTest in { // cluster consists of first and second system.actorOf(Props(classOf[SomeActor], GroupRoutee), "myserviceA") system.actorOf(Props(classOf[SomeActor], GroupRoutee), "myserviceB") enterBarrier("myservice-started") runOn(first) { // 2 nodes, 2 routees on each node within(10.seconds) { awaitAssert(currentRoutees(router4).size should be(4)) } val iterationCount = 10 for (i ← 0 until iterationCount) { router4 ! "hit" } val replies = receiveReplies(GroupRoutee, iterationCount) replies(first) should be > (0) replies(second) should be > (0) replies(third) should be(0) replies(fourth) should be(0) replies.values.sum should be(iterationCount) } enterBarrier("after-3") } "deploy routees to new nodes in the cluster" taggedAs LongRunningTest in { // add third and fourth awaitClusterUp(first, second, third, fourth) runOn(first) { // max-nr-of-instances-per-node=2 times 4 nodes awaitAssert(currentRoutees(router1).size should be(8)) val iterationCount = 10 for (i ← 0 until iterationCount) { router1 ! "hit" } val replies = receiveReplies(PoolRoutee, iterationCount) replies.values.foreach { _ should be > (0) } replies.values.sum should be(iterationCount) } enterBarrier("after-4") } "lookup routees on new nodes in the cluster" taggedAs LongRunningTest in { // cluster consists of first, second, third and fourth runOn(first) { // 4 nodes, 2 routee on each node awaitAssert(currentRoutees(router4).size should be(8)) val iterationCount = 10 for (i ← 0 until iterationCount) { router4 ! "hit" } val replies = receiveReplies(GroupRoutee, iterationCount) replies.values.foreach { _ should be > (0) } replies.values.sum should be(iterationCount) } enterBarrier("after-5") } "deploy routees to only remote nodes when allow-local-routees = off" taggedAs LongRunningTest in { runOn(first) { // max-nr-of-instances-per-node=1 times 3 nodes awaitAssert(currentRoutees(router3).size should be(3)) val iterationCount = 10 for (i ← 0 until iterationCount) { router3 ! "hit" } val replies = receiveReplies(PoolRoutee, iterationCount) replies(first) should be(0) replies(second) should be > (0) replies(third) should be > (0) replies(fourth) should be > (0) replies.values.sum should be(iterationCount) } enterBarrier("after-6") } "deploy routees to specified node role" taggedAs LongRunningTest in { runOn(first) { awaitAssert(currentRoutees(router5).size should be(2)) val iterationCount = 10 for (i ← 0 until iterationCount) { router5 ! "hit" } val replies = receiveReplies(PoolRoutee, iterationCount) replies(first) should be > (0) replies(second) should be > (0) replies(third) should be(0) replies(fourth) should be(0) replies.values.sum should be(iterationCount) } enterBarrier("after-7") } "deploy programatically defined routees to the member nodes in the cluster" taggedAs LongRunningTest in { runOn(first) { router2.isInstanceOf[RoutedActorRef] should be(true) // totalInstances = 3, maxInstancesPerNode = 1 awaitAssert(currentRoutees(router2).size should be(3)) val iterationCount = 10 for (i ← 0 until iterationCount) { router2 ! "hit" } val replies = receiveReplies(PoolRoutee, iterationCount) // note that router2 has totalInstances = 3, maxInstancesPerNode = 1 val routees = currentRoutees(router2) val routeeAddresses = routees map { case ActorRefRoutee(ref) ⇒ fullAddress(ref) } routeeAddresses.size should be(3) replies.values.sum should be(iterationCount) } enterBarrier("after-8") } "remove routees for unreachable nodes, and add when reachable again" taggedAs LongRunningTest in within(30.seconds) { // myservice is already running def routees = currentRoutees(router4) def routeeAddresses = (routees map { case ActorSelectionRoutee(sel) ⇒ fullAddress(sel.anchor) }).toSet runOn(first) { // 4 nodes, 2 routees on each node awaitAssert(currentRoutees(router4).size should be(8)) testConductor.blackhole(first, second, Direction.Both).await awaitAssert(routees.size should be(6)) routeeAddresses should not contain (address(second)) testConductor.passThrough(first, second, Direction.Both).await awaitAssert(routees.size should be(8)) routeeAddresses should contain(address(second)) } enterBarrier("after-9") } "deploy programatically defined routees to other node when a node becomes down" taggedAs LongRunningTest in { muteMarkingAsUnreachable() runOn(first) { def routees = currentRoutees(router2) def routeeAddresses = (routees map { case ActorRefRoutee(ref) ⇒ fullAddress(ref) }).toSet routees foreach { case ActorRefRoutee(ref) ⇒ watch(ref) } val notUsedAddress = ((roles map address).toSet -- routeeAddresses).head val downAddress = routeeAddresses.find(_ != address(first)).get val downRouteeRef = routees.collectFirst { case ActorRefRoutee(ref) if ref.path.address == downAddress ⇒ ref }.get cluster.down(downAddress) expectMsgType[Terminated](15.seconds).actor should be(downRouteeRef) awaitAssert { routeeAddresses should contain(notUsedAddress) routeeAddresses should not contain (downAddress) } val iterationCount = 10 for (i ← 0 until iterationCount) { router2 ! "hit" } val replies = receiveReplies(PoolRoutee, iterationCount) routeeAddresses.size should be(3) replies.values.sum should be(iterationCount) } enterBarrier("after-10") } } } Other Akka source code examplesHere is a short list of links related to this Akka ClusterRoundRobinSpec.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.