// Akka/Scala example source code file (StressSpec.scala)
// The StressSpec.scala Akka example source code
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import language.postfixOps
import scala.annotation.tailrec
import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom
import java.util.concurrent.atomic.AtomicReference
import org.scalatest.BeforeAndAfterEach
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Address
import akka.actor.Deploy
import akka.actor.OneForOneStrategy
import akka.actor.Props
import akka.actor.RootActorPath
import akka.actor.SupervisorStrategy._
import akka.actor.Terminated
import akka.cluster.ClusterEvent.ClusterMetricsChanged
import akka.cluster.ClusterEvent.CurrentClusterState
import akka.cluster.ClusterEvent.CurrentInternalStats
import akka.cluster.ClusterEvent.MemberEvent
import akka.cluster.StandardMetrics.Cpu
import akka.cluster.StandardMetrics.HeapMemory
import akka.remote.DefaultFailureDetectorRegistry
import akka.remote.PhiAccrualFailureDetector
import akka.remote.RemoteScope
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.routing.FromConfig
import akka.testkit._
import akka.testkit.TestEvent._
import akka.actor.Identify
import akka.actor.ActorIdentity
import akka.util.Helpers.ConfigOps
import akka.util.Helpers.Requiring
import java.lang.management.ManagementFactory
/**
* This test is intended to be used as a long-running stress test
* of cluster related features. Number of nodes and duration of
* the test steps can be configured. The test scenario is organized as
* follows:
* 1. join nodes in various ways up to the configured total number of nodes
* 2. while nodes are joining a few cluster aware routers are also working
* 3. exercise concurrent joining and shutdown of nodes repeatedly
* 4. exercise cluster aware routers, including high throughput
* 5. exercise many actors in a tree structure
* 6. exercise remote supervision
* 7. gossip without any changes to the membership
* 8. leave and shutdown nodes in various ways
* 9. while nodes are removed remote death watch is also exercised
* 10. while nodes are removed a few cluster aware routers are also working
*
* By default it uses 13 nodes.
* Example of sbt command line parameters to double that:
* `-DMultiJvm.akka.cluster.Stress.nrOfNodes=26 -Dmultinode.Dakka.test.cluster-stress-spec.nr-of-nodes-factor=2`
*/
private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
// Number of nodes taking part in the test. Read from the system property
// `MultiJvm.akka.cluster.Stress.nrOfNodes`; defaults to 13 when the property
// is not set and must be at least 10 when it is.
val totalNumberOfNodes =
  Option(System.getProperty("MultiJvm.akka.cluster.Stress.nrOfNodes"))
    .fold(13) { value ⇒ value.toInt requiring (_ >= 10, "nrOfNodes should be >= 10") }
// register one role per node, named "node-1" .. "node-N"
1 to totalNumberOfNodes foreach { n ⇒ role("node-" + n) }
// Configuration shared by all nodes. It holds the test's own tunables under
// `akka.test.cluster-stress-spec` (read by the Settings class), the cluster
// and dispatcher settings, and the deployment of the three cluster aware
// routers (`/master-node-1..3/workers`).
// Note that this test uses default configuration,
// not MultiNodeClusterSpec.clusterConfig
commonConfig(ConfigFactory.parseString("""
akka.test.cluster-stress-spec {
infolog = off
# scale the nr-of-nodes* settings with this factor
nr-of-nodes-factor = 1
# not scaled
nr-of-seed-nodes = 3
nr-of-nodes-joining-to-seed-initally = 2
nr-of-nodes-joining-one-by-one-small = 2
nr-of-nodes-joining-one-by-one-large = 2
nr-of-nodes-joining-to-one = 2
nr-of-nodes-leaving-one-by-one-small = 1
nr-of-nodes-leaving-one-by-one-large = 2
nr-of-nodes-leaving = 2
nr-of-nodes-shutdown-one-by-one-small = 1
nr-of-nodes-shutdown-one-by-one-large = 2
nr-of-nodes-shutdown = 2
nr-of-nodes-join-remove = 2
# not scaled
# scale the *-duration settings with this factor
duration-factor = 1
join-remove-duration = 90s
work-batch-size = 100
work-batch-interval = 2s
payload-size = 1000
normal-throughput-duration = 30s
high-throughput-duration = 10s
supervision-duration = 10s
supervision-one-iteration = 2.5s
idle-gossip-duration = 10s
expected-test-duration = 600s
# actors are created in a tree structure defined
# by tree-width (number of children for each actor) and
# tree-levels, total number of actors can be calculated by
# (width * math.pow(width, levels) - 1) / (width - 1)
tree-width = 4
tree-levels = 4
report-metrics-interval = 10s
# scale convergence within timeouts with this factor
convergence-within-factor = 1.0
# set to off to only test cluster membership
exercise-actors = on
}
akka.actor.serialize-messages = off
akka.actor.serialize-creators = off
akka.actor.provider = akka.cluster.ClusterActorRefProvider
akka.cluster {
auto-down-unreachable-after = 1s
publish-stats-interval = 1s
}
akka.loggers = ["akka.testkit.TestEventListener"]
akka.loglevel = INFO
akka.remote.log-remote-lifecycle-events = off
akka.actor.default-dispatcher.fork-join-executor {
parallelism-min = 8
parallelism-max = 8
}
akka.actor.deployment {
/master-node-1/workers {
router = round-robin-pool
nr-of-instances = 100
cluster {
enabled = on
max-nr-of-instances-per-node = 1
allow-local-routees = on
}
}
/master-node-2/workers {
router = round-robin-group
nr-of-instances = 100
routees.paths = ["/user/worker"]
cluster {
enabled = on
allow-local-routees = on
}
}
/master-node-3/workers = {
router = adaptive-pool
nr-of-instances = 100
cluster {
enabled = on
max-nr-of-instances-per-node = 1
allow-local-routees = on
}
}
}
"""))
/**
 * Typed view of the `akka.test.cluster-stress-spec` section of `conf`.
 *
 * Node counts marked as scaled are multiplied by `nr-of-nodes-factor`,
 * durations marked as scaled are multiplied by `duration-factor`.
 * Construction fails with an `IllegalArgumentException` when the configured
 * node counts do not fit into `totalNumberOfNodes`.
 */
class Settings(conf: Config) {
  private val testConfig = conf.getConfig("akka.test.cluster-stress-spec")
  import testConfig._

  // node count for `key`, scaled by the nodes factor
  private def scaledNodes(key: String): Int = getInt(key) * nFactor
  // duration for `key`, scaled by the duration factor
  private def scaledDuration(key: String) = testConfig.getMillisDuration(key) * dFactor

  val infolog = getBoolean("infolog")
  val nFactor = getInt("nr-of-nodes-factor")
  val numberOfSeedNodes = getInt("nr-of-seed-nodes") // not scaled by nodes factor
  val numberOfNodesJoiningToSeedNodesInitially = scaledNodes("nr-of-nodes-joining-to-seed-initally")
  val numberOfNodesJoiningOneByOneSmall = scaledNodes("nr-of-nodes-joining-one-by-one-small")
  val numberOfNodesJoiningOneByOneLarge = scaledNodes("nr-of-nodes-joining-one-by-one-large")
  val numberOfNodesJoiningToOneNode = scaledNodes("nr-of-nodes-joining-to-one")
  // whatever is left after the explicitly configured joins will join to the seed nodes
  val numberOfNodesJoiningToSeedNodes = {
    val explicitlyJoining = numberOfSeedNodes + numberOfNodesJoiningToSeedNodesInitially +
      numberOfNodesJoiningOneByOneSmall + numberOfNodesJoiningOneByOneLarge + numberOfNodesJoiningToOneNode
    (totalNumberOfNodes - explicitlyJoining) requiring (_ >= 0,
      s"too many configured nr-of-nodes-joining-*, total should be <= ${totalNumberOfNodes}")
  }
  val numberOfNodesLeavingOneByOneSmall = scaledNodes("nr-of-nodes-leaving-one-by-one-small")
  val numberOfNodesLeavingOneByOneLarge = scaledNodes("nr-of-nodes-leaving-one-by-one-large")
  val numberOfNodesLeaving = scaledNodes("nr-of-nodes-leaving")
  val numberOfNodesShutdownOneByOneSmall = scaledNodes("nr-of-nodes-shutdown-one-by-one-small")
  val numberOfNodesShutdownOneByOneLarge = scaledNodes("nr-of-nodes-shutdown-one-by-one-large")
  val numberOfNodesShutdown = scaledNodes("nr-of-nodes-shutdown")
  val numberOfNodesJoinRemove = getInt("nr-of-nodes-join-remove") // not scaled by nodes factor
  val workBatchSize = getInt("work-batch-size")
  val workBatchInterval = testConfig.getMillisDuration("work-batch-interval")
  val payloadSize = getInt("payload-size")
  val dFactor = getInt("duration-factor")
  val joinRemoveDuration = scaledDuration("join-remove-duration")
  val normalThroughputDuration = scaledDuration("normal-throughput-duration")
  val highThroughputDuration = scaledDuration("high-throughput-duration")
  val supervisionDuration = scaledDuration("supervision-duration")
  val supervisionOneIteration = scaledDuration("supervision-one-iteration")
  val idleGossipDuration = scaledDuration("idle-gossip-duration")
  val expectedTestDuration = scaledDuration("expected-test-duration")
  val treeWidth = getInt("tree-width")
  val treeLevels = getInt("tree-levels")
  val reportMetricsInterval = testConfig.getMillisDuration("report-metrics-interval")
  val convergenceWithinFactor = getDouble("convergence-within-factor")
  val exerciseActors = getBoolean("exercise-actors")

