alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (MetricsGossipSpec.scala)

This example Akka source code file (MetricsGossipSpec.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

a, akka, cluster, concurrent, duration, implicitsender, metricscollectorfactory, metricsgossip, metricsgossipspec, nodemetrics, test, testing, time, up

The MetricsGossipSpec.scala Akka example source code

/*
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */

package akka.cluster

import scala.concurrent.duration._

import akka.testkit.{ ImplicitSender, AkkaSpec }
import akka.actor.Address

import java.lang.System.{ currentTimeMillis ⇒ newTimestamp }

@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class MetricsGossipSpec extends AkkaSpec(MetricsEnabledSpec.config) with ImplicitSender with MetricsCollectorFactory {

  val collector = createMetricsCollector

  "A MetricsGossip" must {
    "add new NodeMetrics" in {
      val m1 = NodeMetrics(Address("akka.tcp", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
      val m2 = NodeMetrics(Address("akka.tcp", "sys", "a", 2555), newTimestamp, collector.sample.metrics)

      m1.metrics.size should be > (3)
      m2.metrics.size should be > (3)

      val g1 = MetricsGossip.empty :+ m1
      g1.nodes.size should be(1)
      g1.nodeMetricsFor(m1.address).map(_.metrics) should be(Some(m1.metrics))

      val g2 = g1 :+ m2
      g2.nodes.size should be(2)
      g2.nodeMetricsFor(m1.address).map(_.metrics) should be(Some(m1.metrics))
      g2.nodeMetricsFor(m2.address).map(_.metrics) should be(Some(m2.metrics))
    }

    "merge peer metrics" in {
      val m1 = NodeMetrics(Address("akka.tcp", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
      val m2 = NodeMetrics(Address("akka.tcp", "sys", "a", 2555), newTimestamp, collector.sample.metrics)

      val g1 = MetricsGossip.empty :+ m1 :+ m2
      g1.nodes.size should be(2)
      val beforeMergeNodes = g1.nodes

      val m2Updated = m2 copy (metrics = collector.sample.metrics, timestamp = m2.timestamp + 1000)
      val g2 = g1 :+ m2Updated // merge peers
      g2.nodes.size should be(2)
      g2.nodeMetricsFor(m1.address).map(_.metrics) should be(Some(m1.metrics))
      g2.nodeMetricsFor(m2.address).map(_.metrics) should be(Some(m2Updated.metrics))
      g2.nodes collect { case peer if peer.address == m2.address ⇒ peer.timestamp should be(m2Updated.timestamp) }
    }

    "merge an existing metric set for a node and update node ring" in {
      val m1 = NodeMetrics(Address("akka.tcp", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
      val m2 = NodeMetrics(Address("akka.tcp", "sys", "a", 2555), newTimestamp, collector.sample.metrics)
      val m3 = NodeMetrics(Address("akka.tcp", "sys", "a", 2556), newTimestamp, collector.sample.metrics)
      val m2Updated = m2 copy (metrics = collector.sample.metrics, timestamp = m2.timestamp + 1000)

      val g1 = MetricsGossip.empty :+ m1 :+ m2
      val g2 = MetricsGossip.empty :+ m3 :+ m2Updated

      g1.nodes.map(_.address) should be(Set(m1.address, m2.address))

      // should contain nodes 1,3, and the most recent version of 2
      val mergedGossip = g1 merge g2
      mergedGossip.nodes.map(_.address) should be(Set(m1.address, m2.address, m3.address))
      mergedGossip.nodeMetricsFor(m1.address).map(_.metrics) should be(Some(m1.metrics))
      mergedGossip.nodeMetricsFor(m2.address).map(_.metrics) should be(Some(m2Updated.metrics))
      mergedGossip.nodeMetricsFor(m3.address).map(_.metrics) should be(Some(m3.metrics))
      mergedGossip.nodes.foreach(_.metrics.size should be > (3))
      mergedGossip.nodeMetricsFor(m2.address).map(_.timestamp) should be(Some(m2Updated.timestamp))
    }

    "get the current NodeMetrics if it exists in the local nodes" in {
      val m1 = NodeMetrics(Address("akka.tcp", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
      val g1 = MetricsGossip.empty :+ m1
      g1.nodeMetricsFor(m1.address).map(_.metrics) should be(Some(m1.metrics))
    }

    "remove a node if it is no longer Up" in {
      val m1 = NodeMetrics(Address("akka.tcp", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
      val m2 = NodeMetrics(Address("akka.tcp", "sys", "a", 2555), newTimestamp, collector.sample.metrics)

      val g1 = MetricsGossip.empty :+ m1 :+ m2
      g1.nodes.size should be(2)
      val g2 = g1 remove m1.address
      g2.nodes.size should be(1)
      g2.nodes.exists(_.address == m1.address) should be(false)
      g2.nodeMetricsFor(m1.address) should be(None)
      g2.nodeMetricsFor(m2.address).map(_.metrics) should be(Some(m2.metrics))
    }

    "filter nodes" in {
      val m1 = NodeMetrics(Address("akka.tcp", "sys", "a", 2554), newTimestamp, collector.sample.metrics)
      val m2 = NodeMetrics(Address("akka.tcp", "sys", "a", 2555), newTimestamp, collector.sample.metrics)

      val g1 = MetricsGossip.empty :+ m1 :+ m2
      g1.nodes.size should be(2)
      val g2 = g1 filter Set(m2.address)
      g2.nodes.size should be(1)
      g2.nodes.exists(_.address == m1.address) should be(false)
      g2.nodeMetricsFor(m1.address) should be(None)
      g2.nodeMetricsFor(m2.address).map(_.metrics) should be(Some(m2.metrics))
    }
  }
}

Other Akka source code examples

Here is a short list of links related to this Akka MetricsGossipSpec.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.