|
Akka/Scala example source code file (ChannelSpec.scala)
The ChannelSpec.scala Akka example source code
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence
import scala.concurrent.duration._
import scala.language.postfixOps
import com.typesafe.config._
import akka.actor._
import akka.testkit._
/**
 * Helper actors used by the channel test suites.
 */
object ChannelSpec {

  /**
   * Named processor used as a channel destination. It confirms deliveries whose
   * payload is "a" or "b", and throws a TestException for a "boom" payload when
   * `recoveryFinished` is true. Any other message is left unhandled.
   */
  class TestDestinationProcessor(name: String) extends NamedProcessor(name) {
    def receive = {
      case delivery @ ConfirmablePersistent("a" | "b", _, _) ⇒
        delivery.confirm()
      case ConfirmablePersistent("boom", _, _) if recoveryFinished ⇒
        throw new TestException("boom")
    }
  }

  /**
   * Forwards the payload of every confirmable delivery to `testActor`, then
   * confirms the delivery.
   */
  class TestReceiver(testActor: ActorRef) extends Actor {
    def receive = {
      case delivery @ ConfirmablePersistent(body, _, _) ⇒
        testActor ! body
        delivery.confirm()
    }
  }

  /**
   * Sends the payload of each message carried by a RedeliverFailure to `probe`.
   */
  class TestListener(probe: ActorRef) extends Actor {
    def receive = {
      case RedeliverFailure(undelivered) ⇒
        for (message ← undelivered) probe ! message.payload
    }
  }
}
/**
 * Shared test suite for channels. Concrete subclasses supply the configuration; the
 * channel factory methods are public so that they can be overridden.
 *
 * @param config configuration handed to AkkaSpec
 */
abstract class ChannelSpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender {
  import ChannelSpec._

  // Both channels are (re)created in beforeEach and stopped in afterEach.
  protected var defaultTestChannel: ActorRef = _
  protected var redeliverTestChannel: ActorRef = _

  /** Creates fresh test channels after the parent fixture has been set up. */
  override protected def beforeEach(): Unit = {
    super.beforeEach()
    defaultTestChannel = createDefaultTestChannel()
    redeliverTestChannel = createRedeliverTestChannel()
  }

  /** Stops the per-test channels before the parent fixture is torn down. */
  override protected def afterEach(): Unit = {
    system.stop(defaultTestChannel)
    system.stop(redeliverTestChannel)
    super.afterEach()
  }

  /** Settings with at most 2 redeliveries, 100 ms apart, and the given failure listener. */
  private def redeliverChannelSettings(listener: Option[ActorRef]): ChannelSettings =
    ChannelSettings(redeliverMax = 2, redeliverInterval = 100.milliseconds, redeliverFailureListener = listener)

  /** Creates a channel with default settings. */
  def createDefaultTestChannel(): ActorRef =
    system.actorOf(Channel.props(s"${name}-default", ChannelSettings()))

  /** Creates a redelivering channel without a failure listener. */
  def createRedeliverTestChannel(): ActorRef =
    system.actorOf(Channel.props(s"${name}-redeliver", redeliverChannelSettings(None)))

  /** Creates a redelivering channel that uses the given (optional) failure listener. */
  def createRedeliverTestChannel(listener: Option[ActorRef]): ActorRef =
    system.actorOf(Channel.props(s"${name}-redeliver-listener", redeliverChannelSettings(listener)))

  /** Subscribes `probe` to Delivered events on the system event stream. */
  def subscribeToConfirmation(probe: TestProbe): Unit =
    system.eventStream.subscribe(probe.ref, classOf[Delivered])

  /** Blocks until `probe` receives a Delivered event. */
  def awaitConfirmation(probe: TestProbe): Unit =
    probe.expectMsgType[Delivered]

  /** Resolves the reference for the top-level user actor path with the given name. */
  def actorRefFor(topLevelName: String): ActorRef =
    extension.system.provider.resolveActorRef(RootActorPath(Address("akka", system.name)) / "user" / topLevelName)

