|
Akka/Scala example source code file (ClusterRemoteWatcher.scala)
// The ClusterRemoteWatcher.scala Akka example source code
/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.cluster

import scala.concurrent.duration.FiniteDuration
import akka.actor.Actor
import akka.actor.Address
import akka.actor.Props
import akka.cluster.ClusterEvent.CurrentClusterState
import akka.cluster.ClusterEvent.MemberEvent
import akka.cluster.ClusterEvent.MemberUp
import akka.cluster.ClusterEvent.MemberRemoved
import akka.remote.FailureDetectorRegistry
import akka.remote.RemoteWatcher
import akka.actor.Deploy

/**
 * INTERNAL API
 *
 * Companion of the `ClusterRemoteWatcher` actor; it only hosts the `Props` factory.
 */
private[cluster] object ClusterRemoteWatcher {

  /**
   * Creates the [[akka.actor.Props]] for a `ClusterRemoteWatcher`.
   *
   * All four arguments are passed through, in order, as the actor's constructor
   * arguments, and the resulting `Props` is configured with `Deploy.local`.
   */
  def props(
    failureDetector: FailureDetectorRegistry[Address],
    heartbeatInterval: FiniteDuration,
    unreachableReaperInterval: FiniteDuration,
    heartbeatExpectedResponseAfter: FiniteDuration): Props = {
    val watcherProps = Props(
      classOf[ClusterRemoteWatcher],
      failureDetector,
      heartbeatInterval,
      unreachableReaperInterval,
      heartbeatExpectedResponseAfter)
    watcherProps.withDeploy(Deploy.local)
  }
}

/**
 * INTERNAL API
 *
 * A [[akka.remote.RemoteWatcher]] specialization that remembers which addresses
 * are cluster member nodes and is responsible for watchees living on those nodes.
 * When a node is removed from the cluster, `publishAddressTerminated` is invoked
 * for its address.
 *
 * Non-cluster nodes remain handled by `RemoteWatcher`. If a watch was registered
 * before its node became a cluster member, `ClusterRemoteWatcher` takes over the
 * responsibility from `RemoteWatcher` once the node becomes a member.
 */
private[cluster] class ClusterRemoteWatcher(
  failureDetector: FailureDetectorRegistry[Address],
  heartbeatInterval: FiniteDuration,
  unreachableReaperInterval: FiniteDuration,
  heartbeatExpectedResponseAfter: FiniteDuration)
  extends RemoteWatcher(
    failureDetector,
    heartbeatInterval,
    unreachableReaperInterval,
    heartbeatExpectedResponseAfter) {

  import RemoteWatcher._

  val cluster = Cluster(context.system)
  import cluster.selfAddress

  /** Addresses of the cluster members currently tracked; never contains `selfAddress`. */
  var clusterNodes: Set[Address] = Set.empty

  /** Runs the inherited start-up logic, then subscribes this actor to `MemberEvent`s. */
  override def preStart(): Unit = {
    super.preStart()
    cluster.subscribe(self, classOf[MemberEvent])
  }

  /** Runs the inherited shutdown logic, then removes this actor's cluster subscription. */
  override def postStop(): Unit = {
    super.postStop()
    cluster.unsubscribe(self)
  }

  /** Cluster-specific handling gets the first chance; anything else goes to the parent. */
  override def receive = receiveClusterEvent.orElse(super.receive)

  /**
   * Handles cluster membership messages and intercepts `WatchRemote` requests
   * whose watchee lives on a tracked cluster node.
   */
  def receiveClusterEvent: Actor.Receive = {
    case WatchRemote(watchee, _) if clusterNodes.contains(watchee.path.address) =>
      // cluster managed node, don't propagate to super
      ()

    case state: CurrentClusterState =>
      // Rebuild the tracked set from every member except ourselves, then claim
      // the existing watches on those nodes and drop them from `unreachable`.
      clusterNodes = state.members.collect { case m if m.address != selfAddress => m.address }
      clusterNodes.foreach(takeOverResponsibility)
      unreachable --= clusterNodes

    case MemberUp(member) =>
      val upAddress = member.address
      if (upAddress != selfAddress) {
        clusterNodes += upAddress
        takeOverResponsibility(upAddress)
        unreachable -= upAddress
      }

    case MemberRemoved(member, previousStatus) =>
      val removedAddress = member.address
      if (removedAddress != selfAddress) {
        clusterNodes -= removedAddress
        // Only a member that was Down before removal gets quarantined.
        if (previousStatus == MemberStatus.Down) {
          quarantine(removedAddress, Some(member.uniqueAddress.uid))
        }
        publishAddressTerminated(removedAddress)
      }

    case _: MemberEvent =>
      // not interesting
  }

  /**
   * When a cluster node is added, this class becomes responsible for the
   * watchees on that node which the parent `RemoteWatcher` was handling so far:
   * every `(watchee, watcher)` pair in `watching` whose watchee is located at
   * `address` is passed to `unwatchRemote`.
   */
  def takeOverResponsibility(address: Address): Unit =
    for ((watchee, watcher) <- watching if watchee.path.address == address)
      unwatchRemote(watchee, watcher)
}

// Other Akka source code examples
// Here is a short list of links related to this Akka ClusterRemoteWatcher.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.