alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (ActorModelSpec.scala)

This example Akka source code file (ActorModelSpec.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actor, actorcell, actormodelmessage, akka, atomiclong, concurrent, countdown, countdownlatch, dispatch, long, messagedispatcherinterceptor, reply, should, test, testing, throwable

The ActorModelSpec.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.actor.dispatch

import language.postfixOps

import java.rmi.RemoteException
import java.util.concurrent.{ TimeUnit, CountDownLatch, ConcurrentHashMap }
import java.util.concurrent.atomic.{ AtomicLong, AtomicInteger }

import org.junit.runner.RunWith
import org.scalatest.Assertions._
import org.scalatest.junit.JUnitRunner

import com.typesafe.config.Config

import akka.actor._
import akka.dispatch.sysmsg._
import akka.dispatch._
import akka.event.Logging.Error
import akka.pattern.ask
import akka.testkit._
import akka.util.Helpers.ConfigOps
import akka.util.Switch
import scala.concurrent.duration._
import scala.concurrent.{ Await, Future, Promise }
import scala.annotation.tailrec

object ActorModelSpec {

  sealed trait ActorModelMessage extends NoSerializationVerificationNeeded

  final case class TryReply(expect: Any) extends ActorModelMessage

  final case class Reply(expect: Any) extends ActorModelMessage

  final case class Forward(to: ActorRef, msg: Any) extends ActorModelMessage

  final case class CountDown(latch: CountDownLatch) extends ActorModelMessage

  final case class Increment(counter: AtomicLong) extends ActorModelMessage

  final case class AwaitLatch(latch: CountDownLatch) extends ActorModelMessage

  final case class Meet(acknowledge: CountDownLatch, waitFor: CountDownLatch) extends ActorModelMessage

  final case class CountDownNStop(latch: CountDownLatch) extends ActorModelMessage

  final case class Wait(time: Long) extends ActorModelMessage

  final case class WaitAck(time: Long, latch: CountDownLatch) extends ActorModelMessage

  case object Interrupt extends ActorModelMessage

  final case class InterruptNicely(expect: Any) extends ActorModelMessage

  case object Restart extends ActorModelMessage

  case object DoubleStop extends ActorModelMessage

  final case class ThrowException(e: Throwable) extends ActorModelMessage

  val Ping = "Ping"
  val Pong = "Pong"

  class DispatcherActor extends Actor {
    private val busy = new Switch(false)

    def interceptor = context.dispatcher.asInstanceOf[MessageDispatcherInterceptor]

    def ack(): Unit = {
      if (!busy.switchOn(())) {
        throw new Exception("isolation violated")
      } else {
        interceptor.getStats(self).msgsProcessed.incrementAndGet()
      }
    }

    override def postRestart(reason: Throwable) {
      interceptor.getStats(self).restarts.incrementAndGet()
    }

    def receive = {
      case AwaitLatch(latch)            ⇒ { ack(); latch.await(); busy.switchOff(()) }
      case Meet(sign, wait)             ⇒ { ack(); sign.countDown(); wait.await(); busy.switchOff(()) }
      case Wait(time)                   ⇒ { ack(); Thread.sleep(time); busy.switchOff(()) }
      case WaitAck(time, l)             ⇒ { ack(); Thread.sleep(time); l.countDown(); busy.switchOff(()) }
      case Reply(msg)                   ⇒ { ack(); sender() ! msg; busy.switchOff(()) }
      case TryReply(msg)                ⇒ { ack(); sender().tell(msg, null); busy.switchOff(()) }
      case Forward(to, msg)             ⇒ { ack(); to.forward(msg); busy.switchOff(()) }
      case CountDown(latch)             ⇒ { ack(); latch.countDown(); busy.switchOff(()) }
      case Increment(count)             ⇒ { ack(); count.incrementAndGet(); busy.switchOff(()) }
      case CountDownNStop(l)            ⇒ { ack(); l.countDown(); context.stop(self); busy.switchOff(()) }
      case Restart                      ⇒ { ack(); busy.switchOff(()); throw new Exception("Restart requested") }
      case Interrupt                    ⇒ { ack(); sender() ! Status.Failure(new ActorInterruptedException(new InterruptedException("Ping!"))); busy.switchOff(()); throw new InterruptedException("Ping!") }
      case InterruptNicely(msg)         ⇒ { ack(); sender() ! msg; busy.switchOff(()); Thread.currentThread().interrupt() }
      case ThrowException(e: Throwable) ⇒ { ack(); busy.switchOff(()); throw e }
      case DoubleStop                   ⇒ { ack(); context.stop(self); context.stop(self); busy.switchOff }
    }
  }

  class InterceptorStats {
    val suspensions = new AtomicLong(0)
    val resumes = new AtomicLong(0)
    val registers = new AtomicLong(0)
    val unregisters = new AtomicLong(0)
    val msgsReceived = new AtomicLong(0)
    val msgsProcessed = new AtomicLong(0)
    val restarts = new AtomicLong(0)
    override def toString = "InterceptorStats(susp=" + suspensions +
      ",res=" + resumes + ",reg=" + registers + ",unreg=" + unregisters +
      ",recv=" + msgsReceived + ",proc=" + msgsProcessed + ",restart=" + restarts
  }

  trait MessageDispatcherInterceptor extends MessageDispatcher {
    val stats = new ConcurrentHashMap[ActorRef, InterceptorStats]
    val stops = new AtomicLong(0)

    def getStats(actorRef: ActorRef) = {
      val is = new InterceptorStats
      stats.putIfAbsent(actorRef, is) match {
        case null  ⇒ is
        case other ⇒ other
      }
    }

    protected[akka] abstract override def suspend(actor: ActorCell) {
      getStats(actor.self).suspensions.incrementAndGet()
      super.suspend(actor)
    }

    protected[akka] abstract override def resume(actor: ActorCell) {
      super.resume(actor)
      getStats(actor.self).resumes.incrementAndGet()
    }

    protected[akka] abstract override def register(actor: ActorCell) {
      assert(getStats(actor.self).registers.incrementAndGet() == 1)
      super.register(actor)
    }

    protected[akka] abstract override def unregister(actor: ActorCell) {
      assert(getStats(actor.self).unregisters.incrementAndGet() == 1)
      super.unregister(actor)
    }

    protected[akka] abstract override def dispatch(receiver: ActorCell, invocation: Envelope) {
      val stats = getStats(receiver.self)
      stats.msgsReceived.incrementAndGet()
      super.dispatch(receiver, invocation)
    }

    protected[akka] abstract override def shutdown() {
      stops.incrementAndGet()
      super.shutdown()
    }
  }

  def assertDispatcher(dispatcher: MessageDispatcherInterceptor)(
    stops: Long = dispatcher.stops.get())(implicit system: ActorSystem) {
    val deadline = System.currentTimeMillis + dispatcher.shutdownTimeout.toMillis * 5
    try {
      await(deadline)(stops == dispatcher.stops.get)
    } catch {
      case e: Throwable ⇒
        system.eventStream.publish(Error(e, dispatcher.toString, dispatcher.getClass, "actual: stops=" + dispatcher.stops.get +
          " required: stops=" + stops))
        throw e
    }
  }

  def assertCountDown(latch: CountDownLatch, wait: Long, hint: String) {
    if (!latch.await(wait, TimeUnit.MILLISECONDS))
      fail("Failed to count down within " + wait + " millis (count at " + latch.getCount + "). " + hint)
  }

  def assertNoCountDown(latch: CountDownLatch, wait: Long, hint: String) {
    if (latch.await(wait, TimeUnit.MILLISECONDS))
      fail("Expected count down to fail after " + wait + " millis. " + hint)
  }

  def statsFor(actorRef: ActorRef, dispatcher: MessageDispatcher = null) =
    dispatcher.asInstanceOf[MessageDispatcherInterceptor].getStats(actorRef)

  def assertRefDefaultZero(actorRef: ActorRef, dispatcher: MessageDispatcher = null)(
    suspensions: Long = 0,
    resumes: Long = 0,
    registers: Long = 0,
    unregisters: Long = 0,
    msgsReceived: Long = 0,
    msgsProcessed: Long = 0,
    restarts: Long = 0)(implicit system: ActorSystem) {
    assertRef(actorRef, dispatcher)(
      suspensions,
      resumes,
      registers,
      unregisters,
      msgsReceived,
      msgsProcessed,
      restarts)
  }

  def assertRef(actorRef: ActorRef, dispatcher: MessageDispatcher = null)(
    suspensions: Long = statsFor(actorRef, dispatcher).suspensions.get(),
    resumes: Long = statsFor(actorRef, dispatcher).resumes.get(),
    registers: Long = statsFor(actorRef, dispatcher).registers.get(),
    unregisters: Long = statsFor(actorRef, dispatcher).unregisters.get(),
    msgsReceived: Long = statsFor(actorRef, dispatcher).msgsReceived.get(),
    msgsProcessed: Long = statsFor(actorRef, dispatcher).msgsProcessed.get(),
    restarts: Long = statsFor(actorRef, dispatcher).restarts.get())(implicit system: ActorSystem) {
    val stats = statsFor(actorRef, Option(dispatcher).getOrElse(actorRef.asInstanceOf[ActorRefWithCell].underlying.asInstanceOf[ActorCell].dispatcher))
    val deadline = System.currentTimeMillis + 1000
    try {
      await(deadline)(stats.suspensions.get() == suspensions)
      await(deadline)(stats.resumes.get() == resumes)
      await(deadline)(stats.registers.get() == registers)
      await(deadline)(stats.unregisters.get() == unregisters)
      await(deadline)(stats.msgsReceived.get() == msgsReceived)
      await(deadline)(stats.msgsProcessed.get() == msgsProcessed)
      await(deadline)(stats.restarts.get() == restarts)
    } catch {
      case e: Throwable ⇒
        system.eventStream.publish(Error(e,
          Option(dispatcher).toString,
          (Option(dispatcher) getOrElse this).getClass,
          "actual: " + stats + ", required: InterceptorStats(susp=" + suspensions +
            ",res=" + resumes + ",reg=" + registers + ",unreg=" + unregisters +
            ",recv=" + msgsReceived + ",proc=" + msgsProcessed + ",restart=" + restarts))
        throw e
    }
  }

  @tailrec def await(until: Long)(condition: ⇒ Boolean): Unit =
    if (System.currentTimeMillis() <= until) {
      var done = false
      try {
        done = condition
        if (!done) Thread.sleep(25)
      } catch {
        case e: InterruptedException ⇒
      }
      if (!done) await(until)(condition)
    } else throw new AssertionError("await failed")
}

abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with DefaultTimeout {

  import ActorModelSpec._

  def newTestActor(dispatcher: String) = system.actorOf(Props[DispatcherActor].withDispatcher(dispatcher))

  def awaitStarted(ref: ActorRef): Unit = {
    awaitCond(ref match {
      case r: RepointableRef ⇒ r.isStarted
      case _                 ⇒ true
    }, 1 second, 10 millis)
  }

  protected def interceptedDispatcher(): MessageDispatcherInterceptor
  protected def dispatcherType: String

  "A " + dispatcherType must {

    "dynamically handle its own life cycle" in {
      implicit val dispatcher = interceptedDispatcher()
      assertDispatcher(dispatcher)(stops = 0)
      val a = newTestActor(dispatcher.id)
      assertDispatcher(dispatcher)(stops = 0)
      system.stop(a)
      assertDispatcher(dispatcher)(stops = 1)
      assertRef(a, dispatcher)(
        suspensions = 0,
        resumes = 0,
        registers = 1,
        unregisters = 1,
        msgsReceived = 0,
        msgsProcessed = 0,
        restarts = 0)

      val futures = for (i ← 1 to 10) yield Future {
        i
      }
      assertDispatcher(dispatcher)(stops = 2)

      val a2 = newTestActor(dispatcher.id)
      val futures2 = for (i ← 1 to 10) yield Future { i }

      assertDispatcher(dispatcher)(stops = 2)

      system.stop(a2)
      assertDispatcher(dispatcher)(stops = 3)
    }

    "process messages one at a time" in {
      implicit val dispatcher = interceptedDispatcher()
      val start, oneAtATime = new CountDownLatch(1)
      val a = newTestActor(dispatcher.id)
      awaitStarted(a)

      a ! CountDown(start)
      assertCountDown(start, 3.seconds.dilated.toMillis, "Should process first message within 3 seconds")
      assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, msgsProcessed = 1)

      a ! Wait(1000)
      a ! CountDown(oneAtATime)
      // in case of serialization violation, restart would happen instead of count down
      assertCountDown(oneAtATime, (1.5 seconds).dilated.toMillis, "Processed message when allowed")
      assertRefDefaultZero(a)(registers = 1, msgsReceived = 3, msgsProcessed = 3)

      system.stop(a)
      assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 3, msgsProcessed = 3)
    }

    "handle queueing from multiple threads" in {
      implicit val dispatcher = interceptedDispatcher()
      val counter = new CountDownLatch(200)
      val a = newTestActor(dispatcher.id)

      for (i ← 1 to 10) {
        spawn {
          for (i ← 1 to 20) {
            a ! WaitAck(1, counter)
          }
        }
      }
      assertCountDown(counter, 3.seconds.dilated.toMillis, "Should process 200 messages")
      assertRefDefaultZero(a)(registers = 1, msgsReceived = 200, msgsProcessed = 200)

      system.stop(a)
    }

    def spawn(f: ⇒ Unit) {
      (new Thread {
        override def run(): Unit =
          try f catch {
            case e: Throwable ⇒ system.eventStream.publish(Error(e, "spawn", this.getClass, "error in spawned thread"))
          }
      }).start()
    }

    "not process messages for a suspended actor" in {
      implicit val dispatcher = interceptedDispatcher()
      val a = newTestActor(dispatcher.id).asInstanceOf[InternalActorRef]
      awaitStarted(a)
      val done = new CountDownLatch(1)
      a.suspend
      a ! CountDown(done)
      assertNoCountDown(done, 1000, "Should not process messages while suspended")
      assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, suspensions = 1)

      a.resume(causedByFailure = null)
      assertCountDown(done, 3.seconds.dilated.toMillis, "Should resume processing of messages when resumed")
      assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, msgsProcessed = 1,
        suspensions = 1, resumes = 1)

      system.stop(a)
      assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1,
        suspensions = 1, resumes = 1)
    }

    "handle waves of actors" in {
      val dispatcher = interceptedDispatcher()
      val props = Props[DispatcherActor].withDispatcher(dispatcher.id)

      def flood(num: Int) {
        val cachedMessage = CountDownNStop(new CountDownLatch(num))
        val stopLatch = new CountDownLatch(num)
        val keepAliveLatch = new CountDownLatch(1)
        val waitTime = (20 seconds).dilated.toMillis
        val boss = system.actorOf(Props(new Actor {
          def receive = {
            case "run"             ⇒ for (_ ← 1 to num) (context.watch(context.actorOf(props))) ! cachedMessage
            case Terminated(child) ⇒ stopLatch.countDown()
          }
        }).withDispatcher("boss"))
        try {
          // this future is meant to keep the dispatcher alive until the end of the test run even if
          // the boss doesn't create children fast enough to keep the dispatcher from becoming empty
          // and it needs to be on a separate thread to not deadlock the calling thread dispatcher
          new Thread(new Runnable {
            def run() = Future {
              keepAliveLatch.await(waitTime, TimeUnit.MILLISECONDS)
            }(dispatcher)
          }).start()
          boss ! "run"
          try {
            assertCountDown(cachedMessage.latch, waitTime, "Counting down from " + num)
          } catch {
            case e: Throwable ⇒
              dispatcher match {
                case dispatcher: BalancingDispatcher ⇒
                  val team = dispatcher.team
                  val mq = dispatcher.messageQueue

                  System.err.println("Teammates left: " + team.size + " stopLatch: " + stopLatch.getCount + " inhab:" + dispatcher.inhabitants)
                  team.toArray sorted new Ordering[AnyRef] {
                    def compare(l: AnyRef, r: AnyRef) = (l, r) match { case (ll: ActorCell, rr: ActorCell) ⇒ ll.self.path compareTo rr.self.path }
                  } foreach {
                    case cell: ActorCell ⇒
                      System.err.println(" - " + cell.self.path + " " + cell.isTerminated + " " + cell.mailbox.status + " "
                        + cell.mailbox.numberOfMessages + " " + cell.mailbox.systemDrain(SystemMessageList.LNil).size)
                  }

                  System.err.println("Mailbox: " + mq.numberOfMessages + " " + mq.hasMessages)
                  Iterator.continually(mq.dequeue) takeWhile (_ ne null) foreach System.err.println
                case _ ⇒
              }

              throw e
          }
          assertCountDown(stopLatch, waitTime, "Expected all children to stop")
        } finally {
          keepAliveLatch.countDown()
          system.stop(boss)
        }
      }
      for (run ← 1 to 3) {
        flood(50000)
        assertDispatcher(dispatcher)(stops = run)
      }
    }

    "continue to process messages when a thread gets interrupted and throws an exception" in {
      filterEvents(
        EventFilter[InterruptedException](),
        EventFilter[ActorInterruptedException](),
        EventFilter[akka.event.Logging.LoggerException]()) {
          implicit val dispatcher = interceptedDispatcher()
          val a = newTestActor(dispatcher.id)
          val f1 = a ? Reply("foo")
          val f2 = a ? Reply("bar")
          val f3 = a ? Interrupt
          Thread.interrupted() // CallingThreadDispatcher may necessitate this
          val f4 = a ? Reply("foo2")
          val f5 = a ? Interrupt
          Thread.interrupted() // CallingThreadDispatcher may necessitate this
          val f6 = a ? Reply("bar2")

          val c = system.scheduler.scheduleOnce(2.seconds) {
            import collection.JavaConverters._
            Thread.getAllStackTraces().asScala foreach {
              case (thread, stack) ⇒
                println(s"$thread:")
                stack foreach (s ⇒ println(s"\t$s"))
            }
          }
          assert(Await.result(f1, timeout.duration) === "foo")
          assert(Await.result(f2, timeout.duration) === "bar")
          assert(Await.result(f4, timeout.duration) === "foo2")
          assert(intercept[ActorInterruptedException](Await.result(f3, timeout.duration)).getCause.getMessage === "Ping!")
          assert(Await.result(f6, timeout.duration) === "bar2")
          assert(intercept[ActorInterruptedException](Await.result(f5, timeout.duration)).getCause.getMessage === "Ping!")
          c.cancel()
          Thread.sleep(300) // give the EventFilters a chance of catching all messages
        }
    }

    "continue to process messages without failure when a thread gets interrupted and doesn't throw an exception" in {
      filterEvents(EventFilter[InterruptedException]()) {
        implicit val dispatcher = interceptedDispatcher()
        val a = newTestActor(dispatcher.id)
        val f1 = a ? Reply("foo")
        val f2 = a ? Reply("bar")
        val f3 = a ? InterruptNicely("baz")
        val f4 = a ? Reply("foo2")
        val f5 = a ? InterruptNicely("baz2")
        val f6 = a ? Reply("bar2")

        assert(Await.result(f1, timeout.duration) === "foo")
        assert(Await.result(f2, timeout.duration) === "bar")
        assert(Await.result(f3, timeout.duration) === "baz")
        assert(Await.result(f4, timeout.duration) === "foo2")
        assert(Await.result(f5, timeout.duration) === "baz2")
        assert(Await.result(f6, timeout.duration) === "bar2")
        // clear the interrupted flag (only needed for the CallingThreadDispatcher) so the next test can continue normally
        Thread.interrupted()
      }
    }

    "continue to process messages when exception is thrown" in {
      filterEvents(EventFilter[IndexOutOfBoundsException](), EventFilter[RemoteException]()) {
        implicit val dispatcher = interceptedDispatcher()
        val a = newTestActor(dispatcher.id)
        val f1 = a ? Reply("foo")
        val f2 = a ? Reply("bar")
        val f3 = a ? ThrowException(new IndexOutOfBoundsException("IndexOutOfBoundsException"))
        val f4 = a ? Reply("foo2")
        val f5 = a ? ThrowException(new RemoteException("RemoteException"))
        val f6 = a ? Reply("bar2")

        assert(Await.result(f1, timeout.duration) === "foo")
        assert(Await.result(f2, timeout.duration) === "bar")
        assert(Await.result(f4, timeout.duration) === "foo2")
        assert(Await.result(f6, timeout.duration) === "bar2")
        assert(f3.value.isEmpty)
        assert(f5.value.isEmpty)
      }
    }

    "not double-deregister" in {
      implicit val dispatcher = interceptedDispatcher()
      for (i ← 1 to 1000) system.actorOf(Props.empty)
      val a = newTestActor(dispatcher.id)
      a ! DoubleStop
      awaitCond(statsFor(a, dispatcher).registers.get == 1)
      awaitCond(statsFor(a, dispatcher).unregisters.get == 1)
    }
  }
}

