|
Akka/Scala example source code file (Coroner.scala)
The Coroner.scala Akka example source code/** * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> */ package akka.testkit import java.io.PrintStream import java.lang.management.{ ManagementFactory, ThreadInfo } import java.util.Date import java.util.concurrent.{ TimeoutException, CountDownLatch } import org.scalatest.{ BeforeAndAfterAll, Suite } import scala.annotation.tailrec import scala.concurrent.{ Promise, Awaitable, CanAwait, Await } import scala.concurrent.duration._ import scala.util.control.NonFatal /** * The Coroner can be used to print a diagnostic report of the JVM state, * including stack traces and deadlocks. A report can be printed directly, by * calling `printReport`. Alternatively, the Coroner can be asked to `watch` * the JVM and generate a report at a later time - unless the Coroner is cancelled * by that time. * * The latter method is useful for printing diagnostics in the event that, for * example, a unit test stalls and fails to cancel the Coroner in time. The * Coroner will assume that the test has "died" and print a report to aid in * debugging. */ object Coroner { /** * Used to cancel the Coroner after calling `watch`. * The result of this Awaitable will be `true` if it has been cancelled. */ trait WatchHandle extends Awaitable[Boolean] { /** * Will try to ensure that the Coroner has finished reporting. */ def cancel(): Unit } private class WatchHandleImpl(startAndStopDuration: FiniteDuration) extends WatchHandle { val cancelPromise = Promise[Boolean] val startedLatch = new CountDownLatch(1) val finishedLatch = new CountDownLatch(1) def waitForStart(): Unit = { startedLatch.await(startAndStopDuration.length, startAndStopDuration.unit) } def started(): Unit = startedLatch.countDown() def finished(): Unit = finishedLatch.countDown() def expired(): Unit = cancelPromise.trySuccess(false) override def cancel(): Unit = { cancelPromise.trySuccess(true) finishedLatch.await(startAndStopDuration.length, startAndStopDuration.unit) } override def ready(atMost: Duration)(implicit permit: CanAwait): this.type = { result(atMost) this } override def result(atMost: Duration)(implicit permit: CanAwait): Boolean = try { Await.result(cancelPromise.future, atMost) } catch { case _: TimeoutException ⇒ false } } val defaultStartAndStopDuration = 1.second /** * Ask the Coroner to print a report if it is not cancelled by the given deadline. * The returned handle can be used to perform the cancellation. * * If displayThreadCounts is set to true, then the Coroner will print thread counts during start * and stop. */ def watch(duration: FiniteDuration, reportTitle: String, out: PrintStream, startAndStopDuration: FiniteDuration = defaultStartAndStopDuration, displayThreadCounts: Boolean = false): WatchHandle = { val watchedHandle = new WatchHandleImpl(startAndStopDuration) def triggerReportIfOverdue(duration: Duration): Unit = { val threadMx = ManagementFactory.getThreadMXBean() val startThreads = threadMx.getThreadCount if (displayThreadCounts) { threadMx.resetPeakThreadCount() out.println(s"Coroner Thread Count starts at $startThreads in $reportTitle") } watchedHandle.started() try { if (!Await.result(watchedHandle, duration)) { watchedHandle.expired() out.println(s"Coroner not cancelled after ${duration.toMillis}ms. Looking for signs of foul play...") try printReport(reportTitle, out) catch { case NonFatal(ex) ⇒ { out.println("Error displaying Coroner's Report") ex.printStackTrace(out) } } } } finally { if (displayThreadCounts) { val endThreads = threadMx.getThreadCount out.println(s"Coroner Thread Count started at $startThreads, ended at $endThreads, peaked at ${threadMx.getPeakThreadCount} in $reportTitle") } out.flush() watchedHandle.finished() } } new Thread(new Runnable { def run = triggerReportIfOverdue(duration) }, "Coroner").start() watchedHandle.waitForStart() watchedHandle } /** * Print a report containing diagnostic information. */ def printReport(reportTitle: String, out: PrintStream) { import out.println val osMx = ManagementFactory.getOperatingSystemMXBean() val rtMx = ManagementFactory.getRuntimeMXBean() val memMx = ManagementFactory.getMemoryMXBean() val threadMx = ManagementFactory.getThreadMXBean() println(s"""#Coroner's Report: $reportTitle #OS Architecture: ${osMx.getArch()} #Available processors: ${osMx.getAvailableProcessors()} #System load (last minute): ${osMx.getSystemLoadAverage()} #VM start time: ${new Date(rtMx.getStartTime())} #VM uptime: ${rtMx.getUptime()}ms #Heap usage: ${memMx.getHeapMemoryUsage()} #Non-heap usage: ${memMx.getNonHeapMemoryUsage()}""".stripMargin('#')) def dumpAllThreads: Seq[ThreadInfo] = { threadMx.dumpAllThreads( threadMx.isObjectMonitorUsageSupported, threadMx.isSynchronizerUsageSupported) } def findDeadlockedThreads: (Seq[ThreadInfo], String) = { val (ids, desc) = if (threadMx.isSynchronizerUsageSupported()) { (threadMx.findDeadlockedThreads(), "monitors and ownable synchronizers") } else { (threadMx.findMonitorDeadlockedThreads(), "monitors, but NOT ownable synchronizers") } if (ids == null) { (Seq.empty, desc) } else { val maxTraceDepth = 1000 // Seems deep enough (threadMx.getThreadInfo(ids, maxTraceDepth), desc) } } def printThreadInfos(threadInfos: Seq[ThreadInfo]) = { if (threadInfos.isEmpty) { println("None") } else { for (ti ← threadInfos.sortBy(_.getThreadName)) { println(threadInfoToString(ti)) } } } def threadInfoToString(ti: ThreadInfo): String = { val sb = new java.lang.StringBuilder sb.append("\"") sb.append(ti.getThreadName) sb.append("\" Id=") sb.append(ti.getThreadId) sb.append(" ") sb.append(ti.getThreadState) if (ti.getLockName != null) { sb.append(" on " + ti.getLockName) } if (ti.getLockOwnerName != null) { sb.append(" owned by \"") sb.append(ti.getLockOwnerName) sb.append("\" Id=") sb.append(ti.getLockOwnerId) } if (ti.isSuspended) { sb.append(" (suspended)") } if (ti.isInNative) { sb.append(" (in native)") } sb.append('\n') def appendMsg(msg: String, o: Any) = { sb.append(msg) sb.append(o) sb.append('\n') } val stackTrace = ti.getStackTrace for (i ← 0 until stackTrace.length) { val ste = stackTrace(i) appendMsg("\tat ", ste) if (i == 0 && ti.getLockInfo != null) { import java.lang.Thread.State._ ti.getThreadState match { case BLOCKED ⇒ appendMsg("\t- blocked on ", ti.getLockInfo) case WAITING ⇒ appendMsg("\t- waiting on ", ti.getLockInfo) case TIMED_WAITING ⇒ appendMsg("\t- waiting on ", ti.getLockInfo) case _ ⇒ } } for (mi ← ti.getLockedMonitors if mi.getLockedStackDepth == i) appendMsg("\t- locked ", mi) } val locks = ti.getLockedSynchronizers if (locks.length > 0) { appendMsg("\n\tNumber of locked synchronizers = ", locks.length) for (li ← locks) appendMsg("\t- ", li) } sb.append('\n') return sb.toString } println("All threads:") printThreadInfos(dumpAllThreads) val (deadlockedThreads, deadlockDesc) = findDeadlockedThreads println(s"Deadlocks found for $deadlockDesc:") printThreadInfos(deadlockedThreads) } } /** * Mixin for tests that should be watched by the Coroner. The `startCoroner` * and `stopCoroner` methods should be called before and after the test runs. * The Coroner will display its report if the test takes longer than the * (dilated) `expectedTestDuration` to run. * * If displayThreadCounts is set to true, then the Coroner will print thread * counts during start and stop. */ trait WatchedByCoroner { self: TestKit ⇒ @volatile private var coronerWatch: Coroner.WatchHandle = _ final def startCoroner() { coronerWatch = Coroner.watch(expectedTestDuration.dilated, getClass.getName, System.err, startAndStopDuration.dilated, displayThreadCounts) } final def stopCoroner() { coronerWatch.cancel() coronerWatch = null } def expectedTestDuration: FiniteDuration def displayThreadCounts: Boolean = true def startAndStopDuration: FiniteDuration = Coroner.defaultStartAndStopDuration } Other Akka source code examplesHere is a short list of links related to this Akka Coroner.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.