|
Akka/Scala example source code file (Persistent.scala)
The Persistent.scala Akka example source code
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence
import java.lang.{ Iterable ⇒ JIterable }
import java.util.{ List ⇒ JList }
import scala.collection.immutable
import akka.actor.{ ActorContext, ActorRef }
import akka.japi.Util.immutableSeq
import akka.pattern.PromiseActorRef
import akka.persistence.serialization.Message
/**
 * INTERNAL API
 *
 * Marker for messages that the [[akka.persistence.journal.AsyncWriteJournal]] is able to resequence.
 *
 * An instance is either a [[NonPersistentRepr]] or a [[Persistent]].
 */
private[persistence] sealed trait Resequenceable {
  /** Payload carried by this message. */
  def payload: Any

  /** Sender of this message. */
  def sender: ActorRef
}
/**
 * INTERNAL API
 *
 * A message that the journal can resequence but that will not be persisted.
 *
 * @param payload payload carried by this message
 * @param sender  sender of this message
 */
private[persistence] final case class NonPersistentRepr(
  payload: Any,
  sender: ActorRef) extends Resequenceable
/**
 * Persistent message.
 */
@deprecated("Use akka.persistence.PersistentActor instead.", since = "2.3.4")
sealed abstract class Persistent extends Resequenceable {
  /**
   * Payload carried by this persistent message.
   */
  //#payload
  def payload: Any
  //#payload

  /**
   * Sequence number of this persistent message.
   */
  //#sequence-nr
  def sequenceNr: Long
  //#sequence-nr

  /**
   * Returns a new persistent message that carries the given `payload`.
   *
   * @param payload payload of the new persistent message.
   */
  def withPayload(payload: Any): Persistent
}
@deprecated("Use akka.persistence.PersistentActor instead", since = "2.3.4")
object Persistent {
  /**
   * Java API: creates a new persistent message. Must only be used outside processors.
   *
   * No current persistent message is taken into account, which is the same as
   * calling the two-argument `create` with a `null` current message.
   *
   * @param payload payload of new persistent message.
   */
  def create(payload: Any): Persistent =
    apply(payload)(None)

  /**
   * Java API: creates a new persistent message, derived from the specified current message. The current
   * message can be obtained inside a [[Processor]] by calling `getCurrentPersistentMessage()`.
   *
   * A `null` current message is treated as absent.
   *
   * @param payload payload of new persistent message.
   * @param currentPersistentMessage current persistent message.
   */
  def create(payload: Any, currentPersistentMessage: Persistent): Persistent = {
    val current = Option(currentPersistentMessage)
    apply(payload)(current)
  }

  /**
   * Creates a new persistent message, derived from an implicit current message.
   * When used inside a [[Processor]], this is the optional current [[Persistent]]
   * message of that processor.
   *
   * If a current message is present, the result is that message with its payload
   * replaced; otherwise a fresh [[PersistentRepr]] is created for the payload.
   *
   * @param payload payload of the new persistent message.
   * @param currentPersistentMessage optional current persistent message, defaults to `None`.
   */
  @deprecated("Use akka.persistence.PersistentActor instead", since = "2.3.4")
  def apply(payload: Any)(implicit currentPersistentMessage: Option[Persistent] = None): Persistent =
    currentPersistentMessage.fold[Persistent](PersistentRepr(payload))(_.withPayload(payload))

  /**
   * [[Persistent]] extractor: yields the payload and the sequence number.
   */
  @deprecated("Use akka.persistence.PersistentActor instead", since = "2.3.4")
  def unapply(persistent: Persistent): Option[(Any, Long)] = {
    val extracted = (persistent.payload, persistent.sequenceNr)
    Some(extracted)
  }
}
/**
 * Persistent message that has been delivered by a [[Channel]] or [[PersistentChannel]]. Destinations
 * of such a channel that receive a message of this type can acknowledge it by calling [[confirm]].
 */
@deprecated("Use akka.persistence.PersistentActor instead", since = "2.3.4")
sealed abstract class ConfirmablePersistent extends Persistent {
  /**
   * Confirms the receipt of this persistent message. Intended to be called by the destinations
   * of a [[Channel]] or [[PersistentChannel]].
   */
  def confirm(): Unit

