// Akka/Scala example source code file (ClusterMetricsCollector.scala)
// The ClusterMetricsCollector.scala Akka example source code
/*
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import java.io.Closeable
import java.lang.System.{ currentTimeMillis ⇒ newTimestamp }
import java.lang.management.{ OperatingSystemMXBean, MemoryMXBean, ManagementFactory }
import java.lang.reflect.InvocationTargetException
import java.lang.reflect.Method
import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.util.{ Try, Success, Failure }
import akka.ConfigurationException
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Address
import akka.actor.DynamicAccess
import akka.actor.ExtendedActorSystem
import akka.cluster.MemberStatus.Up
import akka.event.Logging
import java.lang.management.MemoryUsage
/**
 * INTERNAL API.
 *
 * Actor that drives the cluster metrics of one node. On every `MetricsTick` it samples
 * the local node through the configured [[akka.cluster.MetricsCollector]], on every
 * `GossipTick` it sends the accumulated [[akka.cluster.MetricsGossip]] to one randomly
 * selected peer, and it merges the gossip it receives from peers. The resulting node
 * metrics are handed to `publisher` wrapped in a `ClusterMetricsChanged` event, so that
 * consumers such as load-balancing routers can pick the least-loaded nodes.
 *
 * Smoothing of the individual metric values is done with [[akka.cluster.EWMA]].
 *
 * @param publisher actor that receives `PublishEvent(ClusterMetricsChanged(...))`
 */
private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Actor with ActorLogging {
  import InternalClusterAction._
  import ClusterEvent._
  import Member.addressOrdering
  import context.dispatcher

  val cluster = Cluster(context.system)
  import cluster.{ selfAddress, scheduler, settings }
  import cluster.settings._
  import cluster.InfoLogger._

  /**
   * Addresses of the members this node currently gossips with; only members
   * that are Up (and not unreachable) are kept here.
   */
  var nodes: immutable.SortedSet[Address] = immutable.SortedSet.empty

  /**
   * Most recent metrics known for every node, including their smoothed values.
   */
  var latestGossip: MetricsGossip = MetricsGossip.empty

  /**
   * Collector used to sample the metrics of the local node.
   */
  val collector: MetricsCollector =
    MetricsCollector(context.system.asInstanceOf[ExtendedActorSystem], settings)

  /**
   * Periodic `GossipTick` that triggers gossip to a random peer.
   */
  val gossipTask = scheduler.schedule(
    PeriodicTasksInitialDelay.max(MetricsGossipInterval),
    MetricsGossipInterval,
    self,
    GossipTick)

  /**
   * Periodic `MetricsTick` that triggers sampling of the local metrics.
   */
  val metricsTask = scheduler.schedule(
    PeriodicTasksInitialDelay.max(MetricsInterval),
    MetricsInterval,
    self,
    MetricsTick)

  /**
   * Subscribes this actor to member and reachability events of the cluster.
   */
  override def preStart(): Unit = {
    cluster.subscribe(self, classOf[MemberEvent], classOf[ReachabilityEvent])
    logInfo("Metrics collection has started successfully")
  }

  def receive: Receive = {
    case GossipTick                 ⇒ gossip()
    case MetricsTick                ⇒ collect()
    case msg: MetricsGossipEnvelope ⇒ receiveGossip(msg)
    case state: CurrentClusterState ⇒ receiveState(state)
    case MemberUp(m)                ⇒ addMember(m)
    case MemberRemoved(m, _)        ⇒ removeMember(m)
    case MemberExited(m)            ⇒ removeMember(m)
    case UnreachableMember(m)       ⇒ removeMember(m)
    // a member that becomes reachable again is only re-added when it is Up
    case ReachableMember(m)         ⇒ if (m.status == Up) addMember(m)
    case _: MemberEvent             ⇒ // not interested in other types of MemberEvent
  }

  /**
   * Unsubscribes from the cluster, cancels both periodic tasks and closes the collector.
   */
  override def postStop(): Unit = {
    cluster.unsubscribe(self)
    gossipTask.cancel()
    metricsTask.cancel()
    collector.close()
  }

  /**
   * Puts the address of `member` into the node ring.
   */
  def addMember(member: Member): Unit =
    nodes = nodes + member.address

  /**
   * Drops `member` from the node ring and from the latest gossip, then publishes
   * the reduced metrics.
   */
  def removeMember(member: Member): Unit = {
    val address = member.address
    nodes = nodes - address
    latestGossip = latestGossip.remove(address)
    publish()
  }

  /**
   * Replaces the node ring with the addresses of all members of `state` that are
   * [[akka.cluster.MemberStatus.Up]].
   */
  def receiveState(state: CurrentClusterState): Unit =
    nodes = state.members.collect { case m if m.status == Up ⇒ m.address }

  /**
   * Takes a fresh sample from the collector, folds it into the latest
   * [[akka.cluster.MetricsGossip]] and publishes the result.
   */
  def collect(): Unit = {
    latestGossip = latestGossip :+ collector.sample()
    publish()
  }

  /**
   * Merges gossip received from a peer into the local gossip and, unless the envelope
   * already is a reply, gossips back to the sender.
   */
  def receiveGossip(envelope: MetricsGossipEnvelope): Unit = {
    // The remote node might not have the same view of the member nodes. Only nodes
    // that are known on this side are merged, otherwise removed nodes could come back.
    val knownOnly = envelope.gossip.filter(nodes)
    latestGossip = latestGossip.merge(knownOnly)
    // the merged state is published by the periodic collect task
    if (!envelope.reply) replyGossipTo(envelope.from)
  }

  /**
   * Gossips to one randomly selected node other than this one, if there is any.
   */
  def gossip(): Unit = {
    val peers = (nodes - selfAddress).toVector
    selectRandomNode(peers).foreach(gossipTo)
  }

  /** Sends the latest gossip to `address`, marked as an initial (non-reply) message. */
  def gossipTo(address: Address): Unit =
    sendGossip(address, MetricsGossipEnvelope(selfAddress, latestGossip, reply = false))

  /** Sends the latest gossip to `address`, marked as a reply. */
  def replyGossipTo(address: Address): Unit =
    sendGossip(address, MetricsGossipEnvelope(selfAddress, latestGossip, reply = true))

  /** Delivers `envelope` to the actor with this actor's path at the given `address`. */
  def sendGossip(address: Address, envelope: MetricsGossipEnvelope): Unit = {
    val remotePath = self.path.toStringWithAddress(address)
    context.actorSelection(remotePath) ! envelope
  }

  /** Picks a random element of `addresses`, or `None` when it is empty. */
  def selectRandomNode(addresses: immutable.IndexedSeq[Address]): Option[Address] =
    if (addresses.nonEmpty) Some(addresses(ThreadLocalRandom.current.nextInt(addresses.size)))
    else None

  /**
   * Sends the node metrics of the latest gossip to the publisher.
   */
  def publish(): Unit =
    publisher ! PublishEvent(ClusterMetricsChanged(latestGossip.nodes))
}
/**
 * INTERNAL API
 *
 * Companion of [[akka.cluster.MetricsGossip]].
 */
