alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (Reachability.scala)

This example Akka source code file (Reachability.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actor, akka, annotation, cluster, collection, long, map, none, reachability, reachabilitystatus, reachable, set, some, uniqueaddress, unreachable

The Reachability.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.cluster

import scala.annotation.tailrec
import scala.collection.immutable
import scala.collection.breakOut
import akka.actor.Address

/**
 * INTERNAL API
 */
private[cluster] object Reachability {
  val empty = new Reachability(Vector.empty, Map.empty)

  def apply(records: immutable.IndexedSeq[Record], versions: Map[UniqueAddress, Long]): Reachability =
    new Reachability(records, versions)

  def create(records: immutable.Seq[Record], versions: Map[UniqueAddress, Long]): Reachability = records match {
    case r: immutable.IndexedSeq[Record] ⇒ apply(r, versions)
    case _                               ⇒ apply(records.toVector, versions)
  }

  @SerialVersionUID(1L)
  final case class Record(observer: UniqueAddress, subject: UniqueAddress, status: ReachabilityStatus, version: Long)

  sealed trait ReachabilityStatus
  @SerialVersionUID(1L) case object Reachable extends ReachabilityStatus
  @SerialVersionUID(1L) case object Unreachable extends ReachabilityStatus
  @SerialVersionUID(1L) case object Terminated extends ReachabilityStatus

}

/**
 * INTERNAL API
 *
 * Immutable data structure that holds the reachability status of subject nodes as seen
 * from observer nodes. Failure detector for the subject nodes exist on the
 * observer nodes. Changes (reachable, unreachable, terminated) are only performed
 * by observer nodes to its own records. Each change bumps the version number of the
 * record, and thereby it is always possible to determine which record is newest when
 * merging two instances.
 *
 * Aggregated status of a subject node is defined as (in this order):
 * - Terminated if any observer node considers it as Terminated
 * - Unreachable if any observer node considers it as Unreachable
 * - Reachable otherwise, i.e. no observer node considers it as Unreachable
 */
@SerialVersionUID(1L)
private[cluster] class Reachability private (
  val records: immutable.IndexedSeq[Reachability.Record],
  val versions: Map[UniqueAddress, Long]) extends Serializable {

  import Reachability._

  private class Cache {
    val (observerRowsMap, allUnreachable, allTerminated) = {
      if (records.isEmpty) {
        val observerRowsMap = Map.empty[UniqueAddress, Map[UniqueAddress, Reachability.Record]]
        val allTerminated = Set.empty[UniqueAddress]
        val allUnreachable = Set.empty[UniqueAddress]
        (observerRowsMap, allUnreachable, allTerminated)
      } else {
        val mapBuilder = scala.collection.mutable.Map.empty[UniqueAddress, Map[UniqueAddress, Reachability.Record]]
        import scala.collection.mutable.SetBuilder
        val terminatedBuilder = new SetBuilder[UniqueAddress, Set[UniqueAddress]](Set.empty)
        val unreachableBuilder = new SetBuilder[UniqueAddress, Set[UniqueAddress]](Set.empty)

        records foreach { r ⇒
          val m = mapBuilder.get(r.observer) match {
            case None    ⇒ Map(r.subject -> r)
            case Some(m) ⇒ m.updated(r.subject, r)
          }
          mapBuilder += (r.observer -> m)

          if (r.status == Unreachable) unreachableBuilder += r.subject
          else if (r.status == Terminated) terminatedBuilder += r.subject
        }

        val observerRowsMap: Map[UniqueAddress, Map[UniqueAddress, Reachability.Record]] = mapBuilder.toMap
        val allTerminated: Set[UniqueAddress] = terminatedBuilder.result()
        val allUnreachable: Set[UniqueAddress] = unreachableBuilder.result() -- allTerminated

        (observerRowsMap, allUnreachable, allTerminated)
      }
    }

    val allUnreachableOrTerminated: Set[UniqueAddress] =
      if (allTerminated.isEmpty) allUnreachable
      else allUnreachable ++ allTerminated

  }

  @transient private lazy val cache = new Cache

  private def observerRows(observer: UniqueAddress): Option[Map[UniqueAddress, Reachability.Record]] =
    cache.observerRowsMap.get(observer)

  def unreachable(observer: UniqueAddress, subject: UniqueAddress): Reachability =
    change(observer, subject, Unreachable)

  def reachable(observer: UniqueAddress, subject: UniqueAddress): Reachability =
    change(observer, subject, Reachable)

  def terminated(observer: UniqueAddress, subject: UniqueAddress): Reachability =
    change(observer, subject, Terminated)

  private def currentVersion(observer: UniqueAddress): Long = versions.get(observer) match {
    case None    ⇒ 0
    case Some(v) ⇒ v
  }

  private def nextVersion(observer: UniqueAddress): Long = currentVersion(observer) + 1

  private def change(observer: UniqueAddress, subject: UniqueAddress, status: ReachabilityStatus): Reachability = {
    val v = nextVersion(observer)
    val newVersions = versions.updated(observer, v)
    val newRecord = Record(observer, subject, status, v)
    observerRows(observer) match {
      case None if status == Reachable ⇒ this
      case None ⇒
        new Reachability(records :+ newRecord, newVersions)

      case Some(oldObserverRows) ⇒

        oldObserverRows.get(subject) match {
          case None ⇒
            if (status == Reachable && oldObserverRows.forall { case (_, r) ⇒ r.status == Reachable }) {
              // all Reachable, prune by removing the records of the observer, and bump the version
              new Reachability(records.filterNot(_.observer == observer), newVersions)
            } else
              new Reachability(records :+ newRecord, newVersions)
          case Some(oldRecord) ⇒
            if (oldRecord.status == Terminated || oldRecord.status == status)
              this
            else {
              if (status == Reachable && oldObserverRows.forall { case (_, r) ⇒ r.status == Reachable || r.subject == subject }) {
                // all Reachable, prune by removing the records of the observer, and bump the version
                new Reachability(records.filterNot(_.observer == observer), newVersions)
              } else {
                val newRecords = records.updated(records.indexOf(oldRecord), newRecord)
                new Reachability(newRecords, newVersions)
              }
            }
        }
    }
  }

  def merge(allowed: immutable.Set[UniqueAddress], other: Reachability): Reachability = {
    val recordBuilder = new immutable.VectorBuilder[Record]
    recordBuilder.sizeHint(math.max(this.records.size, other.records.size))
    var newVersions = versions
    allowed foreach { observer ⇒
      val observerVersion1 = this.currentVersion(observer)
      val observerVersion2 = other.currentVersion(observer)

      (this.observerRows(observer), other.observerRows(observer)) match {
        case (None, None) ⇒
        case (Some(rows1), Some(rows2)) ⇒
          val rows = if (observerVersion1 > observerVersion2) rows1 else rows2
          recordBuilder ++= rows.collect { case (_, r) if allowed(r.subject) ⇒ r }
        case (Some(rows1), None) ⇒
          if (observerVersion1 > observerVersion2)
            recordBuilder ++= rows1.collect { case (_, r) if allowed(r.subject) ⇒ r }
        case (None, Some(rows2)) ⇒
          if (observerVersion2 > observerVersion1)
            recordBuilder ++= rows2.collect { case (_, r) if allowed(r.subject) ⇒ r }
      }

      if (observerVersion2 > observerVersion1)
        newVersions += (observer -> observerVersion2)
    }

    newVersions = newVersions.filterNot { case (k, _) ⇒ !allowed(k) }

    new Reachability(recordBuilder.result(), newVersions)
  }

  def remove(nodes: Iterable[UniqueAddress]): Reachability = {
    val nodesSet = nodes.to[immutable.HashSet]
    val newRecords = records.filterNot(r ⇒ nodesSet(r.observer) || nodesSet(r.subject))
    if (newRecords.size == records.size) this
    else {
      val newVersions = versions -- nodes
      Reachability(newRecords, newVersions)
    }
  }

  def status(observer: UniqueAddress, subject: UniqueAddress): ReachabilityStatus =
    observerRows(observer) match {
      case None ⇒ Reachable
      case Some(observerRows) ⇒ observerRows.get(subject) match {
        case None         ⇒ Reachable
        case Some(record) ⇒ record.status
      }
    }

  def status(node: UniqueAddress): ReachabilityStatus =
    if (cache.allTerminated(node)) Terminated
    else if (cache.allUnreachable(node)) Unreachable
    else Reachable

  def isReachable(node: UniqueAddress): Boolean = isAllReachable || !allUnreachableOrTerminated.contains(node)

  def isReachable(observer: UniqueAddress, subject: UniqueAddress): Boolean =
    status(observer, subject) == Reachable

  def isAllReachable: Boolean = records.isEmpty

  /**
   * Doesn't include terminated
   */
  def allUnreachable: Set[UniqueAddress] = cache.allUnreachable

  def allUnreachableOrTerminated: Set[UniqueAddress] = cache.allUnreachableOrTerminated

  /**
   * Doesn't include terminated
   */
  def allUnreachableFrom(observer: UniqueAddress): Set[UniqueAddress] =
    observerRows(observer) match {
      case None ⇒ Set.empty
      case Some(observerRows) ⇒
        observerRows.collect {
          case (subject, record) if record.status == Unreachable ⇒ subject
        }(breakOut)
    }

  def observersGroupedByUnreachable: Map[UniqueAddress, Set[UniqueAddress]] = {
    records.groupBy(_.subject).collect {
      case (subject, records) if records.exists(_.status == Unreachable) ⇒
        val observers: Set[UniqueAddress] =
          records.collect { case r if r.status == Unreachable ⇒ r.observer }(breakOut)
        (subject -> observers)
    }
  }

  def allObservers: Set[UniqueAddress] = versions.keySet

  def recordsFrom(observer: UniqueAddress): immutable.IndexedSeq[Record] = {
    observerRows(observer) match {
      case None       ⇒ Vector.empty
      case Some(rows) ⇒ rows.valuesIterator.toVector
    }
  }

  // only used for testing
  override def hashCode: Int = versions.hashCode

  // only used for testing
  override def equals(obj: Any): Boolean = obj match {
    case other: Reachability ⇒
      records.size == other.records.size && versions == versions &&
        cache.observerRowsMap == other.cache.observerRowsMap
    case _ ⇒ false
  }

  override def toString: String = {
    val rows = for {
      observer ← versions.keys.toSeq.sorted
      rowsOption = observerRows(observer)
      if rowsOption.isDefined // compilation err for subject <- rowsOption
      rows = rowsOption.get
      subject ← rows.keys.toSeq.sorted
    } yield {
      val record = rows(subject)
      val aggregated = status(subject)
      s"${observer.address} -> ${subject.address}: ${record.status} [$aggregated] (${record.version})"
    }

    rows.mkString(", ")
  }

}

Other Akka source code examples

Here is a short list of links related to this Akka Reachability.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.