alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (Aggregator.scala)

This example Akka source code file (Aggregator.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actor, aggregator, akka, annotation, any, boolean, entry, handled, option, pattern, remove, t, worklist

The Aggregator.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.contrib.pattern

import akka.actor.Actor
import scala.annotation.tailrec

/**
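 * Mix this trait into an actor to give it aggregator behavior: incoming messages are matched against a set of
 * expected receive functions that can be added and removed while the actor is running.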
 */
trait Aggregator {
  this: Actor =>

  // True only while handleMessage is walking expectList. Registrations made in that window are
  // parked in addBuffer and merged into expectList once the walk is over.
  private var processing = false
  private val expectList = WorkList.empty[Actor.Receive]
  private val addBuffer = WorkList.empty[Actor.Receive]

  /**
   * Registers a one-shot receive function: it is added as a non-permanent entry, so it leaves the
   * receive set after its first match.
   * @param fn The receive function.
   * @return The same receive function.
   */
  def expectOnce(fn: Actor.Receive): Actor.Receive = {
    val target = if (processing) addBuffer else expectList
    target.add(fn, permanent = false)
    fn
  }

  /**
   * Registers a receive function that stays in the receive set until it is explicitly removed.
   * @param fn The receive function.
   * @return The same receive function.
   */
  def expect(fn: Actor.Receive): Actor.Receive = {
    val target = if (processing) addBuffer else expectList
    target.add(fn, permanent = true)
    fn
  }

  /**
   * Takes the receive function out of the receive set.
   * The pending-additions buffer is only consulted while a message is being processed.
   * @param fn The receive function.
   * @return True if the partial function is removed, false if not found.
   */
  def unexpect(fn: Actor.Receive): Boolean =
    expectList.remove(fn) || (processing && addBuffer.remove(fn))

  /**
   * Receive function for handling the aggregations. It is defined only for messages that
   * handleMessage reports as handled.
   */
  def receive: Actor.Receive = {
    case msg if handleMessage(msg) => () // the work already happened inside handleMessage
  }

  /**
   * Offers the message to the expected receive functions, in list order, until one accepts it.
   * @param msg The message to be handled.
   * @return true if message is successfully processed, false otherwise.
   */
  def handleMessage(msg: Any): Boolean = {
    processing = true
    try {
      expectList.process { candidate =>
        // The fallback passed to applyOrElse runs only when candidate is not defined for msg.
        var matched = true
        candidate.applyOrElse(msg, (_: Any) => matched = false)
        matched
      }
    } finally {
      processing = false
      // Merge the registrations that arrived while processing, then reset the buffer.
      expectList.addAll(addBuffer)
      addBuffer.removeAll()
    }
  }
}

/**
 * Provides the utility methods and constructors to the WorkList class.
 */
object WorkList {

  /**
   * Creates a new, empty work list.
   * @tparam T The type of the item
   * @return The empty work list.
   */
  def empty[T]: WorkList[T] = new WorkList[T]

  /**
   * Singly linked list entry implementation for WorkList.
   * @param ref The item reference, None for head entry
   * @param permanent If the entry is to be kept after processing
   * @tparam T The type of the item
   */
  class Entry[T](val ref: Option[T], val permanent: Boolean) {
    // Successor in the chain; null marks the end of the list.
    var next: Entry[T] = null
    // Set when the entry is unlinked, so that it is never unlinked a second time.
    var isDeleted = false
  }
}

/**
 * Fast, small, and dirty implementation of a linked list that removes transient work entries once they are processed.
 * The list is not thread safe! However it is expected to be reentrant. This means a processing function can add/remove
 * entries from the list while processing. Most important, a processing function can remove its own entry from the list.
 * The first remove must return true and any subsequent removes must return false.
 * @tparam T The type of the item
 */
class WorkList[T] {

  import WorkList._

  // Sentinel entry that carries no item; the first real entry is head.next.
  val head: Entry[T] = new Entry[T](None, true)
  // Last entry of the list; identical to head while the list is empty.
  var tail: Entry[T] = head

  /**
   * Appends an entry to the work list.
   * @param ref The entry.
   * @param permanent If the entry is to be kept after it has been processed successfully.
   * @return The updated work list.
   */
  def add(ref: T, permanent: Boolean): WorkList[T] = {
    val entry = new Entry[T](Some(ref), permanent)
    // When the list is empty tail is head, so this also sets head.next.
    tail.next = entry
    tail = entry
    this
  }

  /**
   * Removes the first entry whose item equals the given one.
   * @param ref The entry.
   * @return True if the entry is removed, false if the entry is not found.
   */
  def remove(ref: T): Boolean = {

    @tailrec
    def remove(parent: Entry[T], entry: Entry[T]): Boolean =
      if (entry == null) false
      else if (entry.ref.exists(_ == ref)) {
        unlinkAfter(parent, entry)
        true
      } else remove(entry, entry.next)

    remove(head, head.next)
  }

  /**
   * Tries to process each entry using the processing function. Stops at the first entry processing succeeds.
   * If the entry is not permanent, the entry is removed.
   *
   * The processing function may add or remove entries, including entries that precede the one being
   * processed. For that reason the predecessor used to unlink a transient entry is looked up after the
   * processing function returns, instead of being remembered from the traversal.
   * @param processFn The processing function, returns true if processing succeeds.
   * @return true if an entry has been processed, false if no entries are processed successfully.
   */
  def process(processFn: T => Boolean): Boolean = {

    @tailrec
    def process(entry: Entry[T]): Boolean =
      if (entry == null) false
      else if (entry.ref.exists(processFn)) {
        if (!entry.permanent && !entry.isDeleted) {
          predecessorOf(entry, head) match {
            case Some(parent) => unlinkAfter(parent, entry)
            // No longer reachable from head (e.g. removeAll ran inside processFn): nothing to unlink.
            case None         => entry.isDeleted = true
          }
        }
        true // Handled
      } else process(entry.next)

    process(head.next)
  }

  /**
   * Appends another WorkList to this WorkList. The entries are linked in, not copied, so both lists
   * share them until the other list is cleared.
   * @param other The other WorkList
   * @return This WorkList
   */
  def addAll(other: WorkList[T]): WorkList[T] = {
    if (other.head.next != null) {
      tail.next = other.head.next
      tail = other.tail
    }
    this
  }

  /**
   * Removes all entries from this WorkList
   * @return True if at least one entry is removed. False if none is removed.
   */
  def removeAll(): Boolean = {
    val hadEntries = head.next != null
    head.next = null
    tail = head
    hadEntries
  }

  /** Unlinks `entry`, which must directly follow `parent`, and marks it as deleted. */
  private def unlinkAfter(parent: Entry[T], entry: Entry[T]): Unit = {
    parent.next = entry.next
    if (tail eq entry) tail = parent
    // entry.next is left untouched so that a traversal currently standing on entry can move on.
    entry.isDeleted = true
  }

  /** Finds, by identity, the entry that currently links to `entry`, searching from `from`. */
  @tailrec
  private def predecessorOf(entry: Entry[T], from: Entry[T]): Option[Entry[T]] =
    if (from == null) None
    else if (from.next eq entry) Some(from)
    else predecessorOf(entry, from.next)
}

Other Akka source code examples

Here is a short list of links related to this Akka Aggregator.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.