private[cluster] object MetricsGossip {
  /** Gossip that does not contain metrics for any node. */
  val empty: MetricsGossip = MetricsGossip(nodes = Set.empty[NodeMetrics])
}
/**
 * INTERNAL API
 *
 * Immutable collection of the metrics known for each node.
 *
 * @param nodes metrics per node
 */
@SerialVersionUID(1L)
private[cluster] final case class MetricsGossip(nodes: Set[NodeMetrics]) {

  /**
   * Returns a gossip without the metrics of the node with the given address.
   */
  def remove(node: Address): MetricsGossip =
    copy(nodes = nodes.filterNot(n ⇒ n.address == node))

  /**
   * Returns a gossip that only keeps metrics of nodes whose address is in `includeNodes`.
   */
  def filter(includeNodes: Set[Address]): MetricsGossip =
    copy(nodes = nodes.filter(n ⇒ includeNodes.contains(n.address)))

  /**
   * Folds every [[akka.cluster.NodeMetrics]] of `otherGossip` into this gossip:
   * unknown nodes are added, known nodes are merged.
   */
  def merge(otherGossip: MetricsGossip): MetricsGossip =
    otherGossip.nodes.foldLeft(this)(_ :+ _)

  /**
   * Adds `newNodeMetrics`, or merges it with the metrics already held for the same address.
   */
  def :+(newNodeMetrics: NodeMetrics): MetricsGossip = {
    val updated = nodeMetricsFor(newNodeMetrics.address)
      // the existing entry has to be removed first, a Set does not replace an equal element
      .map(existing ⇒ nodes - existing + existing.merge(newNodeMetrics))
      .getOrElse(nodes + newNodeMetrics)
    copy(nodes = updated)
  }

  /**
   * Looks up the [[akka.cluster.NodeMetrics]] of the node with the given address.
   */
  def nodeMetricsFor(address: Address): Option[NodeMetrics] =
    nodes.find(_.address == address)
}
/**
 * INTERNAL API
 *
 * Wraps a [[akka.cluster.MetricsGossip]] together with the address of its sender.
 *
 * @param from address of the node that sent the gossip
 * @param gossip the metrics gossip being transported
 * @param reply true when this envelope answers a previously received gossip
 */
