|
Akka/Scala example source code file (Gossip.scala)
The Gossip.scala Akka example source code/** * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> */ package akka.cluster import scala.collection.immutable import MemberStatus._ import akka.cluster.protobuf.ClusterMessageSerializer import scala.concurrent.duration.Deadline /** * INTERNAL API */ private[cluster] object Gossip { val emptyMembers: immutable.SortedSet[Member] = immutable.SortedSet.empty val empty: Gossip = new Gossip(Gossip.emptyMembers) def apply(members: immutable.SortedSet[Member]) = if (members.isEmpty) empty else empty.copy(members = members) private val leaderMemberStatus = Set[MemberStatus](Up, Leaving) private val convergenceMemberStatus = Set[MemberStatus](Up, Leaving) val convergenceSkipUnreachableWithMemberStatus = Set[MemberStatus](Down, Exiting) val removeUnreachableWithMemberStatus = Set[MemberStatus](Down, Exiting) } /** * INTERNAL API * * Represents the state of the cluster; cluster ring membership, ring convergence - * all versioned by a vector clock. * * When a node is joining the `Member`, with status `Joining`, is added to `members`. * If the joining node was downed it is moved from `overview.unreachable` (status `Down`) * to `members` (status `Joining`). It cannot rejoin if not first downed. * * When convergence is reached the leader change status of `members` from `Joining` * to `Up`. * * When failure detector consider a node as unavailable it will be moved from * `members` to `overview.unreachable`. * * When a node is downed, either manually or automatically, its status is changed to `Down`. * It is also removed from `overview.seen` table. The node will reside as `Down` in the * `overview.unreachable` set until joining again and it will then go through the normal * joining procedure. * * When a `Gossip` is received the version (vector clock) is used to determine if the * received `Gossip` is newer or older than the current local `Gossip`. The received `Gossip` * and local `Gossip` is merged in case of conflicting version, i.e. vector clocks without * same history. * * When a node is told by the user to leave the cluster the leader will move it to `Leaving` * and then rebalance and repartition the cluster and start hand-off by migrating the actors * from the leaving node to the new partitions. Once this process is complete the leader will * move the node to the `Exiting` state and once a convergence is complete move the node to * `Removed` by removing it from the `members` set and sending a `Removed` command to the * removed node telling it to shut itself down. */ @SerialVersionUID(1L) private[cluster] final case class Gossip( members: immutable.SortedSet[Member], // sorted set of members with their status, sorted by address overview: GossipOverview = GossipOverview(), version: VectorClock = VectorClock()) { // vector clock version if (Cluster.isAssertInvariantsEnabled) assertInvariants() private def assertInvariants(): Unit = { if (members.exists(_.status == Removed)) throw new IllegalArgumentException(s"Live members must have status [${Removed}], " + s"got [${members.filter(_.status == Removed)}]") val inReachabilityButNotMember = overview.reachability.allObservers -- members.map(_.uniqueAddress) if (inReachabilityButNotMember.nonEmpty) throw new IllegalArgumentException("Nodes not part of cluster in reachability table, got [%s]" format inReachabilityButNotMember.mkString(", ")) val seenButNotMember = overview.seen -- members.map(_.uniqueAddress) if (seenButNotMember.nonEmpty) throw new IllegalArgumentException("Nodes not part of cluster have marked the Gossip as seen, got [%s]" format seenButNotMember.mkString(", ")) } @transient private lazy val membersMap: Map[UniqueAddress, Member] = members.map(m ⇒ m.uniqueAddress -> m)(collection.breakOut) /** * Increments the version for this 'Node'. */ def :+(node: VectorClock.Node): Gossip = copy(version = version :+ node) /** * Adds a member to the member node ring. */ def :+(member: Member): Gossip = { if (members contains member) this else this copy (members = members + member) } /** * Marks the gossip as seen by this node (address) by updating the address entry in the 'gossip.overview.seen' */ def seen(node: UniqueAddress): Gossip = { if (seenByNode(node)) this else this copy (overview = overview copy (seen = overview.seen + node)) } /** * Marks the gossip as seen by only this node (address) by replacing the 'gossip.overview.seen' */ def onlySeen(node: UniqueAddress): Gossip = { this copy (overview = overview copy (seen = Set(node))) } /** * The nodes that have seen the current version of the Gossip. */ def seenBy: Set[UniqueAddress] = overview.seen /** * Has this Gossip been seen by this node. */ def seenByNode(node: UniqueAddress): Boolean = overview.seen(node) /** * Merges the seen table of two Gossip instances. */ def mergeSeen(that: Gossip): Gossip = this copy (overview = overview copy (seen = overview.seen ++ that.overview.seen)) /** * Merges two Gossip instances including membership tables, and the VectorClock histories. */ def merge(that: Gossip): Gossip = { import Member.ordering // 1. merge vector clocks val mergedVClock = this.version merge that.version // 2. merge members by selecting the single Member with highest MemberStatus out of the Member groups val mergedMembers = Gossip.emptyMembers ++ Member.pickHighestPriority(this.members, that.members) // 3. merge reachability table by picking records with highest version val mergedReachability = this.overview.reachability.merge(mergedMembers.map(_.uniqueAddress), that.overview.reachability) // 4. Nobody can have seen this new gossip yet val mergedSeen = Set.empty[UniqueAddress] Gossip(mergedMembers, GossipOverview(mergedSeen, mergedReachability), mergedVClock) } /** * Checks if we have a cluster convergence. If there are any unreachable nodes then we can't have a convergence - * waiting for user to act (issuing DOWN) or leader to act (issuing DOWN through auto-down). * * @return true if convergence have been reached and false if not */ def convergence: Boolean = { // First check that: // 1. we don't have any members that are unreachable, or // 2. all unreachable members in the set have status DOWN or EXITING // Else we can't continue to check for convergence // When that is done we check that all members with a convergence // status is in the seen table and has the latest vector clock // version val unreachable = overview.reachability.allUnreachableOrTerminated map member unreachable.forall(m ⇒ Gossip.convergenceSkipUnreachableWithMemberStatus(m.status)) && !members.exists(m ⇒ Gossip.convergenceMemberStatus(m.status) && !seenByNode(m.uniqueAddress)) } def isLeader(node: UniqueAddress): Boolean = leader == Some(node) def leader: Option[UniqueAddress] = leaderOf(members) def roleLeader(role: String): Option[UniqueAddress] = leaderOf(members.filter(_.hasRole(role))) private def leaderOf(mbrs: immutable.SortedSet[Member]): Option[UniqueAddress] = { val reachableMembers = if (overview.reachability.isAllReachable) mbrs else mbrs.filter(m ⇒ overview.reachability.isReachable(m.uniqueAddress)) if (reachableMembers.isEmpty) None else reachableMembers.find(m ⇒ Gossip.leaderMemberStatus(m.status)). orElse(Some(reachableMembers.min(Member.leaderStatusOrdering))).map(_.uniqueAddress) } def allRoles: Set[String] = members.flatMap(_.roles) def isSingletonCluster: Boolean = members.size == 1 def member(node: UniqueAddress): Member = { membersMap.getOrElse(node, Member.removed(node)) // placeholder for removed member } def hasMember(node: UniqueAddress): Boolean = membersMap.contains(node) def youngestMember: Member = { require(members.nonEmpty, "No youngest when no members") members.maxBy(m ⇒ if (m.upNumber == Int.MaxValue) 0 else m.upNumber) } override def toString = s"Gossip(members = [${members.mkString(", ")}], overview = ${overview}, version = ${version})" } /** * INTERNAL API * Represents the overview of the cluster, holds the cluster convergence table and set with unreachable nodes. */ @SerialVersionUID(1L) private[cluster] final case class GossipOverview( seen: Set[UniqueAddress] = Set.empty, reachability: Reachability = Reachability.empty) { override def toString = s"GossipOverview(reachability = [$reachability], seen = [${seen.mkString(", ")}])" } object GossipEnvelope { def apply(from: UniqueAddress, to: UniqueAddress, gossip: Gossip): GossipEnvelope = new GossipEnvelope(from, to, gossip, null, null) def apply(from: UniqueAddress, to: UniqueAddress, serDeadline: Deadline, ser: () ⇒ Gossip): GossipEnvelope = new GossipEnvelope(from, to, null, serDeadline, ser) } /** * INTERNAL API * Envelope adding a sender and receiver address to the gossip. * The reason for including the receiver address is to be able to * ignore messages that were intended for a previous incarnation of * the node with same host:port. The `uid` in the `UniqueAddress` is * different in that case. */ @SerialVersionUID(2L) private[cluster] class GossipEnvelope private ( val from: UniqueAddress, val to: UniqueAddress, @volatile var g: Gossip, serDeadline: Deadline, @transient @volatile var ser: () ⇒ Gossip) extends ClusterMessage { def gossip: Gossip = { deserialize() g } private def deserialize(): Unit = { if ((g eq null) && (ser ne null)) { if (serDeadline.hasTimeLeft) g = ser() else g = Gossip.empty ser = null } } @throws(classOf[java.io.ObjectStreamException]) private def writeReplace(): AnyRef = { deserialize() this } } /** * INTERNAL API * When there are no known changes to the node ring a `GossipStatus` * initiates a gossip chat between two members. If the receiver has a newer * version it replies with a `GossipEnvelope`. If receiver has older version * it replies with its `GossipStatus`. Same versions ends the chat immediately. */ @SerialVersionUID(1L) private[cluster] final case class GossipStatus(from: UniqueAddress, version: VectorClock) extends ClusterMessage Other Akka source code examplesHere is a short list of links related to this Akka Gossip.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.