  // all joining groups together must fit into the available nodes
  require(
    Seq(numberOfSeedNodes, numberOfNodesJoiningToSeedNodesInitially, numberOfNodesJoiningOneByOneSmall,
      numberOfNodesJoiningOneByOneLarge, numberOfNodesJoiningToOneNode, numberOfNodesJoiningToSeedNodes).sum <= totalNumberOfNodes,
    s"specified number of joining nodes <= ${totalNumberOfNodes}")

  // don't shutdown the 3 nodes hosting the master actors
  require(
    Seq(numberOfNodesLeavingOneByOneSmall, numberOfNodesLeavingOneByOneLarge, numberOfNodesLeaving,
      numberOfNodesShutdownOneByOneSmall, numberOfNodesShutdownOneByOneLarge, numberOfNodesShutdown).sum <= totalNumberOfNodes - 3,
    s"specified number of leaving/shutdown nodes <= ${totalNumberOfNodes - 3}")

  require(numberOfNodesJoinRemove <= totalNumberOfNodes, s"nr-of-nodes-join-remove should be <= ${totalNumberOfNodes}")

  /** Renders the test configuration section, with `nrOfNodes` added as a fallback entry. */
  override def toString: String = {
    val nodesEntry = ConfigFactory.parseString(s"nrOfNodes=${totalNumberOfNodes}")
    testConfig.withFallback(nodesEntry).root.render
  }
}
/** Adds `form` to `Double`: the value rendered with two decimals (`%.2f`). */
implicit class FormattedDouble(val d: Double) extends AnyVal {
  def form: String = "%.2f".format(d)
}
/**
 * Result reported by one node for one test step: the reporting node's
 * address, how long the step took and the gossip statistics of the step.
 */
final case class ClusterResult(address: Address, duration: Duration, clusterStats: GossipStats)

/**
 * Results of all nodes for one test step, combined by the
 * ClusterResultAggregator.
 */
final case class AggregatedClusterResult(
  title: String,
  duration: Duration,
  clusterStats: GossipStats)
/**
 * Central aggregator of cluster statistics and metrics.
 *
 * Logs the collected metrics, phi values and gossip statistics on every
 * `ReportTick` (only when `infolog` is on). Once `expectedResults`
 * `ClusterResult` messages have been received it sends the
 * `AggregatedClusterResult` to the `reportTo` actor, if any, and stops itself.
 */
class ClusterResultAggregator(title: String, expectedResults: Int, settings: Settings) extends Actor with ActorLogging {
  import settings.reportMetricsInterval
  import settings.infolog

  val cluster = Cluster(context.system)
  var reportTo: Option[ActorRef] = None
  var results = Vector.empty[ClusterResult]
  var nodeMetrics = Set.empty[NodeMetrics]
  var phiValuesObservedByNode = {
    import akka.cluster.Member.addressOrdering
    immutable.SortedMap.empty[Address, immutable.SortedSet[PhiValue]]
  }
  var clusterStatsObservedByNode = {
    import akka.cluster.Member.addressOrdering
    immutable.SortedMap.empty[Address, CurrentInternalStats]
  }

  import context.dispatcher
  val reportMetricsTask = context.system.scheduler.schedule(
    reportMetricsInterval, reportMetricsInterval, self, ReportTick)

  // subscribe to ClusterMetricsChanged, re-subscribe when restart
  override def preStart(): Unit = cluster.subscribe(self, classOf[ClusterMetricsChanged])

  override def postStop(): Unit = {
    cluster.unsubscribe(self)
    reportMetricsTask.cancel()
    super.postStop()
  }

  def receive = {
    case ClusterMetricsChanged(clusterMetrics) ⇒
      nodeMetrics = clusterMetrics
    case PhiResult(from, phiValues) ⇒
      phiValuesObservedByNode = phiValuesObservedByNode + (from -> phiValues)
    case StatsResult(from, stats) ⇒
      clusterStatsObservedByNode = clusterStatsObservedByNode + (from -> stats)
    case ReportTick ⇒
      if (infolog)
        log.info(s"[${title}] in progress\n${formatMetrics}\n\n${formatPhi}\n\n${formatStats}")
    case r: ClusterResult ⇒
      results = results :+ r
      if (results.size == expectedResults) complete()
    case _: CurrentClusterState ⇒
    case ReportTo(ref) ⇒
      reportTo = ref
  }

  // all expected results are in: log and publish the aggregate, then stop
  private def complete(): Unit = {
    val aggregated = AggregatedClusterResult(title, maxDuration, totalGossipStats)
    if (infolog)
      log.info(s"[${title}] completed in [${aggregated.duration.toMillis}] ms\n${aggregated.clusterStats}\n${formatMetrics}\n\n${formatPhi}\n\n${formatStats}")
    reportTo foreach { _ ! aggregated }
    context stop self
  }

  /** Longest duration among the collected results. */
  def maxDuration = results.map(_.duration).max

  /** Sum of the gossip statistics of all collected results. */
  def totalGossipStats = results.foldLeft(GossipStats()) { (acc, result) ⇒ acc :+ result.clusterStats }

  /** Metrics table: a header line followed by one line per node, ordered by address. */
  def formatMetrics: String = {
    import akka.cluster.Member.addressOrdering
    val sortedByAddress = nodeMetrics.toSeq.sortBy(_.address)
    val lines = formatMetricsHeader +: sortedByAddress.map(formatMetricsLine)
    lines.mkString("\n")
  }

  def formatMetricsHeader: String = "[Node]\t[Heap (MB)]\t[CPU (%)]\t[Load]"

  def formatMetricsLine(nodeMetrics: NodeMetrics): String = {
    // used heap in MB, empty when heap metrics are not available
    val heap = nodeMetrics match {
      case HeapMemory(_, _, used, _, _) ⇒ (used.doubleValue / 1024 / 1024).form
      case _                            ⇒ ""
    }
    val cpuAndLoad = nodeMetrics match {
      case Cpu(_, _, loadOption, cpuOption, _) ⇒ format(cpuOption) + "\t" + format(loadOption)
      case _                                   ⇒ "N/A\tN/A"
    }
    s"${nodeMetrics.address}\t${heap}\t${cpuAndLoad}"
  }

  /** Two-decimal rendering of the value, or "N/A" when it is absent. */
  def format(opt: Option[Double]) = opt.map(_.form).getOrElse("N/A")

  /** Phi table, or the empty string when no phi values have been reported yet. */
  def formatPhi: String = {
    if (phiValuesObservedByNode.nonEmpty) {
      import akka.cluster.Member.addressOrdering
      val lines =
        for {
          (monitor, phiValues) ← phiValuesObservedByNode
          phi ← phiValues
        } yield formatPhiLine(monitor, phi.address, phi)
      lines.mkString(formatPhiHeader + "\n", "\n", "")
    } else ""
  }

