alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (Future.scala)

This example Akka source code file (Future.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

akka, anyref, boolean, boxedunit, concurrent, dispatch, executioncontext, future, japi, jiterable, r, runtime, t, throwable, unit, utilities

The Future.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */

package akka.dispatch

import scala.runtime.{ BoxedUnit, AbstractPartialFunction }
import akka.japi.{ Function ⇒ JFunc, Option ⇒ JOption, Procedure }
import scala.concurrent.{ Future, Promise, ExecutionContext, ExecutionContextExecutor, ExecutionContextExecutorService }
import java.lang.{ Iterable ⇒ JIterable }
import java.util.{ LinkedList ⇒ JLinkedList }
import java.util.concurrent.{ Executor, ExecutorService, ExecutionException, Callable, TimeoutException }
import scala.util.{ Try, Success, Failure }

/**
 * ExecutionContexts is the Java API for ExecutionContexts
 */
object ExecutionContexts {
  /**
   * Wraps the given Executor in a new ExecutionContextExecutor that delegates execution to it
   * and reports failures through the default error reporter.
   *
   * @param executor the Executor that the returned context delegates to
   * @return a new ExecutionContextExecutor backed by `executor`
   */
  def fromExecutor(executor: Executor): ExecutionContextExecutor = {
    ExecutionContext.fromExecutor(executor)
  }

  /**
   * Wraps the given Executor in a new ExecutionContextExecutor that delegates execution to it
   * and hands every reported failure to the supplied Procedure.
   *
   * @param executor the Executor that the returned context delegates to
   * @param errorReporter a Procedure that receives any exceptions passed to the reporter
   * @return a new ExecutionContextExecutor backed by `executor`
   */
  def fromExecutor(executor: Executor, errorReporter: Procedure[Throwable]): ExecutionContextExecutor = {
    // Adapt the Java-style Procedure to the Scala function the factory expects
    val reporter: Throwable ⇒ Unit = errorReporter.apply(_)
    ExecutionContext.fromExecutor(executor, reporter)
  }

  /**
   * Wraps the given ExecutorService in a new ExecutionContextExecutorService that delegates
   * execution to it and reports failures through the default error reporter.
   *
   * @param executorService the ExecutorService that the returned context delegates to
   * @return a new ExecutionContextExecutorService backed by `executorService`
   */
  def fromExecutorService(executorService: ExecutorService): ExecutionContextExecutorService = {
    ExecutionContext.fromExecutorService(executorService)
  }

  /**
   * Wraps the given ExecutorService in a new ExecutionContextExecutorService that delegates
   * execution to it and hands every reported failure to the supplied Procedure.
   *
   * @param executorService the ExecutorService that the returned context delegates to
   * @param errorReporter a Procedure that receives any exceptions passed to the reporter
   * @return a new ExecutionContextExecutorService backed by `executorService`
   */
  def fromExecutorService(executorService: ExecutorService, errorReporter: Procedure[Throwable]): ExecutionContextExecutorService = {
    // Adapt the Java-style Procedure to the Scala function the factory expects
    val reporter: Throwable ⇒ Unit = errorReporter.apply(_)
    ExecutionContext.fromExecutorService(executorService, reporter)
  }

  /**
   * @return a reference to the global ExecutionContext
   */
  def global(): ExecutionContextExecutor = {
    ExecutionContext.global
  }

  /**
   * WARNING: Not A General Purpose ExecutionContext!
   *
   * An execution context that runs every task directly on the calling thread.
   * Intended for actions that are known to be non-blocking and non-throwing,
   * where it saves a round-trip to the thread pool.
   */
  private[akka] object sameThreadExecutionContext extends ExecutionContext with BatchingExecutor {
    // Run the task right here instead of handing it to another thread
    override protected def unbatchedExecute(runnable: Runnable): Unit = {
      runnable.run()
    }

    // No point in resubmitting since we execute on the same thread
    override protected def resubmitOnBlock: Boolean = false

    // A failure reaching this context is rethrown, wrapped, to the caller
    override def reportFailure(t: Throwable): Unit = {
      val wrapped = new IllegalStateException("exception in sameThreadExecutionContext", t)
      throw wrapped
    }
  }
}

/**
 * Futures is the Java API for Futures and Promises
 */
object Futures {
  import scala.collection.JavaConverters.iterableAsScalaIterableConverter

  /**
   * Starts an asynchronous computation and returns a `Future` object with the result of that computation.
   *
   * The result becomes available once the asynchronous computation is completed.
   *
   * @param body     the asynchronous computation
   * @param executor the execution context on which the future is run
   * @return         the `Future` holding the result of the computation
   */
  def future[T](body: Callable[T], executor: ExecutionContext): Future[T] = Future(body.call)(executor)

  /**
   * Creates a promise object which can be completed with a value.
   *
   * @return         the newly created `Promise` object
   */
  def promise[T](): Promise[T] = Promise[T]()

  /**
   * Creates an already completed Future that has failed with the specified exception.
   *
   * @param exception the failure the returned Future is completed with
   * @return          an already failed `Future`
   */
  def failed[T](exception: Throwable): Future[T] = Future.failed(exception)

  /**
   * Creates an already completed Future holding the specified result.
   *
   * @param result the value the returned Future is completed with
   * @return       an already successful `Future`
   */
  def successful[T](result: T): Future[T] = Future.successful(result)

  /**
   * Returns a Future that will hold the optional result of the first Future with a result that matches the predicate.
   *
   * @param futures   the futures whose results are tested
   * @param predicate the test applied to each result
   * @param executor  the execution context passed to the underlying `Future.find` and `map`
   * @return          a Future of the matching result wrapped in the Java API Option
   */
  def find[T <: AnyRef](futures: JIterable[Future[T]], predicate: JFunc[T, java.lang.Boolean], executor: ExecutionContext): Future[JOption[T]] = {
    // Single implicit context used by both `find` and the following `map`
    implicit val ec: ExecutionContext = executor
    Future.find[T](futures.asScala)(predicate.apply(_)).map(JOption.fromScalaOption)
  }

  /**
   * Returns a Future to the result of the first future in the list that is completed.
   *
   * @param futures  the candidate futures
   * @param executor the execution context passed to `Future.firstCompletedOf`
   */
  def firstCompletedOf[T <: AnyRef](futures: JIterable[Future[T]], executor: ExecutionContext): Future[T] =
    Future.firstCompletedOf(futures.asScala)(executor)

  /**
   * A non-blocking fold over the specified futures, with the start value of the given zero.
   * The result will be the first failure of any of the futures, or any failure in the actual fold,
   * or the result of the fold.
   *
   * @param zero     the start value of the fold
   * @param futures  the futures whose results are folded
   * @param fun      the function combining the accumulated value with each result
   * @param executor the execution context passed to `Future.fold`
   */
  def fold[T <: AnyRef, R <: AnyRef](zero: R, futures: JIterable[Future[T]], fun: akka.japi.Function2[R, T, R], executor: ExecutionContext): Future[R] =
    Future.fold(futures.asScala)(zero)(fun.apply)(executor)

  /**
   * Reduces the results of the supplied futures with the given binary function.
   *
   * @param futures  the futures whose results are reduced
   * @param fun      the function combining the accumulated value with each result
   * @param executor the execution context passed to `Future.reduce`
   */
  def reduce[T <: AnyRef, R >: T](futures: JIterable[Future[T]], fun: akka.japi.Function2[R, T, R], executor: ExecutionContext): Future[R] =
    Future.reduce[T, R](futures.asScala)(fun.apply)(executor)

  /**
   * Simple version of [[#traverse]]. Transforms a JIterable[Future[A]] into a Future[JIterable[A]].
   * Useful for reducing many Futures into a single Future.
   *
   * @param in       the futures to combine
   * @param executor the execution context used to chain the futures together
   * @return         a Future of all results, in the iteration order of `in`
   */
  def sequence[A](in: JIterable[Future[A]], executor: ExecutionContext): Future[JIterable[A]] = {
    implicit val ec: ExecutionContext = executor
    // The empty accumulator needs no computation, so start from an already completed
    // Future instead of submitting a task to the executor just to allocate a list.
    val empty: Future[JLinkedList[A]] = Future.successful(new JLinkedList[A]())
    in.asScala.foldLeft(empty) { (fr, fa) ⇒
      for (r ← fr; a ← fa) yield { r.add(a); r }
    }
  }

  /**
   * Transforms a JIterable[A] into a Future[JIterable[B]] using the provided Function A ⇒ Future[B].
   * This is useful for performing a parallel map. For example, to apply a function to all items of a list
   * in parallel.
   *
   * @param in       the items to transform
   * @param fn       the function producing a Future for each item
   * @param executor the execution context used to chain the futures together
   * @return         a Future of all results, in the iteration order of `in`
   */
  def traverse[A, B](in: JIterable[A], fn: JFunc[A, Future[B]], executor: ExecutionContext): Future[JIterable[B]] = {
    implicit val ec: ExecutionContext = executor
    // Same as in `sequence`: the seed value is already known, no task submission required.
    val empty: Future[JLinkedList[B]] = Future.successful(new JLinkedList[B]())
    in.asScala.foldLeft(empty) { (fr, a) ⇒
      // Invoke fn outside the for-comprehension so it is called for every item while folding,
      // not only after the previous results have arrived
      val fb = fn(a)
      for (r ← fr; b ← fb) yield { r.add(b); r }
    }
  }
}

/**
 * This class contains bridge classes between Scala and Java.
 * Internal use only.
 */
object japi {
  /**
   * Partial function that is defined for every input and returns `BoxedUnit.UNIT`
   * after forwarding the argument to `internal`.
   * The default `internal` does nothing; subclasses override it to supply the callback.
   */
  @deprecated("Do not use this directly, use subclasses of this", "2.0")
  class CallbackBridge[-T] extends AbstractPartialFunction[T, BoxedUnit] {
    // Always true: the callback accepts any value
    override final def isDefinedAt(t: T): Boolean = true
    override final def apply(t: T): BoxedUnit = {
      internal(t)
      BoxedUnit.UNIT
    }
    // Hook for subclasses; no-op by default
    protected def internal(result: T): Unit = ()
  }

  /**
   * Partial function over `Throwable` that is defined for every input and
   * returns whatever `internal` produces.
   * The default `internal` returns `null` cast to `T`; subclasses override it.
   */
  @deprecated("Do not use this directly, use 'Recover'", "2.0")
  class RecoverBridge[+T] extends AbstractPartialFunction[Throwable, T] {
    // Always true: every failure is passed on to `internal`
    override final def isDefinedAt(t: Throwable): Boolean = true
    override final def apply(t: Throwable): T = internal(t)
    // Hook for subclasses; yields null by default
    protected def internal(result: Throwable): T = null.asInstanceOf[T]
  }

  /**
   * `Function1` returning a primitive `Boolean` whose `apply` delegates to `internal`.
   * The default `internal` returns `false`; subclasses override it.
   */
  @deprecated("Do not use this directly, use subclasses of this", "2.0")
  class BooleanFunctionBridge[-T] extends scala.Function1[T, Boolean] {
    override final def apply(t: T): Boolean = internal(t)
    // Hook for subclasses; false by default
    protected def internal(result: T): Boolean = false
  }

  /**
   * Function from `T` to `BoxedUnit` whose `apply` forwards the argument to `internal`
   * and returns `BoxedUnit.UNIT`. The default `internal` does nothing; subclasses override it.
   */
  @deprecated("Do not use this directly, use subclasses of this", "2.0")
  class UnitFunctionBridge[-T] extends (T ⇒ BoxedUnit) {
    // NOTE(review): the four methods below appear to mirror the names scalac gives the
    // specialized Function1.apply variants for Long/Int/Float/Double arguments — confirm.
    // Each one casts the primitive argument to T and forwards it to `internal`.
    final def apply$mcLJ$sp(l: Long): BoxedUnit = { internal(l.asInstanceOf[T]); BoxedUnit.UNIT }
    final def apply$mcLI$sp(i: Int): BoxedUnit = { internal(i.asInstanceOf[T]); BoxedUnit.UNIT }
    final def apply$mcLF$sp(f: Float): BoxedUnit = { internal(f.asInstanceOf[T]); BoxedUnit.UNIT }
    final def apply$mcLD$sp(d: Double): BoxedUnit = { internal(d.asInstanceOf[T]); BoxedUnit.UNIT }
    override final def apply(t: T): BoxedUnit = { internal(t); BoxedUnit.UNIT }
    // Hook for subclasses; no-op by default
    protected def internal(result: T): Unit = ()
  }
}

/**
 * Callback for when a Future is completed successfully
 * SAM (Single Abstract Method) class
 *
 * Java API
 */
abstract class OnSuccess[-T] extends japi.CallbackBridge[T] {
  /** Forwards the value received by the bridge to the Java-facing `onSuccess`. */
  protected final override def internal(result: T): Unit = {
    onSuccess(result)
  }

  /**
   * Invoked once when/if a Future that this callback is registered on
   * becomes successfully completed.
   *
   * @param result the value the Future was completed with
   */
  @throws(classOf[Throwable])
  def onSuccess(result: T): Unit
}

/**
 * Callback for when a Future is completed with a failure
 * SAM (Single Abstract Method) class
 *
 * Java API
 */
abstract class OnFailure extends japi.CallbackBridge[Throwable] {
  /** Forwards the failure received by the bridge to the Java-facing `onFailure`. */
  protected final override def internal(failure: Throwable): Unit = {
    onFailure(failure)
  }

  /**
   * Invoked once when/if a Future that this callback is registered on
   * becomes completed with a failure.
   *
   * @param failure the exception the Future was completed with
   */
  @throws(classOf[Throwable])
  def onFailure(failure: Throwable): Unit
}

/**
 * Callback for when a Future is completed with either failure or a success
 * SAM (Single Abstract Method) class
 *
 * Java API
 */
abstract class OnComplete[-T] extends japi.CallbackBridge[Try[T]] {
  /**
   * Unpacks the `Try` received by the bridge into the (failure, success) argument pair
   * expected by `onComplete`, filling the unused slot with null.
   */
  protected final override def internal(value: Try[T]): Unit = {
    value match {
      case Success(result) ⇒ onComplete(null, result)
      case Failure(cause)  ⇒ onComplete(cause, null.asInstanceOf[T])
    }
  }

  /**
   * Invoked once when/if a Future that this callback is registered on
   * becomes completed with a failure or a success.
   * On success "failure" is null; on failure "success" is null.
   *
   * @param failure the exception the Future failed with, or null on success
   * @param success the value the Future succeeded with, or null on failure
   */
  @throws(classOf[Throwable])
  def onComplete(failure: Throwable, success: T): Unit
}

/**
 * Callback for the Future.recover operation that conditionally turns failures into successes.
 *
 * SAM (Single Abstract Method) class
 *
 * Java API
 */
abstract class Recover[+T] extends japi.RecoverBridge[T] {
  /** Forwards the failure received by the bridge to the Java-facing `recover`. */
  protected final override def internal(result: Throwable): T = {
    recover(result)
  }

  /**
   * Invoked once when/if the Future this recover callback is registered on
   * becomes completed with a failure.
   *
   * Java API
   *
   * @param failure the exception the Future was completed with
   * @return a successful value for the passed in failure
   * @throws Throwable rethrow the passed in failure to propagate it
   */
  @throws(classOf[Throwable])
  def recover(failure: Throwable): T
}

/**
 * <i><b>Java API (not recommended):</b></i>
 * Callback for the Future.filter operation that creates a new Future which will
 * conditionally contain the success of another Future.
 *
 * Unfortunately it is not possible to express the type of a Scala filter in
 * Java: Function1[T, Boolean], where “Boolean” is the primitive type. It is
 * possible to use `Future.filter` by constructing such a function indirectly:
 *
 * {{{
 * import static akka.dispatch.Filter.filterOf;
 * Future<String> f = ...;
 * f.filter(filterOf(new Function<String, Boolean>() {
 *   @Override
 *   public Boolean apply(String s) {
 *     ...
 *   }
 * }));
 * }}}
 *
 * However, `Future.filter` exists mainly to support Scala’s for-comprehensions,
 * thus Java users should prefer `Future.map`, translating non-matching values
 * to failure cases.
 */
object Filter {
  /**
   * Adapts a Java API function returning a boxed `java.lang.Boolean` into a Scala
   * function returning the primitive `Boolean`.
   *
   * @param f the Java-side predicate; its boxed result is unboxed with `booleanValue()`
   * @return  a Scala function applying `f` to its argument
   */
  def filterOf[T](f: akka.japi.Function[T, java.lang.Boolean]): (T ⇒ Boolean) = {
    val predicate: T ⇒ Boolean = (result: T) ⇒ f.apply(result).booleanValue()
    predicate
  }
}

/**
 * Callback for the Future.foreach operation that will be invoked if the Future that this callback
 * is registered on becomes completed with a success. This method is essentially the same operation
 * as onSuccess.
 *
 * SAM (Single Abstract Method) class
 * Java API
 */
abstract class Foreach[-T] extends japi.UnitFunctionBridge[T] {
  /** Forwards the value received by the bridge to the Java-facing `each`. */
  override final def internal(t: T): Unit = {
    each(t)
  }

  /**
   * Invoked once when/if a Future that this callback is registered on
   * becomes successfully completed.
   *
   * @param result the value the Future was completed with
   */
  @throws(classOf[Throwable])
  def each(result: T): Unit
}

/**
 * Callback for the Future.map and Future.flatMap operations that will be invoked
 * if the Future that this callback is registered on becomes completed with a success.
 * This callback is the equivalent of an akka.japi.Function
 *
 * Override "apply" normally, or "checkedApply" if you need to throw checked exceptions.
 *
 * SAM (Single Abstract Method) class
 *
 * Java API
 */
abstract class Mapper[-T, +R] extends scala.runtime.AbstractFunction1[T, R] {

  /**
   * Performs the map operation. Override this for the normal case; unless overridden
   * it delegates to "checkedApply".
   *
   * @param parameter the value to transform
   * @return          the transformed value
   */
  def apply(parameter: T): R = {
    checkedApply(parameter)
  }

  /**
   * Override this method instead of "apply" if you need to throw checked exceptions.
   *
   * @param parameter the value to transform
   * @return          the transformed value
   * @throws UnsupportedOperationException by default, when not overridden
   */
  @throws(classOf[Throwable])
  def checkedApply(parameter: T): R = {
    val message = "Mapper.checkedApply has not been implemented"
    throw new UnsupportedOperationException(message)
  }
}

Other Akka source code examples

Here is a short list of links related to this Akka Future.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.