|
Akka/Scala example source code file (ClusterHeartbeatSenderStateSpec.scala)
The ClusterHeartbeatSenderStateSpec.scala Akka example source code/** * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> */ package akka.cluster import org.scalatest.WordSpec import org.scalatest.Matchers import akka.actor.Address import scala.concurrent.duration._ import scala.collection.immutable import akka.remote.FailureDetector import akka.remote.DefaultFailureDetectorRegistry import scala.concurrent.forkjoin.ThreadLocalRandom object ClusterHeartbeatSenderStateSpec { class FailureDetectorStub extends FailureDetector { trait Status object Up extends Status object Down extends Status object Unknown extends Status private var status: Status = Unknown def markNodeAsUnavailable(): Unit = status = Down def markNodeAsAvailable(): Unit = status = Up override def isAvailable: Boolean = status match { case Unknown | Up ⇒ true case Down ⇒ false } override def isMonitoring: Boolean = status != Unknown override def heartbeat(): Unit = status = Up } } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class ClusterHeartbeatSenderStateSpec extends WordSpec with Matchers { import ClusterHeartbeatSenderStateSpec._ val aa = UniqueAddress(Address("akka.tcp", "sys", "aa", 2552), 1) val bb = UniqueAddress(Address("akka.tcp", "sys", "bb", 2552), 2) val cc = UniqueAddress(Address("akka.tcp", "sys", "cc", 2552), 3) val dd = UniqueAddress(Address("akka.tcp", "sys", "dd", 2552), 4) val ee = UniqueAddress(Address("akka.tcp", "sys", "ee", 2552), 5) def emptyState: ClusterHeartbeatSenderState = emptyState(aa) def emptyState(selfUniqueAddress: UniqueAddress) = ClusterHeartbeatSenderState( ring = HeartbeatNodeRing(selfUniqueAddress, Set(selfUniqueAddress), monitoredByNrOfMembers = 3), unreachable = Set.empty[UniqueAddress], failureDetector = new DefaultFailureDetectorRegistry[Address](() ⇒ new FailureDetectorStub)) def fd(state: ClusterHeartbeatSenderState, node: UniqueAddress): FailureDetectorStub = state.failureDetector.asInstanceOf[DefaultFailureDetectorRegistry[Address]].failureDetector(node.address). get.asInstanceOf[FailureDetectorStub] "A ClusterHeartbeatSenderState" must { "return empty active set when no nodes" in { emptyState.activeReceivers.isEmpty should be(true) } "init with empty" in { emptyState.init(Set.empty).activeReceivers should be(Set.empty) } "init with self" in { emptyState.init(Set(aa, bb, cc)).activeReceivers should be(Set(bb, cc)) } "init without self" in { emptyState.init(Set(bb, cc)).activeReceivers should be(Set(bb, cc)) } "use added members" in { emptyState.addMember(bb).addMember(cc).activeReceivers should be(Set(bb, cc)) } "not use removed members" in { emptyState.addMember(bb).addMember(cc).removeMember(bb).activeReceivers should be(Set(cc)) } "use specified number of members" in { // they are sorted by the hash (uid) of the UniqueAddress emptyState.addMember(cc).addMember(dd).addMember(bb).addMember(ee).activeReceivers should be(Set(bb, cc, dd)) } "update failure detector in active set" in { val s1 = emptyState.addMember(bb).addMember(cc).addMember(dd) val s2 = s1.heartbeatRsp(bb).heartbeatRsp(cc).heartbeatRsp(dd).heartbeatRsp(ee) s2.failureDetector.isMonitoring(bb.address) should be(true) s2.failureDetector.isMonitoring(cc.address) should be(true) s2.failureDetector.isMonitoring(dd.address) should be(true) s2.failureDetector.isMonitoring(ee.address) should be(false) } "continue to use unreachable" in { val s1 = emptyState.addMember(cc).addMember(dd).addMember(ee) val s2 = s1.heartbeatRsp(cc).heartbeatRsp(dd).heartbeatRsp(ee) fd(s2, ee).markNodeAsUnavailable() s2.failureDetector.isAvailable(ee.address) should be(false) s2.addMember(bb).activeReceivers should be(Set(bb, cc, dd, ee)) } "remove unreachable when coming back" in { val s1 = emptyState.addMember(cc).addMember(dd).addMember(ee) val s2 = s1.heartbeatRsp(cc).heartbeatRsp(dd).heartbeatRsp(ee) fd(s2, dd).markNodeAsUnavailable() fd(s2, ee).markNodeAsUnavailable() val s3 = s2.addMember(bb) s3.activeReceivers should be(Set(bb, cc, dd, ee)) val s4 = s3.heartbeatRsp(bb).heartbeatRsp(cc).heartbeatRsp(dd).heartbeatRsp(ee) s4.activeReceivers should be(Set(bb, cc, dd)) s4.failureDetector.isMonitoring(ee.address) should be(false) } "remove unreachable when member removed" in { val s1 = emptyState.addMember(cc).addMember(dd).addMember(ee) val s2 = s1.heartbeatRsp(cc).heartbeatRsp(dd).heartbeatRsp(ee) fd(s2, cc).markNodeAsUnavailable() fd(s2, ee).markNodeAsUnavailable() val s3 = s2.addMember(bb).heartbeatRsp(bb) s3.activeReceivers should be(Set(bb, cc, dd, ee)) val s4 = s3.removeMember(cc).removeMember(ee) s4.activeReceivers should be(Set(bb, dd)) s4.failureDetector.isMonitoring(cc.address) should be(false) s4.failureDetector.isMonitoring(ee.address) should be(false) } "behave correctly for random operations" in { val rnd = ThreadLocalRandom.current val nodes = (1 to rnd.nextInt(10, 200)).map(n ⇒ UniqueAddress(Address("akka.tcp", "sys", "n" + n, 2552), n)).toVector def rndNode() = nodes(rnd.nextInt(0, nodes.size)) val selfUniqueAddress = rndNode() var state = emptyState(selfUniqueAddress) val Add = 0 val Remove = 1 val Unreachable = 2 val HeartbeatRsp = 3 for (i ← 1 to 100000) { val operation = rnd.nextInt(Add, HeartbeatRsp + 1) val node = rndNode() try { operation match { case Add ⇒ if (node != selfUniqueAddress && !state.ring.nodes.contains(node)) { val oldUnreachable = state.unreachable state = state.addMember(node) // keep unreachable (oldUnreachable -- state.activeReceivers) should be(Set.empty) state.failureDetector.isMonitoring(node.address) should be(false) state.failureDetector.isAvailable(node.address) should be(true) } case Remove ⇒ if (node != selfUniqueAddress && state.ring.nodes.contains(node)) { val oldUnreachable = state.unreachable state = state.removeMember(node) // keep unreachable, unless it was the removed if (oldUnreachable(node)) (oldUnreachable -- state.activeReceivers) should be(Set(node)) else (oldUnreachable -- state.activeReceivers) should be(Set.empty) state.failureDetector.isMonitoring(node.address) should be(false) state.failureDetector.isAvailable(node.address) should be(true) state.activeReceivers should not contain (node) } case Unreachable ⇒ if (node != selfUniqueAddress && state.activeReceivers(node)) { state.failureDetector.heartbeat(node.address) // make sure the fd is created fd(state, node).markNodeAsUnavailable() state.failureDetector.isMonitoring(node.address) should be(true) state.failureDetector.isAvailable(node.address) should be(false) } case HeartbeatRsp ⇒ if (node != selfUniqueAddress && state.ring.nodes.contains(node)) { val oldUnreachable = state.unreachable val oldReceivers = state.activeReceivers val oldRingReceivers = state.ring.myReceivers state = state.heartbeatRsp(node) if (oldUnreachable(node)) state.unreachable should not contain (node) if (oldUnreachable(node) && !oldRingReceivers(node)) state.failureDetector.isMonitoring(node.address) should be(false) if (oldRingReceivers(node)) state.failureDetector.isMonitoring(node.address) should be(true) state.ring.myReceivers should be(oldRingReceivers) state.failureDetector.isAvailable(node.address) should be(true) } } } catch { case e: Throwable ⇒ println(s"Failure context: i=$i, node=$node, op=$operation, unreachable=${state.unreachable}, " + s"ringReceivers=${state.ring.myReceivers}, ringNodes=${state.ring.nodes}") throw e } } } } } Other Akka source code examplesHere is a short list of links related to this Akka ClusterHeartbeatSenderStateSpec.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.