  def formatPhiHeader: String = "[Monitor]\t[Subject]\t[count]\t[count phi > 1.0]\t[max phi]"

  def formatPhiLine(monitor: Address, subject: Address, phi: PhiValue): String =
    s"${monitor}\t${subject}\t${phi.count}\t${phi.countAboveOne}\t${phi.max.form}"

  /** One line of gossip and vector clock statistics per observing node, below a legend line. */
  def formatStats: String = {
    def render(stats: CurrentInternalStats) = {
      import stats.gossipStats._
      import stats.vclockStats._
      s"ClusterStats($receivedGossipCount, $mergeCount, $sameCount, $newerCount, $olderCount, $versionSize, $seenLatest)"
    }
    val lines = clusterStatsObservedByNode map { case (monitor, stats) ⇒ s"${monitor}\t${render(stats)}" }
    lines.mkString("ClusterStats(gossip, merge, same, newer, older, vclockSize, seenLatest)\n", "\n", "")
  }
}
/**
 * Remembers every AggregatedClusterResult it receives and logs the
 * complete history, as a tab separated table, each time a new one arrives.
 */
class ClusterResultHistory extends Actor with ActorLogging {
  var history = Vector.empty[AggregatedClusterResult]

  def receive = {
    case result: AggregatedClusterResult ⇒
      history = history :+ result
      log.info("Cluster result history\n" + formatHistory)
  }

  /** Header line followed by one line per recorded result, oldest first. */
  def formatHistory: String = {
    val lines = formatHistoryHeader +: history.map(formatHistoryLine)
    lines.mkString("\n")
  }

  def formatHistoryHeader: String = "[Title]\t[Duration (ms)]\t[GossipStats(gossip, merge, same, newer, older)]"

  def formatHistoryLine(result: AggregatedClusterResult): String = {
    val millis = result.duration.toMillis
    s"${result.title}\t${millis}\t${result.clusterStats}"
  }
}
/**
 * Samples the phi value of the failure detector for every known node once
 * per second and sends the accumulated values as a `PhiResult` to the
 * `reportTo` actor, when one is set.
 */
class PhiObserver extends Actor with ActorLogging {
  val cluster = Cluster(context.system)
  var reportTo: Option[ActorRef] = None
  val emptyPhiByNode = Map.empty[Address, PhiValue].withDefault(address ⇒ PhiValue(address, 0, 0, 0.0))
  var phiByNode = emptyPhiByNode
  var nodes = Set.empty[Address]

  /** Current phi for `address`; 0.0 when no phi accrual detector is available for it. */
  def phi(address: Address): Double = {
    val detector = cluster.failureDetector match {
      case reg: DefaultFailureDetectorRegistry[Address] ⇒ reg.failureDetector(address)
      case _ ⇒ None
    }
    detector match {
      case Some(fd: PhiAccrualFailureDetector) ⇒ fd.phi
      case _                                   ⇒ 0.0
    }
  }

  import context.dispatcher
  val checkPhiTask = context.system.scheduler.schedule(
    1.second, 1.second, self, PhiTick)

  // subscribe to MemberEvent, re-subscribe when restart
  override def preStart(): Unit = cluster.subscribe(self, classOf[MemberEvent])

  override def postStop(): Unit = {
    cluster.unsubscribe(self)
    checkPhiTask.cancel()
    super.postStop()
  }

  // fold the current phi of `node` into its accumulated PhiValue; nodes that
  // have phi 0 and are not monitored are left untouched
  private def recordPhi(node: Address): Unit = {
    val previous = phiByNode(node)
    val current = phi(node)
    if (current > 0 || cluster.failureDetector.isMonitoring(node)) {
      val aboveOne = if (!current.isInfinite && current > 1.0) 1 else 0
      val updated = PhiValue(node, previous.countAboveOne + aboveOne, previous.count + 1,
        math.max(previous.max, current))
      phiByNode = phiByNode + (node -> updated)
    }
  }

  def receive = {
    case PhiTick ⇒
      nodes foreach recordPhi
      val phiSet = immutable.SortedSet.empty[PhiValue] ++ phiByNode.values
      reportTo foreach { _ ! PhiResult(cluster.selfAddress, phiSet) }
    case state: CurrentClusterState ⇒
      nodes = state.members.map(_.address)
    case memberEvent: MemberEvent ⇒
      nodes = nodes + memberEvent.member.address
    case ReportTo(ref) ⇒
      // move the death watch from the old receiver to the new one
      reportTo foreach context.unwatch
      reportTo = ref
      reportTo foreach context.watch
    case Terminated(ref) ⇒
      if (reportTo == Some(ref)) reportTo = None
    case Reset ⇒
      phiByNode = emptyPhiByNode
      nodes = Set.empty[Address]
      cluster.unsubscribe(self)
      cluster.subscribe(self, classOf[MemberEvent])
  }
}
/**
 * Subscribes to `CurrentInternalStats` and forwards them, as `StatsResult`,
 * to the `reportTo` actor. The first gossip statistics seen after start or
 * `Reset` become the baseline; later ones are reported relative to it.
 */
class StatsObserver extends Actor {
  val cluster = Cluster(context.system)
  var reportTo: Option[ActorRef] = None
  var startStats: Option[GossipStats] = None

  override def preStart(): Unit = cluster.subscribe(self, classOf[CurrentInternalStats])

  override def postStop(): Unit = cluster.unsubscribe(self)

  def receive = {
    case CurrentInternalStats(gossipStats, vclockStats) ⇒
      val baseline = startStats
      // first sample becomes the baseline and is reported as it is
      if (baseline.isEmpty) startStats = Some(gossipStats)
      val diff = baseline.fold(gossipStats) { start ⇒ gossipStats :- start }
      val res = StatsResult(cluster.selfAddress, CurrentInternalStats(diff, vclockStats))
      reportTo foreach { _ ! res }
    case ReportTo(ref) ⇒
      // move the death watch from the old receiver to the new one
      reportTo foreach context.unwatch
      reportTo = ref
      reportTo foreach context.watch
    case Terminated(ref) ⇒
      if (reportTo == Some(ref)) reportTo = None
    case Reset ⇒
      startStats = None
    case _: CurrentClusterState ⇒ // not interesting here
  }
}
/**
 * Master of routers
 *
 * Flow control, to not flood the consumers, is handled by scheduling a
 * batch of messages to be sent to the router when half of the number
 * of outstanding messages remains. Only an `Ack` that actually removes an
 * outstanding job can trigger the next batch; a duplicate `Ack` (caused by
 * a resend) is counted but does not trigger anything.
 *
 * It uses a simple message retry mechanism. If an ack of a sent message
 * is not received within a timeout, that message will be resent to the router,
 * infinite number of times.
 *
 * When it receives the `End` command it will stop sending messages to the router,
 * resends continuous, until all outstanding acks have been received, and then
 * finally it replies with `WorkResult` to the sender of the `End` command, and stops
 * itself.
 *
 * @param settings      test settings; batch size, payload size and tree shape are read from it
 * @param batchInterval delay before the next batch is sent, `Duration.Zero` sends it immediately
 * @param tree          send `TreeJob` messages when true, `SimpleJob` messages otherwise
 */
class Master(settings: StressMultiJvmSpec.Settings, batchInterval: FiniteDuration, tree: Boolean) extends Actor {
  val workers = context.actorOf(FromConfig.props(Props[Worker]), "workers")
  val payload = Array.fill(settings.payloadSize)(ThreadLocalRandom.current.nextInt(127).toByte)
  val retryTimeout = 5.seconds.dilated(context.system)
  val idCounter = Iterator from 0
  var sendCounter = 0L
  var ackCounter = 0L
  var outstanding = Map.empty[JobId, JobState]
  var startTime = 0L

  import context.dispatcher
  val resendTask = context.system.scheduler.schedule(3.seconds, 3.seconds, self, RetryTick)

  override def postStop(): Unit = {
    resendTask.cancel()
    super.postStop()
  }

