alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (MultiNodeSpec.scala)

This example Akka source code file (MultiNodeSpec.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actor, actorsystem, akka, boolean, concurrent, config, illegalstateexception, int, map, rolename, string, test, testing, time, unit, vector

The MultiNodeSpec.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.remote.testkit

import language.implicitConversions
import language.postfixOps
import java.net.{ InetAddress, InetSocketAddress }
import java.util.concurrent.TimeoutException
import com.typesafe.config.{ ConfigObject, ConfigFactory, Config }
import scala.concurrent.{ Await, Awaitable }
import scala.util.control.NonFatal
import scala.collection.immutable
import akka.actor._
import akka.util.Timeout
import akka.remote.testconductor.{ TestConductorExt, TestConductor, RoleName }
import akka.remote.RemoteActorRefProvider
import akka.testkit._
import akka.testkit.TestEvent._
import scala.concurrent.duration._
import akka.remote.testconductor.RoleName
import akka.actor.RootActorPath
import akka.event.{ Logging, LoggingAdapter }

/**
 * Configure the role names and participants of the test, including configuration settings.
 */
abstract class MultiNodeConfig {

  private var _commonConf: Option[Config] = None
  private var _nodeConf = Map[RoleName, Config]()
  private var _roles = Vector[RoleName]()
  private var _deployments = Map[RoleName, immutable.Seq[String]]()
  private var _allDeploy = Vector[String]()
  private var _testTransport = false

  /**
   * Register a common base config for all test participants, if so desired.
   */
  def commonConfig(config: Config): Unit = _commonConf = Some(config)

  /**
   * Register a config override for a specific participant.
   */
  def nodeConfig(roles: RoleName*)(configs: Config*): Unit = {
    val c = configs.reduceLeft(_ withFallback _)
    _nodeConf ++= roles map { _ -> c }
  }

  /**
   * Include for verbose debug logging
   * @param on when `true` debug Config is returned, otherwise config with info logging
   */
  def debugConfig(on: Boolean): Config =
    if (on)
      ConfigFactory.parseString("""
        akka.loglevel = DEBUG
        akka.remote {
          log-received-messages = on
          log-sent-messages = on
        }
        akka.actor.debug {
          receive = on
          fsm = on
        }
        akka.remote.log-remote-lifecycle-events = on
        """)
    else
      ConfigFactory.empty

  /**
   * Construct a RoleName and return it, to be used as an identifier in the
   * test. Registration of a role name creates a role which then needs to be
   * filled.
   */
  def role(name: String): RoleName = {
    if (_roles exists (_.name == name)) throw new IllegalArgumentException("non-unique role name " + name)
    val r = RoleName(name)
    _roles :+= r
    r
  }

  def deployOn(role: RoleName, deployment: String): Unit =
    _deployments += role -> ((_deployments get role getOrElse Vector()) :+ deployment)

  def deployOnAll(deployment: String): Unit = _allDeploy :+= deployment

  /**
   * To be able to use `blackhole`, `passThrough`, and `throttle` you must
   * activate the failure injector and throttler transport adapters by
   * specifying `testTransport(on = true)` in your MultiNodeConfig.
   */
  def testTransport(on: Boolean): Unit = _testTransport = on

  private[testkit] lazy val myself: RoleName = {
    require(_roles.size > MultiNodeSpec.selfIndex, "not enough roles declared for this test")
    _roles(MultiNodeSpec.selfIndex)
  }

  private[testkit] def config: Config = {
    val transportConfig =
      if (_testTransport) ConfigFactory.parseString(
        """
           akka.remote.netty.tcp.applied-adapters = [trttl, gremlin]
        """)
      else ConfigFactory.empty

    val configs = (_nodeConf get myself).toList ::: _commonConf.toList ::: transportConfig :: MultiNodeSpec.nodeConfig :: MultiNodeSpec.baseConfig :: Nil
    configs reduceLeft (_ withFallback _)
  }

  private[testkit] def deployments(node: RoleName): immutable.Seq[String] = (_deployments get node getOrElse Nil) ++ _allDeploy

  private[testkit] def roles: immutable.Seq[RoleName] = _roles

}

object MultiNodeSpec {

  /**
   * Number of nodes node taking part in this test.
   *
   * {{{
   * -Dmultinode.max-nodes=4
   * }}}
   */
  val maxNodes: Int = Option(Integer.getInteger("multinode.max-nodes")) getOrElse
    (throw new IllegalStateException("need system property multinode.max-nodes to be set"))

  require(maxNodes > 0, "multinode.max-nodes must be greater than 0")

  /**
   * Name (or IP address; must be resolvable using InetAddress.getByName)
   * of the host this node is running on.
   *
   * {{{
   * -Dmultinode.host=host.example.com
   * }}}
   *
   * InetAddress.getLocalHost.getHostAddress is used if empty or "localhost"
   * is defined as system property "multinode.host".
   */
  val selfName: String = Option(System.getProperty("multinode.host")) match {
    case None       ⇒ throw new IllegalStateException("need system property multinode.host to be set")
    case Some("")   ⇒ InetAddress.getLocalHost.getHostAddress
    case Some(host) ⇒ host
  }

  require(selfName != "", "multinode.host must not be empty")

  /**
   * Port number of this node. Defaults to 0 which means a random port.
   *
   * {{{
   * -Dmultinode.port=0
   * }}}
   */
  val selfPort: Int = Integer.getInteger("multinode.port", 0)

  require(selfPort >= 0 && selfPort < 65535, "multinode.port is out of bounds: " + selfPort)

  /**
   * Name (or IP address; must be resolvable using InetAddress.getByName)
   * of the host that the server node is running on.
   *
   * {{{
   * -Dmultinode.server-host=server.example.com
   * }}}
   */
  val serverName: String = Option(System.getProperty("multinode.server-host")) getOrElse
    (throw new IllegalStateException("need system property multinode.server-host to be set"))

  require(serverName != "", "multinode.server-host must not be empty")

  /**
   * Port number of the node that's running the server system. Defaults to 4711.
   *
   * {{{
   * -Dmultinode.server-port=4711
   * }}}
   */
  val serverPort: Int = Integer.getInteger("multinode.server-port", 4711)

  require(serverPort > 0 && serverPort < 65535, "multinode.server-port is out of bounds: " + serverPort)

  /**
   * Index of this node in the roles sequence. The TestConductor
   * is started in “controller” mode on selfIndex 0, i.e. there you can inject
   * failures and shutdown other nodes etc.
   *
   * {{{
   * -Dmultinode.index=0
   * }}}
   */
  val selfIndex = Option(Integer.getInteger("multinode.index")) getOrElse
    (throw new IllegalStateException("need system property multinode.index to be set"))

  require(selfIndex >= 0 && selfIndex < maxNodes, "multinode.index is out of bounds: " + selfIndex)

  private[testkit] val nodeConfig = mapToConfig(Map(
    "akka.actor.provider" -> "akka.remote.RemoteActorRefProvider",
    "akka.remote.netty.tcp.hostname" -> selfName,
    "akka.remote.netty.tcp.port" -> selfPort))

  private[testkit] val baseConfig: Config = ConfigFactory.parseString("""
      akka {
        loggers = ["akka.testkit.TestEventListener"]
        loglevel = "WARNING"
        stdout-loglevel = "WARNING"
        actor {
          default-dispatcher {
            executor = "fork-join-executor"
            fork-join-executor {
              parallelism-min = 8
              parallelism-factor = 2.0
              parallelism-max = 8
            }
          }
        }
      }
      """)

  private def mapToConfig(map: Map[String, Any]): Config = {
    import scala.collection.JavaConverters._
    ConfigFactory.parseMap(map.asJava)
  }

  private def getCallerName(clazz: Class[_]): String = {
    val s = Thread.currentThread.getStackTrace map (_.getClassName) drop 1 dropWhile (_ matches ".*MultiNodeSpec.?$")
    val reduced = s.lastIndexWhere(_ == clazz.getName) match {
      case -1 ⇒ s
      case z  ⇒ s drop (z + 1)
    }
    reduced.head.replaceFirst(""".*\.""", "").replaceAll("[^a-zA-Z_0-9]", "_")
  }
}

/**
 * Note: To be able to run tests with everything ignored or excluded by tags
 * you must not use `testconductor`, or helper methods that use `testconductor`,
 * from the constructor of your test class. Otherwise the controller node might
 * be shutdown before other nodes have completed and you will see errors like:
 * `AskTimeoutException: sending to terminated ref breaks promises`. Using lazy
 * val is fine.
 */
abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles: immutable.Seq[RoleName], deployments: RoleName ⇒ Seq[String])
  extends TestKit(_system) with MultiNodeSpecCallbacks {

  import MultiNodeSpec._

  def this(config: MultiNodeConfig) =
    this(config.myself, ActorSystem(MultiNodeSpec.getCallerName(classOf[MultiNodeSpec]), ConfigFactory.load(config.config)),
      config.roles, config.deployments)

  val log: LoggingAdapter = Logging(system, this.getClass)

  /**
   * Enrich `.await()` onto all Awaitables, using remaining duration from the innermost
   * enclosing `within` block or QueryTimeout.
   */
  implicit def awaitHelper[T](w: Awaitable[T]) = new AwaitHelper(w)
  class AwaitHelper[T](w: Awaitable[T]) {
    def await: T = Await.result(w, remainingOr(testConductor.Settings.QueryTimeout.duration))
  }

  final override def multiNodeSpecBeforeAll {
    atStartup()
  }

  final override def multiNodeSpecAfterAll {
    // wait for all nodes to remove themselves before we shut the conductor down
    if (selfIndex == 0) {
      testConductor.removeNode(myself)
      within(testConductor.Settings.BarrierTimeout.duration) {
        awaitCond {
          // Await.result(testConductor.getNodes, remaining).filterNot(_ == myself).isEmpty
          testConductor.getNodes.await.filterNot(_ == myself).isEmpty
        }
      }
    }
    shutdown(system)
    afterTermination()
  }

  def shutdownTimeout: FiniteDuration = 5.seconds.dilated

  /**
   * Override this and return `true` to assert that the
   * shutdown of the `ActorSystem` was done properly.
   */
  def verifySystemShutdown: Boolean = false

  /*
  * Test Class Interface
  */

  /**
   * Override this method to do something when the whole test is starting up.
   */
  protected def atStartup(): Unit = ()

  /**
   * Override this method to do something when the whole test is terminating.
   */
  protected def afterTermination(): Unit = ()

  /**
   * All registered roles
   */
  def roles: immutable.Seq[RoleName] = _roles

  /**
   * TO BE DEFINED BY USER: Defines the number of participants required for starting the test. This
   * might not be equals to the number of nodes available to the test.
   *
   * Must be a `def`:
   * {{{
   * def initialParticipants = 5
   * }}}
   */
  def initialParticipants: Int
  require(initialParticipants > 0, "initialParticipants must be a 'def' or early initializer, and it must be greater zero")
  require(initialParticipants <= maxNodes, "not enough nodes to run this test")

  /**
   * Access to the barriers, failure injection, etc. The extension will have
   * been started either in Conductor or Player mode when the constructor of
   * MultiNodeSpec finishes, i.e. do not call the start*() methods yourself!
   */
  var testConductor: TestConductorExt = null

  /**
   * Execute the given block of code only on the given nodes (names according
   * to the `roleMap`).
   */
  def runOn(nodes: RoleName*)(thunk: ⇒ Unit): Unit = {
    if (isNode(nodes: _*)) {
      thunk
    }
  }

  /**
   * Verify that the running node matches one of the given nodes
   */
  def isNode(nodes: RoleName*): Boolean = nodes contains myself

  /**
   * Enter the named barriers in the order given. Use the remaining duration from
   * the innermost enclosing `within` block or the default `BarrierTimeout`
   */
  def enterBarrier(name: String*): Unit =
    testConductor.enter(
      Timeout.durationToTimeout(remainingOr(testConductor.Settings.BarrierTimeout.duration)),
      name.to[immutable.Seq])

  /**
   * Query the controller for the transport address of the given node (by role name) and
   * return that as an ActorPath for easy composition:
   *
   * {{{
   * val serviceA = system.actorSelection(node("master") / "user" / "serviceA")
   * }}}
   */
  def node(role: RoleName): ActorPath = RootActorPath(testConductor.getAddressFor(role).await)

  def muteDeadLetters(messageClasses: Class[_]*)(sys: ActorSystem = system): Unit =
    if (!sys.log.isDebugEnabled) {
      def mute(clazz: Class[_]): Unit =
        sys.eventStream.publish(Mute(DeadLettersFilter(clazz)(occurrences = Int.MaxValue)))
      if (messageClasses.isEmpty) mute(classOf[AnyRef])
      else messageClasses foreach mute
    }

  /*
   * Implementation (i.e. wait for start etc.)
   */

  private val controllerAddr = new InetSocketAddress(serverName, serverPort)

  protected def attachConductor(tc: TestConductorExt): Unit = {
    val timeout = tc.Settings.BarrierTimeout.duration
    val startFuture =
      if (selfIndex == 0) tc.startController(initialParticipants, myself, controllerAddr)
      else tc.startClient(myself, controllerAddr)
    try Await.result(startFuture, timeout)
    catch {
      case NonFatal(x) ⇒ throw new RuntimeException("failure while attaching new conductor", x)
    }
    testConductor = tc
  }

  attachConductor(TestConductor(system))

  // now add deployments, if so desired

  private final case class Replacement(tag: String, role: RoleName) {
    lazy val addr = node(role).address.toString
  }

  private val replacements = roles map (r ⇒ Replacement("@" + r.name + "@", r))

  protected def injectDeployments(sys: ActorSystem, role: RoleName): Unit = {
    val deployer = sys.asInstanceOf[ExtendedActorSystem].provider.deployer
    deployments(role) foreach { str ⇒
      val deployString = (str /: replacements) {
        case (base, r @ Replacement(tag, _)) ⇒
          base.indexOf(tag) match {
            case -1 ⇒ base
            case start ⇒
              val replaceWith = try
                r.addr
              catch {
                case NonFatal(e) ⇒
                  // might happen if all test cases are ignored (excluded) and
                  // controller node is finished/exited before r.addr is run
                  // on the other nodes
                  val unresolved = "akka://unresolved-replacement-" + r.role.name
                  log.warning(unresolved + " due to: " + e.getMessage)
                  unresolved
              }
              base.replace(tag, replaceWith)
          }
      }
      import scala.collection.JavaConverters._
      ConfigFactory.parseString(deployString).root.asScala foreach {
        case (key, value: ConfigObject) ⇒ deployer.parseConfig(key, value.toConfig) foreach deployer.deploy
        case (key, x)                   ⇒ throw new IllegalArgumentException(s"key $key must map to deployment section, not simple value $x")
      }
    }
  }

  injectDeployments(system, myself)

  protected val myAddress = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress

  // useful to see which jvm is running which role, used by LogRoleReplace utility
  log.info("Role [{}] started with address [{}]", myself.name, myAddress)

  /**
   * This method starts a new ActorSystem with the same configuration as the
   * previous one on the current node, including deployments. It also creates
   * a new TestConductor client and registers itself with the conductor so
   * that it is possible to use barriers etc. normally after this method has
   * been called.
   *
   * NOTICE: you MUST start a new system before trying to enter a barrier or
   * otherwise using the TestConductor after having terminated this node’s
   * system.
   */
  protected def startNewSystem(): ActorSystem = {
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp{port=${myAddress.port.get}\nhostname=${myAddress.host.get}}")
      .withFallback(system.settings.config)
    val sys = ActorSystem(system.name, config)
    injectDeployments(sys, myself)
    attachConductor(TestConductor(sys))
    sys
  }
}

/**
 * Use this to hook MultiNodeSpec into your test framework lifecycle, either by having your test extend MultiNodeSpec
 * and call these methods or by creating a trait that calls them and then mixing that trait with your test together
 * with MultiNodeSpec.
 *
 * Example trait for MultiNodeSpec with ScalaTest
 *
 * {{{
 * trait STMultiNodeSpec extends MultiNodeSpecCallbacks with WordSpecLike with MustMatchers with BeforeAndAfterAll {
 *   override def beforeAll() = multiNodeSpecBeforeAll()
 *   override def afterAll() = multiNodeSpecAfterAll()
 * }
 * }}}
 */
trait MultiNodeSpecCallbacks {
  /**
   * Call this before the start of the test run. NOT before every test case.
   */
  def multiNodeSpecBeforeAll(): Unit

  /**
   * Call this after the all test cases have run. NOT after every test case.
   */
  def multiNodeSpecAfterAll(): Unit
}

Other Akka source code examples

Here is a short list of links related to this Akka MultiNodeSpec.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.