|
Akka/Scala example source code file (DispatcherActorSpec.scala)
The DispatcherActorSpec.scala Akka example source codepackage akka.actor.dispatch import language.postfixOps import java.util.concurrent.{ CountDownLatch, TimeUnit } import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger } import akka.testkit.{ filterEvents, EventFilter, AkkaSpec } import akka.actor.{ Props, Actor } import scala.concurrent.Await import scala.concurrent.duration._ import akka.testkit.DefaultTimeout import akka.dispatch.{ PinnedDispatcher, Dispatchers, Dispatcher } import akka.pattern.ask object DispatcherActorSpec { val config = """ test-dispatcher { } test-throughput-dispatcher { throughput = 101 executor = "thread-pool-executor" thread-pool-executor { core-pool-size-min = 1 core-pool-size-max = 1 } } test-throughput-deadline-dispatcher { throughput = 2 throughput-deadline-time = 100 milliseconds executor = "thread-pool-executor" thread-pool-executor { core-pool-size-min = 1 core-pool-size-max = 1 } } """ class TestActor extends Actor { def receive = { case "Hello" ⇒ sender() ! "World" case "Failure" ⇒ throw new RuntimeException("Expected exception; to test fault-tolerance") } } object OneWayTestActor { val oneWay = new CountDownLatch(1) } class OneWayTestActor extends Actor { def receive = { case "OneWay" ⇒ OneWayTestActor.oneWay.countDown() } } } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class DispatcherActorSpec extends AkkaSpec(DispatcherActorSpec.config) with DefaultTimeout { import DispatcherActorSpec._ private val unit = TimeUnit.MILLISECONDS "A Dispatcher and an Actor" must { "support tell" in { val actor = system.actorOf(Props[OneWayTestActor].withDispatcher("test-dispatcher")) val result = actor ! "OneWay" assert(OneWayTestActor.oneWay.await(1, TimeUnit.SECONDS)) system.stop(actor) } "support ask/reply" in { val actor = system.actorOf(Props[TestActor].withDispatcher("test-dispatcher")) assert("World" === Await.result(actor ? "Hello", timeout.duration)) system.stop(actor) } "respect the throughput setting" in { val throughputDispatcher = "test-throughput-dispatcher" val works = new AtomicBoolean(true) val latch = new CountDownLatch(100) val start = new CountDownLatch(1) val fastOne = system.actorOf( Props(new Actor { def receive = { case "sabotage" ⇒ works.set(false) } }) .withDispatcher(throughputDispatcher)) val slowOne = system.actorOf( Props(new Actor { def receive = { case "hogexecutor" ⇒ { sender() ! "OK"; start.await } case "ping" ⇒ if (works.get) latch.countDown() } }).withDispatcher(throughputDispatcher)) assert(Await.result(slowOne ? "hogexecutor", timeout.duration) === "OK") (1 to 100) foreach { _ ⇒ slowOne ! "ping" } fastOne ! "sabotage" start.countDown() latch.await(10, TimeUnit.SECONDS) system.stop(fastOne) system.stop(slowOne) assert(latch.getCount() === 0) } "respect throughput deadline" in { val deadline = 100 millis val throughputDispatcher = "test-throughput-deadline-dispatcher" val works = new AtomicBoolean(true) val latch = new CountDownLatch(1) val start = new CountDownLatch(1) val ready = new CountDownLatch(1) val fastOne = system.actorOf( Props(new Actor { def receive = { case "ping" ⇒ if (works.get) latch.countDown(); context.stop(self) } }).withDispatcher(throughputDispatcher)) val slowOne = system.actorOf( Props(new Actor { def receive = { case "hogexecutor" ⇒ { ready.countDown(); start.await } case "ping" ⇒ { works.set(false); context.stop(self) } } }).withDispatcher(throughputDispatcher)) slowOne ! "hogexecutor" slowOne ! "ping" fastOne ! "ping" assert(ready.await(2, TimeUnit.SECONDS) === true) Thread.sleep(deadline.toMillis + 10) // wait just a bit more than the deadline start.countDown() assert(latch.await(2, TimeUnit.SECONDS) === true) } } } Other Akka source code examplesHere is a short list of links related to this Akka DispatcherActorSpec.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.