alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (DispatcherDocSpec.scala)

This example Akka source code file (DispatcherDocSpec.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actor, akka, anyref, concurrent, dispatcher, event, myactor, mycontrolmessage, mypriomailbox, poisonpill, requiresmessagequeue, test, testing, throughput, time, what

The DispatcherDocSpec.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package docs.dispatcher

import language.postfixOps

import org.scalatest.{ BeforeAndAfterAll, WordSpec }
import org.scalatest.Matchers
import akka.testkit.AkkaSpec
import akka.event.Logging
import akka.event.LoggingAdapter
import scala.concurrent.duration._
import akka.actor._
import docs.dispatcher.DispatcherDocSpec.MyBoundedActor
import akka.dispatch.RequiresMessageQueue

object DispatcherDocSpec {
  val javaConfig = """
     //#prio-dispatcher-config-java
     prio-dispatcher {
       mailbox-type = "docs.dispatcher.DispatcherDocTest$MyPrioMailbox"
       //Other dispatcher configuration goes here
     }
     //#prio-dispatcher-config-java

    //#prio-mailbox-config-java
    prio-mailbox {
      mailbox-type = "docs.dispatcher.DispatcherDocSpec$MyPrioMailbox"
      //Other mailbox configuration goes here
    }
    //#prio-mailbox-config-java

    //#custom-mailbox-config-java
    custom-dispatcher {
      mailbox-requirement =
      "docs.dispatcher.MyUnboundedJMessageQueueSemantics"
    }

    akka.actor.mailbox.requirements {
      "docs.dispatcher.MyUnboundedJMessageQueueSemantics" =
      custom-dispatcher-mailbox
    }

    custom-dispatcher-mailbox {
      mailbox-type = "docs.dispatcher.MyUnboundedJMailbox"
    }
    //#custom-mailbox-config-java
  """

  val config = """
    //#my-dispatcher-config
    my-dispatcher {
      # Dispatcher is the name of the event-based dispatcher
      type = Dispatcher
      # What kind of ExecutionService to use
      executor = "fork-join-executor"
      # Configuration for the fork join pool
      fork-join-executor {
        # Min number of threads to cap factor-based parallelism number to
        parallelism-min = 2
        # Parallelism (threads) ... ceil(available processors * factor)
        parallelism-factor = 2.0
        # Max number of threads to cap factor-based parallelism number to
        parallelism-max = 10
      }
      # Throughput defines the maximum number of messages to be
      # processed per actor before the thread jumps to the next actor.
      # Set to 1 for as fair as possible.
      throughput = 100
    }
    //#my-dispatcher-config

    //#my-thread-pool-dispatcher-config
    my-thread-pool-dispatcher {
      # Dispatcher is the name of the event-based dispatcher
      type = Dispatcher
      # What kind of ExecutionService to use
      executor = "thread-pool-executor"
      # Configuration for the thread pool
      thread-pool-executor {
        # minimum number of threads to cap factor-based core number to
        core-pool-size-min = 2
        # No of core threads ... ceil(available processors * factor)
        core-pool-size-factor = 2.0
        # maximum number of threads to cap factor-based number to
        core-pool-size-max = 10
      }
      # Throughput defines the maximum number of messages to be
      # processed per actor before the thread jumps to the next actor.
      # Set to 1 for as fair as possible.
      throughput = 100
    }
    //#my-thread-pool-dispatcher-config

    //#my-pinned-dispatcher-config
    my-pinned-dispatcher {
      executor = "thread-pool-executor"
      type = PinnedDispatcher
    }
    //#my-pinned-dispatcher-config

    //#my-bounded-config
    my-dispatcher-bounded-queue {
      type = Dispatcher
      executor = "thread-pool-executor"
      thread-pool-executor {
        core-pool-size-factor = 8.0
        max-pool-size-factor  = 16.0
      }
      # Specifies the bounded capacity of the message queue
      mailbox-capacity = 100
      throughput = 3
    }
    //#my-bounded-config

    //#prio-dispatcher-config
    prio-dispatcher {
      mailbox-type = "docs.dispatcher.DispatcherDocSpec$MyPrioMailbox"
      //Other dispatcher configuration goes here
    }
    //#prio-dispatcher-config

    //#dispatcher-deployment-config
    akka.actor.deployment {
      /myactor {
        dispatcher = my-dispatcher
      }
    }
    //#dispatcher-deployment-config

    //#prio-mailbox-config
    prio-mailbox {
      mailbox-type = "docs.dispatcher.DispatcherDocSpec$MyPrioMailbox"
      //Other mailbox configuration goes here
    }
    //#prio-mailbox-config

    //#mailbox-deployment-config

    akka.actor.deployment {
      /priomailboxactor {
        mailbox = prio-mailbox
      }
    }
    //#mailbox-deployment-config

    //#bounded-mailbox-config
    bounded-mailbox {
      mailbox-type = "akka.dispatch.BoundedMailbox"
      mailbox-capacity = 1000
      mailbox-push-timeout-time = 10s
    }
    //#bounded-mailbox-config

    //#required-mailbox-config

    akka.actor.mailbox.requirements {
      "akka.dispatch.BoundedMessageQueueSemantics" = bounded-mailbox
    }
    //#required-mailbox-config

    //#custom-mailbox-config
    custom-dispatcher {
      mailbox-requirement =
      "docs.dispatcher.MyUnboundedMessageQueueSemantics"
    }

    akka.actor.mailbox.requirements {
      "docs.dispatcher.MyUnboundedMessageQueueSemantics" =
      custom-dispatcher-mailbox
    }

    custom-dispatcher-mailbox {
      mailbox-type = "docs.dispatcher.MyUnboundedMailbox"
    }
    //#custom-mailbox-config

    //#control-aware-mailbox-config
    control-aware-dispatcher {
      mailbox-type = "akka.dispatch.UnboundedControlAwareMailbox"
      //Other dispatcher configuration goes here
    }
    //#control-aware-mailbox-config

  """

  //#prio-mailbox
  import akka.dispatch.PriorityGenerator
  import akka.dispatch.UnboundedPriorityMailbox
  import com.typesafe.config.Config

  // We inherit, in this case, from UnboundedPriorityMailbox
  // and seed it with the priority generator
  class MyPrioMailbox(settings: ActorSystem.Settings, config: Config)
    extends UnboundedPriorityMailbox(
      // Create a new PriorityGenerator, lower prio means more important
      PriorityGenerator {
        // 'highpriority messages should be treated first if possible
        case 'highpriority => 0

        // 'lowpriority messages should be treated last if possible
        case 'lowpriority  => 2

        // PoisonPill when no other left
        case PoisonPill    => 3

        // We default to 1, which is in between high and low
        case otherwise     => 1
      })
  //#prio-mailbox

  //#control-aware-mailbox-messages
  import akka.dispatch.ControlMessage

  case object MyControlMessage extends ControlMessage
  //#control-aware-mailbox-messages

  class MyActor extends Actor {
    def receive = {
      case x =>
    }
  }

  //#required-mailbox-class
  import akka.dispatch.RequiresMessageQueue
  import akka.dispatch.BoundedMessageQueueSemantics

  class MyBoundedActor extends MyActor
    with RequiresMessageQueue[BoundedMessageQueueSemantics]
  //#required-mailbox-class

  //#require-mailbox-on-actor
  class MySpecialActor extends Actor
    with RequiresMessageQueue[MyUnboundedMessageQueueSemantics] {
    //#require-mailbox-on-actor
    def receive = {
      case _ =>
    }
    //#require-mailbox-on-actor
    // ...
  }
  //#require-mailbox-on-actor
}

class DispatcherDocSpec extends AkkaSpec(DispatcherDocSpec.config) {

  import DispatcherDocSpec._

  "defining dispatcher in config" in {
    val context = system
    //#defining-dispatcher-in-config
    import akka.actor.Props
    val myActor = context.actorOf(Props[MyActor], "myactor")
    //#defining-dispatcher-in-config
  }

  "defining dispatcher in code" in {
    val context = system
    //#defining-dispatcher-in-code
    import akka.actor.Props
    val myActor =
      context.actorOf(Props[MyActor].withDispatcher("my-dispatcher"), "myactor1")
    //#defining-dispatcher-in-code
  }

  "defining dispatcher with bounded queue" in {
    val dispatcher = system.dispatchers.lookup("my-dispatcher-bounded-queue")
  }

  "defining pinned dispatcher" in {
    val context = system
    //#defining-pinned-dispatcher
    val myActor =
      context.actorOf(Props[MyActor].withDispatcher("my-pinned-dispatcher"), "myactor2")
    //#defining-pinned-dispatcher
  }

  "looking up a dispatcher" in {
    //#lookup
    // for use with Futures, Scheduler, etc.
    implicit val executionContext = system.dispatchers.lookup("my-dispatcher")
    //#lookup
  }

  "defining mailbox in config" in {
    val context = system
    //#defining-mailbox-in-config
    import akka.actor.Props
    val myActor = context.actorOf(Props[MyActor], "priomailboxactor")
    //#defining-mailbox-in-config
  }

  "defining mailbox in code" in {
    val context = system
    //#defining-mailbox-in-code
    import akka.actor.Props
    val myActor = context.actorOf(Props[MyActor].withMailbox("prio-mailbox"))
    //#defining-mailbox-in-code
  }

  "using a required mailbox" in {
    val context = system
    val myActor = context.actorOf(Props[MyBoundedActor])
  }

  "defining priority dispatcher" in {
    new AnyRef {
      //#prio-dispatcher

      // We create a new Actor that just prints out what it processes
      class Logger extends Actor {
        val log: LoggingAdapter = Logging(context.system, this)

        self ! 'lowpriority
        self ! 'lowpriority
        self ! 'highpriority
        self ! 'pigdog
        self ! 'pigdog2
        self ! 'pigdog3
        self ! 'highpriority
        self ! PoisonPill

        def receive = {
          case x => log.info(x.toString)
        }
      }
      val a = system.actorOf(Props(classOf[Logger], this).withDispatcher(
        "prio-dispatcher"))

      /*
       * Logs:
       * 'highpriority
       * 'highpriority
       * 'pigdog
       * 'pigdog2
       * 'pigdog3
       * 'lowpriority
       * 'lowpriority
       */
      //#prio-dispatcher

      watch(a)
      expectMsgPF() { case Terminated(`a`) => () }
    }
  }

  "defining control aware dispatcher" in {
    new AnyRef {
      //#control-aware-dispatcher

      // We create a new Actor that just prints out what it processes
      class Logger extends Actor {
        val log: LoggingAdapter = Logging(context.system, this)

        self ! 'foo
        self ! 'bar
        self ! MyControlMessage
        self ! PoisonPill

        def receive = {
          case x => log.info(x.toString)
        }
      }
      val a = system.actorOf(Props(classOf[Logger], this).withDispatcher(
        "control-aware-dispatcher"))

      /*
       * Logs:
       * MyControlMessage
       * 'foo
       * 'bar
       */
      //#control-aware-dispatcher

      watch(a)
      expectMsgPF() { case Terminated(`a`) => () }
    }
  }

  "require custom mailbox on dispatcher" in {
    val myActor = system.actorOf(Props[MyActor].withDispatcher(
      "custom-dispatcher"))
  }

  "require custom mailbox on actor" in {
    val myActor = system.actorOf(Props[MySpecialActor])
  }

}

Other Akka source code examples

Here is a short list of links related to this Akka DispatcherDocSpec.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.