|
Akka/Scala example source code file (PersistentChannel.scala)
The PersistentChannel.scala Akka example source code
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence
import scala.concurrent.duration._
import scala.language.postfixOps
import akka.AkkaException
import akka.actor._
import akka.persistence.JournalProtocol._
/**
* A [[PersistentChannel]] configuration object.
*
* @param redeliverMax Maximum number of redelivery attempts.
* @param redeliverInterval Interval between redelivery attempts.
* @param redeliverFailureListener Receiver of [[RedeliverFailure]] notifications which are sent when the number
* of redeliveries reaches `redeliverMax` for a sequence of messages. To enforce
* a redelivery of these messages, the listener has to [[Reset]] the persistent
* channel. Alternatively, it can also confirm these messages, preventing further
* redeliveries.
* @param replyPersistent If `true` the sender will receive the successfully stored [[Persistent]] message that has
* been submitted with a [[Deliver]] request, or a [[PersistenceFailure]] message in case of
* a persistence failure.
* @param pendingConfirmationsMax Message delivery is suspended by a channel if the number of pending reaches the
* specified value and is resumed again if the number of pending confirmations falls
* below `pendingConfirmationsMin`.
* @param pendingConfirmationsMin Message delivery is resumed if the number of pending confirmations falls below
* this limit. It is suspended again if it reaches `pendingConfirmationsMax`.
* Message delivery is enabled for a channel if the number of pending confirmations
* is below this limit, or, is resumed again if it falls below this limit.
* @param idleTimeout Maximum interval between read attempts made by a persistent channel. This settings applies,
* for example, after a journal failed to serve a read request. The next read request is then
* made after the configured timeout.
*/
@SerialVersionUID(1L)
@deprecated("PersistentChannel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
final case class PersistentChannelSettings(
val redeliverMax: Int = 5,
val redeliverInterval: FiniteDuration = 5.seconds,
val redeliverFailureListener: Option[ActorRef] = None,
val replyPersistent: Boolean = false,
val pendingConfirmationsMax: Long = Long.MaxValue,
val pendingConfirmationsMin: Long = Long.MaxValue,
val idleTimeout: FiniteDuration = 1.minute) {
/**
* Java API.
*/
def withRedeliverMax(redeliverMax: Int): PersistentChannelSettings =
copy(redeliverMax = redeliverMax)
/**
* Java API.
*/
def withRedeliverInterval(redeliverInterval: FiniteDuration): PersistentChannelSettings =
copy(redeliverInterval = redeliverInterval)
/**
* Java API.
*/
def withRedeliverFailureListener(redeliverFailureListener: ActorRef): PersistentChannelSettings =
copy(redeliverFailureListener = Option(redeliverFailureListener))
/**
* Java API.
*/
def withReplyPersistent(replayPersistent: Boolean): PersistentChannelSettings =
copy(replyPersistent = replyPersistent)
/**
* Java API.
*/
def withPendingConfirmationsMax(pendingConfirmationsMax: Long): PersistentChannelSettings =
copy(pendingConfirmationsMax = pendingConfirmationsMax)
/**
* Java API.
*/
def withPendingConfirmationsMin(pendingConfirmationsMin: Long): PersistentChannelSettings =
copy(pendingConfirmationsMin = pendingConfirmationsMin)
/**
* Java API.
*/
def withIdleTimeout(idleTimeout: FiniteDuration): PersistentChannelSettings =
copy(idleTimeout = idleTimeout)
/**
* Converts this configuration object to [[ChannelSettings]].
*/
def toChannelSettings: ChannelSettings =
ChannelSettings(redeliverMax, redeliverInterval, redeliverFailureListener)
}
@deprecated("PersistentChannel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
object PersistentChannelSettings {
/**
* Java API.
*/
def create() = PersistentChannelSettings.apply()
}
/**
* Resets a [[PersistentChannel]], forcing it to redeliver all unconfirmed persistent
* messages. This does not affect writing [[Deliver]] requests.
*/
@deprecated("PersistentChannel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
case object Reset {
/**
* Java API.
*/
def getInstance() = this
}
/**
* Exception thrown by a [[PersistentChannel]] child actor to re-initiate delivery.
*/
@deprecated("PersistentChannel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
class ResetException extends AkkaException("Channel reset on application request")
/**
* A [[PersistentChannel]] implements the same functionality as a [[Channel]] but additionally persists
* [[Deliver]] requests before they are served. Persistent channels are useful in combination with slow
* destinations or destinations that are unavailable for a long time. `Deliver` requests that have been
* persisted by a persistent channel are deleted when destinations confirm the receipt of the corresponding
* messages.
*
* The number of pending confirmations can be limited by a persistent channel based on the parameters of
* [[PersistentChannelSettings]]. It can suspend delivery when the number of pending confirmations reaches
* `pendingConfirmationsMax` and resume delivery again when this number falls below `pendingConfirmationsMin`.
* This prevents both flooding destinations with more messages than they can process and unlimited memory
* consumption by the channel. A persistent channel continues to persist [[Deliver]] request even when
* message delivery is temporarily suspended.
*
* A persistent channel can also reply to [[Deliver]] senders if the request has been successfully persisted
* or not (see `replyPersistent` parameter in [[PersistentChannelSettings]]). In case of success, the channel
* replies with the contained [[Persistent]] message, otherwise with a [[PersistenceFailure]] message.
*/
@deprecated("PersistentChannel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
final class PersistentChannel private[akka] (_channelId: Option[String], channelSettings: PersistentChannelSettings) extends Actor {
private val id = _channelId match {
case Some(cid) ⇒ cid
case None ⇒ Persistence(context.system).channelId(self)
}
private val requestReader = context.actorOf(Props(classOf[RequestReader], id, channelSettings))
private val requestWriter = context.actorOf(Props(classOf[RequestWriter], id, channelSettings, requestReader))
def receive = {
case d @ Deliver(persistent: PersistentRepr, destination) ⇒
// Persist the Deliver request by sending reliableStorage a Persistent message
// with the Deliver request as payload. This persistent message is referred to
// as the wrapper message, whereas the persistent message contained in the Deliver
// request is referred to as wrapped message.
if (!persistent.confirms.contains(id)) requestWriter forward Persistent(d)
case Reset ⇒ requestReader ! Reset
}
}
@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
object PersistentChannel {
/**
* Returns a channel actor configuration object for creating a [[PersistentChannel]] with a
* generated id and default [[PersistentChannelSettings]].
*/
def props(): Props = props(PersistentChannelSettings())
/**
* Returns a channel actor configuration object for creating a [[PersistentChannel]] with a
* generated id and specified `channelSettings`.
*
* @param channelSettings channel configuration object.
*/
def props(channelSettings: PersistentChannelSettings): Props =
Props(classOf[PersistentChannel], None, channelSettings)
/**
* Returns a channel actor configuration object for creating a [[PersistentChannel]] with the
* specified id and default [[PersistentChannelSettings]].
*
* @param channelId channel id.
*/
def props(channelId: String): Props =
props(channelId, PersistentChannelSettings())
/**
* Returns a channel actor configuration object for creating a [[PersistentChannel]] with the
* specified id and specified `channelSettings`.
*
* @param channelId channel id.
* @param channelSettings channel configuration object.
*/
def props(channelId: String, channelSettings: PersistentChannelSettings): Props =
Props(classOf[PersistentChannel], Some(channelId), channelSettings)
}
/**
* Plugin API.
*/
@deprecated("PersistentChannel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
final case class DeliveredByPersistentChannel(
channelId: String,
persistentSequenceNr: Long,
deliverySequenceNr: Long = 0L,
channel: ActorRef = null) extends Delivered with PersistentId {
override def persistenceId: String = channelId
@deprecated("Use persistenceId.", since = "2.3.4")
override def processorId = persistenceId
def sequenceNr: Long = persistentSequenceNr
def update(deliverySequenceNr: Long, channel: ActorRef): DeliveredByPersistentChannel =
copy(deliverySequenceNr = deliverySequenceNr, channel = channel)
}
/**
* INTERNAL API.
*/
@deprecated("PersistentChannel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
private[persistence] class DeliveredByPersistentChannelBatching(journal: ActorRef, settings: PersistenceSettings) extends Actor {
private val publish = settings.internal.publishConfirmations
private val batchMax = settings.journal.maxConfirmationBatchSize
private var batching = false
private var batch = Vector.empty[DeliveredByPersistentChannel]
def receive = {
case DeleteMessagesSuccess(messageIds) ⇒
if (batch.isEmpty) batching = false else journalBatch()
messageIds.foreach {
case c: DeliveredByPersistentChannel ⇒
c.channel ! c
if (publish) context.system.eventStream.publish(c)
}
case DeleteMessagesFailure(_) ⇒
if (batch.isEmpty) batching = false else journalBatch()
case d: DeliveredByPersistentChannel ⇒
addToBatch(d)
if (!batching || maxBatchSizeReached) journalBatch()
case m ⇒ journal forward m
}
def addToBatch(pc: DeliveredByPersistentChannel): Unit =
batch = batch :+ pc
def maxBatchSizeReached: Boolean =
batch.length >= batchMax
def journalBatch(): Unit = {
journal ! DeleteMessages(batch, true, Some(self))
batch = Vector.empty
batching = true
}
}
/**
* Writes [[Deliver]] requests to the journal.
*/
@deprecated("PersistentChannel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
private class RequestWriter(channelId: String, channelSettings: PersistentChannelSettings, reader: ActorRef) extends Processor {
import RequestWriter._
import channelSettings._
private val cbJournal = extension.confirmationBatchingJournalForChannel(channelId)
override val persistenceId = channelId
def receive = {
case p @ Persistent(Deliver(wrapped: PersistentRepr, _), _) ⇒
if (!recoveryRunning && wrapped.persistenceId != PersistentRepr.Undefined) {
// Write a delivery confirmation to the journal so that replayed Deliver
// requests from a sending processor are not persisted again. Replaying
// Deliver requests is now the responsibility of this processor
// and confirmation by destination is done to the wrapper p.sequenceNr.
cbJournal ! DeliveredByChannel(wrapped.persistenceId, channelId, wrapped.sequenceNr)
}
if (!recoveryRunning && replyPersistent)
sender() ! wrapped
case p: PersistenceFailure ⇒
if (replyPersistent) sender() ! p
}
override protected[akka] def aroundReceive(receive: Receive, message: Any): Unit = {
super.aroundReceive(receive, message)
message match {
case WriteMessagesSuccessful | WriteMessagesFailed(_) ⇒
// activate reader after to reduce delivery latency
reader ! RequestsWritten
case _ ⇒
}
}
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
self ! Recover(replayMax = 0L)
}
override def preStart(): Unit = {
self ! Recover(replayMax = 0L)
}
}
@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
private object RequestWriter {
case object RequestsWritten
}
/**
* Reads [[Deliver]] requests from the journal and processes them. The number of `Deliver` requests
* processed per iteration depends on
*
* - `pendingConfirmationsMax` parameter in [[PersistentChannelSettings]]
* - `pendingConfirmationsMin` parameter in [[PersistentChannelSettings]] and the
* - current number of pending confirmations.
*
* @see [[PersistentChannel]]
*/
@deprecated("PersistentChannel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
private class RequestReader(channelId: String, channelSettings: PersistentChannelSettings) extends Actor with Recovery {
import RequestWriter._
import channelSettings._
private val delivery = context.actorOf(Props(classOf[ReliableDelivery], channelSettings.toChannelSettings))
private val idle: State = new State {
override def toString: String = "idle"
def aroundReceive(receive: Receive, message: Any): Unit = message match {
case r: Recover ⇒ // ignore
case other ⇒ process(receive, other)
}
}
def receive = {
case p @ Persistent(d @ Deliver(wrapped: PersistentRepr, destination), snr) ⇒
val wrapper = p.asInstanceOf[PersistentRepr]
val prepared = prepareDelivery(wrapped, wrapper)
numReplayed += 1
numPending += 1
delivery forward d.copy(prepared)
case d: Delivered ⇒
delivery forward d
numPending = math.max(numPending - 1L, 0L)
if (numPending == pendingConfirmationsMin) onReadRequest()
case d @ RedeliverFailure(ms) ⇒
val numPendingPrev = numPending
numPending = math.max(numPending - ms.length, 0L)
if (numPendingPrev > pendingConfirmationsMin && numPending <= pendingConfirmationsMin) onReadRequest()
redeliverFailureListener.foreach(_.tell(d, context.parent))
case RequestsWritten | ReceiveTimeout ⇒
if (numPending <= pendingConfirmationsMin) onReadRequest()
case Reset ⇒ throw new ResetException
}
def onReplaySuccess(receive: Receive, await: Boolean): Unit = {
onReplayComplete()
if (numReplayed > 0 && numPending <= pendingConfirmationsMin) onReadRequest()
numReplayed = 0L
}
def onReplayFailure(receive: Receive, await: Boolean, cause: Throwable): Unit = {
onReplayComplete()
}
override def persistenceId: String = channelId
def snapshotterId: String =
s"${channelId}-reader"
private val dbJournal = extension.deletionBatchingJournalForChannel(channelId)
/**
* Number of delivery requests replayed (read) per iteration.
*/
private var numReplayed = 0L
/**
* Number of pending confirmations.
*/
private var numPending = 0L
context.setReceiveTimeout(channelSettings.idleTimeout)
private def onReplayComplete(): Unit = {
_currentState = idle
receiverStash.unstashAll()
}
private def onReadRequest(): Unit = if (_currentState == idle) {
_currentState = replayStarted(await = false)
dbJournal ! ReplayMessages(lastSequenceNr + 1L, Long.MaxValue, pendingConfirmationsMax - numPending, persistenceId, self)
}
/**
* @param wrapped persistent message contained in a deliver request
* @param wrapper persistent message that contains a deliver request
*/
private def prepareDelivery(wrapped: PersistentRepr, wrapper: PersistentRepr): PersistentRepr = {
ConfirmablePersistentImpl(wrapped,
confirmTarget = dbJournal,
confirmMessage = DeliveredByPersistentChannel(channelId, wrapper.sequenceNr, channel = self))
}
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
try receiverStash.unstashAll() finally super.preRestart(reason, message)
}
override def preStart(): Unit = {
super.preStart()
self ! Recover(replayMax = 0L)
self ! RequestsWritten // considers savepoint loaded from snapshot (TODO)
}
}
Other Akka source code examplesHere is a short list of links related to this Akka PersistentChannel.scala source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.