|
Akka/Scala example source code file (SnapshotSpec.scala)
The SnapshotSpec.scala Akka example source code/** * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> */ package akka.persistence import akka.actor._ import akka.testkit._ object SnapshotSpec { case object TakeSnapshot class SaveSnapshotTestProcessor(name: String, probe: ActorRef) extends NamedProcessor(name) { var state = List.empty[String] def receive = { case Persistent(payload, snr) ⇒ state = s"${payload}-${snr}" :: state case TakeSnapshot ⇒ saveSnapshot(state) case SaveSnapshotSuccess(md) ⇒ probe ! md.sequenceNr case GetState ⇒ probe ! state.reverse } } class LoadSnapshotTestProcessor(name: String, probe: ActorRef) extends NamedProcessor(name) { def receive = { case Persistent(payload, snr) ⇒ probe ! s"${payload}-${snr}" case SnapshotOffer(md, s) ⇒ probe ! ((md, s)) case other ⇒ probe ! other } override def preStart() = () } final case class Delete1(metadata: SnapshotMetadata) final case class DeleteN(criteria: SnapshotSelectionCriteria) class DeleteSnapshotTestProcessor(name: String, probe: ActorRef) extends LoadSnapshotTestProcessor(name, probe) { override def receive = receiveDelete orElse super.receive def receiveDelete: Receive = { case Delete1(metadata) ⇒ deleteSnapshot(metadata.sequenceNr, metadata.timestamp) case DeleteN(criteria) ⇒ deleteSnapshots(criteria) } } } class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotSpec")) with PersistenceSpec with ImplicitSender { import SnapshotSpec._ import SnapshotProtocol._ override protected def beforeEach() { super.beforeEach() val processor = system.actorOf(Props(classOf[SaveSnapshotTestProcessor], name, testActor)) processor ! Persistent("a") processor ! TakeSnapshot processor ! Persistent("b") processor ! TakeSnapshot processor ! Persistent("c") processor ! Persistent("d") processor ! TakeSnapshot processor ! Persistent("e") processor ! Persistent("f") expectMsgAllOf(1L, 2L, 4L) } "A processor" must { "recover state starting from the most recent snapshot" in { val processor = system.actorOf(Props(classOf[LoadSnapshotTestProcessor], name, testActor)) val persistenceId = name processor ! Recover() expectMsgPF() { case (SnapshotMetadata(`persistenceId`, 4, timestamp), state) ⇒ state should be(List("a-1", "b-2", "c-3", "d-4").reverse) timestamp should be > (0L) } expectMsg("e-5") expectMsg("f-6") expectMsg(RecoveryCompleted) } "recover state starting from the most recent snapshot matching an upper sequence number bound" in { val processor = system.actorOf(Props(classOf[LoadSnapshotTestProcessor], name, testActor)) val persistenceId = name processor ! Recover(toSequenceNr = 3) expectMsgPF() { case (SnapshotMetadata(`persistenceId`, 2, timestamp), state) ⇒ state should be(List("a-1", "b-2").reverse) timestamp should be > (0L) } expectMsg("c-3") expectMsg(RecoveryCompleted) } "recover state starting from the most recent snapshot matching an upper sequence number bound (without further replay)" in { val processor = system.actorOf(Props(classOf[LoadSnapshotTestProcessor], name, testActor)) val persistenceId = name processor ! Recover(toSequenceNr = 4) processor ! "done" expectMsgPF() { case (SnapshotMetadata(`persistenceId`, 4, timestamp), state) ⇒ state should be(List("a-1", "b-2", "c-3", "d-4").reverse) timestamp should be > (0L) } expectMsg(RecoveryCompleted) expectMsg("done") } "recover state starting from the most recent snapshot matching criteria" in { val processor = system.actorOf(Props(classOf[LoadSnapshotTestProcessor], name, testActor)) val persistenceId = name processor ! Recover(fromSnapshot = SnapshotSelectionCriteria(maxSequenceNr = 2)) expectMsgPF() { case (SnapshotMetadata(`persistenceId`, 2, timestamp), state) ⇒ state should be(List("a-1", "b-2").reverse) timestamp should be > (0L) } expectMsg("c-3") expectMsg("d-4") expectMsg("e-5") expectMsg("f-6") expectMsg(RecoveryCompleted) } "recover state starting from the most recent snapshot matching criteria and an upper sequence number bound" in { val processor = system.actorOf(Props(classOf[LoadSnapshotTestProcessor], name, testActor)) val persistenceId = name processor ! Recover(fromSnapshot = SnapshotSelectionCriteria(maxSequenceNr = 2), toSequenceNr = 3) expectMsgPF() { case (SnapshotMetadata(`persistenceId`, 2, timestamp), state) ⇒ state should be(List("a-1", "b-2").reverse) timestamp should be > (0L) } expectMsg("c-3") expectMsg(RecoveryCompleted) } "recover state from scratch if snapshot based recovery is disabled" in { val processor = system.actorOf(Props(classOf[LoadSnapshotTestProcessor], name, testActor)) processor ! Recover(fromSnapshot = SnapshotSelectionCriteria.None, toSequenceNr = 3) expectMsg("a-1") expectMsg("b-2") expectMsg("c-3") expectMsg(RecoveryCompleted) } "support single message deletions" in { val deleteProbe = TestProbe() val processor1 = system.actorOf(Props(classOf[DeleteSnapshotTestProcessor], name, testActor)) val persistenceId = name system.eventStream.subscribe(deleteProbe.ref, classOf[DeleteSnapshot]) // recover processor from 3rd snapshot and then delete snapshot processor1 ! Recover(toSequenceNr = 4) processor1 ! "done" val metadata = expectMsgPF() { case (md @ SnapshotMetadata(`persistenceId`, 4, _), state) ⇒ state should be(List("a-1", "b-2", "c-3", "d-4").reverse) md } expectMsg(RecoveryCompleted) expectMsg("done") processor1 ! Delete1(metadata) deleteProbe.expectMsgType[DeleteSnapshot] // recover processor from 2nd snapshot (3rd was deleted) plus replayed messages val processor2 = system.actorOf(Props(classOf[DeleteSnapshotTestProcessor], name, testActor)) processor2 ! Recover(toSequenceNr = 4) expectMsgPF() { case (md @ SnapshotMetadata(`persistenceId`, 2, _), state) ⇒ state should be(List("a-1", "b-2").reverse) md } expectMsg("c-3") expectMsg("d-4") expectMsg(RecoveryCompleted) } "support bulk message deletions" in { val deleteProbe = TestProbe() val processor1 = system.actorOf(Props(classOf[DeleteSnapshotTestProcessor], name, testActor)) val persistenceId = name system.eventStream.subscribe(deleteProbe.ref, classOf[DeleteSnapshots]) // recover processor and the delete first three (= all) snapshots processor1 ! Recover(toSequenceNr = 4) processor1 ! DeleteN(SnapshotSelectionCriteria(maxSequenceNr = 4)) expectMsgPF() { case (md @ SnapshotMetadata(`persistenceId`, 4, _), state) ⇒ state should be(List("a-1", "b-2", "c-3", "d-4").reverse) } expectMsg(RecoveryCompleted) deleteProbe.expectMsgType[DeleteSnapshots] // recover processor from replayed messages (all snapshots deleted) val processor2 = system.actorOf(Props(classOf[DeleteSnapshotTestProcessor], name, testActor)) processor2 ! Recover(toSequenceNr = 4) expectMsg("a-1") expectMsg("b-2") expectMsg("c-3") expectMsg("d-4") expectMsg(RecoveryCompleted) } } } Other Akka source code examplesHere is a short list of links related to this Akka SnapshotSpec.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.