|
Akka/Scala example source code file (ViewSpec.scala)
The ViewSpec.scala Akka example source code
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence
import akka.actor._
import akka.persistence.JournalProtocol.ReplayMessages
import akka.testkit._
import com.typesafe.config.Config
import scala.concurrent.duration._
object ViewSpec {
private class TestPersistentActor(name: String, probe: ActorRef) extends NamedPersistentActor(name) {
def receiveCommand = {
case msg ⇒
persist(msg) { m ⇒ probe ! s"${m}-${lastSequenceNr}" }
}
override def receiveRecover: Receive = {
case _ ⇒
}
}
private class TestView(name: String, probe: ActorRef, interval: FiniteDuration, var failAt: Option[String]) extends View {
def this(name: String, probe: ActorRef, interval: FiniteDuration) =
this(name, probe, interval, None)
def this(name: String, probe: ActorRef) =
this(name, probe, 100.milliseconds)
override def autoUpdateInterval: FiniteDuration = interval.dilated(context.system)
override val processorId: String = name
var last: String = _
def receive = {
case "get" ⇒
probe ! last
case "boom" ⇒
throw new TestException("boom")
case Persistent(payload, _) if Some(payload) == failAt ⇒
throw new TestException("boom")
case Persistent(payload, sequenceNr) ⇒
last = s"replicated-${payload}-${sequenceNr}"
probe ! last
}
override def postRestart(reason: Throwable): Unit = {
super.postRestart(reason)
failAt = None
}
}
private class PassiveTestView(name: String, probe: ActorRef, var failAt: Option[String]) extends View {
override val persistenceId: String = name
override def autoUpdate: Boolean = false
override def autoUpdateReplayMax: Long = 0L // no message replay during initial recovery
var last: String = _
def receive = {
case "get" ⇒
probe ! last
case Persistent(payload, _) if Some(payload) == failAt ⇒
throw new TestException("boom")
case Persistent(payload, sequenceNr) ⇒
last = s"replicated-${payload}-${sequenceNr}"
}
override def postRestart(reason: Throwable): Unit = {
super.postRestart(reason)
failAt = None
}
}
private class ActiveTestView(name: String, probe: ActorRef) extends View {
override val persistenceId: String = name
override def autoUpdateInterval: FiniteDuration = 50.millis
override def autoUpdateReplayMax: Long = 2
def receive = {
case Persistent(payload, sequenceNr) ⇒
probe ! s"replicated-${payload}-${sequenceNr}"
}
}
private class TestDestination(probe: ActorRef) extends Actor {
def receive = {
case cp @ ConfirmablePersistent(payload, sequenceNr, _) ⇒
cp.confirm()
probe ! s"${payload}-${sequenceNr}"
}
}
private class EmittingView(name: String, destination: ActorRef) extends View {
override val persistenceId: String = name
override def autoUpdateInterval: FiniteDuration = 100.milliseconds.dilated(context.system)
val channel = context.actorOf(Channel.props(s"${name}-channel"))
def receive = {
case "restart" ⇒
throw new TestException("restart requested")
case Persistent(payload, sequenceNr) ⇒
channel ! Deliver(Persistent(s"emitted-${payload}"), destination.path)
}
}
private class SnapshottingView(name: String, probe: ActorRef) extends View {
override val persistenceId: String = name
override val viewId: String = s"${name}-replicator"
override def autoUpdateInterval: FiniteDuration = 100.microseconds.dilated(context.system)
var last: String = _
def receive = {
case "get" ⇒
probe ! last
case "snap" ⇒
saveSnapshot(last)
case "restart" ⇒
throw new TestException("restart requested")
case SaveSnapshotSuccess(_) ⇒
probe ! "snapped"
case SnapshotOffer(metadata, snapshot: String) ⇒
last = snapshot
probe ! last
case Persistent(payload, sequenceNr) ⇒
last = s"replicated-${payload}-${sequenceNr}"
probe ! last
}
}
}
abstract class ViewSpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender {
import akka.persistence.ViewSpec._
var persistor: ActorRef = _
var view: ActorRef = _
var processorProbe: TestProbe = _
var viewProbe: TestProbe = _
override protected def beforeEach(): Unit = {
super.beforeEach()
processorProbe = TestProbe()
viewProbe = TestProbe()
persistor = system.actorOf(Props(classOf[TestPersistentActor], name, processorProbe.ref))
persistor ! "a"
persistor ! "b"
processorProbe.expectMsg("a-1")
processorProbe.expectMsg("b-2")
}
override protected def afterEach(): Unit = {
system.stop(persistor)
system.stop(view)
super.afterEach()
}
def subscribeToConfirmation(probe: TestProbe): Unit =
system.eventStream.subscribe(probe.ref, classOf[Delivered])
def awaitConfirmation(probe: TestProbe): Unit =
probe.expectMsgType[Delivered]
def subscribeToReplay(probe: TestProbe): Unit =
system.eventStream.subscribe(probe.ref, classOf[ReplayMessages])
"A view" must {
"receive past updates from a processor" in {
view = system.actorOf(Props(classOf[TestView], name, viewProbe.ref))
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
}
"receive live updates from a processor" in {
view = system.actorOf(Props(classOf[TestView], name, viewProbe.ref))
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
persistor ! "c"
viewProbe.expectMsg("replicated-c-3")
}
"run updates at specified interval" in {
view = system.actorOf(Props(classOf[TestView], name, viewProbe.ref, 2.seconds))
// initial update is done on start
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
// live updates takes 5 seconds to replicate
persistor ! "c"
viewProbe.expectNoMsg(1.second)
viewProbe.expectMsg("replicated-c-3")
}
"run updates on user request" in {
view = system.actorOf(Props(classOf[TestView], name, viewProbe.ref, 5.seconds))
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
persistor ! "c"
processorProbe.expectMsg("c-3")
view ! Update(await = false)
viewProbe.expectMsg("replicated-c-3")
}
"run updates on user request and await update" in {
view = system.actorOf(Props(classOf[TestView], name, viewProbe.ref, 5.seconds))
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
persistor ! "c"
processorProbe.expectMsg("c-3")
view ! Update(await = true)
view ! "get"
viewProbe.expectMsg("replicated-c-3")
}
"run updates again on failure outside an update cycle" in {
view = system.actorOf(Props(classOf[TestView], name, viewProbe.ref, 5.seconds))
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
view ! "boom"
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
}
"run updates again on failure during an update cycle" in {
persistor ! "c"
processorProbe.expectMsg("c-3")
view = system.actorOf(Props(classOf[TestView], name, viewProbe.ref, 5.seconds, Some("b")))
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
viewProbe.expectMsg("replicated-c-3")
}
"run size-limited updates on user request" in {
persistor ! "c"
persistor ! "d"
persistor ! "e"
persistor ! "f"
processorProbe.expectMsg("c-3")
processorProbe.expectMsg("d-4")
processorProbe.expectMsg("e-5")
processorProbe.expectMsg("f-6")
view = system.actorOf(Props(classOf[PassiveTestView], name, viewProbe.ref, None))
view ! Update(await = true, replayMax = 2)
view ! "get"
viewProbe.expectMsg("replicated-b-2")
view ! Update(await = true, replayMax = 1)
view ! "get"
viewProbe.expectMsg("replicated-c-3")
view ! Update(await = true, replayMax = 4)
view ! "get"
viewProbe.expectMsg("replicated-f-6")
}
"run size-limited updates automatically" in {
val replayProbe = TestProbe()
persistor ! "c"
persistor ! "d"
processorProbe.expectMsg("c-3")
processorProbe.expectMsg("d-4")
subscribeToReplay(replayProbe)
view = system.actorOf(Props(classOf[ActiveTestView], name, viewProbe.ref))
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
viewProbe.expectMsg("replicated-c-3")
viewProbe.expectMsg("replicated-d-4")
replayProbe.expectMsgPF() { case ReplayMessages(1L, _, 2L, _, _, _) ⇒ }
replayProbe.expectMsgPF() { case ReplayMessages(3L, _, 2L, _, _, _) ⇒ }
replayProbe.expectMsgPF() { case ReplayMessages(5L, _, 2L, _, _, _) ⇒ }
}
}
"A view" can {
"use channels" in {
val confirmProbe = TestProbe()
val destinationProbe = TestProbe()
val destination = system.actorOf(Props(classOf[TestDestination], destinationProbe.ref))
subscribeToConfirmation(confirmProbe)
view = system.actorOf(Props(classOf[EmittingView], name, destination))
destinationProbe.expectMsg("emitted-a-1")
destinationProbe.expectMsg("emitted-b-2")
awaitConfirmation(confirmProbe)
awaitConfirmation(confirmProbe)
view ! "restart"
persistor ! "c"
destinationProbe.expectMsg("emitted-c-3")
awaitConfirmation(confirmProbe)
}
"take snapshots" in {
view = system.actorOf(Props(classOf[SnapshottingView], name, viewProbe.ref))
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
view ! "snap"
viewProbe.expectMsg("snapped")
view ! "restart"
persistor ! "c"
viewProbe.expectMsg("replicated-b-2")
viewProbe.expectMsg("replicated-c-3")
}
}
}
class LeveldbViewSpec extends ViewSpec(PersistenceSpec.config("leveldb", "LeveldbViewSpec"))
class InmemViewSpec extends ViewSpec(PersistenceSpec.config("inmem", "InmemViewSpec"))
Other Akka source code examplesHere is a short list of links related to this Akka ViewSpec.scala source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.