@SerialVersionUID(1L)
private[cluster] final case class MetricsGossipEnvelope(
  from: Address,
  gossip: MetricsGossip,
  reply: Boolean)
  extends ClusterMessage
private[cluster] object EWMA {

  /**
   * Rounded value of math.log(2)
   */
  private val LogOf2 = 0.69315

  /**
   * Derives the alpha (decay factor) of an [[akka.cluster.EWMA]] from a half-life and
   * the interval between two observations.
   *
   * The half-life is the interval over which the weights decrease by a factor of two:
   * the relevance of a data sample is halved for every passing half-life, i.e. after
   * 4 times the half-life it is down to about 6% of its original relevance. The initial
   * relevance of a data sample is 1 - 0.5 ^ (collect-interval / half-life).
   *
   * @param halfLife the half-life, must be at least one millisecond
   * @param collectInterval the interval between two observations
   * @return the decay factor to use for the moving average
   */
  def alpha(halfLife: FiniteDuration, collectInterval: FiniteDuration): Double = {
    val halfLifeMs = halfLife.toMillis
    require(halfLifeMs > 0, "halfLife must be > 0 s")
    val intervalMs = collectInterval.toMillis
    val exponent = -(LogOf2 / halfLifeMs) * intervalMs
    1 - math.exp(exponent)
  }
}
/**
 * INTERNAL API
 *
 * Exponentially weighted moving average (EWMA). Through its alpha, or decay factor, it
 * gives a streaming statistic that is exponentially biased towards newer entries.
 * Unlike a standard moving average, only the most recent average has to be kept.
 *
 * http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
 *
 * @param value the current exponentially weighted moving average, e.g. Y(n - 1), i.e.
 *   the result of the previous smoothing iteration. It is always used as the previous
 *   EWMA when the new EWMA is calculated.
 *
 * @param alpha decay factor in the range [0.0, 1.0]; sets how quickly the weight of past
 *   data decays compared to new data
 */
@SerialVersionUID(1L)
private[cluster] final case class EWMA(value: Double, alpha: Double) {
  require(0.0 <= alpha && alpha <= 1.0, "alpha must be between 0.0 and 1.0")

  /**
   * Folds a new data point into the moving average.
   *
   * @param xn the new data point
   * @return this instance if the average is unchanged, otherwise a new
   *   [[akka.cluster.EWMA]] holding the updated value
   */
  def :+(xn: Double): EWMA = {
    val smoothed = (alpha * xn) + (1 - alpha) * value
    if (smoothed != value) copy(value = smoothed)
    else this // no change, avoid allocating an equal instance
  }
}
/**
 * A named metric value, optionally accompanied by its moving average.
 *
 * Two metrics are equal when they have the same name; value and average do not
 * take part in `equals` or `hashCode`.
 *
 * @param name the metric name
 * @param value the metric value; it must be a valid numerical value, which means
 *   it is neither negative nor NaN/Infinite
 * @param average the moving average of the value, for trending over time. Metrics that
 *   already are averages (e.g. system load average) or that are finite (e.g. number of
 *   processors) are not trended.
 */
@SerialVersionUID(1L)
final case class Metric private[cluster] (name: String, value: Number, private[cluster] val average: Option[EWMA])
  extends MetricNumericConverter {

  require(defined(value), s"Invalid Metric [$name] value [$value]")

  /**
   * Takes over the value of `latest` when it tracks the same metric as this one.
   * An existing average is advanced with the new value; if this metric has no average
   * the average of `latest` (if any) is adopted.
   *
   * @return the updated metric, or this metric when `latest` has a different name
   */
  def :+(latest: Metric): Metric =
    if (!sameAs(latest)) this
    else {
      val nextAverage = average.map(_ :+ latest.value.doubleValue).orElse(latest.average)
      copy(value = latest.value, average = nextAverage)
    }

  /**
   * The value of the moving average when there is one, otherwise the latest value.
   */
  def smoothValue: Double = average.map(_.value).getOrElse(value.doubleValue)

  /**
   * @return true if this metric carries a moving average
   */
  def isSmooth: Boolean = average.nonEmpty

  /**
   * Returns true if <code>that</code> is tracking the same metric as this.
   */
  def sameAs(that: Metric): Boolean = that.name == name

  override def hashCode: Int = name.##

  override def equals(obj: Any): Boolean = obj match {
    case that: Metric ⇒ sameAs(that)
    case _            ⇒ false
  }
}
/**
 * Factory for [[akka.cluster.Metric]] instances that only hands out valid metrics.
 */
