|
Akka/Scala example source code file (FailureDetectorRegistrySpec.scala)
The FailureDetectorRegistrySpec.scala Akka example source code
package akka.remote
import akka.remote.FailureDetector.Clock
import scala.concurrent.duration._
import akka.testkit.AkkaSpec
class FailureDetectorRegistrySpec extends AkkaSpec("akka.loglevel = INFO") {
def fakeTimeGenerator(timeIntervals: Seq[Long]): Clock = new Clock {
@volatile var times = timeIntervals.tail.foldLeft(List[Long](timeIntervals.head))((acc, c) ⇒ acc ::: List[Long](acc.last + c))
override def apply(): Long = {
val currentTime = times.head
times = times.tail
currentTime
}
}
def createFailureDetector(
threshold: Double = 8.0,
maxSampleSize: Int = 1000,
minStdDeviation: FiniteDuration = 10.millis,
acceptableLostDuration: FiniteDuration = Duration.Zero,
firstHeartbeatEstimate: FiniteDuration = 1.second,
clock: Clock = FailureDetector.defaultClock) =
new PhiAccrualFailureDetector(
threshold,
maxSampleSize,
minStdDeviation,
acceptableLostDuration,
firstHeartbeatEstimate = firstHeartbeatEstimate)(clock = clock)
def createFailureDetectorRegistry(threshold: Double = 8.0,
maxSampleSize: Int = 1000,
minStdDeviation: FiniteDuration = 10.millis,
acceptableLostDuration: FiniteDuration = Duration.Zero,
firstHeartbeatEstimate: FiniteDuration = 1.second,
clock: Clock = FailureDetector.defaultClock): FailureDetectorRegistry[String] = {
new DefaultFailureDetectorRegistry[String](() ⇒ createFailureDetector(
threshold,
maxSampleSize,
minStdDeviation,
acceptableLostDuration,
firstHeartbeatEstimate,
clock))
}
"mark node as available after a series of successful heartbeats" in {
val timeInterval = List[Long](0, 1000, 100, 100)
val fd = createFailureDetectorRegistry(clock = fakeTimeGenerator(timeInterval))
fd.heartbeat("resource1")
fd.heartbeat("resource1")
fd.heartbeat("resource1")
fd.isAvailable("resource1") should be(true)
}
"mark node as dead if heartbeat are missed" in {
val timeInterval = List[Long](0, 1000, 100, 100, 4000, 3000)
val fd = createFailureDetectorRegistry(threshold = 3, clock = fakeTimeGenerator(timeInterval))
fd.heartbeat("resource1") //0
fd.heartbeat("resource1") //1000
fd.heartbeat("resource1") //1100
fd.isAvailable("resource1") should be(true) //1200
fd.heartbeat("resource2") //5200, but unrelated resource
fd.isAvailable("resource1") should be(false) //8200
}
"accept some configured missing heartbeats" in {
val timeInterval = List[Long](0, 1000, 1000, 1000, 4000, 1000, 1000)
val fd = createFailureDetectorRegistry(acceptableLostDuration = 3.seconds, clock = fakeTimeGenerator(timeInterval))
fd.heartbeat("resource1")
fd.heartbeat("resource1")
fd.heartbeat("resource1")
fd.heartbeat("resource1")
fd.isAvailable("resource1") should be(true)
fd.heartbeat("resource1")
fd.isAvailable("resource1") should be(true)
}
"fail after configured acceptable missing heartbeats" in {
val timeInterval = List[Long](0, 1000, 1000, 1000, 1000, 1000, 500, 500, 5000)
val fd = createFailureDetectorRegistry(acceptableLostDuration = 3.seconds, clock = fakeTimeGenerator(timeInterval))
fd.heartbeat("resource1")
fd.heartbeat("resource1")
fd.heartbeat("resource1")
fd.heartbeat("resource1")
fd.heartbeat("resource1")
fd.heartbeat("resource1")
fd.isAvailable("resource1") should be(true)
fd.heartbeat("resource1")
fd.isAvailable("resource1") should be(false)
}
"mark node as available after explicit removal of connection" in {
val timeInterval = List[Long](0, 1000, 100, 100, 100)
val fd = createFailureDetectorRegistry(clock = fakeTimeGenerator(timeInterval))
fd.heartbeat("resource1")
fd.heartbeat("resource1")
fd.heartbeat("resource1")
fd.isAvailable("resource1") should be(true)
fd.remove("resource1")
fd.isAvailable("resource1") should be(true)
}
"mark node as available after explicit removal of connection and receiving heartbeat again" in {
val timeInterval = List[Long](0, 1000, 100, 1100, 1100, 1100, 1100, 1100, 100)
val fd = createFailureDetectorRegistry(clock = fakeTimeGenerator(timeInterval))
fd.isMonitoring("resource1") should be(false)
fd.heartbeat("resource1") //0
fd.heartbeat("resource1") //1000
fd.heartbeat("resource1") //1100
fd.isAvailable("resource1") should be(true) //2200
fd.isMonitoring("resource1") should be(true)
fd.remove("resource1")
fd.isMonitoring("resource1") should be(false)
fd.isAvailable("resource1") should be(true) //3300
// it receives heartbeat from an explicitly removed node
fd.heartbeat("resource1") //4400
fd.heartbeat("resource1") //5500
fd.heartbeat("resource1") //6600
fd.isAvailable("resource1") should be(true) //6700
fd.isMonitoring("resource1") should be(true)
}
}
Other Akka source code examplesHere is a short list of links related to this Akka FailureDetectorRegistrySpec.scala source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.