alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (ScatterGatherFirstCompleted.scala)

This example Akka source code file (ScatterGatherFirstCompleted.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actor, actorsystem, akka, concurrent, dispatch, duration, finiteduration, int, routee, router, scattergatherfirstcompletedgroup, scattergatherfirstcompletedpool, serialversionuid, string, supervisorstrategy, time

The ScatterGatherFirstCompleted.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.routing

import scala.collection.immutable
import akka.actor.ActorContext
import akka.actor.Props
import akka.dispatch.Dispatchers
import com.typesafe.config.Config
import akka.actor.SupervisorStrategy
import akka.japi.Util.immutableSeq
import akka.actor.ActorRef
import scala.concurrent.Promise
import akka.pattern.ask
import akka.pattern.pipe
import akka.dispatch.ExecutionContexts
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._
import akka.util.Timeout
import akka.util.Helpers.ConfigOps
import java.util.concurrent.TimeUnit
import akka.actor.ActorSystem

/**
 * Routing logic that fans a message out to every routee and lets the first
 * response win.
 *
 * The fan-out itself is delegated to [[ScatterGatherFirstCompletedRoutees]],
 * a composite routee wrapping the whole routee set.
 *
 * @param within how long to wait for at least one reply; once exceeded the sender
 *   receives an [[akka.pattern.AskTimeoutException]] wrapped in an [[akka.actor.Status.Failure]]
 */
@SerialVersionUID(1L)
final case class ScatterGatherFirstCompletedRoutingLogic(within: FiniteDuration) extends RoutingLogic {

  /**
   * Picks the destination for `message`: a composite routee covering all of
   * `routees`, or `NoRoutee` when there is nothing to route to.
   */
  override def select(message: Any, routees: immutable.IndexedSeq[Routee]): Routee =
    if (routees.nonEmpty) ScatterGatherFirstCompletedRoutees(routees, within)
    else NoRoutee
}

/**
 * INTERNAL API
 *
 * Composite routee that asks every wrapped routee and forwards the first
 * completed result (reply or failure) to the original sender.
 *
 * @param routees the routees to ask; only [[ActorRefRoutee]] and
 *   [[ActorSelectionRoutee]] entries are asked, any other kind is skipped
 * @param within ask timeout applied to every individual request
 */
@SerialVersionUID(1L)
private[akka] final case class ScatterGatherFirstCompletedRoutees(
  routees: immutable.IndexedSeq[Routee], within: FiniteDuration) extends Routee {

  /**
   * Asks all supported routees with `message` and pipes the first completed
   * outcome to `sender`.
   *
   * If no routee can be asked (the sequence is empty or holds only unsupported
   * routee kinds) the sender is answered right away with an
   * [[akka.pattern.AskTimeoutException]], delivered as a failure by `pipeTo`.
   */
  override def send(message: Any, sender: ActorRef): Unit = {
    implicit val ec = ExecutionContexts.sameThreadExecutionContext
    implicit val timeout = Timeout(within)

    // One ask per supported routee, issued in routee order.
    val replies = routees.collect {
      case ActorRefRoutee(ref)       ⇒ ref.ask(message)
      case ActorSelectionRoutee(sel) ⇒ sel.ask(message)
    }

    if (replies.isEmpty) {
      // Without any outstanding ask nothing would ever complete the promise and
      // the sender would wait forever, so fail fast with the documented exception.
      scala.concurrent.Future
        .failed[Any](new akka.pattern.AskTimeoutException("Timeout due to no routees"))
        .pipeTo(sender)
    } else {
      val promise = Promise[Any]()
      // Only the first completion sticks; later ones are ignored by tryCompleteWith.
      replies.foreach(reply ⇒ promise.tryCompleteWith(reply))
      promise.future.pipeTo(sender)
    }
  }
}

/**
 * Pool router that sends each message to all of its routees and answers with
 * whichever response arrives first.
 *
 * Configuration wins over constructor arguments: a `nrOfInstances` value passed
 * at instantiation is ignored when the router is defined in the configuration
 * file for the actor being used.
 *
 * <h1>Supervision Setup</h1>
 *
 * Routees created by a router are created as children of that router, which
 * therefore also acts as their supervisor.
 *
 * Use [[#withSupervisorStrategy]] to configure the router actor's supervision
 * strategy. Without an explicit strategy a router defaults to “always escalate”,
 * i.e. errors are handed to the router's own supervisor.
 *
 * That supervisor treats the error as a failure of the router itself, so a stop
 * or restart directive stops or restarts the router, and the router in turn
 * stops and restarts its children.
 *
 * @param nrOfInstances initial number of routees in the pool
 *
 * @param resizer optional resizer that dynamically adjusts the pool size
 *
 * @param within expecting at least one reply within this duration, otherwise
 *   the reply is an [[akka.pattern.AskTimeoutException]] in an [[akka.actor.Status.Failure]]
 *
 * @param supervisorStrategy strategy for supervising the routees, see 'Supervision Setup'
 *
 * @param routerDispatcher dispatcher to use for the router head actor, which handles
 *   supervision, death watch and router management messages
 */
@SerialVersionUID(1L)
final case class ScatterGatherFirstCompletedPool(
  override val nrOfInstances: Int,
  override val resizer: Option[Resizer] = None,
  within: FiniteDuration,
  override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy,
  override val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
  override val usePoolDispatcher: Boolean = false)
  extends Pool with PoolOverrideUnsetConfig[ScatterGatherFirstCompletedPool] {

  /**
   * Creates the pool from a configuration section: reads `nr-of-instances` and
   * `within`, derives the resizer through `DefaultResizer.fromConfig`, and turns on
   * `usePoolDispatcher` when a `pool-dispatcher` path is present.
   */
  def this(config: Config) =
    this(
      nrOfInstances = config.getInt("nr-of-instances"),
      within = config.getMillisDuration("within"),
      resizer = DefaultResizer.fromConfig(config),
      usePoolDispatcher = config.hasPath("pool-dispatcher"))

  /**
   * Java API
   * @param nr initial number of routees in the pool
   * @param within expecting at least one reply within this duration, otherwise
   *   the reply is an [[akka.pattern.AskTimeoutException]] in an [[akka.actor.Status.Failure]]
   */
  def this(nr: Int, within: FiniteDuration) =
    this(nrOfInstances = nr, within = within)

  /**
   * Builds the router using the scatter-gather-first-completed logic configured
   * with this pool's `within` duration.
   */
  override def createRouter(system: ActorSystem): Router = {
    val logic = ScatterGatherFirstCompletedRoutingLogic(within)
    new Router(logic)
  }

  /**
   * Returns a copy of this pool whose “head” Router actor uses the given
   * supervisor strategy.
   */
  def withSupervisorStrategy(strategy: SupervisorStrategy): ScatterGatherFirstCompletedPool =
    copy(supervisorStrategy = strategy)

  /**
   * Returns a copy of this pool that uses the given resizer.
   */
  def withResizer(resizer: Resizer): ScatterGatherFirstCompletedPool =
    copy(resizer = Some(resizer))

  /**
   * Returns a copy of this pool whose router head actor (which handles
   * supervision, death watch and router management messages) runs on the
   * dispatcher with the given id.
   */
  def withDispatcher(dispatcherId: String): ScatterGatherFirstCompletedPool =
    copy(routerDispatcher = dispatcherId)

  /**
   * Uses the resizer and/or the supervisor strategy of the given RouterConfig
   * if this RouterConfig doesn't have one, i.e. the resizer defined in code is
   * used if no resizer was defined in config.
   */
  override def withFallback(other: RouterConfig): RouterConfig =
    overrideUnsetConfig(other)

}

/**
 * Group router that sends each message to all of its routees and answers with
 * whichever response arrives first.
 *
 * Configuration wins over constructor arguments: `paths` passed at
 * instantiation are ignored when the router is defined in the configuration
 * file for the actor being used.
 *
 * @param paths string representation of the actor paths of the routees, messages are
 *   sent with [[akka.actor.ActorSelection]] to these paths
 *
 * @param within expecting at least one reply within this duration, otherwise
 *   the reply is an [[akka.pattern.AskTimeoutException]] in an [[akka.actor.Status.Failure]]
 *
 * @param routerDispatcher dispatcher to use for the router head actor, which handles
 *   router management messages
 */
@SerialVersionUID(1L)
final case class ScatterGatherFirstCompletedGroup(
  override val paths: immutable.Iterable[String],
  within: FiniteDuration,
  override val routerDispatcher: String = Dispatchers.DefaultDispatcherId)
  extends Group {

  /**
   * Creates the group from a configuration section: routee paths come from the
   * `routees.paths` string list and the reply deadline from `within`.
   */
  def this(config: Config) =
    this(
      paths = immutableSeq(config.getStringList("routees.paths")),
      within = config.getMillisDuration("within"))

  /**
   * Java API
   * @param routeePaths string representation of the actor paths of the routees, messages are
   *   sent with [[akka.actor.ActorSelection]] to these paths
   * @param within expecting at least one reply within this duration, otherwise
   *   the reply is an [[akka.pattern.AskTimeoutException]] in an [[akka.actor.Status.Failure]]
   */
  def this(routeePaths: java.lang.Iterable[String], within: FiniteDuration) =
    this(paths = immutableSeq(routeePaths), within = within)

  /**
   * Builds the router using the scatter-gather-first-completed logic configured
   * with this group's `within` duration.
   */
  override def createRouter(system: ActorSystem): Router = {
    val logic = ScatterGatherFirstCompletedRoutingLogic(within)
    new Router(logic)
  }

  /**
   * Returns a copy of this group whose router head actor (which handles router
   * management messages) runs on the dispatcher with the given id.
   */
  def withDispatcher(dispatcherId: String): ScatterGatherFirstCompletedGroup =
    copy(routerDispatcher = dispatcherId)

}

Other Akka source code examples

Here is a short list of links related to this Akka ScatterGatherFirstCompleted.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.