alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (ActorRefSpec.scala)

This example Akka source code file (ActorRefSpec.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actor, actorref, akka, concurrent, duration, eventfilter, failinginheritinginneractor, failinginheritingouteractor, failingouteractor, inneractor, objectinputstream, outeractor, test, testing, testlatch, time

The ActorRefSpec.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */

package akka.actor

import language.postfixOps

import akka.testkit._
import akka.util.Timeout
import scala.concurrent.duration._
import scala.concurrent.Await
import java.lang.IllegalStateException
import scala.concurrent.Promise
import akka.pattern.ask
import akka.serialization.JavaSerializer
import akka.TestUtils.verifyActorTermination

object ActorRefSpec {

  final case class ReplyTo(sender: ActorRef)

  class ReplyActor extends Actor {
    var replyTo: ActorRef = null

    def receive = {
      case "complexRequest" ⇒ {
        replyTo = sender()
        val worker = context.actorOf(Props[WorkerActor])
        worker ! "work"
      }
      case "complexRequest2" ⇒
        val worker = context.actorOf(Props[WorkerActor])
        worker ! ReplyTo(sender())
      case "workDone"      ⇒ replyTo ! "complexReply"
      case "simpleRequest" ⇒ sender() ! "simpleReply"
    }
  }

  class WorkerActor() extends Actor {
    import context.system
    def receive = {
      case "work" ⇒ {
        work()
        sender() ! "workDone"
        context.stop(self)
      }
      case ReplyTo(replyTo) ⇒ {
        work()
        replyTo ! "complexReply"
      }
    }

    private def work(): Unit = Thread.sleep(1.second.dilated.toMillis)
  }

  class SenderActor(replyActor: ActorRef, latch: TestLatch) extends Actor {

    def receive = {
      case "complex"  ⇒ replyActor ! "complexRequest"
      case "complex2" ⇒ replyActor ! "complexRequest2"
      case "simple"   ⇒ replyActor ! "simpleRequest"
      case "complexReply" ⇒ {
        latch.countDown()
      }
      case "simpleReply" ⇒ {
        latch.countDown()
      }
    }
  }

  class OuterActor(val inner: ActorRef) extends Actor {
    def receive = {
      case "self" ⇒ sender() ! self
      case x      ⇒ inner forward x
    }
  }

  class FailingOuterActor(val inner: ActorRef) extends Actor {
    val fail = new InnerActor

    def receive = {
      case "self" ⇒ sender() ! self
      case x      ⇒ inner forward x
    }
  }

  class FailingInheritingOuterActor(_inner: ActorRef) extends OuterActor(_inner) {
    val fail = new InnerActor
  }

  class InnerActor extends Actor {
    def receive = {
      case "innerself" ⇒ sender() ! self
      case other       ⇒ sender() ! other
    }
  }

  class FailingInnerActor extends Actor {
    val fail = new InnerActor

    def receive = {
      case "innerself" ⇒ sender() ! self
      case other       ⇒ sender() ! other
    }
  }

  class FailingInheritingInnerActor extends InnerActor {
    val fail = new InnerActor
  }
}

@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ActorRefSpec extends AkkaSpec with DefaultTimeout {
  import akka.actor.ActorRefSpec._

  def promiseIntercept(f: ⇒ Actor)(to: Promise[Actor]): Actor = try {
    val r = f
    to.success(r)
    r
  } catch {
    case e: Throwable ⇒
      to.failure(e)
      throw e
  }

  def wrap[T](f: Promise[Actor] ⇒ T): T = {
    val result = Promise[Actor]()
    val r = f(result)
    Await.result(result.future, 1 minute)
    r
  }

  "An ActorRef" must {

    "not allow Actors to be created outside of an actorOf" in {
      import system.actorOf
      intercept[akka.actor.ActorInitializationException] {
        new Actor { def receive = { case _ ⇒ } }
      }

      def contextStackMustBeEmpty(): Unit = ActorCell.contextStack.get.headOption should be(None)

      EventFilter[ActorInitializationException](occurrences = 1) intercept {
        intercept[akka.actor.ActorInitializationException] {
          wrap(result ⇒
            actorOf(Props(new Actor {
              val nested = promiseIntercept(new Actor { def receive = { case _ ⇒ } })(result)
              def receive = { case _ ⇒ }
            })))
        }

        contextStackMustBeEmpty()
      }

      EventFilter[ActorInitializationException](occurrences = 1) intercept {
        intercept[akka.actor.ActorInitializationException] {
          wrap(result ⇒
            actorOf(Props(promiseIntercept(new FailingOuterActor(actorOf(Props(new InnerActor))))(result))))
        }

        contextStackMustBeEmpty()
      }

      EventFilter[ActorInitializationException](occurrences = 1) intercept {
        intercept[akka.actor.ActorInitializationException] {
          wrap(result ⇒
            actorOf(Props(new OuterActor(actorOf(Props(promiseIntercept(new FailingInnerActor)(result)))))))
        }

        contextStackMustBeEmpty()
      }

      EventFilter[ActorInitializationException](occurrences = 1) intercept {
        intercept[akka.actor.ActorInitializationException] {
          wrap(result ⇒
            actorOf(Props(promiseIntercept(new FailingInheritingOuterActor(actorOf(Props(new InnerActor))))(result))))
        }

        contextStackMustBeEmpty()
      }

      EventFilter[ActorInitializationException](occurrences = 2) intercept {
        intercept[akka.actor.ActorInitializationException] {
          wrap(result ⇒
            actorOf(Props(new FailingOuterActor(actorOf(Props(promiseIntercept(new FailingInheritingInnerActor)(result)))))))
        }

        contextStackMustBeEmpty()
      }

      EventFilter[ActorInitializationException](occurrences = 2) intercept {
        intercept[akka.actor.ActorInitializationException] {
          wrap(result ⇒
            actorOf(Props(new FailingInheritingOuterActor(actorOf(Props(promiseIntercept(new FailingInheritingInnerActor)(result)))))))
        }

        contextStackMustBeEmpty()
      }

      EventFilter[ActorInitializationException](occurrences = 2) intercept {
        intercept[akka.actor.ActorInitializationException] {
          wrap(result ⇒
            actorOf(Props(new FailingInheritingOuterActor(actorOf(Props(promiseIntercept(new FailingInnerActor)(result)))))))
        }

        contextStackMustBeEmpty()
      }

      EventFilter[ActorInitializationException](occurrences = 1) intercept {
        intercept[akka.actor.ActorInitializationException] {
          wrap(result ⇒
            actorOf(Props(new OuterActor(actorOf(Props(new InnerActor {
              val a = promiseIntercept(new InnerActor)(result)
            }))))))
        }

        contextStackMustBeEmpty()
      }

      EventFilter[ActorInitializationException](occurrences = 2) intercept {
        intercept[akka.actor.ActorInitializationException] {
          wrap(result ⇒
            actorOf(Props(new FailingOuterActor(actorOf(Props(promiseIntercept(new FailingInheritingInnerActor)(result)))))))
        }

        contextStackMustBeEmpty()
      }

      EventFilter[ActorInitializationException](occurrences = 1) intercept {
        intercept[akka.actor.ActorInitializationException] {
          wrap(result ⇒
            actorOf(Props(new OuterActor(actorOf(Props(promiseIntercept(new FailingInheritingInnerActor)(result)))))))
        }

        contextStackMustBeEmpty()
      }

      EventFilter[ActorInitializationException](occurrences = 1) intercept {
        intercept[akka.actor.ActorInitializationException] {
          wrap(result ⇒
            actorOf(Props(new OuterActor(actorOf(Props(promiseIntercept({ new InnerActor; new InnerActor })(result)))))))
        }

        contextStackMustBeEmpty()
      }

      EventFilter[ActorInitializationException](occurrences = 1) intercept {
        (intercept[java.lang.IllegalStateException] {
          wrap(result ⇒
            actorOf(Props(new OuterActor(actorOf(Props(promiseIntercept({ throw new IllegalStateException("Ur state be b0rked"); new InnerActor })(result)))))))
        }).getMessage should be("Ur state be b0rked")

        contextStackMustBeEmpty()
      }
    }

    "be serializable using Java Serialization on local node" in {
      val a = system.actorOf(Props[InnerActor])
      val esys = system.asInstanceOf[ExtendedActorSystem]

      import java.io._

      val baos = new ByteArrayOutputStream(8192 * 32)
      val out = new ObjectOutputStream(baos)

      out.writeObject(a)

      out.flush
      out.close

      val bytes = baos.toByteArray

      JavaSerializer.currentSystem.withValue(esys) {
        val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
        val readA = in.readObject

        a.isInstanceOf[ActorRefWithCell] should be(true)
        readA.isInstanceOf[ActorRefWithCell] should be(true)
        (readA eq a) should be(true)
      }

      val ser = new JavaSerializer(esys)
      val readA = ser.fromBinary(bytes, None)
      readA.isInstanceOf[ActorRefWithCell] should be(true)
      (readA eq a) should be(true)
    }

    "throw an exception on deserialize if no system in scope" in {
      val a = system.actorOf(Props[InnerActor])

      import java.io._

      val baos = new ByteArrayOutputStream(8192 * 32)
      val out = new ObjectOutputStream(baos)

      out.writeObject(a)

      out.flush
      out.close

      val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray))

      (intercept[java.lang.IllegalStateException] {
        in.readObject
      }).getMessage should be("Trying to deserialize a serialized ActorRef without an ActorSystem in scope." +
        " Use 'akka.serialization.Serialization.currentSystem.withValue(system) { ... }'")
    }

    "return EmptyLocalActorRef on deserialize if not present in actor hierarchy (and remoting is not enabled)" in {
      import java.io._

      val baos = new ByteArrayOutputStream(8192 * 32)
      val out = new ObjectOutputStream(baos)

      val sysImpl = system.asInstanceOf[ActorSystemImpl]
      val ref = system.actorOf(Props[ReplyActor], "non-existing")
      val serialized = SerializedActorRef(ref)

      out.writeObject(serialized)

      out.flush
      out.close

      ref ! PoisonPill

      verifyActorTermination(ref)

      JavaSerializer.currentSystem.withValue(sysImpl) {
        val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray))
        in.readObject should be(new EmptyLocalActorRef(sysImpl.provider, ref.path, system.eventStream))
      }
    }

    "support nested actorOfs" in {
      val a = system.actorOf(Props(new Actor {
        val nested = system.actorOf(Props(new Actor { def receive = { case _ ⇒ } }))
        def receive = { case _ ⇒ sender() ! nested }
      }))

      val nested = Await.result((a ? "any").mapTo[ActorRef], timeout.duration)
      a should not be null
      nested should not be null
      (a ne nested) should be(true)
    }

    "support advanced nested actorOfs" in {
      val a = system.actorOf(Props(new OuterActor(system.actorOf(Props(new InnerActor)))))
      val inner = Await.result(a ? "innerself", timeout.duration)

      Await.result(a ? a, timeout.duration) should be(a)
      Await.result(a ? "self", timeout.duration) should be(a)
      inner should not be a

      Await.result(a ? "msg", timeout.duration) should be("msg")
    }

    "support reply via sender" in {
      val latch = new TestLatch(4)
      val serverRef = system.actorOf(Props[ReplyActor])
      val clientRef = system.actorOf(Props(new SenderActor(serverRef, latch)))

      clientRef ! "complex"
      clientRef ! "simple"
      clientRef ! "simple"
      clientRef ! "simple"

      Await.ready(latch, timeout.duration)

      latch.reset

      clientRef ! "complex2"
      clientRef ! "simple"
      clientRef ! "simple"
      clientRef ! "simple"

      Await.ready(latch, timeout.duration)

      system.stop(clientRef)
      system.stop(serverRef)
    }

    "support actorOfs where the class of the actor isn't public" in {
      val a = system.actorOf(NonPublicClass.createProps())
      a.tell("pigdog", testActor)
      expectMsg("pigdog")
      system stop a
    }

    "stop when sent a poison pill" in {
      val timeout = Timeout(20.seconds)
      val ref = system.actorOf(Props(new Actor {
        def receive = {
          case 5 ⇒ sender() ! "five"
          case 0 ⇒ sender() ! "null"
        }
      }))

      val ffive = (ref.ask(5)(timeout)).mapTo[String]
      val fnull = (ref.ask(0)(timeout)).mapTo[String]
      ref ! PoisonPill

      Await.result(ffive, timeout.duration) should be("five")
      Await.result(fnull, timeout.duration) should be("null")

      verifyActorTermination(ref)
    }

    "restart when Kill:ed" in {
      filterException[ActorKilledException] {
        val latch = TestLatch(2)

        val boss = system.actorOf(Props(new Actor {

          override val supervisorStrategy =
            OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 1 second)(List(classOf[Throwable]))

          val ref = context.actorOf(
            Props(new Actor {
              def receive = { case _ ⇒ }
              override def preRestart(reason: Throwable, msg: Option[Any]) = latch.countDown()
              override def postRestart(reason: Throwable) = latch.countDown()
            }))

          def receive = { case "sendKill" ⇒ ref ! Kill }
        }))

        boss ! "sendKill"
        Await.ready(latch, 5 seconds)
      }
    }

    "be able to check for existence of children" in {
      val parent = system.actorOf(Props(new Actor {

        val child = context.actorOf(
          Props(new Actor {
            def receive = { case _ ⇒ }
          }), "child")

        def receive = { case name: String ⇒ sender() ! context.child(name).isDefined }
      }), "parent")

      assert(Await.result((parent ? "child"), timeout.duration) === true)
      assert(Await.result((parent ? "whatnot"), timeout.duration) === false)
    }
  }
}

Other Akka source code examples

Here is a short list of links related to this Akka ActorRefSpec.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.