|
Akka/Scala example source code file (AtLeastOnceDeliverySpec.scala)
The AtLeastOnceDeliverySpec.scala Akka example source code
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import com.typesafe.config._
import akka.actor._
import akka.testkit._
import akka.persistence.AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot
import akka.persistence.AtLeastOnceDelivery.UnconfirmedWarning
import akka.persistence.AtLeastOnceDelivery.UnconfirmedWarning
object AtLeastOnceDeliverySpec {
case class Req(payload: String)
case object ReqAck
case object InvalidReq
sealed trait Evt
case class AcceptedReq(payload: String, destination: ActorPath) extends Evt
case class ReqDone(id: Long) extends Evt
case class Action(id: Long, payload: String)
case class ActionAck(id: Long)
case object Boom
case object SaveSnap
case class Snap(deliverySnapshot: AtLeastOnceDeliverySnapshot) // typically includes some user data as well
def senderProps(testActor: ActorRef, name: String,
redeliverInterval: FiniteDuration, warnAfterNumberOfUnconfirmedAttempts: Int,
async: Boolean, destinations: Map[String, ActorPath]): Props =
Props(new Sender(testActor, name, redeliverInterval, warnAfterNumberOfUnconfirmedAttempts, async, destinations))
class Sender(testActor: ActorRef,
name: String,
override val redeliverInterval: FiniteDuration,
override val warnAfterNumberOfUnconfirmedAttempts: Int,
async: Boolean,
destinations: Map[String, ActorPath])
extends PersistentActor with AtLeastOnceDelivery with ActorLogging {
override def persistenceId: String = name
def updateState(evt: Evt): Unit = evt match {
case AcceptedReq(payload, destination) ⇒
deliver(destination, deliveryId ⇒ Action(deliveryId, payload))
case ReqDone(id) ⇒
confirmDelivery(id)
}
val receiveCommand: Receive = {
case Req(payload) ⇒
if (payload.isEmpty)
sender() ! InvalidReq
else {
val destination = destinations(payload.take(1).toUpperCase)
if (async)
persistAsync(AcceptedReq(payload, destination)) { evt ⇒
updateState(evt)
sender() ! ReqAck
}
else
persist(AcceptedReq(payload, destination)) { evt ⇒
updateState(evt)
sender() ! ReqAck
}
}
case ActionAck(id) ⇒
log.debug("Sender got ack {}", id)
if (confirmDelivery(id))
if (async)
persistAsync(ReqDone(id)) { evt ⇒ updateState(evt) }
else
persist(ReqDone(id)) { evt ⇒ updateState(evt) }
case Boom ⇒
throw new RuntimeException("boom") with NoStackTrace
case SaveSnap ⇒
saveSnapshot(Snap(getDeliverySnapshot))
case w: UnconfirmedWarning ⇒
testActor ! w
}
def receiveRecover: Receive = {
case evt: Evt ⇒ updateState(evt)
case SnapshotOffer(_, Snap(deliverySnapshot)) ⇒
setDeliverySnapshot(deliverySnapshot)
}
}
def destinationProps(testActor: ActorRef): Props =
Props(new Destination(testActor))
class Destination(testActor: ActorRef) extends Actor with ActorLogging {
var allReceived = Set.empty[Long]
def receive = {
case a @ Action(id, payload) ⇒
// discard duplicates (naive impl)
if (!allReceived.contains(id)) {
log.debug("Destination got {}, all count {}", a, allReceived.size + 1)
testActor ! a
allReceived += id
}
sender() ! ActionAck(id)
}
}
def unreliableProps(dropMod: Int, target: ActorRef): Props =
Props(new Unreliable(dropMod, target))
class Unreliable(dropMod: Int, target: ActorRef) extends Actor with ActorLogging {
var count = 0
def receive = {
case msg ⇒
count += 1
if (count % dropMod != 0) {
log.debug("Pass msg {} count {}", msg, count)
target forward msg
} else {
log.debug("Drop msg {} count {}", msg, count)
}
}
}
}
abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender {
import AtLeastOnceDeliverySpec._
"AtLeastOnceDelivery" must {
"deliver messages in order when nothing is lost" in {
val probeA = TestProbe()
val destinations = Map("A" -> system.actorOf(destinationProps(probeA.ref)).path)
val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, async = false, destinations), name)
snd ! Req("a")
expectMsg(ReqAck)
probeA.expectMsg(Action(1, "a"))
probeA.expectNoMsg(1.second)
}
"re-deliver lost messages" in {
val probeA = TestProbe()
val dst = system.actorOf(destinationProps(probeA.ref))
val destinations = Map("A" -> system.actorOf(unreliableProps(3, dst)).path)
val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, async = false, destinations), name)
snd ! Req("a-1")
expectMsg(ReqAck)
probeA.expectMsg(Action(1, "a-1"))
snd ! Req("a-2")
expectMsg(ReqAck)
probeA.expectMsg(Action(2, "a-2"))
snd ! Req("a-3")
snd ! Req("a-4")
expectMsg(ReqAck)
expectMsg(ReqAck)
// a-3 was lost
probeA.expectMsg(Action(4, "a-4"))
// and then re-delivered
probeA.expectMsg(Action(3, "a-3"))
probeA.expectNoMsg(1.second)
}
"re-deliver lost messages after restart" in {
val probeA = TestProbe()
val dst = system.actorOf(destinationProps(probeA.ref))
val destinations = Map("A" -> system.actorOf(unreliableProps(3, dst)).path)
val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, async = false, destinations), name)
snd ! Req("a-1")
expectMsg(ReqAck)
probeA.expectMsg(Action(1, "a-1"))
snd ! Req("a-2")
expectMsg(ReqAck)
probeA.expectMsg(Action(2, "a-2"))
snd ! Req("a-3")
snd ! Req("a-4")
expectMsg(ReqAck)
expectMsg(ReqAck)
// a-3 was lost
probeA.expectMsg(Action(4, "a-4"))
// trigger restart
snd ! Boom
// and then re-delivered
probeA.expectMsg(Action(3, "a-3"))
snd ! Req("a-5")
expectMsg(ReqAck)
probeA.expectMsg(Action(5, "a-5"))
probeA.expectNoMsg(1.second)
}
"restore state from snapshot" in {
val probeA = TestProbe()
val dst = system.actorOf(destinationProps(probeA.ref))
val destinations = Map("A" -> system.actorOf(unreliableProps(3, dst)).path)
val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, async = false, destinations), name)
snd ! Req("a-1")
expectMsg(ReqAck)
probeA.expectMsg(Action(1, "a-1"))
snd ! Req("a-2")
expectMsg(ReqAck)
probeA.expectMsg(Action(2, "a-2"))
snd ! Req("a-3")
snd ! Req("a-4")
snd ! SaveSnap
expectMsg(ReqAck)
expectMsg(ReqAck)
// a-3 was lost
probeA.expectMsg(Action(4, "a-4"))
// trigger restart
snd ! Boom
// and then re-delivered
probeA.expectMsg(Action(3, "a-3"))
snd ! Req("a-5")
expectMsg(ReqAck)
probeA.expectMsg(Action(5, "a-5"))
probeA.expectNoMsg(1.second)
}
"warn about unconfirmed messages" in {
val probeA = TestProbe()
val probeB = TestProbe()
val destinations = Map("A" -> probeA.ref.path, "B" -> probeB.ref.path)
val snd = system.actorOf(senderProps(testActor, name, 500.millis, 3, async = false, destinations), name)
snd ! Req("a-1")
snd ! Req("b-1")
snd ! Req("b-2")
expectMsg(ReqAck)
expectMsg(ReqAck)
expectMsg(ReqAck)
val unconfirmed = receiveWhile(3.seconds) {
case UnconfirmedWarning(unconfirmed) ⇒ unconfirmed
}.flatten
unconfirmed.map(_.destination).toSet should be(Set(probeA.ref.path, probeB.ref.path))
unconfirmed.map(_.message).toSet should be(Set(Action(1, "a-1"), Action(2, "b-1"), Action(3, "b-2")))
system.stop(snd)
}
"re-deliver many lost messages" in {
val probeA = TestProbe()
val probeB = TestProbe()
val probeC = TestProbe()
val dstA = system.actorOf(destinationProps(probeA.ref), "destination-a")
val dstB = system.actorOf(destinationProps(probeB.ref), "destination-b")
val dstC = system.actorOf(destinationProps(probeC.ref), "destination-c")
val destinations = Map(
"A" -> system.actorOf(unreliableProps(2, dstA), "unreliable-a").path,
"B" -> system.actorOf(unreliableProps(5, dstB), "unreliable-b").path,
"C" -> system.actorOf(unreliableProps(3, dstC), "unreliable-c").path)
val snd = system.actorOf(senderProps(testActor, name, 1000.millis, 5, async = true, destinations), name)
val N = 100
for (n ← 1 to N) {
snd ! Req("a-" + n)
}
for (n ← 1 to N) {
snd ! Req("b-" + n)
}
for (n ← 1 to N) {
snd ! Req("c-" + n)
}
val deliverWithin = 20.seconds
probeA.receiveN(N, deliverWithin).map { case a: Action ⇒ a.payload }.toSet should be((1 to N).map(n ⇒ "a-" + n).toSet)
probeB.receiveN(N, deliverWithin).map { case a: Action ⇒ a.payload }.toSet should be((1 to N).map(n ⇒ "b-" + n).toSet)
probeC.receiveN(N, deliverWithin).map { case a: Action ⇒ a.payload }.toSet should be((1 to N).map(n ⇒ "c-" + n).toSet)
}
}
}
class LeveldbAtLeastOnceDeliverySpec extends AtLeastOnceDeliverySpec(PersistenceSpec.config("leveldb", "AtLeastOnceDeliverySpec"))
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class InmemAtLeastOnceDeliverySpec extends AtLeastOnceDeliverySpec(PersistenceSpec.config("inmem", "AtLeastOnceDeliverySpec"))
Other Akka source code examplesHere is a short list of links related to this Akka AtLeastOnceDeliverySpec.scala source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.