|
Akka/Scala example source code file (Recovery.scala)
The Recovery.scala Akka example source code/** * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> */ package akka.persistence import akka.actor._ import akka.dispatch.Envelope import akka.persistence.JournalProtocol._ import akka.persistence.SnapshotProtocol.LoadSnapshotResult import scala.util.control.NonFatal /** * Recovery state machine that loads snapshots and replays messages. * * @see [[PersistentActor]] * @see [[PersistentView]] */ trait Recovery extends Actor with Snapshotter with Stash with StashFactory { /** * INTERNAL API. * * Recovery state. */ private[persistence] trait State { def aroundReceive(receive: Receive, message: Any): Unit protected def process(receive: Receive, message: Any) = receive.applyOrElse(message, unhandled) protected def processPersistent(receive: Receive, persistent: Persistent) = withCurrentPersistent(persistent)(receive.applyOrElse(_, unhandled)) protected def recordFailure(cause: Throwable): Unit = { _recoveryFailureCause = cause _recoveryFailureMessage = context.asInstanceOf[ActorCell].currentMessage } } /** * INTERNAL API. * * Initial state, waits for `Recover` request, submit a `LoadSnapshot` request to the snapshot * store and changes to `recoveryStarted` state. */ private[persistence] val recoveryPending = new State { override def toString: String = "recovery pending" def aroundReceive(receive: Receive, message: Any): Unit = message match { case Recover(fromSnap, toSnr, replayMax) ⇒ _currentState = recoveryStarted(replayMax) loadSnapshot(snapshotterId, fromSnap, toSnr) case _ ⇒ receiverStash.stash() } } /** * INTERNAL API. * * Processes a loaded snapshot, if any. A loaded snapshot is offered with a `SnapshotOffer` * message to the actor's current behavior. Then initiates a message replay, either starting * from the loaded snapshot or from scratch, and switches to `replayStarted` state. * * @param replayMax maximum number of messages to replay. */ private[persistence] def recoveryStarted(replayMax: Long) = new State { override def toString: String = s"recovery started (replayMax = [${replayMax}])" def aroundReceive(receive: Receive, message: Any) = message match { case r: Recover ⇒ // ignore case LoadSnapshotResult(sso, toSnr) ⇒ sso.foreach { case SelectedSnapshot(metadata, snapshot) ⇒ updateLastSequenceNr(metadata.sequenceNr) process(receive, SnapshotOffer(metadata, snapshot)) } _currentState = replayStarted(await = true) journal ! ReplayMessages(lastSequenceNr + 1L, toSnr, replayMax, persistenceId, self) case other ⇒ receiverStash.stash() } } /** * INTERNAL API. * * Processes replayed messages, if any. The actor's current behavior is invoked with the replayed * [[Persistent]] messages. If processing of a replayed message fails, the exception is caught and * stored for being thrown later and state is changed to `recoveryFailed`. If replay succeeds the * `onReplaySuccess` method is called, otherwise `onReplayFailure`. * * @param await if `true` processing of further messages will be delayed until replay completes, * otherwise, the actor's behavior is invoked immediately with these messages. */ private[persistence] def replayStarted(await: Boolean) = new State { override def toString: String = s"replay started (await = [${await}])" def aroundReceive(receive: Receive, message: Any) = message match { case r: Recover ⇒ // ignore case ReplayedMessage(p) ⇒ try processPersistent(receive, p) catch { case NonFatal(t) ⇒ _currentState = replayFailed // delay throwing exception to prepareRestart recordFailure(t) } case ReplayMessagesSuccess ⇒ onReplaySuccess(receive, await) case ReplayMessagesFailure(cause) ⇒ onReplayFailure(receive, await, cause) case other ⇒ if (await) receiverStash.stash() else process(receive, other) } } /** * INTERNAL API. * * Consumes remaining replayed messages and then changes to `prepareRestart`. The * message that caused the exception during replay, is re-added to the mailbox and * re-received in `prepareRestart`. */ private[persistence] val replayFailed = new State { override def toString: String = "replay failed" def aroundReceive(receive: Receive, message: Any) = message match { case ReplayMessagesFailure(_) ⇒ replayCompleted() // journal couldn't tell the maximum stored sequence number, hence the next // replay must be a full replay (up to the highest stored sequence number) updateLastSequenceNr(Long.MaxValue) case ReplayMessagesSuccess ⇒ replayCompleted() case ReplayedMessage(p) ⇒ updateLastSequenceNr(p) case r: Recover ⇒ // ignore case _ ⇒ receiverStash.stash() } def replayCompleted(): Unit = { _currentState = prepareRestart mailbox.enqueueFirst(self, _recoveryFailureMessage) } } /** * INTERNAL API. * * Re-receives the replayed message that caused an exception and re-throws that exception. */ private[persistence] val prepareRestart = new State { override def toString: String = "prepare restart" def aroundReceive(receive: Receive, message: Any) = message match { case ReplayedMessage(_) ⇒ throw _recoveryFailureCause case _ ⇒ // ignore } } private var _recoveryFailureCause: Throwable = _ private var _recoveryFailureMessage: Envelope = _ private var _lastSequenceNr: Long = 0L private var _currentPersistent: Persistent = _ /** * Id of the processor for which messages should be replayed. */ @deprecated("Override `persistenceId` instead. Processor will be removed.", since = "2.3.4") def processorId: String = extension.persistenceId(self) // TODO: remove processorId /** * Id of the persistent entity for which messages should be replayed. */ def persistenceId: String /** INTERNAL API */ private[persistence] def withCurrentPersistent(persistent: Persistent)(body: Persistent ⇒ Unit): Unit = try { _currentPersistent = persistent updateLastSequenceNr(persistent) body(persistent) } finally _currentPersistent = null /** INTERNAL API. */ private[persistence] def updateLastSequenceNr(persistent: Persistent): Unit = if (persistent.sequenceNr > _lastSequenceNr) _lastSequenceNr = persistent.sequenceNr /** INTERNAL API. */ private[persistence] def updateLastSequenceNr(value: Long): Unit = _lastSequenceNr = value /** * Returns the current persistent message if there is any. */ @deprecated("currentPersistentMessage will be removed, sequence number can be retrieved with `lastSequenceNr`.", since = "2.3.4") implicit def currentPersistentMessage: Option[Persistent] = Option(_currentPersistent) /** * Java API: returns the current persistent message or `null` if there is none. */ @deprecated("getCurrentPersistentMessage will be removed, sequence number can be retrieved with `lastSequenceNr`.", since = "2.3.4") def getCurrentPersistentMessage = currentPersistentMessage.getOrElse(null) /** * Highest received sequence number so far or `0L` if this actor hasn't received a persistent * message yet. Usually equal to the sequence number of `currentPersistentMessage` (unless a * receiver implementation is about to re-order persistent messages using `stash()` and `unstash()`). */ def lastSequenceNr: Long = _lastSequenceNr /** * Returns `lastSequenceNr`. */ def snapshotSequenceNr: Long = lastSequenceNr /** * INTERNAL API. */ private[persistence] var _currentState: State = recoveryPending /** * INTERNAL API. * * Called whenever a message replay succeeds. * * @param receive the actor's current behavior. * @param awaitReplay `awaitReplay` value of the calling `replayStarted` state. */ private[persistence] def onReplaySuccess(receive: Receive, awaitReplay: Boolean): Unit /** * INTERNAL API. * * Called whenever a message replay fails. * * @param receive the actor's current behavior. * @param awaitReplay `awaitReplay` value of the calling `replayStarted` state. * @param cause failure cause. */ private[persistence] def onReplayFailure(receive: Receive, awaitReplay: Boolean, cause: Throwable): Unit /** * INTERNAL API. */ private[persistence] val extension = Persistence(context.system) /** * INTERNAL API. */ private[persistence] lazy val journal = extension.journalFor(persistenceId) /** * INTERNAL API. */ private[persistence] val receiverStash = createStash() /** * INTERNAL API. */ override protected[akka] def aroundReceive(receive: Receive, message: Any): Unit = { _currentState.aroundReceive(receive, message) } } /** * Instructs a persistent actor to recover itself. Recovery will start from a snapshot if the persistent actor has * previously saved one or more snapshots and at least one of these snapshots matches the specified * `fromSnapshot` criteria. Otherwise, recovery will start from scratch by replaying all journaled * messages. * * If recovery starts from a snapshot, the persistent actor is offered that snapshot with a [[SnapshotOffer]] * message, followed by replayed messages, if any, that are younger than the snapshot, up to the * specified upper sequence number bound (`toSequenceNr`). * * @param fromSnapshot criteria for selecting a saved snapshot from which recovery should start. Default * is latest (= youngest) snapshot. * @param toSequenceNr upper sequence number bound (inclusive) for recovery. Default is no upper bound. * @param replayMax maximum number of messages to replay. Default is no limit. */ @SerialVersionUID(1L) final case class Recover(fromSnapshot: SnapshotSelectionCriteria = SnapshotSelectionCriteria.Latest, toSequenceNr: Long = Long.MaxValue, replayMax: Long = Long.MaxValue) object Recover { /** * Java API. * * @see [[Recover]] */ def create() = Recover() /** * Java API. * * @see [[Recover]] */ def create(toSequenceNr: Long) = Recover(toSequenceNr = toSequenceNr) /** * Java API. * * @see [[Recover]] */ def create(fromSnapshot: SnapshotSelectionCriteria) = Recover(fromSnapshot = fromSnapshot) /** * Java API. * * @see [[Recover]] */ def create(fromSnapshot: SnapshotSelectionCriteria, toSequenceNr: Long) = Recover(fromSnapshot, toSequenceNr) /** * Java API. * * @see [[Recover]] */ def create(fromSnapshot: SnapshotSelectionCriteria, toSequenceNr: Long, replayMax: Long) = Recover(fromSnapshot, toSequenceNr, replayMax) } Other Akka source code examplesHere is a short list of links related to this Akka Recovery.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.