|
Akka/Scala example source code file (EventBusSpec.scala)
The EventBusSpec.scala Akka example source code/** * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> */ package akka.event import language.postfixOps import org.scalatest.BeforeAndAfterEach import akka.testkit._ import scala.concurrent.duration._ import java.util.concurrent.atomic._ import akka.actor.{ Props, Actor, ActorRef, ActorSystem, PoisonPill, RootActorPath } import akka.japi.{ Procedure, Function } import com.typesafe.config.{ Config, ConfigFactory } import scala.concurrent.Await object EventBusSpec { class TestActorWrapperActor(testActor: ActorRef) extends Actor { def receive = { case x ⇒ testActor forward x } } } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) abstract class EventBusSpec(busName: String, conf: Config = ConfigFactory.empty()) extends AkkaSpec(conf) with BeforeAndAfterEach { import EventBusSpec._ type BusType <: EventBus def createNewEventBus(): BusType def createEvents(numberOfEvents: Int): Iterable[BusType#Event] def createSubscriber(pipeTo: ActorRef): BusType#Subscriber def classifierFor(event: BusType#Event): BusType#Classifier def disposeSubscriber(system: ActorSystem, subscriber: BusType#Subscriber): Unit lazy val bus = createNewEventBus() busName must { def createNewSubscriber() = createSubscriber(testActor).asInstanceOf[bus.Subscriber] def getClassifierFor(event: BusType#Event) = classifierFor(event).asInstanceOf[bus.Classifier] def createNewEvents(numberOfEvents: Int): Iterable[bus.Event] = createEvents(numberOfEvents).asInstanceOf[Iterable[bus.Event]] val events = createNewEvents(100) val event = events.head val classifier = getClassifierFor(event) val subscriber = createNewSubscriber() "allow subscribers" in { bus.subscribe(subscriber, classifier) should be(true) } "allow to unsubscribe already existing subscriber" in { bus.unsubscribe(subscriber, classifier) should be(true) } "not allow to unsubscribe non-existing subscriber" in { val sub = createNewSubscriber() bus.unsubscribe(sub, classifier) should be(false) disposeSubscriber(system, sub) } "not allow for the same subscriber to subscribe to the same channel twice" in { bus.subscribe(subscriber, classifier) should be(true) bus.subscribe(subscriber, classifier) should be(false) bus.unsubscribe(subscriber, classifier) should be(true) } "not allow for the same subscriber to unsubscribe to the same channel twice" in { bus.subscribe(subscriber, classifier) should be(true) bus.unsubscribe(subscriber, classifier) should be(true) bus.unsubscribe(subscriber, classifier) should be(false) } "allow to add multiple subscribers" in { val subscribers = (1 to 10) map { _ ⇒ createNewSubscriber() } val events = createEvents(10) val classifiers = events map getClassifierFor subscribers.zip(classifiers) forall { case (s, c) ⇒ bus.subscribe(s, c) } should be(true) subscribers.zip(classifiers) forall { case (s, c) ⇒ bus.unsubscribe(s, c) } should be(true) subscribers foreach (disposeSubscriber(system, _)) } "publishing events without any subscribers shouldn't be a problem" in { bus.publish(event) } "publish the given event to the only subscriber" in { bus.subscribe(subscriber, classifier) bus.publish(event) expectMsg(event) expectNoMsg(1 second) bus.unsubscribe(subscriber, classifier) } "publish to the only subscriber multiple times" in { bus.subscribe(subscriber, classifier) bus.publish(event) bus.publish(event) bus.publish(event) expectMsg(event) expectMsg(event) expectMsg(event) expectNoMsg(1 second) bus.unsubscribe(subscriber, classifier) } "publish the given event to all intended subscribers" in { val range = 0 until 10 val subscribers = range map (_ ⇒ createNewSubscriber()) subscribers foreach { s ⇒ bus.subscribe(s, classifier) should be(true) } bus.publish(event) range foreach { _ ⇒ expectMsg(event) } subscribers foreach { s ⇒ bus.unsubscribe(s, classifier) should be(true); disposeSubscriber(system, s) } } "not publish the given event to any other subscribers than the intended ones" in { val otherSubscriber = createNewSubscriber() val otherClassifier = getClassifierFor(events.drop(1).head) bus.subscribe(subscriber, classifier) bus.subscribe(otherSubscriber, otherClassifier) bus.publish(event) expectMsg(event) bus.unsubscribe(subscriber, classifier) bus.unsubscribe(otherSubscriber, otherClassifier) expectNoMsg(1 second) } "not publish the given event to a former subscriber" in { bus.subscribe(subscriber, classifier) bus.unsubscribe(subscriber, classifier) bus.publish(event) expectNoMsg(1 second) } "cleanup subscriber" in { disposeSubscriber(system, subscriber) } } } object ActorEventBusSpec { class MyActorEventBus(protected val system: ActorSystem) extends ActorEventBus with ActorClassification with ActorClassifier { type Event = Notification def classify(event: Event) = event.ref protected def mapSize = 32 def publish(event: Event, subscriber: Subscriber) = subscriber ! event } case class Notification(ref: ActorRef, payload: Int) } class ActorEventBusSpec(conf: Config) extends EventBusSpec("ActorEventBus", conf) { import akka.event.ActorEventBusSpec._ import EventBusSpec.TestActorWrapperActor def this() { this(ConfigFactory.parseString("akka.actor.debug.event-stream = on").withFallback(AkkaSpec.testConf)) } type BusType = MyActorEventBus def createNewEventBus(): BusType = new MyActorEventBus(system) // different actor in each event because we want each event to have a different classifier (see EventBusSpec tests) def createEvents(numberOfEvents: Int) = (0 until numberOfEvents).map(Notification(TestProbe().ref, _)).toSeq def createSubscriber(pipeTo: ActorRef) = system.actorOf(Props(new TestActorWrapperActor(pipeTo))) def classifierFor(event: BusType#Event) = event.ref def disposeSubscriber(system: ActorSystem, subscriber: BusType#Subscriber): Unit = system.stop(subscriber) // ActorClassification specific tests "must unsubscribe subscriber when it terminates" in { val a1 = createSubscriber(system.deadLetters) val subs = createSubscriber(testActor) def m(i: Int) = Notification(a1, i) val p = TestProbe() system.eventStream.subscribe(p.ref, classOf[Logging.Debug]) bus.subscribe(subs, a1) bus.publish(m(1)) expectMsg(m(1)) watch(subs) subs ! PoisonPill // when a1 dies, subs has nothing subscribed expectTerminated(subs) expectUnsubscribedByUnsubscriber(p, subs) bus.publish(m(2)) expectNoMsg(1 second) disposeSubscriber(system, subs) disposeSubscriber(system, a1) } "must keep subscriber even if its subscription-actors have died" in { // Deaths of monitored actors should not influence the subscription. // For example: one might still want to monitor messages classified to A // even though it died, and handle these in some way. val a1 = createSubscriber(system.deadLetters) val subs = createSubscriber(testActor) def m(i: Int) = Notification(a1, i) bus.subscribe(subs, a1) should equal(true) bus.publish(m(1)) expectMsg(m(1)) watch(a1) a1 ! PoisonPill expectTerminated(a1) bus.publish(m(2)) // even though a1 has terminated, classification still applies expectMsg(m(2)) disposeSubscriber(system, subs) disposeSubscriber(system, a1) } "must unregister subscriber only after it unsubscribes from all of it's subscriptions" in { val a1, a2 = createSubscriber(system.deadLetters) val subs = createSubscriber(testActor) def m1(i: Int) = Notification(a1, i) def m2(i: Int) = Notification(a2, i) val p = TestProbe() system.eventStream.subscribe(p.ref, classOf[Logging.Debug]) bus.subscribe(subs, a1) should equal(true) bus.subscribe(subs, a2) should equal(true) bus.publish(m1(1)) bus.publish(m2(1)) expectMsg(m1(1)) expectMsg(m2(1)) bus.unsubscribe(subs, a1) bus.publish(m1(2)) expectNoMsg(1 second) bus.publish(m2(2)) expectMsg(m2(2)) bus.unsubscribe(subs, a2) expectUnregisterFromUnsubscriber(p, subs) bus.publish(m1(3)) bus.publish(m2(3)) expectNoMsg(1 second) disposeSubscriber(system, subs) disposeSubscriber(system, a1) disposeSubscriber(system, a2) } private def expectUnsubscribedByUnsubscriber(p: TestProbe, a: ActorRef) { val expectedMsg = s"actor $a has terminated, unsubscribing it from $bus" p.fishForMessage(1 second, hint = expectedMsg) { case Logging.Debug(_, _, msg) if msg equals expectedMsg ⇒ true case other ⇒ false } } private def expectUnregisterFromUnsubscriber(p: TestProbe, a: ActorRef) { val expectedMsg = s"unregistered watch of $a in $bus" p.fishForMessage(1 second, hint = expectedMsg) { case Logging.Debug(_, _, msg) if msg equals expectedMsg ⇒ true case other ⇒ false } } } object ScanningEventBusSpec { class MyScanningEventBus extends EventBus with ScanningClassification { type Event = Int type Subscriber = Procedure[Int] type Classifier = String protected def compareClassifiers(a: Classifier, b: Classifier): Int = a compareTo b protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = akka.util.Helpers.compareIdentityHash(a, b) protected def matches(classifier: Classifier, event: Event): Boolean = event.toString == classifier protected def publish(event: Event, subscriber: Subscriber): Unit = subscriber(event) } } class ScanningEventBusSpec extends EventBusSpec("ScanningEventBus") { import ScanningEventBusSpec._ type BusType = MyScanningEventBus def createNewEventBus(): BusType = new MyScanningEventBus def createEvents(numberOfEvents: Int) = (0 until numberOfEvents) def createSubscriber(pipeTo: ActorRef) = new Procedure[Int] { def apply(i: Int) = pipeTo ! i } def classifierFor(event: BusType#Event) = event.toString def disposeSubscriber(system: ActorSystem, subscriber: BusType#Subscriber): Unit = () } object LookupEventBusSpec { class MyLookupEventBus extends EventBus with LookupClassification { type Event = Int type Subscriber = Procedure[Int] type Classifier = String override protected def classify(event: Int): String = event.toString override protected def compareSubscribers(a: Procedure[Int], b: Procedure[Int]): Int = akka.util.Helpers.compareIdentityHash(a, b) override protected def mapSize = 32 override protected def publish(event: Int, subscriber: Procedure[Int]): Unit = subscriber(event) } } class LookupEventBusSpec extends EventBusSpec("LookupEventBus") { import LookupEventBusSpec._ type BusType = MyLookupEventBus def createNewEventBus(): BusType = new MyLookupEventBus def createEvents(numberOfEvents: Int) = (0 until numberOfEvents) def createSubscriber(pipeTo: ActorRef) = new Procedure[Int] { def apply(i: Int) = pipeTo ! i } def classifierFor(event: BusType#Event) = event.toString def disposeSubscriber(system: ActorSystem, subscriber: BusType#Subscriber): Unit = () } Other Akka source code examplesHere is a short list of links related to this Akka EventBusSpec.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.