  // idle until `Begin`; retry ticks are ignored because nothing has been sent yet
  def receive = {
    case Begin ⇒
      startTime = System.nanoTime
      self ! SendBatch
      context.become(working)
    case RetryTick ⇒
  }

  def working: Receive = {
    case Ack(id) ⇒
      // A resent job may be acked more than once. Such a duplicate does not change
      // `outstanding`, so it must not be allowed to trigger yet another batch.
      val wasOutstanding = outstanding.contains(id)
      outstanding -= id
      ackCounter += 1
      if (wasOutstanding && outstanding.size == settings.workBatchSize / 2) {
        if (batchInterval == Duration.Zero) self ! SendBatch
        else context.system.scheduler.scheduleOnce(batchInterval, self, SendBatch)
      }
    case SendBatch ⇒ sendJobs()
    case RetryTick ⇒ resend()
    case End ⇒
      done(sender())
      context.become(ending(sender()))
  }

  // after `End`: no new batches, only resends until every outstanding job is acked
  def ending(replyTo: ActorRef): Receive = {
    case Ack(id) ⇒
      outstanding -= id
      ackCounter += 1
      done(replyTo)
    case SendBatch ⇒
    case RetryTick ⇒ resend()
  }

  /** Replies with the `WorkResult` and stops this actor, but only when nothing is outstanding. */
  def done(replyTo: ActorRef): Unit =
    if (outstanding.isEmpty) {
      val duration = (System.nanoTime - startTime).nanos
      replyTo ! WorkResult(duration, sendCounter, ackCounter)
      context stop self
    }

  /** Sends one batch of `workBatchSize` new jobs. */
  def sendJobs(): Unit = {
    0 until settings.workBatchSize foreach { _ ⇒
      send(createJob())
    }
  }

  /** Creates the next job, with a fresh id, of the kind selected by `tree`. */
  def createJob(): Job = {
    if (tree) TreeJob(idCounter.next(), payload, ThreadLocalRandom.current.nextInt(settings.treeWidth),
      settings.treeLevels, settings.treeWidth)
    else SimpleJob(idCounter.next(), payload)
  }

  /** Sends every outstanding job whose retry deadline has passed once more. */
  def resend(): Unit = {
    outstanding.values foreach { jobState ⇒
      if (jobState.deadline.isOverdue)
        send(jobState.job)
    }
  }

  /** Registers the job as outstanding with a new deadline and passes it to the router. */
  def send(job: Job): Unit = {
    outstanding += job.id -> JobState(Deadline.now + retryTimeout, job)
    sendCounter += 1
    workers ! job
  }
}
/**
 * Used by Master as routee.
 *
 * A `SimpleJob` is acked directly. The first `TreeJob` makes the worker
 * create the "tree" child actor; that job and all later `TreeJob` messages
 * are forwarded to it.
 */
class Worker extends Actor with ActorLogging {

  // pass the job, as a SimpleJob paired with the child index, on to the tree
  private def forwardToTree(tree: ActorRef, job: TreeJob): Unit =
    tree forward ((job.idx, SimpleJob(job.id, job.payload)))

  def receive = {
    case SimpleJob(id, _) ⇒
      sender() ! Ack(id)
    case job @ TreeJob(_, _, _, levels, width) ⇒
      // create the actors when first TreeJob message is received
      val totalActors = ((width * math.pow(width, levels) - 1) / (width - 1)).toInt
      log.debug("Creating [{}] actors in a tree structure of [{}] levels and each actor has [{}] children",
        totalActors, levels, width)
      val tree = context.actorOf(Props(classOf[TreeNode], levels, width), "tree")
      forwardToTree(tree, job)
      context.become(treeWorker(tree))
  }

  /** Behavior once the tree exists. */
  def treeWorker(tree: ActorRef): Receive = {
    case SimpleJob(id, _) ⇒ sender() ! Ack(id)
    case job: TreeJob     ⇒ forwardToTree(tree, job)
  }
}
/**
 * Inner node of the actor tree. Creates `width` locally deployed children
 * named "0", "1", ...; the children are `Leaf` actors at level 1 and
 * `TreeNode` actors of the next lower level otherwise. An `(index, job)`
 * message is forwarded unchanged to the child at that index.
 */
class TreeNode(level: Int, width: Int) extends Actor {
  require(level >= 1)

  def createChild(): Actor =
    if (level > 1) new TreeNode(level - 1, width)
    else new Leaf

  val indexedChildren = Vector.tabulate(width) { i ⇒
    context.actorOf(Props(createChild()).withDeploy(Deploy.local), name = i.toString)
  }

  def receive = {
    case (idx: Int, job: SimpleJob) if idx < width ⇒
      val child = indexedChildren(idx)
      child forward ((idx, job))
  }
}
/** Bottom of the actor tree: acks the job of every `(index, job)` message to the sender. */
class Leaf extends Actor {
  def receive = {
    case (_: Int, SimpleJob(id, _)) ⇒ sender() ! Ack(id)
  }
}
/**
 * Used for remote death watch testing; handles no messages at all.
 */
class Watchee extends Actor {
  def receive: Receive = Actor.emptyBehavior
}
/**
 * Used for remote supervision testing.
 *
 * Creates a child for every `Props` it receives, passes received exceptions
 * on to all children and restarts a child that fails with an `Exception`,
 * counting the restarts. `GetChildrenCount` is answered with the current
 * number of children and restarts. `Reset` clears the restart counter and
 * is only allowed when there are no children.
 */
class Supervisor extends Actor {
  var restartCount = 0

  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 1.minute) {
      case _: Exception ⇒
        restartCount += 1
        Restart
    }

  def receive = {
    case props: Props     ⇒ context.actorOf(props)
    case e: Exception     ⇒ context.children foreach { _ ! e }
    case GetChildrenCount ⇒ sender() ! ChildrenCount(context.children.size, restartCount)
    case Reset ⇒
      // the message is named after the protocol message that is actually handled here
      require(context.children.isEmpty,
        s"Reset not allowed when children exist, [${context.children.size}]")
      restartCount = 0
  }
}
/**
 * Child of Supervisor for remote supervision testing; fails with any
 * exception that is sent to it.
 */
class RemoteChild extends Actor {
  def receive: Receive = {
    case failure: Exception ⇒ throw failure
  }
}
// Tells a Master to record the start time and send the first batch.
case object Begin
// Tells a Master to stop sending new batches and reply with a WorkResult
// once nothing is outstanding.
case object End
// Scheduled by a Master to itself every 3 seconds; triggers resending of overdue jobs.
case object RetryTick
// Scheduled by a ClusterResultAggregator to itself; triggers the periodic report.
case object ReportTick
// Scheduled by a PhiObserver to itself every second; triggers sampling of phi values.
case object PhiTick
// Phi values observed by the node `from`.
final case class PhiResult(from: Address, phiValues: immutable.SortedSet[PhiValue])
// Accumulated phi samples for one monitored address; ordered by that address.
final case class PhiValue(address: Address, countAboveOne: Int, count: Int, max: Double) extends Ordered[PhiValue] {
import akka.cluster.Member.addressOrdering
def compare(that: PhiValue) = addressOrdering.compare(this.address, that.address)
}
// Sets (Some) or clears (None) the actor that results are reported to.
final case class ReportTo(ref: Option[ActorRef])
// Cluster statistics observed by the node `from`.
final case class StatsResult(from: Address, stats: CurrentInternalStats)
type JobId = Int
// A unit of work sent from a Master to the workers, identified by `id`.
trait Job { def id: JobId }
final case class SimpleJob(id: JobId, payload: Any) extends Job
// Job that is passed down the actor tree; `idx` selects the child at each level.
final case class TreeJob(id: JobId, payload: Any, idx: Int, levels: Int, width: Int) extends Job
// Acknowledges the job with the given id.
final case class Ack(id: JobId)
// A job awaiting its ack, together with the deadline after which it is resent.
final case class JobState(deadline: Deadline, job: Job)
/**
 * Outcome of a Master run: how long it took, how many jobs were sent
 * (including resends) and how many acks were received.
 */
