alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (View.scala)

This example Akka source code file (View.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actor, akka, any, boolean, concurrent, duration, persistence, receive, recover, state, string, time, unit, update, use, view

The View.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */

package akka.persistence

import scala.concurrent.duration._

import akka.actor._
import akka.persistence.JournalProtocol._

/**
 * A view replicates the persistent message stream of a processor. Implementation classes receive the
 * message stream as [[Persistent]] messages. These messages can be processed to update internal state
 * in order to maintain an (eventual consistent) view of the state of the corresponding processor. A
 * view can also run on a different node, provided that a replicated journal is used. Implementation
 * classes reference a processor by implementing `persistenceId`.
 *
 * Views can also store snapshots of internal state by calling [[#saveSnapshot]]. The snapshots of a view
 * are independent of those of the referenced processor. During recovery, a saved snapshot is offered
 * to the view with a [[SnapshotOffer]] message, followed by replayed messages, if any, that are younger
 * than the snapshot. Default is to offer the latest saved snapshot.
 *
 * By default, a view automatically updates itself with an interval returned by `autoUpdateInterval`.
 * This method can be overridden by implementation classes to define a view instance-specific update
 * interval. The default update interval for all views of an actor system can be configured with the
 * `akka.persistence.view.auto-update-interval` configuration key. Applications may trigger additional
 * view updates by sending the view [[Update]] requests. See also methods
 *
 *  - [[#autoUpdate]] for turning automated updates on or off
 *  - [[#autoUpdateReplayMax]] for limiting the number of replayed messages per view update cycle
 *
 * Views can also use channels to communicate with destinations in the same way as processors can do.
 */
@deprecated("Use `akka.persistence.PersistentView` instead.", since = "2.3.4")
trait View extends Actor with Recovery {
  import context.dispatcher

  /**
   * INTERNAL API.
   *
   * Extends the `replayStarted` state of [[Recovery]] with logic to handle [[Update]] requests
   * sent by users.
   */
  private[persistence] override def replayStarted(await: Boolean) = new State {
    private var delegateAwaiting = await
    private var delegate = View.super.replayStarted(await)

    override def toString: String = delegate.toString

    override def aroundReceive(receive: Receive, message: Any) = message match {
      case Update(false, _) ⇒ // ignore
      case u @ Update(true, _) if !delegateAwaiting ⇒
        delegateAwaiting = true
        delegate = View.super.replayStarted(await = true)
        delegate.aroundReceive(receive, u)
      case other ⇒
        delegate.aroundReceive(receive, other)
    }
  }

  /**
   * When receiving an [[Update]] request, switches to `replayStarted` state and triggers
   * an incremental message replay. Invokes the actor's current behavior for any other
   * received message.
   */
  private val idle: State = new State {
    override def toString: String = "idle"

    def aroundReceive(receive: Receive, message: Any): Unit = message match {
      case r: Recover ⇒ // ignore
      case Update(awaitUpdate, replayMax) ⇒
        _currentState = replayStarted(await = awaitUpdate)
        journal ! ReplayMessages(lastSequenceNr + 1L, Long.MaxValue, replayMax, persistenceId, self)
      case other ⇒ process(receive, other)
    }
  }

  /**
   * INTERNAL API.
   */
  private[persistence] def onReplaySuccess(receive: Receive, await: Boolean): Unit =
    onReplayComplete(await)

  /**
   * INTERNAL API.
   */
  private[persistence] def onReplayFailure(receive: Receive, await: Boolean, cause: Throwable): Unit =
    onReplayComplete(await)

  /**
   * Switches to `idle` state and schedules the next update if `autoUpdate` returns `true`.
   */
  private def onReplayComplete(await: Boolean): Unit = {
    _currentState = idle
    if (autoUpdate) schedule = Some(context.system.scheduler.scheduleOnce(autoUpdateInterval, self, Update(await = false, autoUpdateReplayMax)))
    if (await) receiverStash.unstashAll()
  }

  private val _viewId = extension.persistenceId(self)
  private val viewSettings = extension.settings.view

  private var schedule: Option[Cancellable] = None

  /**
   * View id. Defaults to this view's path and can be overridden.
   */
  def viewId: String = _viewId

  /**
   * Returns `viewId`.
   */
  def snapshotterId: String = viewId

  /**
   * Persistence id. Defaults to this persistent-actors's path and can be overridden.
   */
  override def persistenceId: String = processorId

  /**
   * If `true`, this view automatically updates itself with an interval specified by `autoUpdateInterval`.
   * If `false`, applications must explicitly update this view by sending [[Update]] requests. The default
   * value can be configured with the `akka.persistence.view.auto-update` configuration key. This method
   * can be overridden by implementation classes to return non-default values.
   */
  def autoUpdate: Boolean =
    viewSettings.autoUpdate

  /**
   * The interval for automated updates. The default value can be configured with the
   * `akka.persistence.view.auto-update-interval` configuration key. This method can be
   * overridden by implementation classes to return non-default values.
   */
  def autoUpdateInterval: FiniteDuration =
    viewSettings.autoUpdateInterval

  /**
   * The maximum number of messages to replay per update. The default value can be configured with the
   * `akka.persistence.view.auto-update-replay-max` configuration key. This method can be overridden by
   * implementation classes to return non-default values.
   */
  def autoUpdateReplayMax: Long =
    viewSettings.autoUpdateReplayMax

  /**
   * Triggers an initial recovery, starting form a snapshot, if any, and replaying at most `autoUpdateReplayMax`
   * messages (following that snapshot).
   */
  override def preStart(): Unit = {
    super.preStart()
    self ! Recover(replayMax = autoUpdateReplayMax)
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    try receiverStash.unstashAll() finally super.preRestart(reason, message)
  }

  override def postStop(): Unit = {
    schedule.foreach(_.cancel())
    super.postStop()
  }
}

/**
 * Java API.
 *
 * @see [[View]]
 */
@deprecated("Use `akka.persistence.UntypedPersistentView instead.", since = "2.3.4")
abstract class UntypedView extends UntypedActor with View

/**
 * Java API: compatible with lambda expressions (to be used with [[akka.japi.pf.ReceiveBuilder]])
 *
 * @see [[View]]
 */
@deprecated("Use `akka.persistence.AbstractPersistentView` instead.", since = "2.3.4")
abstract class AbstractView extends AbstractActor with View

Other Akka source code examples

Here is a short list of links related to this Akka View.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.