|
Akka/Scala example source code file (JournalSpec.scala)
// The JournalSpec.scala Akka example source code

package akka.persistence.journal

import scala.collection.immutable.Seq

import akka.actor._
import akka.persistence._
import akka.persistence.JournalProtocol._
import akka.testkit._

import com.typesafe.config._

object JournalSpec {
  /** Settings layered underneath the plugin-supplied configuration (see `JournalSpec.system`). */
  val config = ConfigFactory.parseString(
    """
      |akka.persistence.publish-confirmations = on
      |akka.persistence.publish-plugin-commands = on
    """.stripMargin)

  /** Minimal [[PersistentConfirmation]] used by the confirmation-related test cases. */
  case class Confirmation(persistenceId: String, channelId: String, sequenceNr: Long) extends PersistentConfirmation
}

/**
 * This spec aims to verify custom akka-persistence Journal implementations.
 * Plugin authors are highly encouraged to include it in their plugin's test suites.
 *
 * In case your journal plugin needs some kind of setup or teardown, override the `beforeAll` or `afterAll`
 * methods (don't forget to call `super` in your overridden methods).
 *
 * Before each test, messages with sequence numbers 1 to 5 and payloads `a-1` .. `a-5` are written for `pid`;
 * every test case below relies on that fixture.
 *
 * For a Java and JUnit consumable version of the TCK please refer to [[akka.persistence.japi.journal.JavaJournalSpec]].
 *
 * @see [[akka.persistence.journal.JournalPerfSpec]]
 * @see [[akka.persistence.japi.journal.JavaJournalPerfSpec]]
 */
trait JournalSpec extends PluginSpec {
  import JournalSpec._

  implicit lazy val system: ActorSystem = ActorSystem("JournalSpec", config.withFallback(JournalSpec.config))

  // Re-created in `beforeEach`, so every test starts with empty probes.
  private var senderProbe: TestProbe = _
  private var receiverProbe: TestProbe = _

  /** Creates fresh probes and writes the fixture messages 1 to 5 for `pid`. */
  override protected def beforeEach(): Unit = {
    super.beforeEach()
    senderProbe = TestProbe()
    receiverProbe = TestProbe()
    writeMessages(1, 5, pid, senderProbe.ref)
  }

  /**
   * The journal actor under test, obtained from the persistence extension.
   * NOTE(review): `null` is passed as the lookup argument; presumably the extension ignores it - confirm.
   */
  def journal: ActorRef =
    extension.journalFor(null)

  /**
   * Builds the [[ReplayedMessage]] expected for fixture message `snr`.
   *
   * @param snr      sequence number; the expected payload is `a-<snr>`
   * @param deleted  expected value of the deleted flag
   * @param confirms expected confirmation channel ids
   */
  def replayedMessage(snr: Long, deleted: Boolean = false, confirms: Seq[String] = Nil): ReplayedMessage =
    ReplayedMessage(PersistentImpl(s"a-${snr}", snr, pid, deleted, confirms, senderProbe.ref))

  /**
   * Writes messages with sequence numbers `from` to `to` (inclusive) in a single `WriteMessages` batch
   * and waits for the batch acknowledgement followed by one `WriteMessageSuccess` per message, in order.
   *
   * @param from   first sequence number to write
   * @param to     last sequence number to write
   * @param pid    persistence id the messages are written for
   * @param sender sender reference stored with each message
   */
  def writeMessages(from: Int, to: Int, pid: String, sender: ActorRef): Unit = {
    val msgs = from to to map { i ⇒
      PersistentRepr(payload = s"a-${i}", sequenceNr = i, persistenceId = pid, sender = sender)
    }
    val probe = TestProbe()

    journal ! WriteMessages(msgs, probe.ref, actorInstanceId)

    probe.expectMsg(WriteMessagesSuccessful)
    from to to foreach { i ⇒
      probe.expectMsgPF() {
        case WriteMessageSuccess(PersistentImpl(payload, `i`, `pid`, _, _, `sender`), _) ⇒
          payload should be(s"a-${i}")
      }
    }
  }

