alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (Index.scala)

This example Akka source code file (Index.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

akka, boolean, collection, comparator, concurrentskiplistset, if, index, int, k, remove, unit, util, v

The Index.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.util

import annotation.tailrec

import java.util.concurrent.{ ConcurrentSkipListSet, ConcurrentHashMap }
import java.util.Comparator
import scala.collection.JavaConverters.{ asScalaIteratorConverter, collectionAsScalaIterableConverter }
import scala.collection.mutable

/**
 * An implementation of a ConcurrentMultiMap
 * Adds/remove is serialized over the specified key
 * Reads are fully concurrent <-- el-cheapo
 */
class Index[K, V](val mapSize: Int, val valueComparator: Comparator[V]) {

  def this(mapSize: Int, cmp: (V, V) ⇒ Int) = this(mapSize, new Comparator[V] {
    def compare(a: V, b: V): Int = cmp(a, b)
  })

  private val container = new ConcurrentHashMap[K, ConcurrentSkipListSet[V]](mapSize)
  private val emptySet = new ConcurrentSkipListSet[V]

  /**
   * Associates the value of type V with the key of type K
   * @return true if the value didn't exist for the key previously, and false otherwise
   */
  def put(key: K, value: V): Boolean = {
    //Tailrecursive spin-locking put
    @tailrec
    def spinPut(k: K, v: V): Boolean = {
      var retry = false
      var added = false
      val set = container get k

      if (set ne null) {
        set.synchronized {
          if (set.isEmpty) retry = true //IF the set is empty then it has been removed, so signal retry
          else { //Else add the value to the set and signal that retry is not needed
            added = set add v
            retry = false
          }
        }
      } else {
        val newSet = new ConcurrentSkipListSet[V](valueComparator)
        newSet add v

        // Parry for two simultaneous putIfAbsent(id,newSet)
        val oldSet = container.putIfAbsent(k, newSet)
        if (oldSet ne null) {
          oldSet.synchronized {
            if (oldSet.isEmpty) retry = true //IF the set is empty then it has been removed, so signal retry
            else { //Else try to add the value to the set and signal that retry is not needed
              added = oldSet add v
              retry = false
            }
          }
        } else added = true
      }

      if (retry) spinPut(k, v)
      else added
    }

    spinPut(key, value)
  }

  /**
   * @return Some(value) for the first matching value where the supplied function returns true for the given key,
   * if no matches it returns None
   */
  def findValue(key: K)(f: (V) ⇒ Boolean): Option[V] =
    container get key match {
      case null ⇒ None
      case set  ⇒ set.iterator.asScala find f
    }

  /**
   * Returns an Iterator of V containing the values for the supplied key, or an empty iterator if the key doesn't exist
   */
  def valueIterator(key: K): scala.Iterator[V] = {
    container.get(key) match {
      case null ⇒ Iterator.empty
      case some ⇒ some.iterator.asScala
    }
  }

  /**
   * Applies the supplied function to all keys and their values
   */
  def foreach(fun: (K, V) ⇒ Unit): Unit =
    container.entrySet.iterator.asScala foreach { e ⇒ e.getValue.iterator.asScala.foreach(fun(e.getKey, _)) }

  /**
   * Returns the union of all value sets.
   */
  def values: Set[V] = {
    val builder = Set.newBuilder[V]
    for {
      values ← container.values.iterator.asScala
      v ← values.iterator.asScala
    } builder += v
    builder.result()
  }

  /**
   * Returns the key set.
   */
  def keys: Iterable[K] = container.keySet.asScala

  /**
   * Disassociates the value of type V from the key of type K
   * @return true if the value was disassociated from the key and false if it wasn't previously associated with the key
   */
  def remove(key: K, value: V): Boolean = {
    val set = container get key

    if (set ne null) {
      set.synchronized {
        if (set.remove(value)) { //If we can remove the value
          if (set.isEmpty) //and the set becomes empty
            container.remove(key, emptySet) //We try to remove the key if it's mapped to an empty set

          true //Remove succeeded
        } else false //Remove failed
      }
    } else false //Remove failed
  }

  /**
   * Disassociates all the values for the specified key
   * @return None if the key wasn't associated at all, or Some(scala.Iterable[V]) if it was associated
   */
  def remove(key: K): Option[Iterable[V]] = {
    val set = container get key

    if (set ne null) {
      set.synchronized {
        container.remove(key, set)
        val ret = collectionAsScalaIterableConverter(set.clone()).asScala // Make copy since we need to clear the original
        set.clear() // Clear the original set to signal to any pending writers that there was a conflict
        Some(ret)
      }
    } else None //Remove failed
  }

  /**
   * Removes the specified value from all keys
   */
  def removeValue(value: V): Unit = {
    val i = container.entrySet().iterator()
    while (i.hasNext) {
      val e = i.next()
      val set = e.getValue()

      if (set ne null) {
        set.synchronized {
          if (set.remove(value)) { //If we can remove the value
            if (set.isEmpty) //and the set becomes empty
              container.remove(e.getKey, emptySet) //We try to remove the key if it's mapped to an empty set
          }
        }
      }
    }
  }

  /**
   * @return true if the underlying containers is empty, may report false negatives when the last remove is underway
   */
  def isEmpty: Boolean = container.isEmpty

  /**
   *  Removes all keys and all values
   */
  def clear(): Unit = {
    val i = container.entrySet().iterator()
    while (i.hasNext) {
      val e = i.next()
      val set = e.getValue()
      if (set ne null) { set.synchronized { set.clear(); container.remove(e.getKey, emptySet) } }
    }
  }
}

/**
 * An implementation of a ConcurrentMultiMap
 * Adds/remove is serialized over the specified key
 * Reads are fully concurrent <-- el-cheapo
 */
class ConcurrentMultiMap[K, V](mapSize: Int, valueComparator: Comparator[V]) extends Index[K, V](mapSize, valueComparator)

Other Akka source code examples

Here is a short list of links related to this Akka Index.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.