|
Akka/Scala example source code file (ActorWithStashSpec.scala)
The ActorWithStashSpec.scala Akka example source code/** * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> */ package akka.actor import language.postfixOps import akka.testkit._ import akka.testkit.DefaultTimeout import akka.testkit.TestEvent._ import scala.concurrent.Await import akka.pattern.ask import scala.concurrent.duration._ import com.typesafe.config.{ Config, ConfigFactory } import org.scalatest.BeforeAndAfterEach import org.scalatest.junit.JUnitSuiteLike object ActorWithStashSpec { class StashingActor extends Actor with Stash { import context.system def greeted: Receive = { case "bye" ⇒ state.s = "bye" state.finished.await case _ ⇒ // do nothing } def receive = { case "hello" ⇒ state.s = "hello" unstashAll() context.become(greeted) case msg ⇒ stash() } } class StashingTwiceActor extends Actor with Stash { def receive = { case "hello" ⇒ try { stash() stash() } catch { case e: IllegalStateException ⇒ state.expectedException.open() } case msg ⇒ // do nothing } } class ActorWithProtocol extends Actor with Stash { import context.system def receive = { case "open" ⇒ unstashAll() context.become { case "write" ⇒ // do writing... case "close" ⇒ unstashAll() context.unbecome() case msg ⇒ stash() } case "done" ⇒ state.finished.await case msg ⇒ stash() } } class WatchedActor extends Actor { def receive = Actor.emptyBehavior } class TerminatedMessageStashingActor(probe: ActorRef) extends Actor with Stash { val watched = context.watch(context.actorOf(Props[WatchedActor])) var stashed = false context.stop(watched) def receive = { case Terminated(`watched`) ⇒ if (!stashed) { stash() stashed = true unstashAll() } probe ! "terminated" } } object state { @volatile var s: String = "" val finished = TestBarrier(2) var expectedException: TestLatch = null } val testConf = """ akka.actor.serialize-messages = off """ } class JavaActorWithStashSpec extends StashJavaAPI with JUnitSuiteLike @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class ActorWithStashSpec extends AkkaSpec(ActorWithStashSpec.testConf) with DefaultTimeout with BeforeAndAfterEach { import ActorWithStashSpec._ override def atStartup { system.eventStream.publish(Mute(EventFilter[Exception]("Crashing..."))) } override def beforeEach() = state.finished.reset "An Actor with Stash" must { "stash messages" in { val stasher = system.actorOf(Props(new StashingActor)) stasher ! "bye" stasher ! "hello" state.finished.await state.s should be("bye") } "support protocols" in { val protoActor = system.actorOf(Props[ActorWithProtocol]) protoActor ! "open" protoActor ! "write" protoActor ! "open" protoActor ! "close" protoActor ! "write" protoActor ! "close" protoActor ! "done" state.finished.await } "throw an IllegalStateException if the same messages is stashed twice" in { state.expectedException = new TestLatch val stasher = system.actorOf(Props[StashingTwiceActor]) stasher ! "hello" stasher ! "hello" Await.ready(state.expectedException, 10 seconds) } "process stashed messages after restart" in { val boss = system.actorOf(Props(new Supervisor( OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 1 second)(List(classOf[Throwable]))))) val restartLatch = new TestLatch val hasMsgLatch = new TestLatch val slaveProps = Props(new Actor with Stash { def receive = { case "crash" ⇒ throw new Exception("Crashing...") // when restartLatch is not yet open, stash all messages != "crash" case msg if !restartLatch.isOpen ⇒ stash() // when restartLatch is open, must receive "hello" case "hello" ⇒ hasMsgLatch.open() } override def preRestart(reason: Throwable, message: Option[Any]) = { if (!restartLatch.isOpen) restartLatch.open() super.preRestart(reason, message) } }) val slave = Await.result((boss ? slaveProps).mapTo[ActorRef], timeout.duration) slave ! "hello" slave ! "crash" Await.ready(restartLatch, 10 seconds) Await.ready(hasMsgLatch, 10 seconds) } "re-receive unstashed Terminated messages" in { system.actorOf(Props(classOf[TerminatedMessageStashingActor], testActor)) expectMsg("terminated") expectMsg("terminated") } } "An ActWithStash" must { "allow using whenRestarted" in { import ActorDSL._ val a = actor(new ActWithStash { become { case "die" ⇒ throw new RuntimeException("dying") } whenRestarted { thr ⇒ testActor ! "restarted" } }) EventFilter[RuntimeException]("dying", occurrences = 1) intercept { a ! "die" } expectMsg("restarted") } "allow using whenStopping" in { import ActorDSL._ val a = actor(new ActWithStash { whenStopping { testActor ! "stopping" } }) a ! PoisonPill expectMsg("stopping") } } } Other Akka source code examplesHere is a short list of links related to this Akka ActorWithStashSpec.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.