|
Akka/Scala example source code file (SnapshotStoreSpec.scala)
The SnapshotStoreSpec.scala Akka example source code
package akka.persistence.snapshot
import scala.collection.immutable.Seq
import akka.actor._
import akka.persistence._
import akka.persistence.SnapshotProtocol._
import akka.testkit.TestProbe
import com.typesafe.config.ConfigFactory
object SnapshotStoreSpec {
val config = ConfigFactory.parseString("akka.persistence.publish-plugin-commands = on")
}
/**
* This spec aims to verify custom akka-persistence [[SnapshotStore]] implementations.
* Plugin authors are highly encouraged to include it in their plugin's test suites.
*
* In case your journal plugin needs some kind of setup or teardown, override the `beforeAll` or `afterAll`
* methods (don't forget to call `super` in your overriden methods).
*
* For a Java and JUnit consumable version of the TCK please refer to [[akka.persistence.japi.snapshot.JavaSnapshotStoreSpec]].
*
* @see [[akka.persistence.japi.snapshot.JavaSnapshotStoreSpec]]
*/
trait SnapshotStoreSpec extends PluginSpec {
implicit lazy val system = ActorSystem("SnapshotStoreSpec", config.withFallback(SnapshotStoreSpec.config))
private var senderProbe: TestProbe = _
private var metadata: Seq[SnapshotMetadata] = Nil
override protected def beforeEach(): Unit = {
super.beforeEach()
senderProbe = TestProbe()
metadata = writeSnapshots()
}
def snapshotStore: ActorRef =
extension.snapshotStoreFor(null)
def writeSnapshots(): Seq[SnapshotMetadata] = {
1 to 5 map { i ⇒
val metadata = SnapshotMetadata(pid, i + 10)
snapshotStore.tell(SaveSnapshot(metadata, s"s-${i}"), senderProbe.ref)
senderProbe.expectMsgPF() { case SaveSnapshotSuccess(md) ⇒ md }
}
}
"A snapshot store" must {
"not load a snapshot given an invalid processor id" in {
snapshotStore.tell(LoadSnapshot("invalid", SnapshotSelectionCriteria.Latest, Long.MaxValue), senderProbe.ref)
senderProbe.expectMsg(LoadSnapshotResult(None, Long.MaxValue))
}
"not load a snapshot given non-matching timestamp criteria" in {
snapshotStore.tell(LoadSnapshot(pid, SnapshotSelectionCriteria.Latest.copy(maxTimestamp = 100), Long.MaxValue), senderProbe.ref)
senderProbe.expectMsg(LoadSnapshotResult(None, Long.MaxValue))
}
"not load a snapshot given non-matching sequence number criteria" in {
snapshotStore.tell(LoadSnapshot(pid, SnapshotSelectionCriteria(7), Long.MaxValue), senderProbe.ref)
senderProbe.expectMsg(LoadSnapshotResult(None, Long.MaxValue))
snapshotStore.tell(LoadSnapshot(pid, SnapshotSelectionCriteria.Latest, 7), senderProbe.ref)
senderProbe.expectMsg(LoadSnapshotResult(None, 7))
}
"load the most recent snapshot" in {
snapshotStore.tell(LoadSnapshot(pid, SnapshotSelectionCriteria.Latest, Long.MaxValue), senderProbe.ref)
senderProbe.expectMsg(LoadSnapshotResult(Some(SelectedSnapshot(metadata(4), s"s-5")), Long.MaxValue))
}
"load the most recent snapshot matching an upper sequence number bound" in {
snapshotStore.tell(LoadSnapshot(pid, SnapshotSelectionCriteria(13), Long.MaxValue), senderProbe.ref)
senderProbe.expectMsg(LoadSnapshotResult(Some(SelectedSnapshot(metadata(2), s"s-3")), Long.MaxValue))
snapshotStore.tell(LoadSnapshot(pid, SnapshotSelectionCriteria.Latest, 13), senderProbe.ref)
senderProbe.expectMsg(LoadSnapshotResult(Some(SelectedSnapshot(metadata(2), s"s-3")), 13))
}
"load the most recent snapshot matching upper sequence number and timestamp bounds" in {
snapshotStore.tell(LoadSnapshot(pid, SnapshotSelectionCriteria(13, metadata(2).timestamp), Long.MaxValue), senderProbe.ref)
senderProbe.expectMsg(LoadSnapshotResult(Some(SelectedSnapshot(metadata(2), s"s-3")), Long.MaxValue))
snapshotStore.tell(LoadSnapshot(pid, SnapshotSelectionCriteria.Latest.copy(maxTimestamp = metadata(2).timestamp), 13), senderProbe.ref)
senderProbe.expectMsg(LoadSnapshotResult(Some(SelectedSnapshot(metadata(2), s"s-3")), 13))
}
"delete a single snapshot identified by snapshot metadata" in {
val md = metadata(2)
val cmd = DeleteSnapshot(md)
val sub = TestProbe()
subscribe[DeleteSnapshot](sub.ref)
snapshotStore ! cmd
sub.expectMsg(cmd)
snapshotStore.tell(LoadSnapshot(pid, SnapshotSelectionCriteria(md.sequenceNr, md.timestamp), Long.MaxValue), senderProbe.ref)
senderProbe.expectMsg(LoadSnapshotResult(Some(SelectedSnapshot(metadata(1), s"s-2")), Long.MaxValue))
}
"delete all snapshots matching upper sequence number and timestamp bounds" in {
val md = metadata(2)
val cmd = DeleteSnapshots(pid, SnapshotSelectionCriteria(md.sequenceNr, md.timestamp))
val sub = TestProbe()
subscribe[DeleteSnapshots](sub.ref)
snapshotStore ! cmd
sub.expectMsg(cmd)
snapshotStore.tell(LoadSnapshot(pid, SnapshotSelectionCriteria(md.sequenceNr, md.timestamp), Long.MaxValue), senderProbe.ref)
senderProbe.expectMsg(LoadSnapshotResult(None, Long.MaxValue))
snapshotStore.tell(LoadSnapshot(pid, SnapshotSelectionCriteria(metadata(3).sequenceNr, metadata(3).timestamp), Long.MaxValue), senderProbe.ref)
senderProbe.expectMsg(LoadSnapshotResult(Some(SelectedSnapshot(metadata(3), s"s-4")), Long.MaxValue))
}
}
}
Other Akka source code examplesHere is a short list of links related to this Akka SnapshotStoreSpec.scala source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.