alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (EventStream.scala)

This example Akka source code file (EventStream.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actor, actorref, akka, annotation, anyref, atomicreference, boolean, class, classifier, event, illegalargumentexception, left, right, unit, util

The EventStream.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.event

import language.implicitConversions

import akka.actor.{ ActorRef, ActorSystem }
import akka.event.Logging.simpleName
import akka.util.Subclassification
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec

/**
 * An Akka EventStream is a pub-sub stream of events, both system and user generated,
 * where subscribers are ActorRefs, the channels are Classes and Events are any java.lang.Object.
 * EventStreams employ SubchannelClassification, which means that if you listen to a Class,
 * you'll receive any message that is of that type or a subtype.
 *
 * The debug flag in the constructor toggles if operations on this EventStream should also be published
 * as Debug-Events.
 *
 * @param sys   actor system that is handed to `EventStreamUnsubscriber.start` by [[startUnsubscriber]]
 * @param debug when `true`, subscribe / unsubscribe / unsubscriber-initialisation operations are
 *              additionally published on this stream as `Logging.Debug` events
 */
class EventStream(sys: ActorSystem, private val debug: Boolean = false) extends LoggingBus with SubchannelClassification {

  type Event = AnyRef
  type Classifier = Class[_]

  /**
   * Either the set of actors that subscribed before an unsubscriber was installed (`Left`),
   * or a ref to the installed [[akka.event.EventStreamUnsubscriber]] (`Right`).
   * All transitions go through `compareAndSet`, retrying on contention.
   */
  private val initiallySubscribedOrUnsubscriber = new AtomicReference[Either[Set[ActorRef], ActorRef]](Left(Set.empty))

  /** Channel relation used for classification: class equality and JVM assignability. */
  protected implicit val subclassification: Subclassification[Class[_]] = new Subclassification[Class[_]] {
    def isEqual(x: Class[_], y: Class[_]): Boolean = x == y
    def isSubclass(x: Class[_], y: Class[_]): Boolean = y isAssignableFrom x
  }

  /** An event is classified by its runtime class. */
  protected def classify(event: AnyRef): Class[_] = event.getClass

  /** Delivers `event` to a single `subscriber` by sending it as a message. */
  protected def publish(event: AnyRef, subscriber: ActorRef): Unit = subscriber ! event

  /**
   * Publishes `message` on this stream as a `Logging.Debug` event, but only if the `debug` flag is set.
   * The message is passed by-name so that it is not built at all when debugging is off.
   */
  private def publishDebug(message: => String): Unit =
    if (debug) publish(Logging.Debug(simpleName(this), this.getClass, message))

  /**
   * Subscribes `subscriber` to `channel` (and, by subchannel classification, its subtypes).
   * The subscriber is also recorded with the unsubscriber, or in the pending set if no
   * unsubscriber has been installed yet.
   *
   * @return the result of the underlying bus `subscribe`
   * @throws IllegalArgumentException if `subscriber` is null
   */
  override def subscribe(subscriber: ActorRef, channel: Class[_]): Boolean = {
    if (subscriber eq null) throw new IllegalArgumentException("subscriber is null")
    publishDebug(s"subscribing $subscriber to channel $channel")
    registerWithUnsubscriber(subscriber)
    super.subscribe(subscriber, channel)
  }

  /**
   * Unsubscribes `subscriber` from `channel`.
   *
   * @return the result of the underlying bus `unsubscribe`
   * @throws IllegalArgumentException if `subscriber` is null
   */
  override def unsubscribe(subscriber: ActorRef, channel: Class[_]): Boolean = {
    if (subscriber eq null) throw new IllegalArgumentException("subscriber is null")
    val ret = super.unsubscribe(subscriber, channel)
    publishDebug(s"unsubscribing $subscriber from channel $channel")
    unregisterIfNoMoreSubscribedChannels(subscriber)
    ret
  }

  /**
   * Unsubscribes `subscriber` from all channels.
   *
   * @throws IllegalArgumentException if `subscriber` is null
   */
  override def unsubscribe(subscriber: ActorRef): Unit = {
    if (subscriber eq null) throw new IllegalArgumentException("subscriber is null")
    super.unsubscribe(subscriber)
    publishDebug(s"unsubscribing $subscriber from all channels")
    unregisterIfNoMoreSubscribedChannels(subscriber)
  }

  /**
   * ''Must'' be called after actor system is "ready".
   * Starts system actor that takes care of unsubscribing subscribers that have terminated.
   */
  // Return type intentionally left inferred: it is whatever `EventStreamUnsubscriber.start` returns.
  def startUnsubscriber() = EventStreamUnsubscriber.start(sys, this)

  /**
   * INTERNAL API
   *
   * Installs `unsubscriber` if none has been installed yet and hands it every subscriber
   * collected so far.
   *
   * @return `true` if this call installed `unsubscriber`, `false` if one was already present
   */
  @tailrec
  final private[akka] def initUnsubscriber(unsubscriber: ActorRef): Boolean = {
    initiallySubscribedOrUnsubscriber.get match {
      case value @ Left(subscribers) =>
        if (initiallySubscribedOrUnsubscriber.compareAndSet(value, Right(unsubscriber))) {
          publishDebug(s"initialized unsubscriber to: $unsubscriber, registering ${subscribers.size} initial subscribers with it")
          // the reference now holds Right(unsubscriber), so each call below sends a Register message
          subscribers foreach registerWithUnsubscriber
          true
        } else {
          // recurse, because either new subscribers have been registered since `get` (retry Left case),
          // or another thread has succeeded in setting its unsubscriber (end on Right case)
          initUnsubscriber(unsubscriber)
        }

      case Right(presentUnsubscriber) =>
        publishDebug(s"not using unsubscriber $unsubscriber, because already initialized with $presentUnsubscriber")
        false
    }
  }

  /**
   * INTERNAL API
   *
   * Adds `subscriber` to the pending set while no unsubscriber is installed; otherwise sends
   * `EventStreamUnsubscriber.Register` for it to the installed unsubscriber.
   */
  @tailrec
  private def registerWithUnsubscriber(subscriber: ActorRef): Unit = {
    initiallySubscribedOrUnsubscriber.get match {
      case value @ Left(subscribers) =>
        // lost the race against a concurrent update: re-read and try again
        if (!initiallySubscribedOrUnsubscriber.compareAndSet(value, Left(subscribers + subscriber)))
          registerWithUnsubscriber(subscriber)

      case Right(unsubscriber) =>
        unsubscriber ! EventStreamUnsubscriber.Register(subscriber)
    }
  }

  /**
   * INTERNAL API
   *
   * The actual check if the subscriber still has subscriptions is performed by the `EventStreamUnsubscriber`,
   * because it's an expensive operation, and we don't want to block client-code for that long; the Actor will
   * eventually catch up and perform the appropriate operation.
   */
  @tailrec
  private def unregisterIfNoMoreSubscribedChannels(subscriber: ActorRef): Unit = {
    initiallySubscribedOrUnsubscriber.get match {
      case value @ Left(subscribers) =>
        // NOTE(review): while no unsubscriber is installed the subscriber is dropped from the pending set
        // unconditionally, also after a single-channel `unsubscribe(subscriber, channel)`. Confirm whether a
        // subscriber that still holds other channels should remain pending so it gets registered later.
        if (!initiallySubscribedOrUnsubscriber.compareAndSet(value, Left(subscribers - subscriber)))
          unregisterIfNoMoreSubscribedChannels(subscriber)

      case Right(unsubscriber) =>
        unsubscriber ! EventStreamUnsubscriber.UnregisterIfNoMoreSubscribedChannels(subscriber)
    }
  }

}

Other Akka source code examples

Here is a short list of links related to this Akka EventStream.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.