alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (ClusterConsistentHashingRouterSpec.scala)

This example Akka source code file (ClusterConsistentHashingRouterSpec.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

a, actor, actorref, actorrefroutee, address, akka, clusterconsistenthashingrouterspec, concurrent, consistenthashingpool, consistenthashmapping, longrunningtest, none, route, router, routing, string

The ClusterConsistentHashingRouterSpec.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.cluster.routing

import scala.concurrent.Await
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Address
import akka.actor.Props
import akka.cluster.MultiNodeClusterSpec
import akka.pattern.ask
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.routing.ConsistentHashingRouter
import akka.routing.ConsistentHashingRouter.ConsistentHashMapping
import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope
import akka.routing.GetRoutees
import akka.routing.FromConfig
import akka.testkit._
import akka.routing.ActorRefRoutee
import akka.routing.ConsistentHashingPool
import akka.routing.Routees

/**
 * Shared multi-node configuration for the consistent hashing router spec.
 *
 * Declares the three roles (`first`, `second`, `third`), the `Echo` routee actor,
 * and the deployment settings used by the routers named `router1`, `router3`
 * and `router4`.
 */
object ClusterConsistentHashingRouterMultiJvmSpec extends MultiNodeConfig {

  /**
   * Routee used by every router in the spec: answers any message with its own
   * `ActorRef`, which lets the test see which routee handled a message.
   */
  class Echo extends Actor {
    def receive: Receive = { case _ ⇒ sender() ! self }
  }

  // Registration order is kept as first, second, third.
  val first = role("first")
  val second = role("second")
  val third = role("third")

  // One settings block (consistent-hashing-pool, 10 instances in total, cluster
  // aware, at most 2 routees per node) substituted into three deployment paths.
  private val routerDeploymentConfig = ConfigFactory.parseString("""
      common-router-settings = {
        router = consistent-hashing-pool
        nr-of-instances = 10
        cluster {
          enabled = on
          max-nr-of-instances-per-node = 2
        }
      }

      akka.actor.deployment {
        /router1 = ${common-router-settings}
        /router3 = ${common-router-settings}
        /router4 = ${common-router-settings}
      }
      """)

  // Fallback order: debug settings, then the router deployment, then the
  // generic cluster test configuration.
  commonConfig(
    debugConfig(on = false)
      .withFallback(routerDeploymentConfig)
      .withFallback(MultiNodeClusterSpec.clusterConfig))

}

// Concrete, body-less subclasses of the abstract spec, one per node.
// NOTE(review): presumably the multi-JVM test runner discovers these by their
// `...MultiJvmNodeN` names and starts one JVM for each — confirm against the
// build's multi-jvm setup. There are three of them, matching the three roles
// declared in ClusterConsistentHashingRouterMultiJvmSpec.
class ClusterConsistentHashingRouterMultiJvmNode1 extends ClusterConsistentHashingRouterSpec
class ClusterConsistentHashingRouterMultiJvmNode2 extends ClusterConsistentHashingRouterSpec
class ClusterConsistentHashingRouterMultiJvmNode3 extends ClusterConsistentHashingRouterSpec

/**
 * Multi-node spec for cluster-aware routers that use a consistent hashing pool.
 *
 * Uses the roles and deployment configuration declared in
 * [[ClusterConsistentHashingRouterMultiJvmSpec]]. All router assertions run on the
 * `first` node only; every test step ends by entering a barrier, and each step
 * uses its own barrier name (`after-1` .. `after-7`) so that a node which is one
 * step ahead or behind cannot meet the others on a reused name.
 */
abstract class ClusterConsistentHashingRouterSpec extends MultiNodeSpec(ClusterConsistentHashingRouterMultiJvmSpec)
  with MultiNodeClusterSpec
  with ImplicitSender with DefaultTimeout {
  import ClusterConsistentHashingRouterMultiJvmSpec._

  /**
   * Router created from the `/router1` deployment configuration. Lazy, so the actor
   * is only created on a node that actually touches it (the tests do so on `first`).
   */
  lazy val router1 = system.actorOf(FromConfig.props(Props[Echo]), "router1")

  /**
   * Asks `router` for its routees and blocks until the reply arrives or the
   * default timeout expires.
   *
   * `mapTo` is used instead of a cast on the awaited value, so a reply of an
   * unexpected type fails the future rather than being cast after the fact.
   */
  def currentRoutees(router: ActorRef) =
    Await.result((router ? GetRoutees).mapTo[Routees], timeout.duration).routees

  /**
   * Fills in self address for local ActorRef
   */
  private def fullAddress(actorRef: ActorRef): Address = actorRef.path.address match {
    case Address(_, _, None, None) ⇒ cluster.selfAddress
    case a                         ⇒ a
  }

  /**
   * The set of node addresses that currently host a routee of `router`.
   *
   * The match on `ActorRefRoutee` is deliberately partial: any other kind of
   * routee raises a `MatchError` instead of being silently dropped from the set.
   */
  private def routeeAddresses(router: ActorRef): Set[Address] =
    currentRoutees(router).map { case ActorRefRoutee(ref) ⇒ fullAddress(ref) }.toSet

  /** Hash mapping shared by the tests below: a `String` message is its own hash key. */
  private def stringHashMapping: ConsistentHashMapping = {
    case s: String ⇒ s
  }

  /**
   * Verifies that `router` has 6 routees spread over all nodes and that two sends
   * of the same `String` message are answered by the same routee.
   */
  private def assertHashMapping(router: ActorRef): Unit = {
    // it may take some time until router receives cluster member events
    awaitAssert { currentRoutees(router).size should be(6) }
    routeeAddresses(router) should be(roles.map(address).toSet)

    router ! "a"
    val destinationA = expectMsgType[ActorRef]
    router ! "a"
    expectMsg(destinationA)
  }

  "A cluster router with a consistent hashing pool" must {
    "start cluster with 2 nodes" taggedAs LongRunningTest in {
      awaitClusterUp(first, second)
      enterBarrier("after-1")
    }

    "create routees from configuration" in {
      runOn(first) {
        // it may take some time until router receives cluster member events
        // 2 nodes are up and the configuration allows 2 routees per node
        awaitAssert { currentRoutees(router1).size should be(4) }
        routeeAddresses(router1) should be(Set(address(first), address(second)))
      }
      enterBarrier("after-2")
    }

    "select destination based on hashKey" in {
      runOn(first) {
        // Echo replies with its own ActorRef, so equal replies mean the same routee
        router1 ! ConsistentHashableEnvelope(message = "A", hashKey = "a")
        val destinationA = expectMsgType[ActorRef]
        router1 ! ConsistentHashableEnvelope(message = "AA", hashKey = "a")
        expectMsg(destinationA)
      }
      enterBarrier("after-3")
    }

    "deploy routees to new member nodes in the cluster" taggedAs LongRunningTest in {

      awaitClusterUp(first, second, third)

      runOn(first) {
        // it may take some time until router receives cluster member events
        // 3 nodes are up now, still 2 routees per node
        awaitAssert { currentRoutees(router1).size should be(6) }
        routeeAddresses(router1) should be(roles.map(address).toSet)
      }

      enterBarrier("after-4")
    }

    "deploy programatically defined routees to the member nodes in the cluster" taggedAs LongRunningTest in {
      runOn(first) {
        val router2 = system.actorOf(ClusterRouterPool(local = ConsistentHashingPool(nrOfInstances = 0),
          settings = ClusterRouterPoolSettings(totalInstances = 10, maxInstancesPerNode = 2, allowLocalRoutees = true, useRole = None)).
          props(Props[Echo]),
          "router2")
        // it may take some time until router receives cluster member events
        awaitAssert { currentRoutees(router2).size should be(6) }
        routeeAddresses(router2) should be(roles.map(address).toSet)
      }

      enterBarrier("after-5")
    }

    "handle combination of configured router and programatically defined hashMapping" taggedAs LongRunningTest in {
      runOn(first) {
        // The pool itself asks for 0 instances; the 6 routees asserted below can
        // only come from the `/router3` deployment configuration.
        val router3 = system.actorOf(ConsistentHashingPool(nrOfInstances = 0, hashMapping = stringHashMapping).props(Props[Echo]), "router3")

        assertHashMapping(router3)
      }

      enterBarrier("after-6")
    }

    "handle combination of configured router and programatically defined hashMapping and ClusterRouterConfig" taggedAs LongRunningTest in {
      runOn(first) {
        // maxInstancesPerNode = 1 here would give 3 routees on 3 nodes; the assertion
        // of 6 only holds when the `/router4` deployment settings (2 per node) win.
        val router4 = system.actorOf(ClusterRouterPool(
          local = ConsistentHashingPool(nrOfInstances = 0, hashMapping = stringHashMapping),
          settings = ClusterRouterPoolSettings(totalInstances = 10, maxInstancesPerNode = 1, allowLocalRoutees = true, useRole = None)).
          props(Props[Echo]),
          "router4")

        assertHashMapping(router4)
      }

      enterBarrier("after-7")
    }

  }
}

Other Akka source code examples

Here is a short list of links related to this Akka ClusterConsistentHashingRouterSpec.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.