alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (Mailbox.scala)

This example Akka source code file (Mailbox.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actorref, akka, boolean, closed, concurrent, dispatch, duration, envelope, int, mailboxtype, messagequeue, option, queue, time, unit, utilities

The Mailbox.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.dispatch

import java.util.{ Comparator, PriorityQueue, Queue, Deque }
import java.util.concurrent._
import akka.AkkaException
import akka.dispatch.sysmsg._
import akka.actor.{ ActorCell, ActorRef, Cell, ActorSystem, InternalActorRef, DeadLetter }
import akka.util.{ Unsafe, BoundedBlockingQueue }
import akka.util.Helpers.ConfigOps
import akka.event.Logging.Error
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration
import scala.annotation.tailrec
import scala.util.control.NonFatal
import com.typesafe.config.Config
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.ReentrantLock

/**
 * INTERNAL API
 */
private[akka] object Mailbox {

  type Status = Int

  /*
   * The following assigned numbers CANNOT be changed without looking at the code which uses them!
   */

  // Primary status
  final val Open = 0 // _status is not initialized in AbstractMailbox, so default must be zero! Deliberately without type ascription to make it a compile-time constant
  final val Closed = 1 // Deliberately without type ascription to make it a compile-time constant
  // Secondary status: Scheduled bit may be added to Open/Suspended
  final val Scheduled = 2 // Deliberately without type ascription to make it a compile-time constant
  // Shifted by 2: the suspend count!
  final val shouldScheduleMask = 3
  final val shouldNotProcessMask = ~2
  final val suspendMask = ~3
  final val suspendUnit = 4

  // mailbox debugging helper using println (see below)
  // since this is a compile-time constant, scalac will elide code behind if (Mailbox.debug) (RK checked with 2.9.1)
  final val debug = false // Deliberately without type ascription to make it a compile-time constant
}

/**
 * Mailbox and InternalMailbox is separated in two classes because ActorCell is needed for implementation,
 * but can't be exposed to user defined mailbox subclasses.
 *
 * INTERNAL API
 */
private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
  extends SystemMessageQueue with Runnable {

  import Mailbox._

  /*
   * This is needed for actually executing the mailbox, i.e. invoking the
   * ActorCell. There are situations (e.g. RepointableActorRef) where a Mailbox
   * is constructed but we know that we will not execute it, in which case this
   * will be null. It must be a var to support switching into an “active”
   * mailbox, should the owning ActorRef turn local.
   *
   * ANOTHER THING, IMPORTANT:
   *
   * actorCell.start() publishes actorCell & self to the dispatcher, which
   * means that messages may be processed theoretically before self’s constructor
   * ends. The JMM guarantees visibility for final fields only after the end
   * of the constructor, so safe publication requires that THIS WRITE BELOW
   * stay as it is.
   */
  @volatile
  var actor: ActorCell = _
  def setActor(cell: ActorCell): Unit = actor = cell

  def dispatcher: MessageDispatcher = actor.dispatcher

  /**
   * Try to enqueue the message to this queue, or throw an exception.
   */
  def enqueue(receiver: ActorRef, msg: Envelope): Unit = messageQueue.enqueue(receiver, msg)

  /**
   * Try to dequeue the next message from this queue, return null failing that.
   */
  def dequeue(): Envelope = messageQueue.dequeue()

  /**
   * Indicates whether this queue is non-empty.
   */
  def hasMessages: Boolean = messageQueue.hasMessages

  /**
   * Should return the current number of messages held in this queue; may
   * always return 0 if no other value is available efficiently. Do not use
   * this for testing for presence of messages, use `hasMessages` instead.
   */
  def numberOfMessages: Int = messageQueue.numberOfMessages

  @volatile
  protected var _statusDoNotCallMeDirectly: Status = _ //0 by default

  @volatile
  protected var _systemQueueDoNotCallMeDirectly: SystemMessage = _ //null by default

  @inline
  final def status: Mailbox.Status = Unsafe.instance.getIntVolatile(this, AbstractMailbox.mailboxStatusOffset)

  @inline
  final def shouldProcessMessage: Boolean = (status & shouldNotProcessMask) == 0

  @inline
  final def suspendCount: Int = status / suspendUnit

  @inline
  final def isSuspended: Boolean = (status & suspendMask) != 0

  @inline
  final def isClosed: Boolean = status == Closed

  @inline
  final def isScheduled: Boolean = (status & Scheduled) != 0

  @inline
  protected final def updateStatus(oldStatus: Status, newStatus: Status): Boolean =
    Unsafe.instance.compareAndSwapInt(this, AbstractMailbox.mailboxStatusOffset, oldStatus, newStatus)

  @inline
  protected final def setStatus(newStatus: Status): Unit =
    Unsafe.instance.putIntVolatile(this, AbstractMailbox.mailboxStatusOffset, newStatus)

  /**
   * Reduce the suspend count by one. Caller does not need to worry about whether
   * status was Scheduled or not.
   *
   * @return true if the suspend count reached zero
   */
  @tailrec
  final def resume(): Boolean = status match {
    case Closed ⇒
      setStatus(Closed); false
    case s ⇒
      val next = if (s < suspendUnit) s else s - suspendUnit
      if (updateStatus(s, next)) next < suspendUnit
      else resume()
  }

  /**
   * Increment the suspend count by one. Caller does not need to worry about whether
   * status was Scheduled or not.
   *
   * @return true if the previous suspend count was zero
   */
  @tailrec
  final def suspend(): Boolean = status match {
    case Closed ⇒
      setStatus(Closed); false
    case s ⇒
      if (updateStatus(s, s + suspendUnit)) s < suspendUnit
      else suspend()
  }

  /**
   * set new primary status Closed. Caller does not need to worry about whether
   * status was Scheduled or not.
   */
  @tailrec
  final def becomeClosed(): Boolean = status match {
    case Closed ⇒
      setStatus(Closed); false
    case s ⇒ updateStatus(s, Closed) || becomeClosed()
  }

  /**
   * Set Scheduled status, keeping primary status as is.
   */
  @tailrec
  final def setAsScheduled(): Boolean = {
    val s = status
    /*
     * Only try to add Scheduled bit if pure Open/Suspended, not Closed or with
     * Scheduled bit already set.
     */
    if ((s & shouldScheduleMask) != Open) false
    else updateStatus(s, s | Scheduled) || setAsScheduled()
  }

  /**
   * Reset Scheduled status, keeping primary status as is.
   */
  @tailrec
  final def setAsIdle(): Boolean = {
    val s = status
    updateStatus(s, s & ~Scheduled) || setAsIdle()
  }
  /*
   * AtomicReferenceFieldUpdater for system queue.
   */
  protected final def systemQueueGet: LatestFirstSystemMessageList =
    // Note: contrary how it looks, there is no allocation here, as SystemMessageList is a value class and as such
    // it just exists as a typed view during compile-time. The actual return type is still SystemMessage.
    new LatestFirstSystemMessageList(Unsafe.instance.getObjectVolatile(this, AbstractMailbox.systemMessageOffset).asInstanceOf[SystemMessage])

  protected final def systemQueuePut(_old: LatestFirstSystemMessageList, _new: LatestFirstSystemMessageList): Boolean =
    // Note: calling .head is not actually existing on the bytecode level as the parameters _old and _new
    // are SystemMessage instances hidden during compile time behind the SystemMessageList value class.
    // Without calling .head the parameters would be boxed in SystemMessageList wrapper.
    Unsafe.instance.compareAndSwapObject(this, AbstractMailbox.systemMessageOffset, _old.head, _new.head)

  final def canBeScheduledForExecution(hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = status match {
    case Open | Scheduled ⇒ hasMessageHint || hasSystemMessageHint || hasSystemMessages || hasMessages
    case Closed           ⇒ false
    case _                ⇒ hasSystemMessageHint || hasSystemMessages
  }

  final def run = {
    try {
      if (!isClosed) { //Volatile read, needed here
        processAllSystemMessages() //First, deal with any system messages
        processMailbox() //Then deal with messages
      }
    } finally {
      setAsIdle() //Volatile write, needed here
      dispatcher.registerForExecution(this, false, false)
    }
  }

  /**
   * Process the messages in the mailbox
   */
  @tailrec private final def processMailbox(
    left: Int = java.lang.Math.max(dispatcher.throughput, 1),
    deadlineNs: Long = if (dispatcher.isThroughputDeadlineTimeDefined == true) System.nanoTime + dispatcher.throughputDeadlineTime.toNanos else 0L): Unit =
    if (shouldProcessMessage) {
      val next = dequeue()
      if (next ne null) {
        if (Mailbox.debug) println(actor.self + " processing message " + next)
        actor invoke next
        if (Thread.interrupted())
          throw new InterruptedException("Interrupted while processing actor messages")
        processAllSystemMessages()
        if ((left > 1) && ((dispatcher.isThroughputDeadlineTimeDefined == false) || (System.nanoTime - deadlineNs) < 0))
          processMailbox(left - 1, deadlineNs)
      }
    }

  /**
   * Will at least try to process all queued system messages: in case of
   * failure simply drop and go on to the next, because there is nothing to
   * restart here (failure is in ActorCell somewhere …). In case the mailbox
   * becomes closed (because of processing a Terminate message), dump all
   * already dequeued message to deadLetters.
   */
  final def processAllSystemMessages() {
    var interruption: Throwable = null
    var messageList = systemDrain(SystemMessageList.LNil)
    while ((messageList.nonEmpty) && !isClosed) {
      val msg = messageList.head
      messageList = messageList.tail
      msg.unlink()
      if (debug) println(actor.self + " processing system message " + msg + " with " + actor.childrenRefs)
      // we know here that systemInvoke ensures that only "fatal" exceptions get rethrown
      actor systemInvoke msg
      if (Thread.interrupted())
        interruption = new InterruptedException("Interrupted while processing system messages")
      // don’t ever execute normal message when system message present!
      if ((messageList.isEmpty) && !isClosed) messageList = systemDrain(SystemMessageList.LNil)
    }
    /*
     * if we closed the mailbox, we must dump the remaining system messages
     * to deadLetters (this is essential for DeathWatch)
     */
    val dlm = actor.dispatcher.mailboxes.deadLetterMailbox
    while (messageList.nonEmpty) {
      val msg = messageList.head
      messageList = messageList.tail
      msg.unlink()
      try dlm.systemEnqueue(actor.self, msg)
      catch {
        case e: InterruptedException ⇒ interruption = e
        case NonFatal(e) ⇒ actor.system.eventStream.publish(
          Error(e, actor.self.path.toString, this.getClass, "error while enqueuing " + msg + " to deadLetters: " + e.getMessage))
      }
    }
    // if we got an interrupted exception while handling system messages, then rethrow it
    if (interruption ne null) {
      Thread.interrupted() // clear interrupted flag before throwing according to java convention
      throw interruption
    }
  }

  /**
   * Overridable callback to clean up the mailbox,
   * called when an actor is unregistered.
   * By default it dequeues all system messages + messages and ships them to the owning actors' systems' DeadLetterMailbox
   */
  protected[dispatch] def cleanUp(): Unit =
    if (actor ne null) { // actor is null for the deadLetterMailbox
      val dlm = actor.dispatcher.mailboxes.deadLetterMailbox
      var messageList = systemDrain(new LatestFirstSystemMessageList(NoMessage))
      while (messageList.nonEmpty) {
        // message must be “virgin” before being able to systemEnqueue again
        val msg = messageList.head
        messageList = messageList.tail
        msg.unlink()
        dlm.systemEnqueue(actor.self, msg)
      }

      if (messageQueue ne null) // needed for CallingThreadDispatcher, which never calls Mailbox.run()
        messageQueue.cleanUp(actor.self, actor.dispatcher.mailboxes.deadLetterMailbox.messageQueue)
    }
}

/**
 * A MessageQueue is one of the core components in forming an Akka Mailbox.
 * The MessageQueue is where the normal messages that are sent to Actors will be enqueued (and subsequently dequeued)
 * It needs to at least support N producers and 1 consumer thread-safely.
 */
trait MessageQueue {
  /**
   * Try to enqueue the message to this queue, or throw an exception.
   */
  def enqueue(receiver: ActorRef, handle: Envelope): Unit // NOTE: receiver is used only in two places, but cannot be removed

  /**
   * Try to dequeue the next message from this queue, return null failing that.
   */
  def dequeue(): Envelope

  /**
   * Should return the current number of messages held in this queue; may
   * always return 0 if no other value is available efficiently. Do not use
   * this for testing for presence of messages, use `hasMessages` instead.
   */
  def numberOfMessages: Int

  /**
   * Indicates whether this queue is non-empty.
   */
  def hasMessages: Boolean

  /**
   * Called when the mailbox this queue belongs to is disposed of. Normally it
   * is expected to transfer all remaining messages into the dead letter queue
   * which is passed in. The owner of this MessageQueue is passed in if
   * available (e.g. for creating DeadLetters()), “/deadletters” otherwise.
   */
  def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit
}

class NodeMessageQueue extends AbstractNodeQueue[Envelope] with MessageQueue with UnboundedMessageQueueSemantics {

  final def enqueue(receiver: ActorRef, handle: Envelope): Unit = add(handle)

  final def dequeue(): Envelope = poll()

  final def numberOfMessages: Int = count()

  final def hasMessages: Boolean = !isEmpty()

  @tailrec final def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = {
    val envelope = dequeue()
    if (envelope ne null) {
      deadLetters.enqueue(owner, envelope)
      cleanUp(owner, deadLetters)
    }
  }
}

/**
 * INTERNAL API
 */
private[akka] trait SystemMessageQueue {
  /**
   * Enqueue a new system message, e.g. by prepending atomically as new head of a single-linked list.
   */
  def systemEnqueue(receiver: ActorRef, message: SystemMessage): Unit

  /**
   * Dequeue all messages from system queue and return them as single-linked list.
   */
  def systemDrain(newContents: LatestFirstSystemMessageList): EarliestFirstSystemMessageList

  def hasSystemMessages: Boolean
}

/**
 * INTERNAL API
 */
private[akka] trait DefaultSystemMessageQueue { self: Mailbox ⇒

  @tailrec
  final def systemEnqueue(receiver: ActorRef, message: SystemMessage): Unit = {
    assert(message.unlinked)
    if (Mailbox.debug) println(receiver + " having enqueued " + message)
    val currentList = systemQueueGet
    if (currentList.head == NoMessage) {
      if (actor ne null) actor.dispatcher.mailboxes.deadLetterMailbox.systemEnqueue(receiver, message)
    } else {
      if (!systemQueuePut(currentList, message :: currentList)) {
        message.unlink()
        systemEnqueue(receiver, message)
      }
    }
  }

  @tailrec
  final def systemDrain(newContents: LatestFirstSystemMessageList): EarliestFirstSystemMessageList = {
    val currentList = systemQueueGet
    if (currentList.head == NoMessage) new EarliestFirstSystemMessageList(null)
    else if (systemQueuePut(currentList, newContents)) currentList.reverse
    else systemDrain(newContents)
  }

  def hasSystemMessages: Boolean = systemQueueGet.head match {
    case null | NoMessage ⇒ false
    case _                ⇒ true
  }

}

/**
 * This is a marker trait for message queues which support multiple consumers,
 * as is required by the BalancingDispatcher.
 */
trait MultipleConsumerSemantics

/**
 * A QueueBasedMessageQueue is a MessageQueue backed by a java.util.Queue.
 */
trait QueueBasedMessageQueue extends MessageQueue with MultipleConsumerSemantics {
  def queue: Queue[Envelope]
  def numberOfMessages = queue.size
  def hasMessages = !queue.isEmpty
  def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = {
    if (hasMessages) {
      var envelope = dequeue
      while (envelope ne null) {
        deadLetters.enqueue(owner, envelope)
        envelope = dequeue
      }
    }
  }
}

/**
 * UnboundedMessageQueueSemantics adds unbounded semantics to a QueueBasedMessageQueue,
 * i.e. a non-blocking enqueue and dequeue.
 */
trait UnboundedMessageQueueSemantics

trait UnboundedQueueBasedMessageQueue extends QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
  def enqueue(receiver: ActorRef, handle: Envelope): Unit = queue add handle
  def dequeue(): Envelope = queue.poll()
}

/**
 * BoundedMessageQueueSemantics adds bounded semantics to a QueueBasedMessageQueue,
 * i.e. blocking enqueue with timeout.
 */
trait BoundedMessageQueueSemantics {
  def pushTimeOut: Duration
}

trait BoundedQueueBasedMessageQueue extends QueueBasedMessageQueue with BoundedMessageQueueSemantics {
  override def queue: BlockingQueue[Envelope]

  def enqueue(receiver: ActorRef, handle: Envelope): Unit =
    if (pushTimeOut.length >= 0) {
      if (!queue.offer(handle, pushTimeOut.length, pushTimeOut.unit))
        receiver.asInstanceOf[InternalActorRef].provider.deadLetters.tell(
          DeadLetter(handle.message, handle.sender, receiver), handle.sender)
    } else queue put handle

  def dequeue(): Envelope = queue.poll()
}

/**
 * DequeBasedMessageQueue refines QueueBasedMessageQueue to be backed by a java.util.Deque.
 */
trait DequeBasedMessageQueueSemantics {
  def enqueueFirst(receiver: ActorRef, handle: Envelope): Unit
}

trait UnboundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueueSemantics with UnboundedMessageQueueSemantics

trait BoundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueueSemantics with BoundedMessageQueueSemantics

trait DequeBasedMessageQueue extends QueueBasedMessageQueue with DequeBasedMessageQueueSemantics {
  def queue: Deque[Envelope]
}

/**
 * UnboundedDequeBasedMessageQueueSemantics adds unbounded semantics to a DequeBasedMessageQueue,
 * i.e. a non-blocking enqueue and dequeue.
 */
trait UnboundedDequeBasedMessageQueue extends DequeBasedMessageQueue with UnboundedDequeBasedMessageQueueSemantics {
  def enqueue(receiver: ActorRef, handle: Envelope): Unit = queue add handle
  def enqueueFirst(receiver: ActorRef, handle: Envelope): Unit = queue addFirst handle
  def dequeue(): Envelope = queue.poll()
}

/**
 * BoundedMessageQueueSemantics adds bounded semantics to a DequeBasedMessageQueue,
 * i.e. blocking enqueue with timeout.
 */
trait BoundedDequeBasedMessageQueue extends DequeBasedMessageQueue with BoundedDequeBasedMessageQueueSemantics {
  def pushTimeOut: Duration
  override def queue: BlockingDeque[Envelope]

  def enqueue(receiver: ActorRef, handle: Envelope): Unit =
    if (pushTimeOut.length >= 0) {
      if (!queue.offer(handle, pushTimeOut.length, pushTimeOut.unit))
        receiver.asInstanceOf[InternalActorRef].provider.deadLetters.tell(
          DeadLetter(handle.message, handle.sender, receiver), handle.sender)
    } else queue put handle

  def enqueueFirst(receiver: ActorRef, handle: Envelope): Unit =
    if (pushTimeOut.length >= 0) {
      if (!queue.offerFirst(handle, pushTimeOut.length, pushTimeOut.unit))
        receiver.asInstanceOf[InternalActorRef].provider.deadLetters.tell(
          DeadLetter(handle.message, handle.sender, receiver), handle.sender)
    } else queue putFirst handle

  def dequeue(): Envelope = queue.poll()
}

/**
 * MailboxType is a factory to create MessageQueues for an optionally
 * provided ActorContext.
 *
 * <b>Possibly Important Notice</b>
 *
 * When implementing a custom mailbox type, be aware that there is special
 * semantics attached to `system.actorOf()` in that sending to the returned
 * ActorRef may—for a short period of time—enqueue the messages first in a
 * dummy queue. Top-level actors are created in two steps, and only after the
 * guardian actor has performed that second step will all previously sent
 * messages be transferred from the dummy queue into the real mailbox.
 */
trait MailboxType {
  def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue
}

trait ProducesMessageQueue[T <: MessageQueue]

/**
 * UnboundedMailbox is the default unbounded MailboxType used by Akka Actors.
 */
final case class UnboundedMailbox() extends MailboxType with ProducesMessageQueue[UnboundedMailbox.MessageQueue] {

  def this(settings: ActorSystem.Settings, config: Config) = this()

  final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
    new UnboundedMailbox.MessageQueue
}

object UnboundedMailbox {
  class MessageQueue extends ConcurrentLinkedQueue[Envelope] with UnboundedQueueBasedMessageQueue {
    final def queue: Queue[Envelope] = this
  }
}

/**
 * SingleConsumerOnlyUnboundedMailbox is a high-performance, multiple producer—single consumer, unbounded MailboxType,
 * the only drawback is that you can't have multiple consumers,
 * which rules out using it with BalancingPool (BalancingDispatcher) for instance.
 */
final case class SingleConsumerOnlyUnboundedMailbox() extends MailboxType with ProducesMessageQueue[NodeMessageQueue] {

  def this(settings: ActorSystem.Settings, config: Config) = this()

  final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = new NodeMessageQueue
}

/**
 * BoundedMailbox is the default bounded MailboxType used by Akka Actors.
 */
final case class BoundedMailbox(val capacity: Int, val pushTimeOut: FiniteDuration)
  extends MailboxType with ProducesMessageQueue[BoundedMailbox.MessageQueue] {

  def this(settings: ActorSystem.Settings, config: Config) = this(config.getInt("mailbox-capacity"),
    config.getNanosDuration("mailbox-push-timeout-time"))

  if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
  if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")

  final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
    new BoundedMailbox.MessageQueue(capacity, pushTimeOut)
}

object BoundedMailbox {
  class MessageQueue(capacity: Int, final val pushTimeOut: FiniteDuration)
    extends LinkedBlockingQueue[Envelope](capacity) with BoundedQueueBasedMessageQueue {
    final def queue: BlockingQueue[Envelope] = this
  }
}

/**
 * UnboundedPriorityMailbox is an unbounded mailbox that allows for prioritization of its contents.
 * Extend this class and provide the Comparator in the constructor.
 */
class UnboundedPriorityMailbox(val cmp: Comparator[Envelope], val initialCapacity: Int)
  extends MailboxType with ProducesMessageQueue[UnboundedPriorityMailbox.MessageQueue] {
  def this(cmp: Comparator[Envelope]) = this(cmp, 11)
  final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
    new UnboundedPriorityMailbox.MessageQueue(initialCapacity, cmp)
}

object UnboundedPriorityMailbox {
  class MessageQueue(initialCapacity: Int, cmp: Comparator[Envelope])
    extends PriorityBlockingQueue[Envelope](initialCapacity, cmp) with UnboundedQueueBasedMessageQueue {
    final def queue: Queue[Envelope] = this
  }
}

/**
 * BoundedPriorityMailbox is a bounded mailbox that allows for prioritization of its contents.
 * Extend this class and provide the Comparator in the constructor.
 */
class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final val capacity: Int, final val pushTimeOut: Duration)
  extends MailboxType with ProducesMessageQueue[BoundedPriorityMailbox.MessageQueue] {

  if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
  if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")

  final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
    new BoundedPriorityMailbox.MessageQueue(capacity, cmp, pushTimeOut)
}

