alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (FSMActorSpec.scala)

This example Akka source code file (FSMActorSpec.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actor, akka, codestate, concurrent, duration, event, fsm, locked, lockstate, logentry, null, string, test, testing, testlatch, time

The FSMActorSpec.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */

package akka.actor

import language.postfixOps
import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }
import akka.testkit._
import TestEvent.Mute
import scala.concurrent.duration._
import akka.event._
import com.typesafe.config.ConfigFactory
import scala.concurrent.Await
import akka.util.Timeout
import org.scalatest.matchers.Matcher
import org.scalatest.matchers.HavePropertyMatcher
import org.scalatest.matchers.HavePropertyMatchResult

/**
 * Fixtures shared by the tests in the `FSMActorSpec` class: a set of test
 * latches, a small two-state "code lock" FSM and the messages used to drive it.
 */
object FSMActorSpec {
  /** Upper bound used by the spec when awaiting any of the latches below. */
  val timeout = Timeout(2 seconds)

  /**
   * One latch per observable step of the [[Lock]] life cycle. The `Lock`
   * actor opens them; the test thread awaits them.
   */
  class Latches(implicit system: ActorSystem) {
    val unlockedLatch = TestLatch()
    val lockedLatch = TestLatch()
    val unhandledLatch = TestLatch()
    val terminatedLatch = TestLatch()
    val transitionLatch = TestLatch()
    val initialStateLatch = TestLatch()
    val transitionCallBackLatch = TestLatch()
  }

  /** The two states of the [[Lock]] FSM. */
  sealed trait LockState
  case object Locked extends LockState
  case object Open extends LockState

  // Messages sent to the helper actor in the "unlock the lock" test.
  case object Hello
  case object Bye

  /**
   * A code lock implemented as an FSM.
   *
   * While `Locked` it accumulates `Char` digits; once as many digits as the
   * code is long have been entered it either opens (digits match) or resets
   * the entered digits (digits differ). While `Open` it only reacts to the
   * state timeout, which sends it back to `Locked`.
   *
   * @param code    the digit sequence that opens the lock
   * @param timeout how long the lock stays `Open` before re-locking
   * @param latches latches opened as the lock passes through its life cycle
   */
  class Lock(code: String, timeout: FiniteDuration, latches: Latches) extends Actor with FSM[LockState, CodeState] {

    import latches._

    startWith(Locked, CodeState("", code))

    when(Locked) {
      // `expected` is the code carried in the state data; it is bound under its
      // own name so that it does not shadow the constructor parameter `code`.
      case Event(digit: Char, CodeState(soFar, expected)) ⇒
        soFar + digit match {
          case incomplete if incomplete.length < expected.length ⇒
            stay using CodeState(incomplete, expected)
          case codeTry if codeTry == expected ⇒
            doUnlock()
            goto(Open) using CodeState("", expected) forMax timeout
          case _ ⇒
            // full-length but wrong attempt: start over
            stay using CodeState("", expected)
        }
      case Event("hello", _) ⇒ stay replying "world"
      case Event("bye", _)   ⇒ stop(FSM.Shutdown)
    }

    when(Open) {
      // `forMax timeout` on the transition into Open schedules this StateTimeout
      case Event(StateTimeout, _) ⇒
        doLock()
        goto(Locked)
    }

    whenUnhandled {
      case Event(msg, _) ⇒
        // The spec filters on the "unhandled event" prefix of this message.
        log.warning(s"unhandled event $msg in state $stateName with data $stateData")
        unhandledLatch.open()
        stay
    }

    onTransition {
      case Locked -> Open ⇒ transitionLatch.open()
    }

    // verify that old-style does still compile
    onTransition(transitionHandler _)

    /** Intentionally empty; exists only so the old-style registration above compiles. */
    def transitionHandler(from: LockState, to: LockState): Unit = {
      // dummy
    }

    onTermination {
      case StopEvent(FSM.Shutdown, Locked, _) ⇒
        // stop is called from lockstate with shutdown as reason...
        terminatedLatch.open()
    }

    // initialize the lock
    initialize()

    private def doLock(): Unit = lockedLatch.open()

    private def doUnlock(): Unit = unlockedLatch.open()
  }

  /**
   * State data of the [[Lock]].
   *
   * @param soFar digits entered since the last reset
   * @param code  the digit sequence that opens the lock
   */
  final case class CodeState(soFar: String, code: String)
}

