|
Akka/Scala example source code file (AtLeastOnceDeliveryFailureSpec.scala)
// The AtLeastOnceDeliveryFailureSpec.scala Akka example source code
/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.persistence

import scala.concurrent.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.language.postfixOps

import com.typesafe.config.ConfigFactory

import akka.actor._
import akka.testkit._

/**
 * Fixtures for [[AtLeastOnceDeliveryFailureSpec]]: the test configuration, the
 * message/event protocol and the actors that randomly fail while exchanging
 * `numMessages` integers.
 */
object AtLeastOnceDeliveryFailureSpec {

  /**
   * Test configuration. Selects the chaos journal plugin and sets the failure
   * rates read by the sender, the destination and the journal. Every setting
   * must stay on its own line: HOCON fields are separated by newlines or commas.
   */
  val config = ConfigFactory.parseString(
    """
      akka.persistence.sender.chaos.live-processing-failure-rate = 0.3
      akka.persistence.sender.chaos.replay-processing-failure-rate = 0.1
      akka.persistence.destination.chaos.confirm-failure-rate = 0.3
      akka.persistence.journal.plugin = "akka.persistence.journal.chaos"
      akka.persistence.journal.chaos.write-failure-rate = 0.3
      akka.persistence.journal.chaos.confirm-failure-rate = 0.2
      akka.persistence.journal.chaos.delete-failure-rate = 0.3
      akka.persistence.journal.chaos.replay-failure-rate = 0.25
      akka.persistence.journal.chaos.read-highest-failure-rate = 0.1
      akka.persistence.journal.chaos.class = akka.persistence.journal.chaos.ChaosJournal
      akka.persistence.snapshot-store.local.dir = "target/snapshots-at-least-once-delivery-failure-spec/"
    """)

  /** Number of distinct payloads (1 to numMessages) pushed through the system. */
  val numMessages = 10

  /** Tells [[ChaosApp]] to send all payloads to the sender. */
  case object Start

  /** Sent to the probe once an actor's state holds `numMessages` payloads. */
  final case class Done(ints: Vector[Int])

  /** Makes [[ChaosApp]] resend payload `i`; nothing in this file sends it. */
  final case class ProcessingFailure(i: Int)

  /** Reply from [[ChaosSender]] when journaling `MsgSent(i)` failed; [[ChaosApp]] resends `i`. */
  final case class JournalingFailure(i: Int)

  /** Message delivered from sender to destination. */
  final case class Msg(deliveryId: Long, i: Int)

  /** Confirmation sent back by the destination for `Msg(deliveryId, i)`. */
  final case class Confirm(deliveryId: Long, i: Int)

  /** Events persisted by [[ChaosSender]]. */
  sealed trait Evt
  final case class MsgSent(i: Int) extends Evt
  final case class MsgConfirmed(deliveryId: Long, i: Int) extends Evt

  /**
   * State and helpers shared by the chaos actors: collects payloads and
   * notifies `probe` with [[Done]] when exactly `numMessages` were collected.
   */
  trait ChaosSupport { this: Actor ⇒
    def random = ThreadLocalRandom.current

    /** Receiver of the [[Done]] notification. */
    def probe: ActorRef

    /** Payloads added so far, in insertion order. */
    var state = Vector.empty[Int]

    def contains(i: Int): Boolean = state.contains(i)

    /** Appends `i` and reports [[Done]] when the state reaches `numMessages` entries. */
    def add(i: Int): Unit = {
      state :+= i
      if (state.length == numMessages) probe ! Done(state)
    }

    /** Returns `true` when a fresh random double in [0, 1) is below `rate`. */
    def shouldFail(rate: Double): Boolean = random.nextDouble() < rate
  }