object DispatcherModelSpec {
  import ActorModelSpec._

  val config = {
    """
      boss {
        executor = thread-pool-executor
        type = PinnedDispatcher
      }
    """ +
      // use unique dispatcher id for each test, since MessageDispatcherInterceptor holds state
      (for (n ← 1 to 30) yield """
        test-dispatcher-%s {
          type = "akka.actor.dispatch.DispatcherModelSpec$MessageDispatcherInterceptorConfigurator"
        }""".format(n)).mkString
  }

  class MessageDispatcherInterceptorConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
    extends MessageDispatcherConfigurator(config, prerequisites) {

    import akka.util.Helpers.ConfigOps

    private val instance: MessageDispatcher =
      new Dispatcher(this,
        config.getString("id"),
        config.getInt("throughput"),
        config.getNanosDuration("throughput-deadline-time"),
        configureExecutor(),
        config.getMillisDuration("shutdown-timeout")) with MessageDispatcherInterceptor

    override def dispatcher(): MessageDispatcher = instance
  }
}

@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class DispatcherModelSpec extends ActorModelSpec(DispatcherModelSpec.config) {
  import ActorModelSpec._

  val dispatcherCount = new AtomicInteger()

  override def interceptedDispatcher(): MessageDispatcherInterceptor = {
    // use new id for each test, since the MessageDispatcherInterceptor holds state
    system.dispatchers.lookup("test-dispatcher-" + dispatcherCount.incrementAndGet()).asInstanceOf[MessageDispatcherInterceptor]
  }

  override def dispatcherType = "Dispatcher"

  "A " + dispatcherType must {
    "process messages in parallel" in {
      implicit val dispatcher = interceptedDispatcher()
      val aStart, aStop, bParallel = new CountDownLatch(1)
      val a, b = newTestActor(dispatcher.id)

      a ! Meet(aStart, aStop)
      assertCountDown(aStart, 3.seconds.dilated.toMillis, "Should process first message within 3 seconds")

      b ! CountDown(bParallel)
      assertCountDown(bParallel, 3.seconds.dilated.toMillis, "Should process other actors in parallel")

      aStop.countDown()

      system.stop(a)
      system.stop(b)

      while (!a.isTerminated && !b.isTerminated) {} //Busy wait for termination

      assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1)
      assertRefDefaultZero(b)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1)
    }
  }
}

