alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (ClusterEvent.scala)

This example Akka source code file (ClusterEvent.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actor, address, akka, cluster, clusterdomainevent, collection, dispatch, event, gossip, member, memberevent, nil, option, set, subscriptioninitialstatemode, unit

The ClusterEvent.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.cluster

import language.postfixOps
import scala.collection.immutable
import scala.collection.immutable.VectorBuilder
import akka.actor.{ Actor, ActorLogging, ActorRef, Address }
import akka.cluster.ClusterEvent._
import akka.cluster.MemberStatus._
import akka.event.EventStream
import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue }

/**
 * Domain events published to the event bus.
 * Subscribe with:
 * {{{
 *   Cluster(system).subscribe(actorRef, classOf[ClusterDomainEvent])
 * }}}
 */
object ClusterEvent {

  sealed abstract class SubscriptionInitialStateMode
  /**
   * When using this subscription mode a snapshot of
   * [[akka.cluster.ClusterEvent.CurrentClusterState]] will be sent to the
   * subscriber as the first message.
   */
  case object InitialStateAsSnapshot extends SubscriptionInitialStateMode
  /**
   * When using this subscription mode the events corresponding
   * to the current state will be sent to the subscriber to mimic what you would
   * have seen if you were listening to the events when they occurred in the past.
   */
  case object InitialStateAsEvents extends SubscriptionInitialStateMode

  /**
   * Java API
   */
  def initialStateAsSnapshot = InitialStateAsSnapshot

  /**
   * Java API
   */
  def initialStateAsEvents = InitialStateAsEvents

  /**
   * Marker interface for cluster domain events.
   */
  sealed trait ClusterDomainEvent

  /**
   * Current snapshot state of the cluster. Sent to new subscriber.
   */
  final case class CurrentClusterState(
    members: immutable.SortedSet[Member] = immutable.SortedSet.empty,
    unreachable: Set[Member] = Set.empty,
    seenBy: Set[Address] = Set.empty,
    leader: Option[Address] = None,
    roleLeaderMap: Map[String, Option[Address]] = Map.empty) {

    /**
     * Java API: get current member list.
     */
    def getMembers: java.lang.Iterable[Member] = {
      import scala.collection.JavaConverters._
      members.asJava
    }

    /**
     * Java API: get current unreachable set.
     */
    def getUnreachable: java.util.Set[Member] =
      scala.collection.JavaConverters.setAsJavaSetConverter(unreachable).asJava

    /**
     * Java API: get current “seen-by” set.
     */
    def getSeenBy: java.util.Set[Address] =
      scala.collection.JavaConverters.setAsJavaSetConverter(seenBy).asJava

    /**
     * Java API: get address of current leader, or null if none
     */
    def getLeader: Address = leader orNull

    /**
     * All node roles in the cluster
     */
    def allRoles: Set[String] = roleLeaderMap.keySet

    /**
     * Java API: All node roles in the cluster
     */
    def getAllRoles: java.util.Set[String] =
      scala.collection.JavaConverters.setAsJavaSetConverter(allRoles).asJava

    /**
     * get address of current leader, if any, within the role set
     */
    def roleLeader(role: String): Option[Address] = roleLeaderMap.getOrElse(role, None)

    /**
     * Java API: get address of current leader within the role set,
     * or null if no node with that role
     */
    def getRoleLeader(role: String): Address = roleLeaderMap.get(role).flatten.orNull
  }

  /**
   * Marker interface for membership events.
   * Published when the state change is first seen on a node.
   * The state change was performed by the leader when there was
   * convergence on the leader node, i.e. all members had seen previous
   * state.
   */
  sealed trait MemberEvent extends ClusterDomainEvent {
    def member: Member
  }

  /**
   * Member status changed to Up.
   */
  final case class MemberUp(member: Member) extends MemberEvent {
    if (member.status != Up) throw new IllegalArgumentException("Expected Up status, got: " + member)
  }

  /**
   * Member status changed to [[MemberStatus.Exiting]] and will be removed
   * when all members have seen the `Exiting` status.
   */
  final case class MemberExited(member: Member) extends MemberEvent {
    if (member.status != Exiting) throw new IllegalArgumentException("Expected Exiting status, got: " + member)
  }

  /**
   * Member completely removed from the cluster.
   * When `previousStatus` is `MemberStatus.Down` the node was removed
   * after being detected as unreachable and downed.
   * When `previousStatus` is `MemberStatus.Exiting` the node was removed
   * after graceful leaving and exiting.
   */
  final case class MemberRemoved(member: Member, previousStatus: MemberStatus) extends MemberEvent {
    if (member.status != Removed) throw new IllegalArgumentException("Expected Removed status, got: " + member)
  }

