alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (SupervisorMiscSpec.scala)

This example Akka source code file (SupervisorMiscSpec.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actor, akka, concurrent, eventfilter, exception, kill, ok, oneforonestrategy, supervisor, supervisormiscspec, terminated, test, testing, testkit, time, unit

The SupervisorMiscSpec.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.actor

import language.postfixOps

import akka.testkit.{ filterEvents, EventFilter }
import scala.concurrent.Await
import akka.dispatch.{ PinnedDispatcher, Dispatchers }
import java.util.concurrent.{ TimeUnit, CountDownLatch }
import akka.testkit.AkkaSpec
import akka.testkit.DefaultTimeout
import akka.pattern.ask
import scala.concurrent.duration._
import scala.util.control.NonFatal

/**
 * Companion holding the configuration text handed to `AkkaSpec` by the
 * `SupervisorMiscSpec` test class.
 */
object SupervisorMiscSpec {

  /**
   * Configuration text for the spec's actor system. It switches message
   * serialization off and declares the two dispatcher ids the tests refer to:
   * `pinned-dispatcher` (a `PinnedDispatcher` on a thread-pool executor) and
   * `test-dispatcher` (an empty section with no overrides).
   */
  val config: String = """
    akka.actor.serialize-messages = off
    pinned-dispatcher {
      executor = thread-pool-executor
      type = PinnedDispatcher
    }
    test-dispatcher {
    }
    """
}

/**
 * Miscellaneous supervision scenarios: restarting crashed children that run
 * on different dispatchers, creating and re-creating named children, and
 * inspecting the failing child from inside a supervisor strategy.
 *
 * The actor system is configured from `SupervisorMiscSpec.config`.
 */
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class SupervisorMiscSpec extends AkkaSpec(SupervisorMiscSpec.config) with DefaultTimeout {

  "A Supervisor" must {

    "restart a crashing actor and its dispatcher for any dispatcher" in {
      filterEvents(EventFilter[Exception]("Kill")) {
        // One count per worker: every worker counts down once in postRestart.
        val countDownLatch = new CountDownLatch(4)

        val supervisor = system.actorOf(Props(new Supervisor(
          OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 5 seconds)(List(classOf[Exception])))))

        val workerProps = Props(new Actor {
          override def postRestart(cause: Throwable): Unit = countDownLatch.countDown()
          def receive = {
            case "status" ⇒ this.sender() ! "OK"
            case _        ⇒ this.context.stop(self)
          }
        })

        // `val a, b = expr` evaluates `expr` once per name, so this asks the
        // supervisor twice and yields two distinct worker references.
        val actor1, actor2 = Await.result((supervisor ? workerProps.withDispatcher("pinned-dispatcher")).mapTo[ActorRef], timeout.duration)

        val actor3 = Await.result((supervisor ? workerProps.withDispatcher("test-dispatcher")).mapTo[ActorRef], timeout.duration)

        val actor4 = Await.result((supervisor ? workerProps.withDispatcher("pinned-dispatcher")).mapTo[ActorRef], timeout.duration)

        actor1 ! Kill
        actor2 ! Kill
        actor3 ! Kill
        actor4 ! Kill

        // await returns false on timeout; assert it so that a missing restart
        // fails the test here instead of being silently ignored.
        countDownLatch.await(10, TimeUnit.SECONDS) should be(true)

        // All status requests are sent first, then each reply is awaited.
        Seq("actor1" -> actor1, "actor2" -> actor2, "actor3" -> actor3, "actor4" -> actor4) map {
          case (id, ref) ⇒ (id, ref ? "status")
        } foreach {
          case (id, f) ⇒ (id, Await.result(f, timeout.duration)) should be((id, "OK"))
        }
      }
    }

    "be able to create named children in its constructor" in {
      val a = system.actorOf(Props(new Actor {
        // Child is created while the actor itself is still being constructed.
        context.actorOf(Props.empty, "bob")
        def receive = { case x: Exception ⇒ throw x }
        override def preStart(): Unit = testActor ! "preStart"
      }))
      val m = "weird message"
      EventFilter[Exception](m, occurrences = 1) intercept {
        a ! new Exception(m)
      }
      // Two notifications are expected: presumably one from the initial start
      // and one from the restart caused by the rethrown exception.
      expectMsg("preStart")
      expectMsg("preStart")
      a.isTerminated should be(false)
    }

    "be able to recreate child when old child is Terminated" in {
      val parent = system.actorOf(Props(new Actor {
        val kid = context.watch(context.actorOf(Props.empty, "foo"))
        def receive = {
          // Only reacts to the Terminated notification of the watched kid.
          case Terminated(`kid`) ⇒
            try {
              val newKid = context.actorOf(Props.empty, "foo")
              // The first failing check determines the reported result;
              // "green" means the name was reused by a fresh, live actor.
              val result =
                if (newKid eq kid) "Failure: context.actorOf returned the same instance!"
                else if (!kid.isTerminated) "Kid is zombie"
                else if (newKid.isTerminated) "newKid was stillborn"
                else if (kid.path != newKid.path) "The kids do not share the same path"
                else "green"
              testActor ! result
            } catch {
              // Forward the failure so expectMsg("green") reports it.
              case NonFatal(e) ⇒ testActor ! e
            }
          case "engage" ⇒ context.stop(kid)
        }
      }))
      parent ! "engage"
      expectMsg("green")
    }

    "not be able to recreate child when old child is alive" in {
      val parent = system.actorOf(Props(new Actor {
        def receive = {
          case "engage" ⇒
            try {
              val kid = context.actorOf(Props.empty, "foo")
              context.stop(kid)
              // Reusing the name right after stop is expected to be rejected;
              // reaching the next line means it was wrongly accepted.
              context.actorOf(Props.empty, "foo")
              testActor ! "red"
            } catch {
              case _: InvalidActorNameException ⇒ testActor ! "green"
            }
        }
      }))
      parent ! "engage"
      expectMsg("green")
    }

    "be able to create a similar kid in the fault handling strategy" in {
      val parent = system.actorOf(Props(new Actor {
        override val supervisorStrategy = new OneForOneStrategy()(SupervisorStrategy.defaultStrategy.decider) {
          // Re-creates a child under the terminated child's name and reports
          // whether it is a new reference on the same path.
          override def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit = {
            val newKid = context.actorOf(Props.empty, child.path.name)
            testActor ! { if ((newKid ne child) && newKid.path == child.path) "green" else "red" }
          }
        }

        def receive = { case "engage" ⇒ context.stop(context.actorOf(Props.empty, "Robert")) }
      }))
      parent ! "engage"
      expectMsg("green")
      // Stopping the parent is expected to log exactly one matching
      // IllegalStateException from the overridden handler.
      EventFilter[IllegalStateException]("handleChildTerminated failed", occurrences = 1) intercept {
        system.stop(parent)
      }
    }

    "have access to the failing child’s reference in supervisorStrategy" in {
      val parent = system.actorOf(Props(new Actor {
        override val supervisorStrategy = OneForOneStrategy() {
          // Publish whatever sender() is while the decider runs, then stop the child.
          case _: Exception ⇒ testActor ! sender(); SupervisorStrategy.Stop
        }
        def receive = {
          case "doit" ⇒ context.actorOf(Props.empty, "child") ! Kill
        }
      }))
      EventFilter[ActorKilledException](occurrences = 1) intercept {
        parent ! "doit"
      }
      // The reference received must be the child named "child" under parent.
      val p = expectMsgType[ActorRef].path
      p.parent should be(parent.path)
      p.name should be("child")
    }
  }
}

Other Akka source code examples

Here is a short list of links related to this Akka SupervisorMiscSpec.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.