alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (ClusterMetricsCollector.scala)

This example Akka source code file (ClusterMetricsCollector.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actor, address, akka, collection, concurrent, double, forkjoin, long, metric, metricsgossip, nodemetrics, none, option, some, time, unit

The ClusterMetricsCollector.scala Akka example source code

/*
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */

package akka.cluster

import java.io.Closeable
import java.lang.System.{ currentTimeMillis ⇒ newTimestamp }
import java.lang.management.{ OperatingSystemMXBean, MemoryMXBean, ManagementFactory }
import java.lang.reflect.InvocationTargetException
import java.lang.reflect.Method
import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.util.{ Try, Success, Failure }
import akka.ConfigurationException
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Address
import akka.actor.DynamicAccess
import akka.actor.ExtendedActorSystem
import akka.cluster.MemberStatus.Up
import akka.event.Logging
import java.lang.management.MemoryUsage

/**
 * INTERNAL API.
 *
 * Cluster metrics is primarily for load-balancing of nodes. It controls metrics sampling
 * at a regular frequency, prepares highly variable data for further analysis by other entities,
 * and publishes the latest cluster metrics data around the node ring and local eventStream
 * to assist in determining the need to redirect traffic to the least-loaded nodes.
 *
 * Metrics sampling is delegated to the [[akka.cluster.MetricsCollector]].
 *
 * Smoothing of the data for each monitored process is delegated to the
 * [[akka.cluster.EWMA]] for exponential weighted moving average.
 */
private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Actor with ActorLogging {

  import InternalClusterAction._
  import ClusterEvent._
  import Member.addressOrdering
  import context.dispatcher
  val cluster = Cluster(context.system)
  import cluster.{ selfAddress, scheduler, settings }
  import cluster.settings._
  import cluster.InfoLogger._

  /**
   * The node ring gossipped that contains only members that are Up.
   */
  var nodes: immutable.SortedSet[Address] = immutable.SortedSet.empty

  /**
   * The latest metric values with their statistical data.
   */
  var latestGossip: MetricsGossip = MetricsGossip.empty

  /**
   * The metrics collector that samples data on the node.
   */
  val collector: MetricsCollector = MetricsCollector(context.system.asInstanceOf[ExtendedActorSystem], settings)

  /**
   * Start periodic gossip to random nodes in cluster
   */
  val gossipTask = scheduler.schedule(PeriodicTasksInitialDelay max MetricsGossipInterval,
    MetricsGossipInterval, self, GossipTick)

  /**
   * Start periodic metrics collection
   */
  val metricsTask = scheduler.schedule(PeriodicTasksInitialDelay max MetricsInterval,
    MetricsInterval, self, MetricsTick)

  override def preStart(): Unit = {
    cluster.subscribe(self, classOf[MemberEvent], classOf[ReachabilityEvent])
    logInfo("Metrics collection has started successfully")
  }

  def receive = {
    case GossipTick                 ⇒ gossip()
    case MetricsTick                ⇒ collect()
    case msg: MetricsGossipEnvelope ⇒ receiveGossip(msg)
    case state: CurrentClusterState ⇒ receiveState(state)
    case MemberUp(m)                ⇒ addMember(m)
    case MemberRemoved(m, _)        ⇒ removeMember(m)
    case MemberExited(m)            ⇒ removeMember(m)
    case UnreachableMember(m)       ⇒ removeMember(m)
    case ReachableMember(m)         ⇒ if (m.status == Up) addMember(m)
    case _: MemberEvent             ⇒ // not interested in other types of MemberEvent

  }

  override def postStop: Unit = {
    cluster unsubscribe self
    gossipTask.cancel()
    metricsTask.cancel()
    collector.close()
  }

  /**
   * Adds a member to the node ring.
   */
  def addMember(member: Member): Unit = nodes += member.address

  /**
   * Removes a member from the member node ring.
   */
  def removeMember(member: Member): Unit = {
    nodes -= member.address
    latestGossip = latestGossip remove member.address
    publish()
  }

  /**
   * Updates the initial node ring for those nodes that are [[akka.cluster.MemberStatus.Up]].
   */
  def receiveState(state: CurrentClusterState): Unit =
    nodes = state.members collect { case m if m.status == Up ⇒ m.address }

  /**
   * Samples the latest metrics for the node, updates metrics statistics in
   * [[akka.cluster.MetricsGossip]], and publishes the change to the event bus.
   *
   * @see [[akka.cluster.ClusterMetricsCollector.collect( )]]
   */
  def collect(): Unit = {
    latestGossip :+= collector.sample()
    publish()
  }

  /**
   * Receives changes from peer nodes, merges remote with local gossip nodes, then publishes
   * changes to the event stream for load balancing router consumption, and gossip back.
   */
  def receiveGossip(envelope: MetricsGossipEnvelope): Unit = {
    // remote node might not have same view of member nodes, this side should only care
    // about nodes that are known here, otherwise removed nodes can come back
    val otherGossip = envelope.gossip.filter(nodes)
    latestGossip = latestGossip merge otherGossip
    // changes will be published in the period collect task
    if (!envelope.reply)
      replyGossipTo(envelope.from)
  }

  /**
   * Gossip to peer nodes.
   */
  def gossip(): Unit = selectRandomNode((nodes - selfAddress).toVector) foreach gossipTo

  def gossipTo(address: Address): Unit =
    sendGossip(address, MetricsGossipEnvelope(selfAddress, latestGossip, reply = false))

  def replyGossipTo(address: Address): Unit =
    sendGossip(address, MetricsGossipEnvelope(selfAddress, latestGossip, reply = true))

  def sendGossip(address: Address, envelope: MetricsGossipEnvelope): Unit =
    context.actorSelection(self.path.toStringWithAddress(address)) ! envelope

  def selectRandomNode(addresses: immutable.IndexedSeq[Address]): Option[Address] =
    if (addresses.isEmpty) None else Some(addresses(ThreadLocalRandom.current nextInt addresses.size))

  /**
   * Publishes to the event stream.
   */
  def publish(): Unit = publisher ! PublishEvent(ClusterMetricsChanged(latestGossip.nodes))

}

/**
 * INTERNAL API
 */
private[cluster] object MetricsGossip {
  val empty = MetricsGossip(Set.empty[NodeMetrics])
}

/**
 * INTERNAL API
 *
 * @param nodes metrics per node
 */
@SerialVersionUID(1L)
private[cluster] final case class MetricsGossip(nodes: Set[NodeMetrics]) {

  /**
   * Removes nodes if their correlating node ring members are not [[akka.cluster.MemberStatus.Up]]
   */
  def remove(node: Address): MetricsGossip = copy(nodes = nodes filterNot (_.address == node))

  /**
   * Only the nodes that are in the `includeNodes` Set.
   */
  def filter(includeNodes: Set[Address]): MetricsGossip =
    copy(nodes = nodes filter { includeNodes contains _.address })

  /**
   * Adds new remote [[akka.cluster.NodeMetrics]] and merges existing from a remote gossip.
   */
  def merge(otherGossip: MetricsGossip): MetricsGossip =
    otherGossip.nodes.foldLeft(this) { (gossip, nodeMetrics) ⇒ gossip :+ nodeMetrics }

  /**
   * Adds new local [[akka.cluster.NodeMetrics]], or merges an existing.
   */
  def :+(newNodeMetrics: NodeMetrics): MetricsGossip = nodeMetricsFor(newNodeMetrics.address) match {
    case Some(existingNodeMetrics) ⇒
      copy(nodes = nodes - existingNodeMetrics + (existingNodeMetrics merge newNodeMetrics))
    case None ⇒ copy(nodes = nodes + newNodeMetrics)
  }

  /**
   * Returns [[akka.cluster.NodeMetrics]] for a node if exists.
   */
  def nodeMetricsFor(address: Address): Option[NodeMetrics] = nodes find { n ⇒ n.address == address }

}

/**
 * INTERNAL API
 * Envelope adding a sender address to the gossip.
 */
@SerialVersionUID(1L)
private[cluster] final case class MetricsGossipEnvelope(from: Address, gossip: MetricsGossip, reply: Boolean)
  extends ClusterMessage

private[cluster] object EWMA {
  /**
   * math.log(2)
   */
  private val LogOf2 = 0.69315

  /**
   * Calculate the alpha (decay factor) used in [[akka.cluster.EWMA]]
   * from specified half-life and interval between observations.
   * Half-life is the interval over which the weights decrease by a factor of two.
   * The relevance of each data sample is halved for every passing half-life duration,
   * i.e. after 4 times the half-life, a data sample’s relevance is reduced to 6% of
   * its original relevance. The initial relevance of a data sample is given by
   * 1 – 0.5 ^ (collect-interval / half-life).
   */
  def alpha(halfLife: FiniteDuration, collectInterval: FiniteDuration): Double = {
    val halfLifeMillis = halfLife.toMillis
    require(halfLife.toMillis > 0, "halfLife must be > 0 s")
    val decayRate = LogOf2 / halfLifeMillis
    1 - math.exp(-decayRate * collectInterval.toMillis)
  }
}

/**
 * The exponentially weighted moving average (EWMA) approach captures short-term
 * movements in volatility for a conditional volatility forecasting model. By virtue
 * of its alpha, or decay factor, this provides a statistical streaming data model
 * that is exponentially biased towards newer entries.
 *
 * http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
 *
 * An EWMA only needs the most recent forecast value to be kept, as opposed to a standard
 * moving average model.
 *
 * INTERNAL API
 *
 * @param alpha decay factor, sets how quickly the exponential weighting decays for past data compared to new data,
 *   see http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
 *
 * @param value the current exponentially weighted moving average, e.g. Y(n - 1), or,
 *             the sampled value resulting from the previous smoothing iteration.
 *             This value is always used as the previous EWMA to calculate the new EWMA.
 *
 */
@SerialVersionUID(1L)
private[cluster] final case class EWMA(value: Double, alpha: Double) {

  require(0.0 <= alpha && alpha <= 1.0, "alpha must be between 0.0 and 1.0")

  /**
   * Calculates the exponentially weighted moving average for a given monitored data set.
   *
   * @param xn the new data point
   * @return a new [[akka.cluster.EWMA]] with the updated value
   */
  def :+(xn: Double): EWMA = {
    val newValue = (alpha * xn) + (1 - alpha) * value
    if (newValue == value) this // no change
    else copy(value = newValue)
  }

}

/**
 * Metrics key/value.
 *
 * Equality of Metric is based on its name.
 *
 * @param name the metric name
 * @param value the metric value, which must be a valid numerical value,
 *   a valid value is neither negative nor NaN/Infinite.
 * @param average the data stream of the metric value, for trending over time. Metrics that are already
 *   averages (e.g. system load average) or finite (e.g. as number of processors), are not trended.
 */
@SerialVersionUID(1L)
final case class Metric private[cluster] (name: String, value: Number, private[cluster] val average: Option[EWMA])
  extends MetricNumericConverter {

  require(defined(value), s"Invalid Metric [$name] value [$value]")

  /**
   * Updates the data point, and if defined, updates the data stream (average).
   * Returns the updated metric.
   */
  def :+(latest: Metric): Metric =
    if (this sameAs latest) average match {
      case Some(avg)                        ⇒ copy(value = latest.value, average = Some(avg :+ latest.value.doubleValue))
      case None if latest.average.isDefined ⇒ copy(value = latest.value, average = latest.average)
      case _                                ⇒ copy(value = latest.value)
    }
    else this

  /**
   * The numerical value of the average, if defined, otherwise the latest value
   */
  def smoothValue: Double = average match {
    case Some(avg) ⇒ avg.value
    case None      ⇒ value.doubleValue
  }

  /**
   * @return true if this value is smoothed
   */
  def isSmooth: Boolean = average.isDefined

  /**
   * Returns true if <code>that</code> is tracking the same metric as this.
   */
  def sameAs(that: Metric): Boolean = name == that.name

  override def hashCode = name.##
  override def equals(obj: Any) = obj match {
    case other: Metric ⇒ sameAs(other)
    case _             ⇒ false
  }

}

/**
 * Factory for creating valid Metric instances.
 */
object Metric extends MetricNumericConverter {

  /**
   * Creates a new Metric instance if the value is valid, otherwise None
   * is returned. Invalid numeric values are negative and NaN/Infinite.
   */
  def create(name: String, value: Number, decayFactor: Option[Double]): Option[Metric] =
    if (defined(value)) Some(new Metric(name, value, ceateEWMA(value.doubleValue, decayFactor)))
    else None

  /**
   * Creates a new Metric instance if the Try is successful and the value is valid,
   * otherwise None is returned. Invalid numeric values are negative and NaN/Infinite.
   */
  def create(name: String, value: Try[Number], decayFactor: Option[Double]): Option[Metric] = value match {
    case Success(v) ⇒ create(name, v, decayFactor)
    case Failure(_) ⇒ None
  }

  private def ceateEWMA(value: Double, decayFactor: Option[Double]): Option[EWMA] = decayFactor match {
    case Some(alpha) ⇒ Some(EWMA(value, alpha))
    case None        ⇒ None
  }

}

/**
 * The snapshot of current sampled health metrics for any monitored process.
 * Collected and gossipped at regular intervals for dynamic cluster management strategies.
 *
 * Equality of NodeMetrics is based on its address.
 *
 * @param address [[akka.actor.Address]] of the node the metrics are gathered at
 * @param timestamp the time of sampling, in milliseconds since midnight, January 1, 1970 UTC
 * @param metrics the set of sampled [[akka.actor.Metric]]
 */
@SerialVersionUID(1L)
final case class NodeMetrics(address: Address, timestamp: Long, metrics: Set[Metric] = Set.empty[Metric]) {

  /**
   * Returns the most recent data.
   */
  def merge(that: NodeMetrics): NodeMetrics = {
    require(address == that.address, s"merge only allowed for same address, [$address] != [$that.address]")
    if (timestamp >= that.timestamp) this // that is older
    else {
      // equality is based on the name of the Metric and Set doesn't replace existing element
      copy(metrics = that.metrics ++ metrics, timestamp = that.timestamp)
    }
  }

  def metric(key: String): Option[Metric] = metrics.collectFirst { case m if m.name == key ⇒ m }

  /**
   * Java API
   */
  def getMetrics: java.lang.Iterable[Metric] =
    scala.collection.JavaConverters.asJavaIterableConverter(metrics).asJava

  /**
   * Returns true if <code>that</code> address is the same as this
   */
  def sameAs(that: NodeMetrics): Boolean = address == that.address

  override def hashCode = address.##
  override def equals(obj: Any) = obj match {
    case other: NodeMetrics ⇒ sameAs(other)
    case _                  ⇒ false
  }

}

/**
 * Definitions of the built-in standard metrics.
 *
 * The following extractors and data structures makes it easy to consume the
 * [[akka.cluster.NodeMetrics]] in for example load balancers.
 */
object StandardMetrics {

  // Constants for the heap related Metric names
  final val HeapMemoryUsed = "heap-memory-used"
  final val HeapMemoryCommitted = "heap-memory-committed"
  final val HeapMemoryMax = "heap-memory-max"

  // Constants for the cpu related Metric names
  final val SystemLoadAverage = "system-load-average"
  final val Processors = "processors"
  final val CpuCombined = "cpu-combined"

  object HeapMemory {

    /**
     * Given a NodeMetrics it returns the HeapMemory data if the nodeMetrics contains
     * necessary heap metrics.
     * @return if possible a tuple matching the HeapMemory constructor parameters
     */
    def unapply(nodeMetrics: NodeMetrics): Option[(Address, Long, Long, Long, Option[Long])] = {
      for {
        used ← nodeMetrics.metric(HeapMemoryUsed)
        committed ← nodeMetrics.metric(HeapMemoryCommitted)
      } yield (nodeMetrics.address, nodeMetrics.timestamp,
        used.smoothValue.longValue, committed.smoothValue.longValue,
        nodeMetrics.metric(HeapMemoryMax).map(_.smoothValue.longValue))
    }

  }

  /**
   * Java API to extract HeapMemory data from nodeMetrics, if the nodeMetrics
   * contains necessary heap metrics, otherwise it returns null.
   */
  def extractHeapMemory(nodeMetrics: NodeMetrics): HeapMemory = nodeMetrics match {
    case HeapMemory(address, timestamp, used, committed, max) ⇒
      // note that above extractor returns tuple
      HeapMemory(address, timestamp, used, committed, max)
    case _ ⇒ null
  }

  /**
   * The amount of used and committed memory will always be <= max if max is defined.
   * A memory allocation may fail if it attempts to increase the used memory such that used > committed
   * even if used <= max is true (e.g. when the system virtual memory is low).
   *
   * @param address [[akka.actor.Address]] of the node the metrics are gathered at
   * @param timestamp the time of sampling, in milliseconds since midnight, January 1, 1970 UTC
   * @param used the current sum of heap memory used from all heap memory pools (in bytes)
   * @param committed the current sum of heap memory guaranteed to be available to the JVM
   *   from all heap memory pools (in bytes). Committed will always be greater than or equal to used.
   * @param max the maximum amount of memory (in bytes) that can be used for JVM memory management.
   *   Can be undefined on some OS.
   */
  @SerialVersionUID(1L)
  final case class HeapMemory(address: Address, timestamp: Long, used: Long, committed: Long, max: Option[Long]) {
    require(committed > 0L, "committed heap expected to be > 0 bytes")
    require(max.isEmpty || max.get > 0L, "max heap expected to be > 0 bytes")
  }

  object Cpu {

    /**
     * Given a NodeMetrics it returns the Cpu data if the nodeMetrics contains
     * necessary cpu metrics.
     * @return if possible a tuple matching the Cpu constructor parameters
     */
    def unapply(nodeMetrics: NodeMetrics): Option[(Address, Long, Option[Double], Option[Double], Int)] = {
      for {
        processors ← nodeMetrics.metric(Processors)
      } yield (nodeMetrics.address, nodeMetrics.timestamp,
        nodeMetrics.metric(SystemLoadAverage).map(_.smoothValue),
        nodeMetrics.metric(CpuCombined).map(_.smoothValue), processors.value.intValue)
    }

  }

  /**
   * Java API to extract Cpu data from nodeMetrics, if the nodeMetrics
   * contains necessary cpu metrics, otherwise it returns null.
   */
  def extractCpu(nodeMetrics: NodeMetrics): Cpu = nodeMetrics match {
    case Cpu(address, timestamp, systemLoadAverage, cpuCombined, processors) ⇒
      // note that above extractor returns tuple
      Cpu(address, timestamp, systemLoadAverage, cpuCombined, processors)
    case _ ⇒ null
  }

  /**
   * @param address [[akka.actor.Address]] of the node the metrics are gathered at
   * @param timestamp the time of sampling, in milliseconds since midnight, January 1, 1970 UTC
   * @param systemLoadAverage OS-specific average load on the CPUs in the system, for the past 1 minute,
   *    The system is possibly nearing a bottleneck if the system load average is nearing number of cpus/cores.
   * @param cpuCombined combined CPU sum of User + Sys + Nice + Wait, in percentage ([0.0 - 1.0]. This
   *   metric can describe the amount of time the CPU spent executing code during n-interval and how
   *   much more it could theoretically.
   * @param processors the number of available processors
   */
  @SerialVersionUID(1L)
  final case class Cpu(
    address: Address,
    timestamp: Long,
    systemLoadAverage: Option[Double],
    cpuCombined: Option[Double],
    processors: Int) {

    cpuCombined match {
      case Some(x) ⇒ require(0.0 <= x && x <= 1.0, s"cpuCombined must be between [0.0 - 1.0], was [$x]")
      case None    ⇒
    }

  }

}

/**
 * INTERNAL API
 *
 * Encapsulates evaluation of validity of metric values, conversion of an actual metric value to
 * a [[akka.cluster.Metric]] for consumption by subscribed cluster entities.
 */
private[cluster] trait MetricNumericConverter {

  /**
   * An defined value is neither negative nor NaN/Infinite:
   * <ul><li>JMX system load average and max heap can be 'undefined' for certain OS, in which case a -1 is returned</li>
   * <li>SIGAR combined CPU can occasionally return a NaN or Infinite (known bug)</li></ul>
   */
  def defined(value: Number): Boolean = convertNumber(value) match {
    case Left(a)  ⇒ a >= 0
    case Right(b) ⇒ !(b < 0.0 || b.isNaN || b.isInfinite)
  }

  /**
   * May involve rounding or truncation.
   */
  def convertNumber(from: Any): Either[Long, Double] = from match {
    case n: Int        ⇒ Left(n)
    case n: Long       ⇒ Left(n)
    case n: Double     ⇒ Right(n)
    case n: Float      ⇒ Right(n)
    case n: BigInt     ⇒ Left(n.longValue)
    case n: BigDecimal ⇒ Right(n.doubleValue)
    case x             ⇒ throw new IllegalArgumentException(s"Not a number [$x]")
  }

}

/**
 * Implementations of cluster system metrics extends this trait.
 */
trait MetricsCollector extends Closeable {
  /**
   * Samples and collects new data points.
   * This method is invoked periodically and should return
   * current metrics for this node.
   */
  def sample(): NodeMetrics
}

/**
 * Loads JVM and system metrics through JMX monitoring beans.
 *
 * @param address The [[akka.actor.Address]] of the node being sampled
 * @param decay how quickly the exponential weighting of past data is decayed
 */
class JmxMetricsCollector(address: Address, decayFactor: Double) extends MetricsCollector {
  import StandardMetrics._

  private def this(cluster: Cluster) =
    this(cluster.selfAddress,
      EWMA.alpha(cluster.settings.MetricsMovingAverageHalfLife, cluster.settings.MetricsInterval))

  /**
   * This constructor is used when creating an instance from configured FQCN
   */
  def this(system: ActorSystem) = this(Cluster(system))

  private val decayFactorOption = Some(decayFactor)

  private val memoryMBean: MemoryMXBean = ManagementFactory.getMemoryMXBean

  private val osMBean: OperatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean

  /**
   * Samples and collects new data points.
   * Creates a new instance each time.
   */
  def sample(): NodeMetrics = NodeMetrics(address, newTimestamp, metrics)

  def metrics: Set[Metric] = {
    val heap = heapMemoryUsage
    Set(systemLoadAverage, heapUsed(heap), heapCommitted(heap), heapMax(heap), processors).flatten
  }

  /**
   * JMX Returns the OS-specific average load on the CPUs in the system, for the past 1 minute.
   * On some systems the JMX OS system load average may not be available, in which case a -1 is
   * returned from JMX, and None is returned from this method.
   * Creates a new instance each time.
   */
  def systemLoadAverage: Option[Metric] = Metric.create(
    name = SystemLoadAverage,
    value = osMBean.getSystemLoadAverage,
    decayFactor = None)

  /**
   * (JMX) Returns the number of available processors
   * Creates a new instance each time.
   */
  def processors: Option[Metric] = Metric.create(
    name = Processors,
    value = osMBean.getAvailableProcessors,
    decayFactor = None)

  /**
   * Current heap to be passed in to heapUsed, heapCommitted and heapMax
   */
  def heapMemoryUsage: MemoryUsage = memoryMBean.getHeapMemoryUsage

  /**
   * (JMX) Returns the current sum of heap memory used from all heap memory pools (in bytes).
   * Creates a new instance each time.
   */
  def heapUsed(heap: MemoryUsage): Option[Metric] = Metric.create(
    name = HeapMemoryUsed,
    value = heap.getUsed,
    decayFactor = decayFactorOption)

  /**
   * (JMX) Returns the current sum of heap memory guaranteed to be available to the JVM
   * from all heap memory pools (in bytes).
   * Creates a new instance each time.
   */
  def heapCommitted(heap: MemoryUsage): Option[Metric] = Metric.create(
    name = HeapMemoryCommitted,
    value = heap.getCommitted,
    decayFactor = decayFactorOption)

  /**
   * (JMX) Returns the maximum amount of memory (in bytes) that can be used
   * for JVM memory management. If not defined the metrics value is None, i.e.
   * never negative.
   * Creates a new instance each time.
   */
  def heapMax(heap: MemoryUsage): Option[Metric] = Metric.create(
    name = HeapMemoryMax,
    value = heap.getMax,
    decayFactor = None)

  override def close(): Unit = ()

}

/**
 * Loads metrics through Hyperic SIGAR and JMX monitoring beans. This
 * loads wider and more accurate range of metrics compared to JmxMetricsCollector
 * by using SIGAR's native OS library.
 *
 * The constructor will by design throw exception if org.hyperic.sigar.Sigar can't be loaded, due
 * to missing classes or native libraries.
 *
 * @param address The [[akka.actor.Address]] of the node being sampled
 * @param decay how quickly the exponential weighting of past data is decayed
 * @param sigar the org.hyperic.Sigar instance
 */
class SigarMetricsCollector(address: Address, decayFactor: Double, sigar: AnyRef)
  extends JmxMetricsCollector(address, decayFactor) {

  import StandardMetrics._

  private def this(cluster: Cluster) =
    this(cluster.selfAddress,
      EWMA.alpha(cluster.settings.MetricsMovingAverageHalfLife, cluster.settings.MetricsInterval),
      cluster.system.dynamicAccess.createInstanceFor[AnyRef]("org.hyperic.sigar.Sigar", Nil).get)

  /**
   * This constructor is used when creating an instance from configured FQCN
   */
  def this(system: ActorSystem) = this(Cluster(system))

  private val decayFactorOption = Some(decayFactor)

  private val EmptyClassArray: Array[(Class[_])] = Array.empty[(Class[_])]
  private val LoadAverage: Option[Method] = createMethodFrom(sigar, "getLoadAverage")
  private val Cpu: Option[Method] = createMethodFrom(sigar, "getCpuPerc")
  private val CombinedCpu: Option[Method] = Try(Cpu.get.getReturnType.getMethod("getCombined")).toOption

  // Do something initially, in constructor, to make sure that the native library can be loaded.
  // This will by design throw exception if sigar isn't usable
  val pid: Long = createMethodFrom(sigar, "getPid") match {
    case Some(method) ⇒
      try method.invoke(sigar).asInstanceOf[Long] catch {
        case e: InvocationTargetException if e.getCause.isInstanceOf[LinkageError] ⇒
          // native libraries not in place
          // don't throw fatal LinkageError, but something harmless
          throw new IllegalArgumentException(e.getCause.toString)
        case e: InvocationTargetException ⇒ throw e.getCause
      }
    case None ⇒ throw new IllegalArgumentException("Wrong version of Sigar, expected 'getPid' method")
  }

  override def metrics: Set[Metric] = {
    super.metrics.filterNot(_.name == SystemLoadAverage) ++ Set(systemLoadAverage, cpuCombined).flatten
  }

  /**
   * (SIGAR / JMX) Returns the OS-specific average load on the CPUs in the system, for the past 1 minute.
   * On some systems the JMX OS system load average may not be available, in which case a -1 is returned
   * from JMX, which means that None is returned from this method.
   * Hyperic SIGAR provides more precise values, thus, if the library is on the classpath, it is the default.
   * Creates a new instance each time.
   */
  override def systemLoadAverage: Option[Metric] = Metric.create(
    name = SystemLoadAverage,
    value = Try(LoadAverage.get.invoke(sigar).asInstanceOf[Array[AnyRef]](0).asInstanceOf[Number]),
    decayFactor = None) orElse super.systemLoadAverage

  /**
   * (SIGAR) Returns the combined CPU sum of User + Sys + Nice + Wait, in percentage. This metric can describe
   * the amount of time the CPU spent executing code during n-interval and how much more it could
   * theoretically. Note that 99% CPU utilization can be optimal or indicative of failure.
   *
   * In the data stream, this will sometimes return with a valid metric value, and sometimes as a NaN or Infinite.
   * Documented bug https://bugzilla.redhat.com/show_bug.cgi?id=749121 and several others.
   *
   * Creates a new instance each time.
   */
  def cpuCombined: Option[Metric] = Metric.create(
    name = CpuCombined,
    value = Try(CombinedCpu.get.invoke(Cpu.get.invoke(sigar)).asInstanceOf[Number]),
    decayFactor = decayFactorOption)

  /**
   * Releases any native resources associated with this instance.
   */
  override def close(): Unit = Try(createMethodFrom(sigar, "close").get.invoke(sigar))

  private def createMethodFrom(ref: AnyRef, method: String, types: Array[(Class[_])] = EmptyClassArray): Option[Method] =
    Try(ref.getClass.getMethod(method, types: _*)).toOption

}

/**
 * INTERNAL API
 * Factory to create configured MetricsCollector.
 * If instantiation of SigarMetricsCollector fails (missing class or native library)
 * it falls back to use JmxMetricsCollector.
 */
private[cluster] object MetricsCollector {
  def apply(system: ExtendedActorSystem, settings: ClusterSettings): MetricsCollector = {
    import settings.{ MetricsCollectorClass ⇒ fqcn }
    def log = Logging(system, "MetricsCollector")
    if (fqcn == classOf[SigarMetricsCollector].getName) {
      Try(new SigarMetricsCollector(system)) match {
        case Success(sigarCollector) ⇒ sigarCollector
        case Failure(e) ⇒
          Cluster(system).InfoLogger.logInfo(
            "Metrics will be retreived from MBeans, and may be incorrect on some platforms. " +
              "To increase metric accuracy add the 'sigar.jar' to the classpath and the appropriate " +
              "platform-specific native libary to 'java.library.path'. Reason: " +
              e.toString)
          new JmxMetricsCollector(system)
      }

    } else {
      system.dynamicAccess.createInstanceFor[MetricsCollector](fqcn, List(classOf[ActorSystem] -> system)).
        recover {
          case e ⇒ throw new ConfigurationException("Could not create custom metrics collector [" + fqcn + "] due to:" + e.toString)
        }.get
    }
  }
}

Other Akka source code examples

Here is a short list of links related to this Akka ClusterMetricsCollector.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.