|
Akka/Scala example source code file (Eventsourced.scala)
The Eventsourced.scala Akka example source code
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence
import java.lang.{ Iterable ⇒ JIterable }
import akka.actor.{ AbstractActor, UntypedActor }
import akka.japi.{ Procedure, Util }
import akka.persistence.JournalProtocol._
import scala.collection.immutable
/**
* INTERNAL API.
*
* Event sourcing mixin for a [[Processor]].
*/
private[persistence] trait Eventsourced extends ProcessorImpl {
// TODO consolidate these traits as PersistentActor #15230
/**
* Processor recovery state. Waits for recovery completion and then changes to
* `processingCommands`
*/
private val recovering: State = new State {
// cache the recoveryBehavior since it's a def for binary compatibility in 2.3.x
private val _recoveryBehavior: Receive = recoveryBehavior
override def toString: String = "recovering"
def aroundReceive(receive: Receive, message: Any) {
// Since we are recovering we can ignore the receive behavior from the stack
Eventsourced.super.aroundReceive(_recoveryBehavior, message)
message match {
case _: ReadHighestSequenceNrSuccess | _: ReadHighestSequenceNrFailure ⇒
currentState = processingCommands
case _ ⇒
}
}
}
/**
* Command processing state. If event persistence is pending after processing a
* command, event persistence is triggered and state changes to `persistingEvents`.
*
* There's no need to loop commands though the journal any more i.e. they can now be
* directly offered as `LoopSuccess` to the state machine implemented by `Processor`.
*/
private val processingCommands: State = new State {
override def toString: String = "processing commands"
def aroundReceive(receive: Receive, message: Any) = message match {
case _: ConfirmablePersistent ⇒
doAroundReceive(receive, message)
case PersistentBatch(b) ⇒
throw new UnsupportedOperationException("Persistent command batches not supported")
case _: PersistentRepr ⇒
throw new UnsupportedOperationException("Persistent commands not supported")
case WriteMessageSuccess(p, id) ⇒
// instanceId mismatch can happen for persistAsync and defer in case of actor restart
// while message is in flight, in that case we ignore the call to the handler
if (id == instanceId) {
withCurrentPersistent(p)(p ⇒ pendingInvocations.peek().handler(p.payload))
onWriteComplete()
}
case LoopMessageSuccess(l, id) ⇒
// instanceId mismatch can happen for persistAsync and defer in case of actor restart
// while message is in flight, in that case we ignore the call to the handler
if (id == instanceId) {
pendingInvocations.peek().handler(l)
onWriteComplete()
}
case s @ WriteMessagesSuccessful ⇒ Eventsourced.super.aroundReceive(receive, s)
case f: WriteMessagesFailed ⇒ Eventsourced.super.aroundReceive(receive, f)
case _ ⇒
doAroundReceive(receive, message)
}
private def doAroundReceive(receive: Receive, message: Any): Unit = {
Eventsourced.super.aroundReceive(receive, LoopMessageSuccess(message, instanceId))
if (pendingStashingPersistInvocations > 0) {
currentState = persistingEvents
}
if (resequenceableEventBatch.nonEmpty) flushBatch()
else processorStash.unstash()
}
private def onWriteComplete(): Unit = {
pendingInvocations.pop()
}
}
/**
* Event persisting state. Remains until pending events are persisted and then changes
* state to `processingCommands`. Only events to be persisted are processed. All other
* messages are stashed internally.
*/
private val persistingEvents: State = new State {
override def toString: String = "persisting events"
def aroundReceive(receive: Receive, message: Any): Unit = message match {
case _: ConfirmablePersistent ⇒
processorStash.stash()
case PersistentBatch(b) ⇒
b foreach {
case p: PersistentRepr ⇒ deleteMessage(p.sequenceNr, permanent = true)
case r ⇒ // ignore, nothing to delete (was not a persistent message)
}
throw new UnsupportedOperationException("Persistent command batches not supported")
case p: PersistentRepr ⇒
deleteMessage(p.sequenceNr, permanent = true)
throw new UnsupportedOperationException("Persistent commands not supported")
case WriteMessageSuccess(p, id) ⇒
// instanceId mismatch can happen for persistAsync and defer in case of actor restart
// while message is in flight, in that case we ignore the call to the handler
if (id == instanceId) {
withCurrentPersistent(p)(p ⇒ pendingInvocations.peek().handler(p.payload))
onWriteComplete()
}
case e @ WriteMessageFailure(p, _, id) ⇒
Eventsourced.super.aroundReceive(receive, message) // stops actor by default
// instanceId mismatch can happen for persistAsync and defer in case of actor restart
// while message is in flight, in that case the handler has already been discarded
if (id == instanceId)
onWriteComplete()
case LoopMessageSuccess(l, id) ⇒
if (id == instanceId) {
pendingInvocations.peek().handler(l)
onWriteComplete()
}
case s @ WriteMessagesSuccessful ⇒ Eventsourced.super.aroundReceive(receive, s)
case f: WriteMessagesFailed ⇒ Eventsourced.super.aroundReceive(receive, f)
case other ⇒ processorStash.stash()
}
private def onWriteComplete(): Unit = {
pendingInvocations.pop() match {
case _: StashingHandlerInvocation ⇒
// enables an early return to `processingCommands`, because if this counter hits `0`,
// we know the remaining pendingInvocations are all `persistAsync` created, which
// means we can go back to processing commands also - and these callbacks will be called as soon as possible
pendingStashingPersistInvocations -= 1
case _ ⇒ // do nothing
}
if (pendingStashingPersistInvocations == 0) {
currentState = processingCommands
processorStash.unstash()
}
}
}
/**
* INTERNAL API.
*
* This is a def and not a val because of binary compatibility in 2.3.x.
* It is cached where it is used.
*/
private def recoveryBehavior: Receive = {
case Persistent(payload, _) if recoveryRunning && receiveRecover.isDefinedAt(payload) ⇒
receiveRecover(payload)
case s: SnapshotOffer if receiveRecover.isDefinedAt(s) ⇒
receiveRecover(s)
case f: RecoveryFailure if receiveRecover.isDefinedAt(f) ⇒
receiveRecover(f)
case RecoveryCompleted if receiveRecover.isDefinedAt(RecoveryCompleted) ⇒
receiveRecover(RecoveryCompleted)
}
private sealed trait PendingHandlerInvocation {
def evt: Any
def handler: Any ⇒ Unit
}
/** forces processor to stash incoming commands untill all these invocations are handled */
private final case class StashingHandlerInvocation(evt: Any, handler: Any ⇒ Unit) extends PendingHandlerInvocation
/** does not force the processor to stash commands; Originates from either `persistAsync` or `defer` calls */
private final case class AsyncHandlerInvocation(evt: Any, handler: Any ⇒ Unit) extends PendingHandlerInvocation
/** Used instead of iterating `pendingInvocations` in order to check if safe to revert to processing commands */
private var pendingStashingPersistInvocations: Long = 0
/** Holds user-supplied callbacks for persist/persistAsync calls */
private val pendingInvocations = new java.util.LinkedList[PendingHandlerInvocation]() // we only append / isEmpty / get(0) on it
private var resequenceableEventBatch: List[Resequenceable] = Nil
// When using only `persistAsync` and `defer` max throughput is increased by using the
// batching implemented in `Processor`, but when using `persist` we want to use the atomic
// PeristentBatch for the emitted events. This implementation can be improved when
// Processor and Eventsourced are consolidated into one class
private var useProcessorBatching: Boolean = true
private var currentState: State = recovering
private val processorStash = createStash()
private def flushBatch() {
if (useProcessorBatching)
resequenceableEventBatch.reverse foreach { Eventsourced.super.aroundReceive(receive, _) }
else
Eventsourced.super.aroundReceive(receive, PersistentBatch(resequenceableEventBatch.reverse))
resequenceableEventBatch = Nil
useProcessorBatching = true
}
/**
* Asynchronously persists `event`. On successful persistence, `handler` is called with the
* persisted event. It is guaranteed that no new commands will be received by a persistent actor
* between a call to `persist` and the execution of its `handler`. This also holds for
* multiple `persist` calls per received command. Internally, this is achieved by stashing new
* commands and unstashing them when the `event` has been persisted and handled. The stash used
* for that is an internal stash which doesn't interfere with the inherited user stash.
*
* An event `handler` may close over persistent actor state and modify it. The `sender` of a persisted
* event is the sender of the corresponding command. This means that one can reply to a command
* sender within an event `handler`.
*
* Within an event handler, applications usually update persistent actor state using persisted event
* data, notify listeners and reply to command senders.
*
* If persistence of an event fails, the persistent actor will be stopped. This can be customized by
* handling [[PersistenceFailure]] in [[receiveCommand]].
*
* @param event event to be persisted
* @param handler handler for each persisted `event`
*/
final def persist[A](event: A)(handler: A ⇒ Unit): Unit = {
pendingStashingPersistInvocations += 1
pendingInvocations addLast StashingHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit])
resequenceableEventBatch = PersistentRepr(event) :: resequenceableEventBatch
useProcessorBatching = false
}
/**
* Asynchronously persists `events` in specified order. This is equivalent to calling
* `persist[A](event: A)(handler: A => Unit)` multiple times with the same `handler`,
* except that `events` are persisted atomically with this method.
*
* @param events events to be persisted
* @param handler handler for each persisted `events`
*/
final def persist[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit =
events.foreach(persist(_)(handler))
/**
* Asynchronously persists `event`. On successful persistence, `handler` is called with the
* persisted event.
*
* Unlike `persist` the persistent actor will continue to receive incomming commands between the
* call to `persist` and executing it's `handler`. This asynchronous, non-stashing, version of
* of persist should be used when you favor throughput over the "command-2 only processed after
* command-1 effects' have been applied" guarantee, which is provided by the plain [[persist]] method.
*
* An event `handler` may close over persistent actor state and modify it. The `sender` of a persisted
* event is the sender of the corresponding command. This means that one can reply to a command
* sender within an event `handler`.
*
* If persistence of an event fails, the persistent actor will be stopped. This can be customized by
* handling [[PersistenceFailure]] in [[receiveCommand]].
*
* @param event event to be persisted
* @param handler handler for each persisted `event`
*/
final def persistAsync[A](event: A)(handler: A ⇒ Unit): Unit = {
pendingInvocations addLast AsyncHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit])
resequenceableEventBatch = PersistentRepr(event) :: resequenceableEventBatch
}
/**
* Asynchronously persists `events` in specified order. This is equivalent to calling
* `persistAsync[A](event: A)(handler: A => Unit)` multiple times with the same `handler`,
* except that `events` are persisted atomically with this method.
*
* @param events events to be persisted
* @param handler handler for each persisted `events`
*/
final def persistAsync[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit =
events.foreach(persistAsync(_)(handler))
/**
* Defer the handler execution until all pending handlers have been executed.
* Allows to define logic within the actor, which will respect the invocation-order-guarantee
* in respect to `persistAsync` calls. That is, if `persistAsync` was invoked before defer,
* the corresponding handlers will be invoked in the same order as they were registered in.
*
* This call will NOT result in `event` being persisted, please use `persist` or `persistAsync`,
* if the given event should possible to replay.
*
* If there are no pending persist handler calls, the handler will be called immediatly.
*
* In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the
* [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards.
* If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor, thus the handlers
* will not be run.
*
* @param event event to be handled in the future, when preceeding persist operations have been processes
* @param handler handler for the given `event`
*/
final def defer[A](event: A)(handler: A ⇒ Unit): Unit = {
if (pendingInvocations.isEmpty) {
handler(event)
} else {
pendingInvocations addLast AsyncHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit])
resequenceableEventBatch = NonPersistentRepr(event, sender()) :: resequenceableEventBatch
}
}
/**
* Defer the handler execution until all pending handlers have been executed.
* Allows to define logic within the actor, which will respect the invocation-order-guarantee
* in respect to `persistAsync` calls. That is, if `persistAsync` was invoked before defer,
* the corresponding handlers will be invoked in the same order as they were registered in.
*
* This call will NOT result in `event` being persisted, please use `persist` or `persistAsync`,
* if the given event should possible to replay.
*
* If there are no pending persist handler calls, the handler will be called immediatly.
*
* In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the
* [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards.
* If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor, thus the handlers
* will not be run.
*
* @param events event to be handled in the future, when preceeding persist operations have been processes
* @param handler handler for each `event`
*/
final def defer[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit =
events.foreach(defer(_)(handler))
/**
* Recovery handler that receives persisted events during recovery. If a state snapshot
* has been captured and saved, this handler will receive a [[SnapshotOffer]] message
* followed by events that are younger than the offered snapshot.
*
* This handler must not have side-effects other than changing persistent actor state i.e. it
* should not perform actions that may fail, such as interacting with external services,
* for example.
*
* If recovery fails, the actor will be stopped. This can be customized by
* handling [[RecoveryFailure]].
*
* @see [[Recover]]
*/
def receiveRecover: Receive
/**
* Command handler. Typically validates commands against current state (and/or by
* communication with other actors). On successful validation, one or more events are
* derived from a command and these events are then persisted by calling `persist`.
* Commands sent to event sourced persistent actors should not be [[Persistent]] messages.
*/
def receiveCommand: Receive
override def unstashAll() {
// Internally, all messages are processed by unstashing them from
// the internal stash one-by-one. Hence, an unstashAll() from the
// user stash must be prepended to the internal stash.
processorStash.prepend(clearStash())
}
/**
* INTERNAL API.
*/
override protected[akka] def aroundReceive(receive: Receive, message: Any) {
currentState.aroundReceive(receive, message)
}
/**
* INTERNAL API.
*/
override protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = {
// flushJournalBatch will send outstanding persistAsync and defer events to the journal
// and also prevent those to be unstashed in Processor.aroundPreRestart
flushJournalBatch()
super.aroundPreRestart(reason, message)
}
/**
* Calls `super.preRestart` then unstashes all messages from the internal stash.
*/
override def preRestart(reason: Throwable, message: Option[Any]) {
processorStash.unstashAll()
super.preRestart(reason, message)
}
/**
* Calls `super.postStop` then unstashes all messages from the internal stash.
*/
override def postStop() {
processorStash.unstashAll()
super.postStop()
}
/**
* INTERNAL API.
*
* Only here for binary compatibility in 2.3.x.
*/
protected[persistence] val initialBehavior: Receive = recoveryBehavior orElse {
case msg if receiveCommand.isDefinedAt(msg) ⇒
receiveCommand(msg)
}
}
/**
* An event sourced processor.
*/
@deprecated("EventsourcedProcessor will be removed in 2.4.x, instead extend the API equivalent `akka.persistence.PersistentProcessor`", since = "2.3.4")
trait EventsourcedProcessor extends Processor with Eventsourced {
// TODO remove Processor #15230
def receive = receiveCommand
}
/**
* An persistent Actor - can be used to implement command or event sourcing.
*/
trait PersistentActor extends ProcessorImpl with Eventsourced {
def receive = receiveCommand
}
/**
* Java API: an persistent actor - can be used to implement command or event sourcing.
*/
abstract class UntypedPersistentActor extends UntypedActor with ProcessorImpl with Eventsourced {
final def onReceive(message: Any) = onReceiveCommand(message)
final def receiveRecover: Receive = {
case msg ⇒ onReceiveRecover(msg)
}
final def receiveCommand: Receive = {
case msg ⇒ onReceiveCommand(msg)
}
/**
* Java API: asynchronously persists `event`. On successful persistence, `handler` is called with the
* persisted event. It is guaranteed that no new commands will be received by a persistent actor
* between a call to `persist` and the execution of its `handler`. This also holds for
* multiple `persist` calls per received command. Internally, this is achieved by stashing new
* commands and unstashing them when the `event` has been persisted and handled. The stash used
* for that is an internal stash which doesn't interfere with the inherited user stash.
*
* An event `handler` may close over persistent actor state and modify it. The `getSender()` of a persisted
* event is the sender of the corresponding command. This means that one can reply to a command
* sender within an event `handler`.
*
* Within an event handler, applications usually update persistent actor state using persisted event
* data, notify listeners and reply to command senders.
*
* If persistence of an event fails, the persistent actor will be stopped. This can be customized by
* handling [[PersistenceFailure]] in [[onReceiveCommand]].
*
* @param event event to be persisted.
* @param handler handler for each persisted `event`
*/
final def persist[A](event: A, handler: Procedure[A]): Unit =
persist(event)(event ⇒ handler(event))
/**
* Java API: asynchronously persists `events` in specified order. This is equivalent to calling
* `persist[A](event: A, handler: Procedure[A])` multiple times with the same `handler`,
* except that `events` are persisted atomically with this method.
*
* @param events events to be persisted.
* @param handler handler for each persisted `events`
*/
final def persist[A](events: JIterable[A], handler: Procedure[A]): Unit =
persist(Util.immutableSeq(events))(event ⇒ handler(event))
/**
* JAVA API: asynchronously persists `event`. On successful persistence, `handler` is called with the
* persisted event.
*
* Unlike `persist` the persistent actor will continue to receive incomming commands between the
* call to `persist` and executing it's `handler`. This asynchronous, non-stashing, version of
* of persist should be used when you favor throughput over the "command-2 only processed after
* command-1 effects' have been applied" guarantee, which is provided by the plain [[persist]] method.
*
* An event `handler` may close over persistent actor state and modify it. The `sender` of a persisted
* event is the sender of the corresponding command. This means that one can reply to a command
* sender within an event `handler`.
*
* If persistence of an event fails, the persistent actor will be stopped. This can be customized by
* handling [[PersistenceFailure]] in [[receiveCommand]].
*
* @param event event to be persisted
* @param handler handler for each persisted `event`
*/
final def persistAsync[A](event: A)(handler: Procedure[A]): Unit =
super[Eventsourced].persistAsync(event)(event ⇒ handler(event))
/**
* JAVA API: asynchronously persists `events` in specified order. This is equivalent to calling
* `persistAsync[A](event: A)(handler: A => Unit)` multiple times with the same `handler`,
* except that `events` are persisted atomically with this method.
*
* @param events events to be persisted
* @param handler handler for each persisted `events`
*/
final def persistAsync[A](events: JIterable[A])(handler: A ⇒ Unit): Unit =
super[Eventsourced].persistAsync(Util.immutableSeq(events))(event ⇒ handler(event))
/**
* Defer the handler execution until all pending handlers have been executed.
* Allows to define logic within the actor, which will respect the invocation-order-guarantee
* in respect to `persistAsync` calls. That is, if `persistAsync` was invoked before defer,
* the corresponding handlers will be invoked in the same order as they were registered in.
*
* This call will NOT result in `event` being persisted, please use `persist` or `persistAsync`,
* if the given event should possible to replay.
*
* If there are no pending persist handler calls, the handler will be called immediatly.
*
* In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the
* [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards.
* If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor, thus the handlers
* will not be run.
*
* @param event event to be handled in the future, when preceeding persist operations have been processes
* @param handler handler for the given `event`
*/
final def defer[A](event: A)(handler: Procedure[A]): Unit =
super[Eventsourced].defer(event)(event ⇒ handler(event))
/**
* Defer the handler execution until all pending handlers have been executed.
* Allows to define logic within the actor, which will respect the invocation-order-guarantee
* in respect to `persistAsync` calls. That is, if `persistAsync` was invoked before defer,
* the corresponding handlers will be invoked in the same order as they were registered in.
*
* This call will NOT result in `event` being persisted, please use `persist` or `persistAsync`,
* if the given event should possible to replay.
*
* If there are no pending persist handler calls, the handler will be called immediatly.
*
* In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the
* [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards.
* If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor, thus the handlers
* will not be run.
*
* @param events event to be handled in the future, when preceeding persist operations have been processes
* @param handler handler for each `event`
*/
final def defer[A](events: JIterable[A])(handler: Procedure[A]): Unit =
super[Eventsourced].defer(Util.immutableSeq(events))(event ⇒ handler(event))
/**
* Java API: recovery handler that receives persisted events during recovery. If a state snapshot
* has been captured and saved, this handler will receive a [[SnapshotOffer]] message
* followed by events that are younger than the offered snapshot.
*
* This handler must not have side-effects other than changing persistent actor state i.e. it
* should not perform actions that may fail, such as interacting with external services,
* for example.
*
* If recovery fails, the actor will be stopped. This can be customized by
* handling [[RecoveryFailure]].
*
* @see [[Recover]]
*/
@throws(classOf[Exception])
def onReceiveRecover(msg: Any): Unit
/**
* Java API: command handler. Typically validates commands against current state (and/or by
* communication with other actors). On successful validation, one or more events are
* derived from a command and these events are then persisted by calling `persist`.
* Commands sent to event sourced persistent actors must not be [[Persistent]] or
* [[PersistentBatch]] messages. In this case an `UnsupportedOperationException` is
* thrown by the persistent actor.
*/
@throws(classOf[Exception])
def onReceiveCommand(msg: Any): Unit
}
/**
* Java API: an persistent actor - can be used to implement command or event sourcing.
*/
abstract class AbstractPersistentActor extends AbstractActor with PersistentActor with Eventsourced {
/**
* Java API: asynchronously persists `event`. On successful persistence, `handler` is called with the
* persisted event. It is guaranteed that no new commands will be received by a persistent actor
* between a call to `persist` and the execution of its `handler`. This also holds for
* multiple `persist` calls per received command. Internally, this is achieved by stashing new
* commands and unstashing them when the `event` has been persisted and handled. The stash used
* for that is an internal stash which doesn't interfere with the inherited user stash.
*
* An event `handler` may close over persistent actor state and modify it. The `getSender()` of a persisted
* event is the sender of the corresponding command. This means that one can reply to a command
* sender within an event `handler`.
*
* Within an event handler, applications usually update persistent actor state using persisted event
* data, notify listeners and reply to command senders.
*
* If persistence of an event fails, the persistent actor will be stopped. This can be customized by
* handling [[PersistenceFailure]] in [[receiveCommand]].
*
* @param event event to be persisted.
* @param handler handler for each persisted `event`
*/
final def persist[A](event: A, handler: Procedure[A]): Unit =
persist(event)(event ⇒ handler(event))
/**
* Java API: asynchronously persists `events` in specified order. This is equivalent to calling
* `persist[A](event: A, handler: Procedure[A])` multiple times with the same `handler`,
* except that `events` are persisted atomically with this method.
*
* @param events events to be persisted.
* @param handler handler for each persisted `events`
*/
final def persist[A](events: JIterable[A], handler: Procedure[A]): Unit =
persist(Util.immutableSeq(events))(event ⇒ handler(event))
/**
* Java API: asynchronously persists `event`. On successful persistence, `handler` is called with the
* persisted event.
*
* Unlike `persist` the persistent actor will continue to receive incomming commands between the
* call to `persistAsync` and executing it's `handler`. This asynchronous, non-stashing, version of
* of persist should be used when you favor throughput over the strict ordering guarantees that `persist` guarantees.
*
* If persistence of an event fails, the persistent actor will be stopped. This can be customized by
* handling [[PersistenceFailure]] in [[receiveCommand]].
*
* @param event event to be persisted
* @param handler handler for each persisted `event`
*/
final def persistAsync[A](event: A, handler: Procedure[A]): Unit =
persistAsync(event)(event ⇒ handler(event))
/**
* Defer the handler execution until all pending handlers have been executed.
* Allows to define logic within the actor, which will respect the invocation-order-guarantee
* in respect to `persistAsync` calls. That is, if `persistAsync` was invoked before defer,
* the corresponding handlers will be invoked in the same order as they were registered in.
*
* This call will NOT result in `event` being persisted, please use `persist` or `persistAsync`,
* if the given event should possible to replay.
*
* If there are no pending persist handler calls, the handler will be called immediatly.
*
* In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the
* [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards.
* If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor, thus the handlers
* will not be run.
*
* @param event event to be handled in the future, when preceeding persist operations have been processes
* @param handler handler for the given `event`
*/
final def defer[A](event: A)(handler: Procedure[A]): Unit =
super.defer(event)(event ⇒ handler(event))
/**
* Defer the handler execution until all pending handlers have been executed.
* Allows to define logic within the actor, which will respect the invocation-order-guarantee
* in respect to `persistAsync` calls. That is, if `persistAsync` was invoked before defer,
* the corresponding handlers will be invoked in the same order as they were registered in.
*
* This call will NOT result in `event` being persisted, please use `persist` or `persistAsync`,
* if the given event should possible to replay.
*
* If there are no pending persist handler calls, the handler will be called immediatly.
*
* In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the
* [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards.
* If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor, thus the handlers
* will not be run.
*
* @param events event to be handled in the future, when preceeding persist operations have been processes
* @param handler handler for each `event`
*/
final def defer[A](events: JIterable[A])(handler: Procedure[A]): Unit =
super.defer(Util.immutableSeq(events))(event ⇒ handler(event))
/**
* Java API: asynchronously persists `events` in specified order. This is equivalent to calling
* `persistAsync[A](event: A)(handler: A => Unit)` multiple times with the same `handler`,
* except that `events` are persisted atomically with this method.
*
* @param events events to be persisted
* @param handler handler for each persisted `events`
*/
final def persistAsync[A](events: JIterable[A], handler: Procedure[A]): Unit =
persistAsync(Util.immutableSeq(events))(event ⇒ handler(event))
override def receive = super[PersistentActor].receive
}
/**
* Java API: an event sourced processor.
*/
@deprecated("UntypedEventsourcedProcessor will be removed in 2.4.x, instead extend the API equivalent `akka.persistence.PersistentProcessor`", since = "2.3.4")
abstract class UntypedEventsourcedProcessor extends UntypedPersistentActor {
override def persistenceId: String = processorId
}
/**
* Java API: compatible with lambda expressions (to be used with [[akka.japi.pf.ReceiveBuilder]]):
* command handler. Typically validates commands against current state (and/or by
* communication with other actors). On successful validation, one or more events are
* derived from a command and these events are then persisted by calling `persist`.
* Commands sent to event sourced processors must not be [[Persistent]] or
* [[PersistentBatch]] messages. In this case an `UnsupportedOperationException` is
* thrown by the processor.
*/
@deprecated("AbstractEventsourcedProcessor will be removed in 2.4.x, instead extend the API equivalent `akka.persistence.PersistentProcessor`", since = "2.3.4")
abstract class AbstractEventsourcedProcessor extends AbstractPersistentActor {
override def persistenceId: String = processorId
}
Other Akka source code examplesHere is a short list of links related to this Akka Eventsourced.scala source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.