alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (Persistence.scala)

This example Akka source code file (Persistence.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actor, actorref, akka, boolean, concurrent, defaultplugindispatcherid, dispatch, duration, extendedactorsystem, finiteduration, int, persistence, persistencesettings, props, string, time

The Persistence.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */

package akka.persistence

import scala.concurrent.duration._

import com.typesafe.config.Config

import akka.actor._
import akka.dispatch.Dispatchers
import akka.persistence.journal.AsyncWriteJournal
import akka.util.Helpers.ConfigOps

/**
 * Persistence configuration.
 */
final class PersistenceSettings(config: Config) {
  object journal {
    val maxMessageBatchSize: Int =
      config.getInt("journal.max-message-batch-size")

    val maxConfirmationBatchSize: Int =
      config.getInt("journal.max-confirmation-batch-size")

    val maxDeletionBatchSize: Int =
      config.getInt("journal.max-deletion-batch-size")
  }

  object view {
    val autoUpdate: Boolean =
      config.getBoolean("view.auto-update")

    val autoUpdateInterval: FiniteDuration =
      config.getMillisDuration("view.auto-update-interval")

    val autoUpdateReplayMax: Long =
      posMax(config.getLong("view.auto-update-replay-max"))

    private def posMax(v: Long) =
      if (v < 0) Long.MaxValue else v
  }

  object atLeastOnceDelivery {

    val redeliverInterval: FiniteDuration =
      config.getMillisDuration("at-least-once-delivery.redeliver-interval")

    val warnAfterNumberOfUnconfirmedAttempts: Int =
      config.getInt("at-least-once-delivery.warn-after-number-of-unconfirmed-attempts")

    val maxUnconfirmedMessages: Int =
      config.getInt("at-least-once-delivery.max-unconfirmed-messages")
  }

  /**
   * INTERNAL API.
   *
   * These config options are only used internally for testing
   * purposes and are therefore not defined in reference.conf
   */
  private[persistence] object internal {
    val publishPluginCommands: Boolean = {
      val path = "publish-plugin-commands"
      config.hasPath(path) && config.getBoolean(path)
    }

    val publishConfirmations: Boolean = {
      val path = "publish-confirmations"
      config.hasPath(path) && config.getBoolean(path)
    }
  }
}

/**
 * Persistence extension.
 */
object Persistence extends ExtensionId[Persistence] with ExtensionIdProvider {
  /**
   * Java API.
   */
  override def get(system: ActorSystem): Persistence = super.get(system)

  def createExtension(system: ExtendedActorSystem): Persistence = new Persistence(system)

  def lookup() = Persistence
}

/**
 * Persistence extension.
 */
class Persistence(val system: ExtendedActorSystem) extends Extension {
  private val DefaultPluginDispatcherId = "akka.persistence.dispatchers.default-plugin-dispatcher"
  private val config = system.settings.config.getConfig("akka.persistence")

  val settings = new PersistenceSettings(config)

  private val snapshotStore = createPlugin("snapshot-store") { _ ⇒
    DefaultPluginDispatcherId
  }

  private val journal = createPlugin("journal") { clazz ⇒
    if (classOf[AsyncWriteJournal].isAssignableFrom(clazz)) Dispatchers.DefaultDispatcherId
    else DefaultPluginDispatcherId
  }

  private val confirmationBatchLayer = system.systemActorOf(
    Props(classOf[DeliveredByChannelBatching], journal, settings), "confirmation-batch-layer")

  private val deletionBatchLayer = system.systemActorOf(
    Props(classOf[DeliveredByPersistentChannelBatching], journal, settings), "deletion-batch-layer")

  /**
   * Creates a canonical processor id from a processor actor ref.
   */
  @deprecated("Use `persistenceId` instead. Processor will be removed.", since = "2.3.4")
  def processorId(processor: ActorRef): String = id(processor)

  /**
   * Creates a canonical persistent actor id from a processor actor ref.
   */
  def persistenceId(persistentActor: ActorRef): String = id(persistentActor)

  /**
   * Creates a canonical channel id from a channel actor ref.
   */
  @deprecated("Channels will be removed. You may want to use `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
  def channelId(channel: ActorRef): String = id(channel)

  /**
   * Returns a snapshot store for a processor identified by `persistenceId`.
   */
  def snapshotStoreFor(persistenceId: String): ActorRef = {
    // Currently returns a snapshot store singleton but this methods allows for later
    // optimizations where each processor can have its own snapshot store actor.
    snapshotStore
  }

  /**
   * Returns a journal for a processor identified by `persistenceId`.
   */
  def journalFor(persistenceId: String): ActorRef = {
    // Currently returns a journal singleton but this methods allows for later
    // optimizations where each processor can have its own journal actor.
    journal
  }

  /**
   * INTERNAL API.
   */
  private[persistence] def confirmationBatchingJournalForChannel(channelId: String): ActorRef =
    confirmationBatchLayer

  /**
   * INTERNAL API.
   */
  private[persistence] def deletionBatchingJournalForChannel(channelId: String): ActorRef =
    deletionBatchLayer

  private def createPlugin(pluginType: String)(dispatcherSelector: Class[_] ⇒ String) = {
    val pluginConfigPath = config.getString(s"${pluginType}.plugin")
    val pluginConfig = system.settings.config.getConfig(pluginConfigPath)
    val pluginClassName = pluginConfig.getString("class")
    val pluginClass = system.dynamicAccess.getClassFor[AnyRef](pluginClassName).get
    val pluginDispatcherId = if (pluginConfig.hasPath("plugin-dispatcher")) pluginConfig.getString("plugin-dispatcher") else dispatcherSelector(pluginClass)
    system.systemActorOf(Props(pluginClass).withDispatcher(pluginDispatcherId), pluginType)
  }

  private def id(ref: ActorRef) = ref.path.toStringWithoutAddress
}

Other Akka source code examples

Here is a short list of links related to this Akka Persistence.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.