object BoundedPriorityMailbox {
  class MessageQueue(capacity: Int, cmp: Comparator[Envelope], val pushTimeOut: Duration)
    extends BoundedBlockingQueue[Envelope](capacity, new PriorityQueue[Envelope](11, cmp))
    with BoundedQueueBasedMessageQueue {
    final def queue: BlockingQueue[Envelope] = this
  }
}

/**
 * UnboundedDequeBasedMailbox is an unbounded MailboxType, backed by a Deque.
 */
final case class UnboundedDequeBasedMailbox() extends MailboxType with ProducesMessageQueue[UnboundedDequeBasedMailbox.MessageQueue] {

  def this(settings: ActorSystem.Settings, config: Config) = this()

  final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
    new UnboundedDequeBasedMailbox.MessageQueue
}

object UnboundedDequeBasedMailbox {
  class MessageQueue extends LinkedBlockingDeque[Envelope] with UnboundedDequeBasedMessageQueue {
    final val queue = this
  }
}

/**
 * BoundedDequeBasedMailbox is an bounded MailboxType, backed by a Deque.
 */
case class BoundedDequeBasedMailbox( final val capacity: Int, final val pushTimeOut: FiniteDuration)
  extends MailboxType with ProducesMessageQueue[BoundedDequeBasedMailbox.MessageQueue] {

  def this(settings: ActorSystem.Settings, config: Config) = this(config.getInt("mailbox-capacity"),
    config.getNanosDuration("mailbox-push-timeout-time"))

  if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedDequeBasedMailbox can not be negative")
  if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedDequeBasedMailbox can not be null")

  final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
    new BoundedDequeBasedMailbox.MessageQueue(capacity, pushTimeOut)
}

