alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (BalancingSpec.scala)

This example Akka source code file (BalancingSpec.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actor, actorref, akka, akkaspec, balancingspec, beforeandaftereach, concurrent, implicitsender, props, test, testing, testlatch, time, unit, worker

The BalancingSpec.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.routing

import language.postfixOps
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.{ Props, Actor }
import akka.testkit.{ TestLatch, ImplicitSender, AkkaSpec }
import akka.actor.ActorRef
import org.scalatest.BeforeAndAfterEach

object BalancingSpec {
  /** Shared id source for the workers; starts at 1 and is bumped each time a worker takes an id. */
  val counter = new AtomicInteger(1)

  /**
   * Routee used by the balancing-pool tests.
   *
   * Every incoming message is answered with this worker's id. The worker
   * holding id 1 only pauses briefly before replying; every other worker
   * waits on `latch` (for up to one minute) before it replies.
   *
   * @param latch latch that workers with an id other than 1 wait on before replying
   */
  class Worker(latch: TestLatch) extends Actor {
    // Lazy: the id is drawn from the shared counter on first access, not at construction.
    lazy val id = counter.getAndIncrement()

    def receive = {
      case _ ⇒
        id match {
          // short pause so that messages get dispatched to the other routees
          case 1 ⇒ Thread.sleep(10)
          // any other worker stays blocked until the latch is opened
          case _ ⇒ Await.ready(latch, 1.minute)
        }
        sender() ! id
    }
  }
}

/**
 * Spec for the balancing pool router.
 *
 * Each test case creates a pool of `Worker` routees sharing one latch, sends
 * it a burst of messages and checks which worker ids answer before and after
 * the latch is opened (see `test`).
 */
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class BalancingSpec extends AkkaSpec(
  """
    akka.actor.deployment {
      /balancingPool-2 {
        router = balancing-pool
        nr-of-instances = 5
        pool-dispatcher {
          attempt-teamwork = on
        }
      }
      /balancingPool-3 {
        router = balancing-pool
        nr-of-instances = 5
        pool-dispatcher {
          attempt-teamwork = on
        }
      }
    }
    """) with ImplicitSender with BeforeAndAfterEach {
  import BalancingSpec._

  // must be less than fork-join parallelism-min, which is 8 in AkkaSpec
  val poolSize = 5

  /** Resets the shared worker-id counter so every test case hands out ids starting at 1. */
  override def beforeEach(): Unit = counter.set(1)

  /**
   * Sends 100 messages to `pool` and verifies how the replies are distributed.
   *
   * While the latch is closed, only worker 1 is expected to reply; once the
   * latch is opened, exactly one reply from each of the remaining workers
   * (ids 2 to `poolSize`) is expected.
   *
   * @param pool  the router under test
   * @param latch the latch the workers were created with
   */
  def test(pool: ActorRef, latch: TestLatch): Unit = {
    val iterationCount = 100
    // each of the other (poolSize - 1) workers holds on to exactly one message
    val unblockedReplyCount = iterationCount - poolSize + 1

    (1 to iterationCount).foreach { i ⇒
      pool ! ("hit-" + i)
    }

    // all but one worker are blocked
    val earlyReplies = receiveN(unblockedReplyCount)
    expectNoMsg(1.second)
    // every reply received so far comes from the unblocked worker
    earlyReplies.toSet should be(Set(1))

    latch.countDown()
    val lateReplies = receiveN(poolSize - 1)
    // the remaining replies come from the previously blocked workers
    lateReplies.toSet should be((2 to poolSize).toSet)
    expectNoMsg(500.millis)
  }

  "balancing pool" must {

    "deliver messages in a balancing fashion when defined programatically" in {
      val latch = TestLatch(1)
      val workerProps = Props(classOf[Worker], latch)
      val pool = system.actorOf(
        BalancingPool(poolSize).props(routeeProps = workerProps),
        name = "balancingPool-1")
      test(pool, latch)
    }

    "deliver messages in a balancing fashion when defined in config" in {
      val latch = TestLatch(1)
      val workerProps = Props(classOf[Worker], latch)
      val pool = system.actorOf(
        FromConfig().props(routeeProps = workerProps),
        name = "balancingPool-2")
      test(pool, latch)
    }

    "deliver messages in a balancing fashion when overridden in config" in {
      val latch = TestLatch(1)
      val workerProps = Props(classOf[Worker], latch)
      // created with a single instance in code; the deployment section for
      // /balancingPool-3 declares nr-of-instances = 5
      val pool = system.actorOf(
        BalancingPool(1).props(routeeProps = workerProps),
        name = "balancingPool-3")
      test(pool, latch)
    }

  }
}

Other Akka source code examples

Here is a short list of links related to this Akka BalancingSpec.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.