|
Akka/Scala example source code file (TestActorRefSpec.scala)
The TestActorRefSpec.scala Akka example source code
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.testkit
import language.{ postfixOps, reflectiveCalls }
import org.scalatest.Matchers
import org.scalatest.{ BeforeAndAfterEach, WordSpec }
import akka.actor._
import akka.event.Logging.Warning
import scala.concurrent.{ Future, Promise, Await }
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.pattern.ask
import akka.dispatch.Dispatcher
/**
* Test whether TestActorRef behaves as an ActorRef should, besides its own spec.
*/
object TestActorRefSpec {
var counter = 4
val thread = Thread.currentThread
var otherthread: Thread = null
trait TActor extends Actor {
def receive = new Receive {
val recv = receiveT
def isDefinedAt(o: Any) = recv.isDefinedAt(o)
def apply(o: Any) {
if (Thread.currentThread ne thread)
otherthread = Thread.currentThread
recv(o)
}
}
def receiveT: Receive
}
class ReplyActor extends TActor {
import context.system
var replyTo: ActorRef = null
def receiveT = {
case "complexRequest" ⇒ {
replyTo = sender()
val worker = TestActorRef(Props[WorkerActor])
worker ! "work"
}
case "complexRequest2" ⇒
val worker = TestActorRef(Props[WorkerActor])
worker ! sender()
case "workDone" ⇒ replyTo ! "complexReply"
case "simpleRequest" ⇒ sender() ! "simpleReply"
}
}
class WorkerActor() extends TActor {
def receiveT = {
case "work" ⇒
sender() ! "workDone"
context stop self
case replyTo: Promise[_] ⇒ replyTo.asInstanceOf[Promise[Any]].success("complexReply")
case replyTo: ActorRef ⇒ replyTo ! "complexReply"
}
}
class SenderActor(replyActor: ActorRef) extends TActor {
def receiveT = {
case "complex" ⇒ replyActor ! "complexRequest"
case "complex2" ⇒ replyActor ! "complexRequest2"
case "simple" ⇒ replyActor ! "simpleRequest"
case "complexReply" ⇒ {
counter -= 1
}
case "simpleReply" ⇒ {
counter -= 1
}
}
}
class Logger extends Actor {
var count = 0
var msg: String = _
def receive = {
case Warning(_, _, m: String) ⇒ count += 1; msg = m
}
}
class ReceiveTimeoutActor(target: ActorRef) extends Actor {
context setReceiveTimeout 1.second
def receive = {
case ReceiveTimeout ⇒
target ! "timeout"
context stop self
}
}
/**
* Forwarding `Terminated` to non-watching testActor is not possible,
* and therefore the `Terminated` message is wrapped.
*/
final case class WrappedTerminated(t: Terminated)
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class TestActorRefSpec extends AkkaSpec("disp1.type=Dispatcher") with BeforeAndAfterEach with DefaultTimeout {
import TestActorRefSpec._
override def beforeEach(): Unit = otherthread = null
private def assertThread(): Unit = otherthread should (be(null) or equal(thread))
"A TestActorRef should be an ActorRef, hence it" must {
"support nested Actor creation" when {
"used with TestActorRef" in {
val a = TestActorRef(Props(new Actor {
val nested = TestActorRef(Props(new Actor { def receive = { case _ ⇒ } }))
def receive = { case _ ⇒ sender() ! nested }
}))
a should not be (null)
val nested = Await.result((a ? "any").mapTo[ActorRef], timeout.duration)
nested should not be (null)
a should not be theSameInstanceAs(nested)
}
"used with ActorRef" in {
val a = TestActorRef(Props(new Actor {
val nested = context.actorOf(Props(new Actor { def receive = { case _ ⇒ } }))
def receive = { case _ ⇒ sender() ! nested }
}))
a should not be (null)
val nested = Await.result((a ? "any").mapTo[ActorRef], timeout.duration)
nested should not be (null)
a should not be theSameInstanceAs(nested)
}
}
"support reply via sender()" in {
val serverRef = TestActorRef(Props[ReplyActor])
val clientRef = TestActorRef(Props(classOf[SenderActor], serverRef))
counter = 4
clientRef ! "complex"
clientRef ! "simple"
clientRef ! "simple"
clientRef ! "simple"
counter should be(0)
counter = 4
clientRef ! "complex2"
clientRef ! "simple"
clientRef ! "simple"
clientRef ! "simple"
counter should be(0)
assertThread()
}
"stop when sent a poison pill" in {
EventFilter[ActorKilledException]() intercept {
val a = TestActorRef(Props[WorkerActor])
val forwarder = system.actorOf(Props(new Actor {
context.watch(a)
def receive = {
case t: Terminated ⇒ testActor forward WrappedTerminated(t)
case x ⇒ testActor forward x
}
}))
a.!(PoisonPill)(testActor)
expectMsgPF(5 seconds) {
case WrappedTerminated(Terminated(`a`)) ⇒ true
}
a.isTerminated should be(true)
assertThread()
}
}
"restart when Kill:ed" in {
EventFilter[ActorKilledException]() intercept {
counter = 2
val boss = TestActorRef(Props(new TActor {
val ref = TestActorRef(Props(new TActor {
def receiveT = { case _ ⇒ }
override def preRestart(reason: Throwable, msg: Option[Any]) { counter -= 1 }
override def postRestart(reason: Throwable) { counter -= 1 }
}), self, "child")
override def supervisorStrategy =
OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 1 second)(List(classOf[ActorKilledException]))
def receiveT = { case "sendKill" ⇒ ref ! Kill }
}))
boss ! "sendKill"
counter should be(0)
assertThread()
}
}
"support futures" in {
val a = TestActorRef[WorkerActor]
val f = a ? "work"
// CallingThreadDispatcher means that there is no delay
f should be('completed)
Await.result(f, timeout.duration) should be("workDone")
}
"support receive timeout" in {
val a = TestActorRef(new ReceiveTimeoutActor(testActor))
expectMsg("timeout")
}
}
"A TestActorRef" must {
"allow access to internals" in {
val ref = TestActorRef(new TActor {
var s: String = _
def receiveT = {
case x: String ⇒ s = x
}
})
ref ! "hallo"
val actor = ref.underlyingActor
actor.s should be("hallo")
}
"set receiveTimeout to None" in {
val a = TestActorRef[WorkerActor]
a.underlyingActor.context.receiveTimeout should be theSameInstanceAs Duration.Undefined
}
"set CallingThreadDispatcher" in {
val a = TestActorRef[WorkerActor]
a.underlying.dispatcher.getClass should be(classOf[CallingThreadDispatcher])
}
"allow override of dispatcher" in {
val a = TestActorRef(Props[WorkerActor].withDispatcher("disp1"))
a.underlying.dispatcher.getClass should be(classOf[Dispatcher])
}
"proxy receive for the underlying actor without sender()" in {
val ref = TestActorRef[WorkerActor]
ref.receive("work")
ref.isTerminated should be(true)
}
"proxy receive for the underlying actor with sender()" in {
val ref = TestActorRef[WorkerActor]
ref.receive("work", testActor)
ref.isTerminated should be(true)
expectMsg("workDone")
}
}
}
Other Akka source code examplesHere is a short list of links related to this Akka TestActorRefSpec.scala source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.