  /**
   * Number of redeliveries. Only greater than zero if this message has been redelivered by a
   * [[Channel]] or [[PersistentChannel]].
   */
  def redeliveries: Int
}
@deprecated("Use akka.persistence.PersistentActor instead", since = "2.3.4")
object ConfirmablePersistent {
  /**
   * [[ConfirmablePersistent]] extractor: yields the payload, the sequence number and the
   * number of redeliveries.
   */
  @deprecated("Use akka.persistence.PersistentActor instead", since = "2.3.4")
  def unapply(persistent: ConfirmablePersistent): Option[(Any, Long, Int)] = {
    val extracted = (persistent.payload, persistent.sequenceNr, persistent.redeliveries)
    Some(extracted)
  }
}
/**
 * Instructs a [[Processor]] to atomically write the contained [[Persistent]] messages to the
 * journal. The written messages are delivered to the processor one by one as [[Persistent]]
 * messages, and they are likewise replayed one by one during recovery.
 *
 * @param batch the messages that make up this batch
 */
@deprecated("Use akka.persistence.PersistentActor instead", since = "2.3.4")
final case class PersistentBatch(
  batch: immutable.Seq[Resequenceable]) extends Message
@deprecated("Use akka.persistence.PersistentActor instead", since = "2.3.4")
object PersistentBatch {
  /**
   * Java API: creates a [[PersistentBatch]] from the given persistent messages.
   *
   * The iterable is converted to an immutable sequence with `akka.japi.Util.immutableSeq`
   * before it is wrapped in the batch.
   *
   * @param persistentBatch the persistent messages to put into the batch
   * @return a batch containing the given messages
   */
  // The result type is spelled out so the Java-facing signature does not depend on inference.
  def create(persistentBatch: JIterable[Persistent]): PersistentBatch =
    PersistentBatch(immutableSeq(persistentBatch))
}
/**
 * Plugin API: confirmation entry written by journal plugins.
 */
@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
trait PersistentConfirmation {
  /** Deprecated alias that returns [[persistenceId]]. */
  @deprecated("Use `persistenceId` instead. Processor will be removed.", since = "2.3.4")
  final def processorId: String = persistenceId

  /** Persistence id of this confirmation entry. */
  def persistenceId: String

  /** Channel id of this confirmation entry. */
  def channelId: String

  /** Sequence number of this confirmation entry. */
  def sequenceNr: Long
}
/**
 * Plugin API: persistent message identifier.
 */
@deprecated("deleteMessages will be removed.", since = "2.3.4")
trait PersistentId {
  /**
   * Persistent id that journals a persistent message.
   */
  def processorId: String

  /**
   * Persistent id that journals a persistent message. Unless overridden, this is
   * the value of [[processorId]].
   */
  def persistenceId: String = processorId

  /**
   * Sequence number of a persistent message.
   */
  def sequenceNr: Long
}
/**
 * INTERNAL API.
 *
 * Plain [[PersistentId]] implementation holding the two identifying values.
 *
 * @param processorId persistent id that journals a persistent message
 * @param sequenceNr  sequence number of the persistent message
 */
@deprecated("deleteMessages will be removed.", since = "2.3.4")
private[persistence] final case class PersistentIdImpl(
  processorId: String,
  sequenceNr: Long) extends PersistentId
/**
 * Plugin API: representation of a persistent message in the journal plugin API.
 *
 * @see [[journal.SyncWriteJournal]]
 * @see [[journal.AsyncWriteJournal]]
 * @see [[journal.AsyncRecovery]]
 */
trait PersistentRepr extends Persistent with Resequenceable with PersistentId with Message {
  // todo we want to get rid of the Persistent() wrapper from user land; PersistentRepr is here to stay. #15230
  import scala.collection.JavaConverters._

  /**
   * Payload carried by this persistent message.
   */
  def payload: Any

  /**
   * Whether this message is marked as deleted.
   */
  def deleted: Boolean

  /**
   * Number of redeliveries. Only greater than zero if this message has been redelivered by a
   * [[Channel]] or [[PersistentChannel]].
   */
  @deprecated("Channel will be removed.", since = "2.3.4")
  def redeliveries: Int

  /**
   * Channel ids of the delivery confirmations available for this message. Only non-empty
   * for replayed messages.
   */
  @deprecated("Channel will be removed.", since = "2.3.4")
  def confirms: immutable.Seq[String]

  /**
   * Java API, Plugin API: the same channel ids as [[confirms]], exposed as a Java list.
   * Only non-empty for replayed messages.
   */
  @deprecated("Channel will be removed.", since = "2.3.4")
  def getConfirms: JList[String] = confirms.asJava

