|
Akka/Scala example source code file (TestActorRefSpec.scala)
// The TestActorRefSpec.scala Akka example source code
/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.testkit

import language.{ postfixOps, reflectiveCalls }

import org.scalatest.Matchers
import org.scalatest.{ BeforeAndAfterEach, WordSpec }
import akka.actor._
import akka.event.Logging.Warning
import scala.concurrent.{ Future, Promise, Await }
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.pattern.ask
import akka.dispatch.Dispatcher

/**
 * Test whether TestActorRef behaves as an ActorRef should, besides its own spec.
 *
 * This companion object holds the shared mutable fixtures and the helper actors
 * used by the spec below.
 */
object TestActorRefSpec {

  /** Countdown decremented by the helper actors; the tests reset it and expect it to reach 0. */
  var counter = 4

  /** The thread on which this object was initialized. */
  val thread = Thread.currentThread

  /** Set by [[TActor]] when a message is handled on a thread other than `thread`; `null` otherwise. */
  var otherthread: Thread = null

  /**
   * Actor base trait whose `receive` wraps the behavior supplied by `receiveT`:
   * before delegating each message it records the current thread in `otherthread`
   * if that thread is not `thread`.
   */
  trait TActor extends Actor {
    def receive = new Receive {
      // Evaluated once per Receive instance, so receiveT is not re-invoked per message.
      val recv = receiveT
      def isDefinedAt(o: Any) = recv.isDefinedAt(o)
      def apply(o: Any): Unit = {
        if (Thread.currentThread ne thread)
          otherthread = Thread.currentThread
        recv(o)
      }
    }

    /** The actual message handler, provided by the concrete actor. */
    def receiveT: Receive
  }

  /**
   * Answers "simpleRequest" directly and answers the two "complex" requests
   * with the help of a freshly created [[WorkerActor]].
   */
  class ReplyActor extends TActor {
    import context.system

    /** Sender of the last "complexRequest"; receives "complexReply" once the worker reports "workDone". */
    var replyTo: ActorRef = null

    def receiveT = {
      case "complexRequest" ⇒ {
        replyTo = sender()
        val worker = TestActorRef(Props[WorkerActor])
        worker ! "work"
      }
      case "complexRequest2" ⇒
        // Hand the original sender to the worker, which replies to it directly.
        val worker = TestActorRef(Props[WorkerActor])
        worker ! sender()
      case "workDone"      ⇒ replyTo ! "complexReply"
      case "simpleRequest" ⇒ sender() ! "simpleReply"
    }
  }

  /**
   * Replies "workDone" to "work" and then stops itself; completes a received
   * `Promise` with "complexReply"; sends "complexReply" to a received `ActorRef`.
   */
  class WorkerActor() extends TActor {
    def receiveT = {
      case "work" ⇒
        sender() ! "workDone"
        context stop self
      case replyTo: Promise[_] ⇒ replyTo.asInstanceOf[Promise[Any]].success("complexReply")
      case replyTo: ActorRef   ⇒ replyTo ! "complexReply"
    }
  }

  /**
   * Translates "complex", "complex2" and "simple" into the corresponding request
   * to `replyActor` and decrements `counter` for every reply it gets back.
   *
   * @param replyActor the actor the requests are sent to
   */
  class SenderActor(replyActor: ActorRef) extends TActor {

    def receiveT = {
      case "complex"  ⇒ replyActor ! "complexRequest"
      case "complex2" ⇒ replyActor ! "complexRequest2"
      case "simple"   ⇒ replyActor ! "simpleRequest"
      case "complexReply" ⇒ {
        counter -= 1
      }
      case "simpleReply" ⇒ {
        counter -= 1
      }
    }
  }

  /** Counts `Warning` events that carry a `String` message and remembers the last such message. */
  class Logger extends Actor {
    var count = 0
    var msg: String = _
    def receive = {
      case Warning(_, _, m: String) ⇒ count += 1; msg = m
    }
  }

  /**
   * Sets a receive timeout of one second on construction; when `ReceiveTimeout`
   * arrives it sends "timeout" to `target` and stops itself.
   *
   * @param target the actor notified about the timeout
   */
  class ReceiveTimeoutActor(target: ActorRef) extends Actor {
    context setReceiveTimeout 1.second
    def receive = {
      case ReceiveTimeout ⇒
        target ! "timeout"
        context stop self
    }
  }

  /**
   * Forwarding `Terminated` to non-watching testActor is not possible,
   * and therefore the `Terminated` message is wrapped.
   */
  final case class WrappedTerminated(t: Terminated)

}