  /**
   * Leader of the cluster members changed. Published when the state change
   * is first seen on a node.
   */
  final case class LeaderChanged(leader: Option[Address]) extends ClusterDomainEvent {
    /**
     * Java API
     * @return address of current leader, or null if none
     */
    def getLeader: Address = leader orNull
  }

  /**
   * First member (leader) of the members within a role set changed.
   * Published when the state change is first seen on a node.
   */
  final case class RoleLeaderChanged(role: String, leader: Option[Address]) extends ClusterDomainEvent {
    /**
     * Java API
     * @return address of current leader, or null if none
     */
    def getLeader: Address = leader orNull
  }

  /**
   * Marker interface to facilitate subscription of
   * both [[UnreachableMember]] and [[ReachableMember]].
   */
  sealed trait ReachabilityEvent extends ClusterDomainEvent

  /**
   * A member is considered as unreachable by the failure detector.
   */
  final case class UnreachableMember(member: Member) extends ReachabilityEvent

  /**
   * A member is considered as reachable by the failure detector
   * after having been unreachable.
   * @see [[UnreachableMember]]
   */
  final case class ReachableMember(member: Member) extends ReachabilityEvent

  /**
   * Current snapshot of cluster node metrics. Published to subscribers.
   */
  final case class ClusterMetricsChanged(nodeMetrics: Set[NodeMetrics]) extends ClusterDomainEvent {
    /**
     * Java API
     */
    def getNodeMetrics: java.lang.Iterable[NodeMetrics] =
      scala.collection.JavaConverters.asJavaIterableConverter(nodeMetrics).asJava
  }

  /**
   * INTERNAL API
   * The nodes that have seen current version of the Gossip.
   */
  private[cluster] final case class SeenChanged(convergence: Boolean, seenBy: Set[Address]) extends ClusterDomainEvent

  /**
   * INTERNAL API
   */
  private[cluster] final case class ReachabilityChanged(reachability: Reachability) extends ClusterDomainEvent

  /**
   * INTERNAL API
   */
  private[cluster] final case class CurrentInternalStats(
    gossipStats: GossipStats,
    vclockStats: VectorClockStats) extends ClusterDomainEvent

  /**
   * INTERNAL API
   */
  private[cluster] def diffUnreachable(oldGossip: Gossip, newGossip: Gossip): immutable.Seq[UnreachableMember] =
    if (newGossip eq oldGossip) Nil
    else {
      val oldUnreachableNodes = oldGossip.overview.reachability.allUnreachableOrTerminated
      (newGossip.overview.reachability.allUnreachableOrTerminated.collect {
        case node if !oldUnreachableNodes.contains(node) ⇒
          UnreachableMember(newGossip.member(node))
      })(collection.breakOut)
    }

  /**
   * INTERNAL API
   */
  private[cluster] def diffReachable(oldGossip: Gossip, newGossip: Gossip): immutable.Seq[ReachableMember] =
    if (newGossip eq oldGossip) Nil
    else {
      (oldGossip.overview.reachability.allUnreachable.collect {
        case node if newGossip.hasMember(node) && newGossip.overview.reachability.isReachable(node) ⇒
          ReachableMember(newGossip.member(node))
      })(collection.breakOut)

    }

  /**
   * INTERNAL API.
   */
  private[cluster] def diffMemberEvents(oldGossip: Gossip, newGossip: Gossip): immutable.Seq[MemberEvent] =
    if (newGossip eq oldGossip) Nil
    else {
      val newMembers = newGossip.members -- oldGossip.members
      val membersGroupedByAddress = List(newGossip.members, oldGossip.members).flatten.groupBy(_.uniqueAddress)
      val changedMembers = membersGroupedByAddress collect {
        case (_, newMember :: oldMember :: Nil) if newMember.status != oldMember.status ⇒ newMember
      }
      val memberEvents = (newMembers ++ changedMembers) collect {
        case m if m.status == Up      ⇒ MemberUp(m)
        case m if m.status == Exiting ⇒ MemberExited(m)
        // no events for other transitions
      }

      val removedMembers = oldGossip.members -- newGossip.members
      val removedEvents = removedMembers.map(m ⇒ MemberRemoved(m.copy(status = Removed), m.status))

      (new VectorBuilder[MemberEvent]() ++= memberEvents ++= removedEvents).result()
    }

  /**
   * INTERNAL API
   */
  private[cluster] def diffLeader(oldGossip: Gossip, newGossip: Gossip): immutable.Seq[LeaderChanged] = {
    val newLeader = newGossip.leader
    if (newLeader != oldGossip.leader) List(LeaderChanged(newLeader.map(_.address)))
    else Nil
  }

  /**
   * INTERNAL API
   */
  private[cluster] def diffRolesLeader(oldGossip: Gossip, newGossip: Gossip): Set[RoleLeaderChanged] = {
    for {
      role ← (oldGossip.allRoles ++ newGossip.allRoles)
      newLeader = newGossip.roleLeader(role)
      if newLeader != oldGossip.roleLeader(role)
    } yield RoleLeaderChanged(role, newLeader.map(_.address))
  }