object BoundedDequeBasedMailbox {
  class MessageQueue(capacity: Int, val pushTimeOut: FiniteDuration)
    extends LinkedBlockingDeque[Envelope](capacity) with BoundedDequeBasedMessageQueue {
    final val queue = this
  }
}

/**
 * ControlAwareMessageQueue handles messages that extend [[akka.dispatch.ControlMessage]] with priority.
 */
trait ControlAwareMessageQueueSemantics extends QueueBasedMessageQueue {
  def controlQueue: Queue[Envelope]
  def queue: Queue[Envelope]

  def enqueue(receiver: ActorRef, handle: Envelope): Unit = handle match {
    case envelope @ Envelope(_: ControlMessage, _) ⇒ controlQueue add envelope
    case envelope                                  ⇒ queue add envelope
  }

  def dequeue(): Envelope = {
    val controlMsg = controlQueue.poll()

    if (controlMsg ne null) controlMsg
    else queue.poll()
  }

  override def numberOfMessages: Int = controlQueue.size() + queue.size()

  override def hasMessages: Boolean = !(queue.isEmpty && controlQueue.isEmpty)
}

trait UnboundedControlAwareMessageQueueSemantics extends UnboundedMessageQueueSemantics with ControlAwareMessageQueueSemantics
trait BoundedControlAwareMessageQueueSemantics extends BoundedMessageQueueSemantics with ControlAwareMessageQueueSemantics

