|
Akka/Scala example source code file (DispatchersSpec.scala)
The DispatchersSpec.scala Akka example source code/** * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> */ package akka.actor.dispatch import scala.collection.JavaConverters.mapAsJavaMapConverter import scala.reflect.ClassTag import com.typesafe.config.Config import com.typesafe.config.ConfigFactory import akka.ConfigurationException import akka.actor._ import akka.dispatch._ import akka.testkit.{ AkkaSpec, ImplicitSender } import akka.routing.FromConfig import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.atomic.AtomicBoolean object DispatchersSpec { val config = """ myapp { mydispatcher { throughput = 17 } thread-pool-dispatcher { executor = thread-pool-executor } my-pinned-dispatcher { executor = thread-pool-executor type = PinnedDispatcher } balancing-dispatcher { type = "akka.dispatch.BalancingDispatcherConfigurator" } mymailbox { mailbox-type = "akka.actor.dispatch.DispatchersSpec$OneShotMailboxType" } } akka.actor.deployment { /echo1 { dispatcher = myapp.mydispatcher } /echo2 { dispatcher = myapp.mydispatcher } /pool1 { router = random-pool nr-of-instances = 3 pool-dispatcher { fork-join-executor.parallelism-min = 3 fork-join-executor.parallelism-max = 3 } } /balanced { router = balancing-pool nr-of-instances = 3 pool-dispatcher { mailbox = myapp.mymailbox fork-join-executor.parallelism-min = 3 fork-join-executor.parallelism-max = 3 } } } """ class ThreadNameEcho extends Actor { def receive = { case _ ⇒ sender() ! Thread.currentThread.getName } } class OneShotMailboxType(settings: ActorSystem.Settings, config: Config) extends MailboxType with ProducesMessageQueue[DoublingMailbox] { val created = new AtomicBoolean(false) override def create(owner: Option[ActorRef], system: Option[ActorSystem]) = if (created.compareAndSet(false, true)) { new DoublingMailbox(owner) } else throw new IllegalStateException("I've already created the mailbox.") } class DoublingMailbox(owner: Option[ActorRef]) extends UnboundedQueueBasedMessageQueue { final val queue = new ConcurrentLinkedQueue[Envelope]() override def enqueue(receiver: ActorRef, handle: Envelope): Unit = { queue add handle queue add handle } } } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class DispatchersSpec extends AkkaSpec(DispatchersSpec.config) with ImplicitSender { import DispatchersSpec._ val df = system.dispatchers import df._ val tipe = "type" val keepalivems = "keep-alive-time" val corepoolsizefactor = "core-pool-size-factor" val maxpoolsizefactor = "max-pool-size-factor" val allowcoretimeout = "allow-core-timeout" val throughput = "throughput" val id = "id" def instance(dispatcher: MessageDispatcher): (MessageDispatcher) ⇒ Boolean = _ == dispatcher def ofType[T <: MessageDispatcher: ClassTag]: (MessageDispatcher) ⇒ Boolean = _.getClass == implicitly[ClassTag[T]].runtimeClass def typesAndValidators: Map[String, (MessageDispatcher) ⇒ Boolean] = Map( "PinnedDispatcher" -> ofType[PinnedDispatcher], "Dispatcher" -> ofType[Dispatcher]) def validTypes = typesAndValidators.keys.toList val defaultDispatcherConfig = settings.config.getConfig("akka.actor.default-dispatcher") lazy val allDispatchers: Map[String, MessageDispatcher] = { validTypes.map(t ⇒ (t, from(ConfigFactory.parseMap(Map(tipe -> t, id -> t).asJava). withFallback(defaultDispatcherConfig)))).toMap } def assertMyDispatcherIsUsed(actor: ActorRef): Unit = { actor ! "what's the name?" val Expected = "(DispatchersSpec-myapp.mydispatcher-[1-9][0-9]*)".r expectMsgPF() { case Expected(x) ⇒ } } "Dispatchers" must { "use defined properties" in { val dispatcher = lookup("myapp.mydispatcher") dispatcher.throughput should be(17) } "use specific id" in { val dispatcher = lookup("myapp.mydispatcher") dispatcher.id should be("myapp.mydispatcher") } "complain about missing config" in { intercept[ConfigurationException] { lookup("myapp.other-dispatcher") } } "have only one default dispatcher" in { val dispatcher = lookup(Dispatchers.DefaultDispatcherId) dispatcher should be(defaultGlobalDispatcher) dispatcher should be(system.dispatcher) } "throw ConfigurationException if type does not exist" in { intercept[ConfigurationException] { from(ConfigFactory.parseMap(Map(tipe -> "typedoesntexist", id -> "invalid-dispatcher").asJava). withFallback(defaultDispatcherConfig)) } } "get the correct types of dispatchers" in { //All created/obtained dispatchers are of the expeced type/instance assert(typesAndValidators.forall(tuple ⇒ tuple._2(allDispatchers(tuple._1)))) } "provide lookup of dispatchers by id" in { val d1 = lookup("myapp.mydispatcher") val d2 = lookup("myapp.mydispatcher") d1 should be(d2) } "include system name and dispatcher id in thread names for fork-join-executor" in { assertMyDispatcherIsUsed(system.actorOf(Props[ThreadNameEcho].withDispatcher("myapp.mydispatcher"))) } "include system name and dispatcher id in thread names for thread-pool-executor" in { system.actorOf(Props[ThreadNameEcho].withDispatcher("myapp.thread-pool-dispatcher")) ! "what's the name?" val Expected = "(DispatchersSpec-myapp.thread-pool-dispatcher-[1-9][0-9]*)".r expectMsgPF() { case Expected(x) ⇒ } } "include system name and dispatcher id in thread names for default-dispatcher" in { system.actorOf(Props[ThreadNameEcho]) ! "what's the name?" val Expected = "(DispatchersSpec-akka.actor.default-dispatcher-[1-9][0-9]*)".r expectMsgPF() { case Expected(x) ⇒ } } "include system name and dispatcher id in thread names for pinned dispatcher" in { system.actorOf(Props[ThreadNameEcho].withDispatcher("myapp.my-pinned-dispatcher")) ! "what's the name?" val Expected = "(DispatchersSpec-myapp.my-pinned-dispatcher-[1-9][0-9]*)".r expectMsgPF() { case Expected(x) ⇒ } } "include system name and dispatcher id in thread names for balancing dispatcher" in { system.actorOf(Props[ThreadNameEcho].withDispatcher("myapp.balancing-dispatcher")) ! "what's the name?" val Expected = "(DispatchersSpec-myapp.balancing-dispatcher-[1-9][0-9]*)".r expectMsgPF() { case Expected(x) ⇒ } } "use dispatcher in deployment config" in { assertMyDispatcherIsUsed(system.actorOf(Props[ThreadNameEcho], name = "echo1")) } "use dispatcher in deployment config, trumps code" in { assertMyDispatcherIsUsed( system.actorOf(Props[ThreadNameEcho].withDispatcher("myapp.my-pinned-dispatcher"), name = "echo2")) } "use pool-dispatcher router of deployment config" in { val pool = system.actorOf(FromConfig.props(Props[ThreadNameEcho]), name = "pool1") pool ! Identify(None) val routee = expectMsgType[ActorIdentity].ref.get routee ! "what's the name?" val Expected = """(DispatchersSpec-akka\.actor\.deployment\./pool1\.pool-dispatcher-[1-9][0-9]*)""".r expectMsgPF() { case Expected(x) ⇒ } } "use balancing-pool router with special routees mailbox of deployment config" in { system.actorOf(FromConfig.props(Props[ThreadNameEcho]), name = "balanced") ! "what's the name?" val Expected = """(DispatchersSpec-BalancingPool-/balanced-[1-9][0-9]*)""".r expectMsgPF() { case Expected(x) ⇒ } expectMsgPF() { case Expected(x) ⇒ } } } } Other Akka source code examplesHere is a short list of links related to this Akka DispatchersSpec.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.