alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (RoutingSpec.scala)

This example Akka source code file (RoutingSpec.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actor, actor1, akka, boolean, concurrent, eventfilter, fromconfig, getroutees, int, oneforonestrategy, props, route, router, routing, testing, testlatch

The RoutingSpec.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.routing

import language.postfixOps

import akka.actor._
import scala.collection.immutable
import akka.testkit._
import scala.concurrent.duration._
import scala.concurrent.Await
import akka.ConfigurationException
import com.typesafe.config.ConfigFactory
import akka.pattern.{ ask, pipe }
import java.util.concurrent.ConcurrentHashMap
import com.typesafe.config.Config
import akka.dispatch.Dispatchers
import akka.util.Collections.EmptyImmutableSeq
import akka.util.Timeout
import java.util.concurrent.atomic.AtomicInteger
import akka.routing._

object RoutingSpec {

  val config = """
    akka.actor.serialize-messages = off
    akka.actor.deployment {
      /router1 {
        router = round-robin-pool
        nr-of-instances = 3
      }
      /router2 {
        router = round-robin-pool
        nr-of-instances = 3
      }
      /router3 {
        router = round-robin-pool
        nr-of-instances = 0
      }
    }
    """

  class TestActor extends Actor {
    def receive = { case _ ⇒ }
  }

  class Echo extends Actor {
    def receive = {
      case _ ⇒ sender() ! self
    }
  }

}

@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with ImplicitSender {
  implicit val ec = system.dispatcher
  import RoutingSpec._

  muteDeadLetters(classOf[akka.dispatch.sysmsg.DeathWatchNotification])()

  "routers in general" must {

    "evict terminated routees" in {
      val router = system.actorOf(RoundRobinPool(2).props(routeeProps = Props[Echo]))
      router ! ""
      router ! ""
      val c1, c2 = expectMsgType[ActorRef]
      watch(router)
      watch(c2)
      system.stop(c2)
      expectTerminated(c2).existenceConfirmed should be(true)
      // it might take a while until the Router has actually processed the Terminated message
      awaitCond {
        router ! ""
        router ! ""
        val res = receiveWhile(100 millis, messages = 2) {
          case x: ActorRef ⇒ x
        }
        res == Seq(c1, c1)
      }
      system.stop(c1)
      expectTerminated(router).existenceConfirmed should be(true)
    }

    "not terminate when resizer is used" in {
      val latch = TestLatch(1)
      val resizer = new Resizer {
        def isTimeForResize(messageCounter: Long): Boolean = messageCounter == 0
        def resize(currentRoutees: immutable.IndexedSeq[Routee]): Int = {
          latch.countDown()
          2
        }
      }
      val router = system.actorOf(RoundRobinPool(nrOfInstances = 0, resizer = Some(resizer)).props(
        routeeProps = Props[TestActor]))
      watch(router)
      Await.ready(latch, remainingOrDefault)
      router ! GetRoutees
      val routees = expectMsgType[Routees].routees
      routees.size should be(2)
      routees foreach { _.send(PoisonPill, testActor) }
      // expect no Terminated
      expectNoMsg(2.seconds)
    }

    "use configured nr-of-instances when FromConfig" in {
      val router = system.actorOf(FromConfig.props(routeeProps = Props[TestActor]), "router1")
      router ! GetRoutees
      expectMsgType[Routees].routees.size should be(3)
      watch(router)
      system.stop(router)
      expectTerminated(router)
    }

    "use configured nr-of-instances when router is specified" in {
      val router = system.actorOf(RoundRobinPool(nrOfInstances = 2).props(routeeProps = Props[TestActor]), "router2")
      router ! GetRoutees
      expectMsgType[Routees].routees.size should be(3)
      system.stop(router)
    }

    "use specified resizer when resizer not configured" in {
      val latch = TestLatch(1)
      val resizer = new Resizer {
        def isTimeForResize(messageCounter: Long): Boolean = messageCounter == 0
        def resize(currentRoutees: immutable.IndexedSeq[Routee]): Int = {
          latch.countDown()
          3
        }
      }
      val router = system.actorOf(RoundRobinPool(nrOfInstances = 0, resizer = Some(resizer)).props(
        routeeProps = Props[TestActor]), "router3")
      Await.ready(latch, remainingOrDefault)
      router ! GetRoutees
      expectMsgType[Routees].routees.size should be(3)
      system.stop(router)
    }

    "set supplied supervisorStrategy" in {
      //#supervision
      val escalator = OneForOneStrategy() {
        //#custom-strategy
        case e ⇒ testActor ! e; SupervisorStrategy.Escalate
        //#custom-strategy
      }
      val router = system.actorOf(RoundRobinPool(1, supervisorStrategy = escalator).props(
        routeeProps = Props[TestActor]))
      //#supervision
      router ! GetRoutees
      EventFilter[ActorKilledException](occurrences = 1) intercept {
        expectMsgType[Routees].routees.head.send(Kill, testActor)
      }
      expectMsgType[ActorKilledException]

      val router2 = system.actorOf(RoundRobinPool(1).withSupervisorStrategy(escalator).props(
        routeeProps = Props[TestActor]))
      router2 ! GetRoutees
      EventFilter[ActorKilledException](occurrences = 1) intercept {
        expectMsgType[Routees].routees.head.send(Kill, testActor)
      }
      expectMsgType[ActorKilledException]
    }

    "set supplied supervisorStrategy for FromConfig" in {
      val escalator = OneForOneStrategy() {
        case e ⇒ testActor ! e; SupervisorStrategy.Escalate
      }
      val router = system.actorOf(FromConfig.withSupervisorStrategy(escalator).props(
        routeeProps = Props[TestActor]), "router1")
      router ! GetRoutees
      EventFilter[ActorKilledException](occurrences = 1) intercept {
        expectMsgType[Routees].routees.head.send(Kill, testActor)
      }
      expectMsgType[ActorKilledException]
    }

    "default to all-for-one-always-escalate strategy" in {
      val restarter = OneForOneStrategy() {
        case e ⇒ testActor ! e; SupervisorStrategy.Restart
      }
      val supervisor = system.actorOf(Props(new Supervisor(restarter)))
      supervisor ! RoundRobinPool(3).props(routeeProps = Props(new Actor {
        def receive = {
          case x: String ⇒ throw new Exception(x)
        }
        override def postRestart(reason: Throwable): Unit = testActor ! "restarted"
      }))
      val router = expectMsgType[ActorRef]
      EventFilter[Exception]("die", occurrences = 1) intercept {
        router ! "die"
      }
      expectMsgType[Exception].getMessage should be("die")
      expectMsg("restarted")
      expectMsg("restarted")
      expectMsg("restarted")
    }

    "start in-line for context.actorOf()" in {
      system.actorOf(Props(new Actor {
        def receive = {
          case "start" ⇒
            context.actorOf(RoundRobinPool(2).props(routeeProps = Props(new Actor {
              def receive = { case x ⇒ sender() ! x }
            }))) ? "hello" pipeTo sender()
        }
      })) ! "start"
      expectMsg("hello")
    }

  }

  "no router" must {

    "send message to connection" in {
      class Actor1 extends Actor {
        def receive = {
          case msg ⇒ testActor forward msg
        }
      }

      val routedActor = system.actorOf(NoRouter.props(routeeProps = Props(new Actor1)))
      routedActor ! "hello"
      routedActor ! "end"

      expectMsg("hello")
      expectMsg("end")
    }
  }

  "router FromConfig" must {
    "throw suitable exception when not configured" in {
      val e = intercept[ConfigurationException] {
        system.actorOf(FromConfig.props(routeeProps = Props[TestActor]), "routerNotDefined")
      }
      e.getMessage should include("routerNotDefined")
    }

    "allow external configuration" in {
      val sys = ActorSystem("FromConfig", ConfigFactory
        .parseString("akka.actor.deployment./routed.router=round-robin-pool")
        .withFallback(system.settings.config))
      try {
        sys.actorOf(FromConfig.props(routeeProps = Props[TestActor]), "routed")
      } finally {
        shutdown(sys)
      }
    }

  }

}

Other Akka source code examples

Here is a short list of links related to this Akka RoutingSpec.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.