alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (Inbox.scala)

This example Akka source code file (Inbox.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actor, actorref, akka, any, collection, concurrent, deadline, inbox, kick, mutable, none, query, select, some, startwatch, time

The Inbox.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */

package akka.actor.dsl

import scala.concurrent.Await
import akka.actor.ActorLogging
import scala.collection.immutable.TreeSet
import scala.concurrent.duration._
import akka.actor.Cancellable
import akka.actor.Actor
import scala.collection.mutable.Queue
import akka.actor.ActorSystem
import akka.actor.ActorRef
import akka.util.Timeout
import akka.actor.Status
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicInteger
import akka.pattern.ask
import akka.actor.ActorDSL
import akka.actor.Props

/**
 * INTERNAL API
 */
private[akka] object Inbox {

  private sealed trait Query {
    def deadline: Deadline
    def withClient(c: ActorRef): Query
    def client: ActorRef
  }
  private final case class Get(deadline: Deadline, client: ActorRef = null) extends Query {
    def withClient(c: ActorRef) = copy(client = c)
  }
  private final case class Select(deadline: Deadline, predicate: PartialFunction[Any, Any], client: ActorRef = null) extends Query {
    def withClient(c: ActorRef) = copy(client = c)
  }
  private final case class StartWatch(target: ActorRef)
  private case object Kick

}

trait Inbox { this: ActorDSL.type ⇒

  import Inbox._

  protected trait InboxExtension { this: Extension ⇒
    val DSLInboxQueueSize = config.getInt("inbox-size")

    val inboxNr = new AtomicInteger
    val inboxProps = Props(classOf[InboxActor], ActorDSL, DSLInboxQueueSize)

    def newReceiver: ActorRef = mkChild(inboxProps, "inbox-" + inboxNr.incrementAndGet)
  }

  private implicit val deadlineOrder: Ordering[Query] = new Ordering[Query] {
    def compare(left: Query, right: Query): Int = left.deadline.time compare right.deadline.time
  }

  private class InboxActor(size: Int) extends Actor with ActorLogging {
    var clients = Queue.empty[Query]
    val messages = Queue.empty[Any]
    var clientsByTimeout = TreeSet.empty[Query]
    var printedWarning = false

    def enqueueQuery(q: Query) {
      val query = q withClient sender()
      clients enqueue query
      clientsByTimeout += query
    }

    def enqueueMessage(msg: Any) {
      if (messages.size < size) messages enqueue msg
      else {
        if (!printedWarning) {
          log.warning("dropping message: either your program is buggy or you might want to increase akka.actor.dsl.inbox-size, current value is " + size)
          printedWarning = true
        }
      }
    }

    var currentMsg: Any = _
    val clientPredicate: (Query) ⇒ Boolean = {
      case _: Get          ⇒ true
      case Select(_, p, _) ⇒ p isDefinedAt currentMsg
      case _               ⇒ false
    }

    var currentSelect: Select = _
    val messagePredicate: (Any ⇒ Boolean) = (msg) ⇒ currentSelect.predicate.isDefinedAt(msg)

    var currentDeadline: Option[(Deadline, Cancellable)] = None

    def receive = ({
      case g: Get ⇒
        if (messages.isEmpty) enqueueQuery(g)
        else sender() ! messages.dequeue()
      case s @ Select(_, predicate, _) ⇒
        if (messages.isEmpty) enqueueQuery(s)
        else {
          currentSelect = s
          messages.dequeueFirst(messagePredicate) match {
            case Some(msg) ⇒ sender() ! msg
            case None      ⇒ enqueueQuery(s)
          }
          currentSelect = null
        }
      case StartWatch(target) ⇒ context watch target
      case Kick ⇒
        val now = Deadline.now
        val pred = (q: Query) ⇒ q.deadline.time < now.time
        val overdue = clientsByTimeout.iterator.takeWhile(pred)
        while (overdue.hasNext) {
          val toKick = overdue.next()
          toKick.client ! Status.Failure(new TimeoutException("deadline passed"))
        }
        // TODO: this wants to lose the `Queue.empty ++=` part when SI-6208 is fixed
        clients = Queue.empty ++= clients.filterNot(pred)
        clientsByTimeout = clientsByTimeout.from(Get(now))
      case msg ⇒
        if (clients.isEmpty) enqueueMessage(msg)
        else {
          currentMsg = msg
          clients.dequeueFirst(clientPredicate) match {
            case Some(q) ⇒ { clientsByTimeout -= q; q.client ! msg }
            case None    ⇒ enqueueMessage(msg)
          }
          currentMsg = null
        }
    }: Receive) andThen { _ ⇒
      if (clients.isEmpty) {
        if (currentDeadline.isDefined) {
          currentDeadline.get._2.cancel()
          currentDeadline = None
        }
      } else {
        val next = clientsByTimeout.head.deadline
        import context.dispatcher
        if (currentDeadline.isEmpty) {
          currentDeadline = Some((next, context.system.scheduler.scheduleOnce(next.timeLeft, self, Kick)))
        } else {
          // must not rely on the Scheduler to not fire early (for robustness)
          currentDeadline.get._2.cancel()
          currentDeadline = Some((next, context.system.scheduler.scheduleOnce(next.timeLeft, self, Kick)))
        }
      }
    }
  }

  /*
   * make sure that AskTimeout does not accidentally mess up message reception
   * by adding this extra time to the real timeout
   */
  private val extraTime = 1.minute

  /**
   * Create a new actor which will internally queue up messages it gets so that
   * they can be interrogated with the [[akka.actor.dsl.Inbox!.Inbox!.receive]]
   * and [[akka.actor.dsl.Inbox!.Inbox!.select]] methods. It will be created as
   * a system actor in the ActorSystem which is implicitly (or explicitly)
   * supplied.
   */
  def inbox()(implicit system: ActorSystem): Inbox = new Inbox(system)

  class Inbox(system: ActorSystem) extends akka.actor.Inbox {

    val receiver: ActorRef = Extension(system).newReceiver

    // Java API
    def getRef: ActorRef = receiver
    def send(target: ActorRef, msg: AnyRef): Unit = target.tell(msg, receiver)

    private val defaultTimeout: FiniteDuration = Extension(system).DSLDefaultTimeout

    /**
     * Receive a single message from the internal `receiver` actor. The supplied
     * timeout is used for cleanup purposes and its precision is subject to the
     * resolution of the system’s scheduler (usually 100ms, but configurable).
     *
     * <b>Warning:</b> This method blocks the current thread until a message is
     * received, thus it can introduce dead-locks (directly as well as
     * indirectly by causing starvation of the thread pool). <b>Do not use
     * this method within an actor!</b>
     */
    def receive(timeout: FiniteDuration = defaultTimeout): Any = {
      implicit val t = Timeout(timeout + extraTime)
      Await.result(receiver ? Get(Deadline.now + timeout), Duration.Inf)
    }

    /**
     * Receive a single message for which the given partial function is defined
     * and return the transformed result, using the internal `receiver` actor.
     * The supplied timeout is used for cleanup purposes and its precision is
     * subject to the resolution of the system’s scheduler (usually 100ms, but
     * configurable).
     *
     * <b>Warning:</b> This method blocks the current thread until a message is
     * received, thus it can introduce dead-locks (directly as well as
     * indirectly by causing starvation of the thread pool). <b>Do not use
     * this method within an actor!</b>
     */
    def select[T](timeout: FiniteDuration = defaultTimeout)(predicate: PartialFunction[Any, T]): T = {
      implicit val t = Timeout(timeout + extraTime)
      predicate(Await.result(receiver ? Select(Deadline.now + timeout, predicate), Duration.Inf))
    }

    /**
     * Make the inbox’s actor watch the target actor such that reception of the
     * Terminated message can then be awaited.
     */
    def watch(target: ActorRef): Unit = receiver ! StartWatch(target)

    /**
     * Overridden finalizer which will try to stop the actor once this Inbox
     * is no longer referenced.
     */
    override def finalize() {
      system.stop(receiver)
    }
  }

  implicit def senderFromInbox(implicit inbox: Inbox): ActorRef = inbox.receiver
}

Other Akka source code examples

Here is a short list of links related to this Akka Inbox.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.