|
Akka/Scala example source code file (PersistentPublisherSpec.scala)
The PersistentPublisherSpec.scala Akka example source code/** * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> */ package akka.persistence.stream import scala.concurrent.duration._ import akka.actor._ import akka.persistence._ import akka.stream._ import akka.stream.scaladsl._ import akka.stream.testkit._ import akka.testkit.TestProbe // ------------------------------------------------------------------------------------------------ // FIXME: move this file to akka-persistence-experimental once going back to project dependencies // ------------------------------------------------------------------------------------------------ object PersistentPublisherSpec { class TestProcessor(name: String, probe: ActorRef) extends NamedProcessor(name) { def receive = { case Persistent(payload, sequenceNr) ⇒ probe ! s"${payload}-${sequenceNr}" } } } class PersistentPublisherSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "ViewProducerSpec", serialization = "off")) with PersistenceSpec { import PersistentPublisherSpec._ val numMessages = 10 val publisherSettings = PersistentPublisherSettings(idle = Some(100.millis)) val materializer = FlowMaterializer(MaterializerSettings()) var processor1: ActorRef = _ var processor2: ActorRef = _ var processor1Probe: TestProbe = _ var processor2Probe: TestProbe = _ def processorId(num: Int): String = name + num override protected def beforeEach(): Unit = { super.beforeEach() processor1Probe = TestProbe() processor2Probe = TestProbe() processor1 = system.actorOf(Props(classOf[TestProcessor], processorId(1), processor1Probe.ref)) processor2 = system.actorOf(Props(classOf[TestProcessor], processorId(2), processor2Probe.ref)) 1 to numMessages foreach { i ⇒ processor1 ! Persistent("a") processor2 ! Persistent("b") processor1Probe.expectMsg(s"a-${i}") processor2Probe.expectMsg(s"b-${i}") } } override protected def afterEach(): Unit = { system.stop(processor1) system.stop(processor1) super.afterEach() } "A view producer" must { "pull existing messages from a processor's journal" in { val streamProbe = TestProbe() PersistentFlow.fromProcessor(processorId(1), publisherSettings).foreach { case Persistent(payload, sequenceNr) ⇒ streamProbe.ref ! s"${payload}-${sequenceNr}" }.consume(materializer) 1 to numMessages foreach { i ⇒ streamProbe.expectMsg(s"a-${i}") } } "pull existing messages and new from a processor's journal" in { val streamProbe = TestProbe() PersistentFlow.fromProcessor(processorId(1), publisherSettings).foreach { case Persistent(payload, sequenceNr) ⇒ streamProbe.ref ! s"${payload}-${sequenceNr}" }.consume(materializer) 1 to numMessages foreach { i ⇒ streamProbe.expectMsg(s"a-${i}") } processor1 ! Persistent("a") processor1 ! Persistent("a") streamProbe.expectMsg(s"a-${numMessages + 1}") streamProbe.expectMsg(s"a-${numMessages + 2}") } "pull existing messages from a processor's journal starting form a specified sequence number" in { val streamProbe = TestProbe() val fromSequenceNr = 5L PersistentFlow.fromProcessor(processorId(1), publisherSettings.copy(fromSequenceNr = fromSequenceNr)).foreach { case Persistent(payload, sequenceNr) ⇒ streamProbe.ref ! s"${payload}-${sequenceNr}" }.consume(materializer) fromSequenceNr to numMessages foreach { i ⇒ streamProbe.expectMsg(s"a-${i}") } } } "A view producer" can { "have several consumers" in { val streamProbe1 = TestProbe() val streamProbe2 = TestProbe() val producer = PersistentFlow.fromProcessor(processorId(1), publisherSettings).toProducer(materializer) Flow(producer).foreach { case Persistent(payload, sequenceNr) ⇒ streamProbe1.ref ! s"${payload}-${sequenceNr}" }.consume(materializer) // let consumer consume all existing messages 1 to numMessages foreach { i ⇒ streamProbe1.expectMsg(s"a-${i}") } // subscribe another consumer Flow(producer).foreach { case Persistent(payload, sequenceNr) ⇒ streamProbe2.ref ! s"${payload}-${sequenceNr}" }.consume(materializer) // produce new messages and let both consumers handle them 1 to 2 foreach { i ⇒ processor1 ! Persistent("a") streamProbe1.expectMsg(s"a-${numMessages + i}") streamProbe2.expectMsg(s"a-${numMessages + i}") } } } "A consumer" can { "consume from several view producers" in { val streamProbe1 = TestProbe() val streamProbe2 = TestProbe() val fromSequenceNr1 = 7L val fromSequenceNr2 = 3L val producer1 = PersistentFlow.fromProcessor(processorId(1), publisherSettings.copy(fromSequenceNr = fromSequenceNr1)).toProducer(materializer) val producer2 = PersistentFlow.fromProcessor(processorId(2), publisherSettings.copy(fromSequenceNr = fromSequenceNr2)).toProducer(materializer) Flow(producer1).merge(producer2).foreach { case Persistent(payload: String, sequenceNr) if (payload.startsWith("a")) ⇒ streamProbe1.ref ! s"${payload}-${sequenceNr}" case Persistent(payload: String, sequenceNr) if (payload.startsWith("b")) ⇒ streamProbe2.ref ! s"${payload}-${sequenceNr}" }.consume(materializer) 1 to numMessages foreach { i ⇒ if (i >= fromSequenceNr1) streamProbe1.expectMsg(s"a-${i}") if (i >= fromSequenceNr2) streamProbe2.expectMsg(s"b-${i}") } } } } Other Akka source code examplesHere is a short list of links related to this Akka PersistentPublisherSpec.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.