/**
 * Behavioural tests for Akka's `FSM` / `LoggingFSM` traits: state transitions,
 * transition subscriptions, termination hooks, timer clean-up, debug logging,
 * the rolling event log and `transform ... using`.
 *
 * The actor system is configured with `akka.actor.debug.fsm` switched on.
 */
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with ImplicitSender {
  import FSMActorSpec._

  "An FSM Actor" must {

    "unlock the lock" in {

      import FSM.{ Transition, CurrentState, SubscribeTransitionCallBack }

      val latches = new Latches
      import latches._

      // lock that re-locks after being open for 1 sec
      val lock = system.actorOf(Props(new Lock("33221", 1 second, latches)))

      // Receives the CurrentState / Transition notifications sent by the lock.
      val transitionTester = system.actorOf(Props(new Actor {
        def receive = {
          case Transition(_, _, _)                          ⇒ transitionCallBackLatch.open()
          case CurrentState(_, s: LockState) if s eq Locked ⇒ initialStateLatch.open() // SI-5900 workaround
        }
      }))

      lock ! SubscribeTransitionCallBack(transitionTester)
      Await.ready(initialStateLatch, timeout.duration)

      // enter the correct code, one digit per message
      lock ! '3'
      lock ! '3'
      lock ! '2'
      lock ! '2'
      lock ! '1'

      Await.ready(unlockedLatch, timeout.duration)
      Await.ready(transitionLatch, timeout.duration)
      Await.ready(transitionCallBackLatch, timeout.duration)
      // opened by the lock when its 1 second state timeout fires
      Await.ready(lockedLatch, timeout.duration)

      EventFilter.warning(start = "unhandled event", occurrences = 1) intercept {
        lock ! "not_handled"
        Await.ready(unhandledLatch, timeout.duration)
      }

      val answerLatch = TestLatch()
      // Relays Hello/Bye to the lock so that the lock's reply comes back here.
      val tester = system.actorOf(Props(new Actor {
        def receive = {
          case Hello   ⇒ lock ! "hello"
          case "world" ⇒ answerLatch.open()
          case Bye     ⇒ lock ! "bye"
        }
      }))
      tester ! Hello
      Await.ready(answerLatch, timeout.duration)

      tester ! Bye
      Await.ready(terminatedLatch, timeout.duration)
    }

    "log termination" in {
      // State 2 is never declared with when(...), so goto(2) is an error.
      val fsm = TestActorRef(new Actor with FSM[Int, Null] {
        startWith(1, null)
        when(1) {
          case Event("go", _) ⇒ goto(2)
        }
      })
      val name = fsm.path.toString
      EventFilter.error("Next state 2 does not exist", occurrences = 1) intercept {
        system.eventStream.subscribe(testActor, classOf[Logging.Error])
        // Unsubscribe even when the expectation fails, so that a failure here
        // does not leave testActor receiving Logging.Error in later tests.
        try {
          fsm ! "go"
          expectMsgPF(1 second, hint = "Next state 2 does not exist") {
            case Logging.Error(_, `name`, _, "Next state 2 does not exist") ⇒ true
          }
        } finally {
          system.eventStream.unsubscribe(testActor)
        }
      }
    }

    "run onTermination upon ActorRef.stop()" in {
      val started = TestLatch(1)
      /*
       * This lazy val trick is beyond evil: KIDS, DON'T TRY THIS AT HOME!
       * It is necessary here because of the path-dependent type fsm.StopEvent.
       */
      lazy val fsm = new Actor with FSM[Int, Null] {
        override def preStart(): Unit = started.countDown()
        startWith(1, null)
        when(1) { FSM.NullFunction }
        onTermination {
          case x ⇒ testActor ! x
        }
      }
      val ref = system.actorOf(Props(fsm))
      Await.ready(started, timeout.duration)
      system.stop(ref)
      expectMsg(1 second, fsm.StopEvent(FSM.Shutdown, 1, null))
    }

    "run onTermination with updated state upon stop(reason, stateData)" in {
      val expected = "pigdog"
      val actor = system.actorOf(Props(new Actor with FSM[Int, String] {
        startWith(1, null)
        when(1) {
          case Event(2, null) ⇒ stop(FSM.Normal, expected)
        }
        onTermination {
          // matches only if the state data passed to stop(...) reached the handler
          case StopEvent(FSM.Normal, 1, `expected`) ⇒ testActor ! "green"
        }
      }))
      actor ! 2
      expectMsg("green")
    }

    "cancel all timers when terminated" in {
      val timerNames = List("timer-1", "timer-2", "timer-3")

      // Lazy so fsmref can refer to checkTimersActive
      lazy val fsmref = TestFSMRef(new Actor with FSM[String, Null] {
        startWith("not-started", null)
        when("not-started") {
          case Event("start", _) ⇒ goto("started") replying "starting"
        }
        when("started", stateTimeout = 10 seconds) {
          case Event("stop", _) ⇒ stop()
        }
        onTransition {
          case "not-started" -> "started" ⇒
            for (timerName ← timerNames) setTimer(timerName, (), 10 seconds, false)
        }
        onTermination {
          case _ ⇒
            checkTimersActive(false)
            testActor ! "stopped"
        }
      })

      /** Asserts that every named timer and the state timer are all `active`. */
      def checkTimersActive(active: Boolean): Unit = {
        for (timer ← timerNames) fsmref.isTimerActive(timer) should be(active)
        fsmref.isStateTimerActive should be(active)
      }

      checkTimersActive(false)

      fsmref ! "start"
      expectMsg(1 second, "starting")
      checkTimersActive(true)

      fsmref ! "stop"
      expectMsg(1 second, "stopped")
    }

    "log events and transitions if asked to do so" in {
      import scala.collection.JavaConverters._
      val config = ConfigFactory.parseMap(Map("akka.loglevel" -> "DEBUG", "akka.actor.serialize-messages" -> "off",
        "akka.actor.debug.fsm" -> true).asJava).withFallback(system.settings.config)
      // Dedicated system so that DEBUG logging does not affect the other tests.
      val fsmEventSystem = ActorSystem("fsmEvent", config)
      try {
        // Inside this TestKit body, `system` and `testActor` are the TestKit's
        // own members, i.e. they belong to fsmEventSystem.
        new TestKit(fsmEventSystem) {
          EventFilter.debug(occurrences = 5) intercept {
            val fsm = TestActorRef(new Actor with LoggingFSM[Int, Null] {
              startWith(1, null)
              when(1) {
                case Event("go", _) ⇒
                  setTimer("t", FSM.Shutdown, 1.5 seconds, false)
                  goto(2)
              }
              when(2) {
                case Event("stop", _) ⇒
                  cancelTimer("t")
                  stop()
              }
              onTermination {
                case StopEvent(r, _, _) ⇒ testActor ! r
              }
            })
            val name = fsm.path.toString
            val fsmClass = fsm.underlyingActor.getClass
            system.eventStream.subscribe(testActor, classOf[Logging.Debug])
            fsm ! "go"
            expectMsgPF(1 second, hint = "processing Event(go,null)") {
              case Logging.Debug(`name`, `fsmClass`, s: String) if s.startsWith("processing Event(go,null) from Actor[") ⇒ true
            }
            expectMsg(1 second, Logging.Debug(name, fsmClass, "setting timer 't'/1500 milliseconds: Shutdown"))
            expectMsg(1 second, Logging.Debug(name, fsmClass, "transition 1 -> 2"))
            fsm ! "stop"
            expectMsgPF(1 second, hint = "processing Event(stop,null)") {
              case Logging.Debug(`name`, `fsmClass`, s: String) if s.startsWith("processing Event(stop,null) from Actor[") ⇒ true
            }
            expectMsgAllOf(1 second, Logging.Debug(name, fsmClass, "canceling timer 't'"), FSM.Normal)
            expectNoMsg(1 second)
            system.eventStream.unsubscribe(testActor)
          }
        }
      } finally {
        TestKit.shutdownActorSystem(fsmEventSystem)
      }
    }

    "fill rolling event log and hand it out" in {
      val fsmref = TestActorRef(new Actor with LoggingFSM[Int, Int] {
        override def logDepth = 3
        startWith(1, 0)
        when(1) {
          case Event("count", c) ⇒ stay using (c + 1)
          case Event("log", _)   ⇒ stay replying getLog
        }
      })
      fsmref ! "log"
      import FSM.LogEntry
      expectMsg(1 second, IndexedSeq(LogEntry(1, 0, "log")))
      fsmref ! "count"
      fsmref ! "log"
      expectMsg(1 second, IndexedSeq(LogEntry(1, 0, "log"), LogEntry(1, 0, "count"), LogEntry(1, 1, "log")))
      fsmref ! "count"
      fsmref ! "log"
      // logDepth is 3, so the two oldest of the five entries have been dropped
      expectMsg(1 second, IndexedSeq(LogEntry(1, 1, "log"), LogEntry(1, 1, "count"), LogEntry(1, 2, "log")))
    }

    "allow transforming of state results" in {
      import akka.actor.FSM._
      val fsmref = system.actorOf(Props(new Actor with FSM[Int, Int] {
        startWith(0, 0)
        // The handler answers `stay`, but `using` replaces every result with goto(1).
        when(0)(transform {
          case Event("go", _) ⇒ stay
        } using {
          case _ ⇒ goto(1)
        })
        when(1) {
          case _ ⇒ stay
        }
      }))
      fsmref ! SubscribeTransitionCallBack(testActor)
      fsmref ! "go"
      expectMsg(CurrentState(fsmref, 0))
      expectMsg(Transition(fsmref, 0, 1))
    }

  }

}

Other Akka source code examples

Here is a short list of links related to this Akka FSMActorSpec.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.