alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (ViewSpec.scala)

This example Akka source code file (ViewSpec.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actorref, akka, concurrent, duration, finiteduration, none, persistent, string, test, testexception, testing, testprobe, time, unit, update, view

The ViewSpec.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.persistence

import akka.actor._
import akka.persistence.JournalProtocol.ReplayMessages
import akka.testkit._
import com.typesafe.config.Config

import scala.concurrent.duration._

object ViewSpec {
  /**
   * Persistent actor used as the event source for the views under test.
   *
   * Every command it receives is persisted; once the persist handler runs, the actor
   * reports `<payload>-<lastSequenceNr>` to `probe`.
   */
  private class TestPersistentActor(name: String, probe: ActorRef) extends NamedPersistentActor(name) {
    // Recovery is a deliberate no-op: every replayed message is matched and dropped.
    override def receiveRecover: Receive = {
      case _ ⇒ ()
    }

    def receiveCommand: Receive = {
      case command ⇒
        persist(command) { persisted ⇒
          probe ! s"$persisted-$lastSequenceNr"
        }
    }
  }

  /**
   * View over the persistent actor identified by `name`.
   *
   * Every `Persistent` message it receives is stored in `last` as
   * `replicated-<payload>-<sequenceNr>` and sent to `probe`. The message `"get"` sends the
   * current value of `last` to `probe`; the message `"boom"`, or a `Persistent` whose
   * payload equals `failAt`, makes the view throw a `TestException`.
   *
   * @param name     id of the persistent actor this view reads from
   * @param probe    receiver of the replicated-message notifications
   * @param interval auto-update interval, dilated with the test time factor before use
   * @param failAt   payload at which the view fails; reset to `None` in `postRestart`
   */
  private class TestView(name: String, probe: ActorRef, interval: FiniteDuration, var failAt: Option[String]) extends View {
    /** Creates a view that never fails on a payload. */
    def this(name: String, probe: ActorRef, interval: FiniteDuration) =
      this(name, probe, interval, None)

    /** Creates a view that never fails on a payload and uses a 100 ms update interval. */
    def this(name: String, probe: ActorRef) =
      this(name, probe, 100.milliseconds)

    override def autoUpdateInterval: FiniteDuration = interval.dilated(context.system)

    // Identify the persistent actor through `persistenceId`, exactly like every other view
    // in this spec, instead of through `processorId`.
    override val persistenceId: String = name

    var last: String = _

    def receive = {
      case "get" ⇒
        probe ! last
      case "boom" ⇒
        throw new TestException("boom")
      case Persistent(payload, _) if Some(payload) == failAt ⇒
        throw new TestException("boom")
      case Persistent(payload, sequenceNr) ⇒
        last = s"replicated-${payload}-${sequenceNr}"
        probe ! last
    }

    // Clear the failure trigger so that the messages seen after a restart can get past
    // the payload that caused the failure.
    override def postRestart(reason: Throwable): Unit = {
      super.postRestart(reason)
      failAt = None
    }
  }

  /**
   * View with auto-update switched off and an initial replay limit of zero; the tests
   * drive it with explicit `Update` requests.
   *
   * Each `Persistent` message is recorded in `last` as `replicated-<payload>-<sequenceNr>`
   * (nothing is sent to `probe` at that point); `"get"` sends the current `last` to `probe`.
   * A payload equal to `failAt` makes the view throw a `TestException`.
   */
  private class PassiveTestView(name: String, probe: ActorRef, var failAt: Option[String]) extends View {
    override val persistenceId: String = name

    // no message replay during initial recovery
    override def autoUpdateReplayMax: Long = 0L
    override def autoUpdate: Boolean = false

    var last: String = _

    def receive = {
      case "get" ⇒
        probe ! last
      case Persistent(payload, sequenceNr) ⇒
        if (failAt.exists(payload == _)) throw new TestException("boom")
        last = s"replicated-$payload-$sequenceNr"
    }

    // Clear the failure trigger once the view has been restarted.
    override def postRestart(reason: Throwable): Unit = {
      super.postRestart(reason)
      failAt = None
    }
  }

  /**
   * View with a 50 ms auto-update interval and an auto-update replay limit of 2.
   * Sends `replicated-<payload>-<sequenceNr>` to `probe` for every `Persistent` message.
   */
  private class ActiveTestView(name: String, probe: ActorRef) extends View {
    override val persistenceId: String = name
    override def autoUpdateReplayMax: Long = 2L
    override def autoUpdateInterval: FiniteDuration = 50.milliseconds

    def receive = {
      case Persistent(payload, sequenceNr) ⇒
        val replicated = s"replicated-$payload-$sequenceNr"
        probe ! replicated
    }
  }

  /**
   * Channel destination: confirms every `ConfirmablePersistent` it receives and then
   * sends `<payload>-<sequenceNr>` to `probe`.
   */
  private class TestDestination(probe: ActorRef) extends Actor {
    def receive: Receive = {
      case delivery @ ConfirmablePersistent(payload, sequenceNr, _) ⇒
        delivery.confirm()
        val report = s"$payload-$sequenceNr"
        probe ! report
    }
  }

  /**
   * View that re-emits every `Persistent` message it receives: the payload is prefixed
   * with `emitted-` and delivered to `destination` through a child channel named
   * `<name>-channel`. The message `"restart"` makes the view throw a `TestException`.
   */
  private class EmittingView(name: String, destination: ActorRef) extends View {
    override val persistenceId: String = name
    override def autoUpdateInterval: FiniteDuration = 100.milliseconds.dilated(context.system)

    val channel: ActorRef = context.actorOf(Channel.props(s"$name-channel"))

    def receive = {
      case "restart" ⇒
        throw new TestException("restart requested")
      case Persistent(payload, _) ⇒
        val emitted = Persistent(s"emitted-$payload")
        channel ! Deliver(emitted, destination.path)
    }
  }

  /**
   * View that can snapshot its state. It uses `<name>-replicator` as its `viewId`.
   *
   * Messages handled:
   *  - `"get"`: sends the current `last` to `probe`
   *  - `"snap"`: saves `last` as a snapshot; `probe` receives `"snapped"` on success
   *  - `"restart"`: throws a `TestException`
   *  - `SnapshotOffer` carrying a `String`: adopts it as `last` and sends it to `probe`
   *  - `Persistent`: stores `replicated-<payload>-<sequenceNr>` in `last` and sends it to `probe`
   */
  private class SnapshottingView(name: String, probe: ActorRef) extends View {
    override val persistenceId: String = name
    override val viewId: String = s"$name-replicator"

    // NOTE(review): this interval is in microseconds, whereas the sibling views in this
    // spec use milliseconds — confirm whether that is intentional.
    override def autoUpdateInterval: FiniteDuration = 100.microseconds.dilated(context.system)

    var last: String = _

    def receive = {
      case "get" ⇒
        probe ! last
      case "snap" ⇒
        saveSnapshot(last)
      case "restart" ⇒
        throw new TestException("restart requested")
      case SaveSnapshotSuccess(_) ⇒
        probe ! "snapped"
      case SnapshotOffer(_, snapshot: String) ⇒
        remember(snapshot)
      case Persistent(payload, sequenceNr) ⇒
        remember(s"replicated-$payload-$sequenceNr")
    }

    // Stores `state` as the latest value and reports it to the probe.
    private def remember(state: String): Unit = {
      last = state
      probe ! last
    }
  }
}

/**
 * Behaviour spec for persistent views. Concrete subclasses supply the configuration.
 *
 * Before every test a fresh `TestPersistentActor` persists `"a"` and `"b"` (confirmed as
 * `"a-1"` and `"b-2"`), so each test starts with two messages already persisted.
 * After every test the persistent actor and the view created by the test are stopped.
 *
 * @param config configuration handed to `AkkaSpec`
 */
abstract class ViewSpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender {
  import akka.persistence.ViewSpec._

  // Actors under test; `persistor` is created in `beforeEach`, `view` by each test.
  var persistor: ActorRef = _
  var view: ActorRef = _

  // Probes receiving the notifications of the persistent actor and of the view.
  var processorProbe: TestProbe = _
  var viewProbe: TestProbe = _

  /** Creates fresh probes and a persistent actor, then persists "a" and "b". */
  override protected def beforeEach(): Unit = {
    super.beforeEach()

    processorProbe = TestProbe()
    viewProbe = TestProbe()

    persistor = system.actorOf(Props(classOf[TestPersistentActor], name, processorProbe.ref))
    persistor ! "a"
    persistor ! "b"

    processorProbe.expectMsg("a-1")
    processorProbe.expectMsg("b-2")
  }

  /** Stops the actors created for the test that just ran. */
  override protected def afterEach(): Unit = {
    // `view` is still null when a test fails before assigning it; handing null to
    // `system.stop` would make the teardown itself fail and obscure the real failure.
    Option(persistor).foreach(ref ⇒ system.stop(ref))
    Option(view).foreach(ref ⇒ system.stop(ref))
    super.afterEach()
  }

  /** Subscribes `probe` to `Delivered` events on the system event stream. */
  def subscribeToConfirmation(probe: TestProbe): Unit =
    system.eventStream.subscribe(probe.ref, classOf[Delivered])

  /** Expects `probe` to receive a `Delivered` message. */
  def awaitConfirmation(probe: TestProbe): Unit =
    probe.expectMsgType[Delivered]

  /** Subscribes `probe` to `ReplayMessages` events on the system event stream. */
  def subscribeToReplay(probe: TestProbe): Unit =
    system.eventStream.subscribe(probe.ref, classOf[ReplayMessages])

  "A view" must {
    "receive past updates from a processor" in {
      view = system.actorOf(Props(classOf[TestView], name, viewProbe.ref))
      viewProbe.expectMsg("replicated-a-1")
      viewProbe.expectMsg("replicated-b-2")
    }
    "receive live updates from a processor" in {
      view = system.actorOf(Props(classOf[TestView], name, viewProbe.ref))
      viewProbe.expectMsg("replicated-a-1")
      viewProbe.expectMsg("replicated-b-2")
      persistor ! "c"
      viewProbe.expectMsg("replicated-c-3")
    }
    "run updates at specified interval" in {
      val interval = 2.seconds
      view = system.actorOf(Props(classOf[TestView], name, viewProbe.ref, interval))
      // initial update is done on start
      viewProbe.expectMsg("replicated-a-1")
      viewProbe.expectMsg("replicated-b-2")
      // live updates are only picked up by the next auto-update, i.e. after `interval`
      // (dilated by the view), so nothing may arrive within the first second
      persistor ! "c"
      viewProbe.expectNoMsg(1.second)
      viewProbe.expectMsg("replicated-c-3")
    }
    "run updates on user request" in {
      view = system.actorOf(Props(classOf[TestView], name, viewProbe.ref, 5.seconds))
      viewProbe.expectMsg("replicated-a-1")
      viewProbe.expectMsg("replicated-b-2")
      persistor ! "c"
      // make sure "c" has been persisted before the update is requested
      processorProbe.expectMsg("c-3")
      view ! Update(await = false)
      viewProbe.expectMsg("replicated-c-3")
    }
    "run updates on user request and await update" in {
      view = system.actorOf(Props(classOf[TestView], name, viewProbe.ref, 5.seconds))
      viewProbe.expectMsg("replicated-a-1")
      viewProbe.expectMsg("replicated-b-2")
      persistor ! "c"
      processorProbe.expectMsg("c-3")
      view ! Update(await = true)
      view ! "get"
      viewProbe.expectMsg("replicated-c-3")
    }
    "run updates again on failure outside an update cycle" in {
      view = system.actorOf(Props(classOf[TestView], name, viewProbe.ref, 5.seconds))
      viewProbe.expectMsg("replicated-a-1")
      viewProbe.expectMsg("replicated-b-2")
      // "boom" makes the view throw; afterwards both messages are expected again
      view ! "boom"
      viewProbe.expectMsg("replicated-a-1")
      viewProbe.expectMsg("replicated-b-2")
    }
    "run updates again on failure during an update cycle" in {
      persistor ! "c"
      processorProbe.expectMsg("c-3")
      // the view throws on payload "b"; `postRestart` clears that trigger
      view = system.actorOf(Props(classOf[TestView], name, viewProbe.ref, 5.seconds, Some("b")))
      // first attempt: only "a" gets through before the failure
      viewProbe.expectMsg("replicated-a-1")
      // second attempt: everything is seen from the start
      viewProbe.expectMsg("replicated-a-1")
      viewProbe.expectMsg("replicated-b-2")
      viewProbe.expectMsg("replicated-c-3")
    }
    "run size-limited updates on user request" in {
      persistor ! "c"
      persistor ! "d"
      persistor ! "e"
      persistor ! "f"

      processorProbe.expectMsg("c-3")
      processorProbe.expectMsg("d-4")
      processorProbe.expectMsg("e-5")
      processorProbe.expectMsg("f-6")

      view = system.actorOf(Props(classOf[PassiveTestView], name, viewProbe.ref, None))

      // up to two messages: "a" and "b"
      view ! Update(await = true, replayMax = 2)
      view ! "get"
      viewProbe.expectMsg("replicated-b-2")

      // one more message: "c"
      view ! Update(await = true, replayMax = 1)
      view ! "get"
      viewProbe.expectMsg("replicated-c-3")

      // up to four more, of which only "d", "e" and "f" remain
      view ! Update(await = true, replayMax = 4)
      view ! "get"
      viewProbe.expectMsg("replicated-f-6")
    }
    "run size-limited updates automatically" in {
      val replayProbe = TestProbe()

      persistor ! "c"
      persistor ! "d"

      processorProbe.expectMsg("c-3")
      processorProbe.expectMsg("d-4")

      subscribeToReplay(replayProbe)

      view = system.actorOf(Props(classOf[ActiveTestView], name, viewProbe.ref))

      viewProbe.expectMsg("replicated-a-1")
      viewProbe.expectMsg("replicated-b-2")
      viewProbe.expectMsg("replicated-c-3")
      viewProbe.expectMsg("replicated-d-4")

      // NOTE(review): the first field is presumably the starting sequence number and the
      // third the replay limit (2, as set by ActiveTestView) — confirm against JournalProtocol.
      replayProbe.expectMsgPF() { case ReplayMessages(1L, _, 2L, _, _, _) ⇒ }
      replayProbe.expectMsgPF() { case ReplayMessages(3L, _, 2L, _, _, _) ⇒ }
      replayProbe.expectMsgPF() { case ReplayMessages(5L, _, 2L, _, _, _) ⇒ }
    }
  }

  "A view" can {
    "use channels" in {
      val confirmProbe = TestProbe()
      val destinationProbe = TestProbe()
      val destination = system.actorOf(Props(classOf[TestDestination], destinationProbe.ref))

      subscribeToConfirmation(confirmProbe)

      view = system.actorOf(Props(classOf[EmittingView], name, destination))
      destinationProbe.expectMsg("emitted-a-1")
      destinationProbe.expectMsg("emitted-b-2")
      awaitConfirmation(confirmProbe)
      awaitConfirmation(confirmProbe)

      // after the restart only the new message is expected at the destination
      view ! "restart"
      persistor ! "c"

      destinationProbe.expectMsg("emitted-c-3")
      awaitConfirmation(confirmProbe)
    }
    "take snapshots" in {
      view = system.actorOf(Props(classOf[SnapshottingView], name, viewProbe.ref))
      viewProbe.expectMsg("replicated-a-1")
      viewProbe.expectMsg("replicated-b-2")
      view ! "snap"
      viewProbe.expectMsg("snapped")
      view ! "restart"
      persistor ! "c"
      // the restarted view reports the snapshotted state first, then the new message
      viewProbe.expectMsg("replicated-b-2")
      viewProbe.expectMsg("replicated-c-3")
    }
  }
}

/** Runs the `ViewSpec` suite with the configuration from `PersistenceSpec.config("leveldb", "LeveldbViewSpec")`. */
class LeveldbViewSpec
  extends ViewSpec(PersistenceSpec.config("leveldb", "LeveldbViewSpec"))
/** Runs the `ViewSpec` suite with the configuration from `PersistenceSpec.config("inmem", "InmemViewSpec")`. */
class InmemViewSpec
  extends ViewSpec(PersistenceSpec.config("inmem", "InmemViewSpec"))

Other Akka source code examples

Here is a short list of links related to this Akka ViewSpec.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.