|
Akka/Scala example source code file (ProcessorSpec.scala)
The ProcessorSpec.scala Akka example source code
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence
import scala.concurrent.duration._
import scala.collection.immutable.Seq
import com.typesafe.config._
import akka.actor._
import akka.testkit._
object ProcessorSpec {
/**
 * Test processor that records every `Persistent` message it receives as a
 * "payload-sequenceNr" string and reports the recorded entries, oldest first,
 * when it receives `GetState`.
 *
 * The plain string "boom" and a `Persistent` message with payload "boom" both
 * make it throw a `TestException`. If the message that caused a restart is a
 * `Persistent`, it is deleted from the journal before the restart proceeds.
 */
class RecoverTestProcessor(name: String) extends NamedProcessor(name) {
  // Newest entry is kept at the head; the list is reversed when reported.
  var state = List.empty[String]

  def receive = {
    case "boom" | Persistent("boom", _) ⇒ throw new TestException("boom")
    case Persistent(body, seqNr)        ⇒ state ::= s"$body-$seqNr"
    case GetState                       ⇒ sender() ! state.reverse
  }

  override def preRestart(reason: Throwable, message: Option[Any]) = {
    message foreach {
      case failed: Persistent ⇒ deleteMessage(failed.sequenceNr) // delete message from journal
      case _                  ⇒ // ignore
    }
    super.preRestart(reason, message)
  }
}
// RecoverTestProcessor with TurnOffRecoverOnStart mixed in. The tests in this file
// start its recovery explicitly by sending Recover().
// NOTE(review): TurnOffRecoverOnStart is defined elsewhere; presumably it suppresses
// the automatic recovery on start — confirm against its definition.
class RecoverOffTestProcessor(name: String) extends RecoverTestProcessor(name) with TurnOffRecoverOnStart
/**
 * Test processor that echoes the payload of every `Persistent` message back
 * to the sender of that message.
 */
class StoredSenderTestProcessor(name: String) extends NamedProcessor(name) {
  def receive = {
    case Persistent(body, _) ⇒
      val replyTo = sender()
      replyTo ! body
  }
}
/**
 * Test processor whose replies depend on `recoveryRunning`:
 *  - while recovery is running, every `Persistent` payload is echoed to the sender;
 *  - once recovery is no longer running, only a `Persistent` with payload "c" is
 *    answered (with "c"); any other message is not handled.
 *
 * The two guards are mutually exclusive, so the order of the cases is irrelevant.
 */
class RecoveryStatusTestProcessor(name: String) extends NamedProcessor(name) {
  def receive = {
    case Persistent(replayed, _) if recoveryRunning ⇒ sender() ! replayed
    case Persistent("c", _) if !recoveryRunning     ⇒ sender() ! "c"
  }
}
/**
 * Test processor that alternates between two behaviors: `acceptA` only handles
 * a `Persistent` with payload "a", replies "a" and switches to `acceptB`;
 * `acceptB` does the same for "b" and switches back. It starts in `acceptA`.
 */
class BehaviorChangeTestProcessor(name: String) extends NamedProcessor(name) {
  val acceptA: Actor.Receive = {
    case Persistent("a", _) ⇒ replyThenBecome("a", acceptB)
  }

  val acceptB: Actor.Receive = {
    case Persistent("b", _) ⇒ replyThenBecome("b", acceptA)
  }

  def receive = acceptA

  // Answers the current sender first, then installs the next behavior.
  private def replyThenBecome(reply: String, next: Actor.Receive): Unit = {
    sender() ! reply
    context.become(next)
  }
}
/**
 * Test processor implemented as an FSM with states "closed" and "open" and an
 * `Int` counter as state data, starting in "closed" with counter 0.
 *
 * A `Persistent` "a" in "closed" moves to "open"; a `Persistent` "b" in "open"
 * moves back to "closed". Each transition replies with the counter value held
 * before the transition and stores that value plus one.
 */
class FsmTestProcessor(name: String) extends NamedProcessor(name) with FSM[String, Int] {
  startWith("closed", 0)