object Metric extends MetricNumericConverter {

  /**
   * Builds a Metric for a valid value. A value that is negative or NaN/Infinite is
   * invalid and yields None.
   *
   * @param name the metric name
   * @param value the sampled value
   * @param decayFactor when defined, the metric gets a moving average with this alpha
   */
  def create(name: String, value: Number, decayFactor: Option[Double]): Option[Metric] =
    if (!defined(value)) None
    else Some(new Metric(name, value, createEWMA(value.doubleValue, decayFactor)))

  /**
   * Builds a Metric from the outcome of a sampling attempt. A failed attempt yields
   * None, a successful one is treated like a plain value: negative and NaN/Infinite
   * values yield None as well.
   */
  def create(name: String, value: Try[Number], decayFactor: Option[Double]): Option[Metric] =
    value.toOption.flatMap(sampled ⇒ create(name, sampled, decayFactor))

  /** Starts a moving average at `value` when a decay factor is given. */
  private def createEWMA(value: Double, decayFactor: Option[Double]): Option[EWMA] =
    decayFactor.map(alpha ⇒ EWMA(value, alpha))
}
/**
 * The snapshot of current sampled health metrics for any monitored process.
 * Collected and gossipped at regular intervals for dynamic cluster management strategies.
 *
 * Equality of NodeMetrics is based on its address only; timestamp and metrics do not
 * take part in `equals` or `hashCode`.
 *
 * @param address [[akka.actor.Address]] of the node the metrics are gathered at
 * @param timestamp the time of sampling, in milliseconds since midnight, January 1, 1970 UTC
 * @param metrics the set of sampled [[akka.cluster.Metric]]
 */
@SerialVersionUID(1L)
final case class NodeMetrics(address: Address, timestamp: Long, metrics: Set[Metric] = Set.empty[Metric]) {

  /**
   * Returns the most recent data of this and `that`.
   *
   * When `that` is newer, its metrics win over the metrics of the same name held here,
   * and metrics that only exist here are kept.
   *
   * @throws IllegalArgumentException if `that` belongs to another address
   */
  def merge(that: NodeMetrics): NodeMetrics = {
    // `${that.address}` is required here: `$that.address` would print the whole `that`
    // followed by the literal text ".address"
    require(address == that.address, s"merge only allowed for same address, [$address] != [${that.address}]")
    if (timestamp >= that.timestamp) this // that is older
    else {
      // equality is based on the name of the Metric and Set doesn't replace existing element,
      // therefore the newer metrics have to be on the left hand side
      copy(metrics = that.metrics ++ metrics, timestamp = that.timestamp)
    }
  }

  /**
   * Looks up the metric with the given name.
   */
  def metric(key: String): Option[Metric] = metrics.find(_.name == key)

  /**
   * Java API
   */
  def getMetrics: java.lang.Iterable[Metric] =
    scala.collection.JavaConverters.asJavaIterableConverter(metrics).asJava

  /**
   * Returns true if <code>that</code> address is the same as this
   */
  def sameAs(that: NodeMetrics): Boolean = address == that.address

  override def hashCode: Int = address.##

  override def equals(obj: Any): Boolean = obj match {
    case other: NodeMetrics ⇒ sameAs(other)
    case _                  ⇒ false
  }
}
/**
 * Definitions of the built-in standard metrics.
 *
 * The extractors and data structures in here make it easy to consume
 * [[akka.cluster.NodeMetrics]], for example in load balancers.
 */
object StandardMetrics {

  // Names of the heap related metrics
  final val HeapMemoryUsed = "heap-memory-used"
  final val HeapMemoryCommitted = "heap-memory-committed"
  final val HeapMemoryMax = "heap-memory-max"

  // Names of the cpu related metrics
  final val SystemLoadAverage = "system-load-average"
  final val Processors = "processors"
  final val CpuCombined = "cpu-combined"

  object HeapMemory {