  /**
   * `true` only if this message has been delivered by a channel.
   */
  @deprecated("Channel will be removed.", since = "2.3.4")
  def confirmable: Boolean

  /**
   * Delivery confirmation message.
   */
  @deprecated("Channel will be removed.", since = "2.3.4")
  def confirmMessage: Delivered

  /**
   * Delivery confirmation target, i.e. the actor the confirmation message is addressed to.
   */
  @deprecated("Channel will be removed.", since = "2.3.4")
  def confirmTarget: ActorRef

  /**
   * Sender of this message.
   */
  def sender: ActorRef

  /**
   * INTERNAL API.
   *
   * Returns the representation to be written, using the given `sender`.
   */
  private[persistence] def prepareWrite(sender: ActorRef): PersistentRepr

  /**
   * INTERNAL API.
   *
   * Prepares this message for writing with its own sender, except that a sender which is a
   * `PromiseActorRef` is replaced by the actor system's dead letters reference.
   */
  private[persistence] def prepareWrite()(implicit context: ActorContext): PersistentRepr = {
    val writeSender = sender match {
      case _: PromiseActorRef ⇒ context.system.deadLetters
      case other              ⇒ other
    }
    prepareWrite(writeSender)
  }

  /**
   * Creates a new copy of this [[PersistentRepr]]. Every argument that is left out keeps the
   * value this message currently has.
   */
  def update(
    sequenceNr: Long = sequenceNr,
    @deprecatedName('processorId) persistenceId: String = persistenceId,
    deleted: Boolean = deleted,
    @deprecated("Channel will be removed.", since = "2.3.4") redeliveries: Int = redeliveries,
    @deprecated("Channel will be removed.", since = "2.3.4") confirms: immutable.Seq[String] = confirms,
    @deprecated("Channel will be removed.", since = "2.3.4") confirmMessage: Delivered = confirmMessage,
    @deprecated("Channel will be removed.", since = "2.3.4") confirmTarget: ActorRef = confirmTarget,
    sender: ActorRef = sender): PersistentRepr
}
object PersistentRepr {
  /**
   * Plugin API: value of an undefined processor or channel id.
   */
  val Undefined = ""