  when("closed") {
    case Event(Persistent("a", _), seen) ⇒
      goto("open").using(seen + 1).replying(seen)
  }

  when("open") {
    case Event(Persistent("b", _), seen) ⇒
      goto("closed").using(seen + 1).replying(seen)
  }
}
/**
 * Test processor that answers every `Persistent` message with a new
 * `Persistent` whose payload is the sequence number of the received message.
 * The received payload itself is not used.
 */
class OutboundMessageTestProcessor(name: String) extends NamedProcessor(name) {
  def receive = {
    case Persistent(_, seqNr) ⇒ sender() ! Persistent(seqNr)
  }
}
// TestException (message "test") thrown by ResumeTestProcessor on "boom";
// ResumeTestSupervisor maps this exception type to SupervisorStrategy.Resume.
class ResumeTestException extends TestException("test")
/**
 * Supervisor that creates one `ResumeTestProcessor` child with the given name,
 * forwards every message it receives to that child, and resumes the child when
 * it fails with a `ResumeTestException`.
 */
class ResumeTestSupervisor(name: String) extends Actor {
  val processor = context.actorOf(Props(classOf[ResumeTestProcessor], name))

  // Only ResumeTestException is decided here; the decider is not defined for other failures.
  override val supervisorStrategy =
    OneForOneStrategy()({
      case _: ResumeTestException ⇒ SupervisorStrategy.Resume
    })

  def receive = {
    case msg ⇒ processor.forward(msg)
  }
}
/**
 * Test processor that records each `Persistent` message as a
 * "payload-sequenceNr" string, reports the recorded entries (oldest first) on
 * `GetState`, and throws a `ResumeTestException` on the plain string "boom".
 */
class ResumeTestProcessor(name: String) extends NamedProcessor(name) {
  // Newest entry is kept at the head; the list is reversed when reported.
  var state: List[String] = Nil

  def receive = {
    case "boom"                  ⇒ throw new ResumeTestException
    case Persistent(body, seqNr) ⇒ state ::= s"$body-$seqNr"
    case GetState                ⇒ sender() ! state.reverse
  }
}
/**
 * `RecoverTestProcessor` variant whose `preRestart` deletes the failing
 * `Persistent` message when the failure happened while recovery was running,
 * and then delegates to `RecoverTestProcessor.preRestart`.
 */
class LastReplayedMsgFailsTestProcessor(name: String) extends RecoverTestProcessor(name) {
  override def preRestart(reason: Throwable, message: Option[Any]) = {
    message match {
      case Some(failed: Persistent) if recoveryRunning ⇒ deleteMessage(failed.sequenceNr)
      case _                                           ⇒ // nothing extra to do before delegating
    }
    super.preRestart(reason, message)
  }
}
/**
 * `RecoverTestProcessor` variant that throws a `TestException` when a
 * `Persistent` with payload "a" arrives while recovery is running. Every other
 * message falls through to the inherited behavior.
 */
class AnyReplayedMsgFailsTestProcessor(name: String) extends RecoverTestProcessor(name) {
  val failOnReplayedA: Actor.Receive = {
    case Persistent("a", _) if recoveryRunning ⇒
      throw new TestException("boom")
  }

  override def receive = failOnReplayedA.orElse(super.receive)
}
// Command for DeleteMessageTestProcessor: passes `snr` to deleteMessage.
final case class Delete1(snr: Long)
// Command for DeleteMessageTestProcessor: passes `toSnr` to deleteMessages.
final case class DeleteN(toSnr: Long)
/**
 * `RecoverTestProcessor` variant that additionally understands the deletion
 * commands `Delete1` and `DeleteN`; all other messages are handled by the
 * inherited behavior.
 */
class DeleteMessageTestProcessor(name: String) extends RecoverTestProcessor(name) {
  // Deletion commands take precedence over the inherited cases.
  def deleteReceive: Actor.Receive = {
    case Delete1(seqNr)      ⇒ deleteMessage(seqNr)
    case DeleteN(upperSeqNr) ⇒ deleteMessages(upperSeqNr)
  }

