|
Akka/Scala example source code file (FutureSpec.scala)
The FutureSpec.scala Akka example source codepackage akka.dispatch import language.postfixOps import org.scalatest.BeforeAndAfterAll import org.scalatest.prop.Checkers import org.scalacheck._ import org.scalacheck.Arbitrary._ import org.scalacheck.Prop._ import akka.actor._ import akka.testkit.{ EventFilter, filterException, AkkaSpec, DefaultTimeout, TestLatch } import scala.concurrent.{ Await, Awaitable, Future, Promise } import scala.util.control.NonFatal import scala.concurrent.duration._ import scala.concurrent.ExecutionContext import org.scalatest.junit.JUnitSuiteLike import scala.runtime.NonLocalReturnControl import akka.pattern.ask import java.lang.{ IllegalStateException, ArithmeticException } import java.util.concurrent._ import scala.reflect.{ ClassTag, classTag } import scala.util.{ Failure, Success, Try } object FutureSpec { def ready[T](awaitable: Awaitable[T], atMost: Duration): awaitable.type = try Await.ready(awaitable, atMost) catch { case t: TimeoutException ⇒ throw t case e if NonFatal(e) ⇒ awaitable //swallow } class TestActor extends Actor { def receive = { case "Hello" ⇒ sender() ! "World" case "Failure" ⇒ sender() ! Status.Failure(new RuntimeException("Expected exception; to test fault-tolerance")) case "NoReply" ⇒ } } class TestDelayActor(await: TestLatch) extends Actor { def receive = { case "Hello" ⇒ FutureSpec.ready(await, TestLatch.DefaultTimeout); sender() ! "World" case "NoReply" ⇒ FutureSpec.ready(await, TestLatch.DefaultTimeout) case "Failure" ⇒ FutureSpec.ready(await, TestLatch.DefaultTimeout) sender() ! Status.Failure(new RuntimeException("Expected exception; to test fault-tolerance")) } } final case class Req[T](req: T) final case class Res[T](res: T) } class JavaFutureSpec extends JavaFutureTests with JUnitSuiteLike @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with DefaultTimeout { import FutureSpec._ implicit val ec: ExecutionContext = system.dispatcher "A Promise" when { "never completed" must { behave like emptyFuture(_(Promise().future)) "return supplied value on timeout" in { val failure = Promise.failed[String](new RuntimeException("br0ken")).future val otherFailure = Promise.failed[String](new RuntimeException("last")).future val empty = Promise[String]().future val timedOut = Promise.successful[String]("Timedout").future Await.result(failure fallbackTo timedOut, timeout.duration) should be("Timedout") Await.result(timedOut fallbackTo empty, timeout.duration) should be("Timedout") Await.result(failure fallbackTo failure fallbackTo timedOut, timeout.duration) should be("Timedout") intercept[RuntimeException] { Await.result(failure fallbackTo otherFailure, timeout.duration) }.getMessage should be("br0ken") } } "completed with a result" must { val result = "test value" val future = Promise[String]().complete(Success(result)).future behave like futureWithResult(_(future, result)) } "completed with an exception" must { val message = "Expected Exception" val future = Promise[String]().complete(Failure(new RuntimeException(message))).future behave like futureWithException[RuntimeException](_(future, message)) } "completed with an InterruptedException" must { val message = "Boxed InterruptedException" val future = Promise[String]().complete(Failure(new InterruptedException(message))).future behave like futureWithException[RuntimeException](_(future, message)) } "completed with a NonLocalReturnControl" must { val result = "test value" val future = Promise[String]().complete(Failure(new NonLocalReturnControl[String]("test", result))).future behave like futureWithResult(_(future, result)) } "have different ECs" in { def namedCtx(n: String) = ExecutionContext.fromExecutorService( Executors.newSingleThreadExecutor(new ThreadFactory { def newThread(r: Runnable) = new Thread(r, n) })) val A = namedCtx("A") val B = namedCtx("B") // create a promise with ctx A val p = Promise[String]() // I would expect that any callback from p // is executed in the context of p val result = { implicit val ec = A p.future map { _ + Thread.currentThread().getName() } } p.completeWith(Future { "Hi " }(B)) try { Await.result(result, timeout.duration) should be("Hi A") } finally { A.shutdown() B.shutdown() } } } "A Future" when { "awaiting a result" which { "is not completed" must { behave like emptyFuture { test ⇒ val latch = new TestLatch val result = "test value" val future = Future { FutureSpec.ready(latch, TestLatch.DefaultTimeout) result } test(future) latch.open() FutureSpec.ready(future, timeout.duration) } } "is completed" must { behave like futureWithResult { test ⇒ val latch = new TestLatch val result = "test value" val future = Future { FutureSpec.ready(latch, TestLatch.DefaultTimeout) result } latch.open() FutureSpec.ready(future, timeout.duration) test(future, result) } } "has actions applied" must { "pass checks" in { filterException[ArithmeticException] { check({ (future: Future[Int], actions: List[FutureAction]) ⇒ def wrap[T](f: Future[T]): Try[T] = FutureSpec.ready(f, timeout.duration).value.get val result = (future /: actions)(_ /: _) val expected = (wrap(future) /: actions)(_ /: _) ((wrap(result), expected) match { case (Success(a), Success(b)) ⇒ a == b case (Failure(a), Failure(b)) if a.toString == b.toString ⇒ true case (Failure(a), Failure(b)) if a.getStackTrace.isEmpty || b.getStackTrace.isEmpty ⇒ a.getClass.toString == b.getClass.toString case _ ⇒ false }) :| result.value.get.toString + " is expected to be " + expected.toString }, minSuccessful(10000), workers(4)) } } } } "from an Actor" which { "returns a result" must { behave like futureWithResult { test ⇒ val actor = system.actorOf(Props[TestActor]) val future = actor ? "Hello" FutureSpec.ready(future, timeout.duration) test(future, "World") system.stop(actor) } } "throws an exception" must { behave like futureWithException[RuntimeException] { test ⇒ filterException[RuntimeException] { val actor = system.actorOf(Props[TestActor]) val future = actor ? "Failure" FutureSpec.ready(future, timeout.duration) test(future, "Expected exception; to test fault-tolerance") system.stop(actor) } } } } "using flatMap with an Actor" which { "will return a result" must { behave like futureWithResult { test ⇒ val actor1 = system.actorOf(Props[TestActor]) val actor2 = system.actorOf(Props(new Actor { def receive = { case s: String ⇒ sender() ! s.toUpperCase } })) val future = actor1 ? "Hello" flatMap { case s: String ⇒ actor2 ? s } FutureSpec.ready(future, timeout.duration) test(future, "WORLD") system.stop(actor1) system.stop(actor2) } } "will throw an exception" must { behave like futureWithException[ArithmeticException] { test ⇒ filterException[ArithmeticException] { val actor1 = system.actorOf(Props[TestActor]) val actor2 = system.actorOf(Props(new Actor { def receive = { case s: String ⇒ sender() ! Status.Failure(new ArithmeticException("/ by zero")) } })) val future = actor1 ? "Hello" flatMap { case s: String ⇒ actor2 ? s } FutureSpec.ready(future, timeout.duration) test(future, "/ by zero") system.stop(actor1) system.stop(actor2) } } } "will throw a NoSuchElementException when matching wrong type" must { behave like futureWithException[NoSuchElementException] { test ⇒ filterException[NoSuchElementException] { val actor1 = system.actorOf(Props[TestActor]) val actor2 = system.actorOf(Props(new Actor { def receive = { case s: String ⇒ sender() ! s.toUpperCase } })) val future = actor1 ? "Hello" flatMap { case i: Int ⇒ actor2 ? i } FutureSpec.ready(future, timeout.duration) test(future, "World (of class java.lang.String)") system.stop(actor1) system.stop(actor2) } } } } "being tested" must { "compose with for-comprehensions" in { filterException[ClassCastException] { val actor = system.actorOf(Props(new Actor { def receive = { case s: String ⇒ sender() ! s.length case i: Int ⇒ sender() ! (i * 2).toString } })) val future0 = actor ? "Hello" val future1 = for { a ← future0.mapTo[Int] // returns 5 b ← (actor ? a).mapTo[String] // returns "10" c ← (actor ? 7).mapTo[String] // returns "14" } yield b + "-" + c val future2 = for { a ← future0.mapTo[Int] b ← (actor ? a).mapTo[Int] c ← (actor ? 7).mapTo[String] } yield b + "-" + c Await.result(future1, timeout.duration) should be("10-14") assert(checkType(future1, classTag[String])) intercept[ClassCastException] { Await.result(future2, timeout.duration) } system.stop(actor) } } "support pattern matching within a for-comprehension" in { filterException[NoSuchElementException] { val actor = system.actorOf(Props(new Actor { def receive = { case Req(s: String) ⇒ sender() ! Res(s.length) case Req(i: Int) ⇒ sender() ! Res((i * 2).toString) } })) val future1 = for { Res(a: Int) ← actor ? Req("Hello") Res(b: String) ← actor ? Req(a) Res(c: String) ← actor ? Req(7) } yield b + "-" + c val future2 = for { Res(a: Int) ← actor ? Req("Hello") Res(b: Int) ← actor ? Req(a) Res(c: Int) ← actor ? Req(7) } yield b + "-" + c Await.result(future1, timeout.duration) should be("10-14") intercept[NoSuchElementException] { Await.result(future2, timeout.duration) } system.stop(actor) } } "recover from exceptions" in { filterException[RuntimeException] { val future1 = Future(5) val future2 = future1 map (_ / 0) val future3 = future2 map (_.toString) val future4 = future1 recover { case e: ArithmeticException ⇒ 0 } map (_.toString) val future5 = future2 recover { case e: ArithmeticException ⇒ 0 } map (_.toString) val future6 = future2 recover { case e: MatchError ⇒ 0 } map (_.toString) val future7 = future3 recover { case e: ArithmeticException ⇒ "You got ERROR" } val actor = system.actorOf(Props[TestActor]) val future8 = actor ? "Failure" val future9 = actor ? "Failure" recover { case e: RuntimeException ⇒ "FAIL!" } val future10 = actor ? "Hello" recover { case e: RuntimeException ⇒ "FAIL!" } val future11 = actor ? "Failure" recover { case _ ⇒ "Oops!" } Await.result(future1, timeout.duration) should be(5) intercept[ArithmeticException] { Await.result(future2, timeout.duration) } intercept[ArithmeticException] { Await.result(future3, timeout.duration) } Await.result(future4, timeout.duration) should be("5") Await.result(future5, timeout.duration) should be("0") intercept[ArithmeticException] { Await.result(future6, timeout.duration) } Await.result(future7, timeout.duration) should be("You got ERROR") intercept[RuntimeException] { Await.result(future8, timeout.duration) } Await.result(future9, timeout.duration) should be("FAIL!") Await.result(future10, timeout.duration) should be("World") Await.result(future11, timeout.duration) should be("Oops!") system.stop(actor) } } "recoverWith from exceptions" in { val o = new IllegalStateException("original") val r = new IllegalStateException("recovered") val yay = Promise.successful("yay!").future intercept[IllegalStateException] { Await.result(Promise.failed[String](o).future recoverWith { case _ if false == true ⇒ yay }, timeout.duration) } should be(o) Await.result(Promise.failed[String](o).future recoverWith { case _ ⇒ yay }, timeout.duration) should be("yay!") intercept[IllegalStateException] { Await.result(Promise.failed[String](o).future recoverWith { case _ ⇒ Promise.failed[String](r).future }, timeout.duration) } should be(r) } "andThen like a boss" in { val q = new LinkedBlockingQueue[Int] for (i ← 1 to 1000) { Await.result(Future { q.add(1); 3 } andThen { case _ ⇒ q.add(2) } andThen { case Success(0) ⇒ q.add(Int.MaxValue) } andThen { case _ ⇒ q.add(3); }, timeout.duration) should be(3) q.poll() should be(1) q.poll() should be(2) q.poll() should be(3) q.clear() } } "firstCompletedOf" in { val futures = Vector.fill[Future[Int]](10)(Promise[Int]().future) :+ Promise.successful[Int](5).future Await.result(Future.firstCompletedOf(futures), timeout.duration) should be(5) } "find" in { val futures = for (i ← 1 to 10) yield Future { i } val result = Future.find[Int](futures)(_ == 3) Await.result(result, timeout.duration) should be(Some(3)) val notFound = Future.find[Int](futures)(_ == 11) Await.result(notFound, timeout.duration) should be(None) } "fold" in { Await.result(Future.fold((1 to 10).toList map { i ⇒ Future(i) })(0)(_ + _), remainingOrDefault) should be(55) } "zip" in { val timeout = 10000 millis val f = new IllegalStateException("test") intercept[IllegalStateException] { Await.result(Promise.failed[String](f).future zip Promise.successful("foo").future, timeout) } should be(f) intercept[IllegalStateException] { Await.result(Promise.successful("foo").future zip Promise.failed[String](f).future, timeout) } should be(f) intercept[IllegalStateException] { Await.result(Promise.failed[String](f).future zip Promise.failed[String](f).future, timeout) } should be(f) Await.result(Promise.successful("foo").future zip Promise.successful("foo").future, timeout) should be(("foo", "foo")) } "fold by composing" in { val futures = (1 to 10).toList map { i ⇒ Future(i) } Await.result(futures.foldLeft(Future(0))((fr, fa) ⇒ for (r ← fr; a ← fa) yield (r + a)), timeout.duration) should be(55) } "fold with an exception" in { filterException[IllegalArgumentException] { val futures = (1 to 10).toList map { case 6 ⇒ Future(throw new IllegalArgumentException("shouldFoldResultsWithException: expected")) case i ⇒ Future(i) } intercept[Throwable] { Await.result(Future.fold(futures)(0)(_ + _), remainingOrDefault) }.getMessage should be("shouldFoldResultsWithException: expected") } } "fold mutable zeroes safely" in { import scala.collection.mutable.ArrayBuffer def test(testNumber: Int) { val fs = (0 to 1000) map (i ⇒ Future(i)) val f = Future.fold(fs)(ArrayBuffer.empty[AnyRef]) { case (l, i) if i % 2 == 0 ⇒ l += i.asInstanceOf[AnyRef] case (l, _) ⇒ l } val result = Await.result(f.mapTo[ArrayBuffer[Int]], 10000 millis).sum assert(result === 250500) } (1 to 100) foreach test //Make sure it tries to provoke the problem } "return zero value if folding empty list" in { Await.result(Future.fold(List[Future[Int]]())(0)(_ + _), timeout.duration) should be(0) } "reduce results" in { val futures = (1 to 10).toList map { i ⇒ Future(i) } assert(Await.result(Future.reduce(futures)(_ + _), remainingOrDefault) === 55) } "reduce results with Exception" in { filterException[IllegalArgumentException] { val futures = (1 to 10).toList map { case 6 ⇒ Future(throw new IllegalArgumentException("shouldReduceResultsWithException: expected")) case i ⇒ Future(i) } intercept[Throwable] { Await.result(Future.reduce(futures)(_ + _), remainingOrDefault) }.getMessage should be("shouldReduceResultsWithException: expected") } } "throw IllegalArgumentException on empty input to reduce" in { filterException[IllegalArgumentException] { intercept[java.util.NoSuchElementException] { Await.result(Future.reduce(List[Future[Int]]())(_ + _), timeout.duration) } } } "execute onSuccess when received ask reply" in { val latch = new TestLatch val actor = system.actorOf(Props[TestActor]) actor ? "Hello" onSuccess { case "World" ⇒ latch.open() } FutureSpec.ready(latch, 5 seconds) system.stop(actor) } "traverse Futures" in { val oddActor = system.actorOf(Props(new Actor { var counter = 1 def receive = { case 'GetNext ⇒ sender() ! counter counter += 2 } })) val oddFutures = List.fill(100)(oddActor ? 'GetNext mapTo classTag[Int]) assert(Await.result(Future.sequence(oddFutures), timeout.duration).sum === 10000) system.stop(oddActor) val list = (1 to 100).toList assert(Await.result(Future.traverse(list)(x ⇒ Future(x * 2 - 1)), timeout.duration).sum === 10000) } "handle Throwables" in { class ThrowableTest(m: String) extends Throwable(m) EventFilter[ThrowableTest](occurrences = 4) intercept { val f1 = Future[Any] { throw new ThrowableTest("test") } intercept[ThrowableTest] { Await.result(f1, timeout.duration) } val latch = new TestLatch val f2 = Future { FutureSpec.ready(latch, 5 seconds); "success" } f2 foreach (_ ⇒ throw new ThrowableTest("dispatcher foreach")) f2 onSuccess { case _ ⇒ throw new ThrowableTest("dispatcher receive") } val f3 = f2 map (s ⇒ s.toUpperCase) latch.open() assert(Await.result(f2, timeout.duration) === "success") f2 foreach (_ ⇒ throw new ThrowableTest("current thread foreach")) f2 onSuccess { case _ ⇒ throw new ThrowableTest("current thread receive") } assert(Await.result(f3, timeout.duration) === "SUCCESS") } } "block until result" in { val latch = new TestLatch val f = Future { FutureSpec.ready(latch, 5 seconds); 5 } val f2 = Future { Await.result(f, timeout.duration) + 5 } intercept[TimeoutException](FutureSpec.ready(f2, 100 millis)) latch.open() assert(Await.result(f2, timeout.duration) === 10) val f3 = Future { Thread.sleep(100); 5 } filterException[TimeoutException] { intercept[TimeoutException] { FutureSpec.ready(f3, 0 millis) } } } "run callbacks async" in { val latch = Vector.fill(10)(new TestLatch) val f1 = Future { latch(0).open(); FutureSpec.ready(latch(1), TestLatch.DefaultTimeout); "Hello" } val f2 = f1 map { s ⇒ latch(2).open(); FutureSpec.ready(latch(3), TestLatch.DefaultTimeout); s.length } f2 foreach (_ ⇒ latch(4).open()) FutureSpec.ready(latch(0), TestLatch.DefaultTimeout) f1 should not be ('completed) f2 should not be ('completed) latch(1).open() FutureSpec.ready(latch(2), TestLatch.DefaultTimeout) f1 should be('completed) f2 should not be ('completed) val f3 = f1 map { s ⇒ latch(5).open(); FutureSpec.ready(latch(6), TestLatch.DefaultTimeout); s.length * 2 } f3 foreach (_ ⇒ latch(3).open()) FutureSpec.ready(latch(5), TestLatch.DefaultTimeout) f3 should not be ('completed) latch(6).open() FutureSpec.ready(latch(4), TestLatch.DefaultTimeout) f2 should be('completed) f3 should be('completed) val p1 = Promise[String]() val f4 = p1.future map { s ⇒ latch(7).open(); FutureSpec.ready(latch(8), TestLatch.DefaultTimeout); s.length } f4 foreach (_ ⇒ latch(9).open()) p1 should not be ('completed) f4 should not be ('completed) p1 complete Success("Hello") FutureSpec.ready(latch(7), TestLatch.DefaultTimeout) p1 should be('completed) f4 should not be ('completed) latch(8).open() FutureSpec.ready(latch(9), TestLatch.DefaultTimeout) FutureSpec.ready(f4, timeout.duration) should be('completed) } "not deadlock with nested await (ticket 1313)" in { val simple = Future(()) map (_ ⇒ Await.result((Future(()) map (_ ⇒ ())), timeout.duration)) FutureSpec.ready(simple, timeout.duration) should be('completed) val l1, l2 = new TestLatch val complex = Future(()) map { _ ⇒ val nested = Future(()) nested foreach (_ ⇒ l1.open()) FutureSpec.ready(l1, TestLatch.DefaultTimeout) // make sure nested is completed nested foreach (_ ⇒ l2.open()) FutureSpec.ready(l2, TestLatch.DefaultTimeout) } FutureSpec.ready(complex, timeout.duration) should be('completed) } "re-use the same thread for nested futures with batching ExecutionContext" in { val failCount = new java.util.concurrent.atomic.AtomicInteger val f = Future(()) flatMap { _ ⇒ val originalThread = Thread.currentThread // run some nested futures val nested = for (i ← 1 to 100) yield Future.successful("abc") flatMap { _ ⇒ if (Thread.currentThread ne originalThread) failCount.incrementAndGet // another level of nesting Future.successful("xyz") map { _ ⇒ if (Thread.currentThread ne originalThread) failCount.incrementAndGet } } Future.sequence(nested) } Await.ready(f, timeout.duration) // TODO re-enable once we're using the batching dispatcher // failCount.get should be(0) } } } def emptyFuture(f: (Future[Any] ⇒ Unit) ⇒ Unit) { "not be completed" in { f(_ should not be ('completed)) } "not contain a value" in { f(_.value should be(None)) } } def futureWithResult(f: ((Future[Any], Any) ⇒ Unit) ⇒ Unit) { "be completed" in { f((future, _) ⇒ future should be('completed)) } "contain a value" in { f((future, result) ⇒ future.value should be(Some(Success(result)))) } "return result with 'get'" in { f((future, result) ⇒ Await.result(future, timeout.duration) should be(result)) } "return result with 'Await.result'" in { f((future, result) ⇒ Await.result(future, timeout.duration) should be(result)) } "not timeout" in { f((future, _) ⇒ FutureSpec.ready(future, 0 millis)) } "filter result" in { f { (future, result) ⇒ Await.result((future filter (_ ⇒ true)), timeout.duration) should be(result) evaluating { Await.result((future filter (_ ⇒ false)), timeout.duration) } should produce[java.util.NoSuchElementException] } } "transform result with map" in { f((future, result) ⇒ Await.result((future map (_.toString.length)), timeout.duration) should be(result.toString.length)) } "compose result with flatMap" in { f { (future, result) ⇒ val r = for (r ← future; p ← Promise.successful("foo").future) yield r.toString + p Await.result(r, timeout.duration) should be(result.toString + "foo") } } "perform action with foreach" in { f { (future, result) ⇒ val p = Promise[Any]() future foreach p.success Await.result(p.future, timeout.duration) should be(result) } } "zip properly" in { f { (future, result) ⇒ Await.result(future zip Promise.successful("foo").future, timeout.duration) should be((result, "foo")) (evaluating { Await.result(future zip Promise.failed(new RuntimeException("ohnoes")).future, timeout.duration) } should produce[RuntimeException]).getMessage should be("ohnoes") } } "not recover from exception" in { f((future, result) ⇒ Await.result(future.recover({ case _ ⇒ "pigdog" }), timeout.duration) should be(result)) } "perform action on result" in { f { (future, result) ⇒ val p = Promise[Any]() future.onSuccess { case x ⇒ p.success(x) } Await.result(p.future, timeout.duration) should be(result) } } "not project a failure" in { f((future, result) ⇒ (evaluating { Await.result(future.failed, timeout.duration) } should produce[NoSuchElementException]).getMessage should be("Future.failed not completed with a throwable.")) } "not perform action on exception" is pending "cast using mapTo" in { f((future, result) ⇒ Await.result(future.mapTo[Boolean].recover({ case _: ClassCastException ⇒ false }), timeout.duration) should be(false)) } } def futureWithException[E <: Throwable: ClassTag](f: ((Future[Any], String) ⇒ Unit) ⇒ Unit) { "be completed" in { f((future, _) ⇒ future should be('completed)) } "contain a value" in { f((future, message) ⇒ { future.value should be('defined) future.value.get should be('failure) val Failure(f) = future.value.get f.getMessage should be(message) }) } "throw exception with 'get'" in { f((future, message) ⇒ (evaluating { Await.result(future, timeout.duration) } should produce[java.lang.Exception]).getMessage should be(message)) } "throw exception with 'Await.result'" in { f((future, message) ⇒ (evaluating { Await.result(future, timeout.duration) } should produce[java.lang.Exception]).getMessage should be(message)) } "retain exception with filter" in { f { (future, message) ⇒ (evaluating { Await.result(future filter (_ ⇒ true), timeout.duration) } should produce[java.lang.Exception]).getMessage should be(message) (evaluating { Await.result(future filter (_ ⇒ false), timeout.duration) } should produce[java.lang.Exception]).getMessage should be(message) } } "retain exception with map" in { f((future, message) ⇒ (evaluating { Await.result(future map (_.toString.length), timeout.duration) } should produce[java.lang.Exception]).getMessage should be(message)) } "retain exception with flatMap" in { f((future, message) ⇒ (evaluating { Await.result(future flatMap (_ ⇒ Promise.successful[Any]("foo").future), timeout.duration) } should produce[java.lang.Exception]).getMessage should be(message)) } "not perform action with foreach" is pending "zip properly" in { f { (future, message) ⇒ (evaluating { Await.result(future zip Promise.successful("foo").future, timeout.duration) } should produce[java.lang.Exception]).getMessage should be(message) } } "recover from exception" in { f((future, message) ⇒ Await.result(future.recover({ case e if e.getMessage == message ⇒ "pigdog" }), timeout.duration) should be("pigdog")) } "not perform action on result" is pending "project a failure" in { f((future, message) ⇒ Await.result(future.failed, timeout.duration).getMessage should be(message)) } "perform action on exception" in { f { (future, message) ⇒ val p = Promise[Any]() future.onFailure { case _ ⇒ p.success(message) } Await.result(p.future, timeout.duration) should be(message) } } "always cast successfully using mapTo" in { f((future, message) ⇒ (evaluating { Await.result(future.mapTo[java.lang.Thread], timeout.duration) } should produce[java.lang.Exception]).getMessage should be(message)) } } sealed trait IntAction { def apply(that: Int): Int } final case class IntAdd(n: Int) extends IntAction { def apply(that: Int) = that + n } final case class IntSub(n: Int) extends IntAction { def apply(that: Int) = that - n } final case class IntMul(n: Int) extends IntAction { def apply(that: Int) = that * n } final case class IntDiv(n: Int) extends IntAction { def apply(that: Int) = that / n } sealed trait FutureAction { def /:(that: Try[Int]): Try[Int] def /:(that: Future[Int]): Future[Int] } final case class MapAction(action: IntAction) extends FutureAction { def /:(that: Try[Int]): Try[Int] = that map action.apply def /:(that: Future[Int]): Future[Int] = that map action.apply } final case class FlatMapAction(action: IntAction) extends FutureAction { def /:(that: Try[Int]): Try[Int] = that map action.apply def /:(that: Future[Int]): Future[Int] = that flatMap (n ⇒ Future.successful(action(n))) } implicit def arbFuture: Arbitrary[Future[Int]] = Arbitrary(for (n ← arbitrary[Int]) yield Future(n)) implicit def arbFutureAction: Arbitrary[FutureAction] = Arbitrary { val genIntAction = for { n ← arbitrary[Int] a ← Gen.oneOf(IntAdd(n), IntSub(n), IntMul(n), IntDiv(n)) } yield a val genMapAction = genIntAction map (MapAction(_)) val genFlatMapAction = genIntAction map (FlatMapAction(_)) Gen.oneOf(genMapAction, genFlatMapAction) } def checkType[A: ClassTag, B](in: Future[A], reftag: ClassTag[B]): Boolean = implicitly[ClassTag[A]].runtimeClass == reftag.runtimeClass } Other Akka source code examplesHere is a short list of links related to this Akka FutureSpec.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.