alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (FaultHandling.scala)

This example Akka source code file (FaultHandling.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actor, actorref, akka, collection, concurrent, dispatch, envelope, failed, interruptedexception, none, resume, set, some, throwable, time, unit

The FaultHandling.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */

package akka.actor.dungeon

import akka.actor.PostRestartException
import akka.actor.PreRestartException
import akka.actor.{ InternalActorRef, ActorRef, ActorInterruptedException, ActorCell, Actor }
import akka.dispatch._
import akka.dispatch.sysmsg._
import akka.event.Logging
import akka.event.Logging.Debug
import akka.event.Logging.Error
import scala.collection.immutable
import scala.concurrent.duration.Duration
import scala.util.control.Exception._
import scala.util.control.NonFatal
import akka.actor.ActorRefScope

private[akka] trait FaultHandling { this: ActorCell ⇒

  /* =================
   * T H E   R U L E S
   * =================
   *
   * Actors can be suspended for two reasons:
   * - they fail
   * - their supervisor gets suspended
   *
   * In particular they are not suspended multiple times because of cascading
   * own failures, i.e. while currentlyFailed() they do not fail again. In case
   * of a restart, failures in constructor/preStart count as new failures.
   */

  private def suspendNonRecursive(): Unit = dispatcher suspend this

  private def resumeNonRecursive(): Unit = dispatcher resume this

  /*
   * have we told our supervisor that we Failed() and have not yet heard back?
   * (actually: we might have heard back but not yet acted upon it, in case of
   * a restart with dying children)
   * might well be replaced by ref to a Cancellable in the future (see #2299)
   */
  private var _failed: ActorRef = null
  private def isFailed: Boolean = _failed != null
  private def setFailed(perpetrator: ActorRef): Unit = _failed = perpetrator
  private def clearFailed(): Unit = _failed = null
  private def perpetrator: ActorRef = _failed

  /**
   * Do re-create the actor in response to a failure.
   */
  protected def faultRecreate(cause: Throwable): Unit =
    if (actor == null) {
      system.eventStream.publish(Error(self.path.toString, clazz(actor),
        "changing Recreate into Create after " + cause))
      faultCreate()
    } else if (isNormal) {
      val failedActor = actor
      if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(failedActor), "restarting"))
      if (failedActor ne null) {
        val optionalMessage = if (currentMessage ne null) Some(currentMessage.message) else None
        try {
          // if the actor fails in preRestart, we can do nothing but log it: it’s best-effort
          if (failedActor.context ne null) failedActor.aroundPreRestart(cause, optionalMessage)
        } catch handleNonFatalOrInterruptedException { e ⇒
          val ex = new PreRestartException(self, e, cause, optionalMessage)
          publish(Error(ex, self.path.toString, clazz(failedActor), e.getMessage))
        } finally {
          clearActorFields(failedActor)
        }
      }
      assert(mailbox.isSuspended, "mailbox must be suspended during restart, status=" + mailbox.status)
      if (!setChildrenTerminationReason(ChildrenContainer.Recreation(cause))) finishRecreate(cause, failedActor)
    } else {
      // need to keep that suspend counter balanced
      faultResume(causedByFailure = null)
    }

  /**
   * Do suspend the actor in response to a failure of a parent (i.e. the
   * “recursive suspend” feature).
   */
  protected def faultSuspend(): Unit = {
    // done always to keep that suspend counter balanced
    suspendNonRecursive()
    suspendChildren()
  }

  /**
   * Do resume the actor in response to a failure.
   *
   * @param causedByFailure signifies if it was our own failure which
   *        prompted this action.
   */
  protected def faultResume(causedByFailure: Throwable): Unit = {
    if (actor == null) {
      system.eventStream.publish(Error(self.path.toString, clazz(actor),
        "changing Resume into Create after " + causedByFailure))
      faultCreate()
    } else if (actor.context == null && causedByFailure != null) {
      system.eventStream.publish(Error(self.path.toString, clazz(actor),
        "changing Resume into Restart after " + causedByFailure))
      faultRecreate(causedByFailure)
    } else {
      val perp = perpetrator
      // done always to keep that suspend counter balanced
      // must happen “atomically”
      try resumeNonRecursive()
      finally if (causedByFailure != null) clearFailed()
      resumeChildren(causedByFailure, perp)
    }
  }

  /**
   * Do create the actor in response to a failure.
   */
  protected def faultCreate(): Unit = {
    assert(mailbox.isSuspended, "mailbox must be suspended during failed creation, status=" + mailbox.status)
    assert(perpetrator == self)

    setReceiveTimeout(Duration.Undefined)
    cancelReceiveTimeout

    // stop all children, which will turn childrenRefs into TerminatingChildrenContainer (if there are children)
    children foreach stop

    if (!setChildrenTerminationReason(ChildrenContainer.Creation())) finishCreate()
  }

  private def finishCreate(): Unit = {
    try resumeNonRecursive()
    finally clearFailed()
    try create(None)
    catch handleNonFatalOrInterruptedException { e ⇒
      handleInvokeFailure(Nil, e)
    }
  }

  protected def terminate() {
    setReceiveTimeout(Duration.Undefined)
    cancelReceiveTimeout

    // prevent Deadletter(Terminated) messages
    unwatchWatchedActors(actor)

    // stop all children, which will turn childrenRefs into TerminatingChildrenContainer (if there are children)
    children foreach stop

    if (systemImpl.aborting) {
      // separate iteration because this is a very rare case that should not penalize normal operation
      children foreach {
        case ref: ActorRefScope if !ref.isLocal ⇒ self.sendSystemMessage(DeathWatchNotification(ref, true, false))
        case _                                  ⇒
      }
    }

    val wasTerminating = isTerminating

    if (setChildrenTerminationReason(ChildrenContainer.Termination)) {
      if (!wasTerminating) {
        // do not process normal messages while waiting for all children to terminate
        suspendNonRecursive()
        // do not propagate failures during shutdown to the supervisor
        setFailed(self)
        if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "stopping"))
      }
    } else {
      setTerminated()
      finishTerminate()
    }
  }

  final def handleInvokeFailure(childrenNotToSuspend: immutable.Iterable[ActorRef], t: Throwable): Unit = {
    // prevent any further messages to be processed until the actor has been restarted
    if (!isFailed) try {
      suspendNonRecursive()
      // suspend children
      val skip: Set[ActorRef] = currentMessage match {
        case Envelope(Failed(_, _, _), child) ⇒ { setFailed(child); Set(child) }
        case _                                ⇒ { setFailed(self); Set.empty }
      }
      suspendChildren(exceptFor = skip ++ childrenNotToSuspend)
      t match {
        // tell supervisor
        case _: InterruptedException ⇒
          // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
          parent.sendSystemMessage(Failed(self, new ActorInterruptedException(t), uid))
        case _ ⇒
          // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
          parent.sendSystemMessage(Failed(self, t, uid))
      }
    } catch handleNonFatalOrInterruptedException { e ⇒
      publish(Error(e, self.path.toString, clazz(actor),
        "emergency stop: exception in failure handling for " + t.getClass + Logging.stackTraceFor(t)))
      try children foreach stop
      finally finishTerminate()
    }
  }

  private def finishTerminate() {
    val a = actor
    /* The following order is crucial for things to work properly. Only change this if you're very confident and lucky.
     *
     * Please note that if a parent is also a watcher then ChildTerminated and Terminated must be processed in this
     * specific order.
     */
    try if (a ne null) a.aroundPostStop()
    catch handleNonFatalOrInterruptedException { e ⇒ publish(Error(e, self.path.toString, clazz(a), e.getMessage)) }
    finally try dispatcher.detach(this)
    finally try parent.sendSystemMessage(DeathWatchNotification(self, existenceConfirmed = true, addressTerminated = false))
    finally try tellWatchersWeDied()
    finally try unwatchWatchedActors(a) // stay here as we expect an emergency stop from handleInvokeFailure
    finally {
      if (system.settings.DebugLifecycle)
        publish(Debug(self.path.toString, clazz(a), "stopped"))

      clearActorFields(a)
      clearActorCellFields(this)
      actor = null
    }
  }

  private def finishRecreate(cause: Throwable, failedActor: Actor): Unit = {
    // need to keep a snapshot of the surviving children before the new actor instance creates new ones
    val survivors = children

    try {
      try resumeNonRecursive()
      finally clearFailed() // must happen in any case, so that failure is propagated

      val freshActor = newActor()
      actor = freshActor // this must happen before postRestart has a chance to fail
      if (freshActor eq failedActor) setActorFields(freshActor, this, self) // If the creator returns the same instance, we need to restore our nulled out fields.

      freshActor.aroundPostRestart(cause)
      if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(freshActor), "restarted"))

      // only after parent is up and running again do restart the children which were not stopped
      survivors foreach (child ⇒
        try child.asInstanceOf[InternalActorRef].restart(cause)
        catch handleNonFatalOrInterruptedException { e ⇒
          publish(Error(e, self.path.toString, clazz(freshActor), "restarting " + child))
        })
    } catch handleNonFatalOrInterruptedException { e ⇒
      clearActorFields(actor) // in order to prevent preRestart() from happening again
      handleInvokeFailure(survivors, new PostRestartException(self, e, cause))
    }
  }

  final protected def handleFailure(f: Failed): Unit = {
    currentMessage = Envelope(f, f.child, system)
    getChildByRef(f.child) match {
      /*
       * only act upon the failure, if it comes from a currently known child;
       * the UID protects against reception of a Failed from a child which was
       * killed in preRestart and re-created in postRestart
       */
      case Some(stats) if stats.uid == f.uid ⇒
        if (!actor.supervisorStrategy.handleFailure(this, f.child, f.cause, stats, getAllChildStats)) throw f.cause
      case Some(stats) ⇒
        publish(Debug(self.path.toString, clazz(actor),
          "dropping Failed(" + f.cause + ") from old child " + f.child + " (uid=" + stats.uid + " != " + f.uid + ")"))
      case None ⇒
        publish(Debug(self.path.toString, clazz(actor), "dropping Failed(" + f.cause + ") from unknown child " + f.child))
    }
  }

  final protected def handleChildTerminated(child: ActorRef): Unit = {
    val status = removeChildAndGetStateChange(child)
    /*
     * if this fails, we do nothing in case of terminating/restarting state,
     * otherwise tell the supervisor etc. (in that second case, the match
     * below will hit the empty default case, too)
     */
    if (actor != null) {
      try actor.supervisorStrategy.handleChildTerminated(this, child, children)
      catch handleNonFatalOrInterruptedException { e ⇒
        publish(Error(e, self.path.toString, clazz(actor), "handleChildTerminated failed"))
        handleInvokeFailure(Nil, e)
      }
    }
    /*
     * if the removal changed the state of the (terminating) children container,
     * then we are continuing the previously suspended recreate/create/terminate action
     */
    status match {
      case Some(c @ ChildrenContainer.Recreation(cause)) ⇒ finishRecreate(cause, actor)
      case Some(c @ ChildrenContainer.Creation()) ⇒ finishCreate()
      case Some(ChildrenContainer.Termination) ⇒ finishTerminate()
      case _ ⇒
    }
  }

  final protected def handleNonFatalOrInterruptedException(thunk: (Throwable) ⇒ Unit): Catcher[Unit] = {
    case e: InterruptedException ⇒
      thunk(e)
      Thread.currentThread().interrupt()
    case NonFatal(e) ⇒
      thunk(e)
  }
}

Other Akka source code examples

Here is a short list of links related to this Akka FaultHandling.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.