|
Akka/Scala example source code file (Channel.scala)
The Channel.scala Akka example source code
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence
import java.lang.{ Iterable ⇒ JIterable }
import scala.collection.immutable
import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.language.postfixOps
import akka.actor._
import akka.persistence.serialization.Message
import akka.persistence.JournalProtocol._
// TODO: remove Channel
/**
* A [[Channel]] configuration object.
*
* @param redeliverMax Maximum number of redelivery attempts.
* @param redeliverInterval Interval between redelivery attempts.
* @param redeliverFailureListener Receiver of [[RedeliverFailure]] notifications which are sent when the number
* of redeliveries reaches `redeliverMax` for a sequence of messages. To enforce
* a redelivery of these messages, the listener has to restart the sending processor.
* Alternatively, it can also confirm these messages, preventing further redeliveries.
*/
@SerialVersionUID(1L)
@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
final case class ChannelSettings(
val redeliverMax: Int = 5,
val redeliverInterval: FiniteDuration = 5.seconds,
val redeliverFailureListener: Option[ActorRef] = None) {
/**
* Java API.
*/
def withRedeliverMax(redeliverMax: Int): ChannelSettings =
copy(redeliverMax = redeliverMax)
/**
* Java API.
*/
def withRedeliverInterval(redeliverInterval: FiniteDuration): ChannelSettings =
copy(redeliverInterval = redeliverInterval)
/**
* Java API.
*/
def withRedeliverFailureListener(redeliverFailureListener: ActorRef): ChannelSettings =
copy(redeliverFailureListener = Option(redeliverFailureListener))
}
@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
object ChannelSettings {
/**
* Java API.
*/
def create() = ChannelSettings.apply()
}
/**
* A channel is used by [[Processor]]s (and [[View]]s) for sending [[Persistent]] messages to destinations.
* The main responsibility of a channel is to prevent redundant delivery of replayed messages to destinations
* when a processor is recovered.
*
* A channel is instructed to deliver a persistent message to a destination with the [[Deliver]] command. A
* destination is provided as `ActorPath` and messages are sent via that path's `ActorSelection`.
*
* {{{
* class ForwardExample extends Processor {
* val destination = context.actorOf(Props[MyDestination])
* val channel = context.actorOf(Channel.props(), "myChannel")
*
* def receive = {
* case m @ Persistent(payload, _) =>
* // forward modified message to destination
* channel forward Deliver(m.withPayload(s"fw: ${payload}"), destination.path)
* }
* }
* }}}
*
* To reply to the sender of a persistent message, the `sender` reference should be used as channel
* destination.
*
* {{{
* class ReplyExample extends Processor {
* val channel = context.actorOf(Channel.props(), "myChannel")
*
* def receive = {
* case m @ Persistent(payload, _) =>
* // reply modified message to sender
* channel ! Deliver(m.withPayload(s"re: ${payload}"), sender.path)
* }
* }
* }}}
*
* Redundant delivery of messages to destinations is only prevented if the receipt of these messages
* is explicitly confirmed. Therefore, persistent messages that are delivered via a channel are of type
* [[ConfirmablePersistent]]. Their receipt can be confirmed by a destination by calling the `confirm()`
* method on these messages.
*
* {{{
* class MyDestination extends Actor {
* def receive = {
* case cp @ ConfirmablePersistent(payload, sequenceNr, redeliveries) => cp.confirm()
* }
* }
* }}}
*
* If a destination does not confirm the receipt of a `ConfirmablePersistent` message, it will be redelivered
* by the channel according to the parameters in [[ChannelSettings]]. Redelivered messages have a `redeliveries`
* value greater than zero.
*
* If the maximum number of redeliveries is reached for certain messages, they are removed from the channel and
* a `redeliverFailureListener` (if specified, see [[ChannelSettings]]) is notified about these messages with a
* [[RedeliverFailure]] message. Besides other application-specific tasks, this listener can restart the sending
* processor to enforce a redelivery of these messages or confirm these messages to prevent further redeliveries.
*
* @see [[Deliver]]
*/
@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
final class Channel private[akka] (_channelId: Option[String], channelSettings: ChannelSettings) extends Actor {
import channelSettings._
private val id = _channelId match {
case Some(cid) ⇒ cid
case None ⇒ Persistence(context.system).channelId(self)
}
private val journal = Persistence(context.system).confirmationBatchingJournalForChannel(id)
private val delivery = context.actorOf(Props(classOf[ReliableDelivery], channelSettings))
def receive = {
case d @ Deliver(persistent: PersistentRepr, _) ⇒
if (!persistent.confirms.contains(id)) delivery forward d.copy(prepareDelivery(persistent))
case d: RedeliverFailure ⇒ redeliverFailureListener.foreach(_ ! d)
case d: Delivered ⇒ delivery forward d
}
private def prepareDelivery(persistent: PersistentRepr): PersistentRepr =
ConfirmablePersistentImpl(persistent,
confirmTarget = journal,
confirmMessage = DeliveredByChannel(persistent.persistenceId, id, persistent.sequenceNr, channel = self))
}
@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
object Channel {
/**
* Returns a channel actor configuration object for creating a [[Channel]] with a
* generated id and default [[ChannelSettings]].
*/
def props(): Props =
props(ChannelSettings())
/**
* Returns a channel actor configuration object for creating a [[Channel]] with a
* generated id and specified `channelSettings`.
*
* @param channelSettings channel configuration object.
*/
def props(channelSettings: ChannelSettings): Props =
Props(classOf[Channel], None, channelSettings)
/**
* Returns a channel actor configuration object for creating a [[Channel]] with the
* specified id and default [[ChannelSettings]].
*
* @param channelId channel id.
*/
def props(channelId: String): Props =
props(channelId, ChannelSettings())
/**
* Returns a channel actor configuration object for creating a [[Channel]] with the
* specified id and specified `channelSettings`.
*
* @param channelId channel id.
* @param channelSettings channel configuration object.
*/
def props(channelId: String, channelSettings: ChannelSettings): Props =
Props(classOf[Channel], Some(channelId), channelSettings)
}
/**
* Instructs a [[Channel]] or [[PersistentChannel]] to deliver a `persistent` message to
* a `destination`.
*
* @param persistent persistent message.
* @param destination persistent message destination.
*/
@SerialVersionUID(1L)
@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
final case class Deliver(persistent: Persistent, destination: ActorPath) extends Message
@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
object Deliver {
/**
* Java API.
*/
def create(persistent: Persistent, destination: ActorPath) = Deliver(persistent, destination)
}
/**
* Plugin API: confirmation message generated by receivers of [[ConfirmablePersistent]] messages
* by calling `ConfirmablePersistent.confirm()`.
*/
@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
trait Delivered extends Message {
def channelId: String
def persistentSequenceNr: Long
def deliverySequenceNr: Long
def channel: ActorRef
/**
* INTERNAL API.
*/
private[persistence] def update(deliverySequenceNr: Long = deliverySequenceNr, channel: ActorRef = channel): Delivered
}
/**
* Plugin API.
*/
@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
final case class DeliveredByChannel(
@deprecatedName('processorId) persistenceId: String,
channelId: String,
persistentSequenceNr: Long,
deliverySequenceNr: Long = 0L,
channel: ActorRef = null) extends Delivered with PersistentConfirmation {
def sequenceNr: Long = persistentSequenceNr
def update(deliverySequenceNr: Long, channel: ActorRef): DeliveredByChannel =
copy(deliverySequenceNr = deliverySequenceNr, channel = channel)
}
/**
* INTERNAL API.
*/
@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
private[persistence] class DeliveredByChannelBatching(journal: ActorRef, settings: PersistenceSettings) extends Actor {
private val publish = settings.internal.publishConfirmations
private val batchMax = settings.journal.maxConfirmationBatchSize
private var batching = false
private var batch = Vector.empty[DeliveredByChannel]
def receive = {
case WriteConfirmationsSuccess(confirmations) ⇒
if (batch.isEmpty) batching = false else journalBatch()
confirmations.foreach { c ⇒
val dbc = c.asInstanceOf[DeliveredByChannel]
if (dbc.channel != null) dbc.channel ! c
if (publish) context.system.eventStream.publish(c)
}
case WriteConfirmationsFailure(_) ⇒
if (batch.isEmpty) batching = false else journalBatch()
case d: DeliveredByChannel ⇒
addToBatch(d)
if (!batching || maxBatchSizeReached) journalBatch()
case m ⇒ journal forward m
}
def addToBatch(pc: DeliveredByChannel): Unit =
batch = batch :+ pc
def maxBatchSizeReached: Boolean =
batch.length >= batchMax
def journalBatch(): Unit = {
journal ! WriteConfirmations(batch, self)
batch = Vector.empty
batching = true
}
}
/**
* Notification message to inform channel listeners about messages that have reached the maximum
* number of redeliveries.
*/
@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
final case class RedeliverFailure(messages: immutable.Seq[ConfirmablePersistent]) {
/**
* Java API.
*/
def getMessages: JIterable[ConfirmablePersistent] = messages.asJava
}
/**
* Reliably deliver messages contained in [[Deliver]] requests to their destinations. Unconfirmed
* messages are redelivered according to the parameters in [[ChannelSettings]].
*/
@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
private class ReliableDelivery(redeliverSettings: ChannelSettings) extends Actor {
import redeliverSettings._
import ReliableDelivery._
private val redelivery = context.actorOf(Props(classOf[Redelivery], redeliverSettings))
private var deliveryAttempts: DeliveryAttempts = immutable.SortedMap.empty
private var deliverySequenceNr: Long = 0L
def receive = {
case d @ Deliver(persistent: ConfirmablePersistentImpl, destination) ⇒
val dsnr = nextDeliverySequenceNr()
val psnr = persistent.sequenceNr
val confirm = persistent.confirmMessage.update(deliverySequenceNr = dsnr)
val updated = persistent.update(confirmMessage = confirm, sequenceNr = if (psnr == 0) dsnr else psnr)
context.actorSelection(destination).tell(updated, sender())
deliveryAttempts += (dsnr -> DeliveryAttempt(updated, destination, sender()))
case d: Delivered ⇒
deliveryAttempts -= d.deliverySequenceNr
redelivery forward d
case Redeliver ⇒
val limit = System.nanoTime - redeliverInterval.toNanos
val (older, younger) = deliveryAttempts.span { case (_, a) ⇒ a.timestamp < limit }
redelivery ! Redeliver(older, redeliverMax)
deliveryAttempts = younger
}
private def nextDeliverySequenceNr(): Long = {
deliverySequenceNr += 1
deliverySequenceNr
}
}
@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
private object ReliableDelivery {
type DeliveryAttempts = immutable.SortedMap[Long, DeliveryAttempt]
type FailedAttempts = Vector[ConfirmablePersistentImpl]
final case class DeliveryAttempt(persistent: ConfirmablePersistentImpl, destination: ActorPath, sender: ActorRef, timestamp: Long = System.nanoTime) {
def incrementRedeliveryCount =
copy(persistent.update(redeliveries = persistent.redeliveries + 1))
}
final case class Redeliver(attempts: DeliveryAttempts, redeliveryMax: Int)
}
/**
* Redelivery process used by [[ReliableDelivery]].
*/
@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
private class Redelivery(redeliverSettings: ChannelSettings) extends Actor {
import context.dispatcher
import redeliverSettings._
import ReliableDelivery._
private var redeliveryAttempts: DeliveryAttempts = immutable.SortedMap.empty
private var redeliverySchedule: Cancellable = _
def receive = {
case Redeliver(as, max) ⇒
val (attempts, failed) = (redeliveryAttempts ++ as).foldLeft[(DeliveryAttempts, FailedAttempts)]((immutable.SortedMap.empty, Vector.empty)) {
case ((attempts, failed), (k, attempt)) ⇒
val persistent = attempt.persistent
if (persistent.redeliveries >= redeliverMax) {
(attempts, failed :+ persistent)
} else {
val updated = attempt.incrementRedeliveryCount
context.actorSelection(updated.destination).tell(updated.persistent, updated.sender)
(attempts.updated(k, updated), failed)
}
}
redeliveryAttempts = attempts
scheduleRedelivery()
failed.headOption.foreach(_.confirmMessage.channel ! RedeliverFailure(failed))
case c: Delivered ⇒
redeliveryAttempts -= c.deliverySequenceNr
}
override def preStart(): Unit =
scheduleRedelivery()
override def postStop(): Unit =
redeliverySchedule.cancel()
private def scheduleRedelivery(): Unit =
redeliverySchedule = context.system.scheduler.scheduleOnce(redeliverInterval, context.parent, Redeliver)
}
Other Akka source code examplesHere is a short list of links related to this Akka Channel.scala source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.