|
Akka/Scala example source code file (FutureDocSpec.scala)
The FutureDocSpec.scala Akka example source code/** * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> */ package docs.future import language.postfixOps import akka.testkit._ import akka.actor.{ Actor, Props } import akka.actor.Status import akka.util.Timeout import scala.concurrent.duration._ import java.lang.IllegalStateException import scala.concurrent.{ Await, ExecutionContext, Future, Promise } import scala.util.{ Failure, Success } object FutureDocSpec { class MyActor extends Actor { def receive = { case x: String => sender() ! x.toUpperCase case x: Int if x < 0 => sender() ! Status.Failure(new ArithmeticException("Negative values not supported")) case x: Int => sender() ! x } } case object GetNext class OddActor extends Actor { var n = 1 def receive = { case GetNext => sender() ! n n += 2 } } } class FutureDocSpec extends AkkaSpec { import FutureDocSpec._ import system.dispatcher val println: PartialFunction[Any, Unit] = { case _ => } "demonstrate usage custom ExecutionContext" in { val yourExecutorServiceGoesHere = java.util.concurrent.Executors.newSingleThreadExecutor() //#diy-execution-context import scala.concurrent.{ ExecutionContext, Promise } implicit val ec = ExecutionContext.fromExecutorService(yourExecutorServiceGoesHere) // Do stuff with your brand new shiny ExecutionContext val f = Promise.successful("foo") // Then shut your ExecutionContext down at some // appropriate place in your program/application ec.shutdown() //#diy-execution-context } "demonstrate usage of blocking from actor" in { val actor = system.actorOf(Props[MyActor]) val msg = "hello" //#ask-blocking import scala.concurrent.Await import akka.pattern.ask import akka.util.Timeout import scala.concurrent.duration._ implicit val timeout = Timeout(5 seconds) val future = actor ? msg // enabled by the “ask” import val result = Await.result(future, timeout.duration).asInstanceOf[String] //#ask-blocking //#pipe-to import akka.pattern.pipe future pipeTo actor //#pipe-to result should be("HELLO") } "demonstrate usage of mapTo" in { val actor = system.actorOf(Props[MyActor]) val msg = "hello" implicit val timeout = Timeout(5 seconds) //#map-to import scala.concurrent.Future import akka.pattern.ask val future: Future[String] = ask(actor, msg).mapTo[String] //#map-to Await.result(future, timeout.duration) should be("HELLO") } "demonstrate usage of simple future eval" in { //#future-eval import scala.concurrent.Await import scala.concurrent.Future import scala.concurrent.duration._ val future = Future { "Hello" + "World" } future foreach println //#future-eval Await.result(future, 3 seconds) should be("HelloWorld") } "demonstrate usage of map" in { //#map val f1 = Future { "Hello" + "World" } val f2 = f1 map { x => x.length } f2 foreach println //#map val result = Await.result(f2, 3 seconds) result should be(10) f1.value should be(Some(Success("HelloWorld"))) } "demonstrate wrong usage of nested map" in { //#wrong-nested-map val f1 = Future { "Hello" + "World" } val f2 = Future.successful(3) val f3 = f1 map { x => f2 map { y => x.length * y } } f3 foreach println //#wrong-nested-map Await.ready(f3, 3 seconds) } "demonstrate usage of flatMap" in { //#flat-map val f1 = Future { "Hello" + "World" } val f2 = Future.successful(3) val f3 = f1 flatMap { x => f2 map { y => x.length * y } } f3 foreach println //#flat-map val result = Await.result(f3, 3 seconds) result should be(30) } "demonstrate usage of filter" in { //#filter val future1 = Future.successful(4) val future2 = future1.filter(_ % 2 == 0) future2 foreach println val failedFilter = future1.filter(_ % 2 == 1).recover { // When filter fails, it will have a java.util.NoSuchElementException case m: NoSuchElementException => 0 } failedFilter foreach println //#filter val result = Await.result(future2, 3 seconds) result should be(4) val result2 = Await.result(failedFilter, 3 seconds) result2 should be(0) //Can only be 0 when there was a MatchError } "demonstrate usage of for comprehension" in { //#for-comprehension val f = for { a <- Future(10 / 2) // 10 / 2 = 5 b <- Future(a + 1) // 5 + 1 = 6 c <- Future(a - 1) // 5 - 1 = 4 if c > 3 // Future.filter } yield b * c // 6 * 4 = 24 // Note that the execution of futures a, b, and c // are not done in parallel. f foreach println //#for-comprehension val result = Await.result(f, 3 seconds) result should be(24) } "demonstrate wrong way of composing" in { val actor1 = system.actorOf(Props[MyActor]) val actor2 = system.actorOf(Props[MyActor]) val actor3 = system.actorOf(Props[MyActor]) val msg1 = 1 val msg2 = 2 implicit val timeout = Timeout(5 seconds) import scala.concurrent.Await import akka.pattern.ask //#composing-wrong val f1 = ask(actor1, msg1) val f2 = ask(actor2, msg2) val a = Await.result(f1, 3 seconds).asInstanceOf[Int] val b = Await.result(f2, 3 seconds).asInstanceOf[Int] val f3 = ask(actor3, (a + b)) val result = Await.result(f3, 3 seconds).asInstanceOf[Int] //#composing-wrong result should be(3) } "demonstrate composing" in { val actor1 = system.actorOf(Props[MyActor]) val actor2 = system.actorOf(Props[MyActor]) val actor3 = system.actorOf(Props[MyActor]) val msg1 = 1 val msg2 = 2 implicit val timeout = Timeout(5 seconds) import scala.concurrent.Await import akka.pattern.ask //#composing val f1 = ask(actor1, msg1) val f2 = ask(actor2, msg2) val f3 = for { a <- f1.mapTo[Int] b <- f2.mapTo[Int] c <- ask(actor3, (a + b)).mapTo[Int] } yield c f3 foreach println //#composing val result = Await.result(f3, 3 seconds).asInstanceOf[Int] result should be(3) } "demonstrate usage of sequence with actors" in { implicit val timeout = Timeout(5 seconds) val oddActor = system.actorOf(Props[OddActor]) //#sequence-ask // oddActor returns odd numbers sequentially from 1 as a List[Future[Int]] val listOfFutures = List.fill(100)(akka.pattern.ask(oddActor, GetNext).mapTo[Int]) // now we have a Future[List[Int]] val futureList = Future.sequence(listOfFutures) // Find the sum of the odd numbers val oddSum = futureList.map(_.sum) oddSum foreach println //#sequence-ask Await.result(oddSum, 3 seconds).asInstanceOf[Int] should be(10000) } "demonstrate usage of sequence" in { //#sequence val futureList = Future.sequence((1 to 100).toList.map(x => Future(x * 2 - 1))) val oddSum = futureList.map(_.sum) oddSum foreach println //#sequence Await.result(oddSum, 3 seconds).asInstanceOf[Int] should be(10000) } "demonstrate usage of traverse" in { //#traverse val futureList = Future.traverse((1 to 100).toList)(x => Future(x * 2 - 1)) val oddSum = futureList.map(_.sum) oddSum foreach println //#traverse Await.result(oddSum, 3 seconds).asInstanceOf[Int] should be(10000) } "demonstrate usage of fold" in { //#fold // Create a sequence of Futures val futures = for (i <- 1 to 1000) yield Future(i * 2) val futureSum = Future.fold(futures)(0)(_ + _) futureSum foreach println //#fold Await.result(futureSum, 3 seconds) should be(1001000) } "demonstrate usage of reduce" in { //#reduce // Create a sequence of Futures val futures = for (i <- 1 to 1000) yield Future(i * 2) val futureSum = Future.reduce(futures)(_ + _) futureSum foreach println //#reduce Await.result(futureSum, 3 seconds) should be(1001000) } "demonstrate usage of recover" in { implicit val timeout = Timeout(5 seconds) val actor = system.actorOf(Props[MyActor]) val msg1 = -1 //#recover val future = akka.pattern.ask(actor, msg1) recover { case e: ArithmeticException => 0 } future foreach println //#recover Await.result(future, 3 seconds) should be(0) } "demonstrate usage of recoverWith" in { implicit val timeout = Timeout(5 seconds) val actor = system.actorOf(Props[MyActor]) val msg1 = -1 //#try-recover val future = akka.pattern.ask(actor, msg1) recoverWith { case e: ArithmeticException => Future.successful(0) case foo: IllegalArgumentException => Future.failed[Int](new IllegalStateException("All br0ken!")) } future foreach println //#try-recover Await.result(future, 3 seconds) should be(0) } "demonstrate usage of zip" in { val future1 = Future { "foo" } val future2 = Future { "bar" } //#zip val future3 = future1 zip future2 map { case (a, b) => a + " " + b } future3 foreach println //#zip Await.result(future3, 3 seconds) should be("foo bar") } "demonstrate usage of andThen" in { def loadPage(s: String) = s val url = "foo bar" def log(cause: Throwable) = () def watchSomeTV(): Unit = () //#and-then val result = Future { loadPage(url) } andThen { case Failure(exception) => log(exception) } andThen { case _ => watchSomeTV() } result foreach println //#and-then Await.result(result, 3 seconds) should be("foo bar") } "demonstrate usage of fallbackTo" in { val future1 = Future { "foo" } val future2 = Future { "bar" } val future3 = Future { "pigdog" } //#fallback-to val future4 = future1 fallbackTo future2 fallbackTo future3 future4 foreach println //#fallback-to Await.result(future4, 3 seconds) should be("foo") } "demonstrate usage of onSuccess & onFailure & onComplete" in { { val future = Future { "foo" } //#onSuccess future onSuccess { case "bar" => println("Got my bar alright!") case x: String => println("Got some random string: " + x) } //#onSuccess Await.result(future, 3 seconds) should be("foo") } { val future = Future.failed[String](new IllegalStateException("OHNOES")) //#onFailure future onFailure { case ise: IllegalStateException if ise.getMessage == "OHNOES" => //OHNOES! We are in deep trouble, do something! case e: Exception => //Do something else } //#onFailure } { val future = Future { "foo" } def doSomethingOnSuccess(r: String) = () def doSomethingOnFailure(t: Throwable) = () //#onComplete future onComplete { case Success(result) => doSomethingOnSuccess(result) case Failure(failure) => doSomethingOnFailure(failure) } //#onComplete Await.result(future, 3 seconds) should be("foo") } } "demonstrate usage of Future.successful & Future.failed & Future.promise" in { //#successful val future = Future.successful("Yay!") //#successful //#failed val otherFuture = Future.failed[String](new IllegalArgumentException("Bang!")) //#failed //#promise val promise = Promise[String]() val theFuture = promise.future promise.success("hello") //#promise Await.result(future, 3 seconds) should be("Yay!") intercept[IllegalArgumentException] { Await.result(otherFuture, 3 seconds) } Await.result(theFuture, 3 seconds) should be("hello") } "demonstrate usage of pattern.after" in { //#after // TODO after is unfortunately shadowed by ScalaTest, fix as part of #3759 // import akka.pattern.after val delayed = akka.pattern.after(200 millis, using = system.scheduler)(Future.failed( new IllegalStateException("OHNOES"))) val future = Future { Thread.sleep(1000); "foo" } val result = Future firstCompletedOf Seq(future, delayed) //#after intercept[IllegalStateException] { Await.result(result, 2 second) } } "demonstrate context.dispatcher" in { //#context-dispatcher class A extends Actor { import context.dispatcher val f = Future("hello") def receive = { //#receive-omitted case _ => //#receive-omitted } } //#context-dispatcher } } Other Akka source code examplesHere is a short list of links related to this Akka FutureDocSpec.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.