/**
 * Messages that extend this trait will be handled with priority by control aware mailboxes.
 */
trait ControlMessage

/**
 * UnboundedControlAwareMailbox is an unbounded MailboxType, that maintains two queues
 * to allow messages that extend [[akka.dispatch.ControlMessage]] to be delivered with priority.
 */
final case class UnboundedControlAwareMailbox() extends MailboxType with ProducesMessageQueue[UnboundedControlAwareMailbox.MessageQueue] {

  // this constructor will be called via reflection when this mailbox type
  // is used in the application config
  def this(settings: ActorSystem.Settings, config: Config) = this()

  def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = new UnboundedControlAwareMailbox.MessageQueue
}

object UnboundedControlAwareMailbox {
  class MessageQueue extends ControlAwareMessageQueueSemantics with UnboundedMessageQueueSemantics {
    val controlQueue: Queue[Envelope] = new ConcurrentLinkedQueue[Envelope]()
    val queue: Queue[Envelope] = new ConcurrentLinkedQueue[Envelope]()
  }
}

/**
 * BoundedControlAwareMailbox is a bounded MailboxType, that maintains two queues
 * to allow messages that extend [[akka.dispatch.ControlMessage]] to be delivered with priority.
 */
final case class BoundedControlAwareMailbox(capacity: Int, pushTimeOut: FiniteDuration) extends MailboxType with ProducesMessageQueue[BoundedControlAwareMailbox.MessageQueue] {
  def this(settings: ActorSystem.Settings, config: Config) = this(config.getInt("mailbox-capacity"),
    config.getNanosDuration("mailbox-push-timeout-time"))

  def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = new BoundedControlAwareMailbox.MessageQueue(capacity, pushTimeOut)
}

