|
Akka/Scala example source code file (AdaptiveLoadBalancingRouterSpec.scala)
The AdaptiveLoadBalancingRouterSpec.scala Akka example source code/* * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> */ package akka.cluster.routing import language.postfixOps import java.lang.management.ManagementFactory import scala.concurrent.Await import scala.concurrent.duration._ import com.typesafe.config.Config import com.typesafe.config.ConfigFactory import akka.actor._ import akka.cluster.Cluster import akka.cluster.MultiNodeClusterSpec import akka.cluster.NodeMetrics import akka.pattern.ask import akka.remote.testkit.{ MultiNodeSpec, MultiNodeConfig } import akka.routing.GetRoutees import akka.routing.FromConfig import akka.testkit.{ LongRunningTest, DefaultTimeout, ImplicitSender } import akka.routing.ActorRefRoutee import akka.routing.Routees object AdaptiveLoadBalancingRouterMultiJvmSpec extends MultiNodeConfig { class Echo extends Actor { def receive = { case _ ⇒ sender() ! Reply(Cluster(context.system).selfAddress) } } class Memory extends Actor with ActorLogging { var usedMemory: Array[Array[Int]] = _ def receive = { case AllocateMemory ⇒ val heap = ManagementFactory.getMemoryMXBean.getHeapMemoryUsage // getMax can be undefined (-1) val max = math.max(heap.getMax, heap.getCommitted) val used = heap.getUsed log.debug("used heap before: [{}] bytes, of max [{}]", used, heap.getMax) // allocate 70% of free space val allocateBytes = (0.7 * (max - used)).toInt val numberOfArrays = allocateBytes / 1024 usedMemory = Array.ofDim(numberOfArrays, 248) // each 248 element Int array will use ~ 1 kB log.debug("used heap after: [{}] bytes", ManagementFactory.getMemoryMXBean.getHeapMemoryUsage.getUsed) sender() ! "done" } } case object AllocateMemory final case class Reply(address: Address) val first = role("first") val second = role("second") val third = role("third") commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(""" akka.cluster.metrics.collect-interval = 1s akka.cluster.metrics.gossip-interval = 1s akka.cluster.metrics.moving-average-half-life = 2s akka.actor.deployment { /router3 = { router = adaptive-pool metrics-selector = cpu nr-of-instances = 9 } /router4 = { router = adaptive-pool metrics-selector = "akka.cluster.routing.TestCustomMetricsSelector" nr-of-instances = 10 cluster { enabled = on max-nr-of-instances-per-node = 2 } } } """)).withFallback(MultiNodeClusterSpec.clusterConfig)) } class TestCustomMetricsSelector(config: Config) extends MetricsSelector { override def weights(nodeMetrics: Set[NodeMetrics]): Map[Address, Int] = Map.empty } class AdaptiveLoadBalancingRouterMultiJvmNode1 extends AdaptiveLoadBalancingRouterSpec class AdaptiveLoadBalancingRouterMultiJvmNode2 extends AdaptiveLoadBalancingRouterSpec class AdaptiveLoadBalancingRouterMultiJvmNode3 extends AdaptiveLoadBalancingRouterSpec abstract class AdaptiveLoadBalancingRouterSpec extends MultiNodeSpec(AdaptiveLoadBalancingRouterMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender with DefaultTimeout { import AdaptiveLoadBalancingRouterMultiJvmSpec._ def currentRoutees(router: ActorRef) = Await.result(router ? GetRoutees, timeout.duration).asInstanceOf[Routees].routees def receiveReplies(expectedReplies: Int): Map[Address, Int] = { val zero = Map.empty[Address, Int] ++ roles.map(address(_) -> 0) (receiveWhile(5 seconds, messages = expectedReplies) { case Reply(address) ⇒ address }).foldLeft(zero) { case (replyMap, address) ⇒ replyMap + (address -> (replyMap(address) + 1)) } } /** * Fills in self address for local ActorRef */ def fullAddress(actorRef: ActorRef): Address = actorRef.path.address match { case Address(_, _, None, None) ⇒ cluster.selfAddress case a ⇒ a } def startRouter(name: String): ActorRef = { val router = system.actorOf(ClusterRouterPool( local = AdaptiveLoadBalancingPool(HeapMetricsSelector), settings = ClusterRouterPoolSettings(totalInstances = 10, maxInstancesPerNode = 1, allowLocalRoutees = true, useRole = None)). props(Props[Echo]), name) // it may take some time until router receives cluster member events awaitAssert { currentRoutees(router).size should be(roles.size) } val routees = currentRoutees(router) routees.map { case ActorRefRoutee(ref) ⇒ fullAddress(ref) }.toSet should be(roles.map(address).toSet) router } "A cluster with a AdaptiveLoadBalancingRouter" must { "start cluster nodes" taggedAs LongRunningTest in { awaitClusterUp(roles: _*) enterBarrier("after-1") } "use all nodes in the cluster when not overloaded" taggedAs LongRunningTest in { runOn(first) { val router1 = startRouter("router1") // collect some metrics before we start Thread.sleep(cluster.settings.MetricsInterval.toMillis * 10) val iterationCount = 100 1 to iterationCount foreach { _ ⇒ router1 ! "hit" // wait a while between each message, since metrics is collected periodically Thread.sleep(10) } val replies = receiveReplies(iterationCount) replies(first) should be > (0) replies(second) should be > (0) replies(third) should be > (0) replies.values.sum should be(iterationCount) } enterBarrier("after-2") } "prefer node with more free heap capacity" taggedAs LongRunningTest in { System.gc() enterBarrier("gc") runOn(second) { within(20.seconds) { system.actorOf(Props[Memory], "memory") ! AllocateMemory expectMsg("done") } } enterBarrier("heap-allocated") runOn(first) { val router2 = startRouter("router2") // collect some metrics before we start Thread.sleep(cluster.settings.MetricsInterval.toMillis * 10) val iterationCount = 3000 1 to iterationCount foreach { _ ⇒ router2 ! "hit" } val replies = receiveReplies(iterationCount) replies(third) should be > (replies(second)) replies.values.sum should be(iterationCount) } enterBarrier("after-3") } "create routees from configuration" taggedAs LongRunningTest in { runOn(first) { val router3 = system.actorOf(FromConfig.props(Props[Memory]), "router3") // it may take some time until router receives cluster member events awaitAssert { currentRoutees(router3).size should be(9) } val routees = currentRoutees(router3) routees.map { case ActorRefRoutee(ref) ⇒ fullAddress(ref) }.toSet should be(Set(address(first))) } enterBarrier("after-4") } "create routees from cluster.enabled configuration" taggedAs LongRunningTest in { runOn(first) { val router4 = system.actorOf(FromConfig.props(Props[Memory]), "router4") // it may take some time until router receives cluster member events awaitAssert { currentRoutees(router4).size should be(6) } val routees = currentRoutees(router4) routees.map { case ActorRefRoutee(ref) ⇒ fullAddress(ref) }.toSet should be(Set( address(first), address(second), address(third))) } enterBarrier("after-5") } } } Other Akka source code examplesHere is a short list of links related to this Akka AdaptiveLoadBalancingRouterSpec.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.