|
Akka/Scala example source code file (FSMActorSpec.scala)
The FSMActorSpec.scala Akka example source code
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor
import language.postfixOps
import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }
import akka.testkit._
import TestEvent.Mute
import scala.concurrent.duration._
import akka.event._
import com.typesafe.config.ConfigFactory
import scala.concurrent.Await
import akka.util.Timeout
import org.scalatest.matchers.Matcher
import org.scalatest.matchers.HavePropertyMatcher
import org.scalatest.matchers.HavePropertyMatchResult
object FSMActorSpec {
val timeout = Timeout(2 seconds)
class Latches(implicit system: ActorSystem) {
val unlockedLatch = TestLatch()
val lockedLatch = TestLatch()
val unhandledLatch = TestLatch()
val terminatedLatch = TestLatch()
val transitionLatch = TestLatch()
val initialStateLatch = TestLatch()
val transitionCallBackLatch = TestLatch()
}
sealed trait LockState
case object Locked extends LockState
case object Open extends LockState
case object Hello
case object Bye
class Lock(code: String, timeout: FiniteDuration, latches: Latches) extends Actor with FSM[LockState, CodeState] {
import latches._
startWith(Locked, CodeState("", code))
when(Locked) {
case Event(digit: Char, CodeState(soFar, code)) ⇒ {
soFar + digit match {
case incomplete if incomplete.length < code.length ⇒
stay using CodeState(incomplete, code)
case codeTry if (codeTry == code) ⇒ {
doUnlock()
goto(Open) using CodeState("", code) forMax timeout
}
case wrong ⇒ {
stay using CodeState("", code)
}
}
}
case Event("hello", _) ⇒ stay replying "world"
case Event("bye", _) ⇒ stop(FSM.Shutdown)
}
when(Open) {
case Event(StateTimeout, _) ⇒ {
doLock()
goto(Locked)
}
}
whenUnhandled {
case Event(msg, _) ⇒ {
log.warning("unhandled event " + msg + " in state " + stateName + " with data " + stateData)
unhandledLatch.open
stay
}
}
onTransition {
case Locked -> Open ⇒ transitionLatch.open
}
// verify that old-style does still compile
onTransition(transitionHandler _)
def transitionHandler(from: LockState, to: LockState) = {
// dummy
}
onTermination {
case StopEvent(FSM.Shutdown, Locked, _) ⇒
// stop is called from lockstate with shutdown as reason...
terminatedLatch.open()
}
// initialize the lock
initialize()
private def doLock(): Unit = lockedLatch.open()
private def doUnlock(): Unit = unlockedLatch.open()
}
final case class CodeState(soFar: String, code: String)
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with ImplicitSender {
import FSMActorSpec._
"An FSM Actor" must {
"unlock the lock" in {
import FSM.{ Transition, CurrentState, SubscribeTransitionCallBack }
val latches = new Latches
import latches._
// lock that locked after being open for 1 sec
val lock = system.actorOf(Props(new Lock("33221", 1 second, latches)))
val transitionTester = system.actorOf(Props(new Actor {
def receive = {
case Transition(_, _, _) ⇒ transitionCallBackLatch.open
case CurrentState(_, s: LockState) if s eq Locked ⇒ initialStateLatch.open // SI-5900 workaround
}
}))
lock ! SubscribeTransitionCallBack(transitionTester)
Await.ready(initialStateLatch, timeout.duration)
lock ! '3'
lock ! '3'
lock ! '2'
lock ! '2'
lock ! '1'
Await.ready(unlockedLatch, timeout.duration)
Await.ready(transitionLatch, timeout.duration)
Await.ready(transitionCallBackLatch, timeout.duration)
Await.ready(lockedLatch, timeout.duration)
EventFilter.warning(start = "unhandled event", occurrences = 1) intercept {
lock ! "not_handled"
Await.ready(unhandledLatch, timeout.duration)
}
val answerLatch = TestLatch()
val tester = system.actorOf(Props(new Actor {
def receive = {
case Hello ⇒ lock ! "hello"
case "world" ⇒ answerLatch.open
case Bye ⇒ lock ! "bye"
}
}))
tester ! Hello
Await.ready(answerLatch, timeout.duration)
tester ! Bye
Await.ready(terminatedLatch, timeout.duration)
}
"log termination" in {
val fsm = TestActorRef(new Actor with FSM[Int, Null] {
startWith(1, null)
when(1) {
case Event("go", _) ⇒ goto(2)
}
})
val name = fsm.path.toString
EventFilter.error("Next state 2 does not exist", occurrences = 1) intercept {
system.eventStream.subscribe(testActor, classOf[Logging.Error])
fsm ! "go"
expectMsgPF(1 second, hint = "Next state 2 does not exist") {
case Logging.Error(_, `name`, _, "Next state 2 does not exist") ⇒ true
}
system.eventStream.unsubscribe(testActor)
}
}
"run onTermination upon ActorRef.stop()" in {
val started = TestLatch(1)
/*
* This lazy val trick is beyond evil: KIDS, DON'T TRY THIS AT HOME!
* It is necessary here because of the path-dependent type fsm.StopEvent.
*/
lazy val fsm = new Actor with FSM[Int, Null] {
override def preStart = { started.countDown }
startWith(1, null)
when(1) { FSM.NullFunction }
onTermination {
case x ⇒ testActor ! x
}
}
val ref = system.actorOf(Props(fsm))
Await.ready(started, timeout.duration)
system.stop(ref)
expectMsg(1 second, fsm.StopEvent(FSM.Shutdown, 1, null))
}
"run onTermination with updated state upon stop(reason, stateData)" in {
val expected = "pigdog"
val actor = system.actorOf(Props(new Actor with FSM[Int, String] {
startWith(1, null)
when(1) {
case Event(2, null) ⇒ stop(FSM.Normal, expected)
}
onTermination {
case StopEvent(FSM.Normal, 1, `expected`) ⇒ testActor ! "green"
}
}))
actor ! 2
expectMsg("green")
}
"cancel all timers when terminated" in {
val timerNames = List("timer-1", "timer-2", "timer-3")
// Lazy so fsmref can refer to checkTimersActive
lazy val fsmref = TestFSMRef(new Actor with FSM[String, Null] {
startWith("not-started", null)
when("not-started") {
case Event("start", _) ⇒ goto("started") replying "starting"
}
when("started", stateTimeout = 10 seconds) {
case Event("stop", _) ⇒ stop()
}
onTransition {
case "not-started" -> "started" ⇒
for (timerName ← timerNames) setTimer(timerName, (), 10 seconds, false)
}
onTermination {
case _ ⇒ {
checkTimersActive(false)
testActor ! "stopped"
}
}
})
def checkTimersActive(active: Boolean) {
for (timer ← timerNames) fsmref.isTimerActive(timer) should be(active)
fsmref.isStateTimerActive should be(active)
}
checkTimersActive(false)
fsmref ! "start"
expectMsg(1 second, "starting")
checkTimersActive(true)
fsmref ! "stop"
expectMsg(1 second, "stopped")
}
"log events and transitions if asked to do so" in {
import scala.collection.JavaConverters._
val config = ConfigFactory.parseMap(Map("akka.loglevel" -> "DEBUG", "akka.actor.serialize-messages" -> "off",
"akka.actor.debug.fsm" -> true).asJava).withFallback(system.settings.config)
val fsmEventSystem = ActorSystem("fsmEvent", config)
try {
new TestKit(fsmEventSystem) {
EventFilter.debug(occurrences = 5) intercept {
val fsm = TestActorRef(new Actor with LoggingFSM[Int, Null] {
startWith(1, null)
when(1) {
case Event("go", _) ⇒
setTimer("t", FSM.Shutdown, 1.5 seconds, false)
goto(2)
}
when(2) {
case Event("stop", _) ⇒
cancelTimer("t")
stop
}
onTermination {
case StopEvent(r, _, _) ⇒ testActor ! r
}
})
val name = fsm.path.toString
val fsmClass = fsm.underlyingActor.getClass
system.eventStream.subscribe(testActor, classOf[Logging.Debug])
fsm ! "go"
expectMsgPF(1 second, hint = "processing Event(go,null)") {
case Logging.Debug(`name`, `fsmClass`, s: String) if s.startsWith("processing Event(go,null) from Actor[") ⇒ true
}
expectMsg(1 second, Logging.Debug(name, fsmClass, "setting timer 't'/1500 milliseconds: Shutdown"))
expectMsg(1 second, Logging.Debug(name, fsmClass, "transition 1 -> 2"))
fsm ! "stop"
expectMsgPF(1 second, hint = "processing Event(stop,null)") {
case Logging.Debug(`name`, `fsmClass`, s: String) if s.startsWith("processing Event(stop,null) from Actor[") ⇒ true
}
expectMsgAllOf(1 second, Logging.Debug(name, fsmClass, "canceling timer 't'"), FSM.Normal)
expectNoMsg(1 second)
system.eventStream.unsubscribe(testActor)
}
}
} finally {
TestKit.shutdownActorSystem(fsmEventSystem)
}
}
"fill rolling event log and hand it out" in {
val fsmref = TestActorRef(new Actor with LoggingFSM[Int, Int] {
override def logDepth = 3
startWith(1, 0)
when(1) {
case Event("count", c) ⇒ stay using (c + 1)
case Event("log", _) ⇒ stay replying getLog
}
})
fsmref ! "log"
val fsm = fsmref.underlyingActor
import FSM.LogEntry
expectMsg(1 second, IndexedSeq(LogEntry(1, 0, "log")))
fsmref ! "count"
fsmref ! "log"
expectMsg(1 second, IndexedSeq(LogEntry(1, 0, "log"), LogEntry(1, 0, "count"), LogEntry(1, 1, "log")))
fsmref ! "count"
fsmref ! "log"
expectMsg(1 second, IndexedSeq(LogEntry(1, 1, "log"), LogEntry(1, 1, "count"), LogEntry(1, 2, "log")))
}
"allow transforming of state results" in {
import akka.actor.FSM._
val fsmref = system.actorOf(Props(new Actor with FSM[Int, Int] {
startWith(0, 0)
when(0)(transform {
case Event("go", _) ⇒ stay
} using {
case x ⇒ goto(1)
})
when(1) {
case _ ⇒ stay
}
}))
fsmref ! SubscribeTransitionCallBack(testActor)
fsmref ! "go"
expectMsg(CurrentState(fsmref, 0))
expectMsg(Transition(fsmref, 0, 1))
}
}
}
Other Akka source code examplesHere is a short list of links related to this Akka FSMActorSpec.scala source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.