  "A journal" must {
    "replay all messages" in {
      journal ! ReplayMessages(1, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref)
      1 to 5 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) }
      receiverProbe.expectMsg(ReplayMessagesSuccess)
    }
    "replay messages using a lower sequence number bound" in {
      journal ! ReplayMessages(3, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref)
      3 to 5 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) }
      receiverProbe.expectMsg(ReplayMessagesSuccess)
    }
    "replay messages using an upper sequence number bound" in {
      journal ! ReplayMessages(1, 3, Long.MaxValue, pid, receiverProbe.ref)
      1 to 3 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) }
      receiverProbe.expectMsg(ReplayMessagesSuccess)
    }
    "replay messages using a count limit" in {
      journal ! ReplayMessages(1, Long.MaxValue, 3, pid, receiverProbe.ref)
      1 to 3 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) }
      receiverProbe.expectMsg(ReplayMessagesSuccess)
    }
    "replay messages using a lower and upper sequence number bound" in {
      journal ! ReplayMessages(2, 4, Long.MaxValue, pid, receiverProbe.ref)
      2 to 4 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) }
      receiverProbe.expectMsg(ReplayMessagesSuccess)
    }
    "replay messages using a lower and upper sequence number bound and a count limit" in {
      journal ! ReplayMessages(2, 4, 2, pid, receiverProbe.ref)
      2 to 3 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) }
      receiverProbe.expectMsg(ReplayMessagesSuccess)
    }
    "replay a single if lower sequence number bound equals upper sequence number bound" in {
      journal ! ReplayMessages(2, 2, Long.MaxValue, pid, receiverProbe.ref)
      2 to 2 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) }
      receiverProbe.expectMsg(ReplayMessagesSuccess)
    }
    "replay a single message if count limit equals 1" in {
      journal ! ReplayMessages(2, 4, 1, pid, receiverProbe.ref)
      2 to 2 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) }
      receiverProbe.expectMsg(ReplayMessagesSuccess)
    }
    "not replay messages if count limit equals 0" in {
      journal ! ReplayMessages(2, 4, 0, pid, receiverProbe.ref)
      receiverProbe.expectMsg(ReplayMessagesSuccess)
    }
    "not replay messages if lower sequence number bound is greater than upper sequence number bound" in {
      journal ! ReplayMessages(3, 2, Long.MaxValue, pid, receiverProbe.ref)
      receiverProbe.expectMsg(ReplayMessagesSuccess)
    }
    "not replay permanently deleted messages (range deletion)" in {
      val cmd = DeleteMessagesTo(pid, 3, true)
      val sub = TestProbe()

      // Wait until the journal has published the processed command before replaying.
      subscribe[DeleteMessagesTo](sub.ref)
      journal ! cmd
      sub.expectMsg(cmd)

      journal ! ReplayMessages(1, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref)
      List(4, 5) foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) }
      // Assert completion so that unexpected trailing messages fail the test.
      receiverProbe.expectMsg(ReplayMessagesSuccess)
    }
    "replay logically deleted messages with deleted field set to true (range deletion)" in {
      val cmd = DeleteMessagesTo(pid, 3, false)
      val sub = TestProbe()

      // Wait until the journal has published the processed command before replaying.
      subscribe[DeleteMessagesTo](sub.ref)
      journal ! cmd
      sub.expectMsg(cmd)

      journal ! ReplayMessages(1, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref, replayDeleted = true)
      1 to 5 foreach {
        case i @ (1 | 2 | 3) ⇒ receiverProbe.expectMsg(replayedMessage(i, deleted = true))
        case i @ (4 | 5)     ⇒ receiverProbe.expectMsg(replayedMessage(i))
      }
      // Assert completion so that unexpected trailing messages fail the test.
      receiverProbe.expectMsg(ReplayMessagesSuccess)
    }
    "replay confirmed messages with corresponding channel ids contained in the confirmed field" in {
      val confs = List(Confirmation(pid, "c1", 3), Confirmation(pid, "c2", 3))
      // Local stable identifier so that it can be used in the backtick pattern below.
      val lpid = pid

      journal ! WriteConfirmations(confs, receiverProbe.ref)
      receiverProbe.expectMsg(WriteConfirmationsSuccess(confs))

      journal ! ReplayMessages(1, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref, replayDeleted = true)
      1 to 5 foreach {
        case i @ (1 | 2 | 4 | 5) ⇒ receiverProbe.expectMsg(replayedMessage(i))
        case i @ 3 ⇒ receiverProbe.expectMsgPF() {
          case ReplayedMessage(PersistentImpl(payload, `i`, `lpid`, false, confirms, _)) ⇒
            confirms should have length (2)
            confirms should contain("c1")
            confirms should contain("c2")
        }
      }
      // Assert completion so that unexpected trailing messages fail the test.
      receiverProbe.expectMsg(ReplayMessagesSuccess)
    }
    "ignore orphan deletion markers" in {
      val msgIds = List(PersistentIdImpl(pid, 3), PersistentIdImpl(pid, 4))
      journal ! DeleteMessages(msgIds, true, Some(receiverProbe.ref)) // delete message
      receiverProbe.expectMsg(DeleteMessagesSuccess(msgIds))

      journal ! DeleteMessages(msgIds, false, Some(receiverProbe.ref)) // write orphan marker
      receiverProbe.expectMsg(DeleteMessagesSuccess(msgIds))

      journal ! ReplayMessages(1, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref)
      List(1, 2, 5) foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) }
      // Assert completion so that a replayed orphan marker after the last message fails the test.
      receiverProbe.expectMsg(ReplayMessagesSuccess)
    }
    "ignore orphan confirmation markers" in {
      val msgIds = List(PersistentIdImpl(pid, 3))
      journal ! DeleteMessages(msgIds, true, Some(receiverProbe.ref)) // delete message
      receiverProbe.expectMsg(DeleteMessagesSuccess(msgIds))

      val confs = List(Confirmation(pid, "c1", 3), Confirmation(pid, "c2", 3))
      journal ! WriteConfirmations(confs, receiverProbe.ref)
      receiverProbe.expectMsg(WriteConfirmationsSuccess(confs))

      journal ! ReplayMessages(1, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref)
      List(1, 2, 4, 5) foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) }
      // Assert completion so that a replayed orphan marker after the last message fails the test.
      receiverProbe.expectMsg(ReplayMessagesSuccess)
    }
    "return a highest stored sequence number > 0 if the persistent actor has already written messages and the message log is non-empty" in {
      journal ! ReadHighestSequenceNr(3L, pid, receiverProbe.ref)
      receiverProbe.expectMsg(ReadHighestSequenceNrSuccess(5))

      journal ! ReadHighestSequenceNr(5L, pid, receiverProbe.ref)
      receiverProbe.expectMsg(ReadHighestSequenceNrSuccess(5))
    }
    "return a highest stored sequence number == 0 if the persistent actor has not yet written messages" in {
      journal ! ReadHighestSequenceNr(0L, "non-existing-pid", receiverProbe.ref)
      receiverProbe.expectMsg(ReadHighestSequenceNrSuccess(0))
    }
  }
}

// Other Akka source code examples
// Here is a short list of links related to this Akka JournalSpec.scala source code file:
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.