|
Akka/Scala example source code file (EventStreamSpec.scala)
The EventStreamSpec.scala Akka example source code
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.event
import language.postfixOps
import scala.concurrent.duration._
import akka.actor._
import com.typesafe.config.ConfigFactory
import scala.collection.JavaConverters._
import akka.event.Logging.InitializeLogger
import akka.pattern.gracefulStop
import akka.testkit.{ EventFilter, TestEvent, TestProbe, AkkaSpec }
object EventStreamSpec {
val config = ConfigFactory.parseString("""
akka {
actor.serialize-messages = off
stdout-loglevel = WARNING
loglevel = INFO
loggers = ["akka.event.EventStreamSpec$MyLog", "%s"]
}
""".format(Logging.StandardOutLogger.getClass.getName))
val configUnhandled = ConfigFactory.parseString("""
akka {
actor.serialize-messages = off
stdout-loglevel = WARNING
loglevel = WARNING
actor.debug.unhandled = on
}
""")
val configUnhandledWithDebug =
ConfigFactory.parseString("akka.actor.debug.event-stream = on")
.withFallback(configUnhandled)
final case class M(i: Int)
final case class SetTarget(ref: ActorRef)
class MyLog extends Actor {
var dst: ActorRef = context.system.deadLetters
def receive = {
case Logging.InitializeLogger(bus) ⇒
bus.subscribe(context.self, classOf[SetTarget])
bus.subscribe(context.self, classOf[UnhandledMessage])
sender() ! Logging.LoggerInitialized
case SetTarget(ref) ⇒ { dst = ref; dst ! "OK" }
case e: Logging.LogEvent ⇒ dst ! e
case u: UnhandledMessage ⇒ dst ! u
}
}
// class hierarchy for subchannel test
class A
class B1 extends A
class B2 extends A
class C extends B1
trait T
trait AT extends T
trait ATT extends AT
trait BT extends T
trait BTT extends BT
class CC
class CCATBT extends CC with ATT with BTT
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) {
import EventStreamSpec._
val impl = system.asInstanceOf[ActorSystemImpl]
"An EventStream" must {
"manage subscriptions" in {
//#event-bus-start-unsubscriber-scala
val bus = new EventStream(system, true)
bus.startUnsubscriber()
//#event-bus-start-unsubscriber-scala
bus.subscribe(testActor, classOf[M])
bus.publish(M(42))
within(1 second) {
expectMsg(M(42))
bus.unsubscribe(testActor)
bus.publish(M(13))
expectNoMsg
}
}
"not allow null as subscriber" in {
val bus = new EventStream(system, true)
intercept[IllegalArgumentException] { bus.subscribe(null, classOf[M]) }.getMessage should be("subscriber is null")
}
"not allow null as unsubscriber" in {
val bus = new EventStream(system, true)
intercept[IllegalArgumentException] { bus.unsubscribe(null, classOf[M]) }.getMessage should be("subscriber is null")
intercept[IllegalArgumentException] { bus.unsubscribe(null) }.getMessage should be("subscriber is null")
}
"be able to log unhandled messages" in {
val sys = ActorSystem("EventStreamSpecUnhandled", configUnhandled)
try {
sys.eventStream.subscribe(testActor, classOf[AnyRef])
val m = UnhandledMessage(42, sys.deadLetters, sys.deadLetters)
sys.eventStream.publish(m)
expectMsgAllOf(m, Logging.Debug(sys.deadLetters.path.toString, sys.deadLetters.getClass, "unhandled message from " + sys.deadLetters + ": 42"))
sys.eventStream.unsubscribe(testActor)
} finally {
shutdown(sys)
}
}
"manage log levels" in {
val bus = new EventStream(system, false)
bus.startDefaultLoggers(impl)
bus.publish(SetTarget(testActor))
expectMsg("OK")
within(2 seconds) {
import Logging._
verifyLevel(bus, InfoLevel)
bus.setLogLevel(WarningLevel)
verifyLevel(bus, WarningLevel)
bus.setLogLevel(DebugLevel)
verifyLevel(bus, DebugLevel)
bus.setLogLevel(ErrorLevel)
verifyLevel(bus, ErrorLevel)
}
}
"manage sub-channels using classes" in {
val a = new A
val b1 = new B1
val b2 = new B2
val c = new C
val bus = new EventStream(system, false)
within(2 seconds) {
bus.subscribe(testActor, classOf[B2]) should be(true)
bus.publish(c)
bus.publish(b2)
expectMsg(b2)
bus.subscribe(testActor, classOf[A]) should be(true)
bus.publish(c)
expectMsg(c)
bus.publish(b1)
expectMsg(b1)
bus.unsubscribe(testActor, classOf[B1]) should be(true)
bus.publish(c)
bus.publish(b2)
bus.publish(a)
expectMsg(b2)
expectMsg(a)
expectNoMsg
}
}
"manage sub-channels using classes and traits (update on subscribe)" in {
val es = new EventStream(system, false)
val tm1 = new CC
val tm2 = new CCATBT
val a1, a2, a3, a4 = TestProbe()
es.subscribe(a1.ref, classOf[AT]) should be(true)
es.subscribe(a2.ref, classOf[BT]) should be(true)
es.subscribe(a3.ref, classOf[CC]) should be(true)
es.subscribe(a4.ref, classOf[CCATBT]) should be(true)
es.publish(tm1)
es.publish(tm2)
a1.expectMsgType[AT] should be(tm2)
a2.expectMsgType[BT] should be(tm2)
a3.expectMsgType[CC] should be(tm1)
a3.expectMsgType[CC] should be(tm2)
a4.expectMsgType[CCATBT] should be(tm2)
es.unsubscribe(a1.ref, classOf[AT]) should be(true)
es.unsubscribe(a2.ref, classOf[BT]) should be(true)
es.unsubscribe(a3.ref, classOf[CC]) should be(true)
es.unsubscribe(a4.ref, classOf[CCATBT]) should be(true)
}
"manage sub-channels using classes and traits (update on unsubscribe)" in {
val es = new EventStream(system, false)
val tm1 = new CC
val tm2 = new CCATBT
val a1, a2, a3, a4 = TestProbe()
es.subscribe(a1.ref, classOf[AT]) should be(true)
es.subscribe(a2.ref, classOf[BT]) should be(true)
es.subscribe(a3.ref, classOf[CC]) should be(true)
es.subscribe(a4.ref, classOf[CCATBT]) should be(true)
es.unsubscribe(a3.ref, classOf[CC]) should be(true)
es.publish(tm1)
es.publish(tm2)
a1.expectMsgType[AT] should be(tm2)
a2.expectMsgType[BT] should be(tm2)
a3.expectNoMsg(1 second)
a4.expectMsgType[CCATBT] should be(tm2)
es.unsubscribe(a1.ref, classOf[AT]) should be(true)
es.unsubscribe(a2.ref, classOf[BT]) should be(true)
es.unsubscribe(a4.ref, classOf[CCATBT]) should be(true)
}
"manage sub-channels using classes and traits (update on unsubscribe all)" in {
val es = new EventStream(system, false)
val tm1 = new CC
val tm2 = new CCATBT
val a1, a2, a3, a4 = TestProbe()
es.subscribe(a1.ref, classOf[AT]) should be(true)
es.subscribe(a2.ref, classOf[BT]) should be(true)
es.subscribe(a3.ref, classOf[CC]) should be(true)
es.subscribe(a4.ref, classOf[CCATBT]) should be(true)
es.unsubscribe(a3.ref)
es.publish(tm1)
es.publish(tm2)
a1.expectMsgType[AT] should be(tm2)
a2.expectMsgType[BT] should be(tm2)
a3.expectNoMsg(1 second)
a4.expectMsgType[CCATBT] should be(tm2)
es.unsubscribe(a1.ref, classOf[AT]) should be(true)
es.unsubscribe(a2.ref, classOf[BT]) should be(true)
es.unsubscribe(a4.ref, classOf[CCATBT]) should be(true)
}
"manage sub-channels using classes and traits (update on publish)" in {
val es = new EventStream(system, false)
val tm1 = new CC
val tm2 = new CCATBT
val a1, a2 = TestProbe()
es.subscribe(a1.ref, classOf[AT]) should be(true)
es.subscribe(a2.ref, classOf[BT]) should be(true)
es.publish(tm1)
es.publish(tm2)
a1.expectMsgType[AT] should be(tm2)
a2.expectMsgType[BT] should be(tm2)
es.unsubscribe(a1.ref, classOf[AT]) should be(true)
es.unsubscribe(a2.ref, classOf[BT]) should be(true)
}
"manage sub-channels using classes and traits (unsubscribe classes used with trait)" in {
val es = new EventStream(system, false)
val tm1 = new CC
val tm2 = new CCATBT
val a1, a2, a3 = TestProbe()
es.subscribe(a1.ref, classOf[AT]) should be(true)
es.subscribe(a2.ref, classOf[BT]) should be(true)
es.subscribe(a2.ref, classOf[CC]) should be(true)
es.subscribe(a3.ref, classOf[CC]) should be(true)
es.unsubscribe(a2.ref, classOf[CC]) should be(true)
es.unsubscribe(a3.ref, classOf[CCATBT]) should be(true)
es.publish(tm1)
es.publish(tm2)
a1.expectMsgType[AT] should be(tm2)
a2.expectMsgType[BT] should be(tm2)
a3.expectMsgType[CC] should be(tm1)
es.unsubscribe(a1.ref, classOf[AT]) should be(true)
es.unsubscribe(a2.ref, classOf[BT]) should be(true)
es.unsubscribe(a3.ref, classOf[CC]) should be(true)
}
"manage sub-channels using classes and traits (subscribe after publish)" in {
val es = new EventStream(system, false)
val tm1 = new CCATBT
val a1, a2 = TestProbe()
es.subscribe(a1.ref, classOf[AT]) should be(true)
es.publish(tm1)
a1.expectMsgType[AT] should be(tm1)
a2.expectNoMsg(1 second)
es.subscribe(a2.ref, classOf[BTT]) should be(true)
es.publish(tm1)
a1.expectMsgType[AT] should be(tm1)
a2.expectMsgType[BTT] should be(tm1)
es.unsubscribe(a1.ref, classOf[AT]) should be(true)
es.unsubscribe(a2.ref, classOf[BTT]) should be(true)
}
"unsubscribe an actor on its termination" in {
val sys = ActorSystem("EventStreamSpecUnsubscribeOnTerminated", configUnhandledWithDebug)
try {
val es = sys.eventStream
val a1, a2 = TestProbe()
val tm = new A
val target = sys.actorOf(Props(new Actor {
def receive = { case in ⇒ a1.ref forward in }
}), "to-be-killed")
es.subscribe(a2.ref, classOf[Any])
es.subscribe(target, classOf[A]) should be(true)
es.subscribe(target, classOf[A]) should be(false)
target ! PoisonPill
fishForDebugMessage(a2, s"unsubscribing $target from all channels")
fishForDebugMessage(a2, s"unwatching $target")
es.publish(tm)
a1.expectNoMsg(1 second)
a2.expectMsg(tm)
} finally {
shutdown(sys)
}
}
"unsubscribe the actor, when it subscribes already in terminated state" in {
val sys = ActorSystem("EventStreamSpecUnsubscribeTerminated", configUnhandledWithDebug)
try {
val es = sys.eventStream
val a1, a2 = TestProbe()
val target = system.actorOf(Props(new Actor {
def receive = { case in ⇒ a1.ref forward in }
}), "to-be-killed")
watch(target)
target ! PoisonPill
expectTerminated(target)
es.subscribe(a2.ref, classOf[Any])
// target1 is Terminated; When subscribing, it will be unsubscribed by the Unsubscriber right away
es.subscribe(target, classOf[A]) should be(true)
fishForDebugMessage(a2, s"unsubscribing $target from all channels")
es.subscribe(target, classOf[A]) should be(true)
fishForDebugMessage(a2, s"unsubscribing $target from all channels")
} finally {
shutdown(sys)
}
}
"not allow initializing a TerminatedUnsubscriber twice" in {
val sys = ActorSystem("MustNotAllowDoubleInitOfTerminatedUnsubscriber", config)
// initializes an TerminatedUnsubscriber during start
try {
val es = sys.eventStream
val p = TestProbe()
val refWillBeUsedAsUnsubscriber = es.initUnsubscriber(p.ref)
refWillBeUsedAsUnsubscriber should equal(false)
} finally {
shutdown(sys)
}
}
"unwatch an actor from unsubscriber when that actor unsubscribes from the stream" in {
val sys = ActorSystem("MustUnregisterDuringUnsubscribe", configUnhandledWithDebug)
try {
val es = sys.eventStream
val a1, a2 = TestProbe()
es.subscribe(a1.ref, classOf[Logging.Debug])
es.subscribe(a2.ref, classOf[A])
fishForDebugMessage(a1, s"watching ${a2.ref}")
es.unsubscribe(a2.ref)
fishForDebugMessage(a1, s"unwatching ${a2.ref}")
} finally {
shutdown(sys)
}
}
"unwatch an actor from unsubscriber when that actor unsubscribes from channels it subscribed" in {
val sys = ActorSystem("MustUnregisterWhenNoMoreChannelSubscriptions", configUnhandledWithDebug)
try {
val es = sys.eventStream
val a1, a2 = TestProbe()
es.subscribe(a1.ref, classOf[Logging.Debug])
es.subscribe(a2.ref, classOf[A])
es.subscribe(a2.ref, classOf[T])
fishForDebugMessage(a1, s"watching ${a2.ref}", 1 second)
fishForDebugMessage(a1, s"watching ${a2.ref}", 1 second) // the unsubscriber "starts to watch" each time, as watching is idempotent
es.unsubscribe(a2.ref, classOf[A]) should equal(true)
fishForDebugMessage(a1, s"unsubscribing ${a2.ref} from channel class akka.event.EventStreamSpec$$A")
a1.expectNoMsg(1 second)
es.unsubscribe(a2.ref, classOf[T]) should equal(true)
fishForDebugMessage(a1, s"unsubscribing ${a2.ref} from channel interface akka.event.EventStreamSpec$$T")
fishForDebugMessage(a1, s"unwatching ${a2.ref}, since has no subscriptions")
a1.expectNoMsg(1 second)
es.unsubscribe(a2.ref, classOf[T]) should equal(false)
} finally {
shutdown(sys)
}
}
}
private def verifyLevel(bus: LoggingBus, level: Logging.LogLevel) {
import Logging._
val allmsg = Seq(Debug("", null, "debug"), Info("", null, "info"), Warning("", null, "warning"), Error("", null, "error"))
val msg = allmsg filter (_.level <= level)
allmsg foreach bus.publish
msg foreach (expectMsg(_))
}
private def fishForDebugMessage(a: TestProbe, messagePrefix: String, max: Duration = 3 seconds) {
a.fishForMessage(max, hint = "expected debug message prefix: " + messagePrefix) {
case Logging.Debug(_, _, msg: String) if msg startsWith messagePrefix ⇒ true
case other ⇒ false
}
}
}
Other Akka source code examplesHere is a short list of links related to this Akka EventStreamSpec.scala source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.