alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (ClusterSingletonManagerSpec.scala)

This example Akka source code file (ClusterSingletonManagerSpec.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actor, actorref, akka, cluster, clustersingletonmanagerspec, concurrent, int, registerconsumer, reset, rolename, test, testing, testprobe, unexpectedunregistration, unit, unregisterconsumer

The ClusterSingletonManagerSpec.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */

package akka.contrib.pattern

import language.postfixOps
import scala.collection.immutable
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.Address
import akka.actor.Props
import akka.actor.RootActorPath
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import akka.cluster.Member
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.testkit.STMultiNodeSpec
import akka.testkit._
import akka.testkit.TestEvent._
import akka.actor.Terminated
import akka.actor.Identify
import akka.actor.ActorIdentity
import akka.actor.ActorSelection
import akka.cluster.MemberStatus

object ClusterSingletonManagerSpec extends MultiNodeConfig {
  val controller = role("controller")
  val observer = role("observer")
  val first = role("first")
  val second = role("second")
  val third = role("third")
  val fourth = role("fourth")
  val fifth = role("fifth")
  val sixth = role("sixth")

  commonConfig(ConfigFactory.parseString("""
    akka.loglevel = INFO
    akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
    akka.remote.log-remote-lifecycle-events = off
    akka.cluster.auto-down-unreachable-after = 0s
                                          """))

  nodeConfig(first, second, third, fourth, fifth, sixth)(
    ConfigFactory.parseString("akka.cluster.roles =[worker]"))

  object PointToPointChannel {

    case object RegisterConsumer

    case object UnregisterConsumer

    case object RegistrationOk

    case object UnexpectedRegistration

    case object UnregistrationOk

    case object UnexpectedUnregistration

    case object Reset

    case object ResetOk

  }

  /**
   * This channel is extremly strict with regards to
   * registration and unregistration of consumer to
   * be able to detect misbehaviour (e.g. two active
   * singleton instances).
   */
  class PointToPointChannel extends Actor with ActorLogging {

    import PointToPointChannel._

    def receive = idle

    def idle: Receive = {
      case RegisterConsumer ⇒
        log.info("RegisterConsumer: [{}]", sender().path)
        sender() ! RegistrationOk
        context.become(active(sender()))
      case UnregisterConsumer ⇒
        log.info("UnexpectedUnregistration: [{}]", sender().path)
        sender() ! UnexpectedUnregistration
        context stop self
      case Reset ⇒ sender() ! ResetOk
      case msg   ⇒ // no consumer, drop
    }

    def active(consumer: ActorRef): Receive = {
      case UnregisterConsumer if sender() == consumer ⇒
        log.info("UnregistrationOk: [{}]", sender().path)
        sender() ! UnregistrationOk
        context.become(idle)
      case UnregisterConsumer ⇒
        log.info("UnexpectedUnregistration: [{}], expected [{}]", sender().path, consumer.path)
        sender() ! UnexpectedUnregistration
        context stop self
      case RegisterConsumer ⇒
        log.info("Unexpected RegisterConsumer [{}], active consumer [{}]", sender().path, consumer.path)
        sender() ! UnexpectedRegistration
        context stop self
      case Reset ⇒
        context.become(idle)
        sender() ! ResetOk
      case msg ⇒ consumer ! msg
    }
  }

  object Consumer {
    case object End
    case object GetCurrent
    case object Ping
    case object Pong
  }

  /**
   * The Singleton actor
   */
  class Consumer(queue: ActorRef, delegateTo: ActorRef) extends Actor {

    import Consumer._
    import PointToPointChannel._

    var current = 0

    override def preStart(): Unit = queue ! RegisterConsumer

    def receive = {
      case n: Int if n <= current ⇒
        context.stop(self)
      case n: Int ⇒
        current = n
        delegateTo ! n
      case x @ (RegistrationOk | UnexpectedRegistration) ⇒
        delegateTo ! x
      case GetCurrent ⇒
        sender() ! current
      //#consumer-end
      case End ⇒
        queue ! UnregisterConsumer
      case UnregistrationOk ⇒
        context stop self
      case Ping ⇒
        sender() ! Pong
      //#consumer-end
    }
  }

  // documentation of how to keep track of the oldest member in user land
  //#singleton-proxy
  class ConsumerProxy extends Actor {

    // subscribe to MemberEvent, re-subscribe when restart
    override def preStart(): Unit =
      Cluster(context.system).subscribe(self, classOf[MemberEvent])

    override def postStop(): Unit =
      Cluster(context.system).unsubscribe(self)

    val role = "worker"
    // sort by age, oldest first
    val ageOrdering = Ordering.fromLessThan[Member] {
      (a, b) ⇒ a.isOlderThan(b)
    }
    var membersByAge: immutable.SortedSet[Member] =
      immutable.SortedSet.empty(ageOrdering)

    def receive = {
      case state: CurrentClusterState ⇒
        membersByAge = immutable.SortedSet.empty(ageOrdering) ++ state.members.filter(m ⇒
          m.status == MemberStatus.Up && m.hasRole(role))
      case MemberUp(m)         ⇒ if (m.hasRole(role)) membersByAge += m
      case MemberRemoved(m, _) ⇒ if (m.hasRole(role)) membersByAge -= m
      case other ⇒ consumer foreach {
        _.tell(other, sender())
      }
    }

    def consumer: Option[ActorSelection] =
      membersByAge.headOption map (m ⇒ context.actorSelection(
        RootActorPath(m.address) / "user" / "singleton" / "consumer"))
  }

  //#singleton-proxy

}

class ClusterSingletonManagerMultiJvmNode1 extends ClusterSingletonManagerSpec

class ClusterSingletonManagerMultiJvmNode2 extends ClusterSingletonManagerSpec

class ClusterSingletonManagerMultiJvmNode3 extends ClusterSingletonManagerSpec

class ClusterSingletonManagerMultiJvmNode4 extends ClusterSingletonManagerSpec

class ClusterSingletonManagerMultiJvmNode5 extends ClusterSingletonManagerSpec

class ClusterSingletonManagerMultiJvmNode6 extends ClusterSingletonManagerSpec

class ClusterSingletonManagerMultiJvmNode7 extends ClusterSingletonManagerSpec

class ClusterSingletonManagerMultiJvmNode8 extends ClusterSingletonManagerSpec

class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerSpec) with STMultiNodeSpec with ImplicitSender {

