|
Akka/Scala example source code file (BalancingSpec.scala)
// The BalancingSpec.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.routing

import language.postfixOps
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.{ Props, Actor }
import akka.testkit.{ TestLatch, ImplicitSender, AkkaSpec }
import akka.actor.ActorRef
import org.scalatest.BeforeAndAfterEach

/**
 * Shared fixtures for [[BalancingSpec]]: a global id counter and the routee
 * actor used by every test case.
 */
object BalancingSpec {

  /** Source of worker ids; the spec resets it to 1 before each test. */
  val counter: AtomicInteger = new AtomicInteger(1)

  /**
   * Routee that answers every message with its own id.
   *
   * The worker whose id is 1 only sleeps briefly before replying; every other
   * worker waits on `latch` before replying.
   *
   * @param latch latch that workers with an id other than 1 wait on
   */
  class Worker(latch: TestLatch) extends Actor {

    // Lazy on purpose: the id is taken from the shared counter on first use
    // (inside `receive`), not when the actor is constructed.
    lazy val id: Int = counter.getAndIncrement()

    def receive: Receive = {
      case _ ⇒
        if (id == 1) Thread.sleep(10) // dispatch to other routees
        else Await.ready(latch, 1.minute)
        sender() ! id
    }
  }
}

/**
 * Checks that a balancing pool hands messages to whichever routee is free,
 * for pools defined in code, in config, and in code with config for the same
 * path.
 */
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class BalancingSpec extends AkkaSpec(
  """
    akka.actor.deployment {
      /balancingPool-2 {
        router = balancing-pool
        nr-of-instances = 5
        pool-dispatcher {
          attempt-teamwork = on
        }
      }
      /balancingPool-3 {
        router = balancing-pool
        nr-of-instances = 5
        pool-dispatcher {
          attempt-teamwork = on
        }
      }
    }
  """) with ImplicitSender with BeforeAndAfterEach {
  import BalancingSpec._

  val poolSize = 5 // must be less than fork-join parallelism-min, which is 8 in AkkaSpec

  /** Restarts worker id numbering at 1 so each test sees ids starting from 1. */
  override def beforeEach(): Unit = {
    counter.set(1)
  }

  /**
   * Sends 100 messages to `pool` and asserts on the ids that come back, first
   * while `latch` is still closed and then after it has been opened.
   *
   * @param pool  router under test; its routees must be [[BalancingSpec.Worker]]s
   *              created with the same `latch`
   * @param latch latch the workers wait on; opened by this method
   */
  def test(pool: ActorRef, latch: TestLatch): Unit = {
    val iterationCount = 100
    for (i ← 1 to iterationCount) {
      pool ! s"hit-$i"
    }

    // all but one worker are blocked
    val replies1 = receiveN(iterationCount - poolSize + 1)
    expectNoMsg(1.second)
    // all replies from the unblocked worker so far
    replies1.toSet should be(Set(1))

    // release the waiting workers; each of them still owes exactly one reply
    latch.countDown()
    val replies2 = receiveN(poolSize - 1)
    // the remaining replies come from the blocked
    replies2.toSet should be((2 to poolSize).toSet)
    expectNoMsg(500.millis)
  }

  "balancing pool" must {

    "deliver messages in a balancing fashion when defined programatically" in {
      val latch = TestLatch(1)
      val pool = system.actorOf(
        BalancingPool(poolSize).props(routeeProps = Props(classOf[Worker], latch)),
        name = "balancingPool-1")
      test(pool, latch)
    }

    "deliver messages in a balancing fashion when defined in config" in {
      val latch = TestLatch(1)
      val pool = system.actorOf(
        FromConfig().props(routeeProps = Props(classOf[Worker], latch)),
        name = "balancingPool-2")
      test(pool, latch)
    }

    "deliver messages in a balancing fashion when overridden in config" in {
      val latch = TestLatch(1)
      // The code asks for a single routee, while the deployment section for
      // /balancingPool-3 declares nr-of-instances = 5; test() asserts on
      // poolSize (5) distinct worker ids.
      val pool = system.actorOf(
        BalancingPool(1).props(routeeProps = Props(classOf[Worker], latch)),
        name = "balancingPool-3")
      test(pool, latch)
    }
  }
}

// Other Akka source code examples
// Here is a short list of links related to this Akka BalancingSpec.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.