|
Akka/Scala example source code file (ClusterSingletonManagerSpec.scala)
The ClusterSingletonManagerSpec.scala Akka example source code/** * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> */ package akka.contrib.pattern import language.postfixOps import scala.collection.immutable import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory import akka.actor.Actor import akka.actor.ActorLogging import akka.actor.ActorRef import akka.actor.Address import akka.actor.Props import akka.actor.RootActorPath import akka.cluster.Cluster import akka.cluster.ClusterEvent._ import akka.cluster.Member import akka.remote.testconductor.RoleName import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.remote.testkit.STMultiNodeSpec import akka.testkit._ import akka.testkit.TestEvent._ import akka.actor.Terminated import akka.actor.Identify import akka.actor.ActorIdentity import akka.actor.ActorSelection import akka.cluster.MemberStatus object ClusterSingletonManagerSpec extends MultiNodeConfig { val controller = role("controller") val observer = role("observer") val first = role("first") val second = role("second") val third = role("third") val fourth = role("fourth") val fifth = role("fifth") val sixth = role("sixth") commonConfig(ConfigFactory.parseString(""" akka.loglevel = INFO akka.actor.provider = "akka.cluster.ClusterActorRefProvider" akka.remote.log-remote-lifecycle-events = off akka.cluster.auto-down-unreachable-after = 0s """)) nodeConfig(first, second, third, fourth, fifth, sixth)( ConfigFactory.parseString("akka.cluster.roles =[worker]")) object PointToPointChannel { case object RegisterConsumer case object UnregisterConsumer case object RegistrationOk case object UnexpectedRegistration case object UnregistrationOk case object UnexpectedUnregistration case object Reset case object ResetOk } /** * This channel is extremly strict with regards to * registration and unregistration of consumer to * be able to detect misbehaviour (e.g. two active * singleton instances). */ class PointToPointChannel extends Actor with ActorLogging { import PointToPointChannel._ def receive = idle def idle: Receive = { case RegisterConsumer ⇒ log.info("RegisterConsumer: [{}]", sender().path) sender() ! RegistrationOk context.become(active(sender())) case UnregisterConsumer ⇒ log.info("UnexpectedUnregistration: [{}]", sender().path) sender() ! UnexpectedUnregistration context stop self case Reset ⇒ sender() ! ResetOk case msg ⇒ // no consumer, drop } def active(consumer: ActorRef): Receive = { case UnregisterConsumer if sender() == consumer ⇒ log.info("UnregistrationOk: [{}]", sender().path) sender() ! UnregistrationOk context.become(idle) case UnregisterConsumer ⇒ log.info("UnexpectedUnregistration: [{}], expected [{}]", sender().path, consumer.path) sender() ! UnexpectedUnregistration context stop self case RegisterConsumer ⇒ log.info("Unexpected RegisterConsumer [{}], active consumer [{}]", sender().path, consumer.path) sender() ! UnexpectedRegistration context stop self case Reset ⇒ context.become(idle) sender() ! ResetOk case msg ⇒ consumer ! msg } } object Consumer { case object End case object GetCurrent case object Ping case object Pong } /** * The Singleton actor */ class Consumer(queue: ActorRef, delegateTo: ActorRef) extends Actor { import Consumer._ import PointToPointChannel._ var current = 0 override def preStart(): Unit = queue ! RegisterConsumer def receive = { case n: Int if n <= current ⇒ context.stop(self) case n: Int ⇒ current = n delegateTo ! n case x @ (RegistrationOk | UnexpectedRegistration) ⇒ delegateTo ! x case GetCurrent ⇒ sender() ! current //#consumer-end case End ⇒ queue ! UnregisterConsumer case UnregistrationOk ⇒ context stop self case Ping ⇒ sender() ! Pong //#consumer-end } } // documentation of how to keep track of the oldest member in user land //#singleton-proxy class ConsumerProxy extends Actor { // subscribe to MemberEvent, re-subscribe when restart override def preStart(): Unit = Cluster(context.system).subscribe(self, classOf[MemberEvent]) override def postStop(): Unit = Cluster(context.system).unsubscribe(self) val role = "worker" // sort by age, oldest first val ageOrdering = Ordering.fromLessThan[Member] { (a, b) ⇒ a.isOlderThan(b) } var membersByAge: immutable.SortedSet[Member] = immutable.SortedSet.empty(ageOrdering) def receive = { case state: CurrentClusterState ⇒ membersByAge = immutable.SortedSet.empty(ageOrdering) ++ state.members.filter(m ⇒ m.status == MemberStatus.Up && m.hasRole(role)) case MemberUp(m) ⇒ if (m.hasRole(role)) membersByAge += m case MemberRemoved(m, _) ⇒ if (m.hasRole(role)) membersByAge -= m case other ⇒ consumer foreach { _.tell(other, sender()) } } def consumer: Option[ActorSelection] = membersByAge.headOption map (m ⇒ context.actorSelection( RootActorPath(m.address) / "user" / "singleton" / "consumer")) } //#singleton-proxy } class ClusterSingletonManagerMultiJvmNode1 extends ClusterSingletonManagerSpec class ClusterSingletonManagerMultiJvmNode2 extends ClusterSingletonManagerSpec class ClusterSingletonManagerMultiJvmNode3 extends ClusterSingletonManagerSpec class ClusterSingletonManagerMultiJvmNode4 extends ClusterSingletonManagerSpec class ClusterSingletonManagerMultiJvmNode5 extends ClusterSingletonManagerSpec class ClusterSingletonManagerMultiJvmNode6 extends ClusterSingletonManagerSpec class ClusterSingletonManagerMultiJvmNode7 extends ClusterSingletonManagerSpec class ClusterSingletonManagerMultiJvmNode8 extends ClusterSingletonManagerSpec class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerSpec) with STMultiNodeSpec with ImplicitSender { import ClusterSingletonManagerSpec._ import ClusterSingletonManagerSpec.PointToPointChannel._ import ClusterSingletonManagerSpec.Consumer._ override def initialParticipants = roles.size val identifyProbe = TestProbe() val controllerRootActorPath = node(controller) var _msg = 0 def msg(): Int = { _msg += 1 _msg } def queue: ActorRef = { // this is used from inside actor construction, i.e. other thread, and must therefore not call `node(controller` system.actorSelection(controllerRootActorPath / "user" / "queue").tell(Identify("queue"), identifyProbe.ref) identifyProbe.expectMsgType[ActorIdentity].ref.get } def join(from: RoleName, to: RoleName): Unit = { runOn(from) { Cluster(system) join node(to).address if (Cluster(system).selfRoles.contains("worker")) { createSingleton() createSingletonProxy() } } } def awaitMemberUp(memberProbe: TestProbe, nodes: RoleName*): Unit = { runOn(nodes.filterNot(_ == nodes.head): _*) { memberProbe.expectMsgType[MemberUp](15.seconds).member.address should be(node(nodes.head).address) } runOn(nodes.head) { memberProbe.receiveN(nodes.size, 15.seconds).collect { case MemberUp(m) ⇒ m.address }.toSet should be( nodes.map(node(_).address).toSet) } enterBarrier(nodes.head.name + "-up") } def createSingleton(): ActorRef = { //#create-singleton-manager system.actorOf(ClusterSingletonManager.props( singletonProps = Props(classOf[Consumer], queue, testActor), singletonName = "consumer", terminationMessage = End, role = Some("worker")), name = "singleton") //#create-singleton-manager } def createSingletonProxy(): ActorRef = { //#create-singleton-proxy system.actorOf(ClusterSingletonProxy.props( singletonPath = "/user/singleton/consumer", role = Some("worker")), name = "consumerProxy") //#create-singleton-proxy } def verifyProxyMsg(oldest: RoleName, proxyNode: RoleName, msg: Int): Unit = { enterBarrier("before-" + msg + "-proxy-verified") // send a message to the proxy runOn(proxyNode) { // make sure that the proxy has received membership changes // and points to the current singleton val p = TestProbe() within(5.seconds) { awaitAssert { system.actorSelection("/user/consumerProxy").tell(Ping, p.ref) p.expectMsg(1.second, Pong) } } // then send the real message system.actorSelection("/user/consumerProxy") ! msg } // expect a message on the oldest node runOn(oldest) { expectMsg(5.seconds, msg) } enterBarrier("after-" + msg + "-proxy-verified") } def consumer(oldest: RoleName): ActorSelection = system.actorSelection(RootActorPath(node(oldest).address) / "user" / "singleton" / "consumer") def verifyRegistration(oldest: RoleName): Unit = { enterBarrier("before-" + oldest.name + "-registration-verified") runOn(oldest) { expectMsg(RegistrationOk) consumer(oldest) ! GetCurrent expectMsg(0) } enterBarrier("after-" + oldest.name + "-registration-verified") } def verifyMsg(oldest: RoleName, msg: Int): Unit = { enterBarrier("before-" + msg + "-verified") runOn(controller) { queue ! msg // make sure it's not terminated, which would be wrong expectNoMsg(1 second) } runOn(oldest) { expectMsg(5.seconds, msg) } runOn(roles.filterNot(r ⇒ r == oldest || r == controller || r == observer): _*) { expectNoMsg(1 second) } enterBarrier("after-" + msg + "-verified") } def crash(roles: RoleName*): Unit = { runOn(controller) { queue ! Reset expectMsg(ResetOk) roles foreach { r ⇒ log.info("Shutdown [{}]", node(r).address) testConductor.exit(r, 0).await } } } "A ClusterSingletonManager" must { "startup 6 node cluster" in within(60 seconds) { val memberProbe = TestProbe() Cluster(system).subscribe(memberProbe.ref, classOf[MemberUp]) memberProbe.expectMsgClass(classOf[CurrentClusterState]) runOn(controller) { // watch that it is not terminated, which would indicate misbehaviour watch(system.actorOf(Props[PointToPointChannel], "queue")) } enterBarrier("queue-started") join(first, first) awaitMemberUp(memberProbe, first) verifyRegistration(first) verifyMsg(first, msg = msg()) // join the observer node as well, which should not influence since it doesn't have the "worker" role join(observer, first) awaitMemberUp(memberProbe, observer, first) verifyProxyMsg(first, first, msg = msg()) join(second, first) awaitMemberUp(memberProbe, second, observer, first) verifyMsg(first, msg = msg()) verifyProxyMsg(first, second, msg = msg()) join(third, first) awaitMemberUp(memberProbe, third, second, observer, first) verifyMsg(first, msg = msg()) verifyProxyMsg(first, third, msg = msg()) join(fourth, first) awaitMemberUp(memberProbe, fourth, third, second, observer, first) verifyMsg(first, msg = msg()) verifyProxyMsg(first, fourth, msg = msg()) join(fifth, first) awaitMemberUp(memberProbe, fifth, fourth, third, second, observer, first) verifyMsg(first, msg = msg()) verifyProxyMsg(first, fifth, msg = msg()) join(sixth, first) awaitMemberUp(memberProbe, sixth, fifth, fourth, third, second, observer, first) verifyMsg(first, msg = msg()) verifyProxyMsg(first, sixth, msg = msg()) enterBarrier("after-1") } "let the proxy route messages to the singleton in a 6 node cluster" in within(60 seconds) { verifyProxyMsg(first, first, msg = msg()) verifyProxyMsg(first, second, msg = msg()) verifyProxyMsg(first, third, msg = msg()) verifyProxyMsg(first, fourth, msg = msg()) verifyProxyMsg(first, fifth, msg = msg()) verifyProxyMsg(first, sixth, msg = msg()) } "hand over when oldest leaves in 6 nodes cluster " in within(30 seconds) { val leaveRole = first val newOldestRole = second runOn(leaveRole) { Cluster(system) leave node(leaveRole).address } verifyRegistration(second) verifyMsg(second, msg = msg()) verifyProxyMsg(second, second, msg = msg()) verifyProxyMsg(second, third, msg = msg()) verifyProxyMsg(second, fourth, msg = msg()) verifyProxyMsg(second, fifth, msg = msg()) verifyProxyMsg(second, sixth, msg = msg()) runOn(leaveRole) { system.actorSelection("/user/singleton").tell(Identify("singleton"), identifyProbe.ref) identifyProbe.expectMsgPF() { case ActorIdentity("singleton", None) ⇒ // already terminated case ActorIdentity("singleton", Some(singleton)) ⇒ watch(singleton) expectTerminated(singleton) } } enterBarrier("after-leave") } "take over when oldest crashes in 5 nodes cluster" in within(60 seconds) { // mute logging of deadLetters during shutdown of systems if (!log.isDebugEnabled) system.eventStream.publish(Mute(DeadLettersFilter[Any])) enterBarrier("logs-muted") crash(second) verifyRegistration(third) verifyMsg(third, msg = msg()) verifyProxyMsg(third, third, msg = msg()) verifyProxyMsg(third, fourth, msg = msg()) verifyProxyMsg(third, fifth, msg = msg()) verifyProxyMsg(third, sixth, msg = msg()) } "take over when two oldest crash in 3 nodes cluster" in within(60 seconds) { crash(third, fourth) verifyRegistration(fifth) verifyMsg(fifth, msg = msg()) verifyProxyMsg(fifth, fifth, msg = msg()) verifyProxyMsg(fifth, sixth, msg = msg()) } "take over when oldest crashes in 2 nodes cluster" in within(60 seconds) { crash(fifth) verifyRegistration(sixth) verifyMsg(sixth, msg = msg()) verifyProxyMsg(sixth, sixth, msg = msg()) } } } Other Akka source code examplesHere is a short list of links related to this Akka ClusterSingletonManagerSpec.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.