  import ClusterSingletonManagerSpec._
  import ClusterSingletonManagerSpec.PointToPointChannel._
  import ClusterSingletonManagerSpec.Consumer._

  override def initialParticipants = roles.size

  val identifyProbe = TestProbe()

  val controllerRootActorPath = node(controller)

  var _msg = 0

  def msg(): Int = {
    _msg += 1
    _msg
  }

  def queue: ActorRef = {
    // this is used from inside actor construction, i.e. other thread, and must therefore not call `node(controller`
    system.actorSelection(controllerRootActorPath / "user" / "queue").tell(Identify("queue"), identifyProbe.ref)
    identifyProbe.expectMsgType[ActorIdentity].ref.get
  }

  def join(from: RoleName, to: RoleName): Unit = {
    runOn(from) {
      Cluster(system) join node(to).address
      if (Cluster(system).selfRoles.contains("worker")) {
        createSingleton()
        createSingletonProxy()
      }
    }
  }

  def awaitMemberUp(memberProbe: TestProbe, nodes: RoleName*): Unit = {
    runOn(nodes.filterNot(_ == nodes.head): _*) {
      memberProbe.expectMsgType[MemberUp](15.seconds).member.address should be(node(nodes.head).address)
    }
    runOn(nodes.head) {
      memberProbe.receiveN(nodes.size, 15.seconds).collect { case MemberUp(m) ⇒ m.address }.toSet should be(
        nodes.map(node(_).address).toSet)
    }
    enterBarrier(nodes.head.name + "-up")
  }

  def createSingleton(): ActorRef = {
    //#create-singleton-manager
    system.actorOf(ClusterSingletonManager.props(
      singletonProps = Props(classOf[Consumer], queue, testActor),
      singletonName = "consumer",
      terminationMessage = End,
      role = Some("worker")),
      name = "singleton")
    //#create-singleton-manager
  }

  def createSingletonProxy(): ActorRef = {
    //#create-singleton-proxy
    system.actorOf(ClusterSingletonProxy.props(
      singletonPath = "/user/singleton/consumer",
      role = Some("worker")),
      name = "consumerProxy")
    //#create-singleton-proxy
  }

  def verifyProxyMsg(oldest: RoleName, proxyNode: RoleName, msg: Int): Unit = {
    enterBarrier("before-" + msg + "-proxy-verified")

    // send a message to the proxy
    runOn(proxyNode) {
      // make sure that the proxy has received membership changes
      // and points to the current singleton
      val p = TestProbe()
      within(5.seconds) {
        awaitAssert {
          system.actorSelection("/user/consumerProxy").tell(Ping, p.ref)
          p.expectMsg(1.second, Pong)
        }
      }
      // then send the real message
      system.actorSelection("/user/consumerProxy") ! msg
    }

    // expect a message on the oldest node
    runOn(oldest) {
      expectMsg(5.seconds, msg)
    }

    enterBarrier("after-" + msg + "-proxy-verified")
  }

  def consumer(oldest: RoleName): ActorSelection =
    system.actorSelection(RootActorPath(node(oldest).address) / "user" / "singleton" / "consumer")

  def verifyRegistration(oldest: RoleName): Unit = {
    enterBarrier("before-" + oldest.name + "-registration-verified")
    runOn(oldest) {
      expectMsg(RegistrationOk)
      consumer(oldest) ! GetCurrent
      expectMsg(0)
    }
    enterBarrier("after-" + oldest.name + "-registration-verified")
  }

  def verifyMsg(oldest: RoleName, msg: Int): Unit = {
    enterBarrier("before-" + msg + "-verified")

    runOn(controller) {
      queue ! msg
      // make sure it's not terminated, which would be wrong
      expectNoMsg(1 second)
    }
    runOn(oldest) {
      expectMsg(5.seconds, msg)
    }
    runOn(roles.filterNot(r ⇒ r == oldest || r == controller || r == observer): _*) {
      expectNoMsg(1 second)
    }
    enterBarrier("after-" + msg + "-verified")
  }

