|
Akka/Scala example source code file (CircuitBreakerSpec.scala)
The CircuitBreakerSpec.scala Akka example source code
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.pattern
import language.postfixOps
import scala.concurrent.duration._
import akka.testkit._
import org.scalatest.BeforeAndAfter
import akka.actor.{ ActorSystem, Scheduler }
import scala.concurrent.{ ExecutionContext, Future, Await }
object CircuitBreakerSpec {
class TestException extends RuntimeException
class Breaker(val instance: CircuitBreaker)(implicit system: ActorSystem) {
val halfOpenLatch = new TestLatch(1)
val openLatch = new TestLatch(1)
val closedLatch = new TestLatch(1)
def apply(): CircuitBreaker = instance
instance.onClose(closedLatch.countDown()).onHalfOpen(halfOpenLatch.countDown()).onOpen(openLatch.countDown())
}
def shortCallTimeoutCb()(implicit system: ActorSystem, ec: ExecutionContext): Breaker =
new Breaker(new CircuitBreaker(system.scheduler, 1, 50.millis.dilated, 500.millis.dilated))
def shortResetTimeoutCb()(implicit system: ActorSystem, ec: ExecutionContext): Breaker =
new Breaker(new CircuitBreaker(system.scheduler, 1, 1000.millis.dilated, 50.millis.dilated))
def longCallTimeoutCb()(implicit system: ActorSystem, ec: ExecutionContext): Breaker =
new Breaker(new CircuitBreaker(system.scheduler, 1, 5 seconds, 500.millis.dilated))
val longResetTimeout = 5.seconds
def longResetTimeoutCb()(implicit system: ActorSystem, ec: ExecutionContext): Breaker =
new Breaker(new CircuitBreaker(system.scheduler, 1, 100.millis.dilated, longResetTimeout))
def multiFailureCb()(implicit system: ActorSystem, ec: ExecutionContext): Breaker =
new Breaker(new CircuitBreaker(system.scheduler, 5, 200.millis.dilated, 500.millis.dilated))
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter {
import CircuitBreakerSpec.TestException
implicit def ec = system.dispatcher
implicit def s = system
val awaitTimeout = 2.seconds.dilated
def checkLatch(latch: TestLatch): Unit = Await.ready(latch, awaitTimeout)
def throwException = throw new TestException
def sayHi = "hi"
"A synchronous circuit breaker that is open" must {
"throw exceptions when called before reset timeout" in {
val breaker = CircuitBreakerSpec.longResetTimeoutCb()
intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) }
checkLatch(breaker.openLatch)
val e = intercept[CircuitBreakerOpenException] { breaker().withSyncCircuitBreaker(sayHi) }
(e.remainingDuration > Duration.Zero) should be(true)
(e.remainingDuration <= CircuitBreakerSpec.longResetTimeout) should be(true)
}
"transition to half-open on reset timeout" in {
val breaker = CircuitBreakerSpec.shortResetTimeoutCb()
intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) }
checkLatch(breaker.halfOpenLatch)
}
}
"A synchronous circuit breaker that is half-open" must {
"pass through next call and close on success" in {
val breaker = CircuitBreakerSpec.shortResetTimeoutCb()
intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) }
checkLatch(breaker.halfOpenLatch)
assert("hi" == breaker().withSyncCircuitBreaker(sayHi))
checkLatch(breaker.closedLatch)
}
"open on exception in call" in {
val breaker = CircuitBreakerSpec.shortResetTimeoutCb()
intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) }
checkLatch(breaker.halfOpenLatch)
intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) }
checkLatch(breaker.openLatch)
}
}
"A synchronous circuit breaker that is closed" must {
"allow calls through" in {
val breaker = CircuitBreakerSpec.longCallTimeoutCb()
breaker().withSyncCircuitBreaker(sayHi) should be("hi")
}
"increment failure count on failure" in {
val breaker = CircuitBreakerSpec.longCallTimeoutCb()
breaker().currentFailureCount should be(0)
intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) }
checkLatch(breaker.openLatch)
breaker().currentFailureCount should be(1)
}
"reset failure count after success" in {
val breaker = CircuitBreakerSpec.multiFailureCb()
breaker().currentFailureCount should be(0)
intercept[TestException] {
val ct = Thread.currentThread() // Ensure that the thunk is executed in the tests thread
breaker().withSyncCircuitBreaker({ if (Thread.currentThread() eq ct) throwException else "fail" })
}
breaker().currentFailureCount should be(1)
breaker().withSyncCircuitBreaker(sayHi)
breaker().currentFailureCount should be(0)
}
"increment failure count on callTimeout" in {
val breaker = CircuitBreakerSpec.shortCallTimeoutCb()
breaker().withSyncCircuitBreaker(Thread.sleep(100.millis.dilated.toMillis))
awaitCond(breaker().currentFailureCount == 1)
}
}
"An asynchronous circuit breaker that is open" must {
"throw exceptions when called before reset timeout" in {
val breaker = CircuitBreakerSpec.longResetTimeoutCb()
breaker().withCircuitBreaker(Future(throwException))
checkLatch(breaker.openLatch)
intercept[CircuitBreakerOpenException] { Await.result(breaker().withCircuitBreaker(Future(sayHi)), awaitTimeout) }
}
"transition to half-open on reset timeout" in {
val breaker = CircuitBreakerSpec.shortResetTimeoutCb()
breaker().withCircuitBreaker(Future(throwException))
checkLatch(breaker.halfOpenLatch)
}
}
"An asynchronous circuit breaker that is half-open" must {
"pass through next call and close on success" in {
val breaker = CircuitBreakerSpec.shortResetTimeoutCb()
breaker().withCircuitBreaker(Future(throwException))
checkLatch(breaker.halfOpenLatch)
Await.result(breaker().withCircuitBreaker(Future(sayHi)), awaitTimeout) should be("hi")
checkLatch(breaker.closedLatch)
}
"re-open on exception in call" in {
val breaker = CircuitBreakerSpec.shortResetTimeoutCb()
breaker().withCircuitBreaker(Future(throwException))
checkLatch(breaker.halfOpenLatch)
intercept[TestException] { Await.result(breaker().withCircuitBreaker(Future(throwException)), awaitTimeout) }
checkLatch(breaker.openLatch)
}
"re-open on async failure" in {
val breaker = CircuitBreakerSpec.shortResetTimeoutCb()
breaker().withCircuitBreaker(Future(throwException))
checkLatch(breaker.halfOpenLatch)
breaker().withCircuitBreaker(Future(throwException))
checkLatch(breaker.openLatch)
}
}
"An asynchronous circuit breaker that is closed" must {
"allow calls through" in {
val breaker = CircuitBreakerSpec.longCallTimeoutCb()
Await.result(breaker().withCircuitBreaker(Future(sayHi)), awaitTimeout) should be("hi")
}
"increment failure count on exception" in {
val breaker = CircuitBreakerSpec.longCallTimeoutCb()
intercept[TestException] { Await.result(breaker().withCircuitBreaker(Future(throwException)), awaitTimeout) }
checkLatch(breaker.openLatch)
breaker().currentFailureCount should be(1)
}
"increment failure count on async failure" in {
val breaker = CircuitBreakerSpec.longCallTimeoutCb()
breaker().withCircuitBreaker(Future(throwException))
checkLatch(breaker.openLatch)
breaker().currentFailureCount should be(1)
}
"reset failure count after success" in {
val breaker = CircuitBreakerSpec.multiFailureCb()
breaker().withCircuitBreaker(Future(sayHi))
for (n ← 1 to 4) breaker().withCircuitBreaker(Future(throwException))
awaitCond(breaker().currentFailureCount == 4, awaitTimeout)
breaker().withCircuitBreaker(Future(sayHi))
awaitCond(breaker().currentFailureCount == 0, awaitTimeout)
}
"increment failure count on callTimeout" in {
val breaker = CircuitBreakerSpec.shortCallTimeoutCb()
breaker().withCircuitBreaker(Future { Thread.sleep(100.millis.dilated.toMillis); sayHi })
checkLatch(breaker.openLatch)
breaker().currentFailureCount should be(1)
}
}
}
Other Akka source code examplesHere is a short list of links related to this Akka CircuitBreakerSpec.scala source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.