|
Akka/Scala example source code file (StressSpec.scala)
The StressSpec.scala Akka example source code/** * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> */ package akka.cluster import language.postfixOps import scala.annotation.tailrec import scala.collection.immutable import scala.concurrent.duration._ import scala.concurrent.forkjoin.ThreadLocalRandom import java.util.concurrent.atomic.AtomicReference import org.scalatest.BeforeAndAfterEach import com.typesafe.config.Config import com.typesafe.config.ConfigFactory import akka.actor.Actor import akka.actor.ActorLogging import akka.actor.ActorRef import akka.actor.ActorSystem import akka.actor.Address import akka.actor.Deploy import akka.actor.OneForOneStrategy import akka.actor.Props import akka.actor.RootActorPath import akka.actor.SupervisorStrategy._ import akka.actor.Terminated import akka.cluster.ClusterEvent.ClusterMetricsChanged import akka.cluster.ClusterEvent.CurrentClusterState import akka.cluster.ClusterEvent.CurrentInternalStats import akka.cluster.ClusterEvent.MemberEvent import akka.cluster.StandardMetrics.Cpu import akka.cluster.StandardMetrics.HeapMemory import akka.remote.DefaultFailureDetectorRegistry import akka.remote.PhiAccrualFailureDetector import akka.remote.RemoteScope import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.routing.FromConfig import akka.testkit._ import akka.testkit.TestEvent._ import akka.actor.Identify import akka.actor.ActorIdentity import akka.util.Helpers.ConfigOps import akka.util.Helpers.Requiring import java.lang.management.ManagementFactory /** * This test is intended to be used as long running stress test * of cluster related features. Number of nodes and duration of * the test steps can be configured. The test scenario is organized as * follows: * 1. join nodes in various ways up to the configured total number of nodes * 2 while nodes are joining a few cluster aware routers are also working * 3. exercise concurrent joining and shutdown of nodes repeatedly * 4. exercise cluster aware routers, including high throughput * 5. exercise many actors in a tree structure * 6. exercise remote supervision * 7. gossip without any changes to the membership * 8. leave and shutdown nodes in various ways * 9. while nodes are removed remote death watch is also exercised * 10. while nodes are removed a few cluster aware routers are also working * * By default it uses 13 nodes. * Example of sbt command line parameters to double that: * `-DMultiJvm.akka.cluster.Stress.nrOfNodes=26 -Dmultinode.Dakka.test.cluster-stress-spec.nr-of-nodes-factor=2` */ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig { val totalNumberOfNodes = System.getProperty("MultiJvm.akka.cluster.Stress.nrOfNodes") match { case null ⇒ 13 case value ⇒ value.toInt requiring (_ >= 10, "nrOfNodes should be >= 10") } for (n ← 1 to totalNumberOfNodes) role("node-" + n) // Note that this test uses default configuration, // not MultiNodeClusterSpec.clusterConfig commonConfig(ConfigFactory.parseString(""" akka.test.cluster-stress-spec { infolog = off # scale the nr-of-nodes* settings with this factor nr-of-nodes-factor = 1 # not scaled nr-of-seed-nodes = 3 nr-of-nodes-joining-to-seed-initally = 2 nr-of-nodes-joining-one-by-one-small = 2 nr-of-nodes-joining-one-by-one-large = 2 nr-of-nodes-joining-to-one = 2 nr-of-nodes-leaving-one-by-one-small = 1 nr-of-nodes-leaving-one-by-one-large = 2 nr-of-nodes-leaving = 2 nr-of-nodes-shutdown-one-by-one-small = 1 nr-of-nodes-shutdown-one-by-one-large = 2 nr-of-nodes-shutdown = 2 nr-of-nodes-join-remove = 2 # not scaled # scale the *-duration settings with this factor duration-factor = 1 join-remove-duration = 90s work-batch-size = 100 work-batch-interval = 2s payload-size = 1000 normal-throughput-duration = 30s high-throughput-duration = 10s supervision-duration = 10s supervision-one-iteration = 2.5s idle-gossip-duration = 10s expected-test-duration = 600s # actors are created in a tree structure defined # by tree-width (number of children for each actor) and # tree-levels, total number of actors can be calculated by # (width * math.pow(width, levels) - 1) / (width - 1) tree-width = 4 tree-levels = 4 report-metrics-interval = 10s # scale convergence within timeouts with this factor convergence-within-factor = 1.0 # set to off to only test cluster membership exercise-actors = on } akka.actor.serialize-messages = off akka.actor.serialize-creators = off akka.actor.provider = akka.cluster.ClusterActorRefProvider akka.cluster { auto-down-unreachable-after = 1s publish-stats-interval = 1s } akka.loggers = ["akka.testkit.TestEventListener"] akka.loglevel = INFO akka.remote.log-remote-lifecycle-events = off akka.actor.default-dispatcher.fork-join-executor { parallelism-min = 8 parallelism-max = 8 } akka.actor.deployment { /master-node-1/workers { router = round-robin-pool nr-of-instances = 100 cluster { enabled = on max-nr-of-instances-per-node = 1 allow-local-routees = on } } /master-node-2/workers { router = round-robin-group nr-of-instances = 100 routees.paths = ["/user/worker"] cluster { enabled = on allow-local-routees = on } } /master-node-3/workers = { router = adaptive-pool nr-of-instances = 100 cluster { enabled = on max-nr-of-instances-per-node = 1 allow-local-routees = on } } } """)) class Settings(conf: Config) { private val testConfig = conf.getConfig("akka.test.cluster-stress-spec") import testConfig._ val infolog = getBoolean("infolog") val nFactor = getInt("nr-of-nodes-factor") val numberOfSeedNodes = getInt("nr-of-seed-nodes") // not scaled by nodes factor val numberOfNodesJoiningToSeedNodesInitially = getInt("nr-of-nodes-joining-to-seed-initally") * nFactor val numberOfNodesJoiningOneByOneSmall = getInt("nr-of-nodes-joining-one-by-one-small") * nFactor val numberOfNodesJoiningOneByOneLarge = getInt("nr-of-nodes-joining-one-by-one-large") * nFactor val numberOfNodesJoiningToOneNode = getInt("nr-of-nodes-joining-to-one") * nFactor // remaining will join to seed nodes val numberOfNodesJoiningToSeedNodes = (totalNumberOfNodes - numberOfSeedNodes - numberOfNodesJoiningToSeedNodesInitially - numberOfNodesJoiningOneByOneSmall - numberOfNodesJoiningOneByOneLarge - numberOfNodesJoiningToOneNode) requiring (_ >= 0, s"too many configured nr-of-nodes-joining-*, total should be <= ${totalNumberOfNodes}") val numberOfNodesLeavingOneByOneSmall = getInt("nr-of-nodes-leaving-one-by-one-small") * nFactor val numberOfNodesLeavingOneByOneLarge = getInt("nr-of-nodes-leaving-one-by-one-large") * nFactor val numberOfNodesLeaving = getInt("nr-of-nodes-leaving") * nFactor val numberOfNodesShutdownOneByOneSmall = getInt("nr-of-nodes-shutdown-one-by-one-small") * nFactor val numberOfNodesShutdownOneByOneLarge = getInt("nr-of-nodes-shutdown-one-by-one-large") * nFactor val numberOfNodesShutdown = getInt("nr-of-nodes-shutdown") * nFactor val numberOfNodesJoinRemove = getInt("nr-of-nodes-join-remove") // not scaled by nodes factor val workBatchSize = getInt("work-batch-size") val workBatchInterval = testConfig.getMillisDuration("work-batch-interval") val payloadSize = getInt("payload-size") val dFactor = getInt("duration-factor") val joinRemoveDuration = testConfig.getMillisDuration("join-remove-duration") * dFactor val normalThroughputDuration = testConfig.getMillisDuration("normal-throughput-duration") * dFactor val highThroughputDuration = testConfig.getMillisDuration("high-throughput-duration") * dFactor val supervisionDuration = testConfig.getMillisDuration("supervision-duration") * dFactor val supervisionOneIteration = testConfig.getMillisDuration("supervision-one-iteration") * dFactor val idleGossipDuration = testConfig.getMillisDuration("idle-gossip-duration") * dFactor val expectedTestDuration = testConfig.getMillisDuration("expected-test-duration") * dFactor val treeWidth = getInt("tree-width") val treeLevels = getInt("tree-levels") val reportMetricsInterval = testConfig.getMillisDuration("report-metrics-interval") val convergenceWithinFactor = getDouble("convergence-within-factor") val exerciseActors = getBoolean("exercise-actors") require(numberOfSeedNodes + numberOfNodesJoiningToSeedNodesInitially + numberOfNodesJoiningOneByOneSmall + numberOfNodesJoiningOneByOneLarge + numberOfNodesJoiningToOneNode + numberOfNodesJoiningToSeedNodes <= totalNumberOfNodes, s"specified number of joining nodes <= ${totalNumberOfNodes}") // don't shutdown the 3 nodes hosting the master actors require(numberOfNodesLeavingOneByOneSmall + numberOfNodesLeavingOneByOneLarge + numberOfNodesLeaving + numberOfNodesShutdownOneByOneSmall + numberOfNodesShutdownOneByOneLarge + numberOfNodesShutdown <= totalNumberOfNodes - 3, s"specified number of leaving/shutdown nodes <= ${totalNumberOfNodes - 3}") require(numberOfNodesJoinRemove <= totalNumberOfNodes, s"nr-of-nodes-join-remove should be <= ${totalNumberOfNodes}") override def toString: String = { testConfig.withFallback(ConfigFactory.parseString(s"nrOfNodes=${totalNumberOfNodes}")).root.render } } implicit class FormattedDouble(val d: Double) extends AnyVal { def form: String = d.formatted("%.2f") } final case class ClusterResult( address: Address, duration: Duration, clusterStats: GossipStats) final case class AggregatedClusterResult(title: String, duration: Duration, clusterStats: GossipStats) /** * Central aggregator of cluster statistics and metrics. * Reports the result via log periodically and when all * expected results has been collected. It shuts down * itself when expected results has been collected. */ class ClusterResultAggregator(title: String, expectedResults: Int, settings: Settings) extends Actor with ActorLogging { import settings.reportMetricsInterval import settings.infolog val cluster = Cluster(context.system) var reportTo: Option[ActorRef] = None var results = Vector.empty[ClusterResult] var nodeMetrics = Set.empty[NodeMetrics] var phiValuesObservedByNode = { import akka.cluster.Member.addressOrdering immutable.SortedMap.empty[Address, immutable.SortedSet[PhiValue]] } var clusterStatsObservedByNode = { import akka.cluster.Member.addressOrdering immutable.SortedMap.empty[Address, CurrentInternalStats] } import context.dispatcher val reportMetricsTask = context.system.scheduler.schedule( reportMetricsInterval, reportMetricsInterval, self, ReportTick) // subscribe to ClusterMetricsChanged, re-subscribe when restart override def preStart(): Unit = cluster.subscribe(self, classOf[ClusterMetricsChanged]) override def postStop(): Unit = { cluster.unsubscribe(self) reportMetricsTask.cancel() super.postStop() } def receive = { case ClusterMetricsChanged(clusterMetrics) ⇒ nodeMetrics = clusterMetrics case PhiResult(from, phiValues) ⇒ phiValuesObservedByNode += from -> phiValues case StatsResult(from, stats) ⇒ clusterStatsObservedByNode += from -> stats case ReportTick ⇒ if (infolog) log.info(s"[${title}] in progress\n${formatMetrics}\n\n${formatPhi}\n\n${formatStats}") case r: ClusterResult ⇒ results :+= r if (results.size == expectedResults) { val aggregated = AggregatedClusterResult(title, maxDuration, totalGossipStats) if (infolog) log.info(s"[${title}] completed in [${aggregated.duration.toMillis}] ms\n${aggregated.clusterStats}\n${formatMetrics}\n\n${formatPhi}\n\n${formatStats}") reportTo foreach { _ ! aggregated } context stop self } case _: CurrentClusterState ⇒ case ReportTo(ref) ⇒ reportTo = ref } def maxDuration = results.map(_.duration).max def totalGossipStats = results.foldLeft(GossipStats()) { _ :+ _.clusterStats } def formatMetrics: String = { import akka.cluster.Member.addressOrdering (formatMetricsHeader +: (nodeMetrics.toSeq.sortBy(_.address) map formatMetricsLine)).mkString("\n") } def formatMetricsHeader: String = "[Node]\t[Heap (MB)]\t[CPU (%)]\t[Load]" def formatMetricsLine(nodeMetrics: NodeMetrics): String = { val heap = nodeMetrics match { case HeapMemory(address, timestamp, used, committed, max) ⇒ (used.doubleValue / 1024 / 1024).form case _ ⇒ "" } val cpuAndLoad = nodeMetrics match { case Cpu(address, timestamp, loadOption, cpuOption, processors) ⇒ format(cpuOption) + "\t" + format(loadOption) case _ ⇒ "N/A\tN/A" } s"${nodeMetrics.address}\t${heap}\t${cpuAndLoad}" } def format(opt: Option[Double]) = opt match { case None ⇒ "N/A" case Some(x) ⇒ x.form } def formatPhi: String = { if (phiValuesObservedByNode.isEmpty) "" else { import akka.cluster.Member.addressOrdering val lines = for { (monitor, phiValues) ← phiValuesObservedByNode phi ← phiValues } yield formatPhiLine(monitor, phi.address, phi) lines.mkString(formatPhiHeader + "\n", "\n", "") } } def formatPhiHeader: String = "[Monitor]\t[Subject]\t[count]\t[count phi > 1.0]\t[max phi]" def formatPhiLine(monitor: Address, subject: Address, phi: PhiValue): String = s"${monitor}\t${subject}\t${phi.count}\t${phi.countAboveOne}\t${phi.max.form}" def formatStats: String = { def f(stats: CurrentInternalStats) = { import stats.gossipStats._ import stats.vclockStats._ s"ClusterStats($receivedGossipCount, $mergeCount, $sameCount, $newerCount, $olderCount, $versionSize, $seenLatest)" } (clusterStatsObservedByNode map { case (monitor, stats) ⇒ s"${monitor}\t${f(stats)}" }). mkString("ClusterStats(gossip, merge, same, newer, older, vclockSize, seenLatest)\n", "\n", "") } } /** * Keeps cluster statistics and metrics reported by * ClusterResultAggregator. Logs the list of historical * results when a new AggregatedClusterResult is received. */ class ClusterResultHistory extends Actor with ActorLogging { var history = Vector.empty[AggregatedClusterResult] def receive = { case result: AggregatedClusterResult ⇒ history :+= result log.info("Cluster result history\n" + formatHistory) } def formatHistory: String = (formatHistoryHeader +: (history map formatHistoryLine)).mkString("\n") def formatHistoryHeader: String = "[Title]\t[Duration (ms)]\t[GossipStats(gossip, merge, same, newer, older)]" def formatHistoryLine(result: AggregatedClusterResult): String = s"${result.title}\t${result.duration.toMillis}\t${result.clusterStats}" } /** * Collect phi values of the failure detector and report to the * central ClusterResultAggregator. */ class PhiObserver extends Actor with ActorLogging { val cluster = Cluster(context.system) var reportTo: Option[ActorRef] = None val emptyPhiByNode = Map.empty[Address, PhiValue].withDefault(address ⇒ PhiValue(address, 0, 0, 0.0)) var phiByNode = emptyPhiByNode var nodes = Set.empty[Address] def phi(address: Address): Double = cluster.failureDetector match { case reg: DefaultFailureDetectorRegistry[Address] ⇒ reg.failureDetector(address) match { case Some(fd: PhiAccrualFailureDetector) ⇒ fd.phi case _ ⇒ 0.0 } case _ ⇒ 0.0 } import context.dispatcher val checkPhiTask = context.system.scheduler.schedule( 1.second, 1.second, self, PhiTick) // subscribe to MemberEvent, re-subscribe when restart override def preStart(): Unit = cluster.subscribe(self, classOf[MemberEvent]) override def postStop(): Unit = { cluster.unsubscribe(self) checkPhiTask.cancel() super.postStop() } def receive = { case PhiTick ⇒ nodes foreach { node ⇒ val previous = phiByNode(node) val φ = phi(node) if (φ > 0 || cluster.failureDetector.isMonitoring(node)) { val aboveOne = if (!φ.isInfinite && φ > 1.0) 1 else 0 phiByNode += node -> PhiValue(node, previous.countAboveOne + aboveOne, previous.count + 1, math.max(previous.max, φ)) } } val phiSet = immutable.SortedSet.empty[PhiValue] ++ phiByNode.values reportTo foreach { _ ! PhiResult(cluster.selfAddress, phiSet) } case state: CurrentClusterState ⇒ nodes = state.members.map(_.address) case memberEvent: MemberEvent ⇒ nodes += memberEvent.member.address case ReportTo(ref) ⇒ reportTo foreach context.unwatch reportTo = ref reportTo foreach context.watch case Terminated(ref) ⇒ reportTo match { case Some(`ref`) ⇒ reportTo = None case _ ⇒ } case Reset ⇒ phiByNode = emptyPhiByNode nodes = Set.empty[Address] cluster.unsubscribe(self) cluster.subscribe(self, classOf[MemberEvent]) } } class StatsObserver extends Actor { val cluster = Cluster(context.system) var reportTo: Option[ActorRef] = None var startStats: Option[GossipStats] = None override def preStart(): Unit = cluster.subscribe(self, classOf[CurrentInternalStats]) override def postStop(): Unit = cluster.unsubscribe(self) def receive = { case CurrentInternalStats(gossipStats, vclockStats) ⇒ val diff = startStats match { case None ⇒ { startStats = Some(gossipStats); gossipStats } case Some(start) ⇒ gossipStats :- start } val res = StatsResult(cluster.selfAddress, CurrentInternalStats(diff, vclockStats)) reportTo foreach { _ ! res } case ReportTo(ref) ⇒ reportTo foreach context.unwatch reportTo = ref reportTo foreach context.watch case Terminated(ref) ⇒ reportTo match { case Some(`ref`) ⇒ reportTo = None case _ ⇒ } case Reset ⇒ startStats = None case _: CurrentClusterState ⇒ // not interesting here } } /** * Master of routers * * Flow control, to not flood the consumers, is handled by scheduling a * batch of messages to be sent to the router when half of the number * of outstanding messages remains. * * It uses a simple message retry mechanism. If an ack of a sent message * is not received within a timeout, that message will be resent to the router, * infinite number of times. * * When it receives the `End` command it will stop sending messages to the router, * resends continuous, until all outstanding acks have been received, and then * finally it replies with `WorkResult` to the sender of the `End` command, and stops * itself. */ class Master(settings: StressMultiJvmSpec.Settings, batchInterval: FiniteDuration, tree: Boolean) extends Actor { val workers = context.actorOf(FromConfig.props(Props[Worker]), "workers") val payload = Array.fill(settings.payloadSize)(ThreadLocalRandom.current.nextInt(127).toByte) val retryTimeout = 5.seconds.dilated(context.system) val idCounter = Iterator from 0 var sendCounter = 0L var ackCounter = 0L var outstanding = Map.empty[JobId, JobState] var startTime = 0L import context.dispatcher val resendTask = context.system.scheduler.schedule(3.seconds, 3.seconds, self, RetryTick) override def postStop(): Unit = { resendTask.cancel() super.postStop() } def receive = { case Begin ⇒ startTime = System.nanoTime self ! SendBatch context.become(working) case RetryTick ⇒ } def working: Receive = { case Ack(id) ⇒ outstanding -= id ackCounter += 1 if (outstanding.size == settings.workBatchSize / 2) if (batchInterval == Duration.Zero) self ! SendBatch else context.system.scheduler.scheduleOnce(batchInterval, self, SendBatch) case SendBatch ⇒ sendJobs() case RetryTick ⇒ resend() case End ⇒ done(sender()) context.become(ending(sender())) } def ending(replyTo: ActorRef): Receive = { case Ack(id) ⇒ outstanding -= id ackCounter += 1 done(replyTo) case SendBatch ⇒ case RetryTick ⇒ resend() } def done(replyTo: ActorRef): Unit = if (outstanding.isEmpty) { val duration = (System.nanoTime - startTime).nanos replyTo ! WorkResult(duration, sendCounter, ackCounter) context stop self } def sendJobs(): Unit = { 0 until settings.workBatchSize foreach { _ ⇒ send(createJob()) } } def createJob(): Job = { if (tree) TreeJob(idCounter.next(), payload, ThreadLocalRandom.current.nextInt(settings.treeWidth), settings.treeLevels, settings.treeWidth) else SimpleJob(idCounter.next(), payload) } def resend(): Unit = { outstanding.values foreach { jobState ⇒ if (jobState.deadline.isOverdue) send(jobState.job) } } def send(job: Job): Unit = { outstanding += job.id -> JobState(Deadline.now + retryTimeout, job) sendCounter += 1 workers ! job } } /** * Used by Master as routee */ class Worker extends Actor with ActorLogging { def receive = { case SimpleJob(id, payload) ⇒ sender() ! Ack(id) case TreeJob(id, payload, idx, levels, width) ⇒ // create the actors when first TreeJob message is received val totalActors = ((width * math.pow(width, levels) - 1) / (width - 1)).toInt log.debug("Creating [{}] actors in a tree structure of [{}] levels and each actor has [{}] children", totalActors, levels, width) val tree = context.actorOf(Props(classOf[TreeNode], levels, width), "tree") tree forward ((idx, SimpleJob(id, payload))) context.become(treeWorker(tree)) } def treeWorker(tree: ActorRef): Receive = { case SimpleJob(id, payload) ⇒ sender() ! Ack(id) case TreeJob(id, payload, idx, _, _) ⇒ tree forward ((idx, SimpleJob(id, payload))) } } class TreeNode(level: Int, width: Int) extends Actor { require(level >= 1) def createChild(): Actor = if (level == 1) new Leaf else new TreeNode(level - 1, width) val indexedChildren = 0 until width map { i ⇒ context.actorOf(Props(createChild()).withDeploy(Deploy.local), name = i.toString) } toVector def receive = { case (idx: Int, job: SimpleJob) if idx < width ⇒ indexedChildren(idx) forward ((idx, job)) } } class Leaf extends Actor { def receive = { case (_: Int, job: SimpleJob) ⇒ sender() ! Ack(job.id) } } /** * Used for remote death watch testing */ class Watchee extends Actor { def receive = Actor.emptyBehavior } /** * Used for remote supervision testing */ class Supervisor extends Actor { var restartCount = 0 override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 1 minute) { case _: Exception ⇒ restartCount += 1 Restart } def receive = { case props: Props ⇒ context.actorOf(props) case e: Exception ⇒ context.children foreach { _ ! e } case GetChildrenCount ⇒ sender() ! ChildrenCount(context.children.size, restartCount) case Reset ⇒ require(context.children.isEmpty, s"ResetChildrenCount not allowed when children exists, [${context.children.size}]") restartCount = 0 } } /** * Child of Supervisor for remote supervision testing */ class RemoteChild extends Actor { def receive = { case e: Exception ⇒ throw e } } case object Begin case object End case object RetryTick case object ReportTick case object PhiTick final case class PhiResult(from: Address, phiValues: immutable.SortedSet[PhiValue]) final case class PhiValue(address: Address, countAboveOne: Int, count: Int, max: Double) extends Ordered[PhiValue] { import akka.cluster.Member.addressOrdering def compare(that: PhiValue) = addressOrdering.compare(this.address, that.address) } final case class ReportTo(ref: Option[ActorRef]) final case class StatsResult(from: Address, stats: CurrentInternalStats) type JobId = Int trait Job { def id: JobId } final case class SimpleJob(id: JobId, payload: Any) extends Job final case class TreeJob(id: JobId, payload: Any, idx: Int, levels: Int, width: Int) extends Job final case class Ack(id: JobId) final case class JobState(deadline: Deadline, job: Job) final case class WorkResult(duration: Duration, sendCount: Long, ackCount: Long) { def retryCount: Long = sendCount - ackCount def jobsPerSecond: Double = ackCount * 1000.0 / duration.toMillis } case object SendBatch final case class CreateTree(levels: Int, width: Int) case object GetChildrenCount final case class ChildrenCount(numberOfChildren: Int, numberOfChildRestarts: Int) case object Reset } class StressMultiJvmNode1 extends StressSpec class StressMultiJvmNode2 extends StressSpec class StressMultiJvmNode3 extends StressSpec class StressMultiJvmNode4 extends StressSpec class StressMultiJvmNode5 extends StressSpec class StressMultiJvmNode6 extends StressSpec class StressMultiJvmNode7 extends StressSpec class StressMultiJvmNode8 extends StressSpec class StressMultiJvmNode9 extends StressSpec class StressMultiJvmNode10 extends StressSpec class StressMultiJvmNode11 extends StressSpec class StressMultiJvmNode12 extends StressSpec class StressMultiJvmNode13 extends StressSpec abstract class StressSpec extends MultiNodeSpec(StressMultiJvmSpec) with MultiNodeClusterSpec with BeforeAndAfterEach with ImplicitSender { import StressMultiJvmSpec._ import ClusterEvent._ val settings = new Settings(system.settings.config) import settings._ val identifyProbe = TestProbe() var step = 0 var nbrUsedRoles = 0 override def beforeEach(): Unit = { step += 1 } override def expectedTestDuration = settings.expectedTestDuration override def shutdownTimeout: FiniteDuration = 30.seconds.dilated override def muteLog(sys: ActorSystem = system): Unit = { super.muteLog(sys) sys.eventStream.publish(Mute(EventFilter[RuntimeException](pattern = ".*Simulated exception.*"))) muteDeadLetters(classOf[SimpleJob], classOf[AggregatedClusterResult], SendBatch.getClass, classOf[StatsResult], classOf[PhiResult], RetryTick.getClass)(sys) } def jvmInfo(): String = { val runtime = ManagementFactory.getRuntimeMXBean val os = ManagementFactory.getOperatingSystemMXBean val threads = ManagementFactory.getThreadMXBean val mem = ManagementFactory.getMemoryMXBean val heap = mem.getHeapMemoryUsage val sb = new StringBuilder sb.append("Operating system: ").append(os.getName).append(", ").append(os.getArch).append(", ").append(os.getVersion) sb.append("\n") sb.append("JVM: ").append(runtime.getVmName).append(" ").append(runtime.getVmVendor). append(" ").append(runtime.getVmVersion) sb.append("\n") sb.append("Processors: ").append(os.getAvailableProcessors) sb.append("\n") sb.append("Load average: ").append(os.getSystemLoadAverage) sb.append("\n") sb.append("Thread count: ").append(threads.getThreadCount).append(" (").append(threads.getPeakThreadCount).append(")") sb.append("\n") sb.append("Heap: ").append((heap.getUsed.toDouble / 1024 / 1024).form). append(" (").append((heap.getInit.toDouble / 1024 / 1024).form). append(" - "). append((heap.getMax.toDouble / 1024 / 1024).form). append(")").append(" MB") sb.append("\n") import scala.collection.JavaConverters._ val args = runtime.getInputArguments.asScala.filterNot(_.contains("classpath")).mkString("\n ") sb.append("Args:\n ").append(args) sb.append("\n") sb.toString } val seedNodes = roles.take(numberOfSeedNodes) def latestGossipStats = cluster.readView.latestStats.gossipStats override def cluster: Cluster = { createWorker super.cluster } // always create one worker when the cluster is started lazy val createWorker: Unit = system.actorOf(Props[Worker], "worker") def createResultAggregator(title: String, expectedResults: Int, includeInHistory: Boolean): Unit = { runOn(roles.head) { val aggregator = system.actorOf(Props(classOf[ClusterResultAggregator], title, expectedResults, settings).withDeploy(Deploy.local), name = "result" + step) if (includeInHistory && infolog) aggregator ! ReportTo(Some(clusterResultHistory)) else aggregator ! ReportTo(None) } enterBarrier("result-aggregator-created-" + step) runOn(roles.take(nbrUsedRoles): _*) { phiObserver ! ReportTo(clusterResultAggregator) statsObserver ! Reset statsObserver ! ReportTo(clusterResultAggregator) } } def clusterResultAggregator: Option[ActorRef] = { system.actorSelection(node(roles.head) / "user" / ("result" + step)).tell(Identify(step), identifyProbe.ref) identifyProbe.expectMsgType[ActorIdentity].ref } lazy val clusterResultHistory = if (settings.infolog) system.actorOf(Props[ClusterResultHistory], "resultHistory") else system.deadLetters lazy val phiObserver = system.actorOf(Props[PhiObserver], "phiObserver") lazy val statsObserver = system.actorOf(Props[StatsObserver], "statsObserver") def awaitClusterResult(): Unit = { runOn(roles.head) { clusterResultAggregator match { case Some(r) ⇒ watch(r) expectMsgPF() { case Terminated(a) if a.path == r.path ⇒ true } case None ⇒ // ok, already terminated } } enterBarrier("cluster-result-done-" + step) } def joinOneByOne(numberOfNodes: Int): Unit = { 0 until numberOfNodes foreach { _ ⇒ joinOne() nbrUsedRoles += 1 step += 1 } } def convergenceWithin(base: FiniteDuration, nodes: Int): FiniteDuration = (base.toMillis * convergenceWithinFactor * nodes).millis def joinOne(): Unit = within(5.seconds + convergenceWithin(2.seconds, nbrUsedRoles + 1)) { val currentRoles = roles.take(nbrUsedRoles + 1) val title = s"join one to ${nbrUsedRoles} nodes cluster" createResultAggregator(title, expectedResults = currentRoles.size, includeInHistory = true) runOn(currentRoles: _*) { reportResult { runOn(currentRoles.last) { cluster.join(roles.head) } awaitMembersUp(currentRoles.size, timeout = remainingOrDefault) } } awaitClusterResult() enterBarrier("join-one-" + step) } def joinSeveral(numberOfNodes: Int, toSeedNodes: Boolean): Unit = within(10.seconds + convergenceWithin(3.seconds, nbrUsedRoles + numberOfNodes)) { val currentRoles = roles.take(nbrUsedRoles + numberOfNodes) val joiningRoles = currentRoles.takeRight(numberOfNodes) val title = s"join ${numberOfNodes} to ${if (toSeedNodes) "seed nodes" else "one node"}, in ${nbrUsedRoles} nodes cluster" createResultAggregator(title, expectedResults = currentRoles.size, includeInHistory = true) runOn(currentRoles: _*) { reportResult { runOn(joiningRoles: _*) { if (toSeedNodes) cluster.joinSeedNodes(seedNodes.toIndexedSeq map address) else cluster.join(roles.head) } awaitMembersUp(currentRoles.size, timeout = remainingOrDefault) } } awaitClusterResult() enterBarrier("join-several-" + step) } def removeOneByOne(numberOfNodes: Int, shutdown: Boolean): Unit = { 0 until numberOfNodes foreach { _ ⇒ removeOne(shutdown) nbrUsedRoles -= 1 step += 1 } } def removeOne(shutdown: Boolean): Unit = within(25.seconds + convergenceWithin(3.seconds, nbrUsedRoles - 1)) { val currentRoles = roles.take(nbrUsedRoles - 1) val title = s"${if (shutdown) "shutdown" else "remove"} one from ${nbrUsedRoles} nodes cluster" createResultAggregator(title, expectedResults = currentRoles.size, includeInHistory = true) val removeRole = roles(nbrUsedRoles - 1) val removeAddress = address(removeRole) runOn(removeRole) { system.actorOf(Props[Watchee], "watchee") if (!shutdown) cluster.leave(myself) } enterBarrier("watchee-created-" + step) runOn(roles.head) { system.actorSelection(node(removeRole) / "user" / "watchee").tell(Identify("watchee"), identifyProbe.ref) val watchee = identifyProbe.expectMsgType[ActorIdentity].ref.get watch(watchee) } enterBarrier("watch-estabilished-" + step) runOn(currentRoles: _*) { reportResult { runOn(roles.head) { if (shutdown) { if (infolog) log.info("Shutting down [{}]", removeAddress) testConductor.exit(removeRole, 0).await } } awaitMembersUp(currentRoles.size, timeout = remainingOrDefault) awaitAllReachable() } } runOn(roles.head) { val expectedPath = RootActorPath(removeAddress) / "user" / "watchee" expectMsgPF() { case Terminated(a) if a.path == expectedPath ⇒ true } } enterBarrier("watch-verified-" + step) awaitClusterResult() enterBarrier("remove-one-" + step) } def removeSeveral(numberOfNodes: Int, shutdown: Boolean): Unit = within(25.seconds + convergenceWithin(5.seconds, nbrUsedRoles - numberOfNodes)) { val currentRoles = roles.take(nbrUsedRoles - numberOfNodes) val removeRoles = roles.slice(currentRoles.size, nbrUsedRoles) val title = s"${if (shutdown) "shutdown" else "leave"} ${numberOfNodes} in ${nbrUsedRoles} nodes cluster" createResultAggregator(title, expectedResults = currentRoles.size, includeInHistory = true) runOn(removeRoles: _*) { if (!shutdown) cluster.leave(myself) } runOn(currentRoles: _*) { reportResult { runOn(roles.head) { if (shutdown) removeRoles.foreach { r ⇒ if (infolog) log.info("Shutting down [{}]", address(r)) testConductor.exit(r, 0).await } } awaitMembersUp(currentRoles.size, timeout = remainingOrDefault) awaitAllReachable() } } awaitClusterResult() enterBarrier("remove-several-" + step) } def reportResult[T](thunk: ⇒ T): T = { val startTime = System.nanoTime val startStats = clusterView.latestStats.gossipStats val returnValue = thunk clusterResultAggregator foreach { _ ! ClusterResult(cluster.selfAddress, (System.nanoTime - startTime).nanos, latestGossipStats :- startStats) } returnValue } def exerciseJoinRemove(title: String, duration: FiniteDuration): Unit = { val activeRoles = roles.take(numberOfNodesJoinRemove) val loopDuration = 10.seconds + convergenceWithin(4.seconds, nbrUsedRoles + activeRoles.size) val rounds = ((duration - loopDuration).toMillis / loopDuration.toMillis).max(1).toInt val usedRoles = roles.take(nbrUsedRoles) val usedAddresses = usedRoles.map(address(_)).toSet @tailrec def loop(counter: Int, previousAS: Option[ActorSystem], allPreviousAddresses: Set[Address]): Option[ActorSystem] = { if (counter > rounds) previousAS else { val t = title + " round " + counter runOn(usedRoles: _*) { phiObserver ! Reset statsObserver ! Reset } createResultAggregator(t, expectedResults = nbrUsedRoles, includeInHistory = true) val (nextAS, nextAddresses) = within(loopDuration) { reportResult { val nextAS = if (activeRoles contains myself) { previousAS foreach { as ⇒ TestKit.shutdownActorSystem(as) } val sys = ActorSystem(system.name, system.settings.config) muteLog(sys) Cluster(sys).joinSeedNodes(seedNodes.toIndexedSeq map address) Some(sys) } else previousAS runOn(usedRoles: _*) { awaitMembersUp( nbrUsedRoles + activeRoles.size, canNotBePartOfMemberRing = allPreviousAddresses, timeout = remainingOrDefault) awaitAllReachable() } val nextAddresses = clusterView.members.map(_.address) -- usedAddresses runOn(usedRoles: _*) { nextAddresses.size should be(numberOfNodesJoinRemove) } enterBarrier("join-remove-" + step) (nextAS, nextAddresses) } } awaitClusterResult() step += 1 loop(counter + 1, nextAS, nextAddresses) } } loop(1, None, Set.empty) foreach { as ⇒ TestKit.shutdownActorSystem(as) } within(loopDuration) { runOn(usedRoles: _*) { awaitMembersUp(nbrUsedRoles, timeout = remainingOrDefault) awaitAllReachable() phiObserver ! Reset statsObserver ! Reset } } enterBarrier("join-remove-shutdown-" + step) } def masterName: String = "master-" + myself.name def master: Option[ActorRef] = { system.actorSelection("/user/" + masterName).tell(Identify("master"), identifyProbe.ref) identifyProbe.expectMsgType[ActorIdentity].ref } def exerciseRouters(title: String, duration: FiniteDuration, batchInterval: FiniteDuration, expectDroppedMessages: Boolean, tree: Boolean): Unit = within(duration + 10.seconds) { nbrUsedRoles should be(totalNumberOfNodes) createResultAggregator(title, expectedResults = nbrUsedRoles, includeInHistory = false) val (masterRoles, otherRoles) = roles.take(nbrUsedRoles).splitAt(3) runOn(masterRoles: _*) { reportResult { val m = system.actorOf(Props(classOf[Master], settings, batchInterval, tree).withDeploy(Deploy.local), name = masterName) m ! Begin import system.dispatcher system.scheduler.scheduleOnce(duration) { m.tell(End, testActor) } val workResult = awaitWorkResult(m) workResult.sendCount should be > (0L) workResult.ackCount should be > (0L) if (!expectDroppedMessages) workResult.retryCount should be(0) enterBarrier("routers-done-" + step) } } runOn(otherRoles: _*) { reportResult { enterBarrier("routers-done-" + step) } } awaitClusterResult() } def awaitWorkResult(m: ActorRef): WorkResult = { val workResult = expectMsgType[WorkResult] if (settings.infolog) log.info("{} result, [{}] jobs/s, retried [{}] of [{}] msg", masterName, workResult.jobsPerSecond.form, workResult.retryCount, workResult.sendCount) watch(m) expectTerminated(m) workResult } def exerciseSupervision(title: String, duration: FiniteDuration, oneIteration: Duration): Unit = within(duration + 10.seconds) { val rounds = (duration.toMillis / oneIteration.toMillis).max(1).toInt val supervisor = system.actorOf(Props[Supervisor], "supervisor") for (count ← 0 until rounds) { createResultAggregator(title, expectedResults = nbrUsedRoles, includeInHistory = false) val (masterRoles, otherRoles) = roles.take(nbrUsedRoles).splitAt(3) runOn(masterRoles: _*) { reportResult { roles.take(nbrUsedRoles) foreach { r ⇒ supervisor ! Props[RemoteChild].withDeploy(Deploy(scope = RemoteScope(address(r)))) } supervisor ! GetChildrenCount expectMsgType[ChildrenCount] should be(ChildrenCount(nbrUsedRoles, 0)) 1 to 5 foreach { _ ⇒ supervisor ! new RuntimeException("Simulated exception") } awaitAssert { supervisor ! GetChildrenCount val c = expectMsgType[ChildrenCount] c should be(ChildrenCount(nbrUsedRoles, 5 * nbrUsedRoles)) } // after 5 restart attempts the children should be stopped supervisor ! new RuntimeException("Simulated exception") awaitAssert { supervisor ! GetChildrenCount val c = expectMsgType[ChildrenCount] // zero children c should be(ChildrenCount(0, 6 * nbrUsedRoles)) } supervisor ! Reset } enterBarrier("supervision-done-" + step) } runOn(otherRoles: _*) { reportResult { enterBarrier("supervision-done-" + step) } } awaitClusterResult() step += 1 } } def idleGossip(title: String): Unit = { createResultAggregator(title, expectedResults = nbrUsedRoles, includeInHistory = true) reportResult { clusterView.members.size should be(nbrUsedRoles) Thread.sleep(idleGossipDuration.toMillis) clusterView.members.size should be(nbrUsedRoles) } awaitClusterResult() } "A cluster under stress" must { "log settings" taggedAs LongRunningTest in { if (infolog) { log.info("StressSpec JVM:\n{}", jvmInfo) runOn(roles.head) { log.info("StressSpec settings:\n{}", settings) } } enterBarrier("after-" + step) } "join seed nodes" taggedAs LongRunningTest in within(30 seconds) { val otherNodesJoiningSeedNodes = roles.slice(numberOfSeedNodes, numberOfSeedNodes + numberOfNodesJoiningToSeedNodesInitially) val size = seedNodes.size + otherNodesJoiningSeedNodes.size createResultAggregator("join seed nodes", expectedResults = size, includeInHistory = true) runOn((seedNodes ++ otherNodesJoiningSeedNodes): _*) { reportResult { cluster.joinSeedNodes(seedNodes.toIndexedSeq map address) awaitMembersUp(size, timeout = remainingOrDefault) } } awaitClusterResult() nbrUsedRoles += size enterBarrier("after-" + step) } "start routers that are running while nodes are joining" taggedAs LongRunningTest in { runOn(roles.take(3): _*) { system.actorOf(Props(classOf[Master], settings, settings.workBatchInterval, false).withDeploy(Deploy.local), name = masterName) ! Begin } } "join nodes one-by-one to small cluster" taggedAs LongRunningTest in { joinOneByOne(numberOfNodesJoiningOneByOneSmall) enterBarrier("after-" + step) } "join several nodes to one node" taggedAs LongRunningTest in { joinSeveral(numberOfNodesJoiningToOneNode, toSeedNodes = false) nbrUsedRoles += numberOfNodesJoiningToOneNode enterBarrier("after-" + step) } "join several nodes to seed nodes" taggedAs LongRunningTest in { if (numberOfNodesJoiningToSeedNodes > 0) { joinSeveral(numberOfNodesJoiningToSeedNodes, toSeedNodes = true) nbrUsedRoles += numberOfNodesJoiningToSeedNodes } enterBarrier("after-" + step) } "join nodes one-by-one to large cluster" taggedAs LongRunningTest in { joinOneByOne(numberOfNodesJoiningOneByOneLarge) enterBarrier("after-" + step) } "end routers that are running while nodes are joining" taggedAs LongRunningTest in within(30.seconds) { if (exerciseActors) { runOn(roles.take(3): _*) { master match { case Some(m) ⇒ m.tell(End, testActor) val workResult = awaitWorkResult(m) workResult.retryCount should be(0) workResult.sendCount should be > (0L) workResult.ackCount should be > (0L) case None ⇒ fail("master not running") } } } enterBarrier("after-" + step) } "use routers with normal throughput" taggedAs LongRunningTest in { if (exerciseActors) { exerciseRouters("use routers with normal throughput", normalThroughputDuration, batchInterval = workBatchInterval, expectDroppedMessages = false, tree = false) } enterBarrier("after-" + step) } "use routers with high throughput" taggedAs LongRunningTest in { if (exerciseActors) { exerciseRouters("use routers with high throughput", highThroughputDuration, batchInterval = Duration.Zero, expectDroppedMessages = false, tree = false) } enterBarrier("after-" + step) } "use many actors with normal throughput" taggedAs LongRunningTest in { if (exerciseActors) { exerciseRouters("use many actors with normal throughput", normalThroughputDuration, batchInterval = workBatchInterval, expectDroppedMessages = false, tree = true) } enterBarrier("after-" + step) } "use many actors with high throughput" taggedAs LongRunningTest in { if (exerciseActors) { exerciseRouters("use many actors with high throughput", highThroughputDuration, batchInterval = Duration.Zero, expectDroppedMessages = false, tree = true) } enterBarrier("after-" + step) } "exercise join/remove/join/remove" taggedAs LongRunningTest in { exerciseJoinRemove("exercise join/remove", joinRemoveDuration) enterBarrier("after-" + step) } "exercise supervision" taggedAs LongRunningTest in { if (exerciseActors) { exerciseSupervision("exercise supervision", supervisionDuration, supervisionOneIteration) } enterBarrier("after-" + step) } "gossip when idle" taggedAs LongRunningTest in { idleGossip("idle gossip") enterBarrier("after-" + step) } "start routers that are running while nodes are removed" taggedAs LongRunningTest in { if (exerciseActors) { runOn(roles.take(3): _*) { system.actorOf(Props(classOf[Master], settings, settings.workBatchInterval, false).withDeploy(Deploy.local), name = masterName) ! Begin } } enterBarrier("after-" + step) } "leave nodes one-by-one from large cluster" taggedAs LongRunningTest in { removeOneByOne(numberOfNodesLeavingOneByOneLarge, shutdown = false) enterBarrier("after-" + step) } "shutdown nodes one-by-one from large cluster" taggedAs LongRunningTest in { removeOneByOne(numberOfNodesShutdownOneByOneLarge, shutdown = true) enterBarrier("after-" + step) } "leave several nodes" taggedAs LongRunningTest in { removeSeveral(numberOfNodesLeaving, shutdown = false) nbrUsedRoles -= numberOfNodesLeaving enterBarrier("after-" + step) } "shutdown several nodes" taggedAs LongRunningTest in { removeSeveral(numberOfNodesShutdown, shutdown = true) nbrUsedRoles -= numberOfNodesShutdown enterBarrier("after-" + step) } "shutdown nodes one-by-one from small cluster" taggedAs LongRunningTest in { removeOneByOne(numberOfNodesShutdownOneByOneSmall, shutdown = true) enterBarrier("after-" + step) } "leave nodes one-by-one from small cluster" taggedAs LongRunningTest in { removeOneByOne(numberOfNodesLeavingOneByOneSmall, shutdown = false) enterBarrier("after-" + step) } "end routers that are running while nodes are removed" taggedAs LongRunningTest in within(30.seconds) { if (exerciseActors) { runOn(roles.take(3): _*) { master match { case Some(m) ⇒ m.tell(End, testActor) val workResult = awaitWorkResult(m) workResult.sendCount should be > (0L) workResult.ackCount should be > (0L) case None ⇒ fail("master not running") } } } enterBarrier("after-" + step) } "log jvm info" taggedAs LongRunningTest in { if (infolog) { log.info("StressSpec JVM:\n{}", jvmInfo) } enterBarrier("after-" + step) } } } Other Akka source code examplesHere is a short list of links related to this Akka StressSpec.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.