|
Akka/Scala example source code file (RestartStrategySpec.scala)
The RestartStrategySpec.scala Akka example source code
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor
import language.postfixOps
import java.lang.Thread.sleep
import org.scalatest.BeforeAndAfterAll
import scala.concurrent.Await
import akka.testkit.TestEvent._
import akka.testkit.EventFilter
import java.util.concurrent.{ TimeUnit, CountDownLatch }
import akka.testkit.AkkaSpec
import akka.testkit.DefaultTimeout
import akka.testkit.TestLatch
import scala.concurrent.duration._
import akka.pattern.ask
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class RestartStrategySpec extends AkkaSpec("akka.actor.serialize-messages = off") with DefaultTimeout {
override def atStartup {
system.eventStream.publish(Mute(EventFilter[Exception]("Crashing...")))
}
object Ping
object Crash
"A RestartStrategy" must {
"ensure that slave stays dead after max restarts within time range" in {
val boss = system.actorOf(Props(new Supervisor(
OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 1 second)(List(classOf[Throwable])))))
val restartLatch = new TestLatch
val secondRestartLatch = new TestLatch
val countDownLatch = new TestLatch(3)
val stopLatch = new TestLatch
val slaveProps = Props(new Actor {
def receive = {
case Ping ⇒ countDownLatch.countDown()
case Crash ⇒ throw new Exception("Crashing...")
}
override def postRestart(reason: Throwable) = {
if (!restartLatch.isOpen)
restartLatch.open()
else
secondRestartLatch.open()
}
override def postStop() = {
stopLatch.open()
}
})
val slave = Await.result((boss ? slaveProps).mapTo[ActorRef], timeout.duration)
slave ! Ping
slave ! Crash
slave ! Ping
// test restart and post restart ping
Await.ready(restartLatch, 10 seconds)
// now crash again... should not restart
slave ! Crash
slave ! Ping
Await.ready(secondRestartLatch, 10 seconds)
Await.ready(countDownLatch, 10 seconds)
slave ! Crash
Await.ready(stopLatch, 10 seconds)
}
"ensure that slave is immortal without max restarts and time range" in {
val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy()(List(classOf[Throwable])))))
val countDownLatch = new TestLatch(100)
val slaveProps = Props(new Actor {
def receive = {
case Crash ⇒ throw new Exception("Crashing...")
}
override def postRestart(reason: Throwable) = {
countDownLatch.countDown()
}
})
val slave = Await.result((boss ? slaveProps).mapTo[ActorRef], timeout.duration)
(1 to 100) foreach { _ ⇒ slave ! Crash }
Await.ready(countDownLatch, 2 minutes)
assert(!slave.isTerminated)
}
"ensure that slave restarts after number of crashes not within time range" in {
val boss = system.actorOf(Props(new Supervisor(
OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 500 millis)(List(classOf[Throwable])))))
val restartLatch = new TestLatch
val secondRestartLatch = new TestLatch
val thirdRestartLatch = new TestLatch
val pingLatch = new TestLatch
val secondPingLatch = new TestLatch
val slaveProps = Props(new Actor {
def receive = {
case Ping ⇒
if (!pingLatch.isOpen) pingLatch.open else secondPingLatch.open
case Crash ⇒ throw new Exception("Crashing...")
}
override def postRestart(reason: Throwable) = {
if (!restartLatch.isOpen)
restartLatch.open()
else if (!secondRestartLatch.isOpen)
secondRestartLatch.open()
else
thirdRestartLatch.open()
}
override def postStop() = {
if (restartLatch.isOpen) {
secondRestartLatch.open()
}
}
})
val slave = Await.result((boss ? slaveProps).mapTo[ActorRef], timeout.duration)
slave ! Ping
slave ! Crash
Await.ready(restartLatch, 10 seconds)
Await.ready(pingLatch, 10 seconds)
slave ! Ping
slave ! Crash
Await.ready(secondRestartLatch, 10 seconds)
Await.ready(secondPingLatch, 10 seconds)
// sleep to go out of the restart strategy's time range
sleep(700L)
// now crash again... should and post restart ping
slave ! Crash
slave ! Ping
Await.ready(thirdRestartLatch, 1 second)
assert(!slave.isTerminated)
}
"ensure that slave is not restarted after max retries" in {
val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(maxNrOfRetries = 2)(List(classOf[Throwable])))))
val restartLatch = new TestLatch
val secondRestartLatch = new TestLatch
val countDownLatch = new TestLatch(3)
val stopLatch = new TestLatch
val slaveProps = Props(new Actor {
def receive = {
case Ping ⇒ countDownLatch.countDown()
case Crash ⇒ throw new Exception("Crashing...")
}
override def postRestart(reason: Throwable) = {
if (!restartLatch.isOpen)
restartLatch.open()
else
secondRestartLatch.open()
}
override def postStop() = {
stopLatch.open()
}
})
val slave = Await.result((boss ? slaveProps).mapTo[ActorRef], timeout.duration)
slave ! Ping
slave ! Crash
slave ! Ping
// test restart and post restart ping
Await.ready(restartLatch, 10 seconds)
assert(!slave.isTerminated)
// now crash again... should not restart
slave ! Crash
slave ! Ping
Await.ready(secondRestartLatch, 10 seconds)
Await.ready(countDownLatch, 10 seconds)
sleep(700L)
slave ! Crash
Await.ready(stopLatch, 10 seconds)
sleep(500L)
assert(slave.isTerminated)
}
"ensure that slave is not restarted within time range" in {
val restartLatch, stopLatch, maxNoOfRestartsLatch = new TestLatch
val countDownLatch = new TestLatch(2)
val boss = system.actorOf(Props(new Actor {
override val supervisorStrategy = OneForOneStrategy(withinTimeRange = 1 second)(List(classOf[Throwable]))
def receive = {
case p: Props ⇒ sender() ! context.watch(context.actorOf(p))
case t: Terminated ⇒ maxNoOfRestartsLatch.open()
}
}))
val slaveProps = Props(new Actor {
def receive = {
case Ping ⇒ countDownLatch.countDown()
case Crash ⇒ throw new Exception("Crashing...")
}
override def postRestart(reason: Throwable) = {
restartLatch.open()
}
override def postStop() = {
stopLatch.open()
}
})
val slave = Await.result((boss ? slaveProps).mapTo[ActorRef], timeout.duration)
slave ! Ping
slave ! Crash
slave ! Ping
// test restart and post restart ping
Await.ready(restartLatch, 10 seconds)
assert(!slave.isTerminated)
// now crash again... should not restart
slave ! Crash
// may not be running
slave ! Ping
Await.ready(countDownLatch, 10 seconds)
// may not be running
slave ! Crash
Await.ready(stopLatch, 10 seconds)
Await.ready(maxNoOfRestartsLatch, 10 seconds)
sleep(500L)
assert(slave.isTerminated)
}
}
}
Other Akka source code examplesHere is a short list of links related to this Akka RestartStrategySpec.scala source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.