alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (ClusterRoundRobinSpec.scala)

This example Akka source code file (ClusterRoundRobinSpec.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actor, actorref, actorrefroutee, address, akka, clusterroundrobinspec, concurrent, grouproutee, int, longrunningtest, none, reply, route, routeetype, router, routing

The ClusterRoundRobinSpec.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.cluster.routing

import language.postfixOps
import scala.concurrent.Await
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Address
import akka.actor.Props
import akka.actor.Terminated
import akka.cluster.MultiNodeClusterSpec
import akka.pattern.ask
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import akka.remote.transport.ThrottlerTransportAdapter.Direction
import akka.routing.FromConfig
import akka.routing.RoundRobinPool
import akka.routing.ActorRefRoutee
import akka.routing.ActorSelectionRoutee
import akka.routing.RoutedActorRef
import akka.routing.GetRoutees
import akka.routing.Routees

object ClusterRoundRobinMultiJvmSpec extends MultiNodeConfig {

  class SomeActor(routeeType: RouteeType) extends Actor {
    def this() = this(PoolRoutee)

    def receive = {
      case "hit" ⇒ sender() ! Reply(routeeType, self)
    }
  }

  final case class Reply(routeeType: RouteeType, ref: ActorRef)

  sealed trait RouteeType extends Serializable
  object PoolRoutee extends RouteeType
  object GroupRoutee extends RouteeType

  val first = role("first")
  val second = role("second")
  val third = role("third")
  val fourth = role("fourth")

  commonConfig(debugConfig(on = false).
    withFallback(ConfigFactory.parseString("""
      akka.actor.deployment {
        /router1 {
          router = round-robin-pool
          nr-of-instances = 10
          cluster {
            enabled = on
            max-nr-of-instances-per-node = 2
          }
        }
        /router3 {
          router = round-robin-pool
          nr-of-instances = 10
          cluster {
            enabled = on
            max-nr-of-instances-per-node = 1
            allow-local-routees = off
          }
        }
        /router4 {
          router = round-robin-group
          nr-of-instances = 10
          routees.paths = ["/user/myserviceA", "/user/myserviceB"]
          cluster.enabled = on
        }
        /router5 {
          router = round-robin-pool
          nr-of-instances = 10
          cluster {
            enabled = on
            use-role = a
          }
        }
      }
      """)).
    withFallback(MultiNodeClusterSpec.clusterConfig))

  nodeConfig(first, second)(ConfigFactory.parseString("""akka.cluster.roles =["a", "c"]"""))
  nodeConfig(third)(ConfigFactory.parseString("""akka.cluster.roles =["b", "c"]"""))

  testTransport(on = true)

}

class ClusterRoundRobinMultiJvmNode1 extends ClusterRoundRobinSpec
class ClusterRoundRobinMultiJvmNode2 extends ClusterRoundRobinSpec
class ClusterRoundRobinMultiJvmNode3 extends ClusterRoundRobinSpec
class ClusterRoundRobinMultiJvmNode4 extends ClusterRoundRobinSpec

abstract class ClusterRoundRobinSpec extends MultiNodeSpec(ClusterRoundRobinMultiJvmSpec)
  with MultiNodeClusterSpec
  with ImplicitSender with DefaultTimeout {
  import ClusterRoundRobinMultiJvmSpec._

  lazy val router1 = system.actorOf(FromConfig.props(Props[SomeActor]), "router1")
  lazy val router2 = system.actorOf(ClusterRouterPool(RoundRobinPool(nrOfInstances = 0),
    ClusterRouterPoolSettings(totalInstances = 3, maxInstancesPerNode = 1, allowLocalRoutees = true, useRole = None)).
    props(Props[SomeActor]),
    "router2")
  lazy val router3 = system.actorOf(FromConfig.props(Props[SomeActor]), "router3")
  lazy val router4 = system.actorOf(FromConfig.props(), "router4")
  lazy val router5 = system.actorOf(RoundRobinPool(nrOfInstances = 0).props(Props[SomeActor]), "router5")

  def receiveReplies(routeeType: RouteeType, expectedReplies: Int): Map[Address, Int] = {
    val zero = Map.empty[Address, Int] ++ roles.map(address(_) -> 0)
    (receiveWhile(5 seconds, messages = expectedReplies) {
      case Reply(`routeeType`, ref) ⇒ fullAddress(ref)
    }).foldLeft(zero) {
      case (replyMap, address) ⇒ replyMap + (address -> (replyMap(address) + 1))
    }
  }

  /**
   * Fills in self address for local ActorRef
   */
  private def fullAddress(actorRef: ActorRef): Address = actorRef.path.address match {
    case Address(_, _, None, None) ⇒ cluster.selfAddress
    case a                         ⇒ a
  }

  def currentRoutees(router: ActorRef) =
    Await.result(router ? GetRoutees, timeout.duration).asInstanceOf[Routees].routees

  "A cluster router with a RoundRobin router" must {
    "start cluster with 2 nodes" taggedAs LongRunningTest in {
      awaitClusterUp(first, second)
      enterBarrier("after-1")
    }

    "deploy routees to the member nodes in the cluster" taggedAs LongRunningTest in {

      runOn(first) {
        router1.isInstanceOf[RoutedActorRef] should be(true)

        // max-nr-of-instances-per-node=2 times 2 nodes
        awaitAssert(currentRoutees(router1).size should be(4))

        val iterationCount = 10
        for (i ← 0 until iterationCount) {
          router1 ! "hit"
        }

        val replies = receiveReplies(PoolRoutee, iterationCount)

        replies(first) should be > (0)
        replies(second) should be > (0)
        replies(third) should be(0)
        replies(fourth) should be(0)
        replies.values.sum should be(iterationCount)
      }

      enterBarrier("after-2")
    }

    "lookup routees on the member nodes in the cluster" taggedAs LongRunningTest in {

      // cluster consists of first and second

      system.actorOf(Props(classOf[SomeActor], GroupRoutee), "myserviceA")
      system.actorOf(Props(classOf[SomeActor], GroupRoutee), "myserviceB")
      enterBarrier("myservice-started")

      runOn(first) {
        // 2 nodes, 2 routees on each node
        within(10.seconds) {
          awaitAssert(currentRoutees(router4).size should be(4))
        }

        val iterationCount = 10
        for (i ← 0 until iterationCount) {
          router4 ! "hit"
        }

        val replies = receiveReplies(GroupRoutee, iterationCount)

        replies(first) should be > (0)
        replies(second) should be > (0)
        replies(third) should be(0)
        replies(fourth) should be(0)
        replies.values.sum should be(iterationCount)
      }

      enterBarrier("after-3")
    }

    "deploy routees to new nodes in the cluster" taggedAs LongRunningTest in {

      // add third and fourth
      awaitClusterUp(first, second, third, fourth)

      runOn(first) {
        // max-nr-of-instances-per-node=2 times 4 nodes
        awaitAssert(currentRoutees(router1).size should be(8))

        val iterationCount = 10
        for (i ← 0 until iterationCount) {
          router1 ! "hit"
        }

        val replies = receiveReplies(PoolRoutee, iterationCount)

        replies.values.foreach { _ should be > (0) }
        replies.values.sum should be(iterationCount)
      }

      enterBarrier("after-4")
    }

    "lookup routees on new nodes in the cluster" taggedAs LongRunningTest in {

      // cluster consists of first, second, third and fourth

      runOn(first) {
        // 4 nodes, 2 routee on each node
        awaitAssert(currentRoutees(router4).size should be(8))

        val iterationCount = 10
        for (i ← 0 until iterationCount) {
          router4 ! "hit"
        }

        val replies = receiveReplies(GroupRoutee, iterationCount)

        replies.values.foreach { _ should be > (0) }
        replies.values.sum should be(iterationCount)
      }

      enterBarrier("after-5")
    }

    "deploy routees to only remote nodes when allow-local-routees = off" taggedAs LongRunningTest in {

      runOn(first) {
        // max-nr-of-instances-per-node=1 times 3 nodes
        awaitAssert(currentRoutees(router3).size should be(3))

        val iterationCount = 10
        for (i ← 0 until iterationCount) {
          router3 ! "hit"
        }

        val replies = receiveReplies(PoolRoutee, iterationCount)

        replies(first) should be(0)
        replies(second) should be > (0)
        replies(third) should be > (0)
        replies(fourth) should be > (0)
        replies.values.sum should be(iterationCount)
      }

      enterBarrier("after-6")
    }

    "deploy routees to specified node role" taggedAs LongRunningTest in {

      runOn(first) {
        awaitAssert(currentRoutees(router5).size should be(2))

        val iterationCount = 10
        for (i ← 0 until iterationCount) {
          router5 ! "hit"
        }

        val replies = receiveReplies(PoolRoutee, iterationCount)

        replies(first) should be > (0)
        replies(second) should be > (0)
        replies(third) should be(0)
        replies(fourth) should be(0)
        replies.values.sum should be(iterationCount)
      }

      enterBarrier("after-7")
    }

    "deploy programatically defined routees to the member nodes in the cluster" taggedAs LongRunningTest in {

      runOn(first) {
        router2.isInstanceOf[RoutedActorRef] should be(true)

        // totalInstances = 3, maxInstancesPerNode = 1
        awaitAssert(currentRoutees(router2).size should be(3))

        val iterationCount = 10
        for (i ← 0 until iterationCount) {
          router2 ! "hit"
        }

        val replies = receiveReplies(PoolRoutee, iterationCount)

        // note that router2 has totalInstances = 3, maxInstancesPerNode = 1
        val routees = currentRoutees(router2)
        val routeeAddresses = routees map { case ActorRefRoutee(ref) ⇒ fullAddress(ref) }

        routeeAddresses.size should be(3)
        replies.values.sum should be(iterationCount)
      }

      enterBarrier("after-8")
    }

    "remove routees for unreachable nodes, and add when reachable again" taggedAs LongRunningTest in within(30.seconds) {

      // myservice is already running

      def routees = currentRoutees(router4)
      def routeeAddresses = (routees map { case ActorSelectionRoutee(sel) ⇒ fullAddress(sel.anchor) }).toSet

      runOn(first) {
        // 4 nodes, 2 routees on each node
        awaitAssert(currentRoutees(router4).size should be(8))

        testConductor.blackhole(first, second, Direction.Both).await

        awaitAssert(routees.size should be(6))
        routeeAddresses should not contain (address(second))

        testConductor.passThrough(first, second, Direction.Both).await
        awaitAssert(routees.size should be(8))
        routeeAddresses should contain(address(second))

      }

      enterBarrier("after-9")
    }

    "deploy programatically defined routees to other node when a node becomes down" taggedAs LongRunningTest in {
      muteMarkingAsUnreachable()

      runOn(first) {
        def routees = currentRoutees(router2)
        def routeeAddresses = (routees map { case ActorRefRoutee(ref) ⇒ fullAddress(ref) }).toSet

        routees foreach { case ActorRefRoutee(ref) ⇒ watch(ref) }
        val notUsedAddress = ((roles map address).toSet -- routeeAddresses).head
        val downAddress = routeeAddresses.find(_ != address(first)).get
        val downRouteeRef = routees.collectFirst {
          case ActorRefRoutee(ref) if ref.path.address == downAddress ⇒ ref
        }.get

        cluster.down(downAddress)
        expectMsgType[Terminated](15.seconds).actor should be(downRouteeRef)
        awaitAssert {
          routeeAddresses should contain(notUsedAddress)
          routeeAddresses should not contain (downAddress)
        }

        val iterationCount = 10
        for (i ← 0 until iterationCount) {
          router2 ! "hit"
        }

        val replies = receiveReplies(PoolRoutee, iterationCount)

        routeeAddresses.size should be(3)
        replies.values.sum should be(iterationCount)
      }

      enterBarrier("after-10")
    }

  }
}

Other Akka source code examples

Here is a short list of links related to this Akka ClusterRoundRobinSpec.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.