alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (JournalSpec.scala)

This example Akka source code file (JournalSpec.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

akka, collection, confirmation, deletemessages, int, journalspec, list, persistence, readhighestsequencenr, replayedmessage, replaymessages, string, test, testing, testkit, testprobe

The JournalSpec.scala Akka example source code

package akka.persistence.journal

import scala.collection.immutable.Seq

import akka.actor._
import akka.persistence._
import akka.persistence.JournalProtocol._
import akka.testkit._

import com.typesafe.config._

object JournalSpec {

  /**
   * Confirmation marker used by the spec when writing confirmations to the journal under test.
   *
   * @param persistenceId id of the persistent actor whose message is confirmed
   * @param channelId     id of the confirming channel
   * @param sequenceNr    sequence number of the confirmed message
   */
  case class Confirmation(persistenceId: String, channelId: String, sequenceNr: Long) extends PersistentConfirmation

  /**
   * Default settings for the spec: switches on publishing of confirmations and of
   * plugin commands. Used as the fallback configuration of the spec's actor system.
   */
  val config: Config =
    ConfigFactory.parseString(
      """
      |akka.persistence.publish-confirmations = on
      |akka.persistence.publish-plugin-commands = on
    """.stripMargin)
}

/**
 * This spec aims to verify custom akka-persistence Journal implementations.
 * Plugin authors are highly encouraged to include it in their plugin's test suites.
 *
 * In case your journal plugin needs some kind of setup or teardown, override the `beforeAll` or `afterAll`
 * methods (don't forget to call `super` in your overridden methods).
 *
 * For a Java and JUnit consumable version of the TCK please refer to [[akka.persistence.japi.journal.JavaJournalSpec]].
 *
 * @see [[akka.persistence.journal.JournalPerfSpec]]
 * @see [[akka.persistence.japi.journal.JavaJournalPerfSpec]]
 */
trait JournalSpec extends PluginSpec {
  import JournalSpec._

  /**
   * Actor system the journal under test runs in. It is built from `config`, falling back to
   * [[JournalSpec.config]] for any setting `config` does not define.
   */
  implicit lazy val system: ActorSystem = ActorSystem("JournalSpec", config.withFallback(JournalSpec.config))

  // Both probes are re-created in `beforeEach`, so no test sees messages left over from another test.
  private var senderProbe: TestProbe = _
  private var receiverProbe: TestProbe = _

  /**
   * Creates fresh probes and writes the messages with sequence numbers 1 to 5 for `pid`,
   * which is the fixture every test case below relies on.
   */
  override protected def beforeEach(): Unit = {
    super.beforeEach()
    senderProbe = TestProbe()
    receiverProbe = TestProbe()
    writeMessages(1, 5, pid, senderProbe.ref)
  }

  /**
   * The journal actor under test, looked up through the persistence `extension`.
   */
  def journal: ActorRef =
    // NOTE(review): `null` is passed as the lookup argument; presumably the extension ignores it
    // in this Akka version -- confirm before changing.
    extension.journalFor(null)

  /**
   * Builds the [[ReplayedMessage]] the journal is expected to emit for a message written by
   * [[writeMessages]] during `beforeEach`.
   *
   * @param snr      sequence number of the message; the expected payload is `a-<snr>`
   * @param deleted  expected value of the deleted flag
   * @param confirms expected confirmations (channel ids)
   * @return the expected replayed message, with `senderProbe` as its sender
   */
  def replayedMessage(snr: Long, deleted: Boolean = false, confirms: Seq[String] = Nil): ReplayedMessage =
    ReplayedMessage(PersistentImpl(s"a-${snr}", snr, pid, deleted, confirms, senderProbe.ref))

  /**
   * Writes one message per sequence number in `from to to` (inclusive) to the journal as a single
   * batch and waits until the journal has acknowledged the batch and every individual message.
   *
   * @param from   first sequence number to write
   * @param to     last sequence number to write
   * @param pid    persistence id the messages are written for
   * @param sender sender stored with each message
   */
  def writeMessages(from: Int, to: Int, pid: String, sender: ActorRef): Unit = {
    val msgs = from to to map { i ⇒ PersistentRepr(payload = s"a-${i}", sequenceNr = i, persistenceId = pid, sender = sender) }
    val probe = TestProbe()

    journal ! WriteMessages(msgs, probe.ref, actorInstanceId)

    // The batch-level acknowledgement is expected first, followed by one success per message in order.
    probe.expectMsg(WriteMessagesSuccessful)
    from to to foreach { i ⇒
      probe.expectMsgPF() { case WriteMessageSuccess(PersistentImpl(payload, `i`, `pid`, _, _, `sender`), _) ⇒ payload should be(s"a-${i}") }
    }
  }

  "A journal" must {
    "replay all messages" in {
      journal ! ReplayMessages(1, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref)
      1 to 5 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) }
      receiverProbe.expectMsg(ReplayMessagesSuccess)
    }
    "replay messages using a lower sequence number bound" in {
      journal ! ReplayMessages(3, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref)
      3 to 5 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) }
      receiverProbe.expectMsg(ReplayMessagesSuccess)
    }
    "replay messages using an upper sequence number bound" in {
      journal ! ReplayMessages(1, 3, Long.MaxValue, pid, receiverProbe.ref)
      1 to 3 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) }
      receiverProbe.expectMsg(ReplayMessagesSuccess)
    }
    "replay messages using a count limit" in {
      journal ! ReplayMessages(1, Long.MaxValue, 3, pid, receiverProbe.ref)
      1 to 3 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) }
      receiverProbe.expectMsg(ReplayMessagesSuccess)
    }
    "replay messages using a lower and upper sequence number bound" in {
      journal ! ReplayMessages(2, 4, Long.MaxValue, pid, receiverProbe.ref)
      2 to 4 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) }
      receiverProbe.expectMsg(ReplayMessagesSuccess)
    }
    "replay messages using a lower and upper sequence number bound and a count limit" in {
      journal ! ReplayMessages(2, 4, 2, pid, receiverProbe.ref)
      2 to 3 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) }
      receiverProbe.expectMsg(ReplayMessagesSuccess)
    }
    "replay a single if lower sequence number bound equals upper sequence number bound" in {
      journal ! ReplayMessages(2, 2, Long.MaxValue, pid, receiverProbe.ref)
      2 to 2 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) }
      receiverProbe.expectMsg(ReplayMessagesSuccess)
    }
    "replay a single message if count limit equals 1" in {
      journal ! ReplayMessages(2, 4, 1, pid, receiverProbe.ref)
      2 to 2 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) }
      receiverProbe.expectMsg(ReplayMessagesSuccess)
    }
    "not replay messages if count limit equals 0" in {
      journal ! ReplayMessages(2, 4, 0, pid, receiverProbe.ref)
      receiverProbe.expectMsg(ReplayMessagesSuccess)
    }
    "not replay messages if lower  sequence number bound is greater than upper sequence number bound" in {
      journal ! ReplayMessages(3, 2, Long.MaxValue, pid, receiverProbe.ref)
      receiverProbe.expectMsg(ReplayMessagesSuccess)
    }
    "not replay permanently deleted messages (range deletion)" in {
      val cmd = DeleteMessagesTo(pid, 3, true)
      val sub = TestProbe()

      // Wait until the published command is seen, so the deletion has been handled before replaying.
      subscribe[DeleteMessagesTo](sub.ref)
      journal ! cmd
      sub.expectMsg(cmd)

      journal ! ReplayMessages(1, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref)
      List(4, 5) foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) }
      // Expecting the terminating success right after 4 and 5 proves nothing else was replayed.
      receiverProbe.expectMsg(ReplayMessagesSuccess)
    }
    "replay logically deleted messages with deleted field set to true (range deletion)" in {
      val cmd = DeleteMessagesTo(pid, 3, false)
      val sub = TestProbe()

      // Wait until the published command is seen, so the deletion has been handled before replaying.
      subscribe[DeleteMessagesTo](sub.ref)
      journal ! cmd
      sub.expectMsg(cmd)

      journal ! ReplayMessages(1, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref, replayDeleted = true)
      // Messages 1 to 3 were marked as deleted, 4 and 5 were left untouched.
      1 to 3 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i, deleted = true)) }
      4 to 5 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) }
      receiverProbe.expectMsg(ReplayMessagesSuccess)
    }
    "replay confirmed messages with corresponding channel ids contained in the confirmed field" in {
      val confs = List(Confirmation(pid, "c1", 3), Confirmation(pid, "c2", 3))
      // Local copy of `pid` so it can be referenced as a stable identifier in the pattern below.
      val lpid = pid

      journal ! WriteConfirmations(confs, receiverProbe.ref)
      receiverProbe.expectMsg(WriteConfirmationsSuccess(confs))

      journal ! ReplayMessages(1, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref, replayDeleted = true)
      1 to 2 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) }
      // Only message 3 was confirmed. The order of its channel ids is not prescribed, so the
      // confirmations are checked by size and membership rather than by equality.
      receiverProbe.expectMsgPF() {
        case ReplayedMessage(PersistentImpl(_, 3L, `lpid`, false, confirms, _)) ⇒
          confirms should have length (2)
          confirms should contain("c1")
          confirms should contain("c2")
      }
      4 to 5 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) }
      receiverProbe.expectMsg(ReplayMessagesSuccess)
    }
    "ignore orphan deletion markers" in {
      val msgIds = List(PersistentIdImpl(pid, 3), PersistentIdImpl(pid, 4))
      journal ! DeleteMessages(msgIds, true, Some(receiverProbe.ref)) // delete message
      receiverProbe.expectMsg(DeleteMessagesSuccess(msgIds))

      journal ! DeleteMessages(msgIds, false, Some(receiverProbe.ref)) // write orphan marker
      receiverProbe.expectMsg(DeleteMessagesSuccess(msgIds))

      journal ! ReplayMessages(1, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref)
      List(1, 2, 5) foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) }
      // Expecting the terminating success right after 5 proves the orphan markers produced no replay.
      receiverProbe.expectMsg(ReplayMessagesSuccess)
    }
    "ignore orphan confirmation markers" in {
      val msgIds = List(PersistentIdImpl(pid, 3))
      journal ! DeleteMessages(msgIds, true, Some(receiverProbe.ref)) // delete message
      receiverProbe.expectMsg(DeleteMessagesSuccess(msgIds))

      // Confirmations for the already deleted message 3 are the orphan markers.
      val confs = List(Confirmation(pid, "c1", 3), Confirmation(pid, "c2", 3))
      journal ! WriteConfirmations(confs, receiverProbe.ref)
      receiverProbe.expectMsg(WriteConfirmationsSuccess(confs))

      journal ! ReplayMessages(1, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref)
      List(1, 2, 4, 5) foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) }
      // Expecting the terminating success right after 5 proves the orphan markers produced no replay.
      receiverProbe.expectMsg(ReplayMessagesSuccess)
    }
    "return a highest stored sequence number > 0 if the persistent actor has already written messages and the message log is non-empty" in {
      // The result must be 5 (the highest written sequence number) for both starting points.
      journal ! ReadHighestSequenceNr(3L, pid, receiverProbe.ref)
      receiverProbe.expectMsg(ReadHighestSequenceNrSuccess(5))

      journal ! ReadHighestSequenceNr(5L, pid, receiverProbe.ref)
      receiverProbe.expectMsg(ReadHighestSequenceNrSuccess(5))
    }
    "return a highest stored sequence number == 0 if the persistent actor has not yet written messages" in {
      journal ! ReadHighestSequenceNr(0L, "non-existing-pid", receiverProbe.ref)
      receiverProbe.expectMsg(ReadHighestSequenceNrSuccess(0))
    }
  }
}

Other Akka source code examples

Here is a short list of links related to this Akka JournalSpec.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.