object BoundedControlAwareMailbox {
  class MessageQueue(val capacity: Int, val pushTimeOut: FiniteDuration) extends BoundedControlAwareMessageQueueSemantics {

    private final val size = new AtomicInteger(0)
    private final val putLock = new ReentrantLock()
    private final val notFull = putLock.newCondition()

    // no need to use blocking queues here, as blocking is being handled in `enqueueWithTimeout`
    val controlQueue = new ConcurrentLinkedQueue[Envelope]()
    val queue = new ConcurrentLinkedQueue[Envelope]()

    override def enqueue(receiver: ActorRef, handle: Envelope): Unit = handle match {
      case envelope @ Envelope(_: ControlMessage, _) ⇒ enqueueWithTimeout(controlQueue, receiver, envelope)
      case envelope                                  ⇒ enqueueWithTimeout(queue, receiver, envelope)
    }

    override def numberOfMessages: Int = size.get()
    override def hasMessages: Boolean = numberOfMessages > 0

    @tailrec
    final override def dequeue(): Envelope = {
      val count = size.get()

      // if both queues are empty return null
      if (count > 0) {
        // if there are messages try to fetch the current head
        // or retry if other consumer dequeued in the mean time
        if (size.compareAndSet(count, count - 1)) {
          val item = super.dequeue()

          if (size.get < capacity) signalNotFull()

          item
        } else {
          dequeue()
        }
      } else {
        null
      }
    }