  override def receive = deleteReceive.orElse(super.receive)
}
}
/**
 * Shared specification of processor behavior, parameterized by the actor
 * system configuration passed to `AkkaSpec`.
 *
 * Before every test a `RecoverTestProcessor` is sent `Persistent("a")` and
 * `Persistent("b")` and is verified to hold "a-1" and "b-2". Most tests below
 * therefore expect those two entries to be replayed.
 *
 * NOTE(review): `namedProcessor[T]` comes from `PersistenceSpec` (not shown here);
 * presumably it creates a processor of type T under a per-test name — confirm.
 */
abstract class ProcessorSpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender {
  import ProcessorSpec._
  import JournalProtocol._

  // Declared with an explicit `: Unit =` result type; the former procedure
  // syntax (`def f() { ... }`) is deprecated in Scala 2.13 and removed in Scala 3.
  override protected def beforeEach(): Unit = {
    super.beforeEach()
    val processor = namedProcessor[RecoverTestProcessor]
    processor ! Persistent("a")
    processor ! Persistent("b")
    processor ! GetState
    expectMsg(List("a-1", "b-2"))
  }

  "A processor" must {
    // Recovery is requested with an explicit Recover() message.
    "recover state on explicit request" in {
      val processor = namedProcessor[RecoverOffTestProcessor]
      processor ! Recover()
      processor ! GetState
      expectMsg(List("a-1", "b-2"))
    }
    // No Recover() is sent; the state is expected to be there anyway.
    "recover state automatically" in {
      val processor = namedProcessor[RecoverTestProcessor]
      processor ! GetState
      expectMsg(List("a-1", "b-2"))
    }
    // "boom" makes the processor throw a TestException; the state must still be intact afterwards.
    "recover state automatically on restart" in {
      val processor = namedProcessor[RecoverTestProcessor]
      processor ! "boom"
      processor ! GetState
      expectMsg(List("a-1", "b-2"))
    }
    // "c" is sent before Recover(), yet it must end up after the replayed a and b.
    "buffer new messages until recovery completed" in {
      val processor = namedProcessor[RecoverOffTestProcessor]
      processor ! Persistent("c")
      processor ! Recover()
      processor ! Persistent("d")
      processor ! GetState
      expectMsg(List("a-1", "b-2", "c-3", "d-4"))
    }
    // The second Recover() must not cause a and b to be replayed again.
    "ignore redundant recovery requests" in {
      val processor = namedProcessor[RecoverOffTestProcessor]
      processor ! Persistent("c")
      processor ! Recover()
      processor ! Persistent("d")
      processor ! Recover()
      processor ! Persistent("e")
      processor ! GetState
      expectMsg(List("a-1", "b-2", "c-3", "d-4", "e-5"))
    }
    // Messages sent right after the failing "boom" must be appended after the replayed ones.
    "buffer new messages until restart-recovery completed" in {
      val processor = namedProcessor[RecoverTestProcessor]
      processor ! "boom"
      processor ! Persistent("c")
      processor ! Persistent("d")
      processor ! GetState
      expectMsg(List("a-1", "b-2", "c-3", "d-4"))
    }
    "allow deletion of journaled messages on failure" in {
      val processor = namedProcessor[RecoverTestProcessor]
      processor ! Persistent("boom") // journaled message causes failure and will be deleted
      processor ! GetState
      expectMsg(List("a-1", "b-2"))
    }
    "allow deletion of journaled messages on failure and buffer new messages until restart-recovery completed" in {
      val processor = namedProcessor[RecoverTestProcessor]
      processor ! Persistent("boom") // journaled message causes failure and will be deleted
      processor ! Persistent("c")
      processor ! Persistent("d")
      processor ! GetState
      expectMsg(List("a-1", "b-2", "c-4", "d-5")) // deleted message leaves gap in sequence
    }
    // Nothing is sent here: the replies to the replayed a and b must reach this test
    // (StoredSenderTestProcessor answers sender() with the payload).
    "store sender references and restore them for replayed messages" in {
      namedProcessor[StoredSenderTestProcessor]
      List("a", "b") foreach (expectMsg(_))
    }
    // a and b are answered under the recoveryRunning guard, c under the !recoveryRunning guard.
    "properly indicate its recovery status" in {
      val processor = namedProcessor[RecoveryStatusTestProcessor]
      processor ! Persistent("c")
      List("a", "b", "c") foreach (expectMsg(_))
    }
    // Two replies for the replayed a/b followed by two for the newly sent a/b.
    "continue journaling when changing behavior" in {
      val processor = namedProcessor[BehaviorChangeTestProcessor]
      processor ! Persistent("a")
      processor ! Persistent("b")
      List("a", "b", "a", "b") foreach (expectMsg(_))
    }
    // OutboundMessageTestProcessor replies with Persistent(snr); each of the three
    // replies must itself carry a sequence number equal to its payload.
    "derive outbound messages from the current message" in {
      val processor = namedProcessor[OutboundMessageTestProcessor]
      processor ! Persistent("c")
      1 to 3 foreach { _ ⇒ expectMsgPF() { case Persistent(payload, snr) ⇒ payload should be(snr) } }
    }
    // Recovery limited to sequence number 1 yields only "a-1".
    "support recovery with upper sequence number bound" in {
      val processor = namedProcessor[RecoverOffTestProcessor]
      processor ! Recover(toSequenceNr = 1L)
      processor ! GetState
      expectMsg(List("a-1"))
    }
    // Although processor1 only replayed up to 1, the new message is journaled as 3,
    // and b-2 is still present for processor2.
    "never replace journaled messages" in {
      val processor1 = namedProcessor[RecoverOffTestProcessor]
      processor1 ! Recover(toSequenceNr = 1L)
      processor1 ! Persistent("c")
      processor1 ! GetState
      expectMsg(List("a-1", "c-3"))
      val processor2 = namedProcessor[RecoverOffTestProcessor]
      processor2 ! Recover()
      processor2 ! GetState
      expectMsg(List("a-1", "b-2", "c-3"))
    }
    // Each supervisor creates a ResumeTestProcessor named "processor". The "boom" sent
    // via supervisor2 raises a ResumeTestException, which the supervisor resumes.
    "be able to skip restart recovery when being resumed" in {
      val supervisor1 = system.actorOf(Props(classOf[ResumeTestSupervisor], "processor"))
      supervisor1 ! Persistent("a")
      supervisor1 ! Persistent("b")
      supervisor1 ! GetState
      expectMsg(List("a-1", "b-2"))
      val supervisor2 = system.actorOf(Props(classOf[ResumeTestSupervisor], "processor"))
      supervisor2 ! Persistent("c")
      supervisor2 ! "boom"
      supervisor2 ! Persistent("d")
      supervisor2 ! GetState
      expectMsg(List("a-1", "b-2", "c-3", "d-4"))
      val supervisor3 = system.actorOf(Props(classOf[ResumeTestSupervisor], "processor"))
      supervisor3 ! GetState
      expectMsg(List("a-1", "b-2", "c-3", "d-4"))
    }
    // The "boom" message takes sequence number 4 and is absent from the final state;
    // "d" is journaled as 5.
    "be able to re-run restart recovery when it fails with last replayed message" in {
      val processor = namedProcessor[LastReplayedMsgFailsTestProcessor]
      processor ! Persistent("c")
      processor ! Persistent("boom")
      processor ! Persistent("d")
      processor ! GetState
      expectMsg(List("a-1", "b-2", "c-3", "d-5"))
    }
    // AnyReplayedMsgFailsTestProcessor throws on a replayed "a"; the expected state no longer contains a-1.
    "be able to re-run initial recovery when it fails with a message that is not the last replayed message" in {
      val processor = namedProcessor[AnyReplayedMsgFailsTestProcessor]
      processor ! Persistent("c")
      processor ! GetState
      expectMsg(List("b-2", "c-3"))
    }
    // "a" is journaled as 2; after the restart triggered by "boom" it is missing from the expected state.
    "be able to re-run restart recovery when it fails with a message that is not the last replayed message" in {
      val processor = system.actorOf(Props(classOf[AnyReplayedMsgFailsTestProcessor], "other")) // new processor, no initial replay
      processor ! Persistent("b")
      processor ! Persistent("a")
      processor ! Persistent("c")
      processor ! Persistent("d")
      processor ! Persistent("e")
      processor ! Persistent("f")
      processor ! Persistent("g")
      processor ! Persistent("h")
      processor ! Persistent("i")
      processor ! "boom"
      processor ! Persistent("j")
      processor ! GetState
      expectMsg(List("b-1", "c-3", "d-4", "e-5", "f-6", "g-7", "h-8", "i-9", "j-10"))
    }
    // The batch members get consecutive sequence numbers 3 to 5; the following single message gets 6.
    "support batch writes" in {
      val processor = namedProcessor[RecoverTestProcessor]
      processor ! PersistentBatch(Seq(Persistent("c"), Persistent("d"), Persistent("e")))
      processor ! Persistent("f")
      processor ! GetState
      expectMsg(List("a-1", "b-2", "c-3", "d-4", "e-5", "f-6"))
    }
    // Waits for a DeleteMessages event on the event stream before starting the second
    // processor, which must then recover without d-4.
    "support single message deletions" in {
      val deleteProbe = TestProbe()
      system.eventStream.subscribe(deleteProbe.ref, classOf[DeleteMessages])
      val processor1 = namedProcessor[DeleteMessageTestProcessor]
      processor1 ! Persistent("c")
      processor1 ! Persistent("d")
      processor1 ! Persistent("e")
      processor1 ! Delete1(4)
      deleteProbe.expectMsgType[DeleteMessagesTo.type forSome { type DeleteMessagesTo }]
      val processor2 = namedProcessor[DeleteMessageTestProcessor]
      processor2 ! GetState
      expectMsg(List("a-1", "b-2", "c-3", "e-5"))
    }
    // DeleteN(4) leaves only e-5; after f and g are added, DeleteN(6) leaves only g-7.
    "support bulk message deletions" in {
      val deleteProbe = TestProbe()
      system.eventStream.subscribe(deleteProbe.ref, classOf[DeleteMessagesTo])
      val processor1 = namedProcessor[DeleteMessageTestProcessor]
      processor1 ! Persistent("c")
      processor1 ! Persistent("d")
      processor1 ! Persistent("e")
      processor1 ! DeleteN(4)
      deleteProbe.expectMsgType[DeleteMessagesTo]
      val processor2 = namedProcessor[DeleteMessageTestProcessor]
      processor2 ! GetState
      expectMsg(List("e-5"))
      processor2 ! Persistent("f")
      processor2 ! Persistent("g")
      processor2 ! DeleteN(6)
      deleteProbe.expectMsgType[DeleteMessagesTo]
      val processor3 = namedProcessor[DeleteMessageTestProcessor]
      processor3 ! GetState
      expectMsg(List("g-7"))
    }
  }

  "A processor" can {
    // FsmTestProcessor replies with its counter before each increment: 0 and 1 for the
    // replayed a/b, 2 and 3 for the a/b sent here.
    "be a finite state machine" in {
      val processor = namedProcessor[FsmTestProcessor]
      processor ! Persistent("a")
      processor ! Persistent("b")
      List(0, 1, 2, 3) foreach (expectMsg(_))
    }
  }
}
// Runs ProcessorSpec with the configuration returned by PersistenceSpec.config("leveldb", "LeveldbProcessorSpec").
class LeveldbProcessorSpec extends ProcessorSpec(PersistenceSpec.config("leveldb", "LeveldbProcessorSpec"))
// Runs ProcessorSpec with the configuration returned by PersistenceSpec.config("inmem", "InmemProcessorSpec").
class InmemProcessorSpec extends ProcessorSpec(PersistenceSpec.config("inmem", "InmemProcessorSpec"))
Other Akka source code examples
Here is a short list of links related to this Akka ProcessorSpec.scala source code file:
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from pages under the /java/jwarehouse URI on this website is paid back to open source projects.