alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (RouterDocSpec.scala)

This example Akka source code file (RouterDocSpec.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actor, actorref, address, addressfromuristring, akka, broadcast, concurrent, kill, poisonpill, randompool, route, router, routerdocspec, routing, testing, work

The RouterDocSpec.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package docs.routing

import scala.concurrent.duration._
import akka.testkit._
import akka.actor.{ ActorRef, Props, Actor }
import akka.actor.Terminated
import akka.routing.FromConfig
import akka.routing.RoundRobinPool
import akka.routing.RandomPool
import akka.routing.RoundRobinGroup
import akka.routing.SmallestMailboxPool
import akka.routing.BroadcastPool
import akka.routing.BroadcastGroup
import akka.routing.ConsistentHashingGroup
import akka.routing.ConsistentHashingPool
import akka.routing.DefaultResizer
import akka.routing.ScatterGatherFirstCompletedGroup
import akka.routing.RandomGroup
import akka.routing.ScatterGatherFirstCompletedPool
import akka.routing.BalancingPool

object RouterDocSpec {

  val config = """
#//#config-round-robin-pool
akka.actor.deployment {
  /parent/router1 {
    router = round-robin-pool
    nr-of-instances = 5
  }
}
#//#config-round-robin-pool

#//#config-round-robin-group
akka.actor.deployment {
  /parent/router3 {
    router = round-robin-group
    routees.paths = ["/user/workers/w1", "/user/workers/w2", "/user/workers/w3"]
  }
}
#//#config-round-robin-group

#//#config-random-pool
akka.actor.deployment {
  /parent/router5 {
    router = random-pool
    nr-of-instances = 5
  }
}
#//#config-random-pool
    
#//#config-random-group
akka.actor.deployment {
  /parent/router7 {
    router = random-group
    routees.paths = ["/user/workers/w1", "/user/workers/w2", "/user/workers/w3"]
  }
}
#//#config-random-group

#//#config-balancing-pool
akka.actor.deployment {
  /parent/router9 {
    router = balancing-pool
    nr-of-instances = 5
  }
}
#//#config-balancing-pool
    
#//#config-balancing-pool2
akka.actor.deployment {
  /parent/router9b {
    router = balancing-pool
    nr-of-instances = 5
    pool-dispatcher {
      attempt-teamwork = off
    }
  }
}
#//#config-balancing-pool2
    
#//#config-smallest-mailbox-pool
akka.actor.deployment {
  /parent/router11 {
    router = smallest-mailbox-pool
    nr-of-instances = 5
  }
}
#//#config-smallest-mailbox-pool
    
#//#config-broadcast-pool
akka.actor.deployment {
  /parent/router13 {
    router = broadcast-pool
    nr-of-instances = 5
  }
}
#//#config-broadcast-pool
    
#//#config-broadcast-group
akka.actor.deployment {
  /parent/router15 {
    router = broadcast-group
    routees.paths = ["/user/workers/w1", "/user/workers/w2", "/user/workers/w3"]
  }
}
#//#config-broadcast-group

#//#config-scatter-gather-pool
akka.actor.deployment {
  /parent/router17 {
    router = scatter-gather-pool
    nr-of-instances = 5
    within = 10 seconds
  }
}
#//#config-scatter-gather-pool
    
#//#config-scatter-gather-group
akka.actor.deployment {
  /parent/router19 {
    router = scatter-gather-group
    routees.paths = ["/user/workers/w1", "/user/workers/w2", "/user/workers/w3"]
    within = 10 seconds
  }
}
#//#config-scatter-gather-group
    
#//#config-consistent-hashing-pool
akka.actor.deployment {
  /parent/router21 {
    router = consistent-hashing-pool
    nr-of-instances = 5
    virtual-nodes-factor = 10
  }
}
#//#config-consistent-hashing-pool
    
#//#config-consistent-hashing-group
akka.actor.deployment {
  /parent/router23 {
    router = consistent-hashing-group
    routees.paths = ["/user/workers/w1", "/user/workers/w2", "/user/workers/w3"]
    virtual-nodes-factor = 10
  }
}
#//#config-consistent-hashing-group

#//#config-remote-round-robin-pool
akka.actor.deployment {
  /parent/remotePool {
    router = round-robin-pool
    nr-of-instances = 10
    target.nodes = ["akka.tcp://app@10.0.0.2:2552", "akka://app@10.0.0.3:2552"]
  }
}
#//#config-remote-round-robin-pool
    
#//#config-remote-round-robin-group
akka.actor.deployment {
  /parent/remoteGroup {
    router = round-robin-group
    routees.paths = [
      "akka.tcp://app@10.0.0.1:2552/user/workers/w1", 
      "akka.tcp://app@10.0.0.2:2552/user/workers/w1",
      "akka.tcp://app@10.0.0.3:2552/user/workers/w1"]
  }
}
#//#config-remote-round-robin-group
    
#//#config-resize-pool
akka.actor.deployment {
  /parent/router25 {
    router = round-robin-pool
    resizer {
      lower-bound = 2
      upper-bound = 15
      messages-per-resize = 100
    }
  }
}
#//#config-resize-pool

#//#config-pool-dispatcher
akka.actor.deployment {
  /poolWithDispatcher {
    router = random-pool
    nr-of-instances = 5
    pool-dispatcher {
      fork-join-executor.parallelism-min = 5
      fork-join-executor.parallelism-max = 5
    }
  }
}
#//#config-pool-dispatcher
    
router-dispatcher {}
"""

  final case class Work(payload: String)

  //#router-in-actor
  import akka.routing.ActorRefRoutee
  import akka.routing.Router
  import akka.routing.RoundRobinRoutingLogic

  class Master extends Actor {
    var router = {
      val routees = Vector.fill(5) {
        val r = context.actorOf(Props[Worker])
        context watch r
        ActorRefRoutee(r)
      }
      Router(RoundRobinRoutingLogic(), routees)
    }

    def receive = {
      case w: Work =>
        router.route(w, sender())
      case Terminated(a) =>
        router = router.removeRoutee(a)
        val r = context.actorOf(Props[Worker])
        context watch r
        router = router.addRoutee(r)
    }
  }
  //#router-in-actor

  class Worker extends Actor {
    def receive = {
      case _ =>
    }
  }

  //#create-worker-actors
  class Workers extends Actor {
    context.actorOf(Props[Worker], name = "w1")
    context.actorOf(Props[Worker], name = "w2")
    context.actorOf(Props[Worker], name = "w3")
    // ...
    //#create-worker-actors

    def receive = {
      case _ =>
    }
  }

  class Parent extends Actor {

    //#paths
    val paths = List("/user/workers/w1", "/user/workers/w2", "/user/workers/w3")
    //#paths

    //#round-robin-pool-1
    val router1: ActorRef =
      context.actorOf(FromConfig.props(Props[Worker]), "router1")
    //#round-robin-pool-1

    //#round-robin-pool-2
    val router2: ActorRef =
      context.actorOf(RoundRobinPool(5).props(Props[Worker]), "router2")
    //#round-robin-pool-2

    //#round-robin-group-1
    val router3: ActorRef =
      context.actorOf(FromConfig.props(), "router3")
    //#round-robin-group-1

    //#round-robin-group-2
    val router4: ActorRef =
      context.actorOf(RoundRobinGroup(paths).props(), "router4")
    //#round-robin-group-2  

    //#random-pool-1
    val router5: ActorRef =
      context.actorOf(FromConfig.props(Props[Worker]), "router5")
    //#random-pool-1

    //#random-pool-2
    val router6: ActorRef =
      context.actorOf(RandomPool(5).props(Props[Worker]), "router6")
    //#random-pool-2

    //#random-group-1
    val router7: ActorRef =
      context.actorOf(FromConfig.props(), "router7")
    //#random-group-1

    //#random-group-2
    val router8: ActorRef =
      context.actorOf(RandomGroup(paths).props(), "router8")
    //#random-group-2

    //#balancing-pool-1
    val router9: ActorRef =
      context.actorOf(FromConfig.props(Props[Worker]), "router9")
    //#balancing-pool-1

    //#balancing-pool-2
    val router10: ActorRef =
      context.actorOf(BalancingPool(5).props(Props[Worker]), "router10")
    //#balancing-pool-2  

    //#smallest-mailbox-pool-1
    val router11: ActorRef =
      context.actorOf(FromConfig.props(Props[Worker]), "router11")
    //#smallest-mailbox-pool-1

    //#smallest-mailbox-pool-2
    val router12: ActorRef =
      context.actorOf(SmallestMailboxPool(5).props(Props[Worker]), "router12")
    //#smallest-mailbox-pool-2

    //#broadcast-pool-1
    val router13: ActorRef =
      context.actorOf(FromConfig.props(Props[Worker]), "router13")
    //#broadcast-pool-1

    //#broadcast-pool-2
    val router14: ActorRef =
      context.actorOf(BroadcastPool(5).props(Props[Worker]), "router14")
    //#broadcast-pool-2

    //#broadcast-group-1
    val router15: ActorRef =
      context.actorOf(FromConfig.props(), "router15")
    //#broadcast-group-1

    //#broadcast-group-2
    val router16: ActorRef =
      context.actorOf(BroadcastGroup(paths).props(), "router16")
    //#broadcast-group-2

    //#scatter-gather-pool-1
    val router17: ActorRef =
      context.actorOf(FromConfig.props(Props[Worker]), "router17")
    //#scatter-gather-pool-1

    //#scatter-gather-pool-2
    val router18: ActorRef =
      context.actorOf(ScatterGatherFirstCompletedPool(5, within = 10.seconds).
        props(Props[Worker]), "router18")
    //#scatter-gather-pool-2

    //#scatter-gather-group-1
    val router19: ActorRef =
      context.actorOf(FromConfig.props(), "router19")
    //#scatter-gather-group-1

    //#scatter-gather-group-2
    val router20: ActorRef =
      context.actorOf(ScatterGatherFirstCompletedGroup(paths,
        within = 10.seconds).props(), "router20")
    //#scatter-gather-group-2  

    //#consistent-hashing-pool-1
    val router21: ActorRef =
      context.actorOf(FromConfig.props(Props[Worker]), "router21")
    //#consistent-hashing-pool-1

    //#consistent-hashing-pool-2
    val router22: ActorRef =
      context.actorOf(ConsistentHashingPool(5).props(Props[Worker]),
        "router22")
    //#consistent-hashing-pool-2

    //#consistent-hashing-group-1
    val router23: ActorRef =
      context.actorOf(FromConfig.props(), "router23")
    //#consistent-hashing-group-1

    //#consistent-hashing-group-2
    val router24: ActorRef =
      context.actorOf(ConsistentHashingGroup(paths).props(), "router24")
    //#consistent-hashing-group-2  

    //#resize-pool-1
    val router25: ActorRef =
      context.actorOf(FromConfig.props(Props[Worker]), "router25")
    //#resize-pool-1

    //#resize-pool-2
    val resizer = DefaultResizer(lowerBound = 2, upperBound = 15)
    val router26: ActorRef =
      context.actorOf(RoundRobinPool(5, Some(resizer)).props(Props[Worker]),
        "router26")
    //#resize-pool-2  

    def receive = {
      case _ =>
    }

  }

  class Echo extends Actor {
    def receive = {
      case m => sender() ! m
    }
  }
}

class RouterDocSpec extends AkkaSpec(RouterDocSpec.config) with ImplicitSender {

  import RouterDocSpec._

  //#create-workers
  system.actorOf(Props[Workers], "workers")
  //#create-workers

  //#create-parent
  system.actorOf(Props[Parent], "parent")
  //#create-parent

  "demonstrate dispatcher" in {
    //#dispatchers
    val router: ActorRef = system.actorOf(
      // “head” router actor will run on "router-dispatcher" dispatcher
      // Worker routees will run on "pool-dispatcher" dispatcher  
      RandomPool(5, routerDispatcher = "router-dispatcher").props(Props[Worker]),
      name = "poolWithDispatcher")
    //#dispatchers
  }

  "demonstrate broadcast" in {
    val router = system.actorOf(RoundRobinPool(nrOfInstances = 5).props(Props[Echo]))
    //#broadcastDavyJonesWarning
    import akka.routing.Broadcast
    router ! Broadcast("Watch out for Davy Jones' locker")
    //#broadcastDavyJonesWarning
    receiveN(5, 5.seconds.dilated) should have length (5)
  }

  "demonstrate PoisonPill" in {
    val router = watch(system.actorOf(RoundRobinPool(nrOfInstances = 5).props(Props[Echo])))
    //#poisonPill
    import akka.actor.PoisonPill
    router ! PoisonPill
    //#poisonPill
    expectTerminated(router)
  }

  "demonstrate broadcast of PoisonPill" in {
    val router = watch(system.actorOf(RoundRobinPool(nrOfInstances = 5).props(Props[Echo])))
    //#broadcastPoisonPill
    import akka.actor.PoisonPill
    import akka.routing.Broadcast
    router ! Broadcast(PoisonPill)
    //#broadcastPoisonPill
    expectTerminated(router)
  }

  "demonstrate Kill" in {
    val router = watch(system.actorOf(RoundRobinPool(nrOfInstances = 5).props(Props[Echo])))
    //#kill
    import akka.actor.Kill
    router ! Kill
    //#kill
    expectTerminated(router)
  }

  "demonstrate broadcast of Kill" in {
    val router = watch(system.actorOf(RoundRobinPool(nrOfInstances = 5).props(Props[Echo])))
    //#broadcastKill
    import akka.actor.Kill
    import akka.routing.Broadcast
    router ! Broadcast(Kill)
    //#broadcastKill
    expectTerminated(router)
  }

  "demonstrate remote deploy" in {
    //#remoteRoutees
    import akka.actor.{ Address, AddressFromURIString }
    import akka.remote.routing.RemoteRouterConfig
    val addresses = Seq(
      Address("akka.tcp", "remotesys", "otherhost", 1234),
      AddressFromURIString("akka.tcp://othersys@anotherhost:1234"))
    val routerRemote = system.actorOf(
      RemoteRouterConfig(RoundRobinPool(5), addresses).props(Props[Echo]))
    //#remoteRoutees
  }
}

Other Akka source code examples

Here is a short list of links related to this Akka RouterDocSpec.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.