|
Akka/Scala example source code file (ScatterGatherFirstCompletedSpec.scala)
The ScatterGatherFirstCompletedSpec.scala Akka example source code
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.routing
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.{ Props, Actor }
import akka.pattern.ask
import akka.testkit.{ TestLatch, ImplicitSender, DefaultTimeout, AkkaSpec }
import akka.actor.ActorSystem
object ScatterGatherFirstCompletedSpec {
class TestActor extends Actor {
def receive = { case _ ⇒ }
}
final case class Stop(id: Option[Int] = None)
def newActor(id: Int, shudownLatch: Option[TestLatch] = None)(implicit system: ActorSystem) =
system.actorOf(Props(new Actor {
def receive = {
case Stop(None) ⇒ context.stop(self)
case Stop(Some(_id)) if (_id == id) ⇒ context.stop(self)
case _id: Int if (_id == id) ⇒
case x ⇒ {
Thread sleep 100 * id
sender() ! id
}
}
override def postStop = {
shudownLatch foreach (_.countDown())
}
}), "Actor:" + id)
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ScatterGatherFirstCompletedSpec extends AkkaSpec with DefaultTimeout with ImplicitSender {
import ScatterGatherFirstCompletedSpec._
"Scatter-gather group" must {
"deliver a broadcast message using the !" in {
val doneLatch = new TestLatch(2)
val counter1 = new AtomicInteger
val actor1 = system.actorOf(Props(new Actor {
def receive = {
case "end" ⇒ doneLatch.countDown()
case msg: Int ⇒ counter1.addAndGet(msg)
}
}))
val counter2 = new AtomicInteger
val actor2 = system.actorOf(Props(new Actor {
def receive = {
case "end" ⇒ doneLatch.countDown()
case msg: Int ⇒ counter2.addAndGet(msg)
}
}))
val paths = List(actor1, actor2).map(_.path.toString)
val routedActor = system.actorOf(ScatterGatherFirstCompletedGroup(paths, within = 1.second).props())
routedActor ! Broadcast(1)
routedActor ! Broadcast("end")
Await.ready(doneLatch, TestLatch.DefaultTimeout)
counter1.get should be(1)
counter2.get should be(1)
}
"return response, even if one of the actors has stopped" in {
val shutdownLatch = new TestLatch(1)
val actor1 = newActor(1, Some(shutdownLatch))
val actor2 = newActor(14, Some(shutdownLatch))
val paths = List(actor1, actor2).map(_.path.toString)
val routedActor = system.actorOf(ScatterGatherFirstCompletedGroup(paths, within = 3.seconds).props())
routedActor ! Broadcast(Stop(Some(1)))
Await.ready(shutdownLatch, TestLatch.DefaultTimeout)
Await.result(routedActor ? Broadcast(0), timeout.duration) should be(14)
}
}
}
Other Akka source code examplesHere is a short list of links related to this Akka ScatterGatherFirstCompletedSpec.scala source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.