|
Akka/Scala example source code file (ClusterConsistentHashingRouterSpec.scala)
The ClusterConsistentHashingRouterSpec.scala Akka example source code/** * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> */ package akka.cluster.routing import scala.concurrent.Await import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory import akka.actor.Actor import akka.actor.ActorRef import akka.actor.Address import akka.actor.Props import akka.cluster.MultiNodeClusterSpec import akka.pattern.ask import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.routing.ConsistentHashingRouter import akka.routing.ConsistentHashingRouter.ConsistentHashMapping import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope import akka.routing.GetRoutees import akka.routing.FromConfig import akka.testkit._ import akka.routing.ActorRefRoutee import akka.routing.ConsistentHashingPool import akka.routing.Routees object ClusterConsistentHashingRouterMultiJvmSpec extends MultiNodeConfig { class Echo extends Actor { def receive = { case _ ⇒ sender() ! self } } val first = role("first") val second = role("second") val third = role("third") commonConfig(debugConfig(on = false). withFallback(ConfigFactory.parseString(""" common-router-settings = { router = consistent-hashing-pool nr-of-instances = 10 cluster { enabled = on max-nr-of-instances-per-node = 2 } } akka.actor.deployment { /router1 = ${common-router-settings} /router3 = ${common-router-settings} /router4 = ${common-router-settings} } """)). withFallback(MultiNodeClusterSpec.clusterConfig)) } class ClusterConsistentHashingRouterMultiJvmNode1 extends ClusterConsistentHashingRouterSpec class ClusterConsistentHashingRouterMultiJvmNode2 extends ClusterConsistentHashingRouterSpec class ClusterConsistentHashingRouterMultiJvmNode3 extends ClusterConsistentHashingRouterSpec abstract class ClusterConsistentHashingRouterSpec extends MultiNodeSpec(ClusterConsistentHashingRouterMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender with DefaultTimeout { import ClusterConsistentHashingRouterMultiJvmSpec._ lazy val router1 = system.actorOf(FromConfig.props(Props[Echo]), "router1") def currentRoutees(router: ActorRef) = Await.result(router ? GetRoutees, timeout.duration).asInstanceOf[Routees].routees /** * Fills in self address for local ActorRef */ private def fullAddress(actorRef: ActorRef): Address = actorRef.path.address match { case Address(_, _, None, None) ⇒ cluster.selfAddress case a ⇒ a } "A cluster router with a consistent hashing pool" must { "start cluster with 2 nodes" taggedAs LongRunningTest in { awaitClusterUp(first, second) enterBarrier("after-1") } "create routees from configuration" in { runOn(first) { // it may take some time until router receives cluster member events awaitAssert { currentRoutees(router1).size should be(4) } val routees = currentRoutees(router1) routees.map { case ActorRefRoutee(ref) ⇒ fullAddress(ref) }.toSet should be(Set(address(first), address(second))) } enterBarrier("after-2") } "select destination based on hashKey" in { runOn(first) { router1 ! ConsistentHashableEnvelope(message = "A", hashKey = "a") val destinationA = expectMsgType[ActorRef] router1 ! ConsistentHashableEnvelope(message = "AA", hashKey = "a") expectMsg(destinationA) } enterBarrier("after-2") } "deploy routees to new member nodes in the cluster" taggedAs LongRunningTest in { awaitClusterUp(first, second, third) runOn(first) { // it may take some time until router receives cluster member events awaitAssert { currentRoutees(router1).size should be(6) } val routees = currentRoutees(router1) routees.map { case ActorRefRoutee(ref) ⇒ fullAddress(ref) }.toSet should be(roles.map(address).toSet) } enterBarrier("after-3") } "deploy programatically defined routees to the member nodes in the cluster" taggedAs LongRunningTest in { runOn(first) { val router2 = system.actorOf(ClusterRouterPool(local = ConsistentHashingPool(nrOfInstances = 0), settings = ClusterRouterPoolSettings(totalInstances = 10, maxInstancesPerNode = 2, allowLocalRoutees = true, useRole = None)). props(Props[Echo]), "router2") // it may take some time until router receives cluster member events awaitAssert { currentRoutees(router2).size should be(6) } val routees = currentRoutees(router2) routees.map { case ActorRefRoutee(ref) ⇒ fullAddress(ref) }.toSet should be(roles.map(address).toSet) } enterBarrier("after-4") } "handle combination of configured router and programatically defined hashMapping" taggedAs LongRunningTest in { runOn(first) { def hashMapping: ConsistentHashMapping = { case s: String ⇒ s } val router3 = system.actorOf(ConsistentHashingPool(nrOfInstances = 0, hashMapping = hashMapping).props(Props[Echo]), "router3") assertHashMapping(router3) } enterBarrier("after-5") } "handle combination of configured router and programatically defined hashMapping and ClusterRouterConfig" taggedAs LongRunningTest in { runOn(first) { def hashMapping: ConsistentHashMapping = { case s: String ⇒ s } val router4 = system.actorOf(ClusterRouterPool( local = ConsistentHashingPool(nrOfInstances = 0, hashMapping = hashMapping), settings = ClusterRouterPoolSettings(totalInstances = 10, maxInstancesPerNode = 1, allowLocalRoutees = true, useRole = None)). props(Props[Echo]), "router4") assertHashMapping(router4) } enterBarrier("after-6") } def assertHashMapping(router: ActorRef): Unit = { // it may take some time until router receives cluster member events awaitAssert { currentRoutees(router).size should be(6) } val routees = currentRoutees(router) routees.map { case ActorRefRoutee(ref) ⇒ fullAddress(ref) }.toSet should be(roles.map(address).toSet) router ! "a" val destinationA = expectMsgType[ActorRef] router ! "a" expectMsg(destinationA) } } } Other Akka source code examplesHere is a short list of links related to this Akka ClusterConsistentHashingRouterSpec.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.