alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (Router.scala)

This example Akka source code file (Router.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actor, actorref, actorselection, akka, any, collection, noroutee, route, routee, router, routerenvelope, routing, routinglogic, serialversionuid, unit

The Router.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.routing

import scala.collection.immutable
import akka.actor.ActorRef
import akka.actor.ActorSelection
import akka.actor.InternalActorRef
import akka.japi.Util.immutableSeq
import akka.actor.NoSerializationVerificationNeeded

/**
 * The interface of the routing logic that is used in a [[Router]] to select
 * destination routed messages.
 *
 * The implementation must be thread safe.
 */
trait RoutingLogic extends NoSerializationVerificationNeeded {
  /**
   * Pick the destination for a given message. Normally it picks one of the
   * passed `routees`, but in the end it is up to the implementation to
   * return whatever [[Routee]] to use for sending a specific message.
   *
   * When implemented from Java it can be good to know that
   * `routees.apply(index)` can be used to get an element
   * from the `IndexedSeq`.
   */
  def select(message: Any, routees: immutable.IndexedSeq[Routee]): Routee

}

/**
 * Abstraction of a destination for messages routed via a [[Router]].
 */
trait Routee {
  def send(message: Any, sender: ActorRef): Unit
}

/**
 * [[Routee]] that sends the messages to an [[akka.actor.ActorRef]].
 */
final case class ActorRefRoutee(ref: ActorRef) extends Routee {
  override def send(message: Any, sender: ActorRef): Unit =
    ref.tell(message, sender)
}

/**
 * [[Routee]] that sends the messages to an [[akka.actor.ActorSelection]].
 */
final case class ActorSelectionRoutee(selection: ActorSelection) extends Routee {
  override def send(message: Any, sender: ActorRef): Unit =
    selection.tell(message, sender)
}

/**
 * [[Routee]] that doesn't send the message to any routee.
 * The [[Router]] will send the message to `deadLetters` if
 * `NoRoutee` is returned from [[RoutingLogic#select]]
 */
object NoRoutee extends Routee {
  override def send(message: Any, sender: ActorRef): Unit = ()
}

/**
 * [[Routee]] that sends each message to all `routees`.
 */
final case class SeveralRoutees(routees: immutable.IndexedSeq[Routee]) extends Routee {

  /**
   * Java API
   */
  def this(rs: java.lang.Iterable[Routee]) = this(routees = immutableSeq(rs).toVector)

  /**
   * Java API
   */
  def getRoutees(): java.util.List[Routee] = {
    import scala.collection.JavaConverters._
    routees.asJava
  }

  override def send(message: Any, sender: ActorRef): Unit =
    routees.foreach(_.send(message, sender))
}

/**
 * For each message that is sent through the router via the [[#route]] method the
 * [[RoutingLogic]] decides to which [[Routee]] to send the message. The [[Routee]] itself
 * knows how to perform the actual sending. Normally the [[RoutingLogic]] picks one of the
 * contained `routees`, but that is up to the implementation of the [[RoutingLogic]].
 *
 * A `Router` is immutable and the [[RoutingLogic]] must be thread safe.
 */
final case class Router(val logic: RoutingLogic, val routees: immutable.IndexedSeq[Routee] = Vector.empty) {

  /**
   * Java API
   */
  def this(logic: RoutingLogic) = this(logic, Vector.empty)

  /**
   * Java API
   */
  def this(logic: RoutingLogic, routees: java.lang.Iterable[Routee]) = this(logic, immutableSeq(routees).toVector)

  /**
   * Send the message to the destination [[Routee]] selected by the [[RoutingLogic]].
   * If the message is a [[akka.routing.RouterEnvelope]] it will be unwrapped
   * before sent to the destinations.
   * Messages wrapped in a [[Broadcast]] envelope are always sent to all `routees`.
   */
  def route(message: Any, sender: ActorRef): Unit =
    message match {
      case akka.routing.Broadcast(msg) ⇒ SeveralRoutees(routees).send(msg, sender)
      case msg                         ⇒ send(logic.select(msg, routees), message, sender)
    }

  private def send(routee: Routee, msg: Any, sender: ActorRef): Unit = {
    if (routee == NoRoutee && sender.isInstanceOf[InternalActorRef])
      sender.asInstanceOf[InternalActorRef].provider.deadLetters.tell(unwrap(msg), sender)
    else
      routee.send(unwrap(msg), sender)
  }

  private def unwrap(msg: Any): Any = msg match {
    case env: RouterEnvelope ⇒ env.message
    case _                   ⇒ msg
  }

  /**
   * Create a new instance with the specified routees and the same [[RoutingLogic]].
   */
  def withRoutees(rs: immutable.IndexedSeq[Routee]): Router = copy(routees = rs)

  /**
   * Create a new instance with one more routee and the same [[RoutingLogic]].
   */
  def addRoutee(routee: Routee): Router = copy(routees = routees :+ routee)

  /**
   * Create a new instance with one more [[ActorRefRoutee]] for the
   * specified [[akka.actor.ActorRef]] and the same [[RoutingLogic]].
   */
  def addRoutee(ref: ActorRef): Router = addRoutee(ActorRefRoutee(ref))

  /**
   * Create a new instance with one more [[ActorSelectionRoutee]] for the
   * specified [[akka.actor.ActorSelection]] and the same [[RoutingLogic]].
   */
  def addRoutee(sel: ActorSelection): Router = addRoutee(ActorSelectionRoutee(sel))

  /**
   * Create a new instance without the specified routee.
   */
  def removeRoutee(routee: Routee): Router = copy(routees = routees.filterNot(_ == routee))

  /**
   * Create a new instance without the [[ActorRefRoutee]] for the specified
   * [[akka.actor.ActorRef]].
   */
  def removeRoutee(ref: ActorRef): Router = removeRoutee(ActorRefRoutee(ref))

  /**
   * Create a new instance without the [[ActorSelectionRoutee]] for the specified
   * [[akka.actor.ActorSelection]].
   */
  def removeRoutee(sel: ActorSelection): Router = removeRoutee(ActorSelectionRoutee(sel))

}

/**
 * Used to broadcast a message to all routees in a router; only the
 * contained message will be forwarded, i.e. the `Broadcast(...)`
 * envelope will be stripped off.
 */
@SerialVersionUID(1L)
final case class Broadcast(message: Any) extends RouterEnvelope

/**
 * Only the contained message will be forwarded to the
 * destination, i.e. the envelope will be stripped off.
 */
trait RouterEnvelope {
  def message: Any
}

Other Akka source code examples

Here is a short list of links related to this Akka Router.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.