alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (EventBusUnsubscribers.scala)

This example Akka source code file (EventBusUnsubscribers.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actor, actorclassification, actorclassificationunsubscriber, actorref, akka, boolean, event, eventstream, props, register, unregister, unregisterifnomoresubscribedchannels

The EventBusUnsubscribers.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.event

import akka.actor._
import akka.event.Logging.simpleName
import java.util.concurrent.atomic.AtomicInteger

/**
 * INTERNAL API
 *
 * Actor that watches every actor registered with it for the given `eventStream` and unsubscribes
 * that actor from the stream once its `Terminated` notification arrives.
 *
 * Ordering note:
 * The EventStream does not guarantee happens-before when 2 threads subscribe(a) / unsubscribe(a) on the
 * same actor, so the messages sent to this actor may appear to be reordered. That is acceptable: the
 * worst case is that an actor is watched needlessly, which causes no trouble for the stream. The
 * alternative would be to slow down subscribe calls in order to linearize the message history; watching
 * a few actors too many was chosen instead.
 */
private[akka] class EventStreamUnsubscriber(eventStream: EventStream, debug: Boolean = false) extends Actor {

  import EventStreamUnsubscriber._

  /** Publishes `message` as a debug event on the stream; the text is only built when `debug` is set. */
  private def publishDebug(message: => String): Unit =
    if (debug) eventStream.publish(Logging.Debug(simpleName(getClass), getClass, message))

  /** Announces itself (in debug mode) and hands its own reference to the stream via `initUnsubscriber`. */
  override def preStart(): Unit = {
    publishDebug(s"registering unsubscriber with $eventStream")
    eventStream initUnsubscriber self
  }

  def receive: Receive = {
    case Register(actor) =>
      publishDebug(s"watching $actor in order to unsubscribe from EventStream when it terminates")
      context watch actor

    case UnregisterIfNoMoreSubscribedChannels(actor) =>
      // hasSubscriptions can be slow, but it's better for this actor to take the hit than the EventStream.
      // While the actor still has subscriptions there is nothing to do.
      if (!eventStream.hasSubscriptions(actor)) {
        publishDebug(s"unwatching $actor, since has no subscriptions")
        context unwatch actor
      }

    case Terminated(actor) =>
      publishDebug(s"unsubscribe $actor from $eventStream, because it was terminated")
      eventStream unsubscribe actor
  }
}

/**
 * INTERNAL API
 *
 * Factory for [[akka.event.EventStreamUnsubscriber]] actors that gives each one a **unique name**.
 * This is needed if someone spins up more [[EventStream]]s using the same [[ActorSystem]]:
 * each stream gets its own unsubscriber.
 */
private[akka] object EventStreamUnsubscriber {

  /** Source of the numeric suffix that makes every unsubscriber's actor name distinct. */
  private val unsubscribersCount = new AtomicInteger(0)

  /** Message asking the unsubscriber to start watching `actor`. */
  final case class Register(actor: ActorRef)

  /** Message asking the unsubscriber to stop watching `actor` if the stream holds no more subscriptions for it. */
  final case class UnregisterIfNoMoreSubscribedChannels(actor: ActorRef)

  private def props(eventStream: EventStream, debug: Boolean): Props =
    Props(classOf[EventStreamUnsubscriber], eventStream, debug)

  /**
   * Starts a new unsubscriber for `stream` as a system actor of `system`.
   *
   * Debug logging is taken from the `akka.actor.debug.event-stream` setting. The actor is named
   * `eventStreamUnsubscriber-N`, where N is the next value of a shared counter.
   * `system` is cast to [[ExtendedActorSystem]], so any other implementation fails with a ClassCastException.
   */
  def start(system: ActorSystem, stream: EventStream): ActorRef = {
    val debugEnabled = system.settings.config.getBoolean("akka.actor.debug.event-stream")
    val extendedSystem = system.asInstanceOf[ExtendedActorSystem]
    val unsubscriberProps = props(stream, debugEnabled)
    val actorName = "eventStreamUnsubscriber-" + unsubscribersCount.incrementAndGet()
    extendedSystem.systemActorOf(unsubscriberProps, actorName)
  }

}

/**
 * INTERNAL API
 *
 * Watches all actors registered with it for the given `bus`, and unsubscribes them from the bus when
 * they are Terminated.
 *
 * `Register` and `Unregister` messages carry a sequence number and are applied strictly in sequence:
 * a message whose number is not the next expected one is stashed, and all stashed messages are replayed
 * each time the expected one has been applied.
 */
private[akka] class ActorClassificationUnsubscriber(bus: ActorClassification, debug: Boolean) extends Actor with Stash {

  import ActorClassificationUnsubscriber._

  // Sequence number of the last Register/Unregister that was applied; the next one accepted is atSeq + 1.
  private var atSeq = 0
  private def nextSeq = atSeq + 1

  /** Publishes `message` as a debug event on the system event stream; the text is only built when `debug` is set. */
  private def publishDebug(message: => String): Unit =
    if (debug) context.system.eventStream.publish(Logging.Debug(simpleName(getClass), getClass, message))

  /** Marks the expected message as applied and replays everything that was stashed while waiting for it. */
  private def advanceAndReplay(): Unit = {
    atSeq = nextSeq
    unstashAll()
  }

  override def preStart(): Unit = {
    super.preStart()
    publishDebug(s"will monitor $bus")
  }

  def receive: Receive = {
    case Register(actor, seq) =>
      if (seq == nextSeq) {
        publishDebug(s"registered watch for $actor in $bus")
        context watch actor
        advanceAndReplay()
      } else {
        // arrived out of order: keep it until its turn comes
        stash()
      }

    case Unregister(actor, seq) =>
      if (seq == nextSeq) {
        publishDebug(s"unregistered watch of $actor in $bus")
        context unwatch actor
        advanceAndReplay()
      } else {
        // arrived out of order: keep it until its turn comes
        stash()
      }

    case Terminated(actor) =>
      publishDebug(s"actor $actor has terminated, unsubscribing it from $bus")
      // the `unsubscribe` will trigger another `Unregister(actor, _)` message to this unsubscriber;
      // but since that actor is terminated, there cannot be any harm in processing an Unregister for it.
      bus unsubscribe actor
  }

}

/**
 * INTERNAL API
 *
 * Provides factory for [[akka.event.ActorClassificationUnsubscriber]] actors with **unique names**.
 */
private[akka] object ActorClassificationUnsubscriber {

  /** Source of the numeric suffix that makes every unsubscriber's actor name distinct. */
  private val unsubscribersCount = new AtomicInteger(0)

  /** Message asking the unsubscriber to start watching `actor`; `seq` is the position of this request in the sequence. */
  final case class Register(actor: ActorRef, seq: Int)

  /** Message asking the unsubscriber to stop watching `actor`; `seq` is the position of this request in the sequence. */
  final case class Unregister(actor: ActorRef, seq: Int)

  /**
   * Starts a new unsubscriber for `bus` as a system actor of `system`.
   *
   * The actor is named `actorClassificationUnsubscriber-N`, where N is the next value of a shared counter.
   * `system` is cast to [[ExtendedActorSystem]], so any other implementation fails with a ClassCastException.
   *
   * @param system the actor system that hosts the unsubscriber
   * @param bus    the bus whose subscribers are to be watched
   * @param debug  forces debug logging on for this unsubscriber; when `false` (the default) the
   *               `akka.actor.debug.event-stream` setting alone decides
   */
  def start(system: ActorSystem, bus: ActorClassification, debug: Boolean = false): ActorRef = {
    // Previously a local `val debug` shadowed the parameter, so the caller's argument was silently ignored.
    // The setting is still always read (as before); an explicit `debug = true` now also enables logging.
    val configuredDebug = system.settings.config.getBoolean("akka.actor.debug.event-stream")
    val debugEnabled = debug || configuredDebug
    system.asInstanceOf[ExtendedActorSystem]
      .systemActorOf(props(bus, debugEnabled), "actorClassificationUnsubscriber-" + unsubscribersCount.incrementAndGet())
  }

  private def props(eventBus: ActorClassification, debug: Boolean) = Props(classOf[ActorClassificationUnsubscriber], eventBus, debug)

}

Other Akka source code examples

Here is a short list of links related to this Akka EventBusUnsubscribers.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.