    /**
     * Extracts the heap data from a NodeMetrics. The used and committed heap metrics
     * are mandatory, the max heap metric is optional.
     *
     * @return if possible a tuple matching the HeapMemory constructor parameters
     */
    def unapply(nodeMetrics: NodeMetrics): Option[(Address, Long, Long, Long, Option[Long])] =
      nodeMetrics.metric(HeapMemoryUsed).flatMap { used ⇒
        nodeMetrics.metric(HeapMemoryCommitted).map { committed ⇒
          (nodeMetrics.address, nodeMetrics.timestamp,
            used.smoothValue.longValue, committed.smoothValue.longValue,
            nodeMetrics.metric(HeapMemoryMax).map(_.smoothValue.longValue))
        }
      }
  }

  /**
   * Java API to extract HeapMemory data from nodeMetrics. Returns null when the
   * nodeMetrics does not contain the necessary heap metrics.
   */
  def extractHeapMemory(nodeMetrics: NodeMetrics): HeapMemory =
    HeapMemory.unapply(nodeMetrics) match {
      // the extractor yields a tuple, it is turned into the case class here
      case Some((nodeAddress, sampledAt, usedBytes, committedBytes, maxBytes)) ⇒
        HeapMemory(nodeAddress, sampledAt, usedBytes, committedBytes, maxBytes)
      case None ⇒ null
    }

  /**
   * The amount of used and committed memory will always be <= max if max is defined.
   * A memory allocation may fail if it attempts to increase the used memory such that used > committed
   * even if used <= max is true (e.g. when the system virtual memory is low).
   *
   * @param address [[akka.actor.Address]] of the node the metrics are gathered at
   * @param timestamp the time of sampling, in milliseconds since midnight, January 1, 1970 UTC
   * @param used the current sum of heap memory used from all heap memory pools (in bytes)
   * @param committed the current sum of heap memory guaranteed to be available to the JVM
   *   from all heap memory pools (in bytes). Committed will always be greater than or equal to used.
   * @param max the maximum amount of memory (in bytes) that can be used for JVM memory management.
   *   Can be undefined on some OS.
   */
  @SerialVersionUID(1L)
  final case class HeapMemory(address: Address, timestamp: Long, used: Long, committed: Long, max: Option[Long]) {
    require(committed > 0L, "committed heap expected to be > 0 bytes")
    require(max.forall(_ > 0L), "max heap expected to be > 0 bytes")
  }

  object Cpu {

    /**
     * Extracts the cpu data from a NodeMetrics. The processors metric is mandatory,
     * the system load average and the combined cpu metrics are optional.
     *
     * @return if possible a tuple matching the Cpu constructor parameters
     */
    def unapply(nodeMetrics: NodeMetrics): Option[(Address, Long, Option[Double], Option[Double], Int)] =
      nodeMetrics.metric(Processors).map { processors ⇒
        (nodeMetrics.address, nodeMetrics.timestamp,
          nodeMetrics.metric(SystemLoadAverage).map(_.smoothValue),
          nodeMetrics.metric(CpuCombined).map(_.smoothValue),
          processors.value.intValue)
      }
  }

  /**
   * Java API to extract Cpu data from nodeMetrics. Returns null when the
   * nodeMetrics does not contain the necessary cpu metrics.
   */
  def extractCpu(nodeMetrics: NodeMetrics): Cpu =
    Cpu.unapply(nodeMetrics) match {
      // the extractor yields a tuple, it is turned into the case class here
      case Some((nodeAddress, sampledAt, loadAverage, combined, processorCount)) ⇒
        Cpu(nodeAddress, sampledAt, loadAverage, combined, processorCount)
      case None ⇒ null
    }

  /**
   * @param address [[akka.actor.Address]] of the node the metrics are gathered at
   * @param timestamp the time of sampling, in milliseconds since midnight, January 1, 1970 UTC
   * @param systemLoadAverage OS-specific average load on the CPUs in the system, for the past 1 minute.
   *   The system is possibly nearing a bottleneck if the system load average is nearing number of cpus/cores.
   * @param cpuCombined combined CPU sum of User + Sys + Nice + Wait, as a fraction in the range
   *   [0.0 - 1.0]. This metric can describe the amount of time the CPU spent executing code during
   *   n-interval and how much more it could theoretically.
   * @param processors the number of available processors
   */
  @SerialVersionUID(1L)
  final case class Cpu(
    address: Address,
    timestamp: Long,
    systemLoadAverage: Option[Double],
    cpuCombined: Option[Double],
    processors: Int) {

    // an undefined cpuCombined is accepted, a defined one has to be a fraction
    cpuCombined.foreach { x ⇒
      require(0.0 <= x && x <= 1.0, s"cpuCombined must be between [0.0 - 1.0], was [$x]")
    }
  }
}
/**
 * INTERNAL API
 *
 * Encapsulates the check whether a metric value is valid, and the conversion of an
 * actual metric value into the numeric representation used for that check.
 */
