|
Akka/Scala example source code file (AccrualFailureDetectorSpec.scala)
The AccrualFailureDetectorSpec.scala Akka example source code/** * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> */ package akka.remote import akka.testkit.AkkaSpec import scala.collection.immutable.TreeMap import scala.concurrent.duration._ import akka.remote.FailureDetector.Clock @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class AccrualFailureDetectorSpec extends AkkaSpec("akka.loglevel = INFO") { "An AccrualFailureDetector" must { def fakeTimeGenerator(timeIntervals: Seq[Long]): Clock = new Clock { @volatile var times = timeIntervals.tail.foldLeft(List[Long](timeIntervals.head))((acc, c) ⇒ acc ::: List[Long](acc.last + c)) override def apply(): Long = { val currentTime = times.head times = times.tail currentTime } } def createFailureDetector( threshold: Double = 8.0, maxSampleSize: Int = 1000, minStdDeviation: FiniteDuration = 100.millis, acceptableLostDuration: FiniteDuration = Duration.Zero, firstHeartbeatEstimate: FiniteDuration = 1.second, clock: Clock = FailureDetector.defaultClock) = new PhiAccrualFailureDetector( threshold, maxSampleSize, minStdDeviation, acceptableLostDuration, firstHeartbeatEstimate = firstHeartbeatEstimate)(clock = clock) def cdf(phi: Double) = 1.0 - math.pow(10, -phi) "use good enough cumulative distribution function" in { val fd = createFailureDetector() cdf(fd.phi(0, 0, 10)) should be(0.5 +- (0.001)) cdf(fd.phi(6L, 0, 10)) should be(0.7257 +- (0.001)) cdf(fd.phi(15L, 0, 10)) should be(0.9332 +- (0.001)) cdf(fd.phi(20L, 0, 10)) should be(0.97725 +- (0.001)) cdf(fd.phi(25L, 0, 10)) should be(0.99379 +- (0.001)) cdf(fd.phi(35L, 0, 10)) should be(0.99977 +- (0.001)) cdf(fd.phi(40L, 0, 10)) should be(0.99997 +- (0.0001)) for (x :: y :: Nil ← (0 to 40).toList.sliding(2)) { fd.phi(x, 0, 10) should be < (fd.phi(y, 0, 10)) } cdf(fd.phi(22, 20.0, 3)) should be(0.7475 +- (0.001)) } "handle outliers without losing precision or hitting exceptions" in { val fd = createFailureDetector() fd.phi(10L, 0, 1) should be(38.0 +- 1.0) fd.phi(-25L, 0, 1) should be(0.0) } "return realistic phi values" in { val fd = createFailureDetector() val test = TreeMap(0 -> 0.0, 500 -> 0.1, 1000 -> 0.3, 1200 -> 1.6, 1400 -> 4.7, 1600 -> 10.8, 1700 -> 15.3) for ((timeDiff, expectedPhi) ← test) { fd.phi(timeDiff = timeDiff, mean = 1000.0, stdDeviation = 100.0) should be(expectedPhi +- (0.1)) } // larger stdDeviation results => lower phi fd.phi(timeDiff = 1100, mean = 1000.0, stdDeviation = 500.0) should be < ( fd.phi(timeDiff = 1100, mean = 1000.0, stdDeviation = 100.0)) } "return phi value of 0.0 on startup for each address, when no heartbeats" in { val fd = createFailureDetector() fd.phi should be(0.0) fd.phi should be(0.0) } "return phi based on guess when only one heartbeat" in { val timeInterval = List[Long](0, 1000, 1000, 1000, 1000) val fd = createFailureDetector(firstHeartbeatEstimate = 1.seconds, clock = fakeTimeGenerator(timeInterval)) fd.heartbeat() fd.phi should be(0.3 +- 0.2) fd.phi should be(4.5 +- 0.3) fd.phi should be > (15.0) } "return phi value using first interval after second heartbeat" in { val timeInterval = List[Long](0, 100, 100, 100) val fd = createFailureDetector(clock = fakeTimeGenerator(timeInterval)) fd.heartbeat() fd.phi should be > (0.0) fd.heartbeat() fd.phi should be > (0.0) } "mark node as monitored after a series of successful heartbeats" in { val timeInterval = List[Long](0, 1000, 100, 100) val fd = createFailureDetector(clock = fakeTimeGenerator(timeInterval)) fd.isMonitoring should be(false) fd.heartbeat() fd.heartbeat() fd.heartbeat() fd.isMonitoring should be(true) fd.isAvailable should be(true) } "mark node as dead if heartbeat are missed" in { val timeInterval = List[Long](0, 1000, 100, 100, 7000) val fd = createFailureDetector(threshold = 3, clock = fakeTimeGenerator(timeInterval)) fd.heartbeat() //0 fd.heartbeat() //1000 fd.heartbeat() //1100 fd.isAvailable should be(true) //1200 fd.isAvailable should be(false) //8200 } "mark node as available if it starts heartbeat again after being marked dead due to detection of failure" in { // 1000 regular intervals, 5 minute pause, and then a short pause again that should trigger unreachable again val regularIntervals = 0L +: Vector.fill(999)(1000L) val timeIntervals = regularIntervals :+ (5 * 60 * 1000L) :+ 100L :+ 900L :+ 100L :+ 7000L :+ 100L :+ 900L :+ 100L :+ 900L val fd = createFailureDetector(threshold = 8, acceptableLostDuration = 3.seconds, clock = fakeTimeGenerator(timeIntervals)) for (_ ← 0 until 1000) fd.heartbeat() fd.isAvailable should be(false) // after the long pause fd.heartbeat() fd.isAvailable should be(true) fd.heartbeat() fd.isAvailable should be(false) // after the 7 seconds pause fd.heartbeat() fd.isAvailable should be(true) fd.heartbeat() fd.isAvailable should be(true) } "accept some configured missing heartbeats" in { val timeInterval = List[Long](0, 1000, 1000, 1000, 4000, 1000, 1000) val fd = createFailureDetector(acceptableLostDuration = 3.seconds, clock = fakeTimeGenerator(timeInterval)) fd.heartbeat() fd.heartbeat() fd.heartbeat() fd.heartbeat() fd.isAvailable should be(true) fd.heartbeat() fd.isAvailable should be(true) } "fail after configured acceptable missing heartbeats" in { val timeInterval = List[Long](0, 1000, 1000, 1000, 1000, 1000, 500, 500, 5000) val fd = createFailureDetector(acceptableLostDuration = 3.seconds, clock = fakeTimeGenerator(timeInterval)) fd.heartbeat() fd.heartbeat() fd.heartbeat() fd.heartbeat() fd.heartbeat() fd.heartbeat() fd.isAvailable should be(true) fd.heartbeat() fd.isAvailable should be(false) } "use maxSampleSize heartbeats" in { val timeInterval = List[Long](0, 100, 100, 100, 100, 600, 500, 500, 500, 500, 500) val fd = createFailureDetector(maxSampleSize = 3, clock = fakeTimeGenerator(timeInterval)) // 100 ms interval fd.heartbeat() //0 fd.heartbeat() //100 fd.heartbeat() //200 fd.heartbeat() //300 val phi1 = fd.phi //400 // 500 ms interval, should become same phi when 100 ms intervals have been dropped fd.heartbeat() //1000 fd.heartbeat() //1500 fd.heartbeat() //2000 fd.heartbeat() //2500 val phi2 = fd.phi //3000 phi2 should be(phi1.plusOrMinus(0.001)) } } "Statistics for heartbeats" must { "calculate correct mean and variance" in { val samples = Seq(100, 200, 125, 340, 130) val stats = (HeartbeatHistory(maxSampleSize = 20) /: samples) { (stats, value) ⇒ stats :+ value } stats.mean should be(179.0 +- 0.00001) stats.variance should be(7584.0 +- 0.00001) } "have 0.0 variance for one sample" in { (HeartbeatHistory(600) :+ 1000L).variance should be(0.0 +- 0.00001) } "be capped by the specified maxSampleSize" in { val history3 = HeartbeatHistory(maxSampleSize = 3) :+ 100 :+ 110 :+ 90 history3.mean should be(100.0 +- 0.00001) history3.variance should be(66.6666667 +- 0.00001) val history4 = history3 :+ 140 history4.mean should be(113.333333 +- 0.00001) history4.variance should be(422.222222 +- 0.00001) val history5 = history4 :+ 80 history5.mean should be(103.333333 +- 0.00001) history5.variance should be(688.88888889 +- 0.00001) } } } Other Akka source code examplesHere is a short list of links related to this Akka AccrualFailureDetectorSpec.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.