    private def signalNotFull() {
      putLock.lock()

      try {
        notFull.signal()
      } finally {
        putLock.unlock()
      }
    }

    private final def enqueueWithTimeout(q: Queue[Envelope], receiver: ActorRef, envelope: Envelope) {
      var remaining = pushTimeOut.toNanos

      putLock.lockInterruptibly()
      val inserted = try {
        var stop = false
        while (size.get() == capacity && !stop) {
          remaining = notFull.awaitNanos(remaining)
          stop = remaining <= 0
        }

        if (stop) {
          false
        } else {
          q.add(envelope)
          val c = size.incrementAndGet()

          if (c < capacity) notFull.signal()

          true
        }
      } finally {
        putLock.unlock()
      }

      if (!inserted) {
        receiver.asInstanceOf[InternalActorRef].provider.deadLetters.tell(
          DeadLetter(envelope.message, envelope.sender, receiver), envelope.sender)
      }
    }
  }
}

/**
 * Trait to signal that an Actor requires a certain type of message queue semantics.
 *
 * The mailbox type will be looked up by mapping the type T via akka.actor.mailbox.requirements in the config,
 * to a mailbox configuration. If no mailbox is assigned on Props or in deployment config then this one will be used.
 *
 * The queue type of the created mailbox will be checked against the type T and actor creation will fail if it doesn't
 * fulfill the requirements.
 */
trait RequiresMessageQueue[T]

Other Akka source code examples

Here is a short list of links related to this Akka Mailbox.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.