  /**
   * INTERNAL API
   */
  private[cluster] def diffSeen(oldGossip: Gossip, newGossip: Gossip): immutable.Seq[SeenChanged] =
    if (newGossip eq oldGossip) Nil
    else {
      val newConvergence = newGossip.convergence
      val newSeenBy = newGossip.seenBy
      if (newConvergence != oldGossip.convergence || newSeenBy != oldGossip.seenBy)
        List(SeenChanged(newConvergence, newSeenBy.map(_.address)))
      else Nil
    }

  /**
   * INTERNAL API
   */
  private[cluster] def diffReachability(oldGossip: Gossip, newGossip: Gossip): immutable.Seq[ReachabilityChanged] =
    if (newGossip.overview.reachability eq oldGossip.overview.reachability) Nil
    else List(ReachabilityChanged(newGossip.overview.reachability))

}

/**
 * INTERNAL API.
 * Responsible for domain event subscriptions and publishing of
 * domain events to event bus.
 */
private[cluster] final class ClusterDomainEventPublisher extends Actor with ActorLogging
  with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
  import InternalClusterAction._

  var latestGossip: Gossip = Gossip.empty

  override def preRestart(reason: Throwable, message: Option[Any]) {
    // don't postStop when restarted, no children to stop
  }

  override def postStop(): Unit = {
    // publish the final removed state before shutting down
    publishChanges(Gossip.empty)
  }

  def receive = {
    case PublishChanges(newGossip)           ⇒ publishChanges(newGossip)
    case currentStats: CurrentInternalStats  ⇒ publishInternalStats(currentStats)
    case SendCurrentClusterState(receiver)   ⇒ sendCurrentClusterState(receiver)
    case Subscribe(subscriber, initMode, to) ⇒ subscribe(subscriber, initMode, to)
    case Unsubscribe(subscriber, to)         ⇒ unsubscribe(subscriber, to)
    case PublishEvent(event)                 ⇒ publish(event)
  }

  def eventStream: EventStream = context.system.eventStream

  /**
   * The current snapshot state corresponding to latest gossip
   * to mimic what you would have seen if you were listening to the events.
   */
  def sendCurrentClusterState(receiver: ActorRef): Unit = {
    val state = CurrentClusterState(
      members = latestGossip.members,
      unreachable = latestGossip.overview.reachability.allUnreachableOrTerminated map latestGossip.member,
      seenBy = latestGossip.seenBy.map(_.address),
      leader = latestGossip.leader.map(_.address),
      roleLeaderMap = latestGossip.allRoles.map(r ⇒ r -> latestGossip.roleLeader(r).map(_.address))(collection.breakOut))
    receiver ! state
  }

  def subscribe(subscriber: ActorRef, initMode: SubscriptionInitialStateMode, to: Set[Class[_]]): Unit = {
    initMode match {
      case InitialStateAsEvents ⇒
        def pub(event: AnyRef): Unit = {
          if (to.exists(_.isAssignableFrom(event.getClass)))
            subscriber ! event
        }
        publishDiff(Gossip.empty, latestGossip, pub)
      case InitialStateAsSnapshot ⇒
        sendCurrentClusterState(subscriber)
    }

    to foreach { eventStream.subscribe(subscriber, _) }
  }

  def unsubscribe(subscriber: ActorRef, to: Option[Class[_]]): Unit = to match {
    case None    ⇒ eventStream.unsubscribe(subscriber)
    case Some(c) ⇒ eventStream.unsubscribe(subscriber, c)
  }

  def publishChanges(newGossip: Gossip): Unit = {
    val oldGossip = latestGossip
    // keep the latestGossip to be sent to new subscribers
    latestGossip = newGossip
    publishDiff(oldGossip, newGossip, publish)
  }

  def publishDiff(oldGossip: Gossip, newGossip: Gossip, pub: AnyRef ⇒ Unit): Unit = {
    diffMemberEvents(oldGossip, newGossip) foreach pub
    diffUnreachable(oldGossip, newGossip) foreach pub
    diffReachable(oldGossip, newGossip) foreach pub
    diffLeader(oldGossip, newGossip) foreach pub
    diffRolesLeader(oldGossip, newGossip) foreach pub
    // publish internal SeenState for testing purposes
    diffSeen(oldGossip, newGossip) foreach pub
    diffReachability(oldGossip, newGossip) foreach pub
  }

  def publishInternalStats(currentStats: CurrentInternalStats): Unit = publish(currentStats)

  def publish(event: AnyRef): Unit = eventStream publish event

  def clearState(): Unit = {
    latestGossip = Gossip.empty
  }
}

Other Akka source code examples

Here is a short list of links related to this Akka ClusterEvent.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.