object BalancingDispatcherModelSpec {
  import ActorModelSpec._

  // TODO check why throughput=1 here? (came from old test)
  val config = {
    """
      boss {
        executor = thread-pool-executor
        type = PinnedDispatcher
      }
    """ +
      // use unique dispatcher id for each test, since MessageDispatcherInterceptor holds state
      (for (n ← 1 to 30) yield """
        test-balancing-dispatcher-%s {
          type = "akka.actor.dispatch.BalancingDispatcherModelSpec$BalancingMessageDispatcherInterceptorConfigurator"
          throughput=1
        }""".format(n)).mkString
  }

  class BalancingMessageDispatcherInterceptorConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
    extends BalancingDispatcherConfigurator(config, prerequisites) {

    import akka.util.Helpers.ConfigOps

    override protected def create(mailboxType: MailboxType): BalancingDispatcher =
      new BalancingDispatcher(this,
        config.getString("id"),
        config.getInt("throughput"),
        config.getNanosDuration("throughput-deadline-time"),
        mailboxType,
        configureExecutor(),
        config.getMillisDuration("shutdown-timeout"),
        config.getBoolean("attempt-teamwork")) with MessageDispatcherInterceptor
  }
}

@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class BalancingDispatcherModelSpec extends ActorModelSpec(BalancingDispatcherModelSpec.config) {
  import ActorModelSpec._

  val dispatcherCount = new AtomicInteger()

  override def interceptedDispatcher(): MessageDispatcherInterceptor = {
    // use new id for each test, since the MessageDispatcherInterceptor holds state
    system.dispatchers.lookup("test-balancing-dispatcher-" + dispatcherCount.incrementAndGet()).asInstanceOf[MessageDispatcherInterceptor]
  }

  override def dispatcherType = "Balancing Dispatcher"

  "A " + dispatcherType must {
    "process messages in parallel" in {
      implicit val dispatcher = interceptedDispatcher()
      val aStart, aStop, bParallel = new CountDownLatch(1)
      val a, b = newTestActor(dispatcher.id)

      a ! Meet(aStart, aStop)
      assertCountDown(aStart, 3.seconds.dilated.toMillis, "Should process first message within 3 seconds")

      b ! CountDown(bParallel)
      assertCountDown(bParallel, 3.seconds.dilated.toMillis, "Should process other actors in parallel")

      aStop.countDown()

      system.stop(a)
      system.stop(b)

      while (!a.isTerminated && !b.isTerminated) {} //Busy wait for termination

      assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1)
      assertRefDefaultZero(b)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1)
    }
  }
}

Other Akka source code examples

Here is a short list of links related to this Akka ActorModelSpec.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.