alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (CallingThreadDispatcher.scala)

This example Akka source code file (CallingThreadDispatcher.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actorcell, akka, boolean, callingthreaddispatcher, callingthreaddispatcherqueues, callingthreadmailbox, concurrent, dispatch, duration, envelope, interrupted, interruptedexception, messagequeue, testing, time, unit

The CallingThreadDispatcher.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.testkit

import language.postfixOps

import java.lang.ref.WeakReference
import java.util.concurrent.locks.ReentrantLock
import scala.annotation.tailrec
import com.typesafe.config.Config
import akka.actor.{ ActorInitializationException, ExtensionIdProvider, ExtensionId, Extension, ExtendedActorSystem, ActorRef, ActorCell }
import akka.dispatch.{ MessageQueue, MailboxType, TaskInvocation, MessageDispatcherConfigurator, MessageDispatcher, Mailbox, Envelope, DispatcherPrerequisites, DefaultSystemMessageQueue }
import akka.dispatch.sysmsg.{ SystemMessage, Suspend, Resume }
import scala.concurrent.duration._
import akka.util.Switch
import scala.concurrent.duration.Duration
import scala.util.control.NonFatal
import java.util.concurrent.TimeUnit

/*
 * Locking rules:
 *
 * Normal messages are always queued thread locally.
 * Processing a queue checks suspendSwitch before each invocation, not processing
 * if suspendSwitch.
 * When resuming an actor, all messages are atomically scooped from all threads and
 * queued on the resuming thread's queue, to be processed immediately.
 * Scooping up messages means replacing the ThreadLocal contents with an empty
 * new MessageQueue.
 *
 * All accesses to the queue must be done under the suspendSwitch-switch's lock, so
 * within one of its methods taking a closure argument.
 *
 * System messages always go directly to the actors SystemMessageQueue which isn't thread local.
 */

private[testkit] object CallingThreadDispatcherQueues extends ExtensionId[CallingThreadDispatcherQueues] with ExtensionIdProvider {
  override def lookup = CallingThreadDispatcherQueues
  override def createExtension(system: ExtendedActorSystem): CallingThreadDispatcherQueues = new CallingThreadDispatcherQueues
}

private[testkit] class CallingThreadDispatcherQueues extends Extension {

  // PRIVATE DATA

  private var queues = Map[CallingThreadMailbox, Set[WeakReference[MessageQueue]]]()
  private var lastGC = 0l

  // we have to forget about long-gone threads sometime
  private def gc(): Unit = {
    queues = (Map.newBuilder[CallingThreadMailbox, Set[WeakReference[MessageQueue]]] /: queues) {
      case (m, (k, v)) ⇒
        val nv = v filter (_.get ne null)
        if (nv.isEmpty) m else m += (k -> nv)
    }.result
  }

  protected[akka] def registerQueue(mbox: CallingThreadMailbox, q: MessageQueue): Unit = synchronized {
    if (queues contains mbox) {
      val newSet = queues(mbox) + new WeakReference(q)
      queues += mbox -> newSet
    } else {
      queues += mbox -> Set(new WeakReference(q))
    }
    val now = System.nanoTime
    if (now - lastGC > 1000000000l) {
      lastGC = now
      gc()
    }
  }

  protected[akka] def unregisterQueues(mbox: CallingThreadMailbox): Unit = synchronized {
    queues -= mbox
  }

  /*
   * This method must be called with "own" being this thread's queue for the
   * given mailbox. When this method returns, the queue will be entered
   * (active).
   */
  protected[akka] def gatherFromAllOtherQueues(mbox: CallingThreadMailbox, own: MessageQueue): Unit = synchronized {
    if (queues contains mbox) {
      for {
        ref ← queues(mbox)
        q = ref.get
        if (q ne null) && (q ne own)
      } {
        val owner = mbox.actor.self
        var msg = q.dequeue()
        while (msg ne null) {
          // this is safe because this method is only ever called while holding the suspendSwitch monitor
          own.enqueue(owner, msg)
          msg = q.dequeue()
        }
      }
    }
  }
}

object CallingThreadDispatcher {
  val Id = "akka.test.calling-thread-dispatcher"
}

/**
 * Dispatcher which runs invocations on the current thread only. This
 * dispatcher does not create any new threads, but it can be used from
 * different threads concurrently for the same actor. The dispatch strategy is
 * to run on the current thread unless the target actor is either suspendSwitch or
 * already running on the current thread (if it is running on a different
 * thread, then this thread will block until that other invocation is
 * finished); if the invocation is not run, it is queued in a thread-local
 * queue to be executed once the active invocation further up the call stack
 * finishes. This leads to completely deterministic execution order if only one
 * thread is used.
 *
 * Suspending and resuming are global actions for one actor, meaning they can
 * affect different threads, which leads to complications. If messages are
 * queued (thread-locally) during the suspendSwitch period, the only thread to run
 * them upon resume is the thread actually calling the resume method. Hence,
 * all thread-local queues which are not currently being drained (possible,
 * since suspend-queue-resume might happen entirely during an invocation on a
 * different thread) are scooped up into the current thread-local queue which
 * is then executed. It is possible to suspend an actor from within its call
 * stack.
 *
 * @since 1.1
 */
class CallingThreadDispatcher(_configurator: MessageDispatcherConfigurator) extends MessageDispatcher(_configurator) {
  import CallingThreadDispatcher._
  import configurator.prerequisites._

  val log = akka.event.Logging(eventStream, "CallingThreadDispatcher")

  override def id: String = Id

  protected[akka] override def createMailbox(actor: akka.actor.Cell, mailboxType: MailboxType) =
    new CallingThreadMailbox(actor, mailboxType)

  protected[akka] override def shutdown() {}

  protected[akka] override def throughput = 0
  protected[akka] override def throughputDeadlineTime = Duration.Zero
  protected[akka] override def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = false

  protected[akka] override def shutdownTimeout = 1 second

  protected[akka] override def register(actor: ActorCell): Unit = {
    super.register(actor)
    actor.mailbox match {
      case mbox: CallingThreadMailbox ⇒
        val queue = mbox.queue
        runQueue(mbox, queue)
      case x ⇒ throw ActorInitializationException("expected CallingThreadMailbox, got " + x.getClass)
    }
  }

  protected[akka] override def unregister(actor: ActorCell): Unit = {
    val mbox = actor.mailbox match {
      case m: CallingThreadMailbox ⇒ Some(m)
      case _                       ⇒ None
    }
    super.unregister(actor)
    mbox foreach CallingThreadDispatcherQueues(actor.system).unregisterQueues
  }

  protected[akka] override def suspend(actor: ActorCell) {
    actor.mailbox match {
      case m: CallingThreadMailbox ⇒ { m.suspendSwitch.switchOn; m.suspend() }
      case m                       ⇒ m.systemEnqueue(actor.self, Suspend())
    }
  }

  protected[akka] override def resume(actor: ActorCell) {
    actor.mailbox match {
      case mbox: CallingThreadMailbox ⇒
        val queue = mbox.queue
        val switched = mbox.suspendSwitch.switchOff {
          CallingThreadDispatcherQueues(actor.system).gatherFromAllOtherQueues(mbox, queue)
          mbox.resume()
        }
        if (switched)
          runQueue(mbox, queue)
      case m ⇒ m.systemEnqueue(actor.self, Resume(causedByFailure = null))
    }
  }

  protected[akka] override def systemDispatch(receiver: ActorCell, message: SystemMessage) {
    receiver.mailbox match {
      case mbox: CallingThreadMailbox ⇒
        mbox.systemEnqueue(receiver.self, message)
        runQueue(mbox, mbox.queue)
      case m ⇒ m.systemEnqueue(receiver.self, message)
    }
  }

  protected[akka] override def dispatch(receiver: ActorCell, handle: Envelope) {
    receiver.mailbox match {
      case mbox: CallingThreadMailbox ⇒
        val queue = mbox.queue
        val execute = mbox.suspendSwitch.fold {
          queue.enqueue(receiver.self, handle)
          false
        } {
          queue.enqueue(receiver.self, handle)
          true
        }
        if (execute) runQueue(mbox, queue)
      case m ⇒ m.enqueue(receiver.self, handle)
    }
  }

  protected[akka] override def executeTask(invocation: TaskInvocation) { invocation.run }

  /*
   * This method must be called with this thread's queue.
   *
   * If the catch block is executed, then a non-empty mailbox may be stalled as
   * there is no-one who cares to execute it before the next message is sent or
   * it is suspendSwitch and resumed.
   */
  @tailrec
  private def runQueue(mbox: CallingThreadMailbox, queue: MessageQueue, interruptedEx: InterruptedException = null) {
    def checkThreadInterruption(intEx: InterruptedException): InterruptedException = {
      if (Thread.interrupted()) { // clear interrupted flag before we continue, exception will be thrown later
        val ie = new InterruptedException("Interrupted during message processing")
        log.error(ie, "Interrupted during message processing")
        ie
      } else intEx
    }

    def throwInterruptionIfExistsOrSet(intEx: InterruptedException): Unit = {
      val ie = checkThreadInterruption(intEx)
      if (ie ne null) {
        Thread.interrupted() // clear interrupted flag before throwing according to java convention
        throw ie
      }
    }

    @tailrec
    def process(intEx: InterruptedException): InterruptedException = {
      var intex = intEx
      val recurse = {
        mbox.processAllSystemMessages()
        val handle = mbox.suspendSwitch.fold[Envelope](null) {
          if (mbox.isClosed) null else queue.dequeue()
        }
        if (handle ne null) {
          try {
            if (Mailbox.debug) println(mbox.actor.self + " processing message " + handle)
            mbox.actor.invoke(handle)
            intex = checkThreadInterruption(intex)
            true
          } catch {
            case ie: InterruptedException ⇒
              log.error(ie, "Interrupted during message processing")
              Thread.interrupted() // clear interrupted flag before we continue, exception will be thrown later
              intex = ie
              true
            case NonFatal(e) ⇒
              log.error(e, "Error during message processing")
              false
          }
        } else false
      }
      if (recurse) process(intex)
      else intex
    }

    // if we own the lock then we shouldn't do anything since we are processing
    // this actors mailbox at some other level on our call stack
    if (!mbox.ctdLock.isHeldByCurrentThread) {
      var intex = interruptedEx
      val gotLock = try {
        mbox.ctdLock.tryLock(50, TimeUnit.MILLISECONDS)
      } catch {
        case ie: InterruptedException ⇒
          Thread.interrupted() // clear interrupted flag before we continue, exception will be thrown later
          intex = ie
          false
      }
      if (gotLock) {
        val ie = try {
          process(intex)
        } finally {
          mbox.ctdLock.unlock
        }
        throwInterruptionIfExistsOrSet(ie)
      } else {
        // if we didn't get the lock and our mailbox still has messages, then we need to try again
        if (mbox.hasSystemMessages || mbox.hasMessages) {
          runQueue(mbox, queue, intex)
        } else {
          throwInterruptionIfExistsOrSet(intex)
        }
      }
    }
  }
}

class CallingThreadDispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
  extends MessageDispatcherConfigurator(config, prerequisites) {

  private val instance = new CallingThreadDispatcher(this)

  override def dispatcher(): MessageDispatcher = instance
}

class CallingThreadMailbox(_receiver: akka.actor.Cell, val mailboxType: MailboxType)
  extends Mailbox(null) with DefaultSystemMessageQueue {

  val system = _receiver.system
  val self = _receiver.self

  private val q = new ThreadLocal[MessageQueue]() {
    override def initialValue = {
      val queue = mailboxType.create(Some(self), Some(system))
      CallingThreadDispatcherQueues(system).registerQueue(CallingThreadMailbox.this, queue)
      queue
    }
  }

  /**
   * This is only a marker to be put in the messageQueue’s stead to make error
   * messages pertaining to violated mailbox type requirements less cryptic.
   */
  override val messageQueue: MessageQueue = q.get

  override def enqueue(receiver: ActorRef, msg: Envelope): Unit = q.get.enqueue(receiver, msg)
  override def dequeue(): Envelope = throw new UnsupportedOperationException("CallingThreadMailbox cannot dequeue normally")
  override def hasMessages: Boolean = q.get.hasMessages
  override def numberOfMessages: Int = 0

  def queue = q.get

  val ctdLock = new ReentrantLock
  val suspendSwitch = new Switch

  override def cleanUp(): Unit = {
    /*
     * This is called from dispatcher.unregister, i.e. under this.lock. If
     * another thread obtained a reference to this mailbox and enqueues after
     * the gather operation, tough luck: no guaranteed delivery to deadLetters.
     */
    suspendSwitch.locked {
      val qq = queue
      CallingThreadDispatcherQueues(actor.system).gatherFromAllOtherQueues(this, qq)
      super.cleanUp()
      qq.cleanUp(actor.self, actor.dispatcher.mailboxes.deadLetterMailbox.messageQueue)
      q.remove()
    }
  }
}

Other Akka source code examples

Here is a short list of links related to this Akka CallingThreadDispatcher.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.