|
Akka/Scala example source code file (PersistentView.scala)
The PersistentView.scala Akka example source code
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence
import scala.concurrent.duration._
import akka.actor._
import akka.persistence.JournalProtocol._
/**
* Instructs a [[PersistentView]] to update itself. This will run a single incremental message replay with
* all messages from the corresponding persistent id's journal that have not yet been consumed by the view.
* To update a view with messages that have been written after handling this request, another `Update`
* request must be sent to the view.
*
* @param await if `true`, processing of further messages sent to the view will be delayed until the
* incremental message replay, triggered by this update request, completes. If `false`,
* any message sent to the view may interleave with replayed [[Persistent]] message
* stream.
* @param replayMax maximum number of messages to replay when handling this update request. Defaults
* to `Long.MaxValue` (i.e. no limit).
*/
@SerialVersionUID(1L)
final case class Update(await: Boolean = false, replayMax: Long = Long.MaxValue)
object Update {
/**
* Java API.
*/
def create() =
Update()
/**
* Java API.
*/
def create(await: Boolean) =
Update(await)
/**
* Java API.
*/
def create(await: Boolean, replayMax: Long) =
Update(await, replayMax)
}
/**
* A view replicates the persistent message stream of a [[PersistentActor]]. Implementation classes receive
* the message stream directly from the Journal. These messages can be processed to update internal state
* in order to maintain an (eventual consistent) view of the state of the corresponding persistent actor. A
* persistent view can also run on a different node, provided that a replicated journal is used.
*
* Implementation classes refer to a persistent actors' message stream by implementing `persistenceId`
* with the corresponding (shared) identifier value.
*
* Views can also store snapshots of internal state by calling [[autoUpdate]]. The snapshots of a view
* are independent of those of the referenced persistent actor. During recovery, a saved snapshot is offered
* to the view with a [[SnapshotOffer]] message, followed by replayed messages, if any, that are younger
* than the snapshot. Default is to offer the latest saved snapshot.
*
* By default, a view automatically updates itself with an interval returned by `autoUpdateInterval`.
* This method can be overridden by implementation classes to define a view instance-specific update
* interval. The default update interval for all views of an actor system can be configured with the
* `akka.persistence.view.auto-update-interval` configuration key. Applications may trigger additional
* view updates by sending the view [[Update]] requests. See also methods
*
* - [[autoUpdate]] for turning automated updates on or off
* - [[autoUpdateReplayMax]] for limiting the number of replayed messages per view update cycle
*/
trait PersistentView extends Actor with Recovery {
import context.dispatcher
/**
* INTERNAL API.
*
* Extends the `replayStarted` state of [[Recovery]] with logic to handle [[Update]] requests
* sent by users.
*/
private[persistence] override def replayStarted(await: Boolean) = new State {
private var delegateAwaiting = await
private var delegate = PersistentView.super.replayStarted(await)
override def toString: String = delegate.toString
override def aroundReceive(receive: Receive, message: Any) = message match {
case Update(false, _) ⇒ // ignore
case u @ Update(true, _) if !delegateAwaiting ⇒
delegateAwaiting = true
delegate = PersistentView.super.replayStarted(await = true)
delegate.aroundReceive(receive, u)
case other ⇒
delegate.aroundReceive(receive, other)
}
}
/**
* When receiving an [[Update]] request, switches to `replayStarted` state and triggers
* an incremental message replay. Invokes the actor's current behavior for any other
* received message.
*/
private val idle: State = new State {
override def toString: String = "idle"
def aroundReceive(receive: Receive, message: Any): Unit = message match {
case r: Recover ⇒ // ignore
case Update(awaitUpdate, replayMax) ⇒
_currentState = replayStarted(await = awaitUpdate)
journal ! ReplayMessages(lastSequenceNr + 1L, Long.MaxValue, replayMax, persistenceId, self)
case other ⇒ process(receive, other)
}
}
/**
* INTERNAL API.
*/
private[persistence] def onReplaySuccess(receive: Receive, await: Boolean): Unit =
onReplayComplete(await)
/**
* INTERNAL API.
*/
private[persistence] def onReplayFailure(receive: Receive, await: Boolean, cause: Throwable): Unit =
onReplayComplete(await)
/**
* Switches to `idle` state and schedules the next update if `autoUpdate` returns `true`.
*/
private def onReplayComplete(await: Boolean): Unit = {
_currentState = idle
if (autoUpdate) schedule = Some(context.system.scheduler.scheduleOnce(autoUpdateInterval, self, Update(await = false, autoUpdateReplayMax)))
if (await) receiverStash.unstashAll()
}
/**
* INTERNAL API
* WARNING: This implementation UNWRAPS PERSISTENT() before delivering to the receive block.
*/
override private[persistence] def withCurrentPersistent(persistent: Persistent)(body: Persistent ⇒ Unit): Unit =
super.withCurrentPersistent(persistent) { p ⇒
receive.applyOrElse(p.payload, unhandled)
}
private val viewSettings = extension.settings.view
private var schedule: Option[Cancellable] = None
/**
* View id is used as identifier for snapshots performed by this [[PersistentView]].
* This allows the View to keep separate snapshots of data than the [[PersistentActor]] originating the message stream.
*
*
* The usual case is to have a *different* id set as `viewId` than `persistenceId`,
* although it is possible to share the same id with an [[PersistentActor]] - for example to decide about snapshots
* based on some average or sum, calculated by this view.
*
* Example:
* {{{
* class SummingView extends PersistentView {
* override def persistenceId = "count-123"
* override def viewId = "count-123-sum" // this view is performing summing,
* // so this view's snapshots reside under the "-sum" suffixed id
*
* // ...
* }
* }}}
*/
def viewId: String
/**
* Returns `viewId`.
*/
def snapshotterId: String = viewId
/**
* If `true`, the currently processed message was persisted (is sent from the Journal).
* If `false`, the currently processed message comes from another actor (from "user-land").
*/
def isPersistent: Boolean =
currentPersistentMessage.isDefined
/**
* If `true`, this view automatically updates itself with an interval specified by `autoUpdateInterval`.
* If `false`, applications must explicitly update this view by sending [[Update]] requests. The default
* value can be configured with the `akka.persistence.view.auto-update` configuration key. This method
* can be overridden by implementation classes to return non-default values.
*/
def autoUpdate: Boolean =
viewSettings.autoUpdate
/**
* The interval for automated updates. The default value can be configured with the
* `akka.persistence.view.auto-update-interval` configuration key. This method can be
* overridden by implementation classes to return non-default values.
*/
def autoUpdateInterval: FiniteDuration =
viewSettings.autoUpdateInterval
/**
* The maximum number of messages to replay per update. The default value can be configured with the
* `akka.persistence.view.auto-update-replay-max` configuration key. This method can be overridden by
* implementation classes to return non-default values.
*/
def autoUpdateReplayMax: Long =
viewSettings.autoUpdateReplayMax
/**
* Triggers an initial recovery, starting form a snapshot, if any, and replaying at most `autoUpdateReplayMax`
* messages (following that snapshot).
*/
override def preStart(): Unit = {
super.preStart()
self ! Recover(replayMax = autoUpdateReplayMax)
}
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
try receiverStash.unstashAll() finally super.preRestart(reason, message)
}
override def postStop(): Unit = {
schedule.foreach(_.cancel())
super.postStop()
}
}
/**
* Java API.
*
* @see [[PersistentView]]
*/
abstract class UntypedPersistentView extends UntypedActor with PersistentView
/**
* Java API: compatible with lambda expressions (to be used with [[akka.japi.pf.ReceiveBuilder]])
*
* @see [[PersistentView]]
*/
abstract class AbstractPersistentView extends AbstractActor with PersistentView
Other Akka source code examplesHere is a short list of links related to this Akka PersistentView.scala source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.