|
Akka/Scala example source code file (Reachability.scala)
The Reachability.scala Akka example source code/** * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> */ package akka.cluster import scala.annotation.tailrec import scala.collection.immutable import scala.collection.breakOut import akka.actor.Address /** * INTERNAL API */ private[cluster] object Reachability { val empty = new Reachability(Vector.empty, Map.empty) def apply(records: immutable.IndexedSeq[Record], versions: Map[UniqueAddress, Long]): Reachability = new Reachability(records, versions) def create(records: immutable.Seq[Record], versions: Map[UniqueAddress, Long]): Reachability = records match { case r: immutable.IndexedSeq[Record] ⇒ apply(r, versions) case _ ⇒ apply(records.toVector, versions) } @SerialVersionUID(1L) final case class Record(observer: UniqueAddress, subject: UniqueAddress, status: ReachabilityStatus, version: Long) sealed trait ReachabilityStatus @SerialVersionUID(1L) case object Reachable extends ReachabilityStatus @SerialVersionUID(1L) case object Unreachable extends ReachabilityStatus @SerialVersionUID(1L) case object Terminated extends ReachabilityStatus } /** * INTERNAL API * * Immutable data structure that holds the reachability status of subject nodes as seen * from observer nodes. Failure detector for the subject nodes exist on the * observer nodes. Changes (reachable, unreachable, terminated) are only performed * by observer nodes to its own records. Each change bumps the version number of the * record, and thereby it is always possible to determine which record is newest when * merging two instances. * * Aggregated status of a subject node is defined as (in this order): * - Terminated if any observer node considers it as Terminated * - Unreachable if any observer node considers it as Unreachable * - Reachable otherwise, i.e. no observer node considers it as Unreachable */ @SerialVersionUID(1L) private[cluster] class Reachability private ( val records: immutable.IndexedSeq[Reachability.Record], val versions: Map[UniqueAddress, Long]) extends Serializable { import Reachability._ private class Cache { val (observerRowsMap, allUnreachable, allTerminated) = { if (records.isEmpty) { val observerRowsMap = Map.empty[UniqueAddress, Map[UniqueAddress, Reachability.Record]] val allTerminated = Set.empty[UniqueAddress] val allUnreachable = Set.empty[UniqueAddress] (observerRowsMap, allUnreachable, allTerminated) } else { val mapBuilder = scala.collection.mutable.Map.empty[UniqueAddress, Map[UniqueAddress, Reachability.Record]] import scala.collection.mutable.SetBuilder val terminatedBuilder = new SetBuilder[UniqueAddress, Set[UniqueAddress]](Set.empty) val unreachableBuilder = new SetBuilder[UniqueAddress, Set[UniqueAddress]](Set.empty) records foreach { r ⇒ val m = mapBuilder.get(r.observer) match { case None ⇒ Map(r.subject -> r) case Some(m) ⇒ m.updated(r.subject, r) } mapBuilder += (r.observer -> m) if (r.status == Unreachable) unreachableBuilder += r.subject else if (r.status == Terminated) terminatedBuilder += r.subject } val observerRowsMap: Map[UniqueAddress, Map[UniqueAddress, Reachability.Record]] = mapBuilder.toMap val allTerminated: Set[UniqueAddress] = terminatedBuilder.result() val allUnreachable: Set[UniqueAddress] = unreachableBuilder.result() -- allTerminated (observerRowsMap, allUnreachable, allTerminated) } } val allUnreachableOrTerminated: Set[UniqueAddress] = if (allTerminated.isEmpty) allUnreachable else allUnreachable ++ allTerminated } @transient private lazy val cache = new Cache private def observerRows(observer: UniqueAddress): Option[Map[UniqueAddress, Reachability.Record]] = cache.observerRowsMap.get(observer) def unreachable(observer: UniqueAddress, subject: UniqueAddress): Reachability = change(observer, subject, Unreachable) def reachable(observer: UniqueAddress, subject: UniqueAddress): Reachability = change(observer, subject, Reachable) def terminated(observer: UniqueAddress, subject: UniqueAddress): Reachability = change(observer, subject, Terminated) private def currentVersion(observer: UniqueAddress): Long = versions.get(observer) match { case None ⇒ 0 case Some(v) ⇒ v } private def nextVersion(observer: UniqueAddress): Long = currentVersion(observer) + 1 private def change(observer: UniqueAddress, subject: UniqueAddress, status: ReachabilityStatus): Reachability = { val v = nextVersion(observer) val newVersions = versions.updated(observer, v) val newRecord = Record(observer, subject, status, v) observerRows(observer) match { case None if status == Reachable ⇒ this case None ⇒ new Reachability(records :+ newRecord, newVersions) case Some(oldObserverRows) ⇒ oldObserverRows.get(subject) match { case None ⇒ if (status == Reachable && oldObserverRows.forall { case (_, r) ⇒ r.status == Reachable }) { // all Reachable, prune by removing the records of the observer, and bump the version new Reachability(records.filterNot(_.observer == observer), newVersions) } else new Reachability(records :+ newRecord, newVersions) case Some(oldRecord) ⇒ if (oldRecord.status == Terminated || oldRecord.status == status) this else { if (status == Reachable && oldObserverRows.forall { case (_, r) ⇒ r.status == Reachable || r.subject == subject }) { // all Reachable, prune by removing the records of the observer, and bump the version new Reachability(records.filterNot(_.observer == observer), newVersions) } else { val newRecords = records.updated(records.indexOf(oldRecord), newRecord) new Reachability(newRecords, newVersions) } } } } } def merge(allowed: immutable.Set[UniqueAddress], other: Reachability): Reachability = { val recordBuilder = new immutable.VectorBuilder[Record] recordBuilder.sizeHint(math.max(this.records.size, other.records.size)) var newVersions = versions allowed foreach { observer ⇒ val observerVersion1 = this.currentVersion(observer) val observerVersion2 = other.currentVersion(observer) (this.observerRows(observer), other.observerRows(observer)) match { case (None, None) ⇒ case (Some(rows1), Some(rows2)) ⇒ val rows = if (observerVersion1 > observerVersion2) rows1 else rows2 recordBuilder ++= rows.collect { case (_, r) if allowed(r.subject) ⇒ r } case (Some(rows1), None) ⇒ if (observerVersion1 > observerVersion2) recordBuilder ++= rows1.collect { case (_, r) if allowed(r.subject) ⇒ r } case (None, Some(rows2)) ⇒ if (observerVersion2 > observerVersion1) recordBuilder ++= rows2.collect { case (_, r) if allowed(r.subject) ⇒ r } } if (observerVersion2 > observerVersion1) newVersions += (observer -> observerVersion2) } newVersions = newVersions.filterNot { case (k, _) ⇒ !allowed(k) } new Reachability(recordBuilder.result(), newVersions) } def remove(nodes: Iterable[UniqueAddress]): Reachability = { val nodesSet = nodes.to[immutable.HashSet] val newRecords = records.filterNot(r ⇒ nodesSet(r.observer) || nodesSet(r.subject)) if (newRecords.size == records.size) this else { val newVersions = versions -- nodes Reachability(newRecords, newVersions) } } def status(observer: UniqueAddress, subject: UniqueAddress): ReachabilityStatus = observerRows(observer) match { case None ⇒ Reachable case Some(observerRows) ⇒ observerRows.get(subject) match { case None ⇒ Reachable case Some(record) ⇒ record.status } } def status(node: UniqueAddress): ReachabilityStatus = if (cache.allTerminated(node)) Terminated else if (cache.allUnreachable(node)) Unreachable else Reachable def isReachable(node: UniqueAddress): Boolean = isAllReachable || !allUnreachableOrTerminated.contains(node) def isReachable(observer: UniqueAddress, subject: UniqueAddress): Boolean = status(observer, subject) == Reachable def isAllReachable: Boolean = records.isEmpty /** * Doesn't include terminated */ def allUnreachable: Set[UniqueAddress] = cache.allUnreachable def allUnreachableOrTerminated: Set[UniqueAddress] = cache.allUnreachableOrTerminated /** * Doesn't include terminated */ def allUnreachableFrom(observer: UniqueAddress): Set[UniqueAddress] = observerRows(observer) match { case None ⇒ Set.empty case Some(observerRows) ⇒ observerRows.collect { case (subject, record) if record.status == Unreachable ⇒ subject }(breakOut) } def observersGroupedByUnreachable: Map[UniqueAddress, Set[UniqueAddress]] = { records.groupBy(_.subject).collect { case (subject, records) if records.exists(_.status == Unreachable) ⇒ val observers: Set[UniqueAddress] = records.collect { case r if r.status == Unreachable ⇒ r.observer }(breakOut) (subject -> observers) } } def allObservers: Set[UniqueAddress] = versions.keySet def recordsFrom(observer: UniqueAddress): immutable.IndexedSeq[Record] = { observerRows(observer) match { case None ⇒ Vector.empty case Some(rows) ⇒ rows.valuesIterator.toVector } } // only used for testing override def hashCode: Int = versions.hashCode // only used for testing override def equals(obj: Any): Boolean = obj match { case other: Reachability ⇒ records.size == other.records.size && versions == versions && cache.observerRowsMap == other.cache.observerRowsMap case _ ⇒ false } override def toString: String = { val rows = for { observer ← versions.keys.toSeq.sorted rowsOption = observerRows(observer) if rowsOption.isDefined // compilation err for subject <- rowsOption rows = rowsOption.get subject ← rows.keys.toSeq.sorted } yield { val record = rows(subject) val aggregated = status(subject) s"${observer.address} -> ${subject.address}: ${record.status} [$aggregated] (${record.version})" } rows.mkString(", ") } } Other Akka source code examplesHere is a short list of links related to this Akka Reachability.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.