|
Akka/Scala example source code file (PersistentViewSpec.scala)
The PersistentViewSpec.scala Akka example source code
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence
import akka.actor._
import akka.persistence.JournalProtocol.ReplayMessages
import akka.testkit._
import com.typesafe.config.Config
import scala.concurrent.duration._
object PersistentViewSpec {
private class TestPersistentActor(name: String, probe: ActorRef) extends NamedPersistentActor(name) {
def receiveCommand = {
case msg ⇒
persist(msg) { m ⇒ probe ! s"${m}-${lastSequenceNr}" }
}
override def receiveRecover: Receive = {
case _ ⇒ // do nothing...
}
}
private class TestPersistentView(name: String, probe: ActorRef, interval: FiniteDuration, var failAt: Option[String]) extends PersistentView {
def this(name: String, probe: ActorRef, interval: FiniteDuration) =
this(name, probe, interval, None)
def this(name: String, probe: ActorRef) =
this(name, probe, 100.milliseconds)
override def autoUpdateInterval: FiniteDuration = interval.dilated(context.system)
override val persistenceId: String = name
override val viewId: String = name + "-view"
var last: String = _
def receive = {
case "get" ⇒
probe ! last
case "boom" ⇒
throw new TestException("boom")
case payload if isPersistent && shouldFailOn(payload) ⇒
throw new TestException("boom")
case payload if isPersistent ⇒
last = s"replicated-${payload}-${lastSequenceNr}"
probe ! last
}
override def postRestart(reason: Throwable): Unit = {
super.postRestart(reason)
failAt = None
}
def shouldFailOn(m: Any): Boolean =
failAt.foldLeft(false) { (a, f) ⇒ a || (m == f) }
}
private class PassiveTestPersistentView(name: String, probe: ActorRef, var failAt: Option[String]) extends PersistentView {
override val persistenceId: String = name
override val viewId: String = name + "-view"
override def autoUpdate: Boolean = false
override def autoUpdateReplayMax: Long = 0L // no message replay during initial recovery
var last: String = _
def receive = {
case "get" ⇒
probe ! last
case payload if isPersistent && shouldFailOn(payload) ⇒
throw new TestException("boom")
case payload ⇒
last = s"replicated-${payload}-${lastSequenceNr}"
}
override def postRestart(reason: Throwable): Unit = {
super.postRestart(reason)
failAt = None
}
def shouldFailOn(m: Any): Boolean =
failAt.foldLeft(false) { (a, f) ⇒ a || (m == f) }
}
private class ActiveTestPersistentView(name: String, probe: ActorRef) extends PersistentView {
override val persistenceId: String = name
override val viewId: String = name + "-view"
override def autoUpdateInterval: FiniteDuration = 50.millis
override def autoUpdateReplayMax: Long = 2
def receive = {
case payload ⇒
probe ! s"replicated-${payload}-${lastSequenceNr}"
}
}
private class PersistentOrNotTestPersistentView(name: String, probe: ActorRef) extends PersistentView {
override val persistenceId: String = name
override val viewId: String = name + "-view"
def receive = {
case payload if isPersistent ⇒ probe ! s"replicated-${payload}-${lastSequenceNr}"
case payload ⇒ probe ! s"normal-${payload}-${lastSequenceNr}"
}
}
private class SnapshottingPersistentView(name: String, probe: ActorRef) extends PersistentView {
override val persistenceId: String = name
override val viewId: String = s"${name}-replicator"
override def autoUpdateInterval: FiniteDuration = 100.microseconds.dilated(context.system)
var last: String = _
def receive = {
case "get" ⇒
probe ! last
case "snap" ⇒
saveSnapshot(last)
case "restart" ⇒
throw new TestException("restart requested")
case SaveSnapshotSuccess(_) ⇒
probe ! "snapped"
case SnapshotOffer(metadata, snapshot: String) ⇒
last = snapshot
probe ! last
case payload ⇒
last = s"replicated-${payload}-${lastSequenceNr}"
probe ! last
}
}
}
abstract class PersistentViewSpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender {
import akka.persistence.PersistentViewSpec._
var persistentActor: ActorRef = _
var view: ActorRef = _
var persistentActorProbe: TestProbe = _
var viewProbe: TestProbe = _
override protected def beforeEach(): Unit = {
super.beforeEach()
persistentActorProbe = TestProbe()
viewProbe = TestProbe()
persistentActor = system.actorOf(Props(classOf[TestPersistentActor], name, persistentActorProbe.ref))
persistentActor ! "a"
persistentActor ! "b"
persistentActorProbe.expectMsg("a-1")
persistentActorProbe.expectMsg("b-2")
}
override protected def afterEach(): Unit = {
system.stop(persistentActor)
system.stop(view)
super.afterEach()
}
def subscribeToConfirmation(probe: TestProbe): Unit =
system.eventStream.subscribe(probe.ref, classOf[Delivered])
def awaitConfirmation(probe: TestProbe): Unit =
probe.expectMsgType[Delivered]
def subscribeToReplay(probe: TestProbe): Unit =
system.eventStream.subscribe(probe.ref, classOf[ReplayMessages])
"A persistent view" must {
"receive past updates from a processor" in {
view = system.actorOf(Props(classOf[TestPersistentView], name, viewProbe.ref))
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
}
"receive live updates from a processor" in {
view = system.actorOf(Props(classOf[TestPersistentView], name, viewProbe.ref))
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
persistentActor ! "c"
viewProbe.expectMsg("replicated-c-3")
}
"run updates at specified interval" in {
view = system.actorOf(Props(classOf[TestPersistentView], name, viewProbe.ref, 2.seconds))
// initial update is done on start
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
// live updates takes 5 seconds to replicate
persistentActor ! "c"
viewProbe.expectNoMsg(1.second)
viewProbe.expectMsg("replicated-c-3")
}
"run updates on user request" in {
view = system.actorOf(Props(classOf[TestPersistentView], name, viewProbe.ref, 5.seconds))
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
persistentActor ! "c"
persistentActorProbe.expectMsg("c-3")
view ! Update(await = false)
viewProbe.expectMsg("replicated-c-3")
}
"run updates on user request and await update" in {
view = system.actorOf(Props(classOf[TestPersistentView], name, viewProbe.ref, 5.seconds))
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
persistentActor ! "c"
persistentActorProbe.expectMsg("c-3")
view ! Update(await = true)
view ! "get"
viewProbe.expectMsg("replicated-c-3")
}
"run updates again on failure outside an update cycle" in {
view = system.actorOf(Props(classOf[TestPersistentView], name, viewProbe.ref, 5.seconds))
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
view ! "boom"
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
}
"run updates again on failure during an update cycle" in {
persistentActor ! "c"
persistentActorProbe.expectMsg("c-3")
view = system.actorOf(Props(classOf[TestPersistentView], name, viewProbe.ref, 5.seconds, Some("b")))
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
viewProbe.expectMsg("replicated-c-3")
}
"run size-limited updates on user request" in {
persistentActor ! "c"
persistentActor ! "d"
persistentActor ! "e"
persistentActor ! "f"
persistentActorProbe.expectMsg("c-3")
persistentActorProbe.expectMsg("d-4")
persistentActorProbe.expectMsg("e-5")
persistentActorProbe.expectMsg("f-6")
view = system.actorOf(Props(classOf[PassiveTestPersistentView], name, viewProbe.ref, None))
view ! Update(await = true, replayMax = 2)
view ! "get"
viewProbe.expectMsg("replicated-b-2")
view ! Update(await = true, replayMax = 1)
view ! "get"
viewProbe.expectMsg("replicated-c-3")
view ! Update(await = true, replayMax = 4)
view ! "get"
viewProbe.expectMsg("replicated-f-6")
}
"run size-limited updates automatically" in {
val replayProbe = TestProbe()
persistentActor ! "c"
persistentActor ! "d"
persistentActorProbe.expectMsg("c-3")
persistentActorProbe.expectMsg("d-4")
subscribeToReplay(replayProbe)
view = system.actorOf(Props(classOf[ActiveTestPersistentView], name, viewProbe.ref))
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
viewProbe.expectMsg("replicated-c-3")
viewProbe.expectMsg("replicated-d-4")
replayProbe.expectMsgPF() { case ReplayMessages(1L, _, 2L, _, _, _) ⇒ }
replayProbe.expectMsgPF() { case ReplayMessages(3L, _, 2L, _, _, _) ⇒ }
replayProbe.expectMsgPF() { case ReplayMessages(5L, _, 2L, _, _, _) ⇒ }
}
"check if an incoming message is persistent" in {
persistentActor ! "c"
persistentActorProbe.expectMsg("c-3")
view = system.actorOf(Props(classOf[PersistentOrNotTestPersistentView], name, viewProbe.ref))
view ! "d"
view ! "e"
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
viewProbe.expectMsg("replicated-c-3")
viewProbe.expectMsg("normal-d-3")
viewProbe.expectMsg("normal-e-3")
persistentActor ! "f"
viewProbe.expectMsg("replicated-f-4")
}
"take snapshots" in {
view = system.actorOf(Props(classOf[SnapshottingPersistentView], name, viewProbe.ref))
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
view ! "snap"
viewProbe.expectMsg("snapped")
view ! "restart"
persistentActor ! "c"
viewProbe.expectMsg("replicated-b-2")
viewProbe.expectMsg("replicated-c-3")
}
}
}
class LeveldbPersistentViewSpec extends PersistentViewSpec(PersistenceSpec.config("leveldb", "LeveldbPersistentViewSpec"))
class InmemPersistentViewSpec extends PersistentViewSpec(PersistenceSpec.config("inmem", "InmemPersistentViewSpec"))
Other Akka source code examplesHere is a short list of links related to this Akka PersistentViewSpec.scala source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.