alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (BoundedBlockingQueue.scala)

This example Akka source code file (BoundedBlockingQueue.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

akka, anyref, boolean, collection, e, illegalargumentexception, int, iterator, long, nullpointerexception, tries, util

The BoundedBlockingQueue.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */

package akka.util

import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.{ TimeUnit, BlockingQueue }
import java.util.{ AbstractQueue, Queue, Collection, Iterator }
import annotation.tailrec

/**
 * BoundedBlockingQueue wraps any Queue and turns the result into a BlockingQueue with a limited capacity.
 * @param maxCapacity - the maximum capacity of this Queue, needs to be > 0
 * @param backing - the backing Queue
 * @tparam E - The type of the contents of this Queue
 */
class BoundedBlockingQueue[E <: AnyRef](
  val maxCapacity: Int, private val backing: Queue[E]) extends AbstractQueue[E] with BlockingQueue[E] {

  backing match {
    case null ⇒ throw new IllegalArgumentException("Backing Queue may not be null")
    case b: BlockingQueue[_] ⇒
      require(maxCapacity > 0)
      require(b.size() == 0)
      require(b.remainingCapacity >= maxCapacity)
    case b: Queue[_] ⇒
      require(b.size() == 0)
      require(maxCapacity > 0)
  }

  protected val lock = new ReentrantLock(false)

  private val notEmpty = lock.newCondition()
  private val notFull = lock.newCondition()

  def put(e: E) { //Blocks until not full
    if (e eq null) throw new NullPointerException
    lock.lockInterruptibly()
    try {
      @tailrec def putElement() {
        if (backing.size() < maxCapacity) {
          require(backing.offer(e))
          notEmpty.signal()
        } else {
          notFull.await()
          putElement()
        }
      }
      putElement()
    } finally lock.unlock()
  }

  def take(): E = { //Blocks until not empty
    lock.lockInterruptibly()
    try {
      @tailrec def takeElement(): E = {
        if (!backing.isEmpty()) {
          val e = backing.poll()
          require(e ne null)
          notFull.signal()
          e
        } else {
          notEmpty.await()
          takeElement()
        }
      }
      takeElement()
    } finally lock.unlock()
  }

  def offer(e: E): Boolean = { //Tries to do it immediately, if fail return false
    if (e eq null) throw new NullPointerException
    lock.lock()
    try {
      if (backing.size() == maxCapacity) false
      else {
        require(backing.offer(e)) //Should never fail
        notEmpty.signal()
        true
      }
    } finally lock.unlock()
  }

  def offer(e: E, timeout: Long, unit: TimeUnit): Boolean = { //Tries to do it within the timeout, return false if fail
    if (e eq null) throw new NullPointerException
    lock.lockInterruptibly()
    try {
      @tailrec def offerElement(remainingNanos: Long): Boolean = {
        if (backing.size() < maxCapacity) {
          require(backing.offer(e)) //Should never fail
          notEmpty.signal()
          true
        } else if (remainingNanos <= 0) false
        else offerElement(notFull.awaitNanos(remainingNanos))
      }
      offerElement(unit.toNanos(timeout))
    } finally lock.unlock()
  }

  def poll(timeout: Long, unit: TimeUnit): E = { //Tries to do it within the timeout, returns null if fail
    lock.lockInterruptibly()
    try {
      @tailrec def pollElement(remainingNanos: Long): E = {
        backing.poll() match {
          case null if remainingNanos <= 0 ⇒ null.asInstanceOf[E]
          case null                        ⇒ pollElement(notEmpty.awaitNanos(remainingNanos))
          case e ⇒ {
            notFull.signal()
            e
          }
        }
      }
      pollElement(unit.toNanos(timeout))
    } finally lock.unlock()
  }

  def poll(): E = { //Tries to remove the head of the queue immediately, if fail, return null
    lock.lock()
    try {
      backing.poll() match {
        case null ⇒ null.asInstanceOf[E]
        case e ⇒
          notFull.signal()
          e
      }
    } finally lock.unlock()
  }

  override def remove(e: AnyRef): Boolean = { //Tries to do it immediately, if fail, return false
    if (e eq null) throw new NullPointerException
    lock.lock()
    try {
      if (backing remove e) {
        notFull.signal()
        true
      } else false
    } finally lock.unlock()
  }

  override def contains(e: AnyRef): Boolean = {
    if (e eq null) throw new NullPointerException
    lock.lock()
    try backing.contains(e) finally lock.unlock()
  }

  override def clear() {
    lock.lock()
    try {
      backing.clear()
      notFull.signalAll()
    } finally lock.unlock()
  }

  def remainingCapacity(): Int = {
    lock.lock()
    try {
      maxCapacity - backing.size()
    } finally lock.unlock()
  }

  def size(): Int = {
    lock.lock()
    try backing.size() finally lock.unlock()
  }

  def peek(): E = {
    lock.lock()
    try backing.peek() finally lock.unlock()
  }

  def drainTo(c: Collection[_ >: E]): Int = drainTo(c, Int.MaxValue)

  def drainTo(c: Collection[_ >: E], maxElements: Int): Int = {
    if (c eq null) throw new NullPointerException
    if (c eq this) throw new IllegalArgumentException
    if (c eq backing) throw new IllegalArgumentException
    if (maxElements <= 0) 0
    else {
      lock.lock()
      try {
        @tailrec def drainOne(n: Int = 0): Int = {
          if (n < maxElements) {
            backing.poll() match {
              case null ⇒ n
              case e    ⇒ c add e; drainOne(n + 1)
            }
          } else n
        }
        val n = drainOne()
        if (n > 0) notFull.signalAll()
        n
      } finally lock.unlock()
    }
  }

  override def containsAll(c: Collection[_]): Boolean = {
    lock.lock()
    try backing.containsAll(c) finally lock.unlock()
  }

  override def removeAll(c: Collection[_]): Boolean = {
    lock.lock()
    try {
      if (backing.removeAll(c)) {
        val sz = backing.size()
        if (sz < maxCapacity) notFull.signal()
        if (sz > 0) notEmpty.signal()
        true
      } else false
    } finally lock.unlock()
  }

  override def retainAll(c: Collection[_]): Boolean = {
    lock.lock()
    try {
      if (backing.retainAll(c)) {
        val sz = backing.size()
        if (sz < maxCapacity) notFull.signal()
        if (sz > 0) notEmpty.signal()
        true
      } else false
    } finally lock.unlock()
  }

  def iterator(): Iterator[E] = {
    lock.lock
    try {
      val elements = backing.toArray
      new Iterator[E] {
        var at = 0
        var last = -1

        def hasNext(): Boolean = at < elements.length

        def next(): E = {
          if (at >= elements.length) throw new NoSuchElementException
          last = at
          at += 1
          elements(last).asInstanceOf[E]
        }

        def remove() {
          if (last < 0) throw new IllegalStateException
          val target = elements(last)
          last = -1 //To avoid 2 subsequent removes without a next in between
          lock.lock()
          try {
            @tailrec def removeTarget(i: Iterator[E] = backing.iterator()): Unit =
              if (i.hasNext) {
                if (i.next eq target) {
                  i.remove()
                  notFull.signal()
                } else removeTarget(i)
              }

            removeTarget()
          } finally lock.unlock()
        }
      }
    } finally lock.unlock()
  }

  override def toArray(): Array[AnyRef] = {
    lock.lock()
    try backing.toArray finally lock.unlock()
  }

  override def isEmpty(): Boolean = {
    lock.lock()
    try backing.isEmpty() finally lock.unlock()
  }

  override def toArray[X](a: Array[X with AnyRef]) = {
    lock.lock()
    try backing.toArray[X](a) finally lock.unlock()
  }
}

Other Akka source code examples

Here is a short list of links related to this Akka BoundedBlockingQueue.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.