|
Akka/Scala example source code file (View.scala)
The View.scala Akka example source code
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence
import scala.concurrent.duration._
import akka.actor._
import akka.persistence.JournalProtocol._
/**
 * A view replicates the persistent message stream of a processor. Implementation classes receive the
 * message stream as [[Persistent]] messages. These messages can be processed to update internal state
 * in order to maintain an (eventually consistent) view of the state of the corresponding processor. A
 * view can also run on a different node, provided that a replicated journal is used. Implementation
 * classes reference a processor by implementing `persistenceId`.
 *
 * Views can also store snapshots of internal state by calling [[#saveSnapshot]]. The snapshots of a view
 * are independent of those of the referenced processor. During recovery, a saved snapshot is offered
 * to the view with a [[SnapshotOffer]] message, followed by replayed messages, if any, that are younger
 * than the snapshot. Default is to offer the latest saved snapshot.
 *
 * By default, a view automatically updates itself with an interval returned by `autoUpdateInterval`.
 * This method can be overridden by implementation classes to define a view instance-specific update
 * interval. The default update interval for all views of an actor system can be configured with the
 * `akka.persistence.view.auto-update-interval` configuration key. Applications may trigger additional
 * view updates by sending the view [[Update]] requests. See also methods
 *
 *  - [[#autoUpdate]] for turning automated updates on or off
 *  - [[#autoUpdateReplayMax]] for limiting the number of replayed messages per view update cycle
 *
 * Views can also use channels to communicate with destinations in the same way as processors can do.
 */
@deprecated("Use `akka.persistence.PersistentView` instead.", since = "2.3.4")
trait View extends Actor with Recovery {
  // Supplies the implicit execution context required by the scheduler call in `onReplayComplete`.
  import context.dispatcher

  /**
   * INTERNAL API.
   *
   * Extends the `replayStarted` state of [[Recovery]] with logic to handle [[Update]] requests
   * sent by users.
   *
   * The returned state wraps the inherited `replayStarted` state and forwards messages to it, except:
   *
   *  - `Update(await = false, _)` is dropped, because a replay is already in progress;
   *  - `Update(await = true, _)` received while the wrapped state is not awaiting replaces the
   *    wrapped state with an awaiting one and hands the request to it.
   *
   * @param await initial awaiting mode passed to the inherited `replayStarted` state
   */
  private[persistence] override def replayStarted(await: Boolean) = new State {
    // Tracks whether the wrapped state was created in awaiting mode, so that the switch
    // to an awaiting delegate happens at most once.
    private var delegateAwaiting = await
    private var delegate = View.super.replayStarted(await)

    override def toString: String = delegate.toString

    override def aroundReceive(receive: Receive, message: Any) = message match {
      case Update(false, _) ⇒ // ignore
      case u @ Update(true, _) if !delegateAwaiting ⇒
        delegateAwaiting = true
        delegate = View.super.replayStarted(await = true)
        delegate.aroundReceive(receive, u)
      case other ⇒
        delegate.aroundReceive(receive, other)
    }
  }

  /**
   * When receiving an [[Update]] request, switches to `replayStarted` state and triggers
   * an incremental message replay. Invokes the actor's current behavior for any other
   * received message.
   */
  private val idle: State = new State {
    override def toString: String = "idle"

    def aroundReceive(receive: Receive, message: Any): Unit = message match {
      case r: Recover ⇒ // ignore
      case Update(awaitUpdate, replayMax) ⇒
        _currentState = replayStarted(await = awaitUpdate)
        // Incremental replay: start right after the last sequence number seen by this view.
        journal ! ReplayMessages(lastSequenceNr + 1L, Long.MaxValue, replayMax, persistenceId, self)
      case other ⇒ process(receive, other)
    }
  }

  /**
   * INTERNAL API.
   *
   * Completes the update cycle; see `onReplayComplete`. `receive` is not used.
   */
  private[persistence] def onReplaySuccess(receive: Receive, await: Boolean): Unit =
    onReplayComplete(await)

  /**
   * INTERNAL API.
   *
   * A failed replay is completed exactly like a successful one; `receive` and `cause`
   * are not used.
   */
  private[persistence] def onReplayFailure(receive: Receive, await: Boolean, cause: Throwable): Unit =
    onReplayComplete(await)

