|
Akka/Scala example source code file (EventBusDocSpec.scala)
The EventBusDocSpec.scala Akka example source code
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.event
import scala.concurrent.duration._
import akka.testkit.AkkaSpec
import akka.actor.{ ActorSystem, ActorRef }
import akka.testkit.TestProbe
object EventBusDocSpec {
//#lookup-bus
import akka.event.EventBus
import akka.event.LookupClassification
final case class MsgEnvelope(topic: String, payload: Any)
/**
* Publishes the payload of the MsgEnvelope when the topic of the
* MsgEnvelope equals the String specified when subscribing.
*/
class LookupBusImpl extends EventBus with LookupClassification {
type Event = MsgEnvelope
type Classifier = String
type Subscriber = ActorRef
// is used for extracting the classifier from the incoming events
override protected def classify(event: Event): Classifier = event.topic
// will be invoked for each event for all subscribers which registered themselves
// for the event’s classifier
override protected def publish(event: Event, subscriber: Subscriber): Unit = {
subscriber ! event.payload
}
// must define a full order over the subscribers, expressed as expected from
// `java.lang.Comparable.compare`
override protected def compareSubscribers(a: Subscriber, b: Subscriber): Int =
a.compareTo(b)
// determines the initial size of the index data structure
// used internally (i.e. the expected number of different classifiers)
override protected def mapSize: Int = 128
}
//#lookup-bus
//#subchannel-bus
import akka.util.Subclassification
class StartsWithSubclassification extends Subclassification[String] {
override def isEqual(x: String, y: String): Boolean =
x == y
override def isSubclass(x: String, y: String): Boolean =
x.startsWith(y)
}
import akka.event.SubchannelClassification
/**
* Publishes the payload of the MsgEnvelope when the topic of the
* MsgEnvelope starts with the String specified when subscribing.
*/
class SubchannelBusImpl extends EventBus with SubchannelClassification {
type Event = MsgEnvelope
type Classifier = String
type Subscriber = ActorRef
// Subclassification is an object providing `isEqual` and `isSubclass`
// to be consumed by the other methods of this classifier
override protected val subclassification: Subclassification[Classifier] =
new StartsWithSubclassification
// is used for extracting the classifier from the incoming events
override protected def classify(event: Event): Classifier = event.topic
// will be invoked for each event for all subscribers which registered
// themselves for the event’s classifier
override protected def publish(event: Event, subscriber: Subscriber): Unit = {
subscriber ! event.payload
}
}
//#subchannel-bus
//#scanning-bus
import akka.event.ScanningClassification
/**
* Publishes String messages with length less than or equal to the length
* specified when subscribing.
*/
class ScanningBusImpl extends EventBus with ScanningClassification {
type Event = String
type Classifier = Int
type Subscriber = ActorRef
// is needed for determining matching classifiers and storing them in an
// ordered collection
override protected def compareClassifiers(a: Classifier, b: Classifier): Int =
if (a < b) -1 else if (a == b) 0 else 1
// is needed for storing subscribers in an ordered collection
override protected def compareSubscribers(a: Subscriber, b: Subscriber): Int =
a.compareTo(b)
// determines whether a given classifier shall match a given event; it is invoked
// for each subscription for all received events, hence the name of the classifier
override protected def matches(classifier: Classifier, event: Event): Boolean =
event.length <= classifier
// will be invoked for each event for all subscribers which registered themselves
// for a classifier matching this event
override protected def publish(event: Event, subscriber: Subscriber): Unit = {
subscriber ! event
}
}
//#scanning-bus
//#actor-bus
import akka.event.ActorEventBus
import akka.event.ActorClassification
import akka.event.ActorClassifier
final case class Notification(ref: ActorRef, id: Int)
class ActorBusImpl(val system: ActorSystem) extends ActorEventBus with ActorClassifier with ActorClassification {
type Event = Notification
// is used for extracting the classifier from the incoming events
override protected def classify(event: Event): ActorRef = event.ref
// determines the initial size of the index data structure
// used internally (i.e. the expected number of different classifiers)
override protected def mapSize: Int = 128
}
//#actor-bus
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class EventBusDocSpec extends AkkaSpec {
import EventBusDocSpec._
"demonstrate LookupClassification" in {
//#lookup-bus-test
val lookupBus = new LookupBusImpl
lookupBus.subscribe(testActor, "greetings")
lookupBus.publish(MsgEnvelope("time", System.currentTimeMillis()))
lookupBus.publish(MsgEnvelope("greetings", "hello"))
expectMsg("hello")
//#lookup-bus-test
}
"demonstrate SubchannelClassification" in {
//#subchannel-bus-test
val subchannelBus = new SubchannelBusImpl
subchannelBus.subscribe(testActor, "abc")
subchannelBus.publish(MsgEnvelope("xyzabc", "x"))
subchannelBus.publish(MsgEnvelope("bcdef", "b"))
subchannelBus.publish(MsgEnvelope("abc", "c"))
expectMsg("c")
subchannelBus.publish(MsgEnvelope("abcdef", "d"))
expectMsg("d")
//#subchannel-bus-test
}
"demonstrate ScanningClassification" in {
//#scanning-bus-test
val scanningBus = new ScanningBusImpl
scanningBus.subscribe(testActor, 3)
scanningBus.publish("xyzabc")
scanningBus.publish("ab")
expectMsg("ab")
scanningBus.publish("abc")
expectMsg("abc")
//#scanning-bus-test
}
"demonstrate ActorClassification" in {
//#actor-bus-test
val observer1 = TestProbe().ref
val observer2 = TestProbe().ref
val probe1 = TestProbe()
val probe2 = TestProbe()
val subscriber1 = probe1.ref
val subscriber2 = probe2.ref
val actorBus = new ActorBusImpl(system)
actorBus.subscribe(subscriber1, observer1)
actorBus.subscribe(subscriber2, observer1)
actorBus.subscribe(subscriber2, observer2)
actorBus.publish(Notification(observer1, 100))
probe1.expectMsg(Notification(observer1, 100))
probe2.expectMsg(Notification(observer1, 100))
actorBus.publish(Notification(observer2, 101))
probe2.expectMsg(Notification(observer2, 101))
probe1.expectNoMsg(500.millis)
//#actor-bus-test
}
}
Other Akka source code examplesHere is a short list of links related to this Akka EventBusDocSpec.scala source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.