|
Akka/Scala example source code file (Persistence.scala)
The Persistence.scala Akka example source code
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence
import scala.concurrent.duration._
import com.typesafe.config.Config
import akka.actor._
import akka.dispatch.Dispatchers
import akka.persistence.journal.AsyncWriteJournal
import akka.util.Helpers.ConfigOps
/**
* Persistence configuration.
*/
final class PersistenceSettings(config: Config) {
object journal {
val maxMessageBatchSize: Int =
config.getInt("journal.max-message-batch-size")
val maxConfirmationBatchSize: Int =
config.getInt("journal.max-confirmation-batch-size")
val maxDeletionBatchSize: Int =
config.getInt("journal.max-deletion-batch-size")
}
object view {
val autoUpdate: Boolean =
config.getBoolean("view.auto-update")
val autoUpdateInterval: FiniteDuration =
config.getMillisDuration("view.auto-update-interval")
val autoUpdateReplayMax: Long =
posMax(config.getLong("view.auto-update-replay-max"))
private def posMax(v: Long) =
if (v < 0) Long.MaxValue else v
}
object atLeastOnceDelivery {
val redeliverInterval: FiniteDuration =
config.getMillisDuration("at-least-once-delivery.redeliver-interval")
val warnAfterNumberOfUnconfirmedAttempts: Int =
config.getInt("at-least-once-delivery.warn-after-number-of-unconfirmed-attempts")
val maxUnconfirmedMessages: Int =
config.getInt("at-least-once-delivery.max-unconfirmed-messages")
}
/**
* INTERNAL API.
*
* These config options are only used internally for testing
* purposes and are therefore not defined in reference.conf
*/
private[persistence] object internal {
val publishPluginCommands: Boolean = {
val path = "publish-plugin-commands"
config.hasPath(path) && config.getBoolean(path)
}
val publishConfirmations: Boolean = {
val path = "publish-confirmations"
config.hasPath(path) && config.getBoolean(path)
}
}
}
/**
* Persistence extension.
*/
object Persistence extends ExtensionId[Persistence] with ExtensionIdProvider {
/**
* Java API.
*/
override def get(system: ActorSystem): Persistence = super.get(system)
def createExtension(system: ExtendedActorSystem): Persistence = new Persistence(system)
def lookup() = Persistence
}
/**
* Persistence extension.
*/
class Persistence(val system: ExtendedActorSystem) extends Extension {
private val DefaultPluginDispatcherId = "akka.persistence.dispatchers.default-plugin-dispatcher"
private val config = system.settings.config.getConfig("akka.persistence")
val settings = new PersistenceSettings(config)
private val snapshotStore = createPlugin("snapshot-store") { _ ⇒
DefaultPluginDispatcherId
}
private val journal = createPlugin("journal") { clazz ⇒
if (classOf[AsyncWriteJournal].isAssignableFrom(clazz)) Dispatchers.DefaultDispatcherId
else DefaultPluginDispatcherId
}
private val confirmationBatchLayer = system.systemActorOf(
Props(classOf[DeliveredByChannelBatching], journal, settings), "confirmation-batch-layer")
private val deletionBatchLayer = system.systemActorOf(
Props(classOf[DeliveredByPersistentChannelBatching], journal, settings), "deletion-batch-layer")
/**
* Creates a canonical processor id from a processor actor ref.
*/
@deprecated("Use `persistenceId` instead. Processor will be removed.", since = "2.3.4")
def processorId(processor: ActorRef): String = id(processor)
/**
* Creates a canonical persistent actor id from a processor actor ref.
*/
def persistenceId(persistentActor: ActorRef): String = id(persistentActor)
/**
* Creates a canonical channel id from a channel actor ref.
*/
@deprecated("Channels will be removed. You may want to use `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
def channelId(channel: ActorRef): String = id(channel)
/**
* Returns a snapshot store for a processor identified by `persistenceId`.
*/
def snapshotStoreFor(persistenceId: String): ActorRef = {
// Currently returns a snapshot store singleton but this methods allows for later
// optimizations where each processor can have its own snapshot store actor.
snapshotStore
}
/**
* Returns a journal for a processor identified by `persistenceId`.
*/
def journalFor(persistenceId: String): ActorRef = {
// Currently returns a journal singleton but this methods allows for later
// optimizations where each processor can have its own journal actor.
journal
}
/**
* INTERNAL API.
*/
private[persistence] def confirmationBatchingJournalForChannel(channelId: String): ActorRef =
confirmationBatchLayer
/**
* INTERNAL API.
*/
private[persistence] def deletionBatchingJournalForChannel(channelId: String): ActorRef =
deletionBatchLayer
private def createPlugin(pluginType: String)(dispatcherSelector: Class[_] ⇒ String) = {
val pluginConfigPath = config.getString(s"${pluginType}.plugin")
val pluginConfig = system.settings.config.getConfig(pluginConfigPath)
val pluginClassName = pluginConfig.getString("class")
val pluginClass = system.dynamicAccess.getClassFor[AnyRef](pluginClassName).get
val pluginDispatcherId = if (pluginConfig.hasPath("plugin-dispatcher")) pluginConfig.getString("plugin-dispatcher") else dispatcherSelector(pluginClass)
system.systemActorOf(Props(pluginClass).withDispatcher(pluginDispatcherId), pluginType)
}
private def id(ref: ActorRef) = ref.path.toStringWithoutAddress
}
Other Akka source code examplesHere is a short list of links related to this Akka Persistence.scala source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.