|
Akka/Scala example source code file (ActorDocSpec.scala)
The ActorDocSpec.scala Akka example source code/** * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> */ package docs.actor import language.postfixOps //#imports1 import akka.actor.Actor import akka.actor.Props import akka.event.Logging //#imports1 import scala.concurrent.Future import akka.actor.{ ActorRef, ActorSystem, PoisonPill, Terminated, ActorLogging } import org.scalatest.{ BeforeAndAfterAll, WordSpec } import org.scalatest.Matchers import akka.testkit._ import akka.util._ import scala.concurrent.duration._ import scala.concurrent.Await //#my-actor class MyActor extends Actor { val log = Logging(context.system, this) def receive = { case "test" => log.info("received test") case _ => log.info("received unknown message") } } //#my-actor final case class DoIt(msg: ImmutableMessage) final case class Message(s: String) //#context-actorOf class FirstActor extends Actor { val child = context.actorOf(Props[MyActor], name = "myChild") //#plus-some-behavior def receive = { case x => sender() ! x } //#plus-some-behavior } //#context-actorOf class ActorWithArgs(arg: String) extends Actor { def receive = { case _ => () } } class DemoActorWrapper extends Actor { //#props-factory object DemoActor { /** * Create Props for an actor of this type. * @param magciNumber The magic number to be passed to this actor’s constructor. * @return a Props for creating this actor, which can then be further configured * (e.g. calling `.withDispatcher()` on it) */ def props(magicNumber: Int): Props = Props(new DemoActor(magicNumber)) } class DemoActor(magicNumber: Int) extends Actor { def receive = { case x: Int => sender() ! (x + magicNumber) } } class SomeOtherActor extends Actor { // Props(new DemoActor(42)) would not be safe context.actorOf(DemoActor.props(42), "demo") // ... //#props-factory def receive = { case msg => } //#props-factory } //#props-factory def receive = Actor.emptyBehavior } class Hook extends Actor { var child: ActorRef = _ //#preStart override def preStart() { child = context.actorOf(Props[MyActor], "child") } //#preStart def receive = Actor.emptyBehavior //#postStop override def postStop() { //#clean-up-some-resources () //#clean-up-some-resources } //#postStop } class ReplyException extends Actor { def receive = { case _ => //#reply-exception try { val result = operation() sender() ! result } catch { case e: Exception => sender() ! akka.actor.Status.Failure(e) throw e } //#reply-exception } def operation(): String = { "Hi" } } //#gracefulStop-actor object Manager { case object Shutdown } class Manager extends Actor { import Manager._ val worker = context.watch(context.actorOf(Props[Cruncher], "worker")) def receive = { case "job" => worker ! "crunch" case Shutdown => worker ! PoisonPill context become shuttingDown } def shuttingDown: Receive = { case "job" => sender() ! "service unavailable, shutting down" case Terminated(`worker`) => context stop self } } //#gracefulStop-actor class Cruncher extends Actor { def receive = { case "crunch" => // crunch... } } //#swapper case object Swap class Swapper extends Actor { import context._ val log = Logging(system, this) def receive = { case Swap => log.info("Hi") become({ case Swap => log.info("Ho") unbecome() // resets the latest 'become' (just for fun) }, discardOld = false) // push on top instead of replace } } object SwapperApp extends App { val system = ActorSystem("SwapperSystem") val swap = system.actorOf(Props[Swapper], name = "swapper") swap ! Swap // logs Hi swap ! Swap // logs Ho swap ! Swap // logs Hi swap ! Swap // logs Ho swap ! Swap // logs Hi swap ! Swap // logs Ho } //#swapper //#receive-orElse trait ProducerBehavior { this: Actor => val producerBehavior: Receive = { case GiveMeThings => sender() ! Give("thing") } } trait ConsumerBehavior { this: Actor with ActorLogging => val consumerBehavior: Receive = { case ref: ActorRef => ref ! GiveMeThings case Give(thing) => log.info("Got a thing! It's {}", thing) } } class Producer extends Actor with ProducerBehavior { def receive = producerBehavior } class Consumer extends Actor with ActorLogging with ConsumerBehavior { def receive = consumerBehavior } class ProducerConsumer extends Actor with ActorLogging with ProducerBehavior with ConsumerBehavior { def receive = producerBehavior orElse consumerBehavior } // protocol case object GiveMeThings final case class Give(thing: Any) //#receive-orElse class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { "import context" in { new AnyRef { //#import-context class FirstActor extends Actor { import context._ val myActor = actorOf(Props[MyActor], name = "myactor") def receive = { case x => myActor ! x } } //#import-context val first = system.actorOf(Props(classOf[FirstActor], this), name = "first") system.stop(first) } } "creating actor with system.actorOf" in { val myActor = system.actorOf(Props[MyActor]) // testing the actor // TODO: convert docs to AkkaSpec(Map(...)) val filter = EventFilter.custom { case e: Logging.Info => true case _ => false } system.eventStream.publish(TestEvent.Mute(filter)) system.eventStream.subscribe(testActor, classOf[Logging.Info]) myActor ! "test" expectMsgPF(1 second) { case Logging.Info(_, _, "received test") => true } myActor ! "unknown" expectMsgPF(1 second) { case Logging.Info(_, _, "received unknown message") => true } system.eventStream.unsubscribe(testActor) system.eventStream.publish(TestEvent.UnMute(filter)) system.stop(myActor) } "creating a Props config" in { //#creating-props import akka.actor.Props val props1 = Props[MyActor] val props2 = Props(new ActorWithArgs("arg")) // careful, see below val props3 = Props(classOf[ActorWithArgs], "arg") //#creating-props //#creating-props-deprecated // NOT RECOMMENDED within another actor: // encourages to close over enclosing class val props7 = Props(new MyActor) //#creating-props-deprecated } "creating actor with Props" in { //#system-actorOf import akka.actor.ActorSystem // ActorSystem is a heavy object: create only one per application val system = ActorSystem("mySystem") val myActor = system.actorOf(Props[MyActor], "myactor2") //#system-actorOf shutdown(system) } "creating actor with IndirectActorProducer" in { class Echo(name: String) extends Actor { def receive = { case n: Int => sender() ! name case message => val target = testActor //#forward target forward message //#forward } } val a: { def actorRef: ActorRef } = new AnyRef { val applicationContext = this //#creating-indirectly import akka.actor.IndirectActorProducer class DependencyInjector(applicationContext: AnyRef, beanName: String) extends IndirectActorProducer { override def actorClass = classOf[Actor] override def produce = //#obtain-fresh-Actor-instance-from-DI-framework new Echo(beanName) def this(beanName: String) = this("", beanName) //#obtain-fresh-Actor-instance-from-DI-framework } val actorRef = system.actorOf( Props(classOf[DependencyInjector], applicationContext, "hello"), "helloBean") //#creating-indirectly } val actorRef = { import scala.language.reflectiveCalls a.actorRef } val message = 42 implicit val self = testActor //#tell actorRef ! message //#tell expectMsg("hello") actorRef ! "huhu" expectMsg("huhu") } "using implicit timeout" in { val myActor = system.actorOf(Props[FirstActor]) //#using-implicit-timeout import scala.concurrent.duration._ import akka.util.Timeout import akka.pattern.ask implicit val timeout = Timeout(5 seconds) val future = myActor ? "hello" //#using-implicit-timeout Await.result(future, timeout.duration) should be("hello") } "using explicit timeout" in { val myActor = system.actorOf(Props[FirstActor]) //#using-explicit-timeout import scala.concurrent.duration._ import akka.pattern.ask val future = myActor.ask("hello")(5 seconds) //#using-explicit-timeout Await.result(future, 5 seconds) should be("hello") } "using receiveTimeout" in { //#receive-timeout import akka.actor.ReceiveTimeout import scala.concurrent.duration._ class MyActor extends Actor { // To set an initial delay context.setReceiveTimeout(30 milliseconds) def receive = { case "Hello" => // To set in a response to a message context.setReceiveTimeout(100 milliseconds) case ReceiveTimeout => // To turn it off context.setReceiveTimeout(Duration.Undefined) throw new RuntimeException("Receive timed out") } } //#receive-timeout } //#hot-swap-actor class HotSwapActor extends Actor { import context._ def angry: Receive = { case "foo" => sender() ! "I am already angry?" case "bar" => become(happy) } def happy: Receive = { case "bar" => sender() ! "I am already happy :-)" case "foo" => become(angry) } def receive = { case "foo" => become(angry) case "bar" => become(happy) } } //#hot-swap-actor "using hot-swap" in { val actor = system.actorOf(Props(classOf[HotSwapActor], this), name = "hot") } "using Stash" in { //#stash import akka.actor.Stash class ActorWithProtocol extends Actor with Stash { def receive = { case "open" => unstashAll() context.become({ case "write" => // do writing... case "close" => unstashAll() context.unbecome() case msg => stash() }, discardOld = false) // stack on top instead of replacing case msg => stash() } } //#stash } "using watch" in { new AnyRef { //#watch import akka.actor.{ Actor, Props, Terminated } class WatchActor extends Actor { val child = context.actorOf(Props.empty, "child") context.watch(child) // <-- this is the only call needed for registration var lastSender = system.deadLetters def receive = { case "kill" => context.stop(child); lastSender = sender() case Terminated(`child`) => lastSender ! "finished" } } //#watch val a = system.actorOf(Props(classOf[WatchActor], this)) implicit val sender = testActor a ! "kill" expectMsg("finished") } } "demonstrate ActorSelection" in { val context = system //#selection-local // will look up this absolute path context.actorSelection("/user/serviceA/aggregator") // will look up sibling beneath same supervisor context.actorSelection("../joe") //#selection-local //#selection-wildcard // will look all children to serviceB with names starting with worker context.actorSelection("/user/serviceB/worker*") // will look up all siblings beneath same supervisor context.actorSelection("../*") //#selection-wildcard //#selection-remote context.actorSelection("akka.tcp://app@otherhost:1234/user/serviceB") //#selection-remote } "using Identify" in { new AnyRef { //#identify import akka.actor.{ Actor, Props, Identify, ActorIdentity, Terminated } class Follower extends Actor { val identifyId = 1 context.actorSelection("/user/another") ! Identify(identifyId) def receive = { case ActorIdentity(`identifyId`, Some(ref)) => context.watch(ref) context.become(active(ref)) case ActorIdentity(`identifyId`, None) => context.stop(self) } def active(another: ActorRef): Actor.Receive = { case Terminated(`another`) => context.stop(self) } } //#identify val a = system.actorOf(Props.empty) val b = system.actorOf(Props(classOf[Follower], this)) watch(b) system.stop(a) expectMsgType[akka.actor.Terminated].actor should be(b) } } "using pattern gracefulStop" in { val actorRef = system.actorOf(Props[Manager]) //#gracefulStop import akka.pattern.gracefulStop import scala.concurrent.Await try { val stopped: Future[Boolean] = gracefulStop(actorRef, 5 seconds, Manager.Shutdown) Await.result(stopped, 6 seconds) // the actor has been stopped } catch { // the actor wasn't stopped within 5 seconds case e: akka.pattern.AskTimeoutException => } //#gracefulStop } "using pattern ask / pipeTo" in { val actorA, actorB, actorC, actorD = system.actorOf(Props.empty) //#ask-pipeTo import akka.pattern.{ ask, pipe } import system.dispatcher // The ExecutionContext that will be used final case class Result(x: Int, s: String, d: Double) case object Request implicit val timeout = Timeout(5 seconds) // needed for `?` below val f: Future[Result] = for { x <- ask(actorA, Request).mapTo[Int] // call pattern directly s <- (actorB ask Request).mapTo[String] // call by implicit conversion d <- (actorC ? Request).mapTo[Double] // call by symbolic name } yield Result(x, s, d) f pipeTo actorD // .. or .. pipe(f) to actorD //#ask-pipeTo } class Replier extends Actor { def receive = { case ref: ActorRef => //#reply-with-sender sender().tell("reply", context.parent) // replies will go back to parent sender().!("reply")(context.parent) // alternative syntax (beware of the parens!) //#reply-with-sender case x => //#reply-without-sender sender() ! x // replies will go to this actor //#reply-without-sender } } "replying with own or other sender" in { val actor = system.actorOf(Props(classOf[Replier], this)) implicit val me = testActor actor ! 42 expectMsg(42) lastSender should be(actor) actor ! me expectMsg("reply") lastSender.path.toStringWithoutAddress should be("/user") expectMsg("reply") lastSender.path.toStringWithoutAddress should be("/user") } "using ActorDSL outside of akka.actor package" in { import akka.actor.ActorDSL._ actor(new Act { superviseWith(OneForOneStrategy() { case _ => Stop; Restart; Resume; Escalate }) superviseWith(AllForOneStrategy() { case _ => Stop; Restart; Resume; Escalate }) }) } } Other Akka source code examplesHere is a short list of links related to this Akka ActorDocSpec.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.