alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (RestartStrategySpec.scala)

This example Akka source code file (RestartStrategySpec.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actor, akka, concurrent, crash, crashing, exception, oneforonestrategy, ping, props, supervisor, test, testing, testkit, testlatch, throwable, time

The RestartStrategySpec.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */

package akka.actor

import language.postfixOps

import java.lang.Thread.sleep
import org.scalatest.BeforeAndAfterAll
import scala.concurrent.Await
import akka.testkit.TestEvent._
import akka.testkit.EventFilter
import java.util.concurrent.{ TimeUnit, CountDownLatch }
import akka.testkit.AkkaSpec
import akka.testkit.DefaultTimeout
import akka.testkit.TestLatch
import scala.concurrent.duration._
import akka.pattern.ask

@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class RestartStrategySpec extends AkkaSpec("akka.actor.serialize-messages = off") with DefaultTimeout {

  override def atStartup {
    system.eventStream.publish(Mute(EventFilter[Exception]("Crashing...")))
  }

  object Ping
  object Crash

  "A RestartStrategy" must {

    "ensure that slave stays dead after max restarts within time range" in {
      val boss = system.actorOf(Props(new Supervisor(
        OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 1 second)(List(classOf[Throwable])))))

      val restartLatch = new TestLatch
      val secondRestartLatch = new TestLatch
      val countDownLatch = new TestLatch(3)
      val stopLatch = new TestLatch

      val slaveProps = Props(new Actor {

        def receive = {
          case Ping  ⇒ countDownLatch.countDown()
          case Crash ⇒ throw new Exception("Crashing...")
        }

        override def postRestart(reason: Throwable) = {
          if (!restartLatch.isOpen)
            restartLatch.open()
          else
            secondRestartLatch.open()
        }

        override def postStop() = {
          stopLatch.open()
        }
      })
      val slave = Await.result((boss ? slaveProps).mapTo[ActorRef], timeout.duration)

      slave ! Ping
      slave ! Crash
      slave ! Ping

      // test restart and post restart ping
      Await.ready(restartLatch, 10 seconds)

      // now crash again... should not restart
      slave ! Crash
      slave ! Ping

      Await.ready(secondRestartLatch, 10 seconds)
      Await.ready(countDownLatch, 10 seconds)

      slave ! Crash
      Await.ready(stopLatch, 10 seconds)
    }

    "ensure that slave is immortal without max restarts and time range" in {
      val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy()(List(classOf[Throwable])))))

      val countDownLatch = new TestLatch(100)

      val slaveProps = Props(new Actor {

        def receive = {
          case Crash ⇒ throw new Exception("Crashing...")
        }

        override def postRestart(reason: Throwable) = {
          countDownLatch.countDown()
        }
      })
      val slave = Await.result((boss ? slaveProps).mapTo[ActorRef], timeout.duration)

      (1 to 100) foreach { _ ⇒ slave ! Crash }
      Await.ready(countDownLatch, 2 minutes)
      assert(!slave.isTerminated)
    }

    "ensure that slave restarts after number of crashes not within time range" in {
      val boss = system.actorOf(Props(new Supervisor(
        OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 500 millis)(List(classOf[Throwable])))))

      val restartLatch = new TestLatch
      val secondRestartLatch = new TestLatch
      val thirdRestartLatch = new TestLatch
      val pingLatch = new TestLatch
      val secondPingLatch = new TestLatch

      val slaveProps = Props(new Actor {

        def receive = {
          case Ping ⇒
            if (!pingLatch.isOpen) pingLatch.open else secondPingLatch.open
          case Crash ⇒ throw new Exception("Crashing...")
        }
        override def postRestart(reason: Throwable) = {
          if (!restartLatch.isOpen)
            restartLatch.open()
          else if (!secondRestartLatch.isOpen)
            secondRestartLatch.open()
          else
            thirdRestartLatch.open()
        }

        override def postStop() = {
          if (restartLatch.isOpen) {
            secondRestartLatch.open()
          }
        }
      })
      val slave = Await.result((boss ? slaveProps).mapTo[ActorRef], timeout.duration)

      slave ! Ping
      slave ! Crash

      Await.ready(restartLatch, 10 seconds)
      Await.ready(pingLatch, 10 seconds)

      slave ! Ping
      slave ! Crash

      Await.ready(secondRestartLatch, 10 seconds)
      Await.ready(secondPingLatch, 10 seconds)

      // sleep to go out of the restart strategy's time range
      sleep(700L)

      // now crash again... should and post restart ping
      slave ! Crash
      slave ! Ping

      Await.ready(thirdRestartLatch, 1 second)

      assert(!slave.isTerminated)
    }

    "ensure that slave is not restarted after max retries" in {
      val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(maxNrOfRetries = 2)(List(classOf[Throwable])))))

      val restartLatch = new TestLatch
      val secondRestartLatch = new TestLatch
      val countDownLatch = new TestLatch(3)
      val stopLatch = new TestLatch

      val slaveProps = Props(new Actor {

        def receive = {
          case Ping  ⇒ countDownLatch.countDown()
          case Crash ⇒ throw new Exception("Crashing...")
        }
        override def postRestart(reason: Throwable) = {
          if (!restartLatch.isOpen)
            restartLatch.open()
          else
            secondRestartLatch.open()
        }

        override def postStop() = {
          stopLatch.open()
        }
      })
      val slave = Await.result((boss ? slaveProps).mapTo[ActorRef], timeout.duration)

      slave ! Ping
      slave ! Crash
      slave ! Ping

      // test restart and post restart ping
      Await.ready(restartLatch, 10 seconds)

      assert(!slave.isTerminated)

      // now crash again... should not restart
      slave ! Crash
      slave ! Ping

      Await.ready(secondRestartLatch, 10 seconds)
      Await.ready(countDownLatch, 10 seconds)

      sleep(700L)

      slave ! Crash
      Await.ready(stopLatch, 10 seconds)
      sleep(500L)
      assert(slave.isTerminated)
    }

    "ensure that slave is not restarted within time range" in {
      val restartLatch, stopLatch, maxNoOfRestartsLatch = new TestLatch
      val countDownLatch = new TestLatch(2)

      val boss = system.actorOf(Props(new Actor {
        override val supervisorStrategy = OneForOneStrategy(withinTimeRange = 1 second)(List(classOf[Throwable]))
        def receive = {
          case p: Props      ⇒ sender() ! context.watch(context.actorOf(p))
          case t: Terminated ⇒ maxNoOfRestartsLatch.open()
        }
      }))

      val slaveProps = Props(new Actor {

        def receive = {
          case Ping  ⇒ countDownLatch.countDown()
          case Crash ⇒ throw new Exception("Crashing...")
        }

        override def postRestart(reason: Throwable) = {
          restartLatch.open()
        }

        override def postStop() = {
          stopLatch.open()
        }
      })
      val slave = Await.result((boss ? slaveProps).mapTo[ActorRef], timeout.duration)

      slave ! Ping
      slave ! Crash
      slave ! Ping

      // test restart and post restart ping
      Await.ready(restartLatch, 10 seconds)

      assert(!slave.isTerminated)

      // now crash again... should not restart
      slave ! Crash

      // may not be running
      slave ! Ping
      Await.ready(countDownLatch, 10 seconds)

      // may not be running
      slave ! Crash

      Await.ready(stopLatch, 10 seconds)

      Await.ready(maxNoOfRestartsLatch, 10 seconds)
      sleep(500L)
      assert(slave.isTerminated)
    }
  }
}

Other Akka source code examples

Here is a short list of links related to this Akka RestartStrategySpec.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.