  "A channel" must {
    // The subject clause above already contributes "must", so the test name must not repeat it.
    "resolve destination references and preserve message order" in {
      val empty = actorRefFor("testDestination") // will be an EmptyLocalActorRef
      val probe = TestProbe()
      val destination = system.actorOf(Props(classOf[TestReceiver], probe.ref), "testDestination")

      defaultTestChannel ! Deliver(PersistentRepr("a"), empty.path)
      defaultTestChannel ! Deliver(Persistent("b"), destination.path)
      defaultTestChannel ! Deliver(Persistent("c"), destination.path)

      probe.expectMsg("a")
      probe.expectMsg("b")
      probe.expectMsg("c")
    }

    "support processors as destination" in {
      val destination = system.actorOf(Props(classOf[TestDestinationProcessor], name))
      val confirmProbe = TestProbe()

      subscribeToConfirmation(confirmProbe)
      defaultTestChannel ! Deliver(Persistent("a"), destination.path)
      awaitConfirmation(confirmProbe)
    }

    "support processors as destination that may fail" in {
      val destination = system.actorOf(Props(classOf[TestDestinationProcessor], name))
      val confirmProbe = TestProbe()

      subscribeToConfirmation(confirmProbe)
      defaultTestChannel ! Deliver(Persistent("a"), destination.path)
      defaultTestChannel ! Deliver(Persistent("boom"), destination.path)
      defaultTestChannel ! Deliver(Persistent("b"), destination.path)

      // Two confirmations are awaited: the destination confirms "a" and "b" but throws on "boom".
      awaitConfirmation(confirmProbe)
      awaitConfirmation(confirmProbe)
    }

    "accept confirmable persistent messages for delivery" in {
      val confirmProbe = TestProbe()
      val destinationProbe = TestProbe()

      subscribeToConfirmation(confirmProbe)
      defaultTestChannel ! Deliver(PersistentRepr("a", confirmable = true), destinationProbe.ref.path)

      destinationProbe.expectMsgPF() { case m @ ConfirmablePersistent("a", _, _) ⇒ m.confirm() }
      awaitConfirmation(confirmProbe)
    }

    "redeliver on missing confirmation" in {
      val probe = TestProbe()

      redeliverTestChannel ! Deliver(Persistent("b"), probe.ref.path)

      probe.expectMsgPF() { case ConfirmablePersistent("b", _, redeliveries) ⇒ redeliveries should be(0) }
      probe.expectMsgPF() { case ConfirmablePersistent("b", _, redeliveries) ⇒ redeliveries should be(1) }
      // Only the last delivery is confirmed.
      probe.expectMsgPF() { case m @ ConfirmablePersistent("b", _, redeliveries) ⇒ redeliveries should be(2); m.confirm() }
    }

    "redeliver in correct relative order" in {
      val settings = redeliverChannelSettings(None)
      // One initial delivery plus redeliverMax redeliveries per message.
      val deliveries = settings.redeliverMax + 1
      // Messages are sent 4/5 of a redelivery interval apart.
      val interval = settings.redeliverInterval.toMillis / 5 * 4
      val probe = TestProbe()
      val cycles = 9

      (1 to cycles).foreach { i ⇒
        redeliverTestChannel ! Deliver(Persistent(i), probe.ref.path)
        Thread.sleep(interval)
      }

      // Collect every expected delivery in arrival order.
      val received = Vector.fill(cycles * deliveries)(probe.expectMsgType[ConfirmablePersistent])

      val grouped = received.groupBy(_.redeliveries)
      // Derived from `cycles` so the expectation stays in sync with the number of messages sent.
      val expected = (1 to cycles).toVector

      grouped(0).map(_.payload) should be(expected)
      grouped(1).map(_.payload) should be(expected)
      grouped(2).map(_.payload) should be(expected)
    }

    "redeliver not more than redeliverMax on missing confirmation" in {
      val probe = TestProbe()

      redeliverTestChannel ! Deliver(PersistentRepr("a"), probe.ref.path)

      probe.expectMsgPF() { case ConfirmablePersistent("a", _, redeliveries) ⇒ redeliveries should be(0) }
      probe.expectMsgPF() { case ConfirmablePersistent("a", _, redeliveries) ⇒ redeliveries should be(1) }
      probe.expectMsgPF() { case ConfirmablePersistent("a", _, redeliveries) ⇒ redeliveries should be(2) }
      // 300 ms is three times the configured 100 ms redelivery interval.
      probe.expectNoMsg(300.milliseconds)
    }

    "preserve message order to the same destination" in {
      val probe = TestProbe()
      val destination = system.actorOf(Props(classOf[TestReceiver], probe.ref))

      (1 to 10).foreach { i ⇒
        defaultTestChannel ! Deliver(PersistentRepr(s"test-${i}"), destination.path)
      }
      (1 to 10).foreach { i ⇒
        probe.expectMsg(s"test-${i}")
      }
    }

    "notify redelivery failure listener" in {
      val probe = TestProbe()
      val listener = system.actorOf(Props(classOf[TestListener], probe.ref))
      val channel = createRedeliverTestChannel(Some(listener))

      // This channel is not managed by afterEach, so stop it even when the expectation fails.
      try {
        (1 to 3).foreach { i ⇒ channel ! Deliver(Persistent(i), system.deadLetters.path) }
        probe.expectMsgAllOf(1, 2, 3)
      } finally {
        system.stop(channel)
      }
    }
  }
}
/** Runs the ChannelSpec suite with the configuration from `PersistenceSpec.config("leveldb", ...)`. */
class LeveldbChannelSpec
  extends ChannelSpec(PersistenceSpec.config("leveldb", "LeveldbChannelSpec"))
/** Runs the ChannelSpec suite with the configuration from `PersistenceSpec.config("inmem", ...)`. */
class InmemChannelSpec
  extends ChannelSpec(PersistenceSpec.config("inmem", "InmemChannelSpec"))
Other Akka source code examples
Here is a short list of links related to this Akka ChannelSpec.scala source code file:
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.