private[cluster] trait MetricNumericConverter {

  /**
   * A defined value is neither negative nor NaN/Infinite:
   * <ul><li>JMX system load average and max heap can be 'undefined' for certain OS, in which case a -1 is returned</li>
   * <li>SIGAR combined CPU can occasionally return a NaN or Infinite (known bug)</li></ul>
   *
   * @throws IllegalArgumentException if `value` is of a type that `convertNumber` does not support
   */
  def defined(value: Number): Boolean =
    convertNumber(value).fold(
      integral ⇒ integral >= 0,
      floating ⇒ floating >= 0.0 && !floating.isNaN && !floating.isInfinite)

  /**
   * Converts integral numbers to a Left(Long) and floating point numbers to a Right(Double).
   * May involve rounding or truncation.
   *
   * @throws IllegalArgumentException if `from` is not one of the supported numeric types
   */
  def convertNumber(from: Any): Either[Long, Double] = from match {
    case i: Int         ⇒ Left(i.toLong)
    case l: Long        ⇒ Left(l)
    case d: Double      ⇒ Right(d)
    case f: Float       ⇒ Right(f.toDouble)
    case bi: BigInt     ⇒ Left(bi.longValue)
    case bd: BigDecimal ⇒ Right(bd.doubleValue)
    case other          ⇒ throw new IllegalArgumentException(s"Not a number [$other]")
  }
}
/**
 * Contract for the collectors of cluster system metrics. Being a
 * `java.io.Closeable`, a collector also has to provide `close()`.
 */
trait MetricsCollector extends Closeable {

  /**
   * Samples and collects new data points.
   * Invoked periodically; an implementation is expected to return
   * the current metrics of this node.
   */
  def sample(): NodeMetrics
}
/**
 * Loads JVM and system metrics through JMX monitoring beans.
 *
 * @param address The [[akka.actor.Address]] of the node being sampled
 * @param decayFactor how quickly the exponential weighting of past data is decayed
 */
class JmxMetricsCollector(address: Address, decayFactor: Double) extends MetricsCollector {
  import StandardMetrics._

  // derives the decay factor from the metrics settings of the cluster
  private def this(cluster: Cluster) =
    this(
      cluster.selfAddress,
      EWMA.alpha(cluster.settings.MetricsMovingAverageHalfLife, cluster.settings.MetricsInterval))

  /**
   * This constructor is used when creating an instance from configured FQCN
   */
  def this(system: ActorSystem) = this(Cluster(system))

  private val decayFactorOption = Some(decayFactor)

  private val memoryMBean: MemoryMXBean = ManagementFactory.getMemoryMXBean

  private val osMBean: OperatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean

  /**
   * Samples and collects new data points.
   * Creates a new instance each time, stamped with the current time.
   */
  def sample(): NodeMetrics = NodeMetrics(address, newTimestamp, metrics)

  /**
   * Samples all metrics of this collector; metrics that are not available are left out.
   * The heap usage is read once and shared by the three heap metrics.
   */
  def metrics: Set[Metric] = {
    val heap = heapMemoryUsage
    val sampled: Set[Option[Metric]] =
      Set(systemLoadAverage, heapUsed(heap), heapCommitted(heap), heapMax(heap), processors)
    sampled.flatten
  }

  /**
   * (JMX) Returns the OS-specific average load on the CPUs in the system, for the past 1 minute.
   * On some systems the JMX OS system load average may not be available, in which case a -1 is
   * returned from JMX, and None is returned from this method.
   * Not smoothed. Creates a new instance each time.
   */
  def systemLoadAverage: Option[Metric] =
    Metric.create(
      name = SystemLoadAverage,
      value = osMBean.getSystemLoadAverage,
      decayFactor = None)

  /**
   * (JMX) Returns the number of available processors.
   * Not smoothed. Creates a new instance each time.
   */
  def processors: Option[Metric] =
    Metric.create(
      name = Processors,
      value = osMBean.getAvailableProcessors,
      decayFactor = None)