  /**
   * Plugin API: creates a persistent message representation.
   *
   * When `confirmable` is `true` the confirmable implementation is created from all arguments.
   * Otherwise the plain implementation is created, which does not take `redeliveries`,
   * `confirmMessage` or `confirmTarget`.
   */
  def apply(
    payload: Any,
    sequenceNr: Long = 0L,
    @deprecatedName('processorId) persistenceId: String = PersistentRepr.Undefined,
    deleted: Boolean = false,
    @deprecated("Channel will be removed.", since = "2.3.4") redeliveries: Int = 0,
    @deprecated("Channel will be removed.", since = "2.3.4") confirms: immutable.Seq[String] = Nil,
    @deprecated("Channel will be removed.", since = "2.3.4") confirmable: Boolean = false,
    @deprecated("Channel will be removed.", since = "2.3.4") confirmMessage: Delivered = null,
    @deprecated("Channel will be removed.", since = "2.3.4") confirmTarget: ActorRef = null,
    sender: ActorRef = null) = {
    if (confirmable) {
      ConfirmablePersistentImpl(
        payload,
        sequenceNr,
        persistenceId,
        deleted,
        redeliveries,
        confirms,
        confirmMessage,
        confirmTarget,
        sender)
    } else {
      PersistentImpl(
        payload,
        sequenceNr,
        persistenceId,
        deleted,
        confirms,
        sender)
    }
  }

  /**
   * Java API, Plugin API: [[apply]] exposed as a function value.
   */
  def create = apply _
}
/**
 * INTERNAL API.
 *
 * Non-confirmable [[PersistentRepr]] implementation. Its channel-related members
 * (`redeliveries`, `confirmable`, `confirmMessage`, `confirmTarget`) are fixed values.
 */
private[persistence] final case class PersistentImpl(
  payload: Any,
  sequenceNr: Long,
  @deprecatedName('processorId) override val persistenceId: String,
  deleted: Boolean,
  confirms: immutable.Seq[String],
  sender: ActorRef) extends Persistent with PersistentRepr {

  /** Copy of this message with only the payload replaced. */
  def withPayload(payload: Any): Persistent =
    this.copy(payload = payload)

  /** Copy of this message with only the sender replaced. */
  def prepareWrite(sender: ActorRef): PersistentImpl =
    this.copy(sender = sender)

  /**
   * Copy of this message with the given values. `redeliveries`, `confirmMessage` and
   * `confirmTarget` are part of the signature but are not used by this implementation.
   */
  def update(
    sequenceNr: Long,
    @deprecatedName('processorId) persistenceId: String,
    deleted: Boolean,
    redeliveries: Int,
    confirms: immutable.Seq[String],
    confirmMessage: Delivered,
    confirmTarget: ActorRef,
    sender: ActorRef): PersistentImpl =
    this.copy(
      sequenceNr = sequenceNr,
      persistenceId = persistenceId,
      deleted = deleted,
      confirms = confirms,
      sender = sender)

  // Fixed channel-related values of this non-confirmable implementation.
  val redeliveries: Int = 0
  val confirmable: Boolean = false
  val confirmMessage: Delivered = null
  val confirmTarget: ActorRef = null

  /** Deprecated alias that returns `persistenceId`. */
  @deprecated("Use persistenceId.", since = "2.3.4")
  override def processorId: String = persistenceId
}
/**
 * INTERNAL API.
 *
 * Confirmable [[PersistentRepr]] implementation: it carries the confirmation message and
 * the target that message is sent to when [[confirm]] is called.
 */
@deprecated("ConfirmablePersistent will be removed, see `AtLeastOnceDelivery` instead.", since = "2.3.4")
private[persistence] final case class ConfirmablePersistentImpl(
  payload: Any,
  sequenceNr: Long,
  @deprecatedName('processorId) override val persistenceId: String,
  deleted: Boolean,
  redeliveries: Int,
  confirms: immutable.Seq[String],
  confirmMessage: Delivered,
  confirmTarget: ActorRef,
  sender: ActorRef) extends ConfirmablePersistent with PersistentRepr {

  /** Copy of this message with only the payload replaced. */
  def withPayload(payload: Any): ConfirmablePersistent =
    this.copy(payload = payload)

  /** Sends `confirmMessage` to `confirmTarget`; does nothing when the target is `null`. */
  def confirm(): Unit =
    Option(confirmTarget).foreach(_ ! confirmMessage)

  /** Always `true` for this implementation. */
  def confirmable: Boolean = true

  /** Copy with the given sender and with `confirmMessage` and `confirmTarget` reset to `null`. */
  def prepareWrite(sender: ActorRef): ConfirmablePersistentImpl =
    this.copy(
      sender = sender,
      confirmMessage = null,
      confirmTarget = null)

  /** Copy of this message with all of the given values applied. */
  def update(
    sequenceNr: Long,
    @deprecatedName('processorId) persistenceId: String,
    deleted: Boolean,
    redeliveries: Int,
    confirms: immutable.Seq[String],
    confirmMessage: Delivered,
    confirmTarget: ActorRef,
    sender: ActorRef): ConfirmablePersistentImpl =
    this.copy(
      sequenceNr = sequenceNr,
      persistenceId = persistenceId,
      deleted = deleted,
      redeliveries = redeliveries,
      confirms = confirms,
      confirmMessage = confirmMessage,
      confirmTarget = confirmTarget,
      sender = sender)

  /** Deprecated alias that returns `persistenceId`. */
  @deprecated("Use persistenceId.", since = "2.3.4")
  override def processorId: String = persistenceId
}
/**
 * INTERNAL API.
 */
@deprecated("ConfirmablePersistent will be removed, see `AtLeastOnceDelivery` instead.", since = "2.3.4")
private[persistence] object ConfirmablePersistentImpl {
  /**
   * Builds a confirmable message from an existing representation: payload, sequence number,
   * persistence id, deleted flag, redeliveries, confirms and sender are taken from
   * `persistent`, while the confirmation message and target come from the arguments.
   *
   * @param persistent     representation to take the message values from
   * @param confirmMessage delivery confirmation message
   * @param confirmTarget  delivery confirmation target, `null` by default
   */
  def apply(persistent: PersistentRepr, confirmMessage: Delivered, confirmTarget: ActorRef = null): ConfirmablePersistentImpl =
    new ConfirmablePersistentImpl(
      persistent.payload,
      persistent.sequenceNr,
      persistent.persistenceId,
      persistent.deleted,
      persistent.redeliveries,
      persistent.confirms,
      confirmMessage,
      confirmTarget,
      persistent.sender)
}
Other Akka source code examples
Here is a short list of links related to this Akka Persistent.scala source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.