|
Akka/Scala example source code file (ClusterJmx.scala)
The ClusterJmx.scala Akka example source code/** * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> */ package akka.cluster import java.lang.management.ManagementFactory import javax.management.StandardMBean import akka.event.LoggingAdapter import akka.actor.AddressFromURIString import javax.management.ObjectName import javax.management.InstanceAlreadyExistsException import javax.management.InstanceNotFoundException /** * Interface for the cluster JMX MBean. */ trait ClusterNodeMBean { /** * Member status for this node. */ def getMemberStatus: String /** * Comma separated addresses of member nodes, sorted in the cluster ring order. * The address format is `akka.tcp://actor-system-name@hostname:port` */ def getMembers: String /** * Comma separated addresses of unreachable member nodes. * The address format is `akka.tcp://actor-system-name@hostname:port` */ def getUnreachable: String /* * JSON format of the status of all nodes in the cluster as follows: * {{{ * { * "self-address": "akka.tcp://system@host1:2552", * "members": [ * { * "address": "akka.tcp://system@host1:2552", * "status": "Up" * }, * { * "address": "akka.tcp://system@host2:2552", * "status": "Up" * }, * { * "address": "akka.tcp://system@host3:2552", * "status": "Down" * }, * { * "address": "akka.tcp://system@host4:2552", * "status": "Joining" * } * ], * "unreachable": [ * { * "node": "akka.tcp://system@host2:2552", * "observed-by": [ * "akka.tcp://system@host1:2552", * "akka.tcp://system@host3:2552" * ] * }, * { * "node": "akka.tcp://system@host3:2552", * "observed-by": [ * "akka.tcp://system@host1:2552", * "akka.tcp://system@host2:2552" * ] * } * ] * } * }}} */ def getClusterStatus: String /** * Get the address of the current leader. * The address format is `akka.tcp://actor-system-name@hostname:port` */ def getLeader: String /** * Does the cluster consist of only one member? */ def isSingleton: Boolean /** * Returns true if the node is not unreachable and not `Down` * and not `Removed`. */ def isAvailable: Boolean /** * Try to join this cluster node with the node specified by 'address'. * The address format is `akka.tcp://actor-system-name@hostname:port`. * A 'Join(thisNodeAddress)' command is sent to the node to join. */ def join(address: String) /** * Send command to issue state transition to LEAVING for the node specified by 'address'. * The address format is `akka.tcp://actor-system-name@hostname:port` */ def leave(address: String) /** * Send command to DOWN the node specified by 'address'. * The address format is `akka.tcp://actor-system-name@hostname:port` */ def down(address: String) } /** * INTERNAL API */ private[akka] class ClusterJmx(cluster: Cluster, log: LoggingAdapter) { private val mBeanServer = ManagementFactory.getPlatformMBeanServer private val clusterMBeanName = new ObjectName("akka:type=Cluster") private def clusterView = cluster.readView import cluster.InfoLogger._ /** * Creates the cluster JMX MBean and registers it in the MBean server. */ def createMBean() = { val mbean = new StandardMBean(classOf[ClusterNodeMBean]) with ClusterNodeMBean { // JMX attributes (bean-style) def getClusterStatus: String = { val members = clusterView.members.toSeq.sorted(Member.ordering).map { m ⇒ s"""{ | "address": "${m.address}", | "status": "${m.status}" | }""".stripMargin } mkString (",\n ") val unreachable = clusterView.reachability.observersGroupedByUnreachable.toSeq.sortBy(_._1).map { case (subject, observers) ⇒ s"""{ | "node": "${subject.address}", | "observed-by": [ | ${observers.toSeq.sorted.map(_.address).mkString("\"", "\",\n \"", "\"")} | ] | }""".stripMargin } mkString (",\n") s"""{ | "self-address": "${clusterView.selfAddress}", | "members": [ | ${members} | ], | "unreachable": [ | ${unreachable} | ] |} |""".stripMargin } def getMembers: String = clusterView.members.toSeq.map(_.address).mkString(",") def getUnreachable: String = clusterView.unreachableMembers.map(_.address).mkString(",") def getMemberStatus: String = clusterView.status.toString def getLeader: String = clusterView.leader.fold("")(_.toString) def isSingleton: Boolean = clusterView.isSingletonCluster def isAvailable: Boolean = clusterView.isAvailable // JMX commands def join(address: String) = cluster.join(AddressFromURIString(address)) def leave(address: String) = cluster.leave(AddressFromURIString(address)) def down(address: String) = cluster.down(AddressFromURIString(address)) } try { mBeanServer.registerMBean(mbean, clusterMBeanName) logInfo("Registered cluster JMX MBean [{}]", clusterMBeanName) } catch { case e: InstanceAlreadyExistsException ⇒ // ignore - we are running multiple cluster nodes in the same JVM (probably for testing) } } /** * Unregisters the cluster JMX MBean from MBean server. */ def unregisterMBean(): Unit = { try { mBeanServer.unregisterMBean(clusterMBeanName) } catch { case e: InstanceNotFoundException ⇒ // ignore - we are running multiple cluster nodes in the same JVM (probably for testing) } } } Other Akka source code examplesHere is a short list of links related to this Akka ClusterJmx.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.