|
Akka/Scala example source code file (MetricsGossipSpec.scala)
The MetricsGossipSpec.scala Akka example source code/* * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> */ package akka.cluster import scala.concurrent.duration._ import akka.testkit.{ ImplicitSender, AkkaSpec } import akka.actor.Address import java.lang.System.{ currentTimeMillis ⇒ newTimestamp } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class MetricsGossipSpec extends AkkaSpec(MetricsEnabledSpec.config) with ImplicitSender with MetricsCollectorFactory { val collector = createMetricsCollector "A MetricsGossip" must { "add new NodeMetrics" in { val m1 = NodeMetrics(Address("akka.tcp", "sys", "a", 2554), newTimestamp, collector.sample.metrics) val m2 = NodeMetrics(Address("akka.tcp", "sys", "a", 2555), newTimestamp, collector.sample.metrics) m1.metrics.size should be > (3) m2.metrics.size should be > (3) val g1 = MetricsGossip.empty :+ m1 g1.nodes.size should be(1) g1.nodeMetricsFor(m1.address).map(_.metrics) should be(Some(m1.metrics)) val g2 = g1 :+ m2 g2.nodes.size should be(2) g2.nodeMetricsFor(m1.address).map(_.metrics) should be(Some(m1.metrics)) g2.nodeMetricsFor(m2.address).map(_.metrics) should be(Some(m2.metrics)) } "merge peer metrics" in { val m1 = NodeMetrics(Address("akka.tcp", "sys", "a", 2554), newTimestamp, collector.sample.metrics) val m2 = NodeMetrics(Address("akka.tcp", "sys", "a", 2555), newTimestamp, collector.sample.metrics) val g1 = MetricsGossip.empty :+ m1 :+ m2 g1.nodes.size should be(2) val beforeMergeNodes = g1.nodes val m2Updated = m2 copy (metrics = collector.sample.metrics, timestamp = m2.timestamp + 1000) val g2 = g1 :+ m2Updated // merge peers g2.nodes.size should be(2) g2.nodeMetricsFor(m1.address).map(_.metrics) should be(Some(m1.metrics)) g2.nodeMetricsFor(m2.address).map(_.metrics) should be(Some(m2Updated.metrics)) g2.nodes collect { case peer if peer.address == m2.address ⇒ peer.timestamp should be(m2Updated.timestamp) } } "merge an existing metric set for a node and update node ring" in { val m1 = NodeMetrics(Address("akka.tcp", "sys", "a", 2554), newTimestamp, collector.sample.metrics) val m2 = NodeMetrics(Address("akka.tcp", "sys", "a", 2555), newTimestamp, collector.sample.metrics) val m3 = NodeMetrics(Address("akka.tcp", "sys", "a", 2556), newTimestamp, collector.sample.metrics) val m2Updated = m2 copy (metrics = collector.sample.metrics, timestamp = m2.timestamp + 1000) val g1 = MetricsGossip.empty :+ m1 :+ m2 val g2 = MetricsGossip.empty :+ m3 :+ m2Updated g1.nodes.map(_.address) should be(Set(m1.address, m2.address)) // should contain nodes 1,3, and the most recent version of 2 val mergedGossip = g1 merge g2 mergedGossip.nodes.map(_.address) should be(Set(m1.address, m2.address, m3.address)) mergedGossip.nodeMetricsFor(m1.address).map(_.metrics) should be(Some(m1.metrics)) mergedGossip.nodeMetricsFor(m2.address).map(_.metrics) should be(Some(m2Updated.metrics)) mergedGossip.nodeMetricsFor(m3.address).map(_.metrics) should be(Some(m3.metrics)) mergedGossip.nodes.foreach(_.metrics.size should be > (3)) mergedGossip.nodeMetricsFor(m2.address).map(_.timestamp) should be(Some(m2Updated.timestamp)) } "get the current NodeMetrics if it exists in the local nodes" in { val m1 = NodeMetrics(Address("akka.tcp", "sys", "a", 2554), newTimestamp, collector.sample.metrics) val g1 = MetricsGossip.empty :+ m1 g1.nodeMetricsFor(m1.address).map(_.metrics) should be(Some(m1.metrics)) } "remove a node if it is no longer Up" in { val m1 = NodeMetrics(Address("akka.tcp", "sys", "a", 2554), newTimestamp, collector.sample.metrics) val m2 = NodeMetrics(Address("akka.tcp", "sys", "a", 2555), newTimestamp, collector.sample.metrics) val g1 = MetricsGossip.empty :+ m1 :+ m2 g1.nodes.size should be(2) val g2 = g1 remove m1.address g2.nodes.size should be(1) g2.nodes.exists(_.address == m1.address) should be(false) g2.nodeMetricsFor(m1.address) should be(None) g2.nodeMetricsFor(m2.address).map(_.metrics) should be(Some(m2.metrics)) } "filter nodes" in { val m1 = NodeMetrics(Address("akka.tcp", "sys", "a", 2554), newTimestamp, collector.sample.metrics) val m2 = NodeMetrics(Address("akka.tcp", "sys", "a", 2555), newTimestamp, collector.sample.metrics) val g1 = MetricsGossip.empty :+ m1 :+ m2 g1.nodes.size should be(2) val g2 = g1 filter Set(m2.address) g2.nodes.size should be(1) g2.nodes.exists(_.address == m1.address) should be(false) g2.nodeMetricsFor(m1.address) should be(None) g2.nodeMetricsFor(m2.address).map(_.metrics) should be(Some(m2.metrics)) } } } Other Akka source code examplesHere is a short list of links related to this Akka MetricsGossipSpec.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.