alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (AtLeastOnceDeliverySpec.scala)

This example Akka source code file (AtLeastOnceDeliverySpec.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

a, action, actorref, akka, concurrent, duration, evt, int, map, props, req, string, test, testing, testprobe, time

The AtLeastOnceDeliverySpec.scala Akka example source code

/**
 * Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.persistence

import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import com.typesafe.config._
import akka.actor._
import akka.testkit._
import akka.persistence.AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot
import akka.persistence.AtLeastOnceDelivery.UnconfirmedWarning
import akka.persistence.AtLeastOnceDelivery.UnconfirmedWarning

object AtLeastOnceDeliverySpec {

  case class Req(payload: String)
  case object ReqAck
  case object InvalidReq

  sealed trait Evt
  case class AcceptedReq(payload: String, destination: ActorPath) extends Evt
  case class ReqDone(id: Long) extends Evt

  case class Action(id: Long, payload: String)
  case class ActionAck(id: Long)
  case object Boom
  case object SaveSnap
  case class Snap(deliverySnapshot: AtLeastOnceDeliverySnapshot) // typically includes some user data as well

  def senderProps(testActor: ActorRef, name: String,
                  redeliverInterval: FiniteDuration, warnAfterNumberOfUnconfirmedAttempts: Int,
                  async: Boolean, destinations: Map[String, ActorPath]): Props =
    Props(new Sender(testActor, name, redeliverInterval, warnAfterNumberOfUnconfirmedAttempts, async, destinations))

  class Sender(testActor: ActorRef,
               name: String,
               override val redeliverInterval: FiniteDuration,
               override val warnAfterNumberOfUnconfirmedAttempts: Int,
               async: Boolean,
               destinations: Map[String, ActorPath])
    extends PersistentActor with AtLeastOnceDelivery with ActorLogging {

    override def persistenceId: String = name

    def updateState(evt: Evt): Unit = evt match {
      case AcceptedReq(payload, destination) ⇒
        deliver(destination, deliveryId ⇒ Action(deliveryId, payload))
      case ReqDone(id) ⇒
        confirmDelivery(id)
    }

    val receiveCommand: Receive = {
      case Req(payload) ⇒
        if (payload.isEmpty)
          sender() ! InvalidReq
        else {
          val destination = destinations(payload.take(1).toUpperCase)
          if (async)
            persistAsync(AcceptedReq(payload, destination)) { evt ⇒
              updateState(evt)
              sender() ! ReqAck
            }
          else
            persist(AcceptedReq(payload, destination)) { evt ⇒
              updateState(evt)
              sender() ! ReqAck
            }
        }

      case ActionAck(id) ⇒
        log.debug("Sender got ack {}", id)
        if (confirmDelivery(id))
          if (async)
            persistAsync(ReqDone(id)) { evt ⇒ updateState(evt) }
          else
            persist(ReqDone(id)) { evt ⇒ updateState(evt) }

      case Boom ⇒
        throw new RuntimeException("boom") with NoStackTrace

      case SaveSnap ⇒
        saveSnapshot(Snap(getDeliverySnapshot))

      case w: UnconfirmedWarning ⇒
        testActor ! w

    }

    def receiveRecover: Receive = {
      case evt: Evt ⇒ updateState(evt)
      case SnapshotOffer(_, Snap(deliverySnapshot)) ⇒
        setDeliverySnapshot(deliverySnapshot)

    }
  }

  def destinationProps(testActor: ActorRef): Props =
    Props(new Destination(testActor))

  class Destination(testActor: ActorRef) extends Actor with ActorLogging {

    var allReceived = Set.empty[Long]

    def receive = {
      case a @ Action(id, payload) ⇒
        // discard duplicates (naive impl)
        if (!allReceived.contains(id)) {
          log.debug("Destination got {}, all count {}", a, allReceived.size + 1)
          testActor ! a
          allReceived += id
        }
        sender() ! ActionAck(id)
    }
  }

  def unreliableProps(dropMod: Int, target: ActorRef): Props =
    Props(new Unreliable(dropMod, target))

  class Unreliable(dropMod: Int, target: ActorRef) extends Actor with ActorLogging {
    var count = 0
    def receive = {
      case msg ⇒
        count += 1
        if (count % dropMod != 0) {
          log.debug("Pass msg {} count {}", msg, count)
          target forward msg
        } else {
          log.debug("Drop msg {} count {}", msg, count)
        }
    }
  }

}

abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender {
  import AtLeastOnceDeliverySpec._

  "AtLeastOnceDelivery" must {
    "deliver messages in order when nothing is lost" in {
      val probeA = TestProbe()
      val destinations = Map("A" -> system.actorOf(destinationProps(probeA.ref)).path)
      val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, async = false, destinations), name)
      snd ! Req("a")
      expectMsg(ReqAck)
      probeA.expectMsg(Action(1, "a"))
      probeA.expectNoMsg(1.second)
    }

    "re-deliver lost messages" in {
      val probeA = TestProbe()
      val dst = system.actorOf(destinationProps(probeA.ref))
      val destinations = Map("A" -> system.actorOf(unreliableProps(3, dst)).path)
      val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, async = false, destinations), name)
      snd ! Req("a-1")
      expectMsg(ReqAck)
      probeA.expectMsg(Action(1, "a-1"))

      snd ! Req("a-2")
      expectMsg(ReqAck)
      probeA.expectMsg(Action(2, "a-2"))

      snd ! Req("a-3")
      snd ! Req("a-4")
      expectMsg(ReqAck)
      expectMsg(ReqAck)
      // a-3 was lost
      probeA.expectMsg(Action(4, "a-4"))
      // and then re-delivered
      probeA.expectMsg(Action(3, "a-3"))
      probeA.expectNoMsg(1.second)
    }

    "re-deliver lost messages after restart" in {
      val probeA = TestProbe()
      val dst = system.actorOf(destinationProps(probeA.ref))
      val destinations = Map("A" -> system.actorOf(unreliableProps(3, dst)).path)
      val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, async = false, destinations), name)
      snd ! Req("a-1")
      expectMsg(ReqAck)
      probeA.expectMsg(Action(1, "a-1"))

      snd ! Req("a-2")
      expectMsg(ReqAck)
      probeA.expectMsg(Action(2, "a-2"))

      snd ! Req("a-3")
      snd ! Req("a-4")
      expectMsg(ReqAck)
      expectMsg(ReqAck)
      // a-3 was lost
      probeA.expectMsg(Action(4, "a-4"))

      // trigger restart
      snd ! Boom

      // and then re-delivered
      probeA.expectMsg(Action(3, "a-3"))

      snd ! Req("a-5")
      expectMsg(ReqAck)
      probeA.expectMsg(Action(5, "a-5"))

      probeA.expectNoMsg(1.second)
    }

    "restore state from snapshot" in {
      val probeA = TestProbe()
      val dst = system.actorOf(destinationProps(probeA.ref))
      val destinations = Map("A" -> system.actorOf(unreliableProps(3, dst)).path)
      val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, async = false, destinations), name)
      snd ! Req("a-1")
      expectMsg(ReqAck)
      probeA.expectMsg(Action(1, "a-1"))

      snd ! Req("a-2")
      expectMsg(ReqAck)
      probeA.expectMsg(Action(2, "a-2"))

      snd ! Req("a-3")
      snd ! Req("a-4")
      snd ! SaveSnap
      expectMsg(ReqAck)
      expectMsg(ReqAck)
      // a-3 was lost
      probeA.expectMsg(Action(4, "a-4"))

      // trigger restart
      snd ! Boom

      // and then re-delivered
      probeA.expectMsg(Action(3, "a-3"))

      snd ! Req("a-5")
      expectMsg(ReqAck)
      probeA.expectMsg(Action(5, "a-5"))

      probeA.expectNoMsg(1.second)
    }

    "warn about unconfirmed messages" in {
      val probeA = TestProbe()
      val probeB = TestProbe()
      val destinations = Map("A" -> probeA.ref.path, "B" -> probeB.ref.path)
      val snd = system.actorOf(senderProps(testActor, name, 500.millis, 3, async = false, destinations), name)
      snd ! Req("a-1")
      snd ! Req("b-1")
      snd ! Req("b-2")
      expectMsg(ReqAck)
      expectMsg(ReqAck)
      expectMsg(ReqAck)
      val unconfirmed = receiveWhile(3.seconds) {
        case UnconfirmedWarning(unconfirmed) ⇒ unconfirmed
      }.flatten
      unconfirmed.map(_.destination).toSet should be(Set(probeA.ref.path, probeB.ref.path))
      unconfirmed.map(_.message).toSet should be(Set(Action(1, "a-1"), Action(2, "b-1"), Action(3, "b-2")))
      system.stop(snd)
    }

    "re-deliver many lost messages" in {
      val probeA = TestProbe()
      val probeB = TestProbe()
      val probeC = TestProbe()
      val dstA = system.actorOf(destinationProps(probeA.ref), "destination-a")
      val dstB = system.actorOf(destinationProps(probeB.ref), "destination-b")
      val dstC = system.actorOf(destinationProps(probeC.ref), "destination-c")
      val destinations = Map(
        "A" -> system.actorOf(unreliableProps(2, dstA), "unreliable-a").path,
        "B" -> system.actorOf(unreliableProps(5, dstB), "unreliable-b").path,
        "C" -> system.actorOf(unreliableProps(3, dstC), "unreliable-c").path)
      val snd = system.actorOf(senderProps(testActor, name, 1000.millis, 5, async = true, destinations), name)
      val N = 100
      for (n ← 1 to N) {
        snd ! Req("a-" + n)
      }
      for (n ← 1 to N) {
        snd ! Req("b-" + n)
      }
      for (n ← 1 to N) {
        snd ! Req("c-" + n)
      }
      val deliverWithin = 20.seconds
      probeA.receiveN(N, deliverWithin).map { case a: Action ⇒ a.payload }.toSet should be((1 to N).map(n ⇒ "a-" + n).toSet)
      probeB.receiveN(N, deliverWithin).map { case a: Action ⇒ a.payload }.toSet should be((1 to N).map(n ⇒ "b-" + n).toSet)
      probeC.receiveN(N, deliverWithin).map { case a: Action ⇒ a.payload }.toSet should be((1 to N).map(n ⇒ "c-" + n).toSet)
    }

  }
}

class LeveldbAtLeastOnceDeliverySpec extends AtLeastOnceDeliverySpec(PersistenceSpec.config("leveldb", "AtLeastOnceDeliverySpec"))
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class InmemAtLeastOnceDeliverySpec extends AtLeastOnceDeliverySpec(PersistenceSpec.config("inmem", "AtLeastOnceDeliverySpec"))

Other Akka source code examples

Here is a short list of links related to this Akka AtLeastOnceDeliverySpec.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.