  def crash(roles: RoleName*): Unit = {
    runOn(controller) {
      queue ! Reset
      expectMsg(ResetOk)
      roles foreach { r ⇒
        log.info("Shutdown [{}]", node(r).address)
        testConductor.exit(r, 0).await
      }
    }
  }

  "A ClusterSingletonManager" must {

    "startup 6 node cluster" in within(60 seconds) {

      val memberProbe = TestProbe()
      Cluster(system).subscribe(memberProbe.ref, classOf[MemberUp])
      memberProbe.expectMsgClass(classOf[CurrentClusterState])

      runOn(controller) {
        // watch that it is not terminated, which would indicate misbehaviour
        watch(system.actorOf(Props[PointToPointChannel], "queue"))
      }
      enterBarrier("queue-started")

      join(first, first)
      awaitMemberUp(memberProbe, first)
      verifyRegistration(first)
      verifyMsg(first, msg = msg())

      // join the observer node as well, which should not influence since it doesn't have the "worker" role
      join(observer, first)
      awaitMemberUp(memberProbe, observer, first)
      verifyProxyMsg(first, first, msg = msg())

      join(second, first)
      awaitMemberUp(memberProbe, second, observer, first)
      verifyMsg(first, msg = msg())
      verifyProxyMsg(first, second, msg = msg())

      join(third, first)
      awaitMemberUp(memberProbe, third, second, observer, first)
      verifyMsg(first, msg = msg())
      verifyProxyMsg(first, third, msg = msg())

      join(fourth, first)
      awaitMemberUp(memberProbe, fourth, third, second, observer, first)
      verifyMsg(first, msg = msg())
      verifyProxyMsg(first, fourth, msg = msg())

      join(fifth, first)
      awaitMemberUp(memberProbe, fifth, fourth, third, second, observer, first)
      verifyMsg(first, msg = msg())
      verifyProxyMsg(first, fifth, msg = msg())

      join(sixth, first)
      awaitMemberUp(memberProbe, sixth, fifth, fourth, third, second, observer, first)
      verifyMsg(first, msg = msg())
      verifyProxyMsg(first, sixth, msg = msg())

      enterBarrier("after-1")
    }

    "let the proxy route messages to the singleton in a 6 node cluster" in within(60 seconds) {
      verifyProxyMsg(first, first, msg = msg())
      verifyProxyMsg(first, second, msg = msg())
      verifyProxyMsg(first, third, msg = msg())
      verifyProxyMsg(first, fourth, msg = msg())
      verifyProxyMsg(first, fifth, msg = msg())
      verifyProxyMsg(first, sixth, msg = msg())
    }

    "hand over when oldest leaves in 6 nodes cluster " in within(30 seconds) {
      val leaveRole = first
      val newOldestRole = second

      runOn(leaveRole) {
        Cluster(system) leave node(leaveRole).address
      }

      verifyRegistration(second)
      verifyMsg(second, msg = msg())
      verifyProxyMsg(second, second, msg = msg())
      verifyProxyMsg(second, third, msg = msg())
      verifyProxyMsg(second, fourth, msg = msg())
      verifyProxyMsg(second, fifth, msg = msg())
      verifyProxyMsg(second, sixth, msg = msg())

      runOn(leaveRole) {
        system.actorSelection("/user/singleton").tell(Identify("singleton"), identifyProbe.ref)
        identifyProbe.expectMsgPF() {
          case ActorIdentity("singleton", None) ⇒ // already terminated
          case ActorIdentity("singleton", Some(singleton)) ⇒
            watch(singleton)
            expectTerminated(singleton)
        }
      }

      enterBarrier("after-leave")
    }

    "take over when oldest crashes in 5 nodes cluster" in within(60 seconds) {
      // mute logging of deadLetters during shutdown of systems
      if (!log.isDebugEnabled)
        system.eventStream.publish(Mute(DeadLettersFilter[Any]))
      enterBarrier("logs-muted")

      crash(second)
      verifyRegistration(third)
      verifyMsg(third, msg = msg())
      verifyProxyMsg(third, third, msg = msg())
      verifyProxyMsg(third, fourth, msg = msg())
      verifyProxyMsg(third, fifth, msg = msg())
      verifyProxyMsg(third, sixth, msg = msg())
    }

    "take over when two oldest crash in 3 nodes cluster" in within(60 seconds) {
      crash(third, fourth)
      verifyRegistration(fifth)
      verifyMsg(fifth, msg = msg())
      verifyProxyMsg(fifth, fifth, msg = msg())
      verifyProxyMsg(fifth, sixth, msg = msg())
    }

    "take over when oldest crashes in 2 nodes cluster" in within(60 seconds) {
      crash(fifth)
      verifyRegistration(sixth)
      verifyMsg(sixth, msg = msg())
      verifyProxyMsg(sixth, sixth, msg = msg())
    }

  }
}

Other Akka source code examples

Here is a short list of links related to this Akka ClusterSingletonManagerSpec.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.