|
Akka/Scala example source code file (AtLeastOnceDelivery.scala)
The AtLeastOnceDelivery.scala Akka example source code/** * Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com> */ package akka.persistence import scala.annotation.tailrec import scala.collection.breakOut import scala.collection.immutable import scala.concurrent.duration.FiniteDuration import akka.actor.Actor import akka.actor.ActorPath import akka.persistence.serialization.Message object AtLeastOnceDelivery { /** * Snapshot of current `AtLeastOnceDelivery` state. Can be retrieved with * [[AtLeastOnceDelivery#getDeliverySnapshot]] and saved with [[PersistentActor#saveSnapshot]]. * During recovery the snapshot received in [[SnapshotOffer]] should be set * with [[AtLeastOnceDelivery.setDeliverySnapshot]]. */ @SerialVersionUID(1L) case class AtLeastOnceDeliverySnapshot(currentDeliveryId: Long, unconfirmedDeliveries: immutable.Seq[UnconfirmedDelivery]) extends Message { /** * Java API */ def getUnconfirmedDeliveries: java.util.List[UnconfirmedDelivery] = { import scala.collection.JavaConverters._ unconfirmedDeliveries.asJava } } /** * @see [[AtLeastOnceDelivery#warnAfterNumberOfUnconfirmedAttempts]] */ @SerialVersionUID(1L) case class UnconfirmedWarning(unconfirmedDeliveries: immutable.Seq[UnconfirmedDelivery]) { /** * Java API */ def getUnconfirmedDeliveries: java.util.List[UnconfirmedDelivery] = { import scala.collection.JavaConverters._ unconfirmedDeliveries.asJava } } /** * Information about a message that has not been confirmed. Included in [[UnconfirmedWarning]] * and [[AtLeastOnceDeliverySnapshot]]. */ case class UnconfirmedDelivery(deliveryId: Long, destination: ActorPath, message: Any) { /** * Java API */ def getMessage(): AnyRef = message.asInstanceOf[AnyRef] } /** * @see [[AtLeastOnceDelivery#maxUnconfirmedMessages]] */ class MaxUnconfirmedMessagesExceededException(message: String) extends RuntimeException(message) private object Internal { case class Delivery(destination: ActorPath, message: Any, timestamp: Long, attempt: Int) case object RedeliveryTick } } /** * Mix-in this trait with your `PersistentActor` to send messages with at-least-once * delivery semantics to destinations. It takes care of re-sending messages when they * have not been confirmed within a configurable timeout. Use the [[#deliver]] method to * send a message to a destination. Call the [[#confirmDelivery]] method when the destination * has replied with a confirmation message. * * At-least-once delivery implies that original message send order is not always retained * and the destination may receive duplicate messages due to possible resends. * * The interval between redelivery attempts can be defined by [[#redeliverInterval]]. * After a number of delivery attempts a [[AtLeastOnceDelivery.UnconfirmedWarning]] message * will be sent to `self`. The re-sending will still continue, but you can choose to call * [[#confirmDelivery]] to cancel the re-sending. * * The `AtLeastOnceDelivery` trait has a state consisting of unconfirmed messages and a * sequence number. It does not store this state itself. You must persist events corresponding * to the `deliver` and `confirmDelivery` invocations from your `PersistentActor` so that the * state can be restored by calling the same methods during the recovery phase of the * `PersistentActor`. Sometimes these events can be derived from other business level events, * and sometimes you must create separate events. During recovery calls to `deliver` * will not send out the message, but it will be sent later if no matching `confirmDelivery` * was performed. * * Support for snapshots is provided by [[#getDeliverySnapshot]] and [[#setDeliverySnapshot]]. * The `AtLeastOnceDeliverySnapshot` contains the full delivery state, including unconfirmed messages. * If you need a custom snapshot for other parts of the actor state you must also include the * `AtLeastOnceDeliverySnapshot`. It is serialized using protobuf with the ordinary Akka * serialization mechanism. It is easiest to include the bytes of the `AtLeastOnceDeliverySnapshot` * as a blob in your custom snapshot. */ trait AtLeastOnceDelivery extends Processor { // FIXME The reason for extending Processor instead of PersistentActor is // the class hierarchy for UntypedPersistentActorWithAtLeastOnceDelivery import AtLeastOnceDelivery._ import AtLeastOnceDelivery.Internal._ /** * Interval between redelivery attempts. * * The default value can be configured with the * `akka.persistence.at-least-once-delivery.redeliver-interval` * configuration key. This method can be overridden by implementation classes to return * non-default values. */ def redeliverInterval: FiniteDuration = defaultRedeliverInterval private val defaultRedeliverInterval: FiniteDuration = Persistence(context.system).settings.atLeastOnceDelivery.redeliverInterval /** * After this number of delivery attempts a [[AtLeastOnceDelivery.UnconfirmedWarning]] message * will be sent to `self`. The count is reset after a restart. * * The default value can be configured with the * `akka.persistence.at-least-once-delivery.warn-after-number-of-unconfirmed-attempts` * configuration key. This method can be overridden by implementation classes to return * non-default values. */ def warnAfterNumberOfUnconfirmedAttempts: Int = defaultWarnAfterNumberOfUnconfirmedAttempts private val defaultWarnAfterNumberOfUnconfirmedAttempts: Int = Persistence(context.system).settings.atLeastOnceDelivery.warnAfterNumberOfUnconfirmedAttempts /** * Maximum number of unconfirmed messages that this actor is allowed to hold in memory. * If this number is exceed [[#deliver]] will not accept more messages and it will throw * [[AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException]]. * * The default value can be configured with the * `akka.persistence.at-least-once-delivery.max-unconfirmed-messages` * configuration key. This method can be overridden by implementation classes to return * non-default values. */ def maxUnconfirmedMessages: Int = defaultMaxUnconfirmedMessages private val defaultMaxUnconfirmedMessages: Int = Persistence(context.system).settings.atLeastOnceDelivery.maxUnconfirmedMessages private val redeliverTask = { import context.dispatcher val interval = redeliverInterval / 2 context.system.scheduler.schedule(interval, interval, self, RedeliveryTick) } private var deliverySequenceNr = 0L private var unconfirmed = immutable.SortedMap.empty[Long, Delivery] private def nextDeliverySequenceNr(): Long = { deliverySequenceNr += 1 deliverySequenceNr } /** * Scala API: Send the message created by the `deliveryIdToMessage` function to * the `destination` actor. It will retry sending the message until * the delivery is confirmed with [[#confirmDelivery]]. Correlation * between `deliver` and `confirmDelivery` is performed with the * `deliveryId` that is provided as parameter to the `deliveryIdToMessage` * function. The `deliveryId` is typically passed in the message to the * destination, which replies with a message containing the same `deliveryId`. * * The `deliveryId` is a strictly monotonically increasing sequence number without * gaps. The same sequence is used for all destinations of the actor, i.e. when sending * to multiple destinations the destinations will see gaps in the sequence if no * translation is performed. * * During recovery this method will not send out the message, but it will be sent * later if no matching `confirmDelivery` was performed. * * This method will throw [[AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException]] * if [[numberOfUnconfirmed]] is greater than or equal to [[#maxUnconfirmedMessages]]. */ def deliver(destination: ActorPath, deliveryIdToMessage: Long ⇒ Any): Unit = { if (unconfirmed.size >= maxUnconfirmedMessages) throw new MaxUnconfirmedMessagesExceededException( s"Too many unconfirmed messages, maximum allowed is [$maxUnconfirmedMessages]") val deliveryId = nextDeliverySequenceNr() val now = System.nanoTime() val d = Delivery(destination, deliveryIdToMessage(deliveryId), now, attempt = 0) if (recoveryRunning) unconfirmed = unconfirmed.updated(deliveryId, d) else send(deliveryId, d, now) } /** * Call this method when a message has been confirmed by the destination, * or to abort re-sending. * @see [[#deliver]] * @return `true` the first time the `deliveryId` is confirmed, i.e. `false` for duplicate confirm */ def confirmDelivery(deliveryId: Long): Boolean = { if (unconfirmed.contains(deliveryId)) { unconfirmed -= deliveryId true } else false } /** * Number of messages that have not been confirmed yet. */ def numberOfUnconfirmed: Int = unconfirmed.size private def redeliverOverdue(): Unit = { val now = System.nanoTime() val deadline = now - redeliverInterval.toNanos var warnings = Vector.empty[UnconfirmedDelivery] unconfirmed foreach { case (deliveryId, delivery) ⇒ if (delivery.timestamp <= deadline) { send(deliveryId, delivery, now) if (delivery.attempt == warnAfterNumberOfUnconfirmedAttempts) warnings :+= UnconfirmedDelivery(deliveryId, delivery.destination, delivery.message) } } if (warnings.nonEmpty) self ! UnconfirmedWarning(warnings) } private def send(deliveryId: Long, d: Delivery, timestamp: Long): Unit = { context.actorSelection(d.destination) ! d.message unconfirmed = unconfirmed.updated(deliveryId, d.copy(timestamp = timestamp, attempt = d.attempt + 1)) } /** * Full state of the `AtLeastOnceDelivery`. It can be saved with [[PersistentActor#saveSnapshot]]. * During recovery the snapshot received in [[SnapshotOffer]] should be set * with [[#setDeliverySnapshot]]. * * The `AtLeastOnceDeliverySnapshot` contains the full delivery state, including unconfirmed messages. * If you need a custom snapshot for other parts of the actor state you must also include the * `AtLeastOnceDeliverySnapshot`. It is serialized using protobuf with the ordinary Akka * serialization mechanism. It is easiest to include the bytes of the `AtLeastOnceDeliverySnapshot` * as a blob in your custom snapshot. */ def getDeliverySnapshot: AtLeastOnceDeliverySnapshot = AtLeastOnceDeliverySnapshot(deliverySequenceNr, unconfirmed.map { case (deliveryId, d) ⇒ UnconfirmedDelivery(deliveryId, d.destination, d.message) }(breakOut)) /** * If snapshot from [[#getDeliverySnapshot]] was saved it will be received during recovery * in a [[SnapshotOffer]] message and should be set with this method. */ def setDeliverySnapshot(snapshot: AtLeastOnceDeliverySnapshot): Unit = { deliverySequenceNr = snapshot.currentDeliveryId val now = System.nanoTime() unconfirmed = snapshot.unconfirmedDeliveries.map(d ⇒ d.deliveryId -> Delivery(d.destination, d.message, now, 0))(breakOut) } /** * INTERNAL API */ override protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = { redeliverTask.cancel() super.aroundPreRestart(reason, message) } /** * INTERNAL API */ override protected[akka] def aroundPostStop(): Unit = { redeliverTask.cancel() super.aroundPostStop() } /** * INTERNAL API */ override protected[akka] def aroundReceive(receive: Receive, message: Any): Unit = message match { case RedeliveryTick ⇒ redeliverOverdue() case _ ⇒ super.aroundReceive(receive, message) } } /** * Java API: Use this class instead of `UntypedPersistentActor` to send messages * with at-least-once delivery semantics to destinations. * Full documentation in [[AtLeastOnceDelivery]]. * * @see [[AtLeastOnceDelivery]] */ abstract class UntypedPersistentActorWithAtLeastOnceDelivery extends UntypedPersistentActor with AtLeastOnceDelivery { /** * Java API: Send the message created by the `deliveryIdToMessage` function to * the `destination` actor. It will retry sending the message until * the delivery is confirmed with [[#confirmDelivery]]. Correlation * between `deliver` and `confirmDelivery` is performed with the * `deliveryId` that is provided as parameter to the `deliveryIdToMessage` * function. The `deliveryId` is typically passed in the message to the * destination, which replies with a message containing the same `deliveryId`. * * The `deliveryId` is a strictly monotonically increasing sequence number without * gaps. The same sequence is used for all destinations, i.e. when sending to * multiple destinations the destinations will see gaps in the sequence if no * translation is performed. * * During recovery this method will not send out the message, but it will be sent * later if no matching `confirmDelivery` was performed. * * This method will throw [[AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException]] * if [[numberOfUnconfirmed]] is greater than or equal to [[#maxUnconfirmedMessages]]. */ def deliver(destination: ActorPath, deliveryIdToMessage: akka.japi.Function[java.lang.Long, Object]): Unit = super.deliver(destination, id ⇒ deliveryIdToMessage.apply(id)) } /** * Java API compatible with lambda expressions * * Use this class instead of `UntypedPersistentActor` to send messages * with at-least-once delivery semantics to destinations. * Full documentation in [[AtLeastOnceDelivery]]. * * @see [[AtLeastOnceDelivery]] */ abstract class AbstractPersistentActorWithAtLeastOnceDelivery extends AbstractPersistentActor with AtLeastOnceDelivery { /** * Java API: Send the message created by the `deliveryIdToMessage` function to * the `destination` actor. It will retry sending the message until * the delivery is confirmed with [[#confirmDelivery]]. Correlation * between `deliver` and `confirmDelivery` is performed with the * `deliveryId` that is provided as parameter to the `deliveryIdToMessage` * function. The `deliveryId` is typically passed in the message to the * destination, which replies with a message containing the same `deliveryId`. * * The `deliveryId` is a strictly monotonically increasing sequence number without * gaps. The same sequence is used for all destinations, i.e. when sending to * multiple destinations the destinations will see gaps in the sequence if no * translation is performed. * * During recovery this method will not send out the message, but it will be sent * later if no matching `confirmDelivery` was performed. * * This method will throw [[AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException]] * if [[numberOfUnconfirmed]] is greater than or equal to [[#maxUnconfirmedMessages]]. */ def deliver(destination: ActorPath, deliveryIdToMessage: akka.japi.Function[java.lang.Long, Object]): Unit = super.deliver(destination, id ⇒ deliveryIdToMessage.apply(id)) } Other Akka source code examplesHere is a short list of links related to this Akka AtLeastOnceDelivery.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.