|
Akka/Scala example source code file (ResizerSpec.scala)
The ResizerSpec.scala Akka example source code/** * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> */ package akka.routing import language.postfixOps import akka.actor.Actor import akka.testkit._ import akka.testkit.TestEvent._ import akka.actor.Props import scala.concurrent.Await import scala.concurrent.duration._ import akka.actor.ActorRef import akka.pattern.ask import scala.util.Try object ResizerSpec { val config = """ akka.actor.serialize-messages = off akka.actor.deployment { /router1 { router = round-robin-pool resizer { lower-bound = 2 upper-bound = 3 } } } """ class TestActor extends Actor { def receive = { case latch: TestLatch ⇒ latch.countDown() } } } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with ImplicitSender { import akka.routing.ResizerSpec._ override def atStartup: Unit = { // when shutting down some Resize messages might hang around system.eventStream.publish(Mute(EventFilter.warning(pattern = ".*Resize"))) } def routeeSize(router: ActorRef): Int = Await.result(router ? GetRoutees, timeout.duration).asInstanceOf[Routees].routees.size "DefaultResizer" must { "use settings to evaluate capacity" in { val resizer = DefaultResizer( lowerBound = 2, upperBound = 3) val c1 = resizer.capacity(Vector.empty[Routee]) c1 should be(2) val current = Vector( ActorRefRoutee(system.actorOf(Props[TestActor])), ActorRefRoutee(system.actorOf(Props[TestActor]))) val c2 = resizer.capacity(current) c2 should be(0) } "use settings to evaluate rampUp" in { val resizer = DefaultResizer( lowerBound = 2, upperBound = 10, rampupRate = 0.2) resizer.rampup(pressure = 9, capacity = 10) should be(0) resizer.rampup(pressure = 5, capacity = 5) should be(1) resizer.rampup(pressure = 6, capacity = 6) should be(2) } "use settings to evaluate backoff" in { val resizer = DefaultResizer( lowerBound = 2, upperBound = 10, backoffThreshold = 0.3, backoffRate = 0.1) resizer.backoff(pressure = 10, capacity = 10) should be(0) resizer.backoff(pressure = 4, capacity = 10) should be(0) resizer.backoff(pressure = 3, capacity = 10) should be(0) resizer.backoff(pressure = 2, capacity = 10) should be(-1) resizer.backoff(pressure = 0, capacity = 10) should be(-1) resizer.backoff(pressure = 1, capacity = 9) should be(-1) resizer.backoff(pressure = 0, capacity = 9) should be(-1) } "be possible to define programmatically" in { val latch = new TestLatch(3) val resizer = DefaultResizer( lowerBound = 2, upperBound = 3) val router = system.actorOf(RoundRobinPool(nrOfInstances = 0, resizer = Some(resizer)). props(Props[TestActor])) router ! latch router ! latch router ! latch Await.ready(latch, remainingOrDefault) // messagesPerResize is 10 so there is no risk of additional resize routeeSize(router) should be(2) } "be possible to define in configuration" in { val latch = new TestLatch(3) val router = system.actorOf(FromConfig.props(Props[TestActor]), "router1") router ! latch router ! latch router ! latch Await.ready(latch, remainingOrDefault) routeeSize(router) should be(2) } "grow as needed under pressure" in { // make sure the pool starts at the expected lower limit and grows to the upper as needed // as influenced by the backlog of blocking pooled actors val resizer = DefaultResizer( lowerBound = 3, upperBound = 5, rampupRate = 0.1, backoffRate = 0.0, pressureThreshold = 1, messagesPerResize = 1, backoffThreshold = 0.0) val router = system.actorOf(RoundRobinPool(nrOfInstances = 0, resizer = Some(resizer)).props( Props(new Actor { def receive = { case d: FiniteDuration ⇒ Thread.sleep(d.dilated.toMillis); sender() ! "done" case "echo" ⇒ sender() ! "reply" } }))) // first message should create the minimum number of routees router ! "echo" expectMsg("reply") routeeSize(router) should be(resizer.lowerBound) def loop(loops: Int, d: FiniteDuration) = { for (m ← 0 until loops) { router ! d // sending in too quickly will result in skipped resize due to many resizeInProgress conflicts Thread.sleep(20.millis.dilated.toMillis) } within((d * loops / resizer.lowerBound) + 2.seconds.dilated) { for (m ← 0 until loops) expectMsg("done") } } // 2 more should go thru without triggering more loop(2, 200 millis) routeeSize(router) should be(resizer.lowerBound) // a whole bunch should max it out loop(20, 500 millis) routeeSize(router) should be(resizer.upperBound) } "backoff" in within(10 seconds) { val resizer = DefaultResizer( lowerBound = 2, upperBound = 5, rampupRate = 1.0, backoffRate = 1.0, backoffThreshold = 0.40, pressureThreshold = 1, messagesPerResize = 2) val router = system.actorOf(RoundRobinPool(nrOfInstances = 0, resizer = Some(resizer)).props( Props(new Actor { def receive = { case n: Int if n <= 0 ⇒ // done case n: Int ⇒ Thread.sleep((n millis).dilated.toMillis) } }))) // put some pressure on the router for (m ← 0 until 15) { router ! 150 Thread.sleep((20 millis).dilated.toMillis) } val z = routeeSize(router) z should be > (2) Thread.sleep((300 millis).dilated.toMillis) // let it cool down awaitCond({ router ! 0 // trigger resize Thread.sleep((20 millis).dilated.toMillis) routeeSize(router) < z }, interval = 500.millis.dilated) } } } Other Akka source code examplesHere is a short list of links related to this Akka ResizerSpec.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.