  /**
   * Current heap usage, to be passed in to heapUsed, heapCommitted and heapMax
   */
  def heapMemoryUsage: MemoryUsage = memoryMBean.getHeapMemoryUsage

  /**
   * (JMX) Returns the current sum of heap memory used from all heap memory pools (in bytes).
   * Smoothed with the decay factor of this collector. Creates a new instance each time.
   */
  def heapUsed(heap: MemoryUsage): Option[Metric] =
    Metric.create(
      name = HeapMemoryUsed,
      value = heap.getUsed,
      decayFactor = decayFactorOption)

  /**
   * (JMX) Returns the current sum of heap memory guaranteed to be available to the JVM
   * from all heap memory pools (in bytes).
   * Smoothed with the decay factor of this collector. Creates a new instance each time.
   */
  def heapCommitted(heap: MemoryUsage): Option[Metric] =
    Metric.create(
      name = HeapMemoryCommitted,
      value = heap.getCommitted,
      decayFactor = decayFactorOption)

  /**
   * (JMX) Returns the maximum amount of memory (in bytes) that can be used
   * for JVM memory management. If not defined the metrics value is None, i.e.
   * never negative.
   * Not smoothed. Creates a new instance each time.
   */
  def heapMax(heap: MemoryUsage): Option[Metric] =
    Metric.create(
      name = HeapMemoryMax,
      value = heap.getMax,
      decayFactor = None)

  /** Nothing to release for the JMX based collector. */
  override def close(): Unit = ()
}
/**
 * Loads metrics through Hyperic SIGAR and JMX monitoring beans. Compared to
 * JmxMetricsCollector this adds the combined CPU metric and takes the system load
 * average from SIGAR's native OS library, falling back to JMX when that fails.
 *
 * SIGAR is only accessed through reflection. The constructor will by design throw
 * exception if org.hyperic.sigar.Sigar can't be loaded, due to missing classes or
 * native libraries.
 *
 * @param address The [[akka.actor.Address]] of the node being sampled
 * @param decayFactor how quickly the exponential weighting of past data is decayed
 * @param sigar the org.hyperic.Sigar instance
 */
