alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (DispatchersSpec.scala)

This example Akka source code file (DispatchersSpec.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actor, akka, boolean, dispatch, dispatchersspec, doublingmailbox, expected, map, messagedispatcher, oneshotmailboxtype, option, pinneddispatcher, reflection, test, testing, unit

The DispatchersSpec.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.actor.dispatch

import scala.collection.JavaConverters.mapAsJavaMapConverter
import scala.reflect.ClassTag
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import akka.ConfigurationException
import akka.actor._
import akka.dispatch._
import akka.testkit.{ AkkaSpec, ImplicitSender }
import akka.routing.FromConfig
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicBoolean

object DispatchersSpec {
  val config = """
    myapp {
      mydispatcher {
        throughput = 17
      }
      thread-pool-dispatcher {
        executor = thread-pool-executor
      }
      my-pinned-dispatcher {
        executor = thread-pool-executor
        type = PinnedDispatcher
      }
      balancing-dispatcher {
        type = "akka.dispatch.BalancingDispatcherConfigurator"
      }
      mymailbox {
         mailbox-type = "akka.actor.dispatch.DispatchersSpec$OneShotMailboxType"
      }
    }
    akka.actor.deployment {
      /echo1 {
        dispatcher = myapp.mydispatcher
      }
      /echo2 {
        dispatcher = myapp.mydispatcher
      }
      /pool1 {
        router = random-pool
        nr-of-instances = 3
        pool-dispatcher {
          fork-join-executor.parallelism-min = 3
          fork-join-executor.parallelism-max = 3
        }
      }
      /balanced {
        router = balancing-pool
        nr-of-instances = 3
        pool-dispatcher {
          mailbox = myapp.mymailbox
          fork-join-executor.parallelism-min = 3
          fork-join-executor.parallelism-max = 3
        }
      }
    }
    """

  class ThreadNameEcho extends Actor {
    def receive = {
      case _ ⇒ sender() ! Thread.currentThread.getName
    }
  }

  class OneShotMailboxType(settings: ActorSystem.Settings, config: Config)
    extends MailboxType with ProducesMessageQueue[DoublingMailbox] {
    val created = new AtomicBoolean(false)
    override def create(owner: Option[ActorRef], system: Option[ActorSystem]) =
      if (created.compareAndSet(false, true)) {
        new DoublingMailbox(owner)
      } else
        throw new IllegalStateException("I've already created the mailbox.")
  }

  class DoublingMailbox(owner: Option[ActorRef]) extends UnboundedQueueBasedMessageQueue {
    final val queue = new ConcurrentLinkedQueue[Envelope]()
    override def enqueue(receiver: ActorRef, handle: Envelope): Unit = {
      queue add handle
      queue add handle
    }
  }
}

@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class DispatchersSpec extends AkkaSpec(DispatchersSpec.config) with ImplicitSender {
  import DispatchersSpec._
  val df = system.dispatchers
  import df._

  val tipe = "type"
  val keepalivems = "keep-alive-time"
  val corepoolsizefactor = "core-pool-size-factor"
  val maxpoolsizefactor = "max-pool-size-factor"
  val allowcoretimeout = "allow-core-timeout"
  val throughput = "throughput"
  val id = "id"

  def instance(dispatcher: MessageDispatcher): (MessageDispatcher) ⇒ Boolean = _ == dispatcher
  def ofType[T <: MessageDispatcher: ClassTag]: (MessageDispatcher) ⇒ Boolean = _.getClass == implicitly[ClassTag[T]].runtimeClass

  def typesAndValidators: Map[String, (MessageDispatcher) ⇒ Boolean] = Map(
    "PinnedDispatcher" -> ofType[PinnedDispatcher],
    "Dispatcher" -> ofType[Dispatcher])

  def validTypes = typesAndValidators.keys.toList

  val defaultDispatcherConfig = settings.config.getConfig("akka.actor.default-dispatcher")

  lazy val allDispatchers: Map[String, MessageDispatcher] = {
    validTypes.map(t ⇒ (t, from(ConfigFactory.parseMap(Map(tipe -> t, id -> t).asJava).
      withFallback(defaultDispatcherConfig)))).toMap
  }

  def assertMyDispatcherIsUsed(actor: ActorRef): Unit = {
    actor ! "what's the name?"
    val Expected = "(DispatchersSpec-myapp.mydispatcher-[1-9][0-9]*)".r
    expectMsgPF() {
      case Expected(x) ⇒
    }
  }

  "Dispatchers" must {

    "use defined properties" in {
      val dispatcher = lookup("myapp.mydispatcher")
      dispatcher.throughput should be(17)
    }

    "use specific id" in {
      val dispatcher = lookup("myapp.mydispatcher")
      dispatcher.id should be("myapp.mydispatcher")
    }

    "complain about missing config" in {
      intercept[ConfigurationException] {
        lookup("myapp.other-dispatcher")
      }
    }

    "have only one default dispatcher" in {
      val dispatcher = lookup(Dispatchers.DefaultDispatcherId)
      dispatcher should be(defaultGlobalDispatcher)
      dispatcher should be(system.dispatcher)
    }

    "throw ConfigurationException if type does not exist" in {
      intercept[ConfigurationException] {
        from(ConfigFactory.parseMap(Map(tipe -> "typedoesntexist", id -> "invalid-dispatcher").asJava).
          withFallback(defaultDispatcherConfig))
      }
    }

    "get the correct types of dispatchers" in {
      //All created/obtained dispatchers are of the expeced type/instance
      assert(typesAndValidators.forall(tuple ⇒ tuple._2(allDispatchers(tuple._1))))
    }

    "provide lookup of dispatchers by id" in {
      val d1 = lookup("myapp.mydispatcher")
      val d2 = lookup("myapp.mydispatcher")
      d1 should be(d2)
    }

    "include system name and dispatcher id in thread names for fork-join-executor" in {
      assertMyDispatcherIsUsed(system.actorOf(Props[ThreadNameEcho].withDispatcher("myapp.mydispatcher")))
    }

    "include system name and dispatcher id in thread names for thread-pool-executor" in {
      system.actorOf(Props[ThreadNameEcho].withDispatcher("myapp.thread-pool-dispatcher")) ! "what's the name?"
      val Expected = "(DispatchersSpec-myapp.thread-pool-dispatcher-[1-9][0-9]*)".r
      expectMsgPF() {
        case Expected(x) ⇒
      }
    }

    "include system name and dispatcher id in thread names for default-dispatcher" in {
      system.actorOf(Props[ThreadNameEcho]) ! "what's the name?"
      val Expected = "(DispatchersSpec-akka.actor.default-dispatcher-[1-9][0-9]*)".r
      expectMsgPF() {
        case Expected(x) ⇒
      }
    }

    "include system name and dispatcher id in thread names for pinned dispatcher" in {
      system.actorOf(Props[ThreadNameEcho].withDispatcher("myapp.my-pinned-dispatcher")) ! "what's the name?"
      val Expected = "(DispatchersSpec-myapp.my-pinned-dispatcher-[1-9][0-9]*)".r
      expectMsgPF() {
        case Expected(x) ⇒
      }
    }

    "include system name and dispatcher id in thread names for balancing dispatcher" in {
      system.actorOf(Props[ThreadNameEcho].withDispatcher("myapp.balancing-dispatcher")) ! "what's the name?"
      val Expected = "(DispatchersSpec-myapp.balancing-dispatcher-[1-9][0-9]*)".r
      expectMsgPF() {
        case Expected(x) ⇒
      }
    }

    "use dispatcher in deployment config" in {
      assertMyDispatcherIsUsed(system.actorOf(Props[ThreadNameEcho], name = "echo1"))
    }

    "use dispatcher in deployment config, trumps code" in {
      assertMyDispatcherIsUsed(
        system.actorOf(Props[ThreadNameEcho].withDispatcher("myapp.my-pinned-dispatcher"), name = "echo2"))
    }

    "use pool-dispatcher router of deployment config" in {
      val pool = system.actorOf(FromConfig.props(Props[ThreadNameEcho]), name = "pool1")
      pool ! Identify(None)
      val routee = expectMsgType[ActorIdentity].ref.get
      routee ! "what's the name?"
      val Expected = """(DispatchersSpec-akka\.actor\.deployment\./pool1\.pool-dispatcher-[1-9][0-9]*)""".r
      expectMsgPF() {
        case Expected(x) ⇒
      }
    }

    "use balancing-pool router with special routees mailbox of deployment config" in {
      system.actorOf(FromConfig.props(Props[ThreadNameEcho]), name = "balanced") ! "what's the name?"
      val Expected = """(DispatchersSpec-BalancingPool-/balanced-[1-9][0-9]*)""".r
      expectMsgPF() {
        case Expected(x) ⇒
      }
      expectMsgPF() {
        case Expected(x) ⇒
      }
    }
  }
}

Other Akka source code examples

Here is a short list of links related to this Akka DispatchersSpec.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.