|
Akka/Scala example source code file (ClusterRoundRobinSpec.scala)
The ClusterRoundRobinSpec.scala Akka example source code
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.routing
import language.postfixOps
import scala.concurrent.Await
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Address
import akka.actor.Props
import akka.actor.Terminated
import akka.cluster.MultiNodeClusterSpec
import akka.pattern.ask
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import akka.remote.transport.ThrottlerTransportAdapter.Direction
import akka.routing.FromConfig
import akka.routing.RoundRobinPool
import akka.routing.ActorRefRoutee
import akka.routing.ActorSelectionRoutee
import akka.routing.RoutedActorRef
import akka.routing.GetRoutees
import akka.routing.Routees
object ClusterRoundRobinMultiJvmSpec extends MultiNodeConfig {
class SomeActor(routeeType: RouteeType) extends Actor {
def this() = this(PoolRoutee)
def receive = {
case "hit" ⇒ sender() ! Reply(routeeType, self)
}
}
final case class Reply(routeeType: RouteeType, ref: ActorRef)
sealed trait RouteeType extends Serializable
object PoolRoutee extends RouteeType
object GroupRoutee extends RouteeType
val first = role("first")
val second = role("second")
val third = role("third")
val fourth = role("fourth")
commonConfig(debugConfig(on = false).
withFallback(ConfigFactory.parseString("""
akka.actor.deployment {
/router1 {
router = round-robin-pool
nr-of-instances = 10
cluster {
enabled = on
max-nr-of-instances-per-node = 2
}
}
/router3 {
router = round-robin-pool
nr-of-instances = 10
cluster {
enabled = on
max-nr-of-instances-per-node = 1
allow-local-routees = off
}
}
/router4 {
router = round-robin-group
nr-of-instances = 10
routees.paths = ["/user/myserviceA", "/user/myserviceB"]
cluster.enabled = on
}
/router5 {
router = round-robin-pool
nr-of-instances = 10
cluster {
enabled = on
use-role = a
}
}
}
""")).
withFallback(MultiNodeClusterSpec.clusterConfig))
nodeConfig(first, second)(ConfigFactory.parseString("""akka.cluster.roles =["a", "c"]"""))
nodeConfig(third)(ConfigFactory.parseString("""akka.cluster.roles =["b", "c"]"""))
testTransport(on = true)
}
class ClusterRoundRobinMultiJvmNode1 extends ClusterRoundRobinSpec
class ClusterRoundRobinMultiJvmNode2 extends ClusterRoundRobinSpec
class ClusterRoundRobinMultiJvmNode3 extends ClusterRoundRobinSpec
class ClusterRoundRobinMultiJvmNode4 extends ClusterRoundRobinSpec
abstract class ClusterRoundRobinSpec extends MultiNodeSpec(ClusterRoundRobinMultiJvmSpec)
with MultiNodeClusterSpec
with ImplicitSender with DefaultTimeout {
import ClusterRoundRobinMultiJvmSpec._
lazy val router1 = system.actorOf(FromConfig.props(Props[SomeActor]), "router1")
lazy val router2 = system.actorOf(ClusterRouterPool(RoundRobinPool(nrOfInstances = 0),
ClusterRouterPoolSettings(totalInstances = 3, maxInstancesPerNode = 1, allowLocalRoutees = true, useRole = None)).
props(Props[SomeActor]),
"router2")
lazy val router3 = system.actorOf(FromConfig.props(Props[SomeActor]), "router3")
lazy val router4 = system.actorOf(FromConfig.props(), "router4")
lazy val router5 = system.actorOf(RoundRobinPool(nrOfInstances = 0).props(Props[SomeActor]), "router5")
def receiveReplies(routeeType: RouteeType, expectedReplies: Int): Map[Address, Int] = {
val zero = Map.empty[Address, Int] ++ roles.map(address(_) -> 0)
(receiveWhile(5 seconds, messages = expectedReplies) {
case Reply(`routeeType`, ref) ⇒ fullAddress(ref)
}).foldLeft(zero) {
case (replyMap, address) ⇒ replyMap + (address -> (replyMap(address) + 1))
}
}
/**
* Fills in self address for local ActorRef
*/
private def fullAddress(actorRef: ActorRef): Address = actorRef.path.address match {
case Address(_, _, None, None) ⇒ cluster.selfAddress
case a ⇒ a
}
def currentRoutees(router: ActorRef) =
Await.result(router ? GetRoutees, timeout.duration).asInstanceOf[Routees].routees
"A cluster router with a RoundRobin router" must {
"start cluster with 2 nodes" taggedAs LongRunningTest in {
awaitClusterUp(first, second)
enterBarrier("after-1")
}
"deploy routees to the member nodes in the cluster" taggedAs LongRunningTest in {
runOn(first) {
router1.isInstanceOf[RoutedActorRef] should be(true)
// max-nr-of-instances-per-node=2 times 2 nodes
awaitAssert(currentRoutees(router1).size should be(4))
val iterationCount = 10
for (i ← 0 until iterationCount) {
router1 ! "hit"
}
val replies = receiveReplies(PoolRoutee, iterationCount)
replies(first) should be > (0)
replies(second) should be > (0)
replies(third) should be(0)
replies(fourth) should be(0)
replies.values.sum should be(iterationCount)
}
enterBarrier("after-2")
}
"lookup routees on the member nodes in the cluster" taggedAs LongRunningTest in {
// cluster consists of first and second
system.actorOf(Props(classOf[SomeActor], GroupRoutee), "myserviceA")
system.actorOf(Props(classOf[SomeActor], GroupRoutee), "myserviceB")
enterBarrier("myservice-started")
runOn(first) {
// 2 nodes, 2 routees on each node
within(10.seconds) {
awaitAssert(currentRoutees(router4).size should be(4))
}
val iterationCount = 10
for (i ← 0 until iterationCount) {
router4 ! "hit"
}
val replies = receiveReplies(GroupRoutee, iterationCount)
replies(first) should be > (0)
replies(second) should be > (0)
replies(third) should be(0)
replies(fourth) should be(0)
replies.values.sum should be(iterationCount)
}
enterBarrier("after-3")
}
"deploy routees to new nodes in the cluster" taggedAs LongRunningTest in {
// add third and fourth
awaitClusterUp(first, second, third, fourth)
runOn(first) {
// max-nr-of-instances-per-node=2 times 4 nodes
awaitAssert(currentRoutees(router1).size should be(8))
val iterationCount = 10
for (i ← 0 until iterationCount) {
router1 ! "hit"
}
val replies = receiveReplies(PoolRoutee, iterationCount)
replies.values.foreach { _ should be > (0) }
replies.values.sum should be(iterationCount)
}
enterBarrier("after-4")
}
"lookup routees on new nodes in the cluster" taggedAs LongRunningTest in {
// cluster consists of first, second, third and fourth
runOn(first) {
// 4 nodes, 2 routee on each node
awaitAssert(currentRoutees(router4).size should be(8))
val iterationCount = 10
for (i ← 0 until iterationCount) {
router4 ! "hit"
}
val replies = receiveReplies(GroupRoutee, iterationCount)
replies.values.foreach { _ should be > (0) }
replies.values.sum should be(iterationCount)
}
enterBarrier("after-5")
}
"deploy routees to only remote nodes when allow-local-routees = off" taggedAs LongRunningTest in {
runOn(first) {
// max-nr-of-instances-per-node=1 times 3 nodes
awaitAssert(currentRoutees(router3).size should be(3))
val iterationCount = 10
for (i ← 0 until iterationCount) {
router3 ! "hit"
}
val replies = receiveReplies(PoolRoutee, iterationCount)
replies(first) should be(0)
replies(second) should be > (0)
replies(third) should be > (0)
replies(fourth) should be > (0)
replies.values.sum should be(iterationCount)
}
enterBarrier("after-6")
}
"deploy routees to specified node role" taggedAs LongRunningTest in {
runOn(first) {
awaitAssert(currentRoutees(router5).size should be(2))
val iterationCount = 10
for (i ← 0 until iterationCount) {
router5 ! "hit"
}
val replies = receiveReplies(PoolRoutee, iterationCount)
replies(first) should be > (0)
replies(second) should be > (0)
replies(third) should be(0)
replies(fourth) should be(0)
replies.values.sum should be(iterationCount)
}
enterBarrier("after-7")
}
"deploy programatically defined routees to the member nodes in the cluster" taggedAs LongRunningTest in {
runOn(first) {
router2.isInstanceOf[RoutedActorRef] should be(true)
// totalInstances = 3, maxInstancesPerNode = 1
awaitAssert(currentRoutees(router2).size should be(3))
val iterationCount = 10
for (i ← 0 until iterationCount) {
router2 ! "hit"
}
val replies = receiveReplies(PoolRoutee, iterationCount)
// note that router2 has totalInstances = 3, maxInstancesPerNode = 1
val routees = currentRoutees(router2)
val routeeAddresses = routees map { case ActorRefRoutee(ref) ⇒ fullAddress(ref) }
routeeAddresses.size should be(3)
replies.values.sum should be(iterationCount)
}
enterBarrier("after-8")
}
"remove routees for unreachable nodes, and add when reachable again" taggedAs LongRunningTest in within(30.seconds) {
// myservice is already running
def routees = currentRoutees(router4)
def routeeAddresses = (routees map { case ActorSelectionRoutee(sel) ⇒ fullAddress(sel.anchor) }).toSet
runOn(first) {
// 4 nodes, 2 routees on each node
awaitAssert(currentRoutees(router4).size should be(8))
testConductor.blackhole(first, second, Direction.Both).await
awaitAssert(routees.size should be(6))
routeeAddresses should not contain (address(second))
testConductor.passThrough(first, second, Direction.Both).await
awaitAssert(routees.size should be(8))
routeeAddresses should contain(address(second))
}
enterBarrier("after-9")
}
"deploy programatically defined routees to other node when a node becomes down" taggedAs LongRunningTest in {
muteMarkingAsUnreachable()
runOn(first) {
def routees = currentRoutees(router2)
def routeeAddresses = (routees map { case ActorRefRoutee(ref) ⇒ fullAddress(ref) }).toSet
routees foreach { case ActorRefRoutee(ref) ⇒ watch(ref) }
val notUsedAddress = ((roles map address).toSet -- routeeAddresses).head
val downAddress = routeeAddresses.find(_ != address(first)).get
val downRouteeRef = routees.collectFirst {
case ActorRefRoutee(ref) if ref.path.address == downAddress ⇒ ref
}.get
cluster.down(downAddress)
expectMsgType[Terminated](15.seconds).actor should be(downRouteeRef)
awaitAssert {
routeeAddresses should contain(notUsedAddress)
routeeAddresses should not contain (downAddress)
}
val iterationCount = 10
for (i ← 0 until iterationCount) {
router2 ! "hit"
}
val replies = receiveReplies(PoolRoutee, iterationCount)
routeeAddresses.size should be(3)
replies.values.sum should be(iterationCount)
}
enterBarrier("after-10")
}
}
}
Other Akka source code examplesHere is a short list of links related to this Akka ClusterRoundRobinSpec.scala source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.