|
Akka/Scala example source code file (ProcessorStashSpec.scala)
The ProcessorStashSpec.scala Akka example source code/** * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> */ package akka.persistence import com.typesafe.config._ import akka.actor._ import akka.testkit._ object ProcessorStashSpec { class StashingProcessor(name: String) extends NamedProcessor(name) { var state: List[String] = Nil val behaviorA: Actor.Receive = { case Persistent("a", snr) ⇒ update("a", snr) context.become(behaviorB) case Persistent("b", snr) ⇒ update("b", snr) case Persistent("c", snr) ⇒ update("c", snr) unstashAll() case "x" ⇒ update("x") case "boom" ⇒ throw new TestException("boom") case Persistent("boom", _) ⇒ throw new TestException("boom") case GetState ⇒ sender() ! state.reverse } val behaviorB: Actor.Receive = { case Persistent("b", _) ⇒ stash() context.become(behaviorA) case "x" ⇒ stash() } def receive = behaviorA def update(payload: String, snr: Long = 0L) { state = s"${payload}-${snr}" :: state } } class RecoveryFailureStashingProcessor(name: String) extends StashingProcessor(name) { override def preRestart(reason: Throwable, message: Option[Any]) = { message match { case Some(m: Persistent) ⇒ if (recoveryRunning) deleteMessage(m.sequenceNr) case _ ⇒ } super.preRestart(reason, message) } } } abstract class ProcessorStashSpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender { import ProcessorStashSpec._ "A processor" must { "support user stash and unstash operations for persistent messages" in { val p1 = namedProcessor[StashingProcessor] p1 ! Persistent("a") p1 ! Persistent("b") p1 ! Persistent("c") p1 ! GetState expectMsg(List("a-1", "c-3", "b-2")) val p2 = namedProcessor[StashingProcessor] p2 ! Persistent("a") p2 ! Persistent("b") p2 ! Persistent("c") p2 ! GetState expectMsg(List("a-1", "c-3", "b-2", "a-4", "c-6", "b-5")) } "support user stash and unstash operations for persistent and transient messages" in { val p1 = namedProcessor[StashingProcessor] p1 ! Persistent("a") p1 ! "x" p1 ! Persistent("b") p1 ! Persistent("c") p1 ! GetState expectMsg(List("a-1", "c-3", "x-0", "b-2")) val p2 = namedProcessor[StashingProcessor] p2 ! Persistent("a") p2 ! "x" p2 ! Persistent("b") p2 ! Persistent("c") p2 ! GetState expectMsg(List("a-1", "c-3", "b-2", "a-4", "c-6", "x-0", "b-5")) } "support restarts between user stash and unstash operations" in { val p1 = namedProcessor[StashingProcessor] p1 ! Persistent("a") p1 ! Persistent("b") p1 ! "boom" p1 ! Persistent("c") p1 ! GetState expectMsg(List("a-1", "c-3", "b-2")) val p2 = namedProcessor[StashingProcessor] p2 ! Persistent("a") p2 ! Persistent("b") p2 ! "boom" p2 ! Persistent("c") p2 ! GetState expectMsg(List("a-1", "c-3", "b-2", "a-4", "c-6", "b-5")) } "support multiple restarts between user stash and unstash operations" in { val p1 = namedProcessor[RecoveryFailureStashingProcessor] p1 ! Persistent("a") p1 ! Persistent("b") p1 ! Persistent("boom") p1 ! Persistent("c") p1 ! GetState expectMsg(List("a-1", "c-4", "b-2")) val p2 = namedProcessor[RecoveryFailureStashingProcessor] p2 ! Persistent("a") p2 ! Persistent("b") p2 ! Persistent("boom") p2 ! Persistent("c") p2 ! GetState expectMsg(List("a-1", "c-4", "b-2", "a-5", "c-8", "b-6")) } } } class LeveldbProcessorStashSpec extends ProcessorStashSpec(PersistenceSpec.config("leveldb", "LeveldbProcessorStashSpec")) class InmemProcessorStashSpec extends ProcessorStashSpec(PersistenceSpec.config("inmem", "InmemProcessorStashSpec")) Other Akka source code examplesHere is a short list of links related to this Akka ProcessorStashSpec.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.