Akka/Scala example source code file (ClusterReadView.scala)
The ClusterReadView.scala Akka example source code/** * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> */ package akka.cluster import java.io.Closeable import scala.collection.immutable import akka.actor.{ Actor, ActorRef, ActorSystemImpl, Address, Props } import akka.cluster.ClusterEvent._ import akka.actor.PoisonPill import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } import akka.actor.Deploy /** * INTERNAL API * * Read view of cluster state, updated via subscription of * cluster events published on the event bus. */ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { /** * Current state */ @volatile private var _state: CurrentClusterState = CurrentClusterState() @volatile private var _reachability: Reachability = Reachability.empty /** * Current internal cluster stats, updated periodically via event bus. */ @volatile private var _latestStats = CurrentInternalStats(GossipStats(), VectorClockStats()) /** * Current cluster metrics, updated periodically via event bus. */ @volatile private var _clusterMetrics: Set[NodeMetrics] = Set.empty val selfAddress = cluster.selfAddress // create actor that subscribes to the cluster eventBus to update current read view state private val eventBusListener: ActorRef = { cluster.system.systemActorOf(Props(new Actor with RequiresMessageQueue[UnboundedMessageQueueSemantics] { override def preStart(): Unit = cluster.subscribe(self, classOf[ClusterDomainEvent]) override def postStop(): Unit = cluster.unsubscribe(self) def receive = { case e: ClusterDomainEvent ⇒ e match { case SeenChanged(convergence, seenBy) ⇒ _state = _state.copy(seenBy = seenBy) case ReachabilityChanged(reachability) ⇒ _reachability = reachability case MemberRemoved(member, _) ⇒ _state = _state.copy(members = _state.members - member, unreachable = _state.unreachable - member) case UnreachableMember(member) ⇒ // replace current member with new member (might have different status, only address is used in equals) _state = _state.copy(unreachable = _state.unreachable - member + member) case ReachableMember(member) ⇒ _state = _state.copy(unreachable = _state.unreachable - member) case event: MemberEvent ⇒ // replace current member with new member (might have different status, only address is used in equals) val newUnreachable = if (_state.unreachable.contains(event.member)) _state.unreachable - event.member + event.member else _state.unreachable _state = _state.copy(members = _state.members - event.member + event.member, unreachable = newUnreachable) case LeaderChanged(leader) ⇒ _state = _state.copy(leader = leader) case RoleLeaderChanged(role, leader) ⇒ _state = _state.copy(roleLeaderMap = _state.roleLeaderMap + (role -> leader)) case stats: CurrentInternalStats ⇒ _latestStats = stats case ClusterMetricsChanged(nodes) ⇒ _clusterMetrics = nodes } case s: CurrentClusterState ⇒ _state = s } }).withDispatcher(cluster.settings.UseDispatcher).withDeploy(Deploy.local), name = "clusterEventBusListener") } def state: CurrentClusterState = _state def self: Member = { import cluster.selfUniqueAddress state.members.find(_.uniqueAddress == selfUniqueAddress). getOrElse(Member(selfUniqueAddress, cluster.selfRoles).copy(status = MemberStatus.Removed)) } /** * Returns true if this cluster instance has be shutdown. */ def isTerminated: Boolean = cluster.isTerminated /** * Current cluster members, sorted by address. */ def members: immutable.SortedSet[Member] = state.members /** * Members that has been detected as unreachable. */ def unreachableMembers: Set[Member] = state.unreachable /** * Member status for this node ([[akka.cluster.MemberStatus]]). * * NOTE: If the node has been removed from the cluster (and shut down) then it's status is set to the 'REMOVED' tombstone state * and is no longer present in the node ring or any other part of the gossiping state. However in order to maintain the * model and the semantics the user would expect, this method will in this situation return `MemberStatus.Removed`. */ def status: MemberStatus = self.status /** * Is this node the leader? */ def isLeader: Boolean = leader == Some(selfAddress) /** * Get the address of the current leader. */ def leader: Option[Address] = state.leader /** * Does the cluster consist of only one member? */ def isSingletonCluster: Boolean = members.size == 1 /** * Returns true if the node is not unreachable and not `Down` * and not `Removed`. */ def isAvailable: Boolean = { val myself = self !unreachableMembers.contains(myself) && myself.status != MemberStatus.Down && myself.status != MemberStatus.Removed } def reachability: Reachability = _reachability /** * Current cluster metrics. */ def clusterMetrics: Set[NodeMetrics] = _clusterMetrics /** * INTERNAL API */ private[cluster] def refreshCurrentState(): Unit = cluster.sendCurrentClusterState(eventBusListener) /** * INTERNAL API * The nodes that has seen current version of the Gossip. */ private[cluster] def seenBy: Set[Address] = state.seenBy /** * INTERNAL API */ private[cluster] def latestStats: CurrentInternalStats = _latestStats /** * Unsubscribe to cluster events. */ def close(): Unit = if (!eventBusListener.isTerminated) eventBusListener ! PoisonPill } Other Akka source code examplesHere is a short list of links related to this Akka ClusterReadView.scala source code file: |
... this post is sponsored by my books ... | |
![]() #1 New Release! |
![]() FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.