alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (RemoteRoundRobinSpec.scala)

This example Akka source code file (RemoteRoundRobinSpec.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

a, actor, actorref, akka, broadcast, concurrent, getroutees, int, longrunningtest, map, multinodeconfig, remoteroundrobinspec, resizer, route, router, routing

The RemoteRoundRobinSpec.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.remote.routing

import language.postfixOps
import scala.collection.immutable
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.actor.PoisonPill
import akka.actor.Address
import scala.concurrent.Await
import akka.pattern.ask
import akka.remote.testkit.{ STMultiNodeSpec, MultiNodeConfig, MultiNodeSpec }
import akka.routing.Broadcast
import akka.routing.GetRoutees
import akka.routing.Routees
import akka.routing.RoundRobinPool
import akka.routing.RoundRobinGroup
import akka.routing.RoutedActorRef
import akka.routing.Resizer
import akka.routing.Routee
import akka.routing.FromConfig
import akka.testkit._
import scala.concurrent.duration._

/**
 * Shared configuration for the remote round-robin routing spec.
 *
 * Declares the four roles taking part in the test, the actor and resizer
 * used as fixtures, and the router deployments (`service-hello`,
 * `service-hello2`, `service-hello3`) that are applied on every node.
 */
object RemoteRoundRobinMultiJvmSpec extends MultiNodeConfig {

  /** Routee fixture: answers every "hit" with its own `ActorRef`. */
  class SomeActor extends Actor {
    def receive: Receive = { case "hit" ⇒ sender() ! self }
  }

  /**
   * Resizer fixture: reports that it is time to resize while the message
   * counter is at most 10, and always answers `1` from `resize`.
   */
  class TestResizer extends Resizer {
    override def isTimeForResize(messageCounter: Long): Boolean =
      messageCounter <= 10

    override def resize(currentRoutees: immutable.IndexedSeq[Routee]): Int =
      1
  }

  // Roles are registered in declaration order; keep this order stable.
  val first = role("first")
  val second = role("second")
  val third = role("third")
  val fourth = role("fourth")

  commonConfig(debugConfig(on = false))

  // Router deployments shared by all nodes. The @role@ placeholders refer to
  // the roles declared above.
  deployOnAll("""
      /service-hello {
        router = round-robin-pool
        nr-of-instances = 3
        target.nodes = ["@first@", "@second@", "@third@"]
      }

      /service-hello2 {
        router = round-robin-pool
        target.nodes = ["@first@", "@second@", "@third@"]
      }
      
      /service-hello3 {
        router = round-robin-group
        routees.paths = [
          "@first@/user/target-first",
          "@second@/user/target-second",
          "@third@/user/target-third"]
      }
    """)
}

// Four concrete spec classes, matching the four roles (first, second, third,
// fourth) declared in RemoteRoundRobinMultiJvmSpec. Each one inherits every
// test from RemoteRoundRobinSpec without adding anything.
// NOTE(review): the `MultiJvmNodeN` suffix presumably follows the multi-JVM
// test naming convention that launches one JVM per class — confirm against the
// build configuration before renaming.
class RemoteRoundRobinMultiJvmNode1 extends RemoteRoundRobinSpec
class RemoteRoundRobinMultiJvmNode2 extends RemoteRoundRobinSpec
class RemoteRoundRobinMultiJvmNode3 extends RemoteRoundRobinSpec
class RemoteRoundRobinMultiJvmNode4 extends RemoteRoundRobinSpec

/**
 * Multi-node spec for round-robin routers whose routees live on remote nodes.
 *
 * In every test the `fourth` node creates the router and drives the traffic,
 * while `first`, `second` and `third` only take part in the barriers (and, in
 * the group test, host the target actors). Barriers are entered in the same
 * order on all nodes, so any change to the barrier names or their order must
 * be mirrored in both `runOn` blocks of the affected test.
 */
class RemoteRoundRobinSpec extends MultiNodeSpec(RemoteRoundRobinMultiJvmSpec)
  with STMultiNodeSpec with ImplicitSender with DefaultTimeout {
  import RemoteRoundRobinMultiJvmSpec._

  def initialParticipants: Int = roles.size

  /**
   * Sends `messageCount` "hit" messages to `router` and tallies the replies by
   * the address of the replying actor.
   *
   * The result is pre-seeded with a zero count for `first`, `second` and
   * `third`, so a node that never answers still shows up (with 0). A reply
   * from any other address is counted under that address instead of blowing
   * up with a `NoSuchElementException`; this lets the caller's assertions
   * report the problem after all barriers have been passed.
   *
   * @param router       the router under test
   * @param messageCount number of "hit" messages to send and replies to wait for
   * @return number of replies received per address
   */
  private def hitAndTallyReplies(router: ActorRef, messageCount: Int): Map[Address, Int] = {
    (1 to messageCount) foreach { _ ⇒ router ! "hit" }

    val replyAddresses = receiveWhile(5 seconds, messages = messageCount) {
      case ref: ActorRef ⇒ ref.path.address
    }

    val zeroCounts: Map[Address, Int] =
      Map(node(first).address -> 0, node(second).address -> 0, node(third).address -> 0)

    replyAddresses.foldLeft(zeroCounts) {
      case (counts, address) ⇒ counts.updated(address, counts.getOrElse(address, 0) + 1)
    }
  }

  "A remote round robin pool" must {
    "be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" taggedAs LongRunningTest in {

      runOn(first, second, third) {
        enterBarrier("start", "broadcast-end", "end")
      }

      runOn(fourth) {
        enterBarrier("start")
        val actor = system.actorOf(RoundRobinPool(nrOfInstances = 0).props(Props[SomeActor]), "service-hello")
        actor.isInstanceOf[RoutedActorRef] should be(true)

        val connectionCount = 3
        val iterationCount = 10

        val replies = hitAndTallyReplies(actor, connectionCount * iterationCount)

        enterBarrier("broadcast-end")
        actor ! Broadcast(PoisonPill)

        enterBarrier("end")
        // every target node must have answered exactly iterationCount times ...
        replies.values foreach { _ should be(iterationCount) }
        // ... and nothing may have been routed to the local (fourth) node
        replies.get(node(fourth).address) should be(None)

        // shut down the actor before we let the other node(s) shut down so we don't try to send
        // "Terminate" to a shut down node
        system.stop(actor)
      }

      enterBarrier("done")
    }
  }

  "A remote round robin pool with resizer" must {
    "be locally instantiated on a remote node after several resize rounds" taggedAs LongRunningTest in within(5 seconds) {

      runOn(first, second, third) {
        enterBarrier("start", "broadcast-end", "end")
      }

      runOn(fourth) {
        enterBarrier("start")
        val actor = system.actorOf(RoundRobinPool(
          nrOfInstances = 1,
          resizer = Some(new TestResizer)).props(Props[SomeActor]), "service-hello2")
        actor.isInstanceOf[RoutedActorRef] should be(true)

        actor ! GetRoutees
        // initial nrOfInstances 1 + initial resize => 2
        expectMsgType[Routees].routees.size should be(2)

        val repliesFrom: Set[ActorRef] =
          (for (n ← 3 to 9) yield {
            // each message triggers a resize, incrementing the number of routees by 1
            actor ! "hit"
            val routees = Await.result((actor ? GetRoutees).mapTo[Routees], timeout.duration)
            routees.routees.size should be(n)
            expectMsgType[ActorRef]
          }).toSet

        enterBarrier("broadcast-end")
        actor ! Broadcast(PoisonPill)

        enterBarrier("end")
        // seven hits were sent, and each must have been answered by a distinct routee
        repliesFrom.size should be(7)
        val repliesFromAddresses = repliesFrom.map(_.path.address)
        repliesFromAddresses should be(Set(node(first), node(second), node(third)).map(_.address))

        // shut down the actor before we let the other node(s) shut down so we don't try to send
        // "Terminate" to a shut down node
        system.stop(actor)
      }

      enterBarrier("done")
    }
  }

  "A remote round robin group" must {
    "send messages with actor selection to remote paths" taggedAs LongRunningTest in {

      runOn(first, second, third) {
        // must match the routees.paths configured for /service-hello3
        system.actorOf(Props[SomeActor], name = "target-" + myself.name)
        enterBarrier("start", "end")
      }

      runOn(fourth) {
        enterBarrier("start")
        val actor = system.actorOf(FromConfig.props(), "service-hello3")
        actor.isInstanceOf[RoutedActorRef] should be(true)

        val connectionCount = 3
        val iterationCount = 10

        val replies = hitAndTallyReplies(actor, connectionCount * iterationCount)

        enterBarrier("end")
        replies.values foreach { _ should be(iterationCount) }
        replies.get(node(fourth).address) should be(None)
      }

      enterBarrier("done")
    }
  }
}

Other Akka source code examples

Here is a short list of links related to this Akka RemoteRoundRobinSpec.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.