  /**
   * Switches to `idle` state and schedules the next update if `autoUpdate` returns `true`.
   *
   * A timer that is still pending from an earlier cycle is cancelled before a new one is armed.
   * Without this, every [[Update]] sent by a user while a timer is pending would leave that timer
   * running in addition to the new one, so the number of concurrent auto-update timers would grow
   * with each user-triggered update and only the most recent one would be cancelled in `postStop`.
   *
   * @param await if `true`, messages held in `receiverStash` are unstashed
   */
  private def onReplayComplete(await: Boolean): Unit = {
    _currentState = idle
    schedule.foreach(_.cancel())
    schedule =
      if (autoUpdate)
        Some(context.system.scheduler.scheduleOnce(autoUpdateInterval, self, Update(await = false, autoUpdateReplayMax)))
      else
        None
    if (await) receiverStash.unstashAll()
  }

  private val _viewId = extension.persistenceId(self)
  private val viewSettings = extension.settings.view

  // Handle of the currently pending auto-update timer, if any.
  private var schedule: Option[Cancellable] = None

  /**
   * View id. Defaults to this view's path and can be overridden.
   */
  def viewId: String = _viewId

  /**
   * Returns `viewId`.
   */
  def snapshotterId: String = viewId

  /**
   * Persistence id used when requesting message replays from the journal. Defaults to
   * `processorId` and can be overridden.
   */
  override def persistenceId: String = processorId

  /**
   * If `true`, this view automatically updates itself with an interval specified by `autoUpdateInterval`.
   * If `false`, applications must explicitly update this view by sending [[Update]] requests. The default
   * value can be configured with the `akka.persistence.view.auto-update` configuration key. This method
   * can be overridden by implementation classes to return non-default values.
   */
  def autoUpdate: Boolean =
    viewSettings.autoUpdate

  /**
   * The interval for automated updates. The default value can be configured with the
   * `akka.persistence.view.auto-update-interval` configuration key. This method can be
   * overridden by implementation classes to return non-default values.
   */
  def autoUpdateInterval: FiniteDuration =
    viewSettings.autoUpdateInterval

  /**
   * The maximum number of messages to replay per update. The default value can be configured with the
   * `akka.persistence.view.auto-update-replay-max` configuration key. This method can be overridden by
   * implementation classes to return non-default values.
   */
  def autoUpdateReplayMax: Long =
    viewSettings.autoUpdateReplayMax

  /**
   * Triggers an initial recovery, starting from a snapshot, if any, and replaying at most `autoUpdateReplayMax`
   * messages (following that snapshot).
   */
  override def preStart(): Unit = {
    super.preStart()
    self ! Recover(replayMax = autoUpdateReplayMax)
  }

  /**
   * Unstashes all messages held in `receiverStash` before delegating to the inherited `preRestart`.
   * The inherited hook runs even if unstashing throws.
   */
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    try receiverStash.unstashAll() finally super.preRestart(reason, message)
  }

  /**
   * Cancels the pending auto-update timer, if any, before delegating to the inherited `postStop`.
   */
  override def postStop(): Unit = {
    schedule.foreach(_.cancel())
    super.postStop()
  }
}
/**
 * Java API.
 *
 * Abstract base class that mixes [[View]] into an `UntypedActor`.
 *
 * @see [[View]]
 */
@deprecated("Use `akka.persistence.UntypedPersistentView` instead.", since = "2.3.4")
abstract class UntypedView extends UntypedActor with View
/**
 * Java API: compatible with lambda expressions (to be used with [[akka.japi.pf.ReceiveBuilder]])
 *
 * Abstract base class that mixes [[View]] into an `AbstractActor`. Deprecated since 2.3.4 in
 * favor of `akka.persistence.AbstractPersistentView`.
 *
 * @see [[View]]
 */
@deprecated("Use `akka.persistence.AbstractPersistentView` instead.", since = "2.3.4")
abstract class AbstractView extends AbstractActor with View
Other Akka source code examples. Here is a short list of links related to this Akka View.scala source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.