|
Akka/Scala example source code file (RoundRobinSpec.scala)
The RoundRobinSpec.scala Akka example source code
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.routing
import language.postfixOps
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.Props
import akka.actor.Actor
import akka.testkit._
import akka.pattern.ask
import akka.actor.Terminated
import akka.actor.ActorRef
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class RoundRobinSpec extends AkkaSpec with DefaultTimeout with ImplicitSender {
def routeeSize(router: ActorRef): Int =
Await.result(router ? GetRoutees, timeout.duration).asInstanceOf[Routees].routees.size
"round robin pool" must {
"be able to shut down its instance" in {
val helloLatch = new TestLatch(5)
val stopLatch = new TestLatch(5)
val actor = system.actorOf(RoundRobinPool(5).props(routeeProps = Props(new Actor {
def receive = {
case "hello" ⇒ helloLatch.countDown()
}
override def postStop() {
stopLatch.countDown()
}
})), "round-robin-shutdown")
actor ! "hello"
actor ! "hello"
actor ! "hello"
actor ! "hello"
actor ! "hello"
Await.ready(helloLatch, 5 seconds)
system.stop(actor)
Await.ready(stopLatch, 5 seconds)
}
"deliver messages in a round robin fashion" in {
val connectionCount = 10
val iterationCount = 10
val doneLatch = new TestLatch(connectionCount)
val counter = new AtomicInteger
var replies = Map.empty[Int, Int].withDefaultValue(0)
val actor = system.actorOf(RoundRobinPool(connectionCount).props(routeeProps = Props(new Actor {
lazy val id = counter.getAndIncrement()
def receive = {
case "hit" ⇒ sender() ! id
case "end" ⇒ doneLatch.countDown()
}
})), "round-robin")
for (_ ← 1 to iterationCount; _ ← 1 to connectionCount) {
val id = Await.result((actor ? "hit").mapTo[Int], timeout.duration)
replies = replies + (id -> (replies(id) + 1))
}
counter.get should be(connectionCount)
actor ! akka.routing.Broadcast("end")
Await.ready(doneLatch, 5 seconds)
replies.values foreach { _ should be(iterationCount) }
}
"deliver a broadcast message using the !" in {
val helloLatch = new TestLatch(5)
val stopLatch = new TestLatch(5)
val actor = system.actorOf(RoundRobinPool(5).props(routeeProps = Props(new Actor {
def receive = {
case "hello" ⇒ helloLatch.countDown()
}
override def postStop() {
stopLatch.countDown()
}
})), "round-robin-broadcast")
actor ! akka.routing.Broadcast("hello")
Await.ready(helloLatch, 5 seconds)
system.stop(actor)
Await.ready(stopLatch, 5 seconds)
}
"be controlled with management messages" in {
val actor = system.actorOf(RoundRobinPool(3).props(routeeProps = Props(new Actor {
def receive = Actor.emptyBehavior
})), "round-robin-managed")
routeeSize(actor) should be(3)
actor ! AdjustPoolSize(+4)
routeeSize(actor) should be(7)
actor ! AdjustPoolSize(-2)
routeeSize(actor) should be(5)
val other = ActorSelectionRoutee(system.actorSelection("/user/other"))
actor ! AddRoutee(other)
routeeSize(actor) should be(6)
actor ! RemoveRoutee(other)
routeeSize(actor) should be(5)
}
}
"round robin group" must {
"deliver messages in a round robin fashion" in {
val connectionCount = 10
val iterationCount = 10
val doneLatch = new TestLatch(connectionCount)
var replies = Map.empty[String, Int].withDefaultValue(0)
val paths = (1 to connectionCount) map { n ⇒
val ref = system.actorOf(Props(new Actor {
def receive = {
case "hit" ⇒ sender() ! self.path.name
case "end" ⇒ doneLatch.countDown()
}
}), name = "target-" + n)
ref.path.toStringWithoutAddress
}
val actor = system.actorOf(RoundRobinGroup(paths).props(), "round-robin-group1")
for (_ ← 1 to iterationCount; _ ← 1 to connectionCount) {
val id = Await.result((actor ? "hit").mapTo[String], timeout.duration)
replies = replies + (id -> (replies(id) + 1))
}
actor ! akka.routing.Broadcast("end")
Await.ready(doneLatch, 5 seconds)
replies.values foreach { _ should be(iterationCount) }
}
}
"round robin logic used in actor" must {
"deliver messages in a round robin fashion" in {
val connectionCount = 10
val iterationCount = 10
var replies = Map.empty[String, Int].withDefaultValue(0)
val actor = system.actorOf(Props(new Actor {
var n = 0
var router = Router(RoundRobinRoutingLogic())
def receive = {
case p: Props ⇒
n += 1
val c = context.actorOf(p, name = "child-" + n)
context.watch(c)
router = router.addRoutee(c)
case Terminated(c) ⇒
router = router.removeRoutee(c)
if (router.routees.isEmpty)
context.stop(self)
case other ⇒ router.route(other, sender())
}
}))
val childProps = Props(new Actor {
def receive = {
case "hit" ⇒ sender() ! self.path.name
case "end" ⇒ context.stop(self)
}
})
(1 to connectionCount) foreach { _ ⇒ actor ! childProps }
for (_ ← 1 to iterationCount; _ ← 1 to connectionCount) {
val id = Await.result((actor ? "hit").mapTo[String], timeout.duration)
replies = replies + (id -> (replies(id) + 1))
}
watch(actor)
actor ! akka.routing.Broadcast("end")
expectTerminated(actor)
replies.values foreach { _ should be(iterationCount) }
}
}
}
Other Akka source code examplesHere is a short list of links related to this Akka RoundRobinSpec.scala source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.