|
Akka/Scala example source code file (ClusterEvent.scala)
The ClusterEvent.scala Akka example source code
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import language.postfixOps
import scala.collection.immutable
import scala.collection.immutable.VectorBuilder
import akka.actor.{ Actor, ActorLogging, ActorRef, Address }
import akka.cluster.ClusterEvent._
import akka.cluster.MemberStatus._
import akka.event.EventStream
import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue }
/**
* Domain events published to the event bus.
* Subscribe with:
* {{{
* Cluster(system).subscribe(actorRef, classOf[ClusterDomainEvent])
* }}}
*/
object ClusterEvent {
sealed abstract class SubscriptionInitialStateMode
/**
* When using this subscription mode a snapshot of
* [[akka.cluster.ClusterEvent.CurrentClusterState]] will be sent to the
* subscriber as the first message.
*/
case object InitialStateAsSnapshot extends SubscriptionInitialStateMode
/**
* When using this subscription mode the events corresponding
* to the current state will be sent to the subscriber to mimic what you would
* have seen if you were listening to the events when they occurred in the past.
*/
case object InitialStateAsEvents extends SubscriptionInitialStateMode
/**
* Java API
*/
def initialStateAsSnapshot = InitialStateAsSnapshot
/**
* Java API
*/
def initialStateAsEvents = InitialStateAsEvents
/**
* Marker interface for cluster domain events.
*/
sealed trait ClusterDomainEvent
/**
* Current snapshot state of the cluster. Sent to new subscriber.
*/
final case class CurrentClusterState(
members: immutable.SortedSet[Member] = immutable.SortedSet.empty,
unreachable: Set[Member] = Set.empty,
seenBy: Set[Address] = Set.empty,
leader: Option[Address] = None,
roleLeaderMap: Map[String, Option[Address]] = Map.empty) {
/**
* Java API: get current member list.
*/
def getMembers: java.lang.Iterable[Member] = {
import scala.collection.JavaConverters._
members.asJava
}
/**
* Java API: get current unreachable set.
*/
def getUnreachable: java.util.Set[Member] =
scala.collection.JavaConverters.setAsJavaSetConverter(unreachable).asJava
/**
* Java API: get current “seen-by” set.
*/
def getSeenBy: java.util.Set[Address] =
scala.collection.JavaConverters.setAsJavaSetConverter(seenBy).asJava
/**
* Java API: get address of current leader, or null if none
*/
def getLeader: Address = leader orNull
/**
* All node roles in the cluster
*/
def allRoles: Set[String] = roleLeaderMap.keySet
/**
* Java API: All node roles in the cluster
*/
def getAllRoles: java.util.Set[String] =
scala.collection.JavaConverters.setAsJavaSetConverter(allRoles).asJava
/**
* get address of current leader, if any, within the role set
*/
def roleLeader(role: String): Option[Address] = roleLeaderMap.getOrElse(role, None)
/**
* Java API: get address of current leader within the role set,
* or null if no node with that role
*/
def getRoleLeader(role: String): Address = roleLeaderMap.get(role).flatten.orNull
}
/**
* Marker interface for membership events.
* Published when the state change is first seen on a node.
* The state change was performed by the leader when there was
* convergence on the leader node, i.e. all members had seen previous
* state.
*/
sealed trait MemberEvent extends ClusterDomainEvent {
def member: Member
}
/**
* Member status changed to Up.
*/
final case class MemberUp(member: Member) extends MemberEvent {
if (member.status != Up) throw new IllegalArgumentException("Expected Up status, got: " + member)
}
/**
* Member status changed to [[MemberStatus.Exiting]] and will be removed
* when all members have seen the `Exiting` status.
*/
final case class MemberExited(member: Member) extends MemberEvent {
if (member.status != Exiting) throw new IllegalArgumentException("Expected Exiting status, got: " + member)
}
/**
* Member completely removed from the cluster.
* When `previousStatus` is `MemberStatus.Down` the node was removed
* after being detected as unreachable and downed.
* When `previousStatus` is `MemberStatus.Exiting` the node was removed
* after graceful leaving and exiting.
*/
final case class MemberRemoved(member: Member, previousStatus: MemberStatus) extends MemberEvent {
if (member.status != Removed) throw new IllegalArgumentException("Expected Removed status, got: " + member)
}
/**
* Leader of the cluster members changed. Published when the state change
* is first seen on a node.
*/
final case class LeaderChanged(leader: Option[Address]) extends ClusterDomainEvent {
/**
* Java API
* @return address of current leader, or null if none
*/
def getLeader: Address = leader orNull
}
/**
* First member (leader) of the members within a role set changed.
* Published when the state change is first seen on a node.
*/
final case class RoleLeaderChanged(role: String, leader: Option[Address]) extends ClusterDomainEvent {
/**
* Java API
* @return address of current leader, or null if none
*/
def getLeader: Address = leader orNull
}
/**
* Marker interface to facilitate subscription of
* both [[UnreachableMember]] and [[ReachableMember]].
*/
sealed trait ReachabilityEvent extends ClusterDomainEvent
/**
* A member is considered as unreachable by the failure detector.
*/
final case class UnreachableMember(member: Member) extends ReachabilityEvent
/**
* A member is considered as reachable by the failure detector
* after having been unreachable.
* @see [[UnreachableMember]]
*/
final case class ReachableMember(member: Member) extends ReachabilityEvent
/**
* Current snapshot of cluster node metrics. Published to subscribers.
*/
final case class ClusterMetricsChanged(nodeMetrics: Set[NodeMetrics]) extends ClusterDomainEvent {
/**
* Java API
*/
def getNodeMetrics: java.lang.Iterable[NodeMetrics] =
scala.collection.JavaConverters.asJavaIterableConverter(nodeMetrics).asJava
}
/**
* INTERNAL API
* The nodes that have seen current version of the Gossip.
*/
private[cluster] final case class SeenChanged(convergence: Boolean, seenBy: Set[Address]) extends ClusterDomainEvent
/**
* INTERNAL API
*/
private[cluster] final case class ReachabilityChanged(reachability: Reachability) extends ClusterDomainEvent
/**
* INTERNAL API
*/
private[cluster] final case class CurrentInternalStats(
gossipStats: GossipStats,
vclockStats: VectorClockStats) extends ClusterDomainEvent
/**
* INTERNAL API
*/
private[cluster] def diffUnreachable(oldGossip: Gossip, newGossip: Gossip): immutable.Seq[UnreachableMember] =
if (newGossip eq oldGossip) Nil
else {
val oldUnreachableNodes = oldGossip.overview.reachability.allUnreachableOrTerminated
(newGossip.overview.reachability.allUnreachableOrTerminated.collect {
case node if !oldUnreachableNodes.contains(node) ⇒
UnreachableMember(newGossip.member(node))
})(collection.breakOut)
}
/**
* INTERNAL API
*/
private[cluster] def diffReachable(oldGossip: Gossip, newGossip: Gossip): immutable.Seq[ReachableMember] =
if (newGossip eq oldGossip) Nil
else {
(oldGossip.overview.reachability.allUnreachable.collect {
case node if newGossip.hasMember(node) && newGossip.overview.reachability.isReachable(node) ⇒
ReachableMember(newGossip.member(node))
})(collection.breakOut)
}
/**
* INTERNAL API.
*/
private[cluster] def diffMemberEvents(oldGossip: Gossip, newGossip: Gossip): immutable.Seq[MemberEvent] =
if (newGossip eq oldGossip) Nil
else {
val newMembers = newGossip.members -- oldGossip.members
val membersGroupedByAddress = List(newGossip.members, oldGossip.members).flatten.groupBy(_.uniqueAddress)
val changedMembers = membersGroupedByAddress collect {
case (_, newMember :: oldMember :: Nil) if newMember.status != oldMember.status ⇒ newMember
}
val memberEvents = (newMembers ++ changedMembers) collect {
case m if m.status == Up ⇒ MemberUp(m)
case m if m.status == Exiting ⇒ MemberExited(m)
// no events for other transitions
}
val removedMembers = oldGossip.members -- newGossip.members
val removedEvents = removedMembers.map(m ⇒ MemberRemoved(m.copy(status = Removed), m.status))
(new VectorBuilder[MemberEvent]() ++= memberEvents ++= removedEvents).result()
}
/**
* INTERNAL API
*/
private[cluster] def diffLeader(oldGossip: Gossip, newGossip: Gossip): immutable.Seq[LeaderChanged] = {
val newLeader = newGossip.leader
if (newLeader != oldGossip.leader) List(LeaderChanged(newLeader.map(_.address)))
else Nil
}
/**
* INTERNAL API
*/
private[cluster] def diffRolesLeader(oldGossip: Gossip, newGossip: Gossip): Set[RoleLeaderChanged] = {
for {
role ← (oldGossip.allRoles ++ newGossip.allRoles)
newLeader = newGossip.roleLeader(role)
if newLeader != oldGossip.roleLeader(role)
} yield RoleLeaderChanged(role, newLeader.map(_.address))
}
/**
* INTERNAL API
*/
private[cluster] def diffSeen(oldGossip: Gossip, newGossip: Gossip): immutable.Seq[SeenChanged] =
if (newGossip eq oldGossip) Nil
else {
val newConvergence = newGossip.convergence
val newSeenBy = newGossip.seenBy
if (newConvergence != oldGossip.convergence || newSeenBy != oldGossip.seenBy)
List(SeenChanged(newConvergence, newSeenBy.map(_.address)))
else Nil
}
/**
* INTERNAL API
*/
private[cluster] def diffReachability(oldGossip: Gossip, newGossip: Gossip): immutable.Seq[ReachabilityChanged] =
if (newGossip.overview.reachability eq oldGossip.overview.reachability) Nil
else List(ReachabilityChanged(newGossip.overview.reachability))
}
/**
* INTERNAL API.
* Responsible for domain event subscriptions and publishing of
* domain events to event bus.
*/
private[cluster] final class ClusterDomainEventPublisher extends Actor with ActorLogging
with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
import InternalClusterAction._
var latestGossip: Gossip = Gossip.empty
override def preRestart(reason: Throwable, message: Option[Any]) {
// don't postStop when restarted, no children to stop
}
override def postStop(): Unit = {
// publish the final removed state before shutting down
publishChanges(Gossip.empty)
}
def receive = {
case PublishChanges(newGossip) ⇒ publishChanges(newGossip)
case currentStats: CurrentInternalStats ⇒ publishInternalStats(currentStats)
case SendCurrentClusterState(receiver) ⇒ sendCurrentClusterState(receiver)
case Subscribe(subscriber, initMode, to) ⇒ subscribe(subscriber, initMode, to)
case Unsubscribe(subscriber, to) ⇒ unsubscribe(subscriber, to)
case PublishEvent(event) ⇒ publish(event)
}
def eventStream: EventStream = context.system.eventStream
/**
* The current snapshot state corresponding to latest gossip
* to mimic what you would have seen if you were listening to the events.
*/
def sendCurrentClusterState(receiver: ActorRef): Unit = {
val state = CurrentClusterState(
members = latestGossip.members,
unreachable = latestGossip.overview.reachability.allUnreachableOrTerminated map latestGossip.member,
seenBy = latestGossip.seenBy.map(_.address),
leader = latestGossip.leader.map(_.address),
roleLeaderMap = latestGossip.allRoles.map(r ⇒ r -> latestGossip.roleLeader(r).map(_.address))(collection.breakOut))
receiver ! state
}
def subscribe(subscriber: ActorRef, initMode: SubscriptionInitialStateMode, to: Set[Class[_]]): Unit = {
initMode match {
case InitialStateAsEvents ⇒
def pub(event: AnyRef): Unit = {
if (to.exists(_.isAssignableFrom(event.getClass)))
subscriber ! event
}
publishDiff(Gossip.empty, latestGossip, pub)
case InitialStateAsSnapshot ⇒
sendCurrentClusterState(subscriber)
}
to foreach { eventStream.subscribe(subscriber, _) }
}
def unsubscribe(subscriber: ActorRef, to: Option[Class[_]]): Unit = to match {
case None ⇒ eventStream.unsubscribe(subscriber)
case Some(c) ⇒ eventStream.unsubscribe(subscriber, c)
}
def publishChanges(newGossip: Gossip): Unit = {
val oldGossip = latestGossip
// keep the latestGossip to be sent to new subscribers
latestGossip = newGossip
publishDiff(oldGossip, newGossip, publish)
}
def publishDiff(oldGossip: Gossip, newGossip: Gossip, pub: AnyRef ⇒ Unit): Unit = {
diffMemberEvents(oldGossip, newGossip) foreach pub
diffUnreachable(oldGossip, newGossip) foreach pub
diffReachable(oldGossip, newGossip) foreach pub
diffLeader(oldGossip, newGossip) foreach pub
diffRolesLeader(oldGossip, newGossip) foreach pub
// publish internal SeenState for testing purposes
diffSeen(oldGossip, newGossip) foreach pub
diffReachability(oldGossip, newGossip) foreach pub
}
def publishInternalStats(currentStats: CurrentInternalStats): Unit = publish(currentStats)
def publish(event: AnyRef): Unit = eventStream publish event
def clearState(): Unit = {
latestGossip = Gossip.empty
}
}
Other Akka source code examplesHere is a short list of links related to this Akka ClusterEvent.scala source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.