|
Akka/Scala example source code file (Processor.scala)
The Processor.scala Akka example source code/** * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> */ package akka.persistence import akka.AkkaException import akka.actor._ import akka.dispatch._ import java.util.concurrent.atomic.AtomicInteger /** * An actor that persists (journals) messages of type [[Persistent]]. Messages of other types are not persisted. * * {{{ * import akka.persistence.{ Persistent, Processor } * * class MyProcessor extends Processor { * def receive = { * case Persistent(payload, sequenceNr) => // message has been written to journal * case other => // message has not been written to journal * } * } * * val processor = actorOf(Props[MyProcessor], name = "myProcessor") * * processor ! Persistent("foo") * processor ! "bar" * }}} * * During start and restart, persistent messages are replayed to a processor so that it can recover internal * state from these messages. New messages sent to a processor during recovery do not interfere with replayed * messages, hence applications don't need to wait for a processor to complete its recovery. * * Automated recovery can be turned off or customized by overriding the [[preStart]] and [[preRestart]] life * cycle hooks. If automated recovery is turned off, an application can explicitly recover a processor by * sending it a [[Recover]] message. * * [[Persistent]] messages are assigned sequence numbers that are generated on a per-processor basis. A sequence * starts at `1L` and doesn't contain gaps unless a processor (logically) deletes a message * * During recovery, a processor internally buffers new messages until recovery completes, so that new messages * do not interfere with replayed messages. This internal buffer (the ''processor stash'') is isolated from the * ''user stash'' inherited by `akka.actor.Stash`. `Processor` implementation classes can therefore use the * ''user stash'' for stashing/unstashing both persistent and transient messages. * * Processors can also store snapshots of internal state by calling [[saveSnapshot]]. During recovery, a saved * snapshot is offered to the processor with a [[SnapshotOffer]] message, followed by replayed messages, if any, * that are younger than the snapshot. Default is to offer the latest saved snapshot. * * @see [[UntypedProcessor]] * @see [[Recover]] * @see [[PersistentBatch]] */ @deprecated("Processor will be removed. Instead extend `akka.persistence.PersistentActor` and use it's `persistAsync(command)(callback)` method to get equivalent semantics.", since = "2.3.4") trait Processor extends ProcessorImpl { /** * Persistence id. Defaults to this persistent-actors's path and can be overridden. */ override def persistenceId: String = processorId } /** * INTERNAL API */ private[akka] object ProcessorImpl { // ok to wrap around (2*Int.MaxValue restarts will not happen within a journal roundtrip) private val instanceIdCounter = new AtomicInteger } /** INTERNAL API */ @deprecated("Processor will be removed. Instead extend `akka.persistence.PersistentActor` and use it's `persistAsync(command)(callback)` method to get equivalent semantics.", since = "2.3.4") private[akka] trait ProcessorImpl extends Actor with Recovery { // TODO: remove Processor in favor of PersistentActor #15230 import JournalProtocol._ private[persistence] val instanceId: Int = ProcessorImpl.instanceIdCounter.incrementAndGet() /** * Processes the highest stored sequence number response from the journal and then switches * to `processing` state. */ private val initializing = new State { override def toString: String = "initializing" def aroundReceive(receive: Receive, message: Any) = message match { case ReadHighestSequenceNrSuccess(highest) ⇒ _currentState = processing sequenceNr = highest receiverStash.unstashAll() onRecoveryCompleted(receive) case ReadHighestSequenceNrFailure(cause) ⇒ onRecoveryFailure(receive, cause) case other ⇒ receiverStash.stash() } } /** * Journals and processes new messages, both persistent and transient. */ private val processing = new State { override def toString: String = "processing" private var batching = false def aroundReceive(receive: Receive, message: Any) = message match { case r: Recover ⇒ // ignore case ReplayedMessage(p) ⇒ processPersistent(receive, p) // can occur after unstash from user stash case WriteMessageSuccess(p: PersistentRepr, _) ⇒ processPersistent(receive, p) case WriteMessageSuccess(r: Resequenceable, _) ⇒ process(receive, r) case WriteMessageFailure(p, cause, _) ⇒ process(receive, PersistenceFailure(p.payload, p.sequenceNr, cause)) case LoopMessageSuccess(m, _) ⇒ process(receive, m) case WriteMessagesSuccessful | WriteMessagesFailed(_) ⇒ if (processorBatch.isEmpty) batching = false else journalBatch() case p: PersistentRepr ⇒ addToBatch(p) if (!batching || maxBatchSizeReached) journalBatch() case n: NonPersistentRepr ⇒ addToBatch(n) if (!batching || maxBatchSizeReached) journalBatch() case pb: PersistentBatch ⇒ // submit all batched messages before submitting this user batch (isolated) if (!processorBatch.isEmpty) journalBatch() addToBatch(pb) journalBatch() case m ⇒ // submit all batched messages before looping this message if (processorBatch.isEmpty) batching = false else journalBatch() journal forward LoopMessage(m, self, instanceId) } def addToBatch(p: Resequenceable): Unit = p match { case p: PersistentRepr ⇒ processorBatch = processorBatch :+ p.update(persistenceId = persistenceId, sequenceNr = nextSequenceNr(), sender = sender()) case r ⇒ processorBatch = processorBatch :+ r } def addToBatch(pb: PersistentBatch): Unit = pb.batch.foreach(addToBatch) def maxBatchSizeReached: Boolean = processorBatch.length >= extension.settings.journal.maxMessageBatchSize def journalBatch(): Unit = { flushJournalBatch() batching = true } } /** * INTERNAL API. * * Switches to `initializing` state and requests the highest stored sequence number from the journal. */ private[persistence] def onReplaySuccess(receive: Receive, awaitReplay: Boolean): Unit = { _currentState = initializing journal ! ReadHighestSequenceNr(lastSequenceNr, persistenceId, self) } /** * INTERNAL API. */ private[persistence] def onReplayFailure(receive: Receive, awaitReplay: Boolean, cause: Throwable): Unit = onRecoveryFailure(receive, cause) /** * Invokes this processor's behavior with a `RecoveryFailure` message. */ private def onRecoveryFailure(receive: Receive, cause: Throwable): Unit = receive.applyOrElse(RecoveryFailure(cause), unhandled) /** * Invokes this processor's behavior with a `RecoveryFinished` message. */ private def onRecoveryCompleted(receive: Receive): Unit = receive.applyOrElse(RecoveryCompleted, unhandled) private val _persistenceId = extension.persistenceId(self) private var processorBatch = Vector.empty[Resequenceable] private var sequenceNr: Long = 0L /** * Processor id. Defaults to this processor's path and can be overridden. */ @deprecated("Override `persistenceId: String` instead. Processor will be removed.", since = "2.3.4") override def processorId: String = _persistenceId // TODO: remove processorId /** * Returns `persistenceId`. */ def snapshotterId: String = persistenceId /** * Returns `true` if this processor is currently recovering. */ def recoveryRunning: Boolean = _currentState != processing /** * Returns `true` if this processor has successfully finished recovery. */ def recoveryFinished: Boolean = _currentState == processing /** * Marks a persistent message, identified by `sequenceNr`, as deleted. A message marked as deleted is * not replayed during recovery. This method is usually called inside `preRestartProcessor` when a * persistent message caused an exception. Processors that want to re-receive that persistent message * during recovery should not call this method. * * @param sequenceNr sequence number of the persistent message to be deleted. */ @deprecated("deleteMessage(sequenceNr) will be removed. Instead, validate before persist, and use deleteMessages for pruning.", since = "2.3.4") def deleteMessage(sequenceNr: Long): Unit = { deleteMessage(sequenceNr, permanent = false) } /** * Deletes a persistent message identified by `sequenceNr`. If `permanent` is set to `false`, * the persistent message is marked as deleted in the journal, otherwise it is permanently * deleted from the journal. A deleted message is not replayed during recovery. This method * is usually called inside `preRestartProcessor` when a persistent message caused an exception. * Processors that want to re-receive that persistent message during recovery should not call * this method. * * @param sequenceNr sequence number of the persistent message to be deleted. * @param permanent if `false`, the message is marked as deleted, otherwise it is permanently deleted. */ @deprecated("deleteMessage(sequenceNr) will be removed. Instead, validate before persist, and use deleteMessages for pruning.", since = "2.3.4") def deleteMessage(sequenceNr: Long, permanent: Boolean): Unit = { journal ! DeleteMessages(List(PersistentIdImpl(persistenceId, sequenceNr)), permanent) } /** * Permanently deletes all persistent messages with sequence numbers less than or equal `toSequenceNr`. * * @param toSequenceNr upper sequence number bound of persistent messages to be deleted. */ def deleteMessages(toSequenceNr: Long): Unit = { deleteMessages(toSequenceNr, permanent = true) } /** * Deletes all persistent messages with sequence numbers less than or equal `toSequenceNr`. If `permanent` * is set to `false`, the persistent messages are marked as deleted in the journal, otherwise * they permanently deleted from the journal. * * @param toSequenceNr upper sequence number bound of persistent messages to be deleted. * @param permanent if `false`, the message is marked as deleted, otherwise it is permanently deleted. */ def deleteMessages(toSequenceNr: Long, permanent: Boolean): Unit = { journal ! DeleteMessagesTo(persistenceId, toSequenceNr, permanent) } /** * INTERNAL API */ private[akka] def flushJournalBatch(): Unit = { journal ! WriteMessages(processorBatch, self, instanceId) processorBatch = Vector.empty } /** * INTERNAL API. */ override protected[akka] def aroundPreStart(): Unit = { try preStart() finally super.preStart() } /** * INTERNAL API. */ override protected[akka] def aroundPostStop(): Unit = { try unstashAll(unstashFilterPredicate) finally postStop() } /** * INTERNAL API. */ override protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = { try { receiverStash.prepend(processorBatch.map(p ⇒ Envelope(p, p.sender, context.system))) receiverStash.unstashAll() unstashAll(unstashFilterPredicate) } finally { message match { case Some(WriteMessageSuccess(m, _)) ⇒ preRestartDefault(reason, Some(m)) case Some(LoopMessageSuccess(m, _)) ⇒ preRestartDefault(reason, Some(m)) case Some(ReplayedMessage(m)) ⇒ preRestartDefault(reason, Some(m)) case mo ⇒ preRestartDefault(reason, None) } } } /** * User-overridable callback. Called when a processor is started. Default implementation sends * a `Recover()` to `self`. */ @throws(classOf[Exception]) override def preStart(): Unit = { self ! Recover() } /** * User-overridable callback. Called before a processor is restarted. Default implementation sends * a `Recover(lastSequenceNr)` message to `self` if `message` is defined, `Recover() otherwise`. */ override def preRestart(reason: Throwable, message: Option[Any]): Unit = { message match { case Some(_) ⇒ self ! Recover(toSequenceNr = lastSequenceNr) case None ⇒ self ! Recover() } } /** * Calls [[preRestart]] and then `super.preRestart()`. If processor implementation classes want to * opt out from stopping child actors, they should override this method and call [[preRestart]] only. */ def preRestartDefault(reason: Throwable, message: Option[Any]): Unit = { try preRestart(reason, message) finally super.preRestart(reason, message) } override def unhandled(message: Any): Unit = { message match { case RecoveryCompleted ⇒ // mute case RecoveryFailure(cause) ⇒ val errorMsg = s"Processor killed after recovery failure (persisten id = [${persistenceId}]). " + "To avoid killing processors on recovery failure, a processor must handle RecoveryFailure messages. " + "RecoveryFailure was caused by: " + cause throw new ActorKilledException(errorMsg) case PersistenceFailure(payload, sequenceNumber, cause) ⇒ val errorMsg = "Processor killed after persistence failure " + s"(persistent id = [${persistenceId}], sequence nr = [${sequenceNumber}], payload class = [${payload.getClass.getName}]). " + "To avoid killing processors on persistence failure, a processor must handle PersistenceFailure messages. " + "PersistenceFailure was caused by: " + cause throw new ActorKilledException(errorMsg) case m ⇒ super.unhandled(m) } } private def nextSequenceNr(): Long = { sequenceNr += 1L sequenceNr } private val unstashFilterPredicate: Any ⇒ Boolean = { case _: WriteMessageSuccess ⇒ false case _: ReplayedMessage ⇒ false case _ ⇒ true } } /** * Sent to a [[Processor]] if a journal fails to write a [[Persistent]] message. If * not handled, an `akka.actor.ActorKilledException` is thrown by that processor. * * @param payload payload of the persistent message. * @param sequenceNr sequence number of the persistent message. * @param cause failure cause. */ @SerialVersionUID(1L) case class PersistenceFailure(payload: Any, sequenceNr: Long, cause: Throwable) /** * Sent to a [[Processor]] if a journal fails to replay messages or fetch that processor's * highest sequence number. If not handled, the prossor will be stopped. */ @SerialVersionUID(1L) case class RecoveryFailure(cause: Throwable) abstract class RecoveryCompleted /** * Sent to a [[Processor]] when the journal replay has been finished. */ @SerialVersionUID(1L) case object RecoveryCompleted extends RecoveryCompleted { /** * Java API: get the singleton instance */ def getInstance = this } /** * Java API: an actor that persists (journals) messages of type [[Persistent]]. Messages of other types * are not persisted. * * {{{ * import akka.persistence.Persistent; * import akka.persistence.Processor; * * class MyProcessor extends UntypedProcessor { * public void onReceive(Object message) throws Exception { * if (message instanceof Persistent) { * // message has been written to journal * Persistent persistent = (Persistent)message; * Object payload = persistent.payload(); * Long sequenceNr = persistent.sequenceNr(); * // ... * } else { * // message has not been written to journal * } * } * } * * // ... * * ActorRef processor = getContext().actorOf(Props.create(MyProcessor.class), "myProcessor"); * * processor.tell(Persistent.create("foo"), null); * processor.tell("bar", null); * }}} * * During start and restart, persistent messages are replayed to a processor so that it can recover internal * state from these messages. New messages sent to a processor during recovery do not interfere with replayed * messages, hence applications don't need to wait for a processor to complete its recovery. * * Automated recovery can be turned off or customized by overriding the [[preStart]] and [[preRestart]] life * cycle hooks. If automated recovery is turned off, an application can explicitly recover a processor by * sending it a [[Recover]] message. * * [[Persistent]] messages are assigned sequence numbers that are generated on a per-processor basis. A sequence * starts at `1L` and doesn't contain gaps unless a processor (logically) deletes a message. * * During recovery, a processor internally buffers new messages until recovery completes, so that new messages * do not interfere with replayed messages. This internal buffer (the ''processor stash'') is isolated from the * ''user stash'' inherited by `akka.actor.Stash`. `Processor` implementation classes can therefore use the * ''user stash'' for stashing/unstashing both persistent and transient messages. * * Processors can also store snapshots of internal state by calling [[saveSnapshot]]. During recovery, a saved * snapshot is offered to the processor with a [[SnapshotOffer]] message, followed by replayed messages, if any, * that are younger than the snapshot. Default is to offer the latest saved snapshot. * * @see [[Processor]] * @see [[Recover]] * @see [[PersistentBatch]] */ @deprecated("UntypedProcessor will be removed. Instead extend `akka.persistence.UntypedPersistentActor` and use it's `persistAsync(command)(callback)` method to get equivalent semantics.", since = "2.3.4") abstract class UntypedProcessor extends UntypedActor with Processor /** * Java API: compatible with lambda expressions * * An actor that persists (journals) messages of type [[Persistent]]. Messages of other types * are not persisted. * <p/> * Example: * <pre> * class MyProcessor extends AbstractProcessor { * public MyProcessor() { * receive(ReceiveBuilder. * match(Persistent.class, p -> { * Object payload = p.payload(); * Long sequenceNr = p.sequenceNr(); * // ... * }).build() * ); * } * } * * // ... * * ActorRef processor = context().actorOf(Props.create(MyProcessor.class), "myProcessor"); * * processor.tell(Persistent.create("foo"), null); * processor.tell("bar", null); * </pre> * * During start and restart, persistent messages are replayed to a processor so that it can recover internal * state from these messages. New messages sent to a processor during recovery do not interfere with replayed * messages, hence applications don't need to wait for a processor to complete its recovery. * * Automated recovery can be turned off or customized by overriding the [[preStart]] and [[preRestart]] life * cycle hooks. If automated recovery is turned off, an application can explicitly recover a processor by * sending it a [[Recover]] message. * * [[Persistent]] messages are assigned sequence numbers that are generated on a per-processor basis. A sequence * starts at `1L` and doesn't contain gaps unless a processor (logically) deletes a message. * * During recovery, a processor internally buffers new messages until recovery completes, so that new messages * do not interfere with replayed messages. This internal buffer (the ''processor stash'') is isolated from the * ''user stash'' inherited by `akka.actor.Stash`. `Processor` implementation classes can therefore use the * ''user stash'' for stashing/unstashing both persistent and transient messages. * * Processors can also store snapshots of internal state by calling [[saveSnapshot]]. During recovery, a saved * snapshot is offered to the processor with a [[SnapshotOffer]] message, followed by replayed messages, if any, * that are younger than the snapshot. Default is to offer the latest saved snapshot. * * @see [[Processor]] * @see [[Recover]] * @see [[PersistentBatch]] */ @deprecated("AbstractProcessor will be removed. Instead extend `akka.persistence.AbstractPersistentActor` and use it's `persistAsync(command)(callback)` method to get equivalent semantics.", since = "2.3.4") abstract class AbstractProcessor extends AbstractActor with Processor Other Akka source code examplesHere is a short list of links related to this Akka Processor.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.