|
Akka/Scala example source code file (ProcessorChannelSpec.scala)
The ProcessorChannelSpec.scala Akka example source code
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence
import scala.concurrent.duration._
import scala.language.postfixOps
import com.typesafe.config._
import akka.actor._
import akka.testkit._
object ProcessorChannelSpec {
class TestProcessor(name: String, channelProps: Props) extends NamedProcessor(name) {
val destination = context.actorOf(Props[TestDestination])
val channel = context.actorOf(channelProps)
def receive = {
case m @ Persistent(s: String, _) if s.startsWith("a") ⇒
// forward to destination via channel,
// destination replies to initial sender
channel forward Deliver(m.withPayload(s"fw: ${s}"), destination.path)
case m @ Persistent(s: String, _) if s.startsWith("b") ⇒
// reply to sender via channel
channel ! Deliver(m.withPayload(s"re: ${s}"), sender().path)
case m @ Persistent(s: String, _) if s.startsWith("c") ⇒
// don't use channel
sender() ! s"got: ${s}"
case "replay" ⇒ throw new TestException("replay requested")
}
}
class TestDestination extends Actor {
def receive = {
case m: Persistent ⇒ sender() ! m
}
}
class ResendingProcessor(name: String, channelProps: Props, destination: ActorRef) extends NamedProcessor(name) {
val channel = context.actorOf(channelProps)
def receive = {
case p: Persistent ⇒ channel ! Deliver(p, destination.path)
case "replay" ⇒ throw new TestException("replay requested")
}
}
class ResendingPersistentActor(name: String, channelProps: Props, destination: ActorRef) extends NamedProcessor(name) with PersistentActor {
val channel = context.actorOf(channelProps)
var events: List[String] = Nil
def handleEvent(event: String) = {
events = event :: events
channel ! Deliver(Persistent(event), destination.path)
}
def receiveRecover: Receive = {
case event: String ⇒ handleEvent(event)
}
def receiveCommand: Receive = {
case "cmd" ⇒ persist("evt")(handleEvent)
case "replay" ⇒ throw new TestException("replay requested")
}
}
}
abstract class ProcessorChannelSpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender {
import ProcessorChannelSpec._
private var processor: ActorRef = _
override protected def beforeEach(): Unit = {
super.beforeEach()
setupTestProcessorData()
processor = createTestProcessor()
}
override protected def afterEach(): Unit = {
system.stop(processor)
super.afterEach()
}
def subscribeToConfirmation(probe: TestProbe): Unit =
system.eventStream.subscribe(probe.ref, classOf[Delivered])
def awaitConfirmation(probe: TestProbe): Unit =
probe.expectMsgType[Delivered]
def createTestProcessor(): ActorRef =
system.actorOf(Props(classOf[TestProcessor], name, testChannelProps))
def testChannelProps: Props
def testResendingChannelProps: Props
def setupTestProcessorData(): Unit = {
val confirmProbe = TestProbe()
val forwardProbe = TestProbe()
val replyProbe = TestProbe()
val senderProbe = TestProbe()
val processor = createTestProcessor()
subscribeToConfirmation(confirmProbe)
processor tell (Persistent("a1"), forwardProbe.ref)
processor tell (Persistent("b1"), replyProbe.ref)
processor tell (Persistent("c1"), senderProbe.ref)
forwardProbe.expectMsgPF() { case m @ ConfirmablePersistent("fw: a1", _, _) ⇒ m.confirm() }
replyProbe.expectMsgPF() { case m @ ConfirmablePersistent("re: b1", _, _) ⇒ m.confirm() }
senderProbe.expectMsg("got: c1")
awaitConfirmation(confirmProbe)
awaitConfirmation(confirmProbe)
system.stop(processor)
}
"A processor that uses a channel" can {
"forward new messages to destination" in {
processor ! Persistent("a2")
expectMsgPF() { case m @ ConfirmablePersistent("fw: a2", _, _) ⇒ m.confirm() }
}
"reply new messages to senders" in {
processor ! Persistent("b2")
expectMsgPF() { case m @ ConfirmablePersistent("re: b2", _, _) ⇒ m.confirm() }
}
"de-duplicate confirmed messages on restart" in {
processor ! Persistent("c3")
expectMsg("got: c3")
processor ! Persistent("a3")
expectMsgPF() { case m @ ConfirmablePersistent("fw: a3", _, _) ⇒ m.confirm() }
processor ! "replay"
expectMsg("got: c3")
expectNoMsg(1.second)
}
"de-duplicate confirmed messages on starting new with same processor id" in {
processor ! Persistent("c4")
expectMsg("got: c4")
processor ! Persistent("a4")
expectMsgPF() { case m @ ConfirmablePersistent("fw: a4", _, _) ⇒ m.confirm() }
val p2 = createTestProcessor()
expectMsg("got: c4")
expectNoMsg(1.second)
}
"resend unconfirmed messages on restart" in {
val probe = TestProbe()
val p = system.actorOf(Props(classOf[ResendingProcessor], "rp", testResendingChannelProps, probe.ref))
p ! Persistent("a")
probe.expectMsgPF() { case cp @ ConfirmablePersistent("a", 1L, 0) ⇒ }
probe.expectMsgPF() { case cp @ ConfirmablePersistent("a", 1L, 1) ⇒ }
probe.expectNoMsg(200 milliseconds)
p ! "replay"
probe.expectMsgPF() { case cp @ ConfirmablePersistent("a", 1L, 0) ⇒ }
probe.expectMsgPF() { case cp @ ConfirmablePersistent("a", 1L, 1) ⇒ cp.confirm() }
}
}
"A persistent actor that uses a channel" can {
"reliably deliver events" in {
val probe = TestProbe()
val ep = system.actorOf(Props(classOf[ResendingPersistentActor], "rep", testResendingChannelProps, probe.ref))
ep ! "cmd"
probe.expectMsgPF() { case cp @ ConfirmablePersistent("evt", _, 0) ⇒ }
probe.expectMsgPF() { case cp @ ConfirmablePersistent("evt", _, 1) ⇒ }
probe.expectNoMsg(200 milliseconds)
ep ! "replay"
probe.expectMsgPF() { case cp @ ConfirmablePersistent("evt", _, 0) ⇒ }
probe.expectMsgPF() { case cp @ ConfirmablePersistent("evt", _, 1) ⇒ cp.confirm() }
}
}
}
class LeveldbProcessorChannelSpec extends ProcessorChannelSpec(PersistenceSpec.config("leveldb", "LeveldbProcessorChannelSpec")) {
def testChannelProps: Props = Channel.props(s"${name}-channel")
def testResendingChannelProps: Props =
Channel.props("channel", ChannelSettings(redeliverMax = 1, redeliverInterval = 100 milliseconds))
}
class InmemProcessorChannelSpec extends ProcessorChannelSpec(PersistenceSpec.config("inmem", "InmemProcessorChannelSpec")) {
def testChannelProps: Props = Channel.props(s"${name}-channel")
def testResendingChannelProps: Props =
Channel.props("channel", ChannelSettings(redeliverMax = 1, redeliverInterval = 100 milliseconds))
}
class LeveldbProcessorPersistentChannelSpec extends ProcessorChannelSpec(PersistenceSpec.config("leveldb", "LeveldbProcessorPersistentChannelSpec")) {
def testChannelProps: Props = PersistentChannel.props(s"${name}-channel")
def testResendingChannelProps: Props =
PersistentChannel.props("channel", PersistentChannelSettings(redeliverMax = 1, redeliverInterval = 100 milliseconds))
}
class InmemProcessorPersistentChannelSpec extends ProcessorChannelSpec(PersistenceSpec.config("inmem", "InmemProcessorPersistentChannelSpec")) {
def testChannelProps: Props = PersistentChannel.props(s"${name}-channel")
def testResendingChannelProps: Props =
PersistentChannel.props("channel", PersistentChannelSettings(redeliverMax = 1, redeliverInterval = 100 milliseconds))
}
Other Akka source code examplesHere is a short list of links related to this Akka ProcessorChannelSpec.scala source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.