alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (PersistentPublisherSpec.scala)

This example Akka source code file (PersistentPublisherSpec.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

a, actorref, akka, concurrent, flowmaterializer, namedprocessor, persistence, persistent, persistentpublishersettings, persistentpublisherspec, string, test, testing, testprobe, time, unit

The PersistentPublisherSpec.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.persistence.stream

import scala.concurrent.duration._

import akka.actor._
import akka.persistence._
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.testkit._
import akka.testkit.TestProbe

// ------------------------------------------------------------------------------------------------
// FIXME: move this file to akka-persistence-experimental once going back to project dependencies
// ------------------------------------------------------------------------------------------------

object PersistentPublisherSpec {
  class TestProcessor(name: String, probe: ActorRef) extends NamedProcessor(name) {
    def receive = {
      case Persistent(payload, sequenceNr) ⇒ probe ! s"${payload}-${sequenceNr}"
    }
  }
}

class PersistentPublisherSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "ViewProducerSpec", serialization = "off")) with PersistenceSpec {
  import PersistentPublisherSpec._

  val numMessages = 10

  val publisherSettings = PersistentPublisherSettings(idle = Some(100.millis))
  val materializer = FlowMaterializer(MaterializerSettings())

  var processor1: ActorRef = _
  var processor2: ActorRef = _

  var processor1Probe: TestProbe = _
  var processor2Probe: TestProbe = _

  def processorId(num: Int): String =
    name + num

  override protected def beforeEach(): Unit = {
    super.beforeEach()

    processor1Probe = TestProbe()
    processor2Probe = TestProbe()

    processor1 = system.actorOf(Props(classOf[TestProcessor], processorId(1), processor1Probe.ref))
    processor2 = system.actorOf(Props(classOf[TestProcessor], processorId(2), processor2Probe.ref))

    1 to numMessages foreach { i ⇒
      processor1 ! Persistent("a")
      processor2 ! Persistent("b")

      processor1Probe.expectMsg(s"a-${i}")
      processor2Probe.expectMsg(s"b-${i}")
    }
  }

  override protected def afterEach(): Unit = {
    system.stop(processor1)
    system.stop(processor1)
    super.afterEach()
  }

  "A view producer" must {
    "pull existing messages from a processor's journal" in {
      val streamProbe = TestProbe()

      PersistentFlow.fromProcessor(processorId(1), publisherSettings).foreach {
        case Persistent(payload, sequenceNr) ⇒ streamProbe.ref ! s"${payload}-${sequenceNr}"
      }.consume(materializer)

      1 to numMessages foreach { i ⇒
        streamProbe.expectMsg(s"a-${i}")
      }
    }
    "pull existing messages and new from a processor's journal" in {
      val streamProbe = TestProbe()

      PersistentFlow.fromProcessor(processorId(1), publisherSettings).foreach {
        case Persistent(payload, sequenceNr) ⇒ streamProbe.ref ! s"${payload}-${sequenceNr}"
      }.consume(materializer)

      1 to numMessages foreach { i ⇒
        streamProbe.expectMsg(s"a-${i}")
      }

      processor1 ! Persistent("a")
      processor1 ! Persistent("a")

      streamProbe.expectMsg(s"a-${numMessages + 1}")
      streamProbe.expectMsg(s"a-${numMessages + 2}")
    }
    "pull existing messages from a processor's journal starting form a specified sequence number" in {
      val streamProbe = TestProbe()
      val fromSequenceNr = 5L

      PersistentFlow.fromProcessor(processorId(1), publisherSettings.copy(fromSequenceNr = fromSequenceNr)).foreach {
        case Persistent(payload, sequenceNr) ⇒ streamProbe.ref ! s"${payload}-${sequenceNr}"
      }.consume(materializer)

      fromSequenceNr to numMessages foreach { i ⇒
        streamProbe.expectMsg(s"a-${i}")
      }
    }
  }

  "A view producer" can {
    "have several consumers" in {
      val streamProbe1 = TestProbe()
      val streamProbe2 = TestProbe()

      val producer = PersistentFlow.fromProcessor(processorId(1), publisherSettings).toProducer(materializer)

      Flow(producer).foreach {
        case Persistent(payload, sequenceNr) ⇒ streamProbe1.ref ! s"${payload}-${sequenceNr}"
      }.consume(materializer)

      // let consumer consume all existing messages
      1 to numMessages foreach { i ⇒
        streamProbe1.expectMsg(s"a-${i}")
      }

      // subscribe another consumer
      Flow(producer).foreach {
        case Persistent(payload, sequenceNr) ⇒ streamProbe2.ref ! s"${payload}-${sequenceNr}"
      }.consume(materializer)

      // produce new messages and let both consumers handle them
      1 to 2 foreach { i ⇒
        processor1 ! Persistent("a")
        streamProbe1.expectMsg(s"a-${numMessages + i}")
        streamProbe2.expectMsg(s"a-${numMessages + i}")
      }
    }
  }

  "A consumer" can {
    "consume from several view producers" in {
      val streamProbe1 = TestProbe()
      val streamProbe2 = TestProbe()

      val fromSequenceNr1 = 7L
      val fromSequenceNr2 = 3L

      val producer1 = PersistentFlow.fromProcessor(processorId(1), publisherSettings.copy(fromSequenceNr = fromSequenceNr1)).toProducer(materializer)
      val producer2 = PersistentFlow.fromProcessor(processorId(2), publisherSettings.copy(fromSequenceNr = fromSequenceNr2)).toProducer(materializer)

      Flow(producer1).merge(producer2).foreach {
        case Persistent(payload: String, sequenceNr) if (payload.startsWith("a")) ⇒ streamProbe1.ref ! s"${payload}-${sequenceNr}"
        case Persistent(payload: String, sequenceNr) if (payload.startsWith("b")) ⇒ streamProbe2.ref ! s"${payload}-${sequenceNr}"
      }.consume(materializer)

      1 to numMessages foreach { i ⇒
        if (i >= fromSequenceNr1) streamProbe1.expectMsg(s"a-${i}")
        if (i >= fromSequenceNr2) streamProbe2.expectMsg(s"b-${i}")
      }
    }
  }
}

Other Akka source code examples

Here is a short list of links related to this Akka PersistentPublisherSpec.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.