alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (RoutedActorCell.scala)

This example Akka source code file (RoutedActorCell.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actor, actorrefroutee, akka, boolean, collection, concurrent, dispatch, internalactorref, pool, routedactorcell, routee, router, routeractor, routerpoolactor, time, unit

The RoutedActorCell.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.routing

import scala.collection.immutable
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.ActorCell
import akka.actor.ActorInitializationException
import akka.actor.ActorSystemImpl
import akka.actor.AutoReceivedMessage
import akka.actor.IndirectActorProducer
import akka.actor.InternalActorRef
import akka.actor.Props
import akka.actor.Terminated
import akka.dispatch.Envelope
import akka.dispatch.MessageDispatcher
import akka.actor.ActorContext
import akka.actor.PoisonPill
import akka.actor.SupervisorStrategy
import akka.actor.ActorRef
import akka.actor.ReceiveTimeout
import akka.actor.Identify
import akka.actor.ActorIdentity

/**
 * INTERNAL API
 */
private[akka] object RoutedActorCell {
  class RouterActorCreator(routerConfig: RouterConfig) extends IndirectActorProducer {
    override def actorClass = classOf[RouterActor]
    override def produce() = routerConfig.createRouterActor()
  }

}

/**
 * INTERNAL API
 */
private[akka] class RoutedActorCell(
  _system: ActorSystemImpl,
  _ref: InternalActorRef,
  _routerProps: Props,
  _routerDispatcher: MessageDispatcher,
  val routeeProps: Props,
  _supervisor: InternalActorRef)
  extends ActorCell(_system, _ref, _routerProps, _routerDispatcher, _supervisor) {

  private[akka] val routerConfig = _routerProps.routerConfig

  @volatile
  private var _router: Router = null // initialized in start, and then only updated from the actor
  def router: Router = _router

  def addRoutee(routee: Routee): Unit = addRoutees(List(routee))

  /**
   * Add routees to the `Router`. Messages in flight may still be routed to
   * the old `Router` instance containing the old routees.
   */
  def addRoutees(routees: immutable.Iterable[Routee]): Unit = {
    routees foreach watch
    val r = _router
    _router = r.withRoutees(r.routees ++ routees)
  }

  def removeRoutee(routee: Routee, stopChild: Boolean): Unit =
    removeRoutees(List(routee), stopChild)

  /**
   * Remove routees from the `Router`. Messages in flight may still be routed to
   * the old `Router` instance containing the old routees.
   */
  def removeRoutees(routees: immutable.Iterable[Routee], stopChild: Boolean): Unit = {
    val r = _router
    val newRoutees = routees.foldLeft(r.routees) { (xs, x) ⇒ unwatch(x); xs.filterNot(_ == x) }
    _router = r.withRoutees(newRoutees)
    if (stopChild) routees foreach stopIfChild
  }

  private def watch(routee: Routee): Unit = routee match {
    case ActorRefRoutee(ref) ⇒ watch(ref)
    case _                   ⇒
  }

  private def unwatch(routee: Routee): Unit = routee match {
    case ActorRefRoutee(ref) ⇒ unwatch(ref)
    case _                   ⇒
  }

  private def stopIfChild(routee: Routee): Unit = routee match {
    case ActorRefRoutee(ref) ⇒ child(ref.path.name) match {
      case Some(`ref`) ⇒
        // The reason for the delay is to give concurrent
        // messages a chance to be placed in mailbox before sending PoisonPill,
        // best effort.
        system.scheduler.scheduleOnce(100.milliseconds, ref, PoisonPill)(dispatcher)
      case _ ⇒
    }
    case _ ⇒
  }

  override def start(): this.type = {
    // create the initial routees before scheduling the Router actor
    _router = routerConfig.createRouter(system)
    routerConfig match {
      case pool: Pool ⇒
        if (pool.nrOfInstances > 0)
          addRoutees(Vector.fill(pool.nrOfInstances)(pool.newRoutee(routeeProps, this)))
      case group: Group ⇒
        val paths = group.paths
        if (paths.nonEmpty)
          addRoutees(paths.map(p ⇒ group.routeeFor(p, this))(collection.breakOut))
      case _ ⇒
    }
    preSuperStart()
    super.start()
  }

  /**
   * Called when `router` is initalized but before `super.start()` to
   * be able to do extra initialization in subclass.
   */
  protected def preSuperStart(): Unit = ()

  /*
   * end of construction
   */

  /**
   * Route the message via the router to the selected destination.
   */
  override def sendMessage(envelope: Envelope): Unit = {
    if (routerConfig.isManagementMessage(envelope.message))
      super.sendMessage(envelope)
    else
      router.route(envelope.message, envelope.sender)
  }

}

/**
 * INTERNAL API
 */
private[akka] class RouterActor extends Actor {

  val cell = context match {
    case x: RoutedActorCell ⇒ x
    case _ ⇒
      throw ActorInitializationException("Router actor can only be used in RoutedActorRef, not in " + context.getClass)
  }

  val routingLogicController: Option[ActorRef] = cell.routerConfig.routingLogicController(
    cell.router.logic).map(props ⇒ context.actorOf(props.withDispatcher(context.props.dispatcher),
      name = "routingLogicController"))

  def receive = {
    case GetRoutees ⇒
      sender() ! Routees(cell.router.routees)
    case AddRoutee(routee) ⇒
      cell.addRoutee(routee)
    case RemoveRoutee(routee) ⇒
      cell.removeRoutee(routee, stopChild = true)
      stopIfAllRouteesRemoved()
    case other if routingLogicController.isDefined ⇒
      routingLogicController.foreach(_.forward(other))
  }

  def stopIfAllRouteesRemoved(): Unit =
    if (cell.router.routees.isEmpty && cell.routerConfig.stopRouterWhenAllRouteesRemoved)
      context.stop(self)

  override def preRestart(cause: Throwable, msg: Option[Any]): Unit = {
    // do not scrap children
  }
}

/**
 * INTERNAL API
 */
private[akka] class RouterPoolActor(override val supervisorStrategy: SupervisorStrategy) extends RouterActor {

  val pool = cell.routerConfig match {
    case x: Pool ⇒ x
    case other ⇒
      throw ActorInitializationException("RouterPoolActor can only be used with Pool, not " + other.getClass)
  }

  override def receive = ({
    case Terminated(child) ⇒
      cell.removeRoutee(ActorRefRoutee(child), stopChild = false)
      stopIfAllRouteesRemoved()
    case AdjustPoolSize(change: Int) ⇒
      if (change > 0) {
        val newRoutees = Vector.fill(change)(pool.newRoutee(cell.routeeProps, context))
        cell.addRoutees(newRoutees)
      } else if (change < 0) {
        val currentRoutees = cell.router.routees
        val abandon = currentRoutees.drop(currentRoutees.length + change)
        cell.removeRoutees(abandon, stopChild = true)
      }
  }: Actor.Receive) orElse super.receive

}

Other Akka source code examples

Here is a short list of links related to this Akka RoutedActorCell.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.