|
Akka/Scala example source code file (PersistentChannelSpec.scala)
The PersistentChannelSpec.scala Akka example source code/** * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> */ package akka.persistence import scala.concurrent.duration._ import scala.language.postfixOps import com.typesafe.config._ import akka.actor._ import akka.testkit._ object PersistentChannelSpec { class SlowDestination(probe: ActorRef, maxReceived: Long) extends Actor { import context.dispatcher val delay = 100.millis var received = Vector.empty[ConfirmablePersistent] def receive = { case cp: ConfirmablePersistent ⇒ if (received.isEmpty) context.system.scheduler.scheduleOnce(delay, self, "confirm") received :+= cp case "confirm" ⇒ if (received.size > maxReceived) probe ! s"number of received messages to high: ${received.size}" else probe ! received.head.payload received.head.confirm() received = received.tail if (received.nonEmpty) context.system.scheduler.scheduleOnce(delay, self, "confirm") } } } abstract class PersistentChannelSpec(config: Config) extends ChannelSpec(config) { import PersistentChannelSpec._ private def redeliverChannelSettings(listener: Option[ActorRef]): PersistentChannelSettings = PersistentChannelSettings(redeliverMax = 2, redeliverInterval = 100 milliseconds, redeliverFailureListener = listener, idleTimeout = 5.seconds) private def createDefaultTestChannel(name: String): ActorRef = system.actorOf(PersistentChannel.props(s"${name}-default", PersistentChannelSettings(idleTimeout = 5.seconds))) override def createDefaultTestChannel(): ActorRef = createDefaultTestChannel(name) override def createRedeliverTestChannel(): ActorRef = system.actorOf(PersistentChannel.props(s"${name}-redeliver", redeliverChannelSettings(None))) override def createRedeliverTestChannel(listener: Option[ActorRef]): ActorRef = system.actorOf(PersistentChannel.props(s"${name}-redeliver-listener", redeliverChannelSettings(listener))) "A persistent channel" must { "support Persistent replies to Deliver senders" in { val destProbe = TestProbe() val replyProbe = TestProbe() val channel1 = system.actorOf(PersistentChannel.props(s"${name}-with-reply", PersistentChannelSettings(replyPersistent = true))) channel1 tell (Deliver(Persistent("a"), destProbe.ref.path), replyProbe.ref) destProbe.expectMsgPF() { case cp @ ConfirmablePersistent("a", _, _) ⇒ cp.confirm() } replyProbe.expectMsgPF() { case Persistent("a", _) ⇒ } channel1 tell (Deliver(PersistentRepr("b", sequenceNr = 13), destProbe.ref.path), replyProbe.ref) destProbe.expectMsgPF() { case cp @ ConfirmablePersistent("b", 13, _) ⇒ cp.confirm() } replyProbe.expectMsgPF() { case Persistent("b", 13) ⇒ } system.stop(channel1) } "not modify certain persistent message fields" in { val destProbe = TestProbe() val persistent1 = PersistentRepr(payload = "a", persistenceId = "p1", confirms = List("c1", "c2"), sender = defaultTestChannel, sequenceNr = 13) val persistent2 = PersistentRepr(payload = "b", persistenceId = "p1", confirms = List("c1", "c2"), sender = defaultTestChannel) defaultTestChannel ! Deliver(persistent1, destProbe.ref.path) defaultTestChannel ! Deliver(persistent2, destProbe.ref.path) destProbe.expectMsgPF() { case cp @ ConfirmablePersistentImpl("a", 13, "p1", _, _, Seq("c1", "c2"), _, _, channel) ⇒ cp.confirm() } destProbe.expectMsgPF() { case cp @ ConfirmablePersistentImpl("b", 2, "p1", _, _, Seq("c1", "c2"), _, _, channel) ⇒ cp.confirm() } } "redeliver un-confirmed stored messages during recovery" in { val confirmProbe = TestProbe() val forwardProbe = TestProbe() subscribeToConfirmation(confirmProbe) val channel1 = createDefaultTestChannel("extra") channel1 tell (Deliver(Persistent("a1"), forwardProbe.ref.path), null) channel1 tell (Deliver(Persistent("a2"), forwardProbe.ref.path), null) forwardProbe.expectMsgPF() { case m @ ConfirmablePersistent("a1", _, _) ⇒ /* no confirmation */ } forwardProbe.expectMsgPF() { case m @ ConfirmablePersistent("a2", _, _) ⇒ m.confirm() } awaitConfirmation(confirmProbe) system.stop(channel1) val channel2 = createDefaultTestChannel("extra") channel2 tell (Deliver(Persistent("a3"), forwardProbe.ref.path), null) forwardProbe.expectMsgPF() { case m @ ConfirmablePersistent("a1", _, _) ⇒ m.confirm() } forwardProbe.expectMsgPF() { case m @ ConfirmablePersistent("a3", _, _) ⇒ m.confirm() } awaitConfirmation(confirmProbe) awaitConfirmation(confirmProbe) system.stop(channel2) } "not flood destinations" in { val probe = TestProbe() val settings = PersistentChannelSettings( redeliverMax = 0, redeliverInterval = 1.minute, pendingConfirmationsMax = 4, pendingConfirmationsMin = 2) val channel = system.actorOf(PersistentChannel.props(s"${name}-watermark", settings)) val destination = system.actorOf(Props(classOf[SlowDestination], probe.ref, settings.pendingConfirmationsMax)) 1 to 10 foreach { i ⇒ channel ! Deliver(Persistent(i), destination.path) } 1 to 10 foreach { i ⇒ probe.expectMsg(i) } system.stop(channel) } "redeliver on reset" in { val probe = TestProbe() val settings = PersistentChannelSettings( redeliverMax = 0, redeliverInterval = 1.minute, pendingConfirmationsMax = 4, pendingConfirmationsMin = 2) val channel = system.actorOf(PersistentChannel.props(s"${name}-reset", settings)) 1 to 3 foreach { i ⇒ channel ! Deliver(Persistent(i), probe.ref.path) } 1 to 3 foreach { i ⇒ probe.expectMsgPF() { case ConfirmablePersistent(`i`, _, _) ⇒ } } channel ! Reset 1 to 3 foreach { i ⇒ probe.expectMsgPF() { case ConfirmablePersistent(`i`, _, _) ⇒ } } system.stop(channel) } } } class LeveldbPersistentChannelSpec extends PersistentChannelSpec(PersistenceSpec.config("leveldb", "LeveldbPersistentChannelSpec")) class InmemPersistentChannelSpec extends PersistentChannelSpec(PersistenceSpec.config("inmem", "InmemPersistentChannelSpec")) Other Akka source code examplesHere is a short list of links related to this Akka PersistentChannelSpec.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.