class SigarMetricsCollector(address: Address, decayFactor: Double, sigar: AnyRef)
  extends JmxMetricsCollector(address, decayFactor) {

  import StandardMetrics._

  // derives the decay factor from the cluster settings and instantiates Sigar reflectively
  private def this(cluster: Cluster) =
    this(
      cluster.selfAddress,
      EWMA.alpha(cluster.settings.MetricsMovingAverageHalfLife, cluster.settings.MetricsInterval),
      cluster.system.dynamicAccess.createInstanceFor[AnyRef]("org.hyperic.sigar.Sigar", Nil).get)

  /**
   * This constructor is used when creating an instance from configured FQCN
   */
  def this(system: ActorSystem) = this(Cluster(system))

  private val decayFactorOption = Some(decayFactor)

  // Has to be initialized before the method lookups below, it is their default argument.
  private val EmptyClassArray: Array[Class[_]] = Array.empty[Class[_]]

  private val LoadAverage: Option[Method] = createMethodFrom(sigar, "getLoadAverage")

  private val Cpu: Option[Method] = createMethodFrom(sigar, "getCpuPerc")

  // `getCombined` is looked up on the return type of `getCpuPerc`
  private val CombinedCpu: Option[Method] =
    Cpu.flatMap(cpuPerc ⇒ Try(cpuPerc.getReturnType.getMethod("getCombined")).toOption)

  // Do something initially, in constructor, to make sure that the native library can be loaded.
  // This will by design throw exception if sigar isn't usable
  val pid: Long = {
    val getPid = createMethodFrom(sigar, "getPid").getOrElse(
      throw new IllegalArgumentException("Wrong version of Sigar, expected 'getPid' method"))
    try getPid.invoke(sigar).asInstanceOf[Long]
    catch {
      case e: InvocationTargetException ⇒
        e.getCause match {
          case linkage: LinkageError ⇒
            // native libraries not in place
            // don't throw fatal LinkageError, but something harmless
            throw new IllegalArgumentException(linkage.toString)
          case cause ⇒ throw cause
        }
    }
  }

  /**
   * The JMX metrics, with the system load average replaced by the one of this
   * collector and the combined CPU metric added.
   */
  override def metrics: Set[Metric] = {
    val jmxWithoutLoad = super.metrics.filterNot(m ⇒ m.name == SystemLoadAverage)
    val sigarMetrics = Set(systemLoadAverage, cpuCombined).flatten
    jmxWithoutLoad ++ sigarMetrics
  }

  /**
   * (SIGAR / JMX) Returns the OS-specific average load on the CPUs in the system, for the past 1 minute.
   * The value is read from SIGAR; when that fails or yields an invalid value the JMX value is used.
   * On some systems the JMX OS system load average may not be available, in which case a -1 is returned
   * from JMX, which means that None is returned from this method.
   * Creates a new instance each time.
   */
  override def systemLoadAverage: Option[Metric] = {
    val sigarLoad: Try[Number] =
      Try(LoadAverage.get.invoke(sigar).asInstanceOf[Array[AnyRef]](0).asInstanceOf[Number])
    Metric.create(name = SystemLoadAverage, value = sigarLoad, decayFactor = None)
      .orElse(super.systemLoadAverage)
  }

  /**
   * (SIGAR) Returns the combined CPU sum of User + Sys + Nice + Wait, in percentage. This metric can describe
   * the amount of time the CPU spent executing code during n-interval and how much more it could
   * theoretically. Note that 99% CPU utilization can be optimal or indicative of failure.
   *
   * In the data stream, this will sometimes return with a valid metric value, and sometimes as a NaN or Infinite.
   * Documented bug https://bugzilla.redhat.com/show_bug.cgi?id=749121 and several others.
   * Such values result in None.
   *
   * Creates a new instance each time.
   */
  def cpuCombined: Option[Metric] = {
    val combined: Try[Number] =
      Try(CombinedCpu.get.invoke(Cpu.get.invoke(sigar)).asInstanceOf[Number])
    Metric.create(name = CpuCombined, value = combined, decayFactor = decayFactorOption)
  }

  /**
   * Invokes `close` on the sigar instance, if it has such a method, to release any
   * native resources associated with it. Non-fatal failures are ignored.
   */
  override def close(): Unit =
    createMethodFrom(sigar, "close").foreach(closeMethod ⇒ Try(closeMethod.invoke(sigar)))

  /** Reflective lookup of a public method; None if it can not be found. */
  private def createMethodFrom(ref: AnyRef, method: String, types: Array[Class[_]] = EmptyClassArray): Option[Method] =
    Try(ref.getClass.getMethod(method, types: _*)).toOption
}
/**
 * INTERNAL API
 *
 * Factory to create the configured MetricsCollector.
 *
 * If the configured class is SigarMetricsCollector and its instantiation fails
 * (missing class or native library) it falls back to use JmxMetricsCollector.
 * Any other configured class is instantiated through its constructor taking an
 * ActorSystem.
 */
private[cluster] object MetricsCollector {

  /**
   * @param system the actor system the collector is created for
   * @param settings provides the class name of the collector to create
   * @throws akka.ConfigurationException if a custom collector could not be created
   */
  def apply(system: ExtendedActorSystem, settings: ClusterSettings): MetricsCollector = {
    import settings.{ MetricsCollectorClass ⇒ fqcn }
    if (fqcn == classOf[SigarMetricsCollector].getName) {
      Try(new SigarMetricsCollector(system)) match {
        case Success(sigarCollector) ⇒ sigarCollector
        case Failure(e) ⇒
          // Sigar is optional: tell the operator why it is not used and carry on with JMX
          Cluster(system).InfoLogger.logInfo(
            "Metrics will be retrieved from MBeans, and may be incorrect on some platforms. " +
              "To increase metric accuracy add the 'sigar.jar' to the classpath and the appropriate " +
              "platform-specific native library to 'java.library.path'. Reason: " +
              e.toString)
          new JmxMetricsCollector(system)
      }
    } else {
      system.dynamicAccess.createInstanceFor[MetricsCollector](fqcn, List(classOf[ActorSystem] -> system)).
        recover {
          case e ⇒ throw new ConfigurationException("Could not create custom metrics collector [" + fqcn + "] due to:" + e.toString)
        }.get
    }
  }
}
// Other Akka source code examples
// Here is a short list of links related to this Akka ClusterMetricsCollector.scala source code file:
// ... this post is sponsored by my books ...
// #1 New Release!
// FP Best Seller
// Copyright 1998-2024 Alvin Alexander, alvinalexander.com
// All Rights Reserved.
// A percentage of advertising revenue from pages under the /java/jwarehouse URI
// on this website is paid back to open source projects.