/**
 * Spec for `TestActorRef`: first the behavior it shares with any `ActorRef`,
 * then the features specific to `TestActorRef`.
 */
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class TestActorRefSpec extends AkkaSpec("disp1.type=Dispatcher") with BeforeAndAfterEach with DefaultTimeout {

  import TestActorRefSpec._

  /** Clears the recorded foreign thread before every test. */
  override def beforeEach(): Unit = otherthread = null

  /** Asserts that no [[TestActorRefSpec.TActor]] handled a message on a thread other than `thread`. */
  private def assertThread(): Unit = otherthread should (be(null) or equal(thread))

  "A TestActorRef should be an ActorRef, hence it" must {

    "support nested Actor creation" when {

      "used with TestActorRef" in {
        val a = TestActorRef(Props(new Actor {
          val nested = TestActorRef(Props(new Actor { def receive = { case _ ⇒ } }))
          def receive = { case _ ⇒ sender() ! nested }
        }))
        a should not be (null)
        val nested = Await.result((a ? "any").mapTo[ActorRef], timeout.duration)
        nested should not be (null)
        a should not be theSameInstanceAs(nested)
      }

      "used with ActorRef" in {
        val a = TestActorRef(Props(new Actor {
          val nested = context.actorOf(Props(new Actor { def receive = { case _ ⇒ } }))
          def receive = { case _ ⇒ sender() ! nested }
        }))
        a should not be (null)
        val nested = Await.result((a ? "any").mapTo[ActorRef], timeout.duration)
        nested should not be (null)
        a should not be theSameInstanceAs(nested)
      }

    }

    "support reply via sender()" in {
      val serverRef = TestActorRef(Props[ReplyActor])
      val clientRef = TestActorRef(Props(classOf[SenderActor], serverRef))

      // One complex plus three simple round trips: four replies, so counter goes 4 -> 0.
      // The check directly after the sends relies on the messages being processed
      // without delay (see the CallingThreadDispatcher tests below).
      counter = 4

      clientRef ! "complex"
      clientRef ! "simple"
      clientRef ! "simple"
      clientRef ! "simple"

      counter should be(0)

      counter = 4

      clientRef ! "complex2"
      clientRef ! "simple"
      clientRef ! "simple"
      clientRef ! "simple"

      counter should be(0)

      assertThread()
    }

    "stop when sent a poison pill" in {
      EventFilter[ActorKilledException]() intercept {
        val a = TestActorRef(Props[WorkerActor])
        // Created only for its side effect: it watches `a` and relays the
        // (wrapped) Terminated notification to testActor.
        val forwarder = system.actorOf(Props(new Actor {
          context.watch(a)
          def receive = {
            case t: Terminated ⇒ testActor forward WrappedTerminated(t)
            case x             ⇒ testActor forward x
          }
        }))
        a.!(PoisonPill)(testActor)
        expectMsgPF(5 seconds) {
          case WrappedTerminated(Terminated(`a`)) ⇒ true
        }
        a.isTerminated should be(true)
        assertThread()
      }
    }

    "restart when Kill:ed" in {
      EventFilter[ActorKilledException]() intercept {
        // preRestart and postRestart of the child each decrement once: 2 -> 0.
        counter = 2

        val boss = TestActorRef(Props(new TActor {
          val ref = TestActorRef(Props(new TActor {
            def receiveT = { case _ ⇒ }
            override def preRestart(reason: Throwable, msg: Option[Any]): Unit = { counter -= 1 }
            override def postRestart(reason: Throwable): Unit = { counter -= 1 }
          }), self, "child")

          override def supervisorStrategy =
            OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 1 second)(List(classOf[ActorKilledException]))

          def receiveT = { case "sendKill" ⇒ ref ! Kill }
        }))

        boss ! "sendKill"

        counter should be(0)
        assertThread()
      }
    }

    "support futures" in {
      val a = TestActorRef[WorkerActor]
      val f = a ? "work"
      // CallingThreadDispatcher means that there is no delay
      f should be('completed)
      Await.result(f, timeout.duration) should be("workDone")
    }

    "support receive timeout" in {
      val a = TestActorRef(new ReceiveTimeoutActor(testActor))
      expectMsg("timeout")
    }

  }

  "A TestActorRef" must {

    "allow access to internals" in {
      val ref = TestActorRef(new TActor {
        var s: String = _
        def receiveT = {
          case x: String ⇒ s = x
        }
      })
      ref ! "hallo"
      val actor = ref.underlyingActor
      actor.s should be("hallo")
    }

    "set receiveTimeout to None" in {
      val a = TestActorRef[WorkerActor]
      a.underlyingActor.context.receiveTimeout should be theSameInstanceAs Duration.Undefined
    }

    "set CallingThreadDispatcher" in {
      val a = TestActorRef[WorkerActor]
      a.underlying.dispatcher.getClass should be(classOf[CallingThreadDispatcher])
    }

    "allow override of dispatcher" in {
      // "disp1" is configured as a plain Dispatcher in the AkkaSpec config string above.
      val a = TestActorRef(Props[WorkerActor].withDispatcher("disp1"))
      a.underlying.dispatcher.getClass should be(classOf[Dispatcher])
    }

    "proxy receive for the underlying actor without sender()" in {
      val ref = TestActorRef[WorkerActor]
      ref.receive("work")
      ref.isTerminated should be(true)
    }

    "proxy receive for the underlying actor with sender()" in {
      val ref = TestActorRef[WorkerActor]
      ref.receive("work", testActor)
      ref.isTerminated should be(true)
      expectMsg("workDone")
    }

  }
}

// Other Akka source code examples
// Here is a short list of links related to this Akka TestActorRefSpec.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.