alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (ActorSelection.scala)

This example Akka source code file (ActorSelection.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actorref, actorselection, actorselectionmessage, akka, any, collection, concurrent, scalaactorselection, selectchildname, selectionpathelement, selectparent, serialversionuid, string, time, util, utilities

The ActorSelection.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.actor

import language.implicitConversions
import scala.annotation.tailrec
import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.util.Success
import scala.util.Failure
import java.util.regex.Pattern
import akka.pattern.ask
import akka.routing.MurmurHash
import akka.util.Helpers
import akka.util.Timeout
import akka.dispatch.ExecutionContexts

/**
 * An ActorSelection is a logical view of a section of an ActorSystem's tree of Actors,
 * allowing for broadcasting of messages to that section.
 *
 * A selection is described by an `anchor` actor and a sequence of path elements; the
 * path is traversed from the anchor every time a message is sent through the selection.
 */
@SerialVersionUID(1L)
abstract class ActorSelection extends Serializable {
  this: ScalaActorSelection ⇒

  /** The actor at which traversal of `path` starts. */
  protected[akka] val anchor: ActorRef

  /** The path elements to traverse, relative to `anchor`. */
  protected val path: immutable.IndexedSeq[SelectionPathElement]

  /**
   * Sends the specified message to the actors matched by this selection, i.e.
   * fire-and-forget semantics, including the sender reference if possible.
   *
   * Pass [[ActorRef#noSender]] or `null` as sender if there is nobody to reply to
   */
  def tell(msg: Any, sender: ActorRef): Unit =
    ActorSelection.deliverSelection(anchor.asInstanceOf[InternalActorRef], sender,
      ActorSelectionMessage(msg, path, wildcardFanOut = false))

  /**
   * Forwards the message and passes the original sender actor as the sender.
   *
   * Works, no matter whether originally sent with tell/'!' or ask/'?'.
   */
  def forward(message: Any)(implicit context: ActorContext): Unit = tell(message, context.sender())

  /**
   * Resolve the [[ActorRef]] matching this selection.
   * The result is returned as a Future that is completed with the [[ActorRef]]
   * if such an actor exists. It is completed with failure [[ActorNotFound]] if
   * no such actor exists or the identification didn't complete within the
   * supplied `timeout`.
   *
   * Under the hood it talks to the actor to verify its existence and acquire its
   * [[ActorRef]].
   */
  def resolveOne()(implicit timeout: Timeout): Future[ActorRef] = {
    implicit val ec = ExecutionContexts.sameThreadExecutionContext
    val p = Promise[ActorRef]()
    this.ask(Identify(None)) onComplete {
      case Success(ActorIdentity(_, Some(ref))) ⇒ p.success(ref)
      // Any other outcome (failed ask, identity without a ref, unexpected reply) means "not found".
      case _                                    ⇒ p.failure(ActorNotFound(this))
    }
    p.future
  }

  /**
   * Resolve the [[ActorRef]] matching this selection.
   * The result is returned as a Future that is completed with the [[ActorRef]]
   * if such an actor exists. It is completed with failure [[ActorNotFound]] if
   * no such actor exists or the identification didn't complete within the
   * supplied `timeout`.
   *
   * Under the hood it talks to the actor to verify its existence and acquire its
   * [[ActorRef]].
   */
  def resolveOne(timeout: FiniteDuration): Future[ActorRef] = resolveOne()(timeout)

  /**
   * Renders the selection as `ActorSelection[Anchor(<anchor path>[#<uid>]), Path(<path>)]`;
   * the uid is only included when the anchor path carries a defined uid.
   */
  override def toString: String = {
    val builder = new java.lang.StringBuilder()
    builder.append("ActorSelection[Anchor(").append(anchor.path)
    if (anchor.path.uid != ActorCell.undefinedUid)
      builder.append("#").append(anchor.path.uid)

    builder.append("), Path(").append(pathString).append(")]")
    builder.toString
  }

  /**
   * The [[akka.actor.ActorPath]] of the anchor actor.
   */
  def anchorPath: ActorPath = anchor.path

  /**
   * String representation of the path elements, starting with "/" and separated with "/".
   */
  def pathString: String = path.mkString("/", "/", "")

  /**
   * String representation of the actor selection suitable for storage and recreation.
   * The output is similar to the URI fragment returned by [[akka.actor.ActorPath.toSerializationFormat]].
   * @return URI fragment
   */
  def toSerializationFormat: String = {
    val anchorPath = anchor match {
      case a: ActorRefWithCell ⇒ anchor.path.toStringWithAddress(a.provider.getDefaultAddress)
      case _                   ⇒ anchor.path.toString
    }

    if (path.isEmpty) anchorPath
    else {
      // Insert exactly one "/" between the anchor and the selection path.
      val separator = if (anchorPath.endsWith("/")) "" else "/"
      anchorPath + separator + path.mkString("/")
    }
  }

  /** Two selections are equal when both their anchors and their paths are equal. */
  override def equals(obj: Any): Boolean = obj match {
    case s: ActorSelection ⇒ this.anchor == s.anchor && this.path == s.path
    case _                 ⇒ false
  }

  /** Hash of anchor and path, consistent with `equals`; computed once on first use. */
  override lazy val hashCode: Int = {
    import MurmurHash._
    finalizeHash(extendHash(startHash(anchor.##), path.##, startMagicA, startMagicB))
  }
}

/**
 * Companion of [[ActorSelection]]: factory methods that turn a path into selection
 * elements, and the internal routine that walks the actor tree to deliver a message.
 */
object ActorSelection {
  // The self-type of ActorSelection forces ScalaActorSelection to be mixed in, hence the cast is safe
  implicit def toScala(sel: ActorSelection): ScalaActorSelection = sel.asInstanceOf[ScalaActorSelection]

  /**
   * Construct an ActorSelection from the given string representing a path
   * relative to the given target. The string is split on runs of "/". This
   * operation has to create all the matching magic, so it is preferable to
   * cache its result if the intention is to send messages frequently.
   */
  def apply(anchorRef: ActorRef, path: String): ActorSelection = apply(anchorRef, path.split("/+"))

  /**
   * Construct an ActorSelection from the given path elements, interpreted
   * relative to the given target. Empty elements are dropped; elements that
   * contain `?` or `*` become patterns, `..` selects the parent and anything
   * else selects a child by name. This operation has to create all the
   * matching magic, so it is preferable to cache its result if the
   * intention is to send messages frequently.
   */
  def apply(anchorRef: ActorRef, elements: Iterable[String]): ActorSelection = {
    def toElement(segment: String): SelectionPathElement = {
      val hasWildcard = segment.indexOf('?') >= 0 || segment.indexOf('*') >= 0
      if (hasWildcard) SelectChildPattern(segment)
      else if (segment == "..") SelectParent
      else SelectChildName(segment)
    }

    val compiled: immutable.IndexedSeq[SelectionPathElement] =
      elements.iterator.filterNot(_.isEmpty).map(toElement).toVector

    new ActorSelection with ScalaActorSelection {
      override val anchor = anchorRef
      override val path = compiled
    }
  }

  /**
   * INTERNAL API
   * The receive logic for ActorSelectionMessage. The idea is to recursively descend as far as possible
   * with local refs and hand over to that “foreign” child when we encounter it.
   */
  private[akka] def deliverSelection(anchor: InternalActorRef, sender: ActorRef, sel: ActorSelectionMessage): Unit =
    if (sel.elements.isEmpty)
      anchor.tell(sel.msg, sender)
    else {
      // Shared, stateful cursor over the path: each step of `descend` consumes one element.
      val remaining = sel.elements.iterator

      @tailrec def descend(current: InternalActorRef): Unit =
        current match {
          case local: ActorRefWithCell ⇒

            // Target used when nothing matches; a `def` so it is only created on demand.
            def deadEnd = new EmptyLocalActorRef(local.provider, anchor.path / sel.elements.map(_.toString),
              local.underlying.system.eventStream)

            remaining.next() match {
              case SelectParent ⇒
                val parent = current.getParent
                if (remaining.hasNext) descend(parent)
                else parent.tell(sel.msg, sender)

              case SelectChildName(name) ⇒
                val child = local.getSingleChild(name)
                if (child != Nobody) {
                  if (remaining.hasNext) descend(child)
                  else child.tell(sel.msg, sender)
                } else if (!sel.wildcardFanOut) {
                  // don't send to the empty ref after wildcard fan-out
                  deadEnd.tell(sel, sender)
                }

              case p: SelectChildPattern ⇒
                // fan-out when there is a wildcard
                val matched = local.children.filter(c ⇒ p.pattern.matcher(c.path.name).matches)
                if (matched.isEmpty && !sel.wildcardFanOut) {
                  // don't send to the empty ref after wildcard fan-out
                  deadEnd.tell(sel, sender)
                } else if (!remaining.hasNext) {
                  // leaf: deliver the payload to every matching child
                  matched.foreach(_.tell(sel.msg, sender))
                } else {
                  // inner node: continue with the rest of the path below every matching child
                  val rest = sel.copy(elements = remaining.toVector,
                    wildcardFanOut = sel.wildcardFanOut || matched.size > 1)
                  matched.foreach(c ⇒ deliverSelection(c.asInstanceOf[InternalActorRef], sender, rest))
                }
            }

          case _ ⇒
            // foreign ref, continue by sending ActorSelectionMessage to it with remaining elements
            current.tell(sel.copy(elements = remaining.toVector), sender)
        }

      descend(anchor)
    }
}

/**
 * Contains the Scala API (the `!` method) for ActorSelections, which provides automatic
 * tracking of the sender, as per the usual implicit ActorRef pattern.
 */
trait ScalaActorSelection {
  this: ActorSelection ⇒

  /**
   * Sends `msg` through this selection by delegating to `tell`.
   * The sender is taken from the implicit scope and defaults to `Actor.noSender`.
   */
  def !(msg: Any)(implicit sender: ActorRef = Actor.noSender): Unit = tell(msg, sender)
}

/**
 * INTERNAL API
 * ActorRefFactory.actorSelection returns an ActorSelection which sends these
 * nested path descriptions whenever using ! on them, the idea being that the
 * message is delivered by traversing the various actor paths involved.
 *
 * `msg` is the user payload, `elements` the path elements still to be traversed and
 * `wildcardFanOut` records whether a wildcard has already matched more than one child.
 */
@SerialVersionUID(2L) // it has protobuf serialization in akka-remote
private[akka] final case class ActorSelectionMessage(
  msg: Any,
  elements: immutable.Iterable[SelectionPathElement],
  wildcardFanOut: Boolean)
  extends AutoReceivedMessage with PossiblyHarmful {

  /** The payload as an [[Identify]] request if it is one, otherwise `None`. */
  def identifyRequest: Option[Identify] =
    PartialFunction.condOpt(msg) { case identify: Identify ⇒ identify }
}

/**
 * INTERNAL API
 * One step of an [[ActorSelection]] path. The trait is sealed; the implementations in
 * this file are [[SelectChildName]], [[SelectChildPattern]] and [[SelectParent]].
 */
@SerialVersionUID(1L)
private[akka] sealed trait SelectionPathElement

/**
 * INTERNAL API
 * Path element that selects the single child with the given `name`.
 */
@SerialVersionUID(2L)
private[akka] final case class SelectChildName(name: String) extends SelectionPathElement {
  // Rendered as the bare name so that a path can be printed by joining its elements with "/".
  override def toString: String = name
}

/**
 * INTERNAL API
 * Path element that selects every child whose name matches a wildcard pattern.
 * `ActorSelection.apply` creates it for path elements containing `?` or `*`.
 */
@SerialVersionUID(2L)
private[akka] final case class SelectChildPattern(patternStr: String) extends SelectionPathElement {
  // Compiled once at construction; matched against child names during delivery.
  val pattern: Pattern = Helpers.makePattern(patternStr)
  // Rendered as the original pattern text, not the compiled regular expression.
  override def toString: String = patternStr
}

/**
 * INTERNAL API
 * Path element that moves the selection to the parent of the current actor;
 * `ActorSelection.apply` creates it for the path element `..`.
 */
@SerialVersionUID(2L)
private[akka] case object SelectParent extends SelectionPathElement {
  // Rendered the same way it is written in a path string.
  override def toString: String = ".."
}

/**
 * Failure with which the `Future` returned by [[ActorSelection#resolveOne]] is
 * completed when the selection could not be resolved to an actor.
 * The exception message contains the string form of the offending `selection`.
 */
@SerialVersionUID(1L)
final case class ActorNotFound(selection: ActorSelection)
  extends RuntimeException(s"Actor not found for: $selection")

Other Akka source code examples

Here is a short list of links related to this Akka ActorSelection.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.