alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (EventBusSpec.scala)

This example Akka source code file (EventBusSpec.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actorref, actorsystem, akka, bustype, classifier, concurrent, duration, event, int, notification, procedure, subscriber, test, testing, time, unit

The EventBusSpec.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */

package akka.event

import language.postfixOps

import org.scalatest.BeforeAndAfterEach
import akka.testkit._
import scala.concurrent.duration._
import java.util.concurrent.atomic._

import akka.actor.{ Props, Actor, ActorRef, ActorSystem, PoisonPill, RootActorPath }
import akka.japi.{ Procedure, Function }
import com.typesafe.config.{ Config, ConfigFactory }
import scala.concurrent.Await

/** Helpers shared by the event bus specs in this file. */
object EventBusSpec {

  /**
   * Actor that relays every message it receives to `testActor`,
   * using `forward` so the original sender is kept.
   *
   * @param testActor destination of all relayed messages
   */
  class TestActorWrapperActor(testActor: ActorRef) extends Actor {
    def receive: Receive = {
      case msg ⇒ testActor.forward(msg)
    }
  }
}

/**
 * Reusable suite of subscribe / unsubscribe / publish checks for an [[EventBus]]
 * implementation. Concrete specs pick the bus via `BusType` and supply the
 * factory methods declared below.
 *
 * The cases inside `busName must { ... }` share one `subscriber` and the same
 * `bus`, and the last case disposes that subscriber, so they are written to be
 * run in the order in which they are declared.
 *
 * @param busName name used as the subject of the generated test descriptions
 * @param conf    configuration handed to [[AkkaSpec]]; defaults to an empty config
 */
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
abstract class EventBusSpec(busName: String, conf: Config = ConfigFactory.empty()) extends AkkaSpec(conf) with BeforeAndAfterEach {
  import EventBusSpec._

  /** The concrete bus implementation under test. */
  type BusType <: EventBus

  /** Creates the bus instance the checks run against. */
  def createNewEventBus(): BusType

  /** Creates `numberOfEvents` events; the checks classify each of them with `classifierFor`. */
  def createEvents(numberOfEvents: Int): Iterable[BusType#Event]

  /** Creates a subscriber that passes whatever it is given on to `pipeTo`. */
  def createSubscriber(pipeTo: ActorRef): BusType#Subscriber

  /** Returns the classifier a subscriber has to use in order to receive `event`. */
  def classifierFor(event: BusType#Event): BusType#Classifier

  /** Releases whatever resources `subscriber` holds. */
  def disposeSubscriber(system: ActorSystem, subscriber: BusType#Subscriber): Unit

  // Lazy so that the bus is only created on first use.
  lazy val bus = createNewEventBus()

  busName must {
    // The abstract factories are typed with projections (BusType#X) while the methods
    // of `bus` expect its path-dependent types (bus.X); these casts bridge the two.
    def createNewSubscriber() = createSubscriber(testActor).asInstanceOf[bus.Subscriber]
    def getClassifierFor(event: BusType#Event) = classifierFor(event).asInstanceOf[bus.Classifier]
    def createNewEvents(numberOfEvents: Int): Iterable[bus.Event] = createEvents(numberOfEvents).asInstanceOf[Iterable[bus.Event]]

    // Fixtures created once, outside the individual cases, and shared by all of them.
    val events = createNewEvents(100)
    val event = events.head
    val classifier = getClassifierFor(event)
    val subscriber = createNewSubscriber()

    "allow subscribers" in {
      bus.subscribe(subscriber, classifier) should be(true)
    }

    "allow to unsubscribe already existing subscriber" in {
      // Undoes the subscription made by the previous case.
      bus.unsubscribe(subscriber, classifier) should be(true)
    }

    "not allow to unsubscribe non-existing subscriber" in {
      val sub = createNewSubscriber()
      bus.unsubscribe(sub, classifier) should be(false)
      disposeSubscriber(system, sub)
    }

    "not allow for the same subscriber to subscribe to the same channel twice" in {
      bus.subscribe(subscriber, classifier) should be(true)
      bus.subscribe(subscriber, classifier) should be(false)
      bus.unsubscribe(subscriber, classifier) should be(true)
    }

    "not allow for the same subscriber to unsubscribe to the same channel twice" in {
      bus.subscribe(subscriber, classifier) should be(true)
      bus.unsubscribe(subscriber, classifier) should be(true)
      bus.unsubscribe(subscriber, classifier) should be(false)
    }

    "allow to add multiple subscribers" in {
      val subscribers = (1 to 10) map { _ ⇒ createNewSubscriber() }
      val events = createEvents(10)
      val classifiers = events map getClassifierFor
      subscribers.zip(classifiers) forall { case (s, c) ⇒ bus.subscribe(s, c) } should be(true)
      subscribers.zip(classifiers) forall { case (s, c) ⇒ bus.unsubscribe(s, c) } should be(true)

      subscribers foreach (disposeSubscriber(system, _))
    }

    "publishing events without any subscribers shouldn't be a problem" in {
      bus.publish(event)
    }

    "publish the given event to the only subscriber" in {
      bus.subscribe(subscriber, classifier)
      bus.publish(event)
      expectMsg(event)
      expectNoMsg(1 second)
      bus.unsubscribe(subscriber, classifier)
    }

    "publish to the only subscriber multiple times" in {
      bus.subscribe(subscriber, classifier)
      bus.publish(event)
      bus.publish(event)
      bus.publish(event)
      expectMsg(event)
      expectMsg(event)
      expectMsg(event)
      expectNoMsg(1 second)
      bus.unsubscribe(subscriber, classifier)
    }

    "publish the given event to all intended subscribers" in {
      val range = 0 until 10
      val subscribers = range map (_ ⇒ createNewSubscriber())
      subscribers foreach { s ⇒ bus.subscribe(s, classifier) should be(true) }
      bus.publish(event)
      // Every subscriber pipes to testActor, so one copy per subscriber is expected.
      range foreach { _ ⇒ expectMsg(event) }
      subscribers foreach { s ⇒ bus.unsubscribe(s, classifier) should be(true); disposeSubscriber(system, s) }
    }

    "not publish the given event to any other subscribers than the intended ones" in {
      val otherSubscriber = createNewSubscriber()
      val otherClassifier = getClassifierFor(events.drop(1).head)
      bus.subscribe(subscriber, classifier)
      bus.subscribe(otherSubscriber, otherClassifier)
      bus.publish(event)
      expectMsg(event)
      bus.unsubscribe(subscriber, classifier)
      bus.unsubscribe(otherSubscriber, otherClassifier)
      expectNoMsg(1 second)
      // Release the locally created subscriber, as the other cases do for theirs.
      disposeSubscriber(system, otherSubscriber)
    }

    "not publish the given event to a former subscriber" in {
      bus.subscribe(subscriber, classifier)
      bus.unsubscribe(subscriber, classifier)
      bus.publish(event)
      expectNoMsg(1 second)
    }

    // Must stay last: it disposes the subscriber shared by the cases above.
    "cleanup subscriber" in {
      disposeSubscriber(system, subscriber)
    }
  }
}

/** Bus and event definitions used by the `ActorEventBusSpec` class. */
object ActorEventBusSpec {

  /** Event published on the test bus; it is classified by its `ref`. */
  case class Notification(ref: ActorRef, payload: Int)

  /**
   * Bus whose classifiers and subscribers are both actor references.
   *
   * @param system actor system made available to the mixed-in classification
   */
  class MyActorEventBus(protected val system: ActorSystem)
    extends ActorEventBus with ActorClassification with ActorClassifier {

    type Event = Notification

    protected def mapSize: Int = 32

    /** An event is classified by the actor reference it carries. */
    def classify(event: Event): ActorRef = {
      event.ref
    }

    /** Delivers `event` by sending it to the subscribing actor. */
    def publish(event: Event, subscriber: Subscriber): Unit = {
      subscriber ! event
    }
  }
}

/**
 * Runs the shared [[EventBusSpec]] checks against `MyActorEventBus` and adds
 * checks for how the bus reacts to subscribers and classifier actors terminating.
 *
 * @param conf configuration for the underlying actor system
 */
class ActorEventBusSpec(conf: Config) extends EventBusSpec("ActorEventBus", conf) {
  import akka.event.ActorEventBusSpec._
  import EventBusSpec.TestActorWrapperActor

  /**
   * Default configuration: switches on `akka.actor.debug.event-stream`, with the
   * standard test configuration as fallback. The `expect…` helpers below wait for
   * `Logging.Debug` events on the event stream.
   */
  def this() = this(ConfigFactory.parseString("akka.actor.debug.event-stream = on").withFallback(AkkaSpec.testConf))

  type BusType = MyActorEventBus
  def createNewEventBus(): BusType = new MyActorEventBus(system)

  // different actor in each event because we want each event to have a different classifier (see EventBusSpec tests)
  def createEvents(numberOfEvents: Int) = (0 until numberOfEvents).map(Notification(TestProbe().ref, _)).toSeq

  /** A subscriber is an actor that forwards everything it receives to `pipeTo`. */
  def createSubscriber(pipeTo: ActorRef): ActorRef = system.actorOf(Props(new TestActorWrapperActor(pipeTo)))

  def classifierFor(event: BusType#Event): ActorRef = event.ref

  def disposeSubscriber(system: ActorSystem, subscriber: BusType#Subscriber): Unit = system.stop(subscriber)

  // ActorClassification specific tests

  "must unsubscribe subscriber when it terminates" in {
    val a1 = createSubscriber(system.deadLetters)
    val subs = createSubscriber(testActor)
    def m(i: Int) = Notification(a1, i)
    val p = TestProbe()
    system.eventStream.subscribe(p.ref, classOf[Logging.Debug])

    bus.subscribe(subs, a1)
    bus.publish(m(1))
    expectMsg(m(1))

    watch(subs)
    subs ! PoisonPill // the subscriber itself is terminated here; a1 stays alive
    expectTerminated(subs)
    expectUnsubscribedByUnsubscriber(p, subs)

    // subs is gone and unsubscribed, so nothing may reach testActor any more
    bus.publish(m(2))
    expectNoMsg(1 second)

    disposeSubscriber(system, subs)
    disposeSubscriber(system, a1)
  }

  "must keep subscriber even if its subscription-actors have died" in {
    // Deaths of monitored actors should not influence the subscription.
    // For example: one might still want to monitor messages classified to A
    // even though it died, and handle these in some way.
    val a1 = createSubscriber(system.deadLetters)
    val subs = createSubscriber(testActor)
    def m(i: Int) = Notification(a1, i)

    bus.subscribe(subs, a1) should equal(true)

    bus.publish(m(1))
    expectMsg(m(1))

    watch(a1)
    a1 ! PoisonPill
    expectTerminated(a1)

    bus.publish(m(2)) // even though a1 has terminated, classification still applies
    expectMsg(m(2))

    disposeSubscriber(system, subs)
    disposeSubscriber(system, a1)
  }

  "must unregister subscriber only after it unsubscribes from all of it's subscriptions" in {
    val a1, a2 = createSubscriber(system.deadLetters)
    val subs = createSubscriber(testActor)
    def m1(i: Int) = Notification(a1, i)
    def m2(i: Int) = Notification(a2, i)

    val p = TestProbe()
    system.eventStream.subscribe(p.ref, classOf[Logging.Debug])

    bus.subscribe(subs, a1) should equal(true)
    bus.subscribe(subs, a2) should equal(true)

    bus.publish(m1(1))
    bus.publish(m2(1))
    expectMsg(m1(1))
    expectMsg(m2(1))

    // Dropping only the a1 subscription must leave the a2 subscription working.
    bus.unsubscribe(subs, a1)
    bus.publish(m1(2))
    expectNoMsg(1 second)
    bus.publish(m2(2))
    expectMsg(m2(2))

    // Only once the last subscription is gone is the unregister message expected.
    bus.unsubscribe(subs, a2)
    expectUnregisterFromUnsubscriber(p, subs)
    bus.publish(m1(3))
    bus.publish(m2(3))
    expectNoMsg(1 second)

    disposeSubscriber(system, subs)
    disposeSubscriber(system, a1)
    disposeSubscriber(system, a2)
  }

  /**
   * Waits (up to one second) until `p` receives a `Logging.Debug` event whose
   * message equals `expectedMsg`; every other message seen meanwhile is skipped.
   */
  private def expectDebugMessage(p: TestProbe, expectedMsg: String): Unit = {
    p.fishForMessage(1 second, hint = expectedMsg) {
      case Logging.Debug(_, _, msg) if msg == expectedMsg ⇒ true
      case _ ⇒ false
    }
    ()
  }

  /** Expects the debug message announcing that terminated actor `a` is being unsubscribed from `bus`. */
  private def expectUnsubscribedByUnsubscriber(p: TestProbe, a: ActorRef): Unit =
    expectDebugMessage(p, s"actor $a has terminated, unsubscribing it from $bus")

  /** Expects the debug message announcing that the watch of `a` was unregistered in `bus`. */
  private def expectUnregisterFromUnsubscriber(p: TestProbe, a: ActorRef): Unit =
    expectDebugMessage(p, s"unregistered watch of $a in $bus")
}

/** Bus definition used by the `ScanningEventBusSpec` class. */
object ScanningEventBusSpec {

  /**
   * Scanning bus over `Int` events: a subscriber registered under a `String`
   * classifier receives an event when the event's decimal text equals that classifier.
   */
  class MyScanningEventBus extends EventBus with ScanningClassification {
    type Event = Int
    type Subscriber = Procedure[Int]
    type Classifier = String

    /** An event matches when its string form is the classifier. */
    protected def matches(classifier: String, event: Int): Boolean = {
      classifier == event.toString
    }

    /** Hands the event to the subscriber's callback. */
    protected def publish(event: Int, subscriber: Procedure[Int]): Unit = {
      subscriber.apply(event)
    }

    /** Classifiers are ordered as plain strings. */
    protected def compareClassifiers(a: String, b: String): Int = {
      a.compareTo(b)
    }

    /** Subscribers are ordered by identity hash. */
    protected def compareSubscribers(a: Procedure[Int], b: Procedure[Int]): Int = {
      akka.util.Helpers.compareIdentityHash(a, b)
    }
  }
}

/** Runs the shared [[EventBusSpec]] checks against `MyScanningEventBus`. */
class ScanningEventBusSpec extends EventBusSpec("ScanningEventBus") {
  import ScanningEventBusSpec._

  type BusType = MyScanningEventBus

  def createNewEventBus(): BusType = {
    new MyScanningEventBus
  }

  /** Events are the integers `0` (inclusive) to `numberOfEvents` (exclusive). */
  def createEvents(numberOfEvents: Int): Range = Range(0, numberOfEvents)

  /** A subscriber is a callback that sends each received event to `pipeTo`. */
  def createSubscriber(pipeTo: ActorRef): Procedure[Int] =
    new Procedure[Int] {
      def apply(i: Int): Unit = {
        pipeTo ! i
      }
    }

  /** The classifier of an event is its string form. */
  def classifierFor(event: BusType#Event): String = event.toString

  /** Nothing to release for callback subscribers. */
  def disposeSubscriber(system: ActorSystem, subscriber: BusType#Subscriber): Unit = {}
}

/** Bus definition used by the `LookupEventBusSpec` class. */
object LookupEventBusSpec {

  /**
   * Lookup bus over `Int` events: each event is classified by its string form
   * and delivered by invoking the subscriber callback.
   */
  class MyLookupEventBus extends EventBus with LookupClassification {
    type Event = Int
    type Subscriber = Procedure[Int]
    type Classifier = String

    override protected def mapSize = 32

    /** The classifier of an event is its string form. */
    override protected def classify(event: Event): Classifier = {
      event.toString
    }

    /** Hands the event to the subscriber's callback. */
    override protected def publish(event: Event, subscriber: Subscriber): Unit = {
      subscriber.apply(event)
    }

    /** Subscribers are ordered by identity hash. */
    override protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = {
      akka.util.Helpers.compareIdentityHash(a, b)
    }
  }
}

/** Runs the shared [[EventBusSpec]] checks against `MyLookupEventBus`. */
class LookupEventBusSpec extends EventBusSpec("LookupEventBus") {
  import LookupEventBusSpec._

  type BusType = MyLookupEventBus

  def createNewEventBus(): BusType = {
    new MyLookupEventBus
  }

  /** Events are the integers `0` (inclusive) to `numberOfEvents` (exclusive). */
  def createEvents(numberOfEvents: Int): Range = Range(0, numberOfEvents)

  /** A subscriber is a callback that sends each received event to `pipeTo`. */
  def createSubscriber(pipeTo: ActorRef): Procedure[Int] =
    new Procedure[Int] {
      def apply(i: Int): Unit = {
        pipeTo ! i
      }
    }

  /** The classifier of an event is its string form. */
  def classifierFor(event: BusType#Event): String = event.toString

  /** Nothing to release for callback subscribers. */
  def disposeSubscriber(system: ActorSystem, subscriber: BusType#Subscriber): Unit = {}
}

Other Akka source code examples

Here is a short list of links related to this Akka EventBusSpec.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.