alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (TestActorRefSpec.scala)

This example Akka source code file (TestActorRefSpec.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

a, actor, actorref, akka, concurrent, dispatch, eventfilter, string, tactor, terminated, test, testactorref, testing, throwable, time, wrappedterminated

The TestActorRefSpec.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.testkit

import language.{ postfixOps, reflectiveCalls }
import org.scalatest.Matchers
import org.scalatest.{ BeforeAndAfterEach, WordSpec }
import akka.actor._
import akka.event.Logging.Warning
import scala.concurrent.{ Future, Promise, Await }
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.pattern.ask
import akka.dispatch.Dispatcher

/**
 * Test whether TestActorRef behaves as an ActorRef should, besides its own spec.
 */
object TestActorRefSpec {

  var counter = 4
  val thread = Thread.currentThread
  var otherthread: Thread = null

  trait TActor extends Actor {
    def receive = new Receive {
      val recv = receiveT
      def isDefinedAt(o: Any) = recv.isDefinedAt(o)
      def apply(o: Any) {
        if (Thread.currentThread ne thread)
          otherthread = Thread.currentThread
        recv(o)
      }
    }
    def receiveT: Receive
  }

  class ReplyActor extends TActor {
    import context.system
    var replyTo: ActorRef = null

    def receiveT = {
      case "complexRequest" ⇒ {
        replyTo = sender()
        val worker = TestActorRef(Props[WorkerActor])
        worker ! "work"
      }
      case "complexRequest2" ⇒
        val worker = TestActorRef(Props[WorkerActor])
        worker ! sender()
      case "workDone"      ⇒ replyTo ! "complexReply"
      case "simpleRequest" ⇒ sender() ! "simpleReply"
    }
  }

  class WorkerActor() extends TActor {
    def receiveT = {
      case "work" ⇒
        sender() ! "workDone"
        context stop self
      case replyTo: Promise[_] ⇒ replyTo.asInstanceOf[Promise[Any]].success("complexReply")
      case replyTo: ActorRef   ⇒ replyTo ! "complexReply"
    }
  }

  class SenderActor(replyActor: ActorRef) extends TActor {

    def receiveT = {
      case "complex"  ⇒ replyActor ! "complexRequest"
      case "complex2" ⇒ replyActor ! "complexRequest2"
      case "simple"   ⇒ replyActor ! "simpleRequest"
      case "complexReply" ⇒ {
        counter -= 1
      }
      case "simpleReply" ⇒ {
        counter -= 1
      }
    }
  }

  class Logger extends Actor {
    var count = 0
    var msg: String = _
    def receive = {
      case Warning(_, _, m: String) ⇒ count += 1; msg = m
    }
  }

  class ReceiveTimeoutActor(target: ActorRef) extends Actor {
    context setReceiveTimeout 1.second
    def receive = {
      case ReceiveTimeout ⇒
        target ! "timeout"
        context stop self
    }
  }

  /**
   * Forwarding `Terminated` to non-watching testActor is not possible,
   * and therefore the `Terminated` message is wrapped.
   */
  final case class WrappedTerminated(t: Terminated)

}

@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class TestActorRefSpec extends AkkaSpec("disp1.type=Dispatcher") with BeforeAndAfterEach with DefaultTimeout {

  import TestActorRefSpec._

  override def beforeEach(): Unit = otherthread = null

  private def assertThread(): Unit = otherthread should (be(null) or equal(thread))

  "A TestActorRef should be an ActorRef, hence it" must {

    "support nested Actor creation" when {

      "used with TestActorRef" in {
        val a = TestActorRef(Props(new Actor {
          val nested = TestActorRef(Props(new Actor { def receive = { case _ ⇒ } }))
          def receive = { case _ ⇒ sender() ! nested }
        }))
        a should not be (null)
        val nested = Await.result((a ? "any").mapTo[ActorRef], timeout.duration)
        nested should not be (null)
        a should not be theSameInstanceAs(nested)
      }

      "used with ActorRef" in {
        val a = TestActorRef(Props(new Actor {
          val nested = context.actorOf(Props(new Actor { def receive = { case _ ⇒ } }))
          def receive = { case _ ⇒ sender() ! nested }
        }))
        a should not be (null)
        val nested = Await.result((a ? "any").mapTo[ActorRef], timeout.duration)
        nested should not be (null)
        a should not be theSameInstanceAs(nested)
      }

    }

    "support reply via sender()" in {
      val serverRef = TestActorRef(Props[ReplyActor])
      val clientRef = TestActorRef(Props(classOf[SenderActor], serverRef))

      counter = 4

      clientRef ! "complex"
      clientRef ! "simple"
      clientRef ! "simple"
      clientRef ! "simple"

      counter should be(0)

      counter = 4

      clientRef ! "complex2"
      clientRef ! "simple"
      clientRef ! "simple"
      clientRef ! "simple"

      counter should be(0)

      assertThread()
    }

    "stop when sent a poison pill" in {
      EventFilter[ActorKilledException]() intercept {
        val a = TestActorRef(Props[WorkerActor])
        val forwarder = system.actorOf(Props(new Actor {
          context.watch(a)
          def receive = {
            case t: Terminated ⇒ testActor forward WrappedTerminated(t)
            case x             ⇒ testActor forward x
          }
        }))
        a.!(PoisonPill)(testActor)
        expectMsgPF(5 seconds) {
          case WrappedTerminated(Terminated(`a`)) ⇒ true
        }
        a.isTerminated should be(true)
        assertThread()
      }
    }

    "restart when Kill:ed" in {
      EventFilter[ActorKilledException]() intercept {
        counter = 2

        val boss = TestActorRef(Props(new TActor {
          val ref = TestActorRef(Props(new TActor {
            def receiveT = { case _ ⇒ }
            override def preRestart(reason: Throwable, msg: Option[Any]) { counter -= 1 }
            override def postRestart(reason: Throwable) { counter -= 1 }
          }), self, "child")

          override def supervisorStrategy =
            OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 1 second)(List(classOf[ActorKilledException]))

          def receiveT = { case "sendKill" ⇒ ref ! Kill }
        }))

        boss ! "sendKill"

        counter should be(0)
        assertThread()
      }
    }

    "support futures" in {
      val a = TestActorRef[WorkerActor]
      val f = a ? "work"
      // CallingThreadDispatcher means that there is no delay
      f should be('completed)
      Await.result(f, timeout.duration) should be("workDone")
    }

    "support receive timeout" in {
      val a = TestActorRef(new ReceiveTimeoutActor(testActor))
      expectMsg("timeout")
    }

  }

  "A TestActorRef" must {

    "allow access to internals" in {
      val ref = TestActorRef(new TActor {
        var s: String = _
        def receiveT = {
          case x: String ⇒ s = x
        }
      })
      ref ! "hallo"
      val actor = ref.underlyingActor
      actor.s should be("hallo")
    }

    "set receiveTimeout to None" in {
      val a = TestActorRef[WorkerActor]
      a.underlyingActor.context.receiveTimeout should be theSameInstanceAs Duration.Undefined
    }

    "set CallingThreadDispatcher" in {
      val a = TestActorRef[WorkerActor]
      a.underlying.dispatcher.getClass should be(classOf[CallingThreadDispatcher])
    }

    "allow override of dispatcher" in {
      val a = TestActorRef(Props[WorkerActor].withDispatcher("disp1"))
      a.underlying.dispatcher.getClass should be(classOf[Dispatcher])
    }

    "proxy receive for the underlying actor without sender()" in {
      val ref = TestActorRef[WorkerActor]
      ref.receive("work")
      ref.isTerminated should be(true)
    }

    "proxy receive for the underlying actor with sender()" in {
      val ref = TestActorRef[WorkerActor]
      ref.receive("work", testActor)
      ref.isTerminated should be(true)
      expectMsg("workDone")
    }

  }
}

Other Akka source code examples

Here is a short list of links related to this Akka TestActorRefSpec.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.