final case class WorkResult(duration: Duration, sendCount: Long, ackCount: Long) {
  /** Number of sends that were not matched by an ack. */
  def retryCount: Long = sendCount - ackCount

  /** Acked jobs per second, based on the duration in whole milliseconds. */
  def jobsPerSecond: Double = {
    val elapsedMillis = duration.toMillis
    ackCount * 1000.0 / elapsedMillis
  }
}
// Sent by a Master to itself; triggers sending of the next batch of jobs.
case object SendBatch
// NOTE(review): not used by any code visible in this part of the file — confirm before removing.
final case class CreateTree(levels: Int, width: Int)
// Asks a Supervisor for its number of children and child restarts.
case object GetChildrenCount
// Reply to GetChildrenCount.
final case class ChildrenCount(numberOfChildren: Int, numberOfChildRestarts: Int)
// Clears accumulated state in PhiObserver, StatsObserver and Supervisor.
case object Reset
}
// One concrete spec class per node; 13 matches the default of
// StressMultiJvmSpec.totalNumberOfNodes.
class StressMultiJvmNode1 extends StressSpec
class StressMultiJvmNode2 extends StressSpec
class StressMultiJvmNode3 extends StressSpec
class StressMultiJvmNode4 extends StressSpec
class StressMultiJvmNode5 extends StressSpec
class StressMultiJvmNode6 extends StressSpec
class StressMultiJvmNode7 extends StressSpec
class StressMultiJvmNode8 extends StressSpec
class StressMultiJvmNode9 extends StressSpec
class StressMultiJvmNode10 extends StressSpec
class StressMultiJvmNode11 extends StressSpec
class StressMultiJvmNode12 extends StressSpec
class StressMultiJvmNode13 extends StressSpec
abstract class StressSpec
extends MultiNodeSpec(StressMultiJvmSpec)
with MultiNodeClusterSpec with BeforeAndAfterEach with ImplicitSender {
import StressMultiJvmSpec._
import ClusterEvent._
// test settings, read from the configuration of this node's actor system
val settings = new Settings(system.settings.config)
import settings._
// probe that receives the ActorIdentity replies of the Identify requests sent by this spec
val identifyProbe = TestProbe()
// running step number; used to make barrier and aggregator names unique
var step = 0
// number of roles (nodes) currently used by the test
var nbrUsedRoles = 0
// every test case starts a new step
override def beforeEach(): Unit = { step += 1 }
override def expectedTestDuration = settings.expectedTestDuration
override def shutdownTimeout: FiniteDuration = 30.seconds.dilated
/**
 * In addition to what the super implementation mutes: mutes logging of
 * RuntimeExceptions matching "Simulated exception" and dead letter
 * logging for the message types used by this test.
 */
override def muteLog(sys: ActorSystem = system): Unit = {
  super.muteLog(sys)
  val simulatedFailures = EventFilter[RuntimeException](pattern = ".*Simulated exception.*")
  sys.eventStream.publish(Mute(simulatedFailures))
  muteDeadLetters(
    classOf[SimpleJob],
    classOf[AggregatedClusterResult],
    SendBatch.getClass,
    classOf[StatsResult],
    classOf[PhiResult],
    RetryTick.getClass)(sys)
}
/**
 * Multi-line description of the JVM this node runs in: operating system,
 * JVM, processors, load average, thread count (peak in parentheses), heap
 * usage in MB and the JVM input arguments except those containing "classpath".
 */
def jvmInfo(): String = {
  import scala.collection.JavaConverters._
  val runtime = ManagementFactory.getRuntimeMXBean
  val os = ManagementFactory.getOperatingSystemMXBean
  val threads = ManagementFactory.getThreadMXBean
  val heap = ManagementFactory.getMemoryMXBean.getHeapMemoryUsage

  // bytes rendered as MB with two decimals
  def megabytes(bytes: Long): String = (bytes.toDouble / 1024 / 1024).form

  val lines = Seq(
    s"Operating system: ${os.getName}, ${os.getArch}, ${os.getVersion}",
    s"JVM: ${runtime.getVmName} ${runtime.getVmVendor} ${runtime.getVmVersion}",
    s"Processors: ${os.getAvailableProcessors}",
    s"Load average: ${os.getSystemLoadAverage}",
    s"Thread count: ${threads.getThreadCount} (${threads.getPeakThreadCount})",
    s"Heap: ${megabytes(heap.getUsed)} (${megabytes(heap.getInit)} - ${megabytes(heap.getMax)}) MB",
    "Args:\n " + runtime.getInputArguments.asScala.filterNot(_.contains("classpath")).mkString("\n "))
  // every line, including the last one, is terminated by a newline
  lines.mkString("", "\n", "\n")
}
// the first `numberOfSeedNodes` roles act as seed nodes
val seedNodes = roles.take(numberOfSeedNodes)
// most recent gossip statistics of this node's cluster read view
def latestGossipStats = cluster.readView.latestStats.gossipStats
// Accessing the cluster forces `createWorker`, so the "worker" actor is
// created once, on first access, before the cluster extension is returned.
override def cluster: Cluster = {
createWorker
super.cluster
}
// always create one worker when the cluster is started
lazy val createWorker: Unit =
system.actorOf(Props[Worker], "worker")
/**
 * Starts the result aggregator "result<step>" on the first node and, after
 * all nodes have passed a barrier, points the phi and stats observers of
 * the nodes in use at it.
 *
 * @param title            title of the aggregated result
 * @param expectedResults  number of ClusterResult messages the aggregator waits for
 * @param includeInHistory report the aggregate to the result history (only when infolog is on)
 */
def createResultAggregator(title: String, expectedResults: Int, includeInHistory: Boolean): Unit = {
  runOn(roles.head) {
    val props = Props(classOf[ClusterResultAggregator], title, expectedResults, settings).withDeploy(Deploy.local)
    val aggregator = system.actorOf(props, name = "result" + step)
    val historyReceiver =
      if (includeInHistory && infolog) Some(clusterResultHistory)
      else None
    aggregator ! ReportTo(historyReceiver)
  }
  enterBarrier("result-aggregator-created-" + step)
  val nodesInUse = roles.take(nbrUsedRoles)
  runOn(nodesInUse: _*) {
    phiObserver ! ReportTo(clusterResultAggregator)
    statsObserver ! Reset
    statsObserver ! ReportTo(clusterResultAggregator)
  }
}
/**
 * Looks up the aggregator of the current step on the first node with an
 * Identify request; None when the reply carries no actor reference.
 */
def clusterResultAggregator: Option[ActorRef] = {
  val aggregatorPath = node(roles.head) / "user" / ("result" + step)
  system.actorSelection(aggregatorPath).tell(Identify(step), identifyProbe.ref)
  val identity = identifyProbe.expectMsgType[ActorIdentity]
  identity.ref
}
// Receiver of aggregated results: the "resultHistory" actor when infolog
// is on, dead letters otherwise. Created on first access.
lazy val clusterResultHistory =
if (settings.infolog) system.actorOf(Props[ClusterResultHistory], "resultHistory")
else system.deadLetters
// observers are created on first access
lazy val phiObserver = system.actorOf(Props[PhiObserver], "phiObserver")
lazy val statsObserver = system.actorOf(Props[StatsObserver], "statsObserver")
/**
 * On the first node: waits until the aggregator of the current step has
 * terminated (nothing to wait for when it can no longer be found).
 * All nodes then meet at a barrier.
 */
def awaitClusterResult(): Unit = {
  runOn(roles.head) {
    // an aggregator that is not found has already terminated
    clusterResultAggregator foreach { aggregator ⇒
      watch(aggregator)
      expectMsgPF() { case Terminated(a) if a.path == aggregator.path ⇒ true }
    }
  }
  enterBarrier("cluster-result-done-" + step)
}
/** Joins `numberOfNodes` nodes, one after the other; each join is a step of its own. */
def joinOneByOne(numberOfNodes: Int): Unit =
  for (_ ← 0 until numberOfNodes) {
    joinOne()
    nbrUsedRoles += 1
    step += 1
  }
/** Convergence timeout: `base` multiplied by `convergenceWithinFactor` and by the number of nodes. */
def convergenceWithin(base: FiniteDuration, nodes: Int): FiniteDuration = {
  val scaledMillis = base.toMillis * convergenceWithinFactor * nodes
  scaledMillis.millis
}
/**
 * Joins the next unused role to the first node and waits, on all
 * participating nodes, until every member is up. Each participating node
 * reports its result to the aggregator of the step.
 */
def joinOne(): Unit = {
  val timeLimit = 5.seconds + convergenceWithin(2.seconds, nbrUsedRoles + 1)
  within(timeLimit) {
    val participants = roles.take(nbrUsedRoles + 1)
    val title = s"join one to ${nbrUsedRoles} nodes cluster"
    createResultAggregator(title, expectedResults = participants.size, includeInHistory = true)
    runOn(participants: _*) {
      reportResult {
        // the last participant is the one that joins
        runOn(participants.last) {
          cluster.join(roles.head)
        }
        awaitMembersUp(participants.size, timeout = remainingOrDefault)
      }
    }
    awaitClusterResult()
    enterBarrier("join-one-" + step)
  }
}
/**
 * Joins the next `numberOfNodes` unused roles at the same time, either to
 * the seed nodes or to the first node, and waits on all participating
 * nodes until every member is up.
 */
def joinSeveral(numberOfNodes: Int, toSeedNodes: Boolean): Unit = {
  val timeLimit = 10.seconds + convergenceWithin(3.seconds, nbrUsedRoles + numberOfNodes)
  within(timeLimit) {
    val participants = roles.take(nbrUsedRoles + numberOfNodes)
    val joiners = participants.takeRight(numberOfNodes)
    val target = if (toSeedNodes) "seed nodes" else "one node"
    val title = s"join ${numberOfNodes} to ${target}, in ${nbrUsedRoles} nodes cluster"
    createResultAggregator(title, expectedResults = participants.size, includeInHistory = true)
    runOn(participants: _*) {
      reportResult {
        runOn(joiners: _*) {
          if (!toSeedNodes) cluster.join(roles.head)
          else cluster.joinSeedNodes(seedNodes.toIndexedSeq map address)
        }
        awaitMembersUp(participants.size, timeout = remainingOrDefault)
      }
    }
    awaitClusterResult()
    enterBarrier("join-several-" + step)
  }
}
/** Removes `numberOfNodes` nodes, one after the other; each removal is a step of its own. */
def removeOneByOne(numberOfNodes: Int, shutdown: Boolean): Unit =
  for (_ ← 0 until numberOfNodes) {
    removeOne(shutdown)
    nbrUsedRoles -= 1
    step += 1
  }
// Removes the last used node from the cluster, either by letting it leave
// or (shutdown = true) by exiting its JVM from the first node. The first
// node watches a "watchee" actor on the removed node and expects its
// Terminated message; the remaining nodes wait until the smaller cluster
// has all members up and reachable.
def removeOne(shutdown: Boolean): Unit = within(25.seconds + convergenceWithin(3.seconds, nbrUsedRoles - 1)) {
// the nodes that remain after the removal
val currentRoles = roles.take(nbrUsedRoles - 1)
val title = s"${if (shutdown) "shutdown" else "remove"} one from ${nbrUsedRoles} nodes cluster"
createResultAggregator(title, expectedResults = currentRoles.size, includeInHistory = true)
val removeRole = roles(nbrUsedRoles - 1)
val removeAddress = address(removeRole)
runOn(removeRole) {
system.actorOf(Props[Watchee], "watchee")
if (!shutdown) cluster.leave(myself)
}
enterBarrier("watchee-created-" + step)
runOn(roles.head) {
// resolve the remote watchee and start watching it
system.actorSelection(node(removeRole) / "user" / "watchee").tell(Identify("watchee"), identifyProbe.ref)
val watchee = identifyProbe.expectMsgType[ActorIdentity].ref.get
watch(watchee)
}
enterBarrier("watch-estabilished-" + step)
runOn(currentRoles: _*) {
reportResult {
runOn(roles.head) {
if (shutdown) {
if (infolog)
log.info("Shutting down [{}]", removeAddress)
testConductor.exit(removeRole, 0).await
}
}
awaitMembersUp(currentRoles.size, timeout = remainingOrDefault)
awaitAllReachable()
}
}
runOn(roles.head) {
// the death watch must fire for the watchee on the removed node
val expectedPath = RootActorPath(removeAddress) / "user" / "watchee"
expectMsgPF() {
case Terminated(a) if a.path == expectedPath ⇒ true
}
}
enterBarrier("watch-verified-" + step)
awaitClusterResult()
enterBarrier("remove-one-" + step)
}
/**
 * Removes the last `numberOfNodes` used nodes at the same time, either by
 * letting them leave or (shutdown = true) by exiting their JVMs from the
 * first node, and waits on the remaining nodes until all members are up
 * and reachable.
 */
def removeSeveral(numberOfNodes: Int, shutdown: Boolean): Unit = {
  val timeLimit = 25.seconds + convergenceWithin(5.seconds, nbrUsedRoles - numberOfNodes)
  within(timeLimit) {
    val remaining = roles.take(nbrUsedRoles - numberOfNodes)
    val removed = roles.slice(remaining.size, nbrUsedRoles)
    val action = if (shutdown) "shutdown" else "leave"
    val title = s"${action} ${numberOfNodes} in ${nbrUsedRoles} nodes cluster"
    createResultAggregator(title, expectedResults = remaining.size, includeInHistory = true)
    runOn(removed: _*) {
      if (!shutdown) cluster.leave(myself)
    }
    runOn(remaining: _*) {
      reportResult {
        runOn(roles.head) {
          if (shutdown)
            for (r ← removed) {
              if (infolog)
                log.info("Shutting down [{}]", address(r))
              testConductor.exit(r, 0).await
            }
        }
        awaitMembersUp(remaining.size, timeout = remainingOrDefault)
        awaitAllReachable()
      }
    }
    awaitClusterResult()
    enterBarrier("remove-several-" + step)
  }
}
/**
 * Runs `thunk` and afterwards sends a ClusterResult, with the elapsed time
 * and the gossip statistics accumulated meanwhile, to the aggregator of
 * the current step, if it can be found.
 *
 * @return the result of `thunk`
 */
def reportResult[T](thunk: ⇒ T): T = {
  val startTime = System.nanoTime
  val startStats = clusterView.latestStats.gossipStats
  val returnValue = thunk
  clusterResultAggregator foreach { aggregator ⇒
    val selfAddress = cluster.selfAddress
    // measured after the aggregator lookup has returned
    val elapsed = (System.nanoTime - startTime).nanos
    val gossipDiff = latestGossipStats :- startStats
    aggregator ! ClusterResult(selfAddress, elapsed, gossipDiff)
  }
  returnValue
}
// Repeatedly starts additional actor systems on the first
// `numberOfNodesJoinRemove` roles, joins them to the seed nodes and shuts
// them down again in the next round. The number of rounds is derived from
// `duration` and the time allowed for one round (at least one round).
def exerciseJoinRemove(title: String, duration: FiniteDuration): Unit = {
// the roles that start and stop the extra actor systems
val activeRoles = roles.take(numberOfNodesJoinRemove)
val loopDuration = 10.seconds + convergenceWithin(4.seconds, nbrUsedRoles + activeRoles.size)
val rounds = ((duration - loopDuration).toMillis / loopDuration.toMillis).max(1).toInt
val usedRoles = roles.take(nbrUsedRoles)
val usedAddresses = usedRoles.map(address(_)).toSet
// One round per call. `previousAS` is the extra actor system this node
// started in the previous round (if any); `allPreviousAddresses` are the
// addresses of the extra members of the previous round, which must no
// longer be part of the member ring.
@tailrec def loop(counter: Int, previousAS: Option[ActorSystem], allPreviousAddresses: Set[Address]): Option[ActorSystem] = {
if (counter > rounds) previousAS
else {
val t = title + " round " + counter
runOn(usedRoles: _*) {
phiObserver ! Reset
statsObserver ! Reset
}
createResultAggregator(t, expectedResults = nbrUsedRoles, includeInHistory = true)
val (nextAS, nextAddresses) = within(loopDuration) {
reportResult {
val nextAS =
if (activeRoles contains myself) {
// replace the extra actor system of the previous round with a fresh one
previousAS foreach { as ⇒ TestKit.shutdownActorSystem(as) }
val sys = ActorSystem(system.name, system.settings.config)
muteLog(sys)
Cluster(sys).joinSeedNodes(seedNodes.toIndexedSeq map address)
Some(sys)
} else previousAS
runOn(usedRoles: _*) {
awaitMembersUp(
nbrUsedRoles + activeRoles.size,
canNotBePartOfMemberRing = allPreviousAddresses,
timeout = remainingOrDefault)
awaitAllReachable()
}
// addresses of the members that are not part of the regular cluster
val nextAddresses = clusterView.members.map(_.address) -- usedAddresses
runOn(usedRoles: _*) {
nextAddresses.size should be(numberOfNodesJoinRemove)
}
enterBarrier("join-remove-" + step)
(nextAS, nextAddresses)
}
}
awaitClusterResult()
step += 1
loop(counter + 1, nextAS, nextAddresses)
}
}
// shut down the extra actor system left over from the last round
loop(1, None, Set.empty) foreach { as ⇒ TestKit.shutdownActorSystem(as) }
within(loopDuration) {
runOn(usedRoles: _*) {
// wait until the cluster is back to its regular size
awaitMembersUp(nbrUsedRoles, timeout = remainingOrDefault)
awaitAllReachable()
phiObserver ! Reset
statsObserver ! Reset
}
}
enterBarrier("join-remove-shutdown-" + step)
}
/** Actor name of the master on this node: `"master-"` followed by this role's name. */
def masterName: String = s"master-${myself.name}"
/**
 * Looks up the local master actor by sending an `Identify` to `/user/<masterName>`
 * and returning the `ref` of the `ActorIdentity` reply received by `identifyProbe`.
 */
def master: Option[ActorRef] = {
  val selection = system.actorSelection(s"/user/$masterName")
  selection.tell(Identify("master"), identifyProbe.ref)
  val identity = identifyProbe.expectMsgType[ActorIdentity]
  identity.ref
}
/**
 * Runs a `Master` on each of the first three used roles for `duration` and verifies
 * the resulting `WorkResult`. All used roles report a cluster result for the step.
 *
 * Requires that all nodes have joined (`nbrUsedRoles` equals `totalNumberOfNodes`).
 *
 * @param title title for the result aggregator
 * @param duration how long the masters run before they are sent `End`
 * @param batchInterval batch interval passed to the `Master`
 * @param expectDroppedMessages when `false`, the retry count must be zero
 * @param tree passed through to the `Master` constructor
 */
def exerciseRouters(title: String, duration: FiniteDuration, batchInterval: FiniteDuration,
                    expectDroppedMessages: Boolean, tree: Boolean): Unit =
  within(duration + 10.seconds) {
    nbrUsedRoles should be(totalNumberOfNodes)
    createResultAggregator(title, expectedResults = nbrUsedRoles, includeInHistory = false)

    val doneBarrier = s"routers-done-$step"
    val (masterRoles, otherRoles) = roles.take(nbrUsedRoles).splitAt(3)

    runOn(masterRoles: _*) {
      reportResult {
        val masterProps = Props(classOf[Master], settings, batchInterval, tree).withDeploy(Deploy.local)
        val m = system.actorOf(masterProps, name = masterName)
        m ! Begin

        // ask the master to finish once the configured duration has elapsed
        import system.dispatcher
        system.scheduler.scheduleOnce(duration) {
          m.tell(End, testActor)
        }

        val workResult = awaitWorkResult(m)
        workResult.sendCount should be > (0L)
        workResult.ackCount should be > (0L)
        if (!expectDroppedMessages) {
          workResult.retryCount should be(0)
        }

        enterBarrier(doneBarrier)
      }
    }

    // the remaining roles only take part in the barrier and the result reporting
    runOn(otherRoles: _*) {
      reportResult {
        enterBarrier(doneBarrier)
      }
    }

    awaitClusterResult()
  }
/**
 * Waits for the `WorkResult` message, optionally logs a summary of it, and then
 * watches `m` and waits until it has terminated.
 *
 * @param m the master actor expected to terminate after delivering its result
 * @return the received `WorkResult`
 */
def awaitWorkResult(m: ActorRef): WorkResult = {
  val result = expectMsgType[WorkResult]

  if (settings.infolog) {
    log.info(
      "{} result, [{}] jobs/s, retried [{}] of [{}] msg",
      masterName,
      result.jobsPerSecond.form,
      result.retryCount,
      result.sendCount)
  }

  watch(m)
  expectTerminated(m)
  result
}
/**
 * Exercises supervision of remote-deployed children for a number of rounds.
 *
 * In every round each of the first three used roles asks its local `Supervisor` to
 * create one `RemoteChild` per used role, sends the supervisor five simulated
 * exceptions and expects `5 * nbrUsedRoles` restarts, then sends a sixth exception and
 * expects that no children are left. `step` is incremented after every round.
 *
 * @param title title for the result aggregator of every round
 * @param duration total time budget; also bounds the whole call (plus 10 seconds)
 * @param oneIteration estimated time of one round, used to derive the number of rounds
 */
def exerciseSupervision(title: String, duration: FiniteDuration, oneIteration: Duration): Unit =
  within(duration + 10.seconds) {
    // at least one round is always executed
    val rounds = math.max(1L, duration.toMillis / oneIteration.toMillis).toInt
    val supervisor = system.actorOf(Props[Supervisor], "supervisor")

    // a fresh exception instance for every failure that is sent to the supervisor
    def simulatedFailure = new RuntimeException("Simulated exception")

    // Polls the supervisor until it reports exactly the expected counts.
    def awaitChildrenCount(expected: ChildrenCount): Unit =
      awaitAssert {
        supervisor ! GetChildrenCount
        val reported = expectMsgType[ChildrenCount]
        reported should be(expected)
      }

    (0 until rounds).foreach { _ =>
      createResultAggregator(title, expectedResults = nbrUsedRoles, includeInHistory = false)

      val (masterRoles, otherRoles) = roles.take(nbrUsedRoles).splitAt(3)

      runOn(masterRoles: _*) {
        reportResult {
          for (role <- roles.take(nbrUsedRoles)) {
            val remoteDeploy = Deploy(scope = RemoteScope(address(role)))
            supervisor ! Props[RemoteChild].withDeploy(remoteDeploy)
          }
          supervisor ! GetChildrenCount
          expectMsgType[ChildrenCount] should be(ChildrenCount(nbrUsedRoles, 0))

          (1 to 5).foreach { _ => supervisor ! simulatedFailure }
          awaitChildrenCount(ChildrenCount(nbrUsedRoles, 5 * nbrUsedRoles))

          // after 5 restart attempts the children should be stopped
          supervisor ! simulatedFailure
          // zero children
          awaitChildrenCount(ChildrenCount(0, 6 * nbrUsedRoles))

          supervisor ! Reset
        }
        enterBarrier(s"supervision-done-$step")
      }

      runOn(otherRoles: _*) {
        reportResult {
          enterBarrier(s"supervision-done-$step")
        }
      }

      awaitClusterResult()
      step += 1
    }
  }
/**
 * Lets the cluster sit idle for `idleGossipDuration` and reports a cluster result for
 * that period. The member count seen by this node must equal `nbrUsedRoles` both
 * before and after the idle period.
 *
 * @param title title for the result aggregator
 */
def idleGossip(title: String): Unit = {
  def assertAllMembersPresent(): Unit =
    clusterView.members.size should be(nbrUsedRoles)

  createResultAggregator(title, expectedResults = nbrUsedRoles, includeInHistory = true)
  reportResult {
    assertAllMembersPresent()
    // intentionally do nothing; only the gossip during this period is of interest
    Thread.sleep(idleGossipDuration.toMillis)
    assertAllMembersPresent()
  }
  awaitClusterResult()
}
"A cluster under stress" must {
// Logs JVM information on every node and the test settings on the first node only.
"log settings" taggedAs LongRunningTest in {
  if (infolog) {
    log.info("StressSpec JVM:\n{}", jvmInfo)
    runOn(roles.head) {
      log.info("StressSpec settings:\n{}", settings)
    }
  }
  enterBarrier(s"after-$step")
}

// The seed nodes and the nodes configured to join them initially all join at once.
"join seed nodes" taggedAs LongRunningTest in within(30.seconds) {
  val initialJoiners =
    roles.slice(numberOfSeedNodes, numberOfSeedNodes + numberOfNodesJoiningToSeedNodesInitially)
  val expectedMembers = seedNodes.size + initialJoiners.size

  createResultAggregator("join seed nodes", expectedResults = expectedMembers, includeInHistory = true)

  runOn((seedNodes ++ initialJoiners): _*) {
    reportResult {
      cluster.joinSeedNodes(seedNodes.toIndexedSeq map address)
      awaitMembersUp(expectedMembers, timeout = remainingOrDefault)
    }
  }

  awaitClusterResult()
  nbrUsedRoles += expectedMembers
  enterBarrier(s"after-$step")
}
// Starts a Master on each of the first three roles so that routers are working while
// the following steps join more nodes.
"start routers that are running while nodes are joining" taggedAs LongRunningTest in {
  // Only start the masters when actor exercising is enabled. The step that ends these
  // masters is guarded by the same flag, so an unguarded start would leave them running
  // for the rest of the test when the flag is off.
  if (exerciseActors) {
    runOn(roles.take(3): _*) {
      system.actorOf(Props(classOf[Master], settings, settings.workBatchInterval, false).withDeploy(Deploy.local),
        name = masterName) ! Begin
    }
  }
  // Synchronize all nodes at the end of the step, like every other step in this spec.
  enterBarrier("after-" + step)
}
"join nodes one-by-one to small cluster" taggedAs LongRunningTest in {
  joinOneByOne(numberOfNodesJoiningOneByOneSmall)
  enterBarrier(s"after-$step")
}

// joinSeveral is followed by an explicit update of the used-roles counter here
"join several nodes to one node" taggedAs LongRunningTest in {
  joinSeveral(numberOfNodesJoiningToOneNode, toSeedNodes = false)
  nbrUsedRoles += numberOfNodesJoiningToOneNode
  enterBarrier(s"after-$step")
}

"join several nodes to seed nodes" taggedAs LongRunningTest in {
  // nothing to join when the configured count is zero; the barrier is still entered
  if (numberOfNodesJoiningToSeedNodes > 0) {
    joinSeveral(numberOfNodesJoiningToSeedNodes, toSeedNodes = true)
    nbrUsedRoles += numberOfNodesJoiningToSeedNodes
  }
  enterBarrier(s"after-$step")
}

"join nodes one-by-one to large cluster" taggedAs LongRunningTest in {
  joinOneByOne(numberOfNodesJoiningOneByOneLarge)
  enterBarrier(s"after-$step")
}

// Ends the masters on the first three roles and checks their work results;
// no retries are accepted in this step.
"end routers that are running while nodes are joining" taggedAs LongRunningTest in within(30.seconds) {
  if (exerciseActors) {
    runOn(roles.take(3): _*) {
      val m = master.getOrElse(fail("master not running"))
      m.tell(End, testActor)
      val workResult = awaitWorkResult(m)
      workResult.retryCount should be(0)
      workResult.sendCount should be > (0L)
      workResult.ackCount should be > (0L)
    }
  }
  enterBarrier(s"after-$step")
}
// The four router steps differ only in the batch interval and the `tree` flag.
"use routers with normal throughput" taggedAs LongRunningTest in {
  if (exerciseActors) {
    exerciseRouters(
      title = "use routers with normal throughput",
      duration = normalThroughputDuration,
      batchInterval = workBatchInterval,
      expectDroppedMessages = false,
      tree = false)
  }
  enterBarrier(s"after-$step")
}

"use routers with high throughput" taggedAs LongRunningTest in {
  if (exerciseActors) {
    exerciseRouters(
      title = "use routers with high throughput",
      duration = highThroughputDuration,
      batchInterval = Duration.Zero,
      expectDroppedMessages = false,
      tree = false)
  }
  enterBarrier(s"after-$step")
}

"use many actors with normal throughput" taggedAs LongRunningTest in {
  if (exerciseActors) {
    exerciseRouters(
      title = "use many actors with normal throughput",
      duration = normalThroughputDuration,
      batchInterval = workBatchInterval,
      expectDroppedMessages = false,
      tree = true)
  }
  enterBarrier(s"after-$step")
}

"use many actors with high throughput" taggedAs LongRunningTest in {
  if (exerciseActors) {
    exerciseRouters(
      title = "use many actors with high throughput",
      duration = highThroughputDuration,
      batchInterval = Duration.Zero,
      expectDroppedMessages = false,
      tree = true)
  }
  enterBarrier(s"after-$step")
}

// This step always runs; it is not guarded by the exerciseActors flag.
"exercise join/remove/join/remove" taggedAs LongRunningTest in {
  exerciseJoinRemove(title = "exercise join/remove", duration = joinRemoveDuration)
  enterBarrier(s"after-$step")
}

"exercise supervision" taggedAs LongRunningTest in {
  if (exerciseActors) {
    exerciseSupervision(
      title = "exercise supervision",
      duration = supervisionDuration,
      oneIteration = supervisionOneIteration)
  }
  enterBarrier(s"after-$step")
}

"gossip when idle" taggedAs LongRunningTest in {
  idleGossip(title = "idle gossip")
  enterBarrier(s"after-$step")
}
// Starts a Master on each of the first three roles so that routers are working while
// the following steps remove nodes.
"start routers that are running while nodes are removed" taggedAs LongRunningTest in {
  if (exerciseActors) {
    runOn(roles.take(3): _*) {
      val masterProps =
        Props(classOf[Master], settings, settings.workBatchInterval, false).withDeploy(Deploy.local)
      system.actorOf(masterProps, name = masterName) ! Begin
    }
  }
  enterBarrier(s"after-$step")
}

"leave nodes one-by-one from large cluster" taggedAs LongRunningTest in {
  removeOneByOne(numberOfNodesLeavingOneByOneLarge, shutdown = false)
  enterBarrier(s"after-$step")
}

"shutdown nodes one-by-one from large cluster" taggedAs LongRunningTest in {
  removeOneByOne(numberOfNodesShutdownOneByOneLarge, shutdown = true)
  enterBarrier(s"after-$step")
}

// removeSeveral is followed by an explicit update of the used-roles counter here
"leave several nodes" taggedAs LongRunningTest in {
  removeSeveral(numberOfNodesLeaving, shutdown = false)
  nbrUsedRoles -= numberOfNodesLeaving
  enterBarrier(s"after-$step")
}

"shutdown several nodes" taggedAs LongRunningTest in {
  removeSeveral(numberOfNodesShutdown, shutdown = true)
  nbrUsedRoles -= numberOfNodesShutdown
  enterBarrier(s"after-$step")
}

"shutdown nodes one-by-one from small cluster" taggedAs LongRunningTest in {
  removeOneByOne(numberOfNodesShutdownOneByOneSmall, shutdown = true)
  enterBarrier(s"after-$step")
}

"leave nodes one-by-one from small cluster" taggedAs LongRunningTest in {
  removeOneByOne(numberOfNodesLeavingOneByOneSmall, shutdown = false)
  enterBarrier(s"after-$step")
}

// Ends the masters on the first three roles and checks that work was sent and
// acknowledged; the retry count is not checked in this step.
"end routers that are running while nodes are removed" taggedAs LongRunningTest in within(30.seconds) {
  if (exerciseActors) {
    runOn(roles.take(3): _*) {
      val m = master.getOrElse(fail("master not running"))
      m.tell(End, testActor)
      val workResult = awaitWorkResult(m)
      workResult.sendCount should be > (0L)
      workResult.ackCount should be > (0L)
    }
  }
  enterBarrier(s"after-$step")
}

"log jvm info" taggedAs LongRunningTest in {
  if (infolog) {
    log.info("StressSpec JVM:\n{}", jvmInfo)
  }
  enterBarrier(s"after-$step")
}
}
}
Other Akka source code examples

Here is a short list of links related to this Akka StressSpec.scala source code file:
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.