alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (EventStreamSpec.scala)

This example Akka source code file (EventStreamSpec.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

a, actor, actorsystem, akka, b1, cc, ccatbt, concurrent, duration, eventstream, t, test, testing, testprobe, time, warning

The EventStreamSpec.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.event

import language.postfixOps

import scala.concurrent.duration._
import akka.actor._
import com.typesafe.config.ConfigFactory
import scala.collection.JavaConverters._
import akka.event.Logging.InitializeLogger
import akka.pattern.gracefulStop
import akka.testkit.{ EventFilter, TestEvent, TestProbe, AkkaSpec }

/**
 * Fixtures shared by the `EventStreamSpec` test class: actor-system
 * configurations, message types, a forwarding logger actor, and small
 * class/trait hierarchies used as event-stream channels.
 */
object EventStreamSpec {

  /**
   * Configuration of the spec's own actor system. Registers `MyLog` and the
   * standard-out logger (its class name is substituted for `%s`) as loggers.
   */
  val config = ConfigFactory.parseString("""
      akka {
        actor.serialize-messages = off
        stdout-loglevel = WARNING
        loglevel = INFO
        loggers = ["akka.event.EventStreamSpec$MyLog", "%s"]
      }
      """.format(Logging.StandardOutLogger.getClass.getName))

  /** Configuration with `actor.debug.unhandled` switched on. */
  val configUnhandled = ConfigFactory.parseString("""
      akka {
        actor.serialize-messages = off
        stdout-loglevel = WARNING
        loglevel = WARNING
        actor.debug.unhandled = on
      }
      """)

  /** `configUnhandled` with `akka.actor.debug.event-stream` additionally switched on. */
  val configUnhandledWithDebug =
    ConfigFactory.parseString("akka.actor.debug.event-stream = on") withFallback configUnhandled

  /** Simple payload message carrying one integer. */
  final case class M(i: Int)

  /** Tells a `MyLog` instance where to forward the events it receives. */
  final case class SetTarget(ref: ActorRef)

  /**
   * Logger actor that forwards every log event and unhandled-message
   * notification to a configurable destination.
   */
  class MyLog extends Actor {
    // Forwarding destination; dead letters until a SetTarget message arrives.
    var dst: ActorRef = context.system.deadLetters

    def receive: Receive = {
      case InitializeLogger(eventBus) ⇒
        // Subscribe this actor to the two event types it reacts to, then acknowledge.
        eventBus.subscribe(self, classOf[SetTarget])
        eventBus.subscribe(self, classOf[UnhandledMessage])
        sender() ! Logging.LoggerInitialized

      case SetTarget(newTarget) ⇒
        dst = newTarget
        newTarget ! "OK"

      case event @ (_: Logging.LogEvent | _: UnhandledMessage) ⇒
        dst ! event
    }
  }

  // Class hierarchy for the sub-channel test: A <- B1 <- C, and A <- B2.
  class A
  class B1 extends A
  class B2 extends A
  class C extends B1

  // Trait hierarchy for the class-and-trait sub-channel tests:
  // T <- AT <- ATT and T <- BT <- BTT; CCATBT mixes both leaves into class CC.
  trait T
  trait AT extends T
  trait ATT extends AT
  trait BT extends T
  trait BTT extends BT
  class CC
  class CCATBT extends CC with ATT with BTT
}

/**
 * Tests for `EventStream`: subscription management, rejection of `null`
 * subscribers, log-level handling, sub-channel classification over class and
 * trait hierarchies, and the unsubscriber's handling of terminated actors.
 *
 * Fixtures (configurations, message types, channel hierarchies) come from the
 * companion object.
 */
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) {

  import EventStreamSpec._

  // startDefaultLoggers needs the implementation type of the actor system.
  val impl = system.asInstanceOf[ActorSystemImpl]

  "An EventStream" must {

    "manage subscriptions" in {
      //#event-bus-start-unsubscriber-scala
      val bus = new EventStream(system, true)
      bus.startUnsubscriber()
      //#event-bus-start-unsubscriber-scala

      bus.subscribe(testActor, classOf[M])
      bus.publish(M(42))
      within(1 second) {
        expectMsg(M(42))
        // After unsubscribing, further publications must not reach testActor.
        bus.unsubscribe(testActor)
        bus.publish(M(13))
        expectNoMsg()
      }
    }

    "not allow null as subscriber" in {
      val bus = new EventStream(system, true)
      intercept[IllegalArgumentException] { bus.subscribe(null, classOf[M]) }.getMessage should be("subscriber is null")
    }

    "not allow null as unsubscriber" in {
      val bus = new EventStream(system, true)
      intercept[IllegalArgumentException] { bus.unsubscribe(null, classOf[M]) }.getMessage should be("subscriber is null")
      intercept[IllegalArgumentException] { bus.unsubscribe(null) }.getMessage should be("subscriber is null")
    }

    "be able to log unhandled messages" in {
      val sys = ActorSystem("EventStreamSpecUnhandled", configUnhandled)
      try {
        sys.eventStream.subscribe(testActor, classOf[AnyRef])
        val m = UnhandledMessage(42, sys.deadLetters, sys.deadLetters)
        sys.eventStream.publish(m)
        // Expect both the published event itself and the Debug event describing it.
        expectMsgAllOf(m, Logging.Debug(sys.deadLetters.path.toString, sys.deadLetters.getClass, "unhandled message from " + sys.deadLetters + ": 42"))
        sys.eventStream.unsubscribe(testActor)
      } finally {
        shutdown(sys)
      }
    }

    "manage log levels" in {
      val bus = new EventStream(system, false)
      bus.startDefaultLoggers(impl)
      // Point the MyLog logger at testActor; it acknowledges with "OK".
      bus.publish(SetTarget(testActor))
      expectMsg("OK")
      within(2 seconds) {
        import Logging._
        verifyLevel(bus, InfoLevel)
        bus.setLogLevel(WarningLevel)
        verifyLevel(bus, WarningLevel)
        bus.setLogLevel(DebugLevel)
        verifyLevel(bus, DebugLevel)
        bus.setLogLevel(ErrorLevel)
        verifyLevel(bus, ErrorLevel)
      }
    }

    "manage sub-channels using classes" in {
      val a = new A
      val b1 = new B1
      val b2 = new B2
      val c = new C
      val bus = new EventStream(system, false)
      within(2 seconds) {
        // Subscribed to B2 only: c (a B1 subclass) is not delivered, b2 is.
        bus.subscribe(testActor, classOf[B2]) should be(true)
        bus.publish(c)
        bus.publish(b2)
        expectMsg(b2)
        // Subscribed to A as well: every subclass instance is delivered.
        bus.subscribe(testActor, classOf[A]) should be(true)
        bus.publish(c)
        expectMsg(c)
        bus.publish(b1)
        expectMsg(b1)
        // After unsubscribing from B1, c is no longer delivered; b2 and a still are.
        bus.unsubscribe(testActor, classOf[B1]) should be(true)
        bus.publish(c)
        bus.publish(b2)
        bus.publish(a)
        expectMsg(b2)
        expectMsg(a)
        expectNoMsg()
      }
    }

    "manage sub-channels using classes and traits (update on subscribe)" in {
      val es = new EventStream(system, false)
      val tm1 = new CC
      val tm2 = new CCATBT
      val a1, a2, a3, a4 = TestProbe()

      es.subscribe(a1.ref, classOf[AT]) should be(true)
      es.subscribe(a2.ref, classOf[BT]) should be(true)
      es.subscribe(a3.ref, classOf[CC]) should be(true)
      es.subscribe(a4.ref, classOf[CCATBT]) should be(true)
      es.publish(tm1)
      es.publish(tm2)
      // tm1 is a plain CC, so only a3 sees it; tm2 matches every subscription.
      a1.expectMsgType[AT] should be(tm2)
      a2.expectMsgType[BT] should be(tm2)
      a3.expectMsgType[CC] should be(tm1)
      a3.expectMsgType[CC] should be(tm2)
      a4.expectMsgType[CCATBT] should be(tm2)
      es.unsubscribe(a1.ref, classOf[AT]) should be(true)
      es.unsubscribe(a2.ref, classOf[BT]) should be(true)
      es.unsubscribe(a3.ref, classOf[CC]) should be(true)
      es.unsubscribe(a4.ref, classOf[CCATBT]) should be(true)
    }

    "manage sub-channels using classes and traits (update on unsubscribe)" in {
      val es = new EventStream(system, false)
      val tm1 = new CC
      val tm2 = new CCATBT
      val a1, a2, a3, a4 = TestProbe()

      es.subscribe(a1.ref, classOf[AT]) should be(true)
      es.subscribe(a2.ref, classOf[BT]) should be(true)
      es.subscribe(a3.ref, classOf[CC]) should be(true)
      es.subscribe(a4.ref, classOf[CCATBT]) should be(true)
      // a3 unsubscribes from its only channel before anything is published.
      es.unsubscribe(a3.ref, classOf[CC]) should be(true)
      es.publish(tm1)
      es.publish(tm2)
      a1.expectMsgType[AT] should be(tm2)
      a2.expectMsgType[BT] should be(tm2)
      a3.expectNoMsg(1 second)
      a4.expectMsgType[CCATBT] should be(tm2)
      es.unsubscribe(a1.ref, classOf[AT]) should be(true)
      es.unsubscribe(a2.ref, classOf[BT]) should be(true)
      es.unsubscribe(a4.ref, classOf[CCATBT]) should be(true)
    }

    "manage sub-channels using classes and traits (update on unsubscribe all)" in {
      val es = new EventStream(system, false)
      val tm1 = new CC
      val tm2 = new CCATBT
      val a1, a2, a3, a4 = TestProbe()

      es.subscribe(a1.ref, classOf[AT]) should be(true)
      es.subscribe(a2.ref, classOf[BT]) should be(true)
      es.subscribe(a3.ref, classOf[CC]) should be(true)
      es.subscribe(a4.ref, classOf[CCATBT]) should be(true)
      // Same as the previous case, but a3 is removed from all channels at once.
      es.unsubscribe(a3.ref)
      es.publish(tm1)
      es.publish(tm2)
      a1.expectMsgType[AT] should be(tm2)
      a2.expectMsgType[BT] should be(tm2)
      a3.expectNoMsg(1 second)
      a4.expectMsgType[CCATBT] should be(tm2)
      es.unsubscribe(a1.ref, classOf[AT]) should be(true)
      es.unsubscribe(a2.ref, classOf[BT]) should be(true)
      es.unsubscribe(a4.ref, classOf[CCATBT]) should be(true)
    }

    "manage sub-channels using classes and traits (update on publish)" in {
      val es = new EventStream(system, false)
      val tm1 = new CC
      val tm2 = new CCATBT
      val a1, a2 = TestProbe()

      es.subscribe(a1.ref, classOf[AT]) should be(true)
      es.subscribe(a2.ref, classOf[BT]) should be(true)
      es.publish(tm1)
      es.publish(tm2)
      // Nobody is subscribed to CC, so only tm2 reaches the probes.
      a1.expectMsgType[AT] should be(tm2)
      a2.expectMsgType[BT] should be(tm2)
      es.unsubscribe(a1.ref, classOf[AT]) should be(true)
      es.unsubscribe(a2.ref, classOf[BT]) should be(true)
    }

    "manage sub-channels using classes and traits (unsubscribe classes used with trait)" in {
      val es = new EventStream(system, false)
      val tm1 = new CC
      val tm2 = new CCATBT
      val a1, a2, a3 = TestProbe()

      es.subscribe(a1.ref, classOf[AT]) should be(true)
      es.subscribe(a2.ref, classOf[BT]) should be(true)
      es.subscribe(a2.ref, classOf[CC]) should be(true)
      es.subscribe(a3.ref, classOf[CC]) should be(true)
      es.unsubscribe(a2.ref, classOf[CC]) should be(true)
      // a3 subscribed to CC but unsubscribes from the subclass CCATBT.
      es.unsubscribe(a3.ref, classOf[CCATBT]) should be(true)
      es.publish(tm1)
      es.publish(tm2)
      a1.expectMsgType[AT] should be(tm2)
      a2.expectMsgType[BT] should be(tm2)
      a3.expectMsgType[CC] should be(tm1)
      es.unsubscribe(a1.ref, classOf[AT]) should be(true)
      es.unsubscribe(a2.ref, classOf[BT]) should be(true)
      es.unsubscribe(a3.ref, classOf[CC]) should be(true)
    }

    "manage sub-channels using classes and traits (subscribe after publish)" in {
      val es = new EventStream(system, false)
      val tm1 = new CCATBT
      val a1, a2 = TestProbe()

      es.subscribe(a1.ref, classOf[AT]) should be(true)
      es.publish(tm1)
      a1.expectMsgType[AT] should be(tm1)
      a2.expectNoMsg(1 second)
      // A subscription added after the first publish must see later publications.
      es.subscribe(a2.ref, classOf[BTT]) should be(true)
      es.publish(tm1)
      a1.expectMsgType[AT] should be(tm1)
      a2.expectMsgType[BTT] should be(tm1)
      es.unsubscribe(a1.ref, classOf[AT]) should be(true)
      es.unsubscribe(a2.ref, classOf[BTT]) should be(true)
    }

    "unsubscribe an actor on its termination" in {
      val sys = ActorSystem("EventStreamSpecUnsubscribeOnTerminated", configUnhandledWithDebug)

      try {
        val es = sys.eventStream
        val a1, a2 = TestProbe()
        val tm = new A

        // Forwards everything it receives to a1, so a1 observes what target gets.
        val target = sys.actorOf(Props(new Actor {
          def receive = { case in ⇒ a1.ref forward in }
        }), "to-be-killed")

        es.subscribe(a2.ref, classOf[Any])
        es.subscribe(target, classOf[A]) should be(true)
        // Subscribing the same actor to the same channel again reports false.
        es.subscribe(target, classOf[A]) should be(false)

        target ! PoisonPill
        fishForDebugMessage(a2, s"unsubscribing $target from all channels")
        fishForDebugMessage(a2, s"unwatching $target")

        es.publish(tm)

        a1.expectNoMsg(1 second)
        a2.expectMsg(tm)
      } finally {
        shutdown(sys)
      }
    }

    "unsubscribe the actor, when it subscribes already in terminated state" in {
      val sys = ActorSystem("EventStreamSpecUnsubscribeTerminated", configUnhandledWithDebug)

      try {
        val es = sys.eventStream
        val a1, a2 = TestProbe()

        // NOTE(review): target is created in the spec's own `system`, not in `sys`;
        // presumably so that testActor can watch it below. Confirm this is intended.
        val target = system.actorOf(Props(new Actor {
          def receive = { case in ⇒ a1.ref forward in }
        }), "to-be-killed")

        watch(target)
        target ! PoisonPill
        expectTerminated(target)

        es.subscribe(a2.ref, classOf[Any])

        // target is Terminated; when subscribing, it will be unsubscribed by the Unsubscriber right away
        es.subscribe(target, classOf[A]) should be(true)
        fishForDebugMessage(a2, s"unsubscribing $target from all channels")

        es.subscribe(target, classOf[A]) should be(true)
        fishForDebugMessage(a2, s"unsubscribing $target from all channels")
      } finally {
        shutdown(sys)
      }
    }

    "not allow initializing a TerminatedUnsubscriber twice" in {
      val sys = ActorSystem("MustNotAllowDoubleInitOfTerminatedUnsubscriber", config)
      // initializes a TerminatedUnsubscriber during start

      try {
        val es = sys.eventStream
        val p = TestProbe()

        val refWillBeUsedAsUnsubscriber = es.initUnsubscriber(p.ref)

        refWillBeUsedAsUnsubscriber should equal(false)

      } finally {
        shutdown(sys)
      }
    }

    "unwatch an actor from unsubscriber when that actor unsubscribes from the stream" in {
      val sys = ActorSystem("MustUnregisterDuringUnsubscribe", configUnhandledWithDebug)

      try {
        val es = sys.eventStream
        val a1, a2 = TestProbe()

        // a1 observes the event stream's own debug output.
        es.subscribe(a1.ref, classOf[Logging.Debug])

        es.subscribe(a2.ref, classOf[A])
        fishForDebugMessage(a1, s"watching ${a2.ref}")

        es.unsubscribe(a2.ref)
        fishForDebugMessage(a1, s"unwatching ${a2.ref}")

      } finally {
        shutdown(sys)
      }
    }

    "unwatch an actor from unsubscriber when that actor unsubscribes from channels it subscribed" in {
      val sys = ActorSystem("MustUnregisterWhenNoMoreChannelSubscriptions", configUnhandledWithDebug)

      try {
        val es = sys.eventStream
        val a1, a2 = TestProbe()

        // a1 observes the event stream's own debug output.
        es.subscribe(a1.ref, classOf[Logging.Debug])

        es.subscribe(a2.ref, classOf[A])
        es.subscribe(a2.ref, classOf[T])
        fishForDebugMessage(a1, s"watching ${a2.ref}", 1 second)
        fishForDebugMessage(a1, s"watching ${a2.ref}", 1 second) // the unsubscriber "starts to watch" each time, as watching is idempotent

        // One subscription remains after this, so no "unwatching" message is expected yet.
        es.unsubscribe(a2.ref, classOf[A]) should equal(true)
        fishForDebugMessage(a1, s"unsubscribing ${a2.ref} from channel class akka.event.EventStreamSpec$$A")
        a1.expectNoMsg(1 second)

        // Removing the last subscription also makes the unsubscriber unwatch a2.
        es.unsubscribe(a2.ref, classOf[T]) should equal(true)
        fishForDebugMessage(a1, s"unsubscribing ${a2.ref} from channel interface akka.event.EventStreamSpec$$T")
        fishForDebugMessage(a1, s"unwatching ${a2.ref}, since has no subscriptions")
        a1.expectNoMsg(1 second)

        // Unsubscribing again from an already-removed channel reports false.
        es.unsubscribe(a2.ref, classOf[T]) should equal(false)

      } finally {
        shutdown(sys)
      }
    }

  }

  /**
   * Publishes one event of each kind (debug, info, warning, error) on `bus` and
   * expects testActor to receive exactly those whose level compares `<= level`,
   * in publication order.
   *
   * @param bus   the logging bus to publish on
   * @param level the log level the bus is expected to be filtering at
   */
  private def verifyLevel(bus: LoggingBus, level: Logging.LogLevel): Unit = {
    import Logging._
    val allEvents = Seq(Debug("", null, "debug"), Info("", null, "info"), Warning("", null, "warning"), Error("", null, "error"))
    val expectedEvents = allEvents filter (_.level <= level)
    allEvents foreach bus.publish
    expectedEvents foreach (expectMsg(_))
  }

  /**
   * Consumes messages from probe `a` until a `Logging.Debug` event arrives whose
   * string message starts with `messagePrefix`; other messages are skipped.
   *
   * @param a             the probe to read messages from
   * @param messagePrefix the prefix the debug message must start with
   * @param max           how long to keep fishing before giving up
   */
  private def fishForDebugMessage(a: TestProbe, messagePrefix: String, max: Duration = 3 seconds): Unit = {
    a.fishForMessage(max, hint = "expected debug message prefix: " + messagePrefix) {
      case Logging.Debug(_, _, msg: String) if msg startsWith messagePrefix ⇒ true
      case _ ⇒ false
    }
  }

}

Other Akka source code examples

Here is a short list of links related to this Akka EventStreamSpec.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.