|
Akka/Scala example source code file (NumberProcessorSpec.scala)
The NumberProcessorSpec.scala Akka example source code
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence
import scala.language.postfixOps
import com.typesafe.config._
import scala.concurrent.duration._
import akka.actor._
import akka.persistence._
import akka.testkit._
object NumberProcessorSpec {
case class SetNumber(number: Int)
case class Add(number: Int)
case class Subtract(number: Int)
case object DecrementAndGet
case object GetNumber
class NumberProcessorWithPersistentChannel(name: String) extends NamedProcessor(name) {
var num = 0
val channel = context.actorOf(PersistentChannel.props(channelId = "stable_id",
PersistentChannelSettings(redeliverInterval = 30 seconds, redeliverMax = 15)),
name = "myPersistentChannel")
def receive = {
case Persistent(SetNumber(number), _) ⇒ num = number
case Persistent(Add(number), _) ⇒ num = num + number
case Persistent(Subtract(number), _) ⇒ num = num - number
case GetNumber ⇒ channel ! Deliver(Persistent(num), sender().path)
case p @ Persistent(DecrementAndGet, _) ⇒
num = num - 1
channel ! Deliver(p.withPayload(num), sender().path)
}
}
}
/*
* This test found the problem described in ticket #3933
*/
class NumberProcessorSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "NumberProcessorSpec"))
with PersistenceSpec {
import NumberProcessorSpec._
"A processor using a persistent channel" must {
"resurrect with the correct state, not replaying confirmed messages to clients" in {
val deliveredProbe = TestProbe()
system.eventStream.subscribe(deliveredProbe.testActor, classOf[DeliveredByPersistentChannel])
val probe = TestProbe()
val processor = namedProcessor[NumberProcessorWithPersistentChannel]
processor.tell(GetNumber, probe.testActor)
val zero = probe.expectMsgType[ConfirmablePersistent]
zero.confirm()
zero.payload should equal(0)
deliveredProbe.expectMsgType[DeliveredByPersistentChannel]
processor.tell(Persistent(DecrementAndGet), probe.testActor)
val decrementFrom0 = probe.expectMsgType[ConfirmablePersistent]
decrementFrom0.confirm()
decrementFrom0.payload should equal(-1)
deliveredProbe.expectMsgType[DeliveredByPersistentChannel]
watch(processor)
system.stop(processor)
expectMsgType[Terminated]
val processorResurrected = namedProcessor[NumberProcessorWithPersistentChannel]
processorResurrected.tell(Persistent(DecrementAndGet), probe.testActor)
val decrementFromMinus1 = probe.expectMsgType[ConfirmablePersistent]
decrementFromMinus1.confirm()
decrementFromMinus1.payload should equal(-2)
deliveredProbe.expectMsgType[DeliveredByPersistentChannel]
}
}
}
Other Akka source code examplesHere is a short list of links related to this Akka NumberProcessorSpec.scala source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.