alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (Resizer.scala)

This example Akka source code file (Resizer.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actor, akka, boolean, concurrent, defaultresizer, dispatch, double, duration, illegalargumentexception, int, internalactorref, props, resizablepoolcell, resizer, time, unit

The Resizer.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.routing

import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicLong

import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.duration.FiniteDuration

import com.typesafe.config.Config

import akka.actor.Actor
import akka.actor.ActorCell
import akka.actor.ActorInitializationException
import akka.actor.ActorRefWithCell
import akka.actor.ActorSystemImpl
import akka.actor.InternalActorRef
import akka.actor.PoisonPill
import akka.actor.Props
import akka.actor.SupervisorStrategy
import akka.dispatch.Envelope
import akka.dispatch.MessageDispatcher
import java.lang.{ Double ⇒ JDouble }

/**
 * [[Pool]] routers with dynamically resizable number of routees are implemented by providing a Resizer
 * implementation in the [[akka.routing.Pool]] configuration.
 */
trait Resizer {
  /**
   * Is it time for resizing. Typically implemented with modulo of nth message, but
   * could be based on elapsed time or something else. The messageCounter starts with 0
   * for the initial resize and continues with 1 for the first message. Make sure to perform
   * initial resize before first message (messageCounter == 0), because there is no guarantee
   * that resize will be done when concurrent messages are in play.
   *
   * CAUTION: this method is invoked from the thread which tries to send a
   * message to the pool, i.e. the ActorRef.!() method, hence it may be called
   * concurrently.
   */
  def isTimeForResize(messageCounter: Long): Boolean

  /**
   * Decide if the capacity of the router need to be changed. Will be invoked when `isTimeForResize`
   * returns true and no other resize is in progress.
   *
   * Return the number of routees to add or remove. Negative value will remove that number of routees.
   * Positive value will add that number of routess. 0 will not change the routees.
   *
   * This method is invoked only in the context of the Router actor.
   */
  def resize(currentRoutees: immutable.IndexedSeq[Routee]): Int
}

case object DefaultResizer {

  /**
   * Creates a new DefaultResizer from the given configuration
   */
  def apply(resizerConfig: Config): DefaultResizer =
    DefaultResizer(
      lowerBound = resizerConfig.getInt("lower-bound"),
      upperBound = resizerConfig.getInt("upper-bound"),
      pressureThreshold = resizerConfig.getInt("pressure-threshold"),
      rampupRate = resizerConfig.getDouble("rampup-rate"),
      backoffThreshold = resizerConfig.getDouble("backoff-threshold"),
      backoffRate = resizerConfig.getDouble("backoff-rate"),
      messagesPerResize = resizerConfig.getInt("messages-per-resize"))

  def fromConfig(resizerConfig: Config): Option[DefaultResizer] =
    if (resizerConfig.getBoolean("resizer.enabled"))
      Some(DefaultResizer(resizerConfig.getConfig("resizer")))
    else
      None
}

/**
 * Implementation of [[Resizer]] that adjust the [[Pool]] based on specified
 * thresholds.
 */
@SerialVersionUID(1L)
case class DefaultResizer(
  /**
   * The fewest number of routees the router should ever have.
   */
  val lowerBound: Int = 1,
  /**
 * The most number of routees the router should ever have.
 * Must be greater than or equal to `lowerBound`.
 */
  val upperBound: Int = 10,
  /**
 * Threshold to evaluate if routee is considered to be busy (under pressure).
 * Implementation depends on this value (default is 1).
 * <ul>
 * <li> 0:   number of routees currently processing a message.</li>
 * <li> 1:   number of routees currently processing a message has
 *           some messages in mailbox.</li>
 * <li> > 1: number of routees with at least the configured `pressureThreshold`
 *           messages in their mailbox. Note that estimating mailbox size of
 *           default UnboundedMailbox is O(N) operation.</li>
 * </ul>
 */
  val pressureThreshold: Int = 1,
  /**
 * Percentage to increase capacity whenever all routees are busy.
 * For example, 0.2 would increase 20% (rounded up), i.e. if current
 * capacity is 6 it will request an increase of 2 more routees.
 */
  val rampupRate: Double = 0.2,
  /**
 * Minimum fraction of busy routees before backing off.
 * For example, if this is 0.3, then we'll remove some routees only when
 * less than 30% of routees are busy, i.e. if current capacity is 10 and
 * 3 are busy then the capacity is unchanged, but if 2 or less are busy
 * the capacity is decreased.
 *
 * Use 0.0 or negative to avoid removal of routees.
 */
  val backoffThreshold: Double = 0.3,
  /**
 * Fraction of routees to be removed when the resizer reaches the
 * backoffThreshold.
 * For example, 0.1 would decrease 10% (rounded up), i.e. if current
 * capacity is 9 it will request an decrease of 1 routee.
 */
  val backoffRate: Double = 0.1,
  /**
 * Number of messages between resize operation.
 * Use 1 to resize before each message.
 */
  val messagesPerResize: Int = 10) extends Resizer {

  /**
   * Java API constructor for default values except bounds.
   */
  def this(lower: Int, upper: Int) = this(lowerBound = lower, upperBound = upper)

  if (lowerBound < 0) throw new IllegalArgumentException("lowerBound must be >= 0, was: [%s]".format(lowerBound))
  if (upperBound < 0) throw new IllegalArgumentException("upperBound must be >= 0, was: [%s]".format(upperBound))
  if (upperBound < lowerBound) throw new IllegalArgumentException("upperBound must be >= lowerBound, was: [%s] < [%s]".format(upperBound, lowerBound))
  if (rampupRate < 0.0) throw new IllegalArgumentException("rampupRate must be >= 0.0, was [%s]".format(rampupRate))
  if (backoffThreshold > 1.0) throw new IllegalArgumentException("backoffThreshold must be <= 1.0, was [%s]".format(backoffThreshold))
  if (backoffRate < 0.0) throw new IllegalArgumentException("backoffRate must be >= 0.0, was [%s]".format(backoffRate))
  if (messagesPerResize <= 0) throw new IllegalArgumentException("messagesPerResize must be > 0, was [%s]".format(messagesPerResize))

  def isTimeForResize(messageCounter: Long): Boolean = (messageCounter % messagesPerResize == 0)

  override def resize(currentRoutees: immutable.IndexedSeq[Routee]): Int =
    capacity(currentRoutees)

  /**
   * Returns the overall desired change in resizer capacity. Positive value will
   * add routees to the resizer. Negative value will remove routees from the
   * resizer.
   * @param routees The current actor in the resizer
   * @return the number of routees by which the resizer should be adjusted (positive, negative or zero)
   */
  def capacity(routees: immutable.IndexedSeq[Routee]): Int = {
    val currentSize = routees.size
    val press = pressure(routees)
    val delta = filter(press, currentSize)
    val proposed = currentSize + delta

    if (proposed < lowerBound) delta + (lowerBound - proposed)
    else if (proposed > upperBound) delta - (proposed - upperBound)
    else delta
  }

  /**
   * Number of routees considered busy, or above 'pressure level'.
   *
   * Implementation depends on the value of `pressureThreshold`
   * (default is 1).
   * <ul>
   * <li> 0:   number of routees currently processing a message.</li>
   * <li> 1:   number of routees currently processing a message has
   *           some messages in mailbox.</li>
   * <li> > 1: number of routees with at least the configured `pressureThreshold`
   *           messages in their mailbox. Note that estimating mailbox size of
   *           default UnboundedMailbox is O(N) operation.</li>
   * </ul>
   *
   * @param routees the current resizer of routees
   * @return number of busy routees, between 0 and routees.size
   */
  def pressure(routees: immutable.IndexedSeq[Routee]): Int = {
    routees count {
      case ActorRefRoutee(a: ActorRefWithCell) ⇒
        a.underlying match {
          case cell: ActorCell ⇒
            pressureThreshold match {
              case 1          ⇒ cell.mailbox.isScheduled && cell.mailbox.hasMessages
              case i if i < 1 ⇒ cell.mailbox.isScheduled && cell.currentMessage != null
              case threshold  ⇒ cell.mailbox.numberOfMessages >= threshold
            }
          case cell ⇒
            pressureThreshold match {
              case 1          ⇒ cell.hasMessages
              case i if i < 1 ⇒ true // unstarted cells are always busy, for example
              case threshold  ⇒ cell.numberOfMessages >= threshold
            }
        }
      case x ⇒
        false
    }
  }

  /**
   * This method can be used to smooth the capacity delta by considering
   * the current pressure and current capacity.
   *
   * @param pressure current number of busy routees
   * @param capacity current number of routees
   * @return proposed change in the capacity
   */
  def filter(pressure: Int, capacity: Int): Int = rampup(pressure, capacity) + backoff(pressure, capacity)

  /**
   * Computes a proposed positive (or zero) capacity delta using
   * the configured `rampupRate`.
   * @param pressure the current number of busy routees
   * @param capacity the current number of total routees
   * @return proposed increase in capacity
   */
  def rampup(pressure: Int, capacity: Int): Int =
    if (pressure < capacity) 0 else math.ceil(rampupRate * capacity).toInt

  /**
   * Computes a proposed negative (or zero) capacity delta using
   * the configured `backoffThreshold` and `backoffRate`
   * @param pressure the current number of busy routees
   * @param capacity the current number of total routees
   * @return proposed decrease in capacity (as a negative number)
   */
  def backoff(pressure: Int, capacity: Int): Int =
    if (backoffThreshold > 0.0 && backoffRate > 0.0 && capacity > 0 && pressure.toDouble / capacity < backoffThreshold)
      math.floor(-1.0 * backoffRate * capacity).toInt
    else 0
}

/**
 * INTERNAL API
 */
private[akka] final class ResizablePoolCell(
  _system: ActorSystemImpl,
  _ref: InternalActorRef,
  _routerProps: Props,
  _routerDispatcher: MessageDispatcher,
  _routeeProps: Props,
  _supervisor: InternalActorRef,
  val pool: Pool)
  extends RoutedActorCell(_system, _ref, _routerProps, _routerDispatcher, _routeeProps, _supervisor) {

  require(pool.resizer.isDefined, "RouterConfig must be a Pool with defined resizer")
  val resizer = pool.resizer.get
  private val resizeInProgress = new AtomicBoolean
  private val resizeCounter = new AtomicLong

  override protected def preSuperStart(): Unit = {
    // initial resize, before message send
    if (resizer.isTimeForResize(resizeCounter.getAndIncrement())) {
      resize(initial = true)
    }
  }

  override def sendMessage(envelope: Envelope): Unit = {
    if (!routerConfig.isManagementMessage(envelope.message) &&
      resizer.isTimeForResize(resizeCounter.getAndIncrement()) && resizeInProgress.compareAndSet(false, true)) {
      super.sendMessage(Envelope(ResizablePoolActor.Resize, self, system))
    }
    super.sendMessage(envelope)
  }

  private[akka] def resize(initial: Boolean): Unit = {
    if (resizeInProgress.get || initial) try {
      val requestedCapacity = resizer.resize(router.routees)
      if (requestedCapacity > 0) {
        val newRoutees = Vector.fill(requestedCapacity)(pool.newRoutee(routeeProps, this))
        addRoutees(newRoutees)
      } else if (requestedCapacity < 0) {
        val currentRoutees = router.routees
        val abandon = currentRoutees.drop(currentRoutees.length + requestedCapacity)
        removeRoutees(abandon, stopChild = true)
      }
    } finally resizeInProgress.set(false)
  }

}

/**
 * INTERNAL API
 */
private[akka] object ResizablePoolActor {
  case object Resize extends RouterManagementMesssage
}

/**
 * INTERNAL API
 */
private[akka] class ResizablePoolActor(supervisorStrategy: SupervisorStrategy)
  extends RouterPoolActor(supervisorStrategy) {
  import ResizablePoolActor._

  val resizerCell = context match {
    case x: ResizablePoolCell ⇒ x
    case _ ⇒
      throw ActorInitializationException("Router actor can only be used in RoutedActorRef, not in " + context.getClass)
  }

  override def receive = ({
    case Resize ⇒ resizerCell.resize(initial = false)
  }: Actor.Receive) orElse super.receive
}

Other Akka source code examples

Here is a short list of links related to this Akka Resizer.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.