alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (FutureSpec.scala)

This example Akka source code file (FutureSpec.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actor, akka, concurrent, failure, future, hello, int, req, res, string, test, testing, testlatch, time, try, utilities

The FutureSpec.scala Akka example source code

package akka.dispatch

import language.postfixOps

import org.scalatest.BeforeAndAfterAll
import org.scalatest.prop.Checkers
import org.scalacheck._
import org.scalacheck.Arbitrary._
import org.scalacheck.Prop._
import akka.actor._
import akka.testkit.{ EventFilter, filterException, AkkaSpec, DefaultTimeout, TestLatch }
import scala.concurrent.{ Await, Awaitable, Future, Promise }
import scala.util.control.NonFatal
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext
import org.scalatest.junit.JUnitSuiteLike
import scala.runtime.NonLocalReturnControl
import akka.pattern.ask
import java.lang.{ IllegalStateException, ArithmeticException }
import java.util.concurrent._
import scala.reflect.{ ClassTag, classTag }
import scala.util.{ Failure, Success, Try }

object FutureSpec {

  def ready[T](awaitable: Awaitable[T], atMost: Duration): awaitable.type =
    try Await.ready(awaitable, atMost) catch {
      case t: TimeoutException ⇒ throw t
      case e if NonFatal(e)    ⇒ awaitable //swallow
    }

  class TestActor extends Actor {
    def receive = {
      case "Hello" ⇒ sender() ! "World"
      case "Failure" ⇒
        sender() ! Status.Failure(new RuntimeException("Expected exception; to test fault-tolerance"))
      case "NoReply" ⇒
    }
  }

  class TestDelayActor(await: TestLatch) extends Actor {
    def receive = {
      case "Hello" ⇒
        FutureSpec.ready(await, TestLatch.DefaultTimeout); sender() ! "World"
      case "NoReply" ⇒ FutureSpec.ready(await, TestLatch.DefaultTimeout)
      case "Failure" ⇒
        FutureSpec.ready(await, TestLatch.DefaultTimeout)
        sender() ! Status.Failure(new RuntimeException("Expected exception; to test fault-tolerance"))
    }
  }

  final case class Req[T](req: T)
  final case class Res[T](res: T)
}

class JavaFutureSpec extends JavaFutureTests with JUnitSuiteLike

@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with DefaultTimeout {
  import FutureSpec._
  implicit val ec: ExecutionContext = system.dispatcher
  "A Promise" when {
    "never completed" must {
      behave like emptyFuture(_(Promise().future))
      "return supplied value on timeout" in {
        val failure = Promise.failed[String](new RuntimeException("br0ken")).future
        val otherFailure = Promise.failed[String](new RuntimeException("last")).future
        val empty = Promise[String]().future
        val timedOut = Promise.successful[String]("Timedout").future

        Await.result(failure fallbackTo timedOut, timeout.duration) should be("Timedout")
        Await.result(timedOut fallbackTo empty, timeout.duration) should be("Timedout")
        Await.result(failure fallbackTo failure fallbackTo timedOut, timeout.duration) should be("Timedout")
        intercept[RuntimeException] {
          Await.result(failure fallbackTo otherFailure, timeout.duration)
        }.getMessage should be("br0ken")
      }
    }
    "completed with a result" must {
      val result = "test value"
      val future = Promise[String]().complete(Success(result)).future
      behave like futureWithResult(_(future, result))
    }
    "completed with an exception" must {
      val message = "Expected Exception"
      val future = Promise[String]().complete(Failure(new RuntimeException(message))).future
      behave like futureWithException[RuntimeException](_(future, message))
    }
    "completed with an InterruptedException" must {
      val message = "Boxed InterruptedException"
      val future = Promise[String]().complete(Failure(new InterruptedException(message))).future
      behave like futureWithException[RuntimeException](_(future, message))
    }
    "completed with a NonLocalReturnControl" must {
      val result = "test value"
      val future = Promise[String]().complete(Failure(new NonLocalReturnControl[String]("test", result))).future
      behave like futureWithResult(_(future, result))
    }

    "have different ECs" in {
      def namedCtx(n: String) =
        ExecutionContext.fromExecutorService(
          Executors.newSingleThreadExecutor(new ThreadFactory { def newThread(r: Runnable) = new Thread(r, n) }))

      val A = namedCtx("A")
      val B = namedCtx("B")

      // create a promise with ctx A
      val p = Promise[String]()

      // I would expect that any callback from p
      // is executed in the context of p
      val result = {
        implicit val ec = A
        p.future map { _ + Thread.currentThread().getName() }
      }

      p.completeWith(Future { "Hi " }(B))
      try {
        Await.result(result, timeout.duration) should be("Hi A")
      } finally {
        A.shutdown()
        B.shutdown()
      }
    }
  }

  "A Future" when {

    "awaiting a result" which {
      "is not completed" must {
        behave like emptyFuture { test ⇒
          val latch = new TestLatch
          val result = "test value"
          val future = Future {
            FutureSpec.ready(latch, TestLatch.DefaultTimeout)
            result
          }
          test(future)
          latch.open()
          FutureSpec.ready(future, timeout.duration)
        }
      }
      "is completed" must {
        behave like futureWithResult { test ⇒
          val latch = new TestLatch
          val result = "test value"
          val future = Future {
            FutureSpec.ready(latch, TestLatch.DefaultTimeout)
            result
          }
          latch.open()
          FutureSpec.ready(future, timeout.duration)
          test(future, result)
        }
      }
      "has actions applied" must {
        "pass checks" in {
          filterException[ArithmeticException] {
            check({ (future: Future[Int], actions: List[FutureAction]) ⇒
              def wrap[T](f: Future[T]): Try[T] = FutureSpec.ready(f, timeout.duration).value.get
              val result = (future /: actions)(_ /: _)
              val expected = (wrap(future) /: actions)(_ /: _)
              ((wrap(result), expected) match {
                case (Success(a), Success(b)) ⇒ a == b
                case (Failure(a), Failure(b)) if a.toString == b.toString ⇒ true
                case (Failure(a), Failure(b)) if a.getStackTrace.isEmpty || b.getStackTrace.isEmpty ⇒ a.getClass.toString == b.getClass.toString
                case _ ⇒ false
              }) :| result.value.get.toString + " is expected to be " + expected.toString
            }, minSuccessful(10000), workers(4))
          }
        }
      }
    }

    "from an Actor" which {
      "returns a result" must {
        behave like futureWithResult { test ⇒
          val actor = system.actorOf(Props[TestActor])
          val future = actor ? "Hello"
          FutureSpec.ready(future, timeout.duration)
          test(future, "World")
          system.stop(actor)
        }
      }
      "throws an exception" must {
        behave like futureWithException[RuntimeException] { test ⇒
          filterException[RuntimeException] {
            val actor = system.actorOf(Props[TestActor])
            val future = actor ? "Failure"
            FutureSpec.ready(future, timeout.duration)
            test(future, "Expected exception; to test fault-tolerance")
            system.stop(actor)
          }
        }
      }
    }

    "using flatMap with an Actor" which {
      "will return a result" must {
        behave like futureWithResult { test ⇒
          val actor1 = system.actorOf(Props[TestActor])
          val actor2 = system.actorOf(Props(new Actor { def receive = { case s: String ⇒ sender() ! s.toUpperCase } }))
          val future = actor1 ? "Hello" flatMap { case s: String ⇒ actor2 ? s }
          FutureSpec.ready(future, timeout.duration)
          test(future, "WORLD")
          system.stop(actor1)
          system.stop(actor2)
        }
      }
      "will throw an exception" must {
        behave like futureWithException[ArithmeticException] { test ⇒
          filterException[ArithmeticException] {
            val actor1 = system.actorOf(Props[TestActor])
            val actor2 = system.actorOf(Props(new Actor { def receive = { case s: String ⇒ sender() ! Status.Failure(new ArithmeticException("/ by zero")) } }))
            val future = actor1 ? "Hello" flatMap { case s: String ⇒ actor2 ? s }
            FutureSpec.ready(future, timeout.duration)
            test(future, "/ by zero")
            system.stop(actor1)
            system.stop(actor2)
          }
        }
      }
      "will throw a NoSuchElementException when matching wrong type" must {
        behave like futureWithException[NoSuchElementException] { test ⇒
          filterException[NoSuchElementException] {
            val actor1 = system.actorOf(Props[TestActor])
            val actor2 = system.actorOf(Props(new Actor { def receive = { case s: String ⇒ sender() ! s.toUpperCase } }))
            val future = actor1 ? "Hello" flatMap { case i: Int ⇒ actor2 ? i }
            FutureSpec.ready(future, timeout.duration)
            test(future, "World (of class java.lang.String)")
            system.stop(actor1)
            system.stop(actor2)
          }
        }
      }
    }

    "being tested" must {

      "compose with for-comprehensions" in {
        filterException[ClassCastException] {
          val actor = system.actorOf(Props(new Actor {
            def receive = {
              case s: String ⇒ sender() ! s.length
              case i: Int    ⇒ sender() ! (i * 2).toString
            }
          }))

          val future0 = actor ? "Hello"

          val future1 = for {
            a ← future0.mapTo[Int] // returns 5
            b ← (actor ? a).mapTo[String] // returns "10"
            c ← (actor ? 7).mapTo[String] // returns "14"
          } yield b + "-" + c

          val future2 = for {
            a ← future0.mapTo[Int]
            b ← (actor ? a).mapTo[Int]
            c ← (actor ? 7).mapTo[String]
          } yield b + "-" + c

          Await.result(future1, timeout.duration) should be("10-14")
          assert(checkType(future1, classTag[String]))
          intercept[ClassCastException] { Await.result(future2, timeout.duration) }
          system.stop(actor)
        }
      }

      "support pattern matching within a for-comprehension" in {
        filterException[NoSuchElementException] {
          val actor = system.actorOf(Props(new Actor {
            def receive = {
              case Req(s: String) ⇒ sender() ! Res(s.length)
              case Req(i: Int)    ⇒ sender() ! Res((i * 2).toString)
            }
          }))

          val future1 = for {
            Res(a: Int) ← actor ? Req("Hello")
            Res(b: String) ← actor ? Req(a)
            Res(c: String) ← actor ? Req(7)
          } yield b + "-" + c

          val future2 = for {
            Res(a: Int) ← actor ? Req("Hello")
            Res(b: Int) ← actor ? Req(a)
            Res(c: Int) ← actor ? Req(7)
          } yield b + "-" + c

          Await.result(future1, timeout.duration) should be("10-14")
          intercept[NoSuchElementException] { Await.result(future2, timeout.duration) }
          system.stop(actor)
        }
      }

      "recover from exceptions" in {
        filterException[RuntimeException] {
          val future1 = Future(5)
          val future2 = future1 map (_ / 0)
          val future3 = future2 map (_.toString)

          val future4 = future1 recover {
            case e: ArithmeticException ⇒ 0
          } map (_.toString)

          val future5 = future2 recover {
            case e: ArithmeticException ⇒ 0
          } map (_.toString)

          val future6 = future2 recover {
            case e: MatchError ⇒ 0
          } map (_.toString)

          val future7 = future3 recover { case e: ArithmeticException ⇒ "You got ERROR" }

          val actor = system.actorOf(Props[TestActor])

          val future8 = actor ? "Failure"
          val future9 = actor ? "Failure" recover {
            case e: RuntimeException ⇒ "FAIL!"
          }
          val future10 = actor ? "Hello" recover {
            case e: RuntimeException ⇒ "FAIL!"
          }
          val future11 = actor ? "Failure" recover { case _ ⇒ "Oops!" }

          Await.result(future1, timeout.duration) should be(5)
          intercept[ArithmeticException] { Await.result(future2, timeout.duration) }
          intercept[ArithmeticException] { Await.result(future3, timeout.duration) }
          Await.result(future4, timeout.duration) should be("5")
          Await.result(future5, timeout.duration) should be("0")
          intercept[ArithmeticException] { Await.result(future6, timeout.duration) }
          Await.result(future7, timeout.duration) should be("You got ERROR")
          intercept[RuntimeException] { Await.result(future8, timeout.duration) }
          Await.result(future9, timeout.duration) should be("FAIL!")
          Await.result(future10, timeout.duration) should be("World")
          Await.result(future11, timeout.duration) should be("Oops!")

          system.stop(actor)
        }
      }

      "recoverWith from exceptions" in {
        val o = new IllegalStateException("original")
        val r = new IllegalStateException("recovered")
        val yay = Promise.successful("yay!").future

        intercept[IllegalStateException] {
          Await.result(Promise.failed[String](o).future recoverWith { case _ if false == true ⇒ yay }, timeout.duration)
        } should be(o)

        Await.result(Promise.failed[String](o).future recoverWith { case _ ⇒ yay }, timeout.duration) should be("yay!")

        intercept[IllegalStateException] {
          Await.result(Promise.failed[String](o).future recoverWith { case _ ⇒ Promise.failed[String](r).future }, timeout.duration)
        } should be(r)
      }

      "andThen like a boss" in {
        val q = new LinkedBlockingQueue[Int]
        for (i ← 1 to 1000) {
          Await.result(Future { q.add(1); 3 } andThen { case _ ⇒ q.add(2) } andThen { case Success(0) ⇒ q.add(Int.MaxValue) } andThen { case _ ⇒ q.add(3); }, timeout.duration) should be(3)
          q.poll() should be(1)
          q.poll() should be(2)
          q.poll() should be(3)
          q.clear()
        }
      }

      "firstCompletedOf" in {
        val futures = Vector.fill[Future[Int]](10)(Promise[Int]().future) :+ Promise.successful[Int](5).future
        Await.result(Future.firstCompletedOf(futures), timeout.duration) should be(5)
      }

      "find" in {
        val futures = for (i ← 1 to 10) yield Future { i }
        val result = Future.find[Int](futures)(_ == 3)
        Await.result(result, timeout.duration) should be(Some(3))

        val notFound = Future.find[Int](futures)(_ == 11)
        Await.result(notFound, timeout.duration) should be(None)
      }

      "fold" in {
        Await.result(Future.fold((1 to 10).toList map { i ⇒ Future(i) })(0)(_ + _), remainingOrDefault) should be(55)
      }

      "zip" in {
        val timeout = 10000 millis
        val f = new IllegalStateException("test")
        intercept[IllegalStateException] {
          Await.result(Promise.failed[String](f).future zip Promise.successful("foo").future, timeout)
        } should be(f)

        intercept[IllegalStateException] {
          Await.result(Promise.successful("foo").future zip Promise.failed[String](f).future, timeout)
        } should be(f)

        intercept[IllegalStateException] {
          Await.result(Promise.failed[String](f).future zip Promise.failed[String](f).future, timeout)
        } should be(f)

        Await.result(Promise.successful("foo").future zip Promise.successful("foo").future, timeout) should be(("foo", "foo"))
      }

      "fold by composing" in {
        val futures = (1 to 10).toList map { i ⇒ Future(i) }
        Await.result(futures.foldLeft(Future(0))((fr, fa) ⇒ for (r ← fr; a ← fa) yield (r + a)), timeout.duration) should be(55)
      }

      "fold with an exception" in {
        filterException[IllegalArgumentException] {
          val futures = (1 to 10).toList map {
            case 6 ⇒ Future(throw new IllegalArgumentException("shouldFoldResultsWithException: expected"))
            case i ⇒ Future(i)
          }
          intercept[Throwable] { Await.result(Future.fold(futures)(0)(_ + _), remainingOrDefault) }.getMessage should be("shouldFoldResultsWithException: expected")
        }
      }

      "fold mutable zeroes safely" in {
        import scala.collection.mutable.ArrayBuffer
        def test(testNumber: Int) {
          val fs = (0 to 1000) map (i ⇒ Future(i))
          val f = Future.fold(fs)(ArrayBuffer.empty[AnyRef]) {
            case (l, i) if i % 2 == 0 ⇒ l += i.asInstanceOf[AnyRef]
            case (l, _)               ⇒ l
          }
          val result = Await.result(f.mapTo[ArrayBuffer[Int]], 10000 millis).sum

          assert(result === 250500)
        }

        (1 to 100) foreach test //Make sure it tries to provoke the problem
      }

      "return zero value if folding empty list" in {
        Await.result(Future.fold(List[Future[Int]]())(0)(_ + _), timeout.duration) should be(0)
      }

      "reduce results" in {
        val futures = (1 to 10).toList map { i ⇒ Future(i) }
        assert(Await.result(Future.reduce(futures)(_ + _), remainingOrDefault) === 55)
      }

      "reduce results with Exception" in {
        filterException[IllegalArgumentException] {
          val futures = (1 to 10).toList map {
            case 6 ⇒ Future(throw new IllegalArgumentException("shouldReduceResultsWithException: expected"))
            case i ⇒ Future(i)
          }
          intercept[Throwable] { Await.result(Future.reduce(futures)(_ + _), remainingOrDefault) }.getMessage should be("shouldReduceResultsWithException: expected")
        }
      }

      "throw IllegalArgumentException on empty input to reduce" in {
        filterException[IllegalArgumentException] {
          intercept[java.util.NoSuchElementException] { Await.result(Future.reduce(List[Future[Int]]())(_ + _), timeout.duration) }
        }
      }

      "execute onSuccess when received ask reply" in {
        val latch = new TestLatch
        val actor = system.actorOf(Props[TestActor])
        actor ? "Hello" onSuccess { case "World" ⇒ latch.open() }
        FutureSpec.ready(latch, 5 seconds)
        system.stop(actor)
      }

      "traverse Futures" in {
        val oddActor = system.actorOf(Props(new Actor {
          var counter = 1
          def receive = {
            case 'GetNext ⇒
              sender() ! counter
              counter += 2
          }
        }))

        val oddFutures = List.fill(100)(oddActor ? 'GetNext mapTo classTag[Int])

        assert(Await.result(Future.sequence(oddFutures), timeout.duration).sum === 10000)
        system.stop(oddActor)

        val list = (1 to 100).toList
        assert(Await.result(Future.traverse(list)(x ⇒ Future(x * 2 - 1)), timeout.duration).sum === 10000)
      }

      "handle Throwables" in {
        class ThrowableTest(m: String) extends Throwable(m)

        EventFilter[ThrowableTest](occurrences = 4) intercept {
          val f1 = Future[Any] { throw new ThrowableTest("test") }
          intercept[ThrowableTest] { Await.result(f1, timeout.duration) }

          val latch = new TestLatch
          val f2 = Future { FutureSpec.ready(latch, 5 seconds); "success" }
          f2 foreach (_ ⇒ throw new ThrowableTest("dispatcher foreach"))
          f2 onSuccess { case _ ⇒ throw new ThrowableTest("dispatcher receive") }
          val f3 = f2 map (s ⇒ s.toUpperCase)
          latch.open()
          assert(Await.result(f2, timeout.duration) === "success")
          f2 foreach (_ ⇒ throw new ThrowableTest("current thread foreach"))
          f2 onSuccess { case _ ⇒ throw new ThrowableTest("current thread receive") }
          assert(Await.result(f3, timeout.duration) === "SUCCESS")
        }
      }

      "block until result" in {
        val latch = new TestLatch

        val f = Future { FutureSpec.ready(latch, 5 seconds); 5 }
        val f2 = Future { Await.result(f, timeout.duration) + 5 }

        intercept[TimeoutException](FutureSpec.ready(f2, 100 millis))
        latch.open()
        assert(Await.result(f2, timeout.duration) === 10)

        val f3 = Future { Thread.sleep(100); 5 }
        filterException[TimeoutException] { intercept[TimeoutException] { FutureSpec.ready(f3, 0 millis) } }
      }

      "run callbacks async" in {
        val latch = Vector.fill(10)(new TestLatch)

        val f1 = Future { latch(0).open(); FutureSpec.ready(latch(1), TestLatch.DefaultTimeout); "Hello" }
        val f2 = f1 map { s ⇒ latch(2).open(); FutureSpec.ready(latch(3), TestLatch.DefaultTimeout); s.length }
        f2 foreach (_ ⇒ latch(4).open())

        FutureSpec.ready(latch(0), TestLatch.DefaultTimeout)

        f1 should not be ('completed)
        f2 should not be ('completed)

        latch(1).open()
        FutureSpec.ready(latch(2), TestLatch.DefaultTimeout)

        f1 should be('completed)
        f2 should not be ('completed)

        val f3 = f1 map { s ⇒ latch(5).open(); FutureSpec.ready(latch(6), TestLatch.DefaultTimeout); s.length * 2 }
        f3 foreach (_ ⇒ latch(3).open())

        FutureSpec.ready(latch(5), TestLatch.DefaultTimeout)

        f3 should not be ('completed)

        latch(6).open()
        FutureSpec.ready(latch(4), TestLatch.DefaultTimeout)

        f2 should be('completed)
        f3 should be('completed)

        val p1 = Promise[String]()
        val f4 = p1.future map { s ⇒ latch(7).open(); FutureSpec.ready(latch(8), TestLatch.DefaultTimeout); s.length }
        f4 foreach (_ ⇒ latch(9).open())

        p1 should not be ('completed)
        f4 should not be ('completed)

        p1 complete Success("Hello")

        FutureSpec.ready(latch(7), TestLatch.DefaultTimeout)

        p1 should be('completed)
        f4 should not be ('completed)

        latch(8).open()
        FutureSpec.ready(latch(9), TestLatch.DefaultTimeout)

        FutureSpec.ready(f4, timeout.duration) should be('completed)
      }

      "not deadlock with nested await (ticket 1313)" in {
        val simple = Future(()) map (_ ⇒ Await.result((Future(()) map (_ ⇒ ())), timeout.duration))
        FutureSpec.ready(simple, timeout.duration) should be('completed)

        val l1, l2 = new TestLatch
        val complex = Future(()) map { _ ⇒
          val nested = Future(())
          nested foreach (_ ⇒ l1.open())
          FutureSpec.ready(l1, TestLatch.DefaultTimeout) // make sure nested is completed
          nested foreach (_ ⇒ l2.open())
          FutureSpec.ready(l2, TestLatch.DefaultTimeout)
        }
        FutureSpec.ready(complex, timeout.duration) should be('completed)
      }

      "re-use the same thread for nested futures with batching ExecutionContext" in {
        val failCount = new java.util.concurrent.atomic.AtomicInteger
        val f = Future(()) flatMap { _ ⇒
          val originalThread = Thread.currentThread
          // run some nested futures
          val nested =
            for (i ← 1 to 100)
              yield Future.successful("abc") flatMap { _ ⇒
              if (Thread.currentThread ne originalThread)
                failCount.incrementAndGet
              // another level of nesting
              Future.successful("xyz") map { _ ⇒
                if (Thread.currentThread ne originalThread)
                  failCount.incrementAndGet
              }
            }
          Future.sequence(nested)
        }
        Await.ready(f, timeout.duration)
        // TODO re-enable once we're using the batching dispatcher
        // failCount.get should be(0)
      }

    }
  }

  def emptyFuture(f: (Future[Any] ⇒ Unit) ⇒ Unit) {
    "not be completed" in { f(_ should not be ('completed)) }
    "not contain a value" in { f(_.value should be(None)) }
  }

  def futureWithResult(f: ((Future[Any], Any) ⇒ Unit) ⇒ Unit) {
    "be completed" in { f((future, _) ⇒ future should be('completed)) }
    "contain a value" in { f((future, result) ⇒ future.value should be(Some(Success(result)))) }
    "return result with 'get'" in { f((future, result) ⇒ Await.result(future, timeout.duration) should be(result)) }
    "return result with 'Await.result'" in { f((future, result) ⇒ Await.result(future, timeout.duration) should be(result)) }
    "not timeout" in { f((future, _) ⇒ FutureSpec.ready(future, 0 millis)) }
    "filter result" in {
      f { (future, result) ⇒
        Await.result((future filter (_ ⇒ true)), timeout.duration) should be(result)
        evaluating { Await.result((future filter (_ ⇒ false)), timeout.duration) } should produce[java.util.NoSuchElementException]
      }
    }
    "transform result with map" in { f((future, result) ⇒ Await.result((future map (_.toString.length)), timeout.duration) should be(result.toString.length)) }
    "compose result with flatMap" in {
      f { (future, result) ⇒
        val r = for (r ← future; p ← Promise.successful("foo").future) yield r.toString + p
        Await.result(r, timeout.duration) should be(result.toString + "foo")
      }
    }
    "perform action with foreach" in {
      f { (future, result) ⇒
        val p = Promise[Any]()
        future foreach p.success
        Await.result(p.future, timeout.duration) should be(result)
      }
    }
    "zip properly" in {
      f { (future, result) ⇒
        Await.result(future zip Promise.successful("foo").future, timeout.duration) should be((result, "foo"))
        (evaluating { Await.result(future zip Promise.failed(new RuntimeException("ohnoes")).future, timeout.duration) } should produce[RuntimeException]).getMessage should be("ohnoes")
      }
    }
    "not recover from exception" in { f((future, result) ⇒ Await.result(future.recover({ case _ ⇒ "pigdog" }), timeout.duration) should be(result)) }
    "perform action on result" in {
      f { (future, result) ⇒
        val p = Promise[Any]()
        future.onSuccess { case x ⇒ p.success(x) }
        Await.result(p.future, timeout.duration) should be(result)
      }
    }
    "not project a failure" in { f((future, result) ⇒ (evaluating { Await.result(future.failed, timeout.duration) } should produce[NoSuchElementException]).getMessage should be("Future.failed not completed with a throwable.")) }
    "not perform action on exception" is pending
    "cast using mapTo" in { f((future, result) ⇒ Await.result(future.mapTo[Boolean].recover({ case _: ClassCastException ⇒ false }), timeout.duration) should be(false)) }
  }

  def futureWithException[E <: Throwable: ClassTag](f: ((Future[Any], String) ⇒ Unit) ⇒ Unit) {
    "be completed" in { f((future, _) ⇒ future should be('completed)) }
    "contain a value" in {
      f((future, message) ⇒ {
        future.value should be('defined)
        future.value.get should be('failure)
        val Failure(f) = future.value.get
        f.getMessage should be(message)
      })
    }
    "throw exception with 'get'" in { f((future, message) ⇒ (evaluating { Await.result(future, timeout.duration) } should produce[java.lang.Exception]).getMessage should be(message)) }
    "throw exception with 'Await.result'" in { f((future, message) ⇒ (evaluating { Await.result(future, timeout.duration) } should produce[java.lang.Exception]).getMessage should be(message)) }
    "retain exception with filter" in {
      f { (future, message) ⇒
        (evaluating { Await.result(future filter (_ ⇒ true), timeout.duration) } should produce[java.lang.Exception]).getMessage should be(message)
        (evaluating { Await.result(future filter (_ ⇒ false), timeout.duration) } should produce[java.lang.Exception]).getMessage should be(message)
      }
    }
    "retain exception with map" in { f((future, message) ⇒ (evaluating { Await.result(future map (_.toString.length), timeout.duration) } should produce[java.lang.Exception]).getMessage should be(message)) }
    "retain exception with flatMap" in { f((future, message) ⇒ (evaluating { Await.result(future flatMap (_ ⇒ Promise.successful[Any]("foo").future), timeout.duration) } should produce[java.lang.Exception]).getMessage should be(message)) }
    "not perform action with foreach" is pending

    "zip properly" in {
      f { (future, message) ⇒ (evaluating { Await.result(future zip Promise.successful("foo").future, timeout.duration) } should produce[java.lang.Exception]).getMessage should be(message) }
    }
    "recover from exception" in { f((future, message) ⇒ Await.result(future.recover({ case e if e.getMessage == message ⇒ "pigdog" }), timeout.duration) should be("pigdog")) }
    "not perform action on result" is pending
    "project a failure" in { f((future, message) ⇒ Await.result(future.failed, timeout.duration).getMessage should be(message)) }
    "perform action on exception" in {
      f { (future, message) ⇒
        val p = Promise[Any]()
        future.onFailure { case _ ⇒ p.success(message) }
        Await.result(p.future, timeout.duration) should be(message)
      }
    }
    "always cast successfully using mapTo" in { f((future, message) ⇒ (evaluating { Await.result(future.mapTo[java.lang.Thread], timeout.duration) } should produce[java.lang.Exception]).getMessage should be(message)) }
  }

  sealed trait IntAction { def apply(that: Int): Int }
  final case class IntAdd(n: Int) extends IntAction { def apply(that: Int) = that + n }
  final case class IntSub(n: Int) extends IntAction { def apply(that: Int) = that - n }
  final case class IntMul(n: Int) extends IntAction { def apply(that: Int) = that * n }
  final case class IntDiv(n: Int) extends IntAction { def apply(that: Int) = that / n }

  sealed trait FutureAction {
    def /:(that: Try[Int]): Try[Int]
    def /:(that: Future[Int]): Future[Int]
  }

  final case class MapAction(action: IntAction) extends FutureAction {
    def /:(that: Try[Int]): Try[Int] = that map action.apply
    def /:(that: Future[Int]): Future[Int] = that map action.apply
  }

  final case class FlatMapAction(action: IntAction) extends FutureAction {
    def /:(that: Try[Int]): Try[Int] = that map action.apply
    def /:(that: Future[Int]): Future[Int] = that flatMap (n ⇒ Future.successful(action(n)))
  }

  implicit def arbFuture: Arbitrary[Future[Int]] = Arbitrary(for (n ← arbitrary[Int]) yield Future(n))

  implicit def arbFutureAction: Arbitrary[FutureAction] = Arbitrary {

    val genIntAction = for {
      n ← arbitrary[Int]
      a ← Gen.oneOf(IntAdd(n), IntSub(n), IntMul(n), IntDiv(n))
    } yield a

    val genMapAction = genIntAction map (MapAction(_))

    val genFlatMapAction = genIntAction map (FlatMapAction(_))

    Gen.oneOf(genMapAction, genFlatMapAction)

  }

  def checkType[A: ClassTag, B](in: Future[A], reftag: ClassTag[B]): Boolean = implicitly[ClassTag[A]].runtimeClass == reftag.runtimeClass
}

Other Akka source code examples

Here is a short list of links related to this Akka FutureSpec.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.