|
Akka/Scala example source code file (ClusterHeartbeat.scala)
The ClusterHeartbeat.scala Akka example source code/** * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> */ package akka.cluster import language.postfixOps import scala.collection.immutable import scala.concurrent.duration._ import akka.actor.{ ActorLogging, ActorRef, ActorSelection, Address, Actor, RootActorPath, Props } import akka.cluster.ClusterEvent._ import akka.routing.MurmurHash import akka.remote.FailureDetectorRegistry import akka.remote.PriorityMessage /** * INTERNAL API. * * Receives Heartbeat messages and replies. */ private[cluster] final class ClusterHeartbeatReceiver extends Actor with ActorLogging { import ClusterHeartbeatSender._ val selfHeartbeatRsp = HeartbeatRsp(Cluster(context.system).selfUniqueAddress) def receive = { case Heartbeat(from) ⇒ sender() ! selfHeartbeatRsp } } /** * INTERNAL API */ private[cluster] object ClusterHeartbeatSender { /** * Sent at regular intervals for failure detection. */ final case class Heartbeat(from: Address) extends ClusterMessage with PriorityMessage /** * Sent as reply to [[Heartbeat]] messages. */ final case class HeartbeatRsp(from: UniqueAddress) extends ClusterMessage with PriorityMessage // sent to self only case object HeartbeatTick final case class ExpectedFirstHeartbeat(from: UniqueAddress) } /* * INTERNAL API * * This actor is responsible for sending the heartbeat messages to * a few other nodes, which will reply and then this actor updates the * failure detector. */ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogging { import ClusterHeartbeatSender._ val cluster = Cluster(context.system) import cluster.{ selfAddress, selfUniqueAddress, scheduler } import cluster.settings._ import cluster.InfoLogger._ import context.dispatcher // the failureDetector is only updated by this actor, but read from other places val failureDetector = Cluster(context.system).failureDetector val selfHeartbeat = Heartbeat(selfAddress) var state = ClusterHeartbeatSenderState( ring = HeartbeatNodeRing(selfUniqueAddress, Set(selfUniqueAddress), MonitoredByNrOfMembers), unreachable = Set.empty[UniqueAddress], failureDetector) // start periodic heartbeat to other nodes in cluster val heartbeatTask = scheduler.schedule(PeriodicTasksInitialDelay max HeartbeatInterval, HeartbeatInterval, self, HeartbeatTick) override def preStart(): Unit = { cluster.subscribe(self, classOf[MemberEvent]) } override def postStop(): Unit = { state.activeReceivers.foreach(a ⇒ failureDetector.remove(a.address)) heartbeatTask.cancel() cluster.unsubscribe(self) } /** * Looks up and returns the remote cluster heartbeat connection for the specific address. */ def heartbeatReceiver(address: Address): ActorSelection = context.actorSelection(RootActorPath(address) / "system" / "cluster" / "heartbeatReceiver") def receive = initializing def initializing: Actor.Receive = { case s: CurrentClusterState ⇒ init(s) context.become(active) case HeartbeatTick ⇒ } def active: Actor.Receive = { case HeartbeatTick ⇒ heartbeat() case HeartbeatRsp(from) ⇒ heartbeatRsp(from) case MemberUp(m) ⇒ addMember(m) case MemberRemoved(m, _) ⇒ removeMember(m) case _: MemberEvent ⇒ // not interested in other types of MemberEvent case ExpectedFirstHeartbeat(from) ⇒ triggerFirstHeartbeat(from) } def init(snapshot: CurrentClusterState): Unit = { val nodes: Set[UniqueAddress] = snapshot.members.collect { case m if m.status == MemberStatus.Up ⇒ m.uniqueAddress }(collection.breakOut) state = state.init(nodes) } def addMember(m: Member): Unit = if (m.uniqueAddress != selfUniqueAddress) state = state.addMember(m.uniqueAddress) def removeMember(m: Member): Unit = if (m.uniqueAddress == cluster.selfUniqueAddress) { // This cluster node will be shutdown, but stop this actor immediately // to avoid further updates context stop self } else { state = state.removeMember(m.uniqueAddress) } def heartbeat(): Unit = { state.activeReceivers foreach { to ⇒ if (cluster.failureDetector.isMonitoring(to.address)) log.debug("Cluster Node [{}] - Heartbeat to [{}]", selfAddress, to.address) else { log.debug("Cluster Node [{}] - First Heartbeat to [{}]", selfAddress, to.address) // schedule the expected first heartbeat for later, which will give the // other side a chance to reply, and also trigger some resends if needed scheduler.scheduleOnce(HeartbeatExpectedResponseAfter, self, ExpectedFirstHeartbeat(to)) } heartbeatReceiver(to.address) ! selfHeartbeat } } def heartbeatRsp(from: UniqueAddress): Unit = { log.debug("Cluster Node [{}] - Heartbeat response from [{}]", selfAddress, from.address) state = state.heartbeatRsp(from) } def triggerFirstHeartbeat(from: UniqueAddress): Unit = if (state.activeReceivers(from) && !failureDetector.isMonitoring(from.address)) { log.debug("Cluster Node [{}] - Trigger extra expected heartbeat from [{}]", selfAddress, from.address) failureDetector.heartbeat(from.address) } } /** * INTERNAL API * State of [[ClusterHeartbeatSender]]. Encapsulated to facilitate unit testing. * It is immutable, but it updates the failureDetector. */ private[cluster] final case class ClusterHeartbeatSenderState( ring: HeartbeatNodeRing, unreachable: Set[UniqueAddress], failureDetector: FailureDetectorRegistry[Address]) { val activeReceivers: Set[UniqueAddress] = ring.myReceivers ++ unreachable def selfAddress = ring.selfAddress def init(nodes: Set[UniqueAddress]): ClusterHeartbeatSenderState = copy(ring = ring.copy(nodes = nodes + selfAddress)) def addMember(node: UniqueAddress): ClusterHeartbeatSenderState = membershipChange(ring :+ node) def removeMember(node: UniqueAddress): ClusterHeartbeatSenderState = { val newState = membershipChange(ring :- node) failureDetector remove node.address if (newState.unreachable(node)) newState.copy(unreachable = newState.unreachable - node) else newState } private def membershipChange(newRing: HeartbeatNodeRing): ClusterHeartbeatSenderState = { val oldReceivers = ring.myReceivers val removedReceivers = oldReceivers -- newRing.myReceivers var newUnreachable = unreachable removedReceivers foreach { a ⇒ if (failureDetector.isAvailable(a.address)) failureDetector remove a.address else newUnreachable += a } copy(newRing, newUnreachable) } def heartbeatRsp(from: UniqueAddress): ClusterHeartbeatSenderState = if (activeReceivers(from)) { failureDetector heartbeat from.address if (unreachable(from)) { // back from unreachable, ok to stop heartbeating to it if (!ring.myReceivers(from)) failureDetector remove from.address copy(unreachable = unreachable - from) } else this } else this } /** * INTERNAL API * * Data structure for picking heartbeat receivers. The node ring is * shuffled by deterministic hashing to avoid picking physically co-located * neighbors. * * It is immutable, i.e. the methods return new instances. */ private[cluster] final case class HeartbeatNodeRing(selfAddress: UniqueAddress, nodes: Set[UniqueAddress], monitoredByNrOfMembers: Int) { require(nodes contains selfAddress, s"nodes [${nodes.mkString(", ")}] must contain selfAddress [${selfAddress}]") private val nodeRing: immutable.SortedSet[UniqueAddress] = { implicit val ringOrdering: Ordering[UniqueAddress] = Ordering.fromLessThan[UniqueAddress] { (a, b) ⇒ val ha = a.## val hb = b.## ha < hb || (ha == hb && Member.addressOrdering.compare(a.address, b.address) < 0) } immutable.SortedSet() ++ nodes } /** * Receivers for `selfAddress`. Cached for subsequent access. */ lazy val myReceivers: immutable.Set[UniqueAddress] = receivers(selfAddress) private val useAllAsReceivers = monitoredByNrOfMembers >= (nodeRing.size - 1) /** * The receivers to use from a specified sender. */ def receivers(sender: UniqueAddress): immutable.Set[UniqueAddress] = if (useAllAsReceivers) nodeRing - sender else { val slice = nodeRing.from(sender).tail.take(monitoredByNrOfMembers) if (slice.size < monitoredByNrOfMembers) (slice ++ nodeRing.take(monitoredByNrOfMembers - slice.size)) else slice } /** * Add a node to the ring. */ def :+(node: UniqueAddress): HeartbeatNodeRing = if (nodes contains node) this else copy(nodes = nodes + node) /** * Remove a node from the ring. */ def :-(node: UniqueAddress): HeartbeatNodeRing = if (nodes contains node) copy(nodes = nodes - node) else this } Other Akka source code examplesHere is a short list of links related to this Akka ClusterHeartbeat.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.