alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (EventBusDocSpec.scala)

This example Akka source code file (EventBusDocSpec.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actorref, akka, boolean, classifier, concurrent, event, eventbus, int, msgenvelope, string, subscriber, test, testing, testkit, time, unit

The EventBusDocSpec.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package docs.event

import scala.concurrent.duration._
import akka.testkit.AkkaSpec
import akka.actor.{ ActorSystem, ActorRef }
import akka.testkit.TestProbe

object EventBusDocSpec {

  //#lookup-bus
  import akka.event.EventBus
  import akka.event.LookupClassification

  /**
   * Event type shared by the MsgEnvelope-based sample buses in this object.
   *
   * @param topic   string that the buses' `classify` returns as the event's classifier
   * @param payload value that the buses' `publish` forwards to a subscriber
   */
  final case class MsgEnvelope(topic: String, payload: Any)

  /**
   * Lookup-classified bus: a subscriber that registered with a topic String
   * receives the payload of every MsgEnvelope whose topic equals that String.
   */
  class LookupBusImpl extends EventBus with LookupClassification {
    type Event = MsgEnvelope
    type Classifier = String
    type Subscriber = ActorRef

    /** Extracts the classifier from an incoming event: the envelope's topic. */
    override protected def classify(event: Event): Classifier = event.topic

    /**
     * Invoked for each event, once per subscriber registered for the event's
     * classifier. Only the payload is sent on, not the envelope itself.
     */
    override protected def publish(event: Event, subscriber: Subscriber): Unit =
      subscriber ! event.payload

    /**
     * Defines a full order over the subscribers, with the result expressed as
     * expected from `java.lang.Comparable.compare`.
     */
    override protected def compareSubscribers(a: Subscriber, b: Subscriber): Int =
      a compareTo b

    /**
     * Initial size of the index data structure used internally, i.e. the
     * expected number of different classifiers.
     */
    override protected def mapSize: Int = 128
  }

  //#lookup-bus

  //#subchannel-bus
  import akka.util.Subclassification

  /**
   * Subclassification over Strings based on prefixes: `x` counts as a
   * subclass of `y` when `x` starts with `y`.
   */
  class StartsWithSubclassification extends Subclassification[String] {
    /** Two classifiers are equal exactly when the Strings are equal. */
    override def isEqual(x: String, y: String): Boolean = x == y

    /** `x` is a subclass of `y` when `y` is a prefix of `x`. */
    override def isSubclass(x: String, y: String): Boolean = x startsWith y
  }

  import akka.event.SubchannelClassification

  /**
   * Subchannel-classified bus: a subscriber that registered with a String
   * receives the payload of every MsgEnvelope whose topic starts with that
   * String.
   */
  class SubchannelBusImpl extends EventBus with SubchannelClassification {
    type Event = MsgEnvelope
    type Classifier = String
    type Subscriber = ActorRef

    /**
     * Object providing `isEqual` and `isSubclass`, to be consumed by the other
     * methods of this classifier.
     */
    override protected val subclassification: Subclassification[Classifier] =
      new StartsWithSubclassification

    /** Extracts the classifier from an incoming event: the envelope's topic. */
    override protected def classify(event: Event): Classifier = event.topic

    /**
     * Invoked for each event, once per subscriber registered for the event's
     * classifier. Only the payload is sent on, not the envelope itself.
     */
    override protected def publish(event: Event, subscriber: Subscriber): Unit =
      subscriber ! event.payload
  }
  //#subchannel-bus

  //#scanning-bus
  import akka.event.ScanningClassification

  /**
   * Scanning-classified bus over plain Strings: a subscriber that registered
   * with a length receives every String whose length is less than or equal to
   * that length.
   */
  class ScanningBusImpl extends EventBus with ScanningClassification {
    type Event = String
    type Classifier = Int
    type Subscriber = ActorRef

    /**
     * Orders classifiers; needed for determining matching classifiers and for
     * storing them in an ordered collection. Yields -1, 0 or 1.
     */
    override protected def compareClassifiers(a: Classifier, b: Classifier): Int =
      Ordering.Int.compare(a, b)

    /** Orders subscribers; needed for storing them in an ordered collection. */
    override protected def compareSubscribers(a: Subscriber, b: Subscriber): Int =
      a compareTo b

    /**
     * Decides whether a classifier matches an event. It is invoked for each
     * subscription for all received events, hence the name of the classifier.
     */
    override protected def matches(classifier: Classifier, event: Event): Boolean =
      classifier >= event.length

    /**
     * Invoked for each event, once per subscriber registered for a classifier
     * matching this event. The String event itself is what gets sent.
     */
    override protected def publish(event: Event, subscriber: Subscriber): Unit =
      subscriber ! event
  }
  //#scanning-bus

  //#actor-bus
  import akka.event.ActorEventBus
  import akka.event.ActorClassification
  import akka.event.ActorClassifier

  /**
   * Event type of ActorBusImpl.
   *
   * @param ref actor reference that `ActorBusImpl.classify` returns as the event's classifier
   * @param id  integer carried along with the event; the bus code in this file does not read it
   */
  final case class Notification(ref: ActorRef, id: Int)

  /**
   * Actor-classified bus for Notification events: both the classifier and the
   * subscriber are ActorRefs, and an event is classified by the `ref` it carries.
   *
   * @param system the ActorSystem this bus is created with
   */
  class ActorBusImpl(val system: ActorSystem)
    extends ActorEventBus
    with ActorClassifier
    with ActorClassification {

    type Event = Notification

    /** Extracts the classifier from an incoming event: the notification's `ref`. */
    override protected def classify(event: Event): ActorRef = event.ref

    /**
     * Initial size of the index data structure used internally, i.e. the
     * expected number of different classifiers.
     */
    override protected def mapSize: Int = 128
  }
  //#actor-bus

}

/**
 * Spec exercising the four sample buses defined in the EventBusDocSpec
 * companion object. Each test subscribes one or more actors, publishes a few
 * events and asserts which of them arrive. The `//#...-test` comment pairs
 * delimit named snippets and are left untouched.
 */
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class EventBusDocSpec extends AkkaSpec {
  import EventBusDocSpec._

  "demonstrate LookupClassification" in {
    //#lookup-bus-test
    val lookupBus = new LookupBusImpl
    // the inherited testActor subscribes to the exact topic "greetings"
    lookupBus.subscribe(testActor, "greetings")
    // topic "time" has no subscriber, so this payload is not expected to arrive
    lookupBus.publish(MsgEnvelope("time", System.currentTimeMillis()))
    // topic equals the subscribed classifier; LookupBusImpl forwards only the payload
    lookupBus.publish(MsgEnvelope("greetings", "hello"))
    expectMsg("hello")
    //#lookup-bus-test
  }

  "demonstrate SubchannelClassification" in {
    //#subchannel-bus-test
    val subchannelBus = new SubchannelBusImpl
    // subscribe to every topic that starts with "abc" (see StartsWithSubclassification)
    subchannelBus.subscribe(testActor, "abc")
    // neither "xyzabc" nor "bcdef" starts with "abc", so "x" and "b" are not expected
    subchannelBus.publish(MsgEnvelope("xyzabc", "x"))
    subchannelBus.publish(MsgEnvelope("bcdef", "b"))
    // topic equal to the subscribed classifier
    subchannelBus.publish(MsgEnvelope("abc", "c"))
    expectMsg("c")
    // topic that has the subscribed classifier as a prefix
    subchannelBus.publish(MsgEnvelope("abcdef", "d"))
    expectMsg("d")
    //#subchannel-bus-test
  }

  "demonstrate ScanningClassification" in {
    //#scanning-bus-test
    val scanningBus = new ScanningBusImpl
    // classifier 3: ScanningBusImpl.matches accepts Strings with length <= 3
    scanningBus.subscribe(testActor, 3)
    // length 6 exceeds the classifier, so this String is not expected to arrive
    scanningBus.publish("xyzabc")
    // length 2 is below the limit
    scanningBus.publish("ab")
    expectMsg("ab")
    // length 3 is exactly the limit and still matches
    scanningBus.publish("abc")
    expectMsg("abc")
    //#scanning-bus-test
  }

  "demonstrate ActorClassification" in {
    //#actor-bus-test
    // observer refs serve as classifiers; the probes' refs are the subscribers
    val observer1 = TestProbe().ref
    val observer2 = TestProbe().ref
    val probe1 = TestProbe()
    val probe2 = TestProbe()
    val subscriber1 = probe1.ref
    val subscriber2 = probe2.ref
    val actorBus = new ActorBusImpl(system)
    // subscriber1 listens on observer1 only; subscriber2 listens on both observers
    actorBus.subscribe(subscriber1, observer1)
    actorBus.subscribe(subscriber2, observer1)
    actorBus.subscribe(subscriber2, observer2)
    // classified by observer1: both probes are expected to receive the whole Notification
    actorBus.publish(Notification(observer1, 100))
    probe1.expectMsg(Notification(observer1, 100))
    probe2.expectMsg(Notification(observer1, 100))
    // classified by observer2: only probe2 subscribed, probe1 must stay silent
    actorBus.publish(Notification(observer2, 101))
    probe2.expectMsg(Notification(observer2, 101))
    probe1.expectNoMsg(500.millis)
    //#actor-bus-test
  }
}

Other Akka source code examples

Here is a short list of links related to this Akka EventBusDocSpec.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.