alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (AutoDown.scala)

This example Akka source code file (AutoDown.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actor, address, akka, autodown, autodownbase, cluster, concurrent, duration, finiteduration, member, props, scheduler, time, uniqueaddress, unit, unreachabletimeout

The AutoDown.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.cluster

import akka.actor.Actor
import scala.concurrent.duration.FiniteDuration
import akka.actor.Props
import akka.cluster.ClusterEvent._
import akka.actor.Cancellable
import scala.concurrent.duration.Duration
import akka.actor.Address
import akka.actor.Scheduler

/**
 * INTERNAL API
 *
 * Companion of the [[AutoDown]] actor: holds the `Props` factory and the
 * timeout message the actor schedules to itself.
 */
private[cluster] object AutoDown {

  /**
   * Message delivered to the actor when the timer started for `node`
   * (see `AutoDownBase.scheduleUnreachable`) fires.
   */
  final case class UnreachableTimeout(node: UniqueAddress)

  /**
   * Creates the `Props` for an [[AutoDown]] actor.
   *
   * @param autoDownUnreachableAfter passed as the single constructor argument
   *                                 of the actor
   */
  def props(autoDownUnreachableAfter: FiniteDuration): Props = {
    val actorClass = classOf[AutoDown]
    Props(actorClass, autoDownUnreachableAfter)
  }
}

/**
 * INTERNAL API
 *
 * Downs a member that stays unreachable for the configured duration, provided
 * that this actor runs on the node that is currently the cluster leader.
 *
 * This class only wires [[AutoDownBase]] to the real `Cluster` extension
 * (address, scheduler, event subscription and the actual `down` call); the
 * decision logic lives in `AutoDownBase` so that it can be unit tested
 * without a running cluster.
 */
private[cluster] class AutoDown(autoDownUnreachableAfter: FiniteDuration)
  extends AutoDownBase(autoDownUnreachableAfter) {

  val cluster = Cluster(context.system)
  import cluster.InfoLogger._

  /** Address of this node, as reported by the cluster extension. */
  override def selfAddress: Address = cluster.selfAddress

  /** Timers are run on the scheduler owned by the cluster extension. */
  override def scheduler: Scheduler = cluster.scheduler

  /**
   * Downs `node` through the cluster extension.
   * Fails the `require` check if this actor does not consider itself leader.
   */
  override def down(node: Address): Unit = {
    require(leader)
    logInfo("Leader is auto-downing unreachable node [{}]", node)
    cluster.down(node)
  }

  // Subscribing in preStart means the subscription is re-established
  // whenever the actor is restarted.
  override def preStart(): Unit = {
    cluster.subscribe(self, classOf[ClusterDomainEvent])
    super.preStart()
  }

  // Drop the subscription first, then let the base class cancel its timers.
  override def postStop(): Unit = {
    cluster.unsubscribe(self)
    super.postStop()
  }

}

/**
 * INTERNAL API
 *
 * Cluster-independent part of the auto-down logic. Subclasses supply the
 * node address, the scheduler and the actual `down` operation, which makes
 * it possible to unit test this class without running a cluster.
 *
 * State kept by the actor:
 *  - `scheduledUnreachable`: nodes with a running timeout timer
 *  - `pendingUnreachable`: nodes whose timeout elapsed while this node was
 *    not leader; they are downed if this node becomes leader
 *  - `leader`: whether the last seen leader address equals `selfAddress`
 */
private[cluster] abstract class AutoDownBase(autoDownUnreachableAfter: FiniteDuration) extends Actor {

  import AutoDown._

  /** Address of the node this actor runs on. */
  def selfAddress: Address

  /** Performs the actual downing of `node`. */
  def down(node: Address): Unit

  /** Scheduler used for the unreachable timeouts. */
  def scheduler: Scheduler

  // execution context required by scheduler.scheduleOnce
  import context.dispatcher

  val skipMemberStatus = Gossip.convergenceSkipUnreachableWithMemberStatus

  var scheduledUnreachable: Map[UniqueAddress, Cancellable] = Map.empty
  var pendingUnreachable: Set[UniqueAddress] = Set.empty
  var leader = false

  /** Cancels every timer that is still outstanding. */
  override def postStop(): Unit =
    for (task <- scheduledUnreachable.values) task.cancel()

  def receive = {
    case state: CurrentClusterState =>
      leader = state.leader.exists(_ == selfAddress)
      for (member <- state.unreachable) unreachableMember(member)

    case UnreachableMember(member) =>
      unreachableMember(member)

    case ReachableMember(member) =>
      remove(member.uniqueAddress)

    case MemberRemoved(member, _) =>
      remove(member.uniqueAddress)

    case LeaderChanged(leaderOption) =>
      leader = leaderOption.exists(_ == selfAddress)
      if (leader) {
        // take over the nodes that the previous leader did not down
        for (node <- pendingUnreachable) down(node.address)
        pendingUnreachable = Set.empty
      }

    case UnreachableTimeout(node) =>
      // a timeout for a node that is no longer tracked is consumed and ignored
      if (scheduledUnreachable.contains(node)) {
        scheduledUnreachable -= node
        downOrAddPending(node)
      }

    case _: ClusterDomainEvent =>
      // not interested in other events

  }

  /**
   * Starts tracking `m`, unless its status is one of `skipMemberStatus` or a
   * timer for it is already running.
   */
  def unreachableMember(m: Member): Unit = {
    val node = m.uniqueAddress
    val ignore = skipMemberStatus(m.status) || scheduledUnreachable.contains(node)
    if (!ignore) scheduleUnreachable(node)
  }

  /**
   * With a zero timeout the node is handled immediately; otherwise an
   * `UnreachableTimeout` is scheduled to `self` and the timer is remembered.
   */
  def scheduleUnreachable(node: UniqueAddress): Unit =
    if (autoDownUnreachableAfter == Duration.Zero)
      downOrAddPending(node)
    else {
      val timer = scheduler.scheduleOnce(autoDownUnreachableAfter, self, UnreachableTimeout(node))
      scheduledUnreachable = scheduledUnreachable.updated(node, timer)
    }

  /** Downs `node` when this node is leader, otherwise parks it as pending. */
  def downOrAddPending(node: UniqueAddress): Unit =
    if (leader)
      down(node.address)
    else {
      // The current leader on another node is supposed to down it, but if
      // that leader crashes the new leader must pick these up.
      pendingUnreachable += node
    }

  /** Stops tracking `node`: cancels its timer, if any, and forgets it. */
  def remove(node: UniqueAddress): Unit = {
    scheduledUnreachable.get(node).foreach(_.cancel())
    scheduledUnreachable -= node
    pendingUnreachable -= node
  }

}

Other Akka source code examples

Here is a short list of links related to this Akka AutoDown.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.