|
Akka/Scala example source code file (RoundRobinSpec.scala)
The RoundRobinSpec.scala Akka example source code/** * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> */ package akka.routing import language.postfixOps import java.util.concurrent.atomic.AtomicInteger import scala.concurrent.Await import scala.concurrent.duration._ import akka.actor.Props import akka.actor.Actor import akka.testkit._ import akka.pattern.ask import akka.actor.Terminated import akka.actor.ActorRef @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class RoundRobinSpec extends AkkaSpec with DefaultTimeout with ImplicitSender { def routeeSize(router: ActorRef): Int = Await.result(router ? GetRoutees, timeout.duration).asInstanceOf[Routees].routees.size "round robin pool" must { "be able to shut down its instance" in { val helloLatch = new TestLatch(5) val stopLatch = new TestLatch(5) val actor = system.actorOf(RoundRobinPool(5).props(routeeProps = Props(new Actor { def receive = { case "hello" ⇒ helloLatch.countDown() } override def postStop() { stopLatch.countDown() } })), "round-robin-shutdown") actor ! "hello" actor ! "hello" actor ! "hello" actor ! "hello" actor ! "hello" Await.ready(helloLatch, 5 seconds) system.stop(actor) Await.ready(stopLatch, 5 seconds) } "deliver messages in a round robin fashion" in { val connectionCount = 10 val iterationCount = 10 val doneLatch = new TestLatch(connectionCount) val counter = new AtomicInteger var replies = Map.empty[Int, Int].withDefaultValue(0) val actor = system.actorOf(RoundRobinPool(connectionCount).props(routeeProps = Props(new Actor { lazy val id = counter.getAndIncrement() def receive = { case "hit" ⇒ sender() ! id case "end" ⇒ doneLatch.countDown() } })), "round-robin") for (_ ← 1 to iterationCount; _ ← 1 to connectionCount) { val id = Await.result((actor ? "hit").mapTo[Int], timeout.duration) replies = replies + (id -> (replies(id) + 1)) } counter.get should be(connectionCount) actor ! akka.routing.Broadcast("end") Await.ready(doneLatch, 5 seconds) replies.values foreach { _ should be(iterationCount) } } "deliver a broadcast message using the !" in { val helloLatch = new TestLatch(5) val stopLatch = new TestLatch(5) val actor = system.actorOf(RoundRobinPool(5).props(routeeProps = Props(new Actor { def receive = { case "hello" ⇒ helloLatch.countDown() } override def postStop() { stopLatch.countDown() } })), "round-robin-broadcast") actor ! akka.routing.Broadcast("hello") Await.ready(helloLatch, 5 seconds) system.stop(actor) Await.ready(stopLatch, 5 seconds) } "be controlled with management messages" in { val actor = system.actorOf(RoundRobinPool(3).props(routeeProps = Props(new Actor { def receive = Actor.emptyBehavior })), "round-robin-managed") routeeSize(actor) should be(3) actor ! AdjustPoolSize(+4) routeeSize(actor) should be(7) actor ! AdjustPoolSize(-2) routeeSize(actor) should be(5) val other = ActorSelectionRoutee(system.actorSelection("/user/other")) actor ! AddRoutee(other) routeeSize(actor) should be(6) actor ! RemoveRoutee(other) routeeSize(actor) should be(5) } } "round robin group" must { "deliver messages in a round robin fashion" in { val connectionCount = 10 val iterationCount = 10 val doneLatch = new TestLatch(connectionCount) var replies = Map.empty[String, Int].withDefaultValue(0) val paths = (1 to connectionCount) map { n ⇒ val ref = system.actorOf(Props(new Actor { def receive = { case "hit" ⇒ sender() ! self.path.name case "end" ⇒ doneLatch.countDown() } }), name = "target-" + n) ref.path.toStringWithoutAddress } val actor = system.actorOf(RoundRobinGroup(paths).props(), "round-robin-group1") for (_ ← 1 to iterationCount; _ ← 1 to connectionCount) { val id = Await.result((actor ? "hit").mapTo[String], timeout.duration) replies = replies + (id -> (replies(id) + 1)) } actor ! akka.routing.Broadcast("end") Await.ready(doneLatch, 5 seconds) replies.values foreach { _ should be(iterationCount) } } } "round robin logic used in actor" must { "deliver messages in a round robin fashion" in { val connectionCount = 10 val iterationCount = 10 var replies = Map.empty[String, Int].withDefaultValue(0) val actor = system.actorOf(Props(new Actor { var n = 0 var router = Router(RoundRobinRoutingLogic()) def receive = { case p: Props ⇒ n += 1 val c = context.actorOf(p, name = "child-" + n) context.watch(c) router = router.addRoutee(c) case Terminated(c) ⇒ router = router.removeRoutee(c) if (router.routees.isEmpty) context.stop(self) case other ⇒ router.route(other, sender()) } })) val childProps = Props(new Actor { def receive = { case "hit" ⇒ sender() ! self.path.name case "end" ⇒ context.stop(self) } }) (1 to connectionCount) foreach { _ ⇒ actor ! childProps } for (_ ← 1 to iterationCount; _ ← 1 to connectionCount) { val id = Await.result((actor ? "hit").mapTo[String], timeout.duration) replies = replies + (id -> (replies(id) + 1)) } watch(actor) actor ! akka.routing.Broadcast("end") expectTerminated(actor) replies.values foreach { _ should be(iterationCount) } } } } Other Akka source code examplesHere is a short list of links related to this Akka RoundRobinSpec.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.