|
Akka/Scala example source code file (MBeanSpec.scala)
The MBeanSpec.scala Akka example source code/** * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> */ package akka.cluster import language.postfixOps import com.typesafe.config.ConfigFactory import scala.concurrent.duration._ import java.lang.management.ManagementFactory import javax.management.InstanceNotFoundException import javax.management.ObjectName import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ import scala.util.Try object MBeanMultiJvmSpec extends MultiNodeConfig { val first = role("first") val second = role("second") val third = role("third") val fourth = role("fourth") commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(""" akka.cluster.jmx.enabled = on """)).withFallback(MultiNodeClusterSpec.clusterConfig)) } class MBeanMultiJvmNode1 extends MBeanSpec class MBeanMultiJvmNode2 extends MBeanSpec class MBeanMultiJvmNode3 extends MBeanSpec class MBeanMultiJvmNode4 extends MBeanSpec abstract class MBeanSpec extends MultiNodeSpec(MBeanMultiJvmSpec) with MultiNodeClusterSpec { import MBeanMultiJvmSpec._ import ClusterEvent._ val mbeanName = new ObjectName("akka:type=Cluster") lazy val mbeanServer = ManagementFactory.getPlatformMBeanServer "Cluster MBean" must { "expose attributes" taggedAs LongRunningTest in { val info = mbeanServer.getMBeanInfo(mbeanName) info.getAttributes.map(_.getName).toSet should be(Set( "ClusterStatus", "Members", "Unreachable", "MemberStatus", "Leader", "Singleton", "Available")) enterBarrier("after-1") } "expose operations" taggedAs LongRunningTest in { val info = mbeanServer.getMBeanInfo(mbeanName) info.getOperations.map(_.getName).toSet should be(Set( "join", "leave", "down")) enterBarrier("after-2") } "change attributes after startup" taggedAs LongRunningTest in { runOn(first) { mbeanServer.getAttribute(mbeanName, "Available").asInstanceOf[Boolean] should be(false) mbeanServer.getAttribute(mbeanName, "Singleton").asInstanceOf[Boolean] should be(false) mbeanServer.getAttribute(mbeanName, "Leader") should be("") mbeanServer.getAttribute(mbeanName, "Members") should be("") mbeanServer.getAttribute(mbeanName, "Unreachable") should be("") mbeanServer.getAttribute(mbeanName, "MemberStatus") should be("Removed") } awaitClusterUp(first) runOn(first) { awaitAssert(mbeanServer.getAttribute(mbeanName, "MemberStatus") should be("Up")) awaitAssert(mbeanServer.getAttribute(mbeanName, "Leader") should be(address(first).toString)) mbeanServer.getAttribute(mbeanName, "Singleton").asInstanceOf[Boolean] should be(true) mbeanServer.getAttribute(mbeanName, "Members") should be(address(first).toString) mbeanServer.getAttribute(mbeanName, "Unreachable") should be("") mbeanServer.getAttribute(mbeanName, "Available").asInstanceOf[Boolean] should be(true) } enterBarrier("after-3") } "support join" taggedAs LongRunningTest in { runOn(second, third, fourth) { mbeanServer.invoke(mbeanName, "join", Array(address(first).toString), Array("java.lang.String")) } enterBarrier("joined") awaitMembersUp(4) assertMembers(clusterView.members, roles.map(address(_)): _*) awaitAssert(mbeanServer.getAttribute(mbeanName, "MemberStatus") should be("Up")) val expectedMembers = roles.sorted.map(address(_)).mkString(",") awaitAssert(mbeanServer.getAttribute(mbeanName, "Members") should be(expectedMembers)) val expectedLeader = address(roleOfLeader()) awaitAssert(mbeanServer.getAttribute(mbeanName, "Leader") should be(expectedLeader.toString)) mbeanServer.getAttribute(mbeanName, "Singleton").asInstanceOf[Boolean] should be(false) enterBarrier("after-4") } val fourthAddress = address(fourth) "format cluster status as JSON with full reachability info" taggedAs LongRunningTest in within(30 seconds) { runOn(first) { testConductor.exit(fourth, 0).await } enterBarrier("fourth-shutdown") runOn(first, second, third) { awaitAssert(mbeanServer.getAttribute(mbeanName, "Unreachable") should be(fourthAddress.toString)) val expectedMembers = Seq(first, second, third, fourth).sorted.map(address(_)).mkString(",") awaitAssert(mbeanServer.getAttribute(mbeanName, "Members") should be(expectedMembers)) } enterBarrier("fourth-unreachable") runOn(first) { val sortedNodes = Vector(first, second, third, fourth).sorted.map(address(_)) val unreachableObservedBy = Vector(first, second, third).sorted.map(address(_)) val expectedJson = s"""{ | "self-address": "${address(first)}", | "members": [ | { | "address": "${sortedNodes(0)}", | "status": "Up" | }, | { | "address": "${sortedNodes(1)}", | "status": "Up" | }, | { | "address": "${sortedNodes(2)}", | "status": "Up" | }, | { | "address": "${sortedNodes(3)}", | "status": "Up" | } | ], | "unreachable": [ | { | "node": "${address(fourth)}", | "observed-by": [ | "${unreachableObservedBy(0)}", | "${unreachableObservedBy(1)}", | "${unreachableObservedBy(2)}" | ] | } | ] |} |""".stripMargin // awaitAssert to make sure that all nodes detects unreachable within(15.seconds) { awaitAssert(mbeanServer.getAttribute(mbeanName, "ClusterStatus") should be(expectedJson)) } } enterBarrier("after-5") } "support down" taggedAs LongRunningTest in within(20 seconds) { // fourth unreachable in previous step runOn(second) { mbeanServer.invoke(mbeanName, "down", Array(fourthAddress.toString), Array("java.lang.String")) } enterBarrier("fourth-down") runOn(first, second, third) { awaitMembersUp(3, canNotBePartOfMemberRing = Set(fourthAddress)) assertMembers(clusterView.members, first, second, third) awaitAssert(mbeanServer.getAttribute(mbeanName, "Unreachable") should be("")) } enterBarrier("after-6") } "support leave" taggedAs LongRunningTest in within(20 seconds) { runOn(second) { mbeanServer.invoke(mbeanName, "leave", Array(address(third).toString), Array("java.lang.String")) } enterBarrier("third-left") runOn(first, second) { awaitMembersUp(2) assertMembers(clusterView.members, first, second) val expectedMembers = Seq(first, second).sorted.map(address(_)).mkString(",") awaitAssert(mbeanServer.getAttribute(mbeanName, "Members") should be(expectedMembers)) } runOn(third) { awaitCond(cluster.isTerminated) // mbean should be unregistered, i.e. throw InstanceNotFoundException awaitAssert(intercept[InstanceNotFoundException] { mbeanServer.getMBeanInfo(mbeanName) }) } enterBarrier("after-7") } } } Other Akka source code examplesHere is a short list of links related to this Akka MBeanSpec.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.