alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (ActorWithBoundedStashSpec.scala)

This example Akka source code file (ActorWithBoundedStashSpec.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actor, actorref, akka, boundeddequebasedmailbox, concurrent, config, deadletters, dispatch, stash, stashoverflow, stashoverflowexception, string, test, testing, testkit, unit

The ActorWithBoundedStashSpec.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.actor

import language.postfixOps

import akka.testkit._
import akka.testkit.DefaultTimeout
import akka.testkit.TestEvent._
import akka.dispatch.BoundedDequeBasedMailbox
import akka.pattern.ask
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.ActorSystem.Settings
import com.typesafe.config.{ Config, ConfigFactory }
import org.scalatest.Assertions.intercept
import org.scalatest.BeforeAndAfterEach

/**
 * Fixtures shared by the `ActorWithBoundedStashSpec` test class: the stashing
 * actors under test, two bounded deque-based mailbox types, and the
 * configuration that registers them under dispatcher and mailbox ids.
 */
object ActorWithBoundedStashSpec {

  /**
   * Stashes every string that starts with "hello" and acknowledges it with "ok".
   * On "world" it switches to [[afterWorldBehaviour]] and then unstashes everything.
   */
  class StashingActor extends Actor with Stash {
    def receive: Receive = {
      case greeting: String if greeting.startsWith("hello") =>
        stash()
        sender() ! "ok"

      case "world" =>
        // Install the new behaviour first, then replay the stash.
        context.become(afterWorldBehaviour)
        unstashAll()
    }

    /** Behaviour installed after "world": every incoming message is stashed. */
    def afterWorldBehaviour: Receive = {
      case _ => stash()
    }
  }

  /**
   * Stashes every string that starts with "hello", counting the attempts.
   * A successful stash is acknowledged with "ok". When stashing throws a
   * `StashOverflowException` on the 21st attempt the actor replies
   * "STASHOVERFLOW" and stops itself; an overflow at any other count is
   * reported back to the sender together with that count.
   */
  class StashingActorWithOverflow extends Actor with Stash {
    // Number of stash attempts so far, including the one currently in progress.
    var numStashed = 0

    def receive: Receive = {
      case greeting: String if greeting.startsWith("hello") =>
        numStashed += 1
        try {
          stash()
          sender() ! "ok"
        } catch {
          case _: StashOverflowException =>
            numStashed match {
              case 21 =>
                sender() ! "STASHOVERFLOW"
                context.stop(self)
              case count =>
                sender() ! s"Unexpected StashOverflowException: $count"
            }
        }
    }
  }

  // Bounded deque-based mailbox with capacity 10.
  // NOTE(review): the second argument is presumably the push timeout - confirm against BoundedDequeBasedMailbox.
  class Bounded10(settings: Settings, config: Config) extends BoundedDequeBasedMailbox(10, 500.millis)

  // Same mailbox type, but with capacity 100.
  class Bounded100(settings: Settings, config: Config) extends BoundedDequeBasedMailbox(100, 500.millis)

  // Ids under which the sections of `testConf` below are registered.
  val dispatcherId1 = "my-dispatcher-1"
  val dispatcherId2 = "my-dispatcher-2"
  val mailboxId1 = "my-mailbox-1"
  val mailboxId2 = "my-mailbox-2"

  // Each id gets a mailbox type (Bounded10 for the "1" ids, Bounded100 for the
  // "2" ids) and a stash capacity of 20.
  val testConf: Config = ConfigFactory.parseString(s"""
    $dispatcherId1 {
      mailbox-type = "${classOf[Bounded10].getName}"
      stash-capacity = 20
    }
    $dispatcherId2 {
      mailbox-type = "${classOf[Bounded100].getName}"
      stash-capacity = 20
    }
    $mailboxId1 {
      mailbox-type = "${classOf[Bounded10].getName}"
      stash-capacity = 20
    }
    $mailboxId2 {
      mailbox-type = "${classOf[Bounded100].getName}"
      stash-capacity = 20
    }
    """)
}

/**
 * Verifies how an actor with `Stash` behaves when a bound is exceeded:
 * messages that no longer fit are expected as `DeadLetter`s, and stashing
 * beyond the configured stash capacity is expected to raise a
 * `StashOverflowException` inside the actor. Each scenario is run once with
 * the actor configured via a dispatcher id and once via a mailbox id.
 */
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ActorWithBoundedStashSpec
  extends AkkaSpec(ActorWithBoundedStashSpec.testConf)
  with BeforeAndAfterEach
  with DefaultTimeout
  with ImplicitSender {
  import ActorWithBoundedStashSpec._

  /** Mutes the dead-letter warnings for "hello" messages that these tests provoke on purpose. */
  override def atStartup: Unit = {
    val deadLetterWarnings = EventFilter.warning(pattern = ".*received dead letter from.*hello.*")
    system.eventStream.publish(Mute(deadLetterWarnings))
  }

  /** Subscribes the test actor to `DeadLetter` events before each test. */
  override def beforeEach(): Unit = {
    system.eventStream.subscribe(testActor, classOf[DeadLetter])
  }

  /** Removes the `DeadLetter` subscription again after each test. */
  override def afterEach(): Unit = {
    system.eventStream.unsubscribe(testActor, classOf[DeadLetter])
  }

  /** Sends "hello1" .. "hello&lt;count&gt;" one at a time, awaiting the "ok" reply after each. */
  private def stashGreetings(stasher: ActorRef, count: Int): Unit =
    (1 to count).foreach { i =>
      stasher ! s"hello$i"
      expectMsg("ok")
    }

  /** Expects, in order, one `DeadLetter` for the "hello&lt;i&gt;" message of every index in `indices`. */
  private def expectDeadGreetings(stasher: ActorRef, indices: Range): Unit =
    indices.foreach { i =>
      expectMsg(DeadLetter(s"hello$i", testActor, stasher))
    }

  /**
   * Stashes 11 messages, triggers `unstashAll` with "world", and expects
   * "hello1" as a dead letter; after a `PoisonPill` the remaining ten
   * messages are expected as dead letters too.
   */
  def testDeadLetters(stasher: ActorRef): Unit = {
    // fill up stash
    stashGreetings(stasher, 11)

    // cause unstashAll with capacity violation
    stasher ! "world"
    expectDeadGreetings(stasher, 1 to 1)

    stasher ! PoisonPill
    // stashed messages are sent to deadletters when stasher is stopped
    expectDeadGreetings(stasher, 2 to 11)
  }

  /**
   * Stashes 20 messages, then sends a 21st and expects the actor to answer
   * "STASHOVERFLOW"; afterwards all 20 stashed messages are expected as
   * dead letters.
   */
  def testStashOverflowException(stasher: ActorRef): Unit = {
    // fill up stash
    stashGreetings(stasher, 20)

    stasher ! "hello21"
    expectMsg("STASHOVERFLOW")

    // stashed messages are sent to deadletters when stasher is stopped
    expectDeadGreetings(stasher, 1 to 20)
  }

  "An Actor with Stash" must {

    "end up in DeadLetters in case of a capacity violation when configured via dispatcher" in {
      val props = Props[StashingActor].withDispatcher(dispatcherId1)
      testDeadLetters(system.actorOf(props))
    }

    "end up in DeadLetters in case of a capacity violation when configured via mailbox" in {
      val props = Props[StashingActor].withMailbox(mailboxId1)
      testDeadLetters(system.actorOf(props))
    }

    "throw a StashOverflowException in case of a stash capacity violation when configured via dispatcher" in {
      val props = Props[StashingActorWithOverflow].withDispatcher(dispatcherId2)
      testStashOverflowException(system.actorOf(props))
    }

    "throw a StashOverflowException in case of a stash capacity violation when configured via mailbox" in {
      val props = Props[StashingActorWithOverflow].withMailbox(mailboxId2)
      testStashOverflowException(system.actorOf(props))
    }
  }
}

Other Akka source code examples

Here is a short list of links related to this Akka ActorWithBoundedStashSpec.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.