  /**
   * Persistent sender. For every not-yet-seen `Int` it persists `MsgSent`,
   * applies it (which calls `deliver` towards `destination`) and then throws a
   * `TestException` with the configured probability. Confirmations are
   * persisted as `MsgConfirmed` and applied via `confirmDelivery`.
   */
  class ChaosSender(destination: ActorRef, val probe: ActorRef)
    extends PersistentActor with ChaosSupport with ActorLogging with AtLeastOnceDelivery {

    val config = context.system.settings.config.getConfig("akka.persistence.sender.chaos")
    val liveProcessingFailureRate = config.getDouble("live-processing-failure-rate")
    val replayProcessingFailureRate = config.getDouble("replay-processing-failure-rate")

    override def redeliverInterval: FiniteDuration = 500.milliseconds

    override def processorId: String = "chaosSender"

    def receiveCommand: Receive = {
      case i: Int ⇒
        // The rate is chosen when the command arrives, before the persist handler runs.
        val failureRate = if (recoveryRunning) replayProcessingFailureRate else liveProcessingFailureRate
        if (contains(i)) {
          log.debug(debugMessage(s"ignored duplicate ${i}"))
        } else {
          persist(MsgSent(i)) { evt ⇒
            updateState(evt)
            if (shouldFail(failureRate))
              throw new TestException(debugMessage(s"failed at payload ${i}"))
            else
              log.debug(debugMessage(s"processed payload ${i}"))
          }
        }

      case Confirm(deliveryId, i) ⇒
        persist(MsgConfirmed(deliveryId, i))(updateState)

      case PersistenceFailure(MsgSent(i), _, _) ⇒
        // inform sender about journaling failure so that it can resend
        sender() ! JournalingFailure(i)

      case PersistenceFailure(MsgConfirmed(_, _), _, _) ⇒
        // ok, will be redelivered
    }

    def receiveRecover: Receive = {
      case evt: Evt ⇒
        updateState(evt)

      case RecoveryFailure(_) ⇒
        // journal failed during recovery, throw exception to re-recover processor
        throw new TestException(debugMessage("recovery failed"))
    }

    /** Applies an event: records and delivers a sent payload, or confirms a delivery. */
    def updateState(evt: Evt): Unit = evt match {
      case MsgSent(i) ⇒
        add(i)
        deliver(destination.path, deliveryId ⇒ Msg(deliveryId, i))

      case MsgConfirmed(deliveryId, _) ⇒
        confirmDelivery(deliveryId)
    }

    private def debugMessage(msg: String): String =
      s"[sender] ${msg} (mode = ${if (recoveryRunning) "replay" else "live"} snr = ${lastSequenceNr} state = ${state.sorted})"
  }

  /**
   * Destination of the deliveries. With the configured probability it drops a
   * message without confirming it; otherwise it confirms, adding the payload to
   * its state only the first time it is seen.
   */
  class ChaosDestination(val probe: ActorRef) extends Actor with ChaosSupport with ActorLogging {
    val config = context.system.settings.config.getConfig("akka.persistence.destination.chaos")
    val confirmFailureRate = config.getDouble("confirm-failure-rate")

    def receive: Receive = {
      case m @ Msg(deliveryId, i) ⇒
        if (shouldFail(confirmFailureRate)) {
          // No Confirm is sent in this branch.
          log.error(debugMessage("confirm message failed", m))
        } else if (contains(i)) {
          log.debug(debugMessage("ignored duplicate", m))
          sender() ! Confirm(deliveryId, i)
        } else {
          add(i)
          sender() ! Confirm(deliveryId, i)
          log.debug(debugMessage("received and confirmed message", m))
        }
    }

    private def debugMessage(msg: String, m: Msg): String =
      s"[destination] ${msg} (message = $m)"
  }

  /**
   * Parent of one destination and one sender. On [[Start]] it sends the
   * payloads 1 to `numMessages` to the sender and resends a payload whenever a
   * failure for it is reported.
   */
  class ChaosApp(probe: ActorRef) extends Actor with ActorLogging {
    val destination = context.actorOf(Props(classOf[ChaosDestination], probe), "destination")
    val snd = context.actorOf(Props(classOf[ChaosSender], destination, probe), "sender")

    def receive: Receive = {
      case Start ⇒
        (1 to numMessages).foreach(snd ! _)

      case ProcessingFailure(i) ⇒
        snd ! i
        log.debug(s"resent ${i} after processing failure")

      case JournalingFailure(i) ⇒
        snd ! i
        log.debug(s"resent ${i} after journaling failure")
    }
  }
}

/**
 * Checks that sender and destination both end up with all payloads despite the
 * randomly injected failures, and that a second `ChaosApp` instance reaches the
 * same sender state.
 */
class AtLeastOnceDeliveryFailureSpec extends AkkaSpec(AtLeastOnceDeliveryFailureSpec.config) with Cleanup with ImplicitSender {
  import AtLeastOnceDeliveryFailureSpec._

  "AtLeastOnceDelivery" must {
    "tolerate and recover from random failures" in {
      system.actorOf(Props(classOf[ChaosApp], testActor), "chaosApp") ! Start
      expectDone() // by sender
      expectDone() // by destination

      system.actorOf(Props(classOf[ChaosApp], testActor), "chaosApp2") // recovery of new instance should have same outcome
      expectDone() // by sender
      // destination doesn't receive messages again because all have been confirmed already
    }
  }

  /** Expects a [[Done]] within `numMessages` seconds whose sorted payloads are exactly 1 to `numMessages`. */
  def expectDone() = within(numMessages.seconds) {
    expectMsgType[Done].ints.sorted should be((1 to numMessages).toVector)
  }
}
// Other Akka source code examples
// Here is a short list of links related to this Akka AtLeastOnceDeliveryFailureSpec.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.