alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (ByteIterator.scala)

This example Akka source code file (ByteIterator.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

akka, array, boolean, byte, bytearrayiterator, byteiterator, byteorder, collection, generics, int, long, multibytearrayiterator, mutable, reflection, unit, util

The ByteIterator.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */

package akka.util

import java.nio.{ ByteBuffer, ByteOrder }

import scala.collection.{ LinearSeq, IndexedSeqOptimized }
import scala.collection.mutable.{ Builder, WrappedArray }
import scala.collection.immutable.{ IndexedSeq, VectorBuilder }
import scala.collection.generic.CanBuildFrom
import scala.collection.mutable.{ ListBuffer }
import scala.annotation.tailrec
import scala.reflect.ClassTag

object ByteIterator {
  object ByteArrayIterator {
    private val emptyArray: Array[Byte] = Array.ofDim[Byte](0)

    protected[akka] def apply(array: Array[Byte]): ByteArrayIterator =
      new ByteArrayIterator(array, 0, array.length)

    protected[akka] def apply(array: Array[Byte], from: Int, until: Int): ByteArrayIterator =
      new ByteArrayIterator(array, from, until)

    val empty: ByteArrayIterator = apply(emptyArray)
  }

  class ByteArrayIterator private (private var array: Array[Byte], private var from: Int, private var until: Int) extends ByteIterator {
    iterator ⇒

    @inline final def len: Int = until - from

    @inline final def hasNext: Boolean = from < until

    @inline final def head: Byte = array(from)

    final def next(): Byte = {
      if (!hasNext) Iterator.empty.next
      else { val i = from; from = from + 1; array(i) }
    }

    def clear(): Unit = { this.array = ByteArrayIterator.emptyArray; from = 0; until = from }

    final override def length: Int = { val l = len; clear(); l }

    final override def ++(that: TraversableOnce[Byte]): ByteIterator = that match {
      case that: ByteIterator ⇒
        if (that.isEmpty) this
        else if (this.isEmpty) that
        else that match {
          case that: ByteArrayIterator ⇒
            if ((this.array eq that.array) && (this.until == that.from)) {
              this.until = that.until
              that.clear()
              this
            } else {
              val result = MultiByteArrayIterator(List(this, that))
              this.clear()
              result
            }
          case that: MultiByteArrayIterator ⇒ this ++: that
        }
      case _ ⇒ super.++(that)
    }

    final override def clone: ByteArrayIterator = new ByteArrayIterator(array, from, until)

    final override def take(n: Int): this.type = {
      if (n < len) until = { if (n > 0) (from + n) else from }
      this
    }

    final override def drop(n: Int): this.type = {
      if (n > 0) from = { if (n < len) (from + n) else until }
      this
    }

    final override def takeWhile(p: Byte ⇒ Boolean): this.type = {
      val prev = from
      dropWhile(p)
      until = from; from = prev
      this
    }

    final override def dropWhile(p: Byte ⇒ Boolean): this.type = {
      var stop = false
      while (!stop && hasNext) {
        if (p(array(from))) { from = from + 1 } else { stop = true }
      }
      this
    }

    final override def copyToArray[B >: Byte](xs: Array[B], start: Int, len: Int): Unit = {
      val n = 0 max ((xs.length - start) min this.len min len)
      Array.copy(this.array, from, xs, start, n)
      this.drop(n)
    }

    final override def toByteString: ByteString = {
      val result =
        if ((from == 0) && (until == array.length)) ByteString.ByteString1C(array)
        else ByteString.ByteString1(array, from, len)
      clear()
      result
    }

    def getBytes(xs: Array[Byte], offset: Int, n: Int): this.type = {
      if (n <= this.len) {
        Array.copy(this.array, this.from, xs, offset, n)
        this.drop(n)
      } else Iterator.empty.next
    }

    private def wrappedByteBuffer: ByteBuffer = ByteBuffer.wrap(array, from, len).asReadOnlyBuffer

    def getShorts(xs: Array[Short], offset: Int, n: Int)(implicit byteOrder: ByteOrder): this.type =
      { wrappedByteBuffer.order(byteOrder).asShortBuffer.get(xs, offset, n); drop(2 * n) }

    def getInts(xs: Array[Int], offset: Int, n: Int)(implicit byteOrder: ByteOrder): this.type =
      { wrappedByteBuffer.order(byteOrder).asIntBuffer.get(xs, offset, n); drop(4 * n) }

    def getLongs(xs: Array[Long], offset: Int, n: Int)(implicit byteOrder: ByteOrder): this.type =
      { wrappedByteBuffer.order(byteOrder).asLongBuffer.get(xs, offset, n); drop(8 * n) }

    def getFloats(xs: Array[Float], offset: Int, n: Int)(implicit byteOrder: ByteOrder): this.type =
      { wrappedByteBuffer.order(byteOrder).asFloatBuffer.get(xs, offset, n); drop(4 * n) }

    def getDoubles(xs: Array[Double], offset: Int, n: Int)(implicit byteOrder: ByteOrder): this.type =
      { wrappedByteBuffer.order(byteOrder).asDoubleBuffer.get(xs, offset, n); drop(8 * n) }

    def copyToBuffer(buffer: ByteBuffer): Int = {
      val copyLength = math.min(buffer.remaining, len)
      if (copyLength > 0) {
        buffer.put(array, from, copyLength)
        drop(copyLength)
      }
      copyLength
    }

    def asInputStream: java.io.InputStream = new java.io.InputStream {
      override def available: Int = iterator.len

      def read: Int = if (hasNext) (next().toInt & 0xff) else -1

      override def read(b: Array[Byte], off: Int, len: Int): Int = {
        if ((off < 0) || (len < 0) || (off + len > b.length)) throw new IndexOutOfBoundsException
        if (len == 0) 0
        else if (!isEmpty) {
          val nRead = math.min(available, len)
          copyToArray(b, off, nRead)
          nRead
        } else -1
      }

      override def skip(n: Long): Long = {
        val nSkip = math.min(iterator.len, n.toInt)
        iterator.drop(nSkip)
        nSkip
      }
    }
  }

  object MultiByteArrayIterator {
    protected val clearedList: List[ByteArrayIterator] = List(ByteArrayIterator.empty)

    val empty: MultiByteArrayIterator = new MultiByteArrayIterator(Nil)

    protected[akka] def apply(iterators: LinearSeq[ByteArrayIterator]): MultiByteArrayIterator =
      new MultiByteArrayIterator(iterators)
  }

  class MultiByteArrayIterator private (private var iterators: LinearSeq[ByteArrayIterator]) extends ByteIterator {
    // After normalization:
    // * iterators.isEmpty == false
    // * (!iterator.head.isEmpty || iterators.tail.isEmpty) == true
    private def normalize(): this.type = {
      @tailrec def norm(xs: LinearSeq[ByteArrayIterator]): LinearSeq[ByteArrayIterator] = {
        if (xs.isEmpty) MultiByteArrayIterator.clearedList
        else if (xs.head.isEmpty) norm(xs.tail)
        else xs
      }
      iterators = norm(iterators)
      this
    }
    normalize()

    @inline private def current: ByteArrayIterator = iterators.head
    @inline private def dropCurrent(): Unit = { iterators = iterators.tail }
    @inline def clear(): Unit = { iterators = MultiByteArrayIterator.empty.iterators }

    @inline final def hasNext: Boolean = current.hasNext

    @inline final def head: Byte = current.head

    final def next(): Byte = {
      val result = current.next()
      normalize()
      result
    }

    final override def len: Int = iterators.foldLeft(0) { _ + _.len }

    final override def length: Int = {
      var result = len
      clear()
      result
    }

    private[akka] def ++:(that: ByteArrayIterator): this.type = {
      iterators = that +: iterators
      this
    }

    final override def ++(that: TraversableOnce[Byte]): ByteIterator = that match {
      case that: ByteIterator ⇒
        if (that.isEmpty) this
        else if (this.isEmpty) that
        else {
          that match {
            case that: ByteArrayIterator ⇒
              iterators = this.iterators :+ that
              that.clear()
              this
            case that: MultiByteArrayIterator ⇒
              iterators = this.iterators ++ that.iterators
              that.clear()
              this
          }
        }
      case _ ⇒ super.++(that)
    }

    final override def clone: MultiByteArrayIterator = {
      val clonedIterators: List[ByteArrayIterator] = iterators.map(_.clone)(collection.breakOut)
      new MultiByteArrayIterator(clonedIterators)
    }

    final override def take(n: Int): this.type = {
      var rest = n
      val builder = new ListBuffer[ByteArrayIterator]
      while ((rest > 0) && !iterators.isEmpty) {
        current.take(rest)
        if (current.hasNext) {
          rest -= current.len
          builder += current
        }
        iterators = iterators.tail
      }
      iterators = builder.result
      normalize()
    }

    @tailrec final override def drop(n: Int): this.type =
      if ((n > 0) && !isEmpty) {
        val nCurrent = math.min(n, current.len)
        current.drop(n)
        val rest = n - nCurrent
        assert(current.isEmpty || (rest == 0))
        normalize()
        drop(rest)
      } else this

    final override def takeWhile(p: Byte ⇒ Boolean): this.type = {
      var stop = false
      var builder = new ListBuffer[ByteArrayIterator]
      while (!stop && !iterators.isEmpty) {
        val lastLen = current.len
        current.takeWhile(p)
        if (current.hasNext) builder += current
        if (current.len < lastLen) stop = true
        dropCurrent()
      }
      iterators = builder.result
      normalize()
    }

    @tailrec final override def dropWhile(p: Byte ⇒ Boolean): this.type =
      if (!isEmpty) {
        current.dropWhile(p)
        val dropMore = current.isEmpty
        normalize()
        if (dropMore) dropWhile(p) else this
      } else this

    final override def copyToArray[B >: Byte](xs: Array[B], start: Int, len: Int): Unit = {
      var pos = start
      var rest = len
      while ((rest > 0) && !iterators.isEmpty) {
        val n = 0 max ((xs.length - pos) min current.len min rest)
        current.copyToArray(xs, pos, n)
        pos += n
        rest -= n
        dropCurrent()
      }
      normalize()
    }

    override def foreach[@specialized U](f: Byte ⇒ U): Unit = {
      iterators foreach { _ foreach f }
      clear()
    }

    final override def toByteString: ByteString = {
      if (iterators.tail.isEmpty) iterators.head.toByteString
      else {
        val result = iterators.foldLeft(ByteString.empty) { _ ++ _.toByteString }
        clear()
        result
      }
    }

    @tailrec protected final def getToArray[A](xs: Array[A], offset: Int, n: Int, elemSize: Int)(getSingle: ⇒ A)(getMult: (Array[A], Int, Int) ⇒ Unit): this.type =
      if (n <= 0) this else {
        if (isEmpty) Iterator.empty.next
        val nDone = if (current.len >= elemSize) {
          val nCurrent = math.min(n, current.len / elemSize)
          getMult(xs, offset, nCurrent)
          nCurrent
        } else {
          xs(offset) = getSingle
          1
        }
        normalize()
        getToArray(xs, offset + nDone, n - nDone, elemSize)(getSingle)(getMult)
      }

    def getBytes(xs: Array[Byte], offset: Int, n: Int): this.type =
      getToArray(xs, offset, n, 1) { getByte } { current.getBytes(_, _, _) }

    def getShorts(xs: Array[Short], offset: Int, n: Int)(implicit byteOrder: ByteOrder): this.type =
      getToArray(xs, offset, n, 2) { getShort(byteOrder) } { current.getShorts(_, _, _)(byteOrder) }

    def getInts(xs: Array[Int], offset: Int, n: Int)(implicit byteOrder: ByteOrder): this.type =
      getToArray(xs, offset, n, 4) { getInt(byteOrder) } { current.getInts(_, _, _)(byteOrder) }

    def getLongs(xs: Array[Long], offset: Int, n: Int)(implicit byteOrder: ByteOrder): this.type =
      getToArray(xs, offset, n, 8) { getLong(byteOrder) } { current.getLongs(_, _, _)(byteOrder) }

    def getFloats(xs: Array[Float], offset: Int, n: Int)(implicit byteOrder: ByteOrder): this.type =
      getToArray(xs, offset, n, 8) { getFloat(byteOrder) } { current.getFloats(_, _, _)(byteOrder) }

    def getDoubles(xs: Array[Double], offset: Int, n: Int)(implicit byteOrder: ByteOrder): this.type =
      getToArray(xs, offset, n, 8) { getDouble(byteOrder) } { current.getDoubles(_, _, _)(byteOrder) }

    def copyToBuffer(buffer: ByteBuffer): Int = {
      val n = iterators.foldLeft(0) { _ + _.copyToBuffer(buffer) }
      normalize()
      n
    }

    def asInputStream: java.io.InputStream = new java.io.InputStream {
      override def available: Int = current.len

      def read: Int = if (hasNext) (next().toInt & 0xff) else -1

      override def read(b: Array[Byte], off: Int, len: Int): Int = {
        val nRead = current.asInputStream.read(b, off, len)
        normalize()
        nRead
      }

      override def skip(n: Long): Long = {
        @tailrec def skipImpl(n: Long, skipped: Long): Long =
          if (n > 0) {
            if (!isEmpty) {
              val m = current.asInputStream.skip(n)
              normalize()
              val newN = n - m
              val newSkipped = skipped + m
              if (newN > 0) skipImpl(newN, newSkipped)
              else newSkipped
            } else 0
          } else 0

        skipImpl(n, 0)
      }
    }
  }
}

/**
 * An iterator over a ByteString.
 */

abstract class ByteIterator extends BufferedIterator[Byte] {
  def len: Int

  def head: Byte

  def next(): Byte

  protected def clear(): Unit

  def ++(that: TraversableOnce[Byte]): ByteIterator = if (that.isEmpty) this else ByteIterator.ByteArrayIterator(that.toArray)

  // *must* be overridden by derived classes. This construction is necessary
  // to specialize the return type, as the method is already implemented in
  // the parent class.
  override def clone: ByteIterator = throw new UnsupportedOperationException("Method clone is not implemented in ByteIterator")

  override def duplicate: (ByteIterator, ByteIterator) = (this, clone)

  // *must* be overridden by derived classes. This construction is necessary
  // to specialize the return type, as the method is already implemented in
  // the parent class.
  override def take(n: Int): this.type = throw new UnsupportedOperationException("Method take is not implemented in ByteIterator")

  // *must* be overridden by derived classes. This construction is necessary
  // to specialize the return type, as the method is already implemented in
  // the parent class.
  override def drop(n: Int): this.type = throw new UnsupportedOperationException("Method drop is not implemented in ByteIterator")

  override def slice(from: Int, until: Int): this.type = {
    if (from > 0) drop(from).take(until - from)
    else take(until)
  }

  // *must* be overridden by derived classes. This construction is necessary
  // to specialize the return type, as the method is already implemented in
  // the parent class.
  override def takeWhile(p: Byte ⇒ Boolean): this.type = throw new UnsupportedOperationException("Method takeWhile is not implemented in ByteIterator")

  // *must* be overridden by derived classes. This construction is necessary
  // to specialize the return type, as the method is already implemented in
  // the parent class.
  override def dropWhile(p: Byte ⇒ Boolean): this.type = throw new UnsupportedOperationException("Method dropWhile is not implemented in ByteIterator")

  override def span(p: Byte ⇒ Boolean): (ByteIterator, ByteIterator) = {
    val that = clone
    this.takeWhile(p)
    that.drop(this.len)
    (this, that)
  }

  override def indexWhere(p: Byte ⇒ Boolean): Int = {
    var index = 0
    var found = false
    while (!found && hasNext) if (p(next())) { found = true } else { index += 1 }
    if (found) index else -1
  }

  def indexOf(elem: Byte): Int = indexWhere { _ == elem }

  override def indexOf[B >: Byte](elem: B): Int = indexWhere { _ == elem }

  def toByteString: ByteString

  override def toSeq: ByteString = toByteString

  override def foreach[@specialized U](f: Byte ⇒ U): Unit =
    while (hasNext) f(next())

  override def foldLeft[@specialized B](z: B)(op: (B, Byte) ⇒ B): B = {
    var acc = z
    foreach { byte ⇒ acc = op(acc, byte) }
    acc
  }

  override def toArray[B >: Byte](implicit arg0: ClassTag[B]): Array[B] = {
    val target = Array.ofDim[B](len)
    copyToArray(target)
    target
  }

  /**
   * Get a single Byte from this iterator. Identical to next().
   */
  def getByte: Byte = next()

  /**
   * Get a single Short from this iterator.
   */
  def getShort(implicit byteOrder: ByteOrder): Short = {
    if (byteOrder == ByteOrder.BIG_ENDIAN)
      ((next() & 0xff) << 8 | (next() & 0xff) << 0).toShort
    else if (byteOrder == ByteOrder.LITTLE_ENDIAN)
      ((next() & 0xff) << 0 | (next() & 0xff) << 8).toShort
    else throw new IllegalArgumentException("Unknown byte order " + byteOrder)
  }

  /**
   * Get a single Int from this iterator.
   */
  def getInt(implicit byteOrder: ByteOrder): Int = {
    if (byteOrder == ByteOrder.BIG_ENDIAN)
      ((next() & 0xff) << 24
        | (next() & 0xff) << 16
        | (next() & 0xff) << 8
        | (next() & 0xff) << 0)
    else if (byteOrder == ByteOrder.LITTLE_ENDIAN)
      ((next() & 0xff) << 0
        | (next() & 0xff) << 8
        | (next() & 0xff) << 16
        | (next() & 0xff) << 24)
    else throw new IllegalArgumentException("Unknown byte order " + byteOrder)
  }

  /**
   * Get a single Long from this iterator.
   */
  def getLong(implicit byteOrder: ByteOrder): Long = {
    if (byteOrder == ByteOrder.BIG_ENDIAN)
      ((next().toLong & 0xff) << 56
        | (next().toLong & 0xff) << 48
        | (next().toLong & 0xff) << 40
        | (next().toLong & 0xff) << 32
        | (next().toLong & 0xff) << 24
        | (next().toLong & 0xff) << 16
        | (next().toLong & 0xff) << 8
        | (next().toLong & 0xff) << 0)
    else if (byteOrder == ByteOrder.LITTLE_ENDIAN)
      ((next().toLong & 0xff) << 0
        | (next().toLong & 0xff) << 8
        | (next().toLong & 0xff) << 16
        | (next().toLong & 0xff) << 24
        | (next().toLong & 0xff) << 32
        | (next().toLong & 0xff) << 40
        | (next().toLong & 0xff) << 48
        | (next().toLong & 0xff) << 56)
    else throw new IllegalArgumentException("Unknown byte order " + byteOrder)
  }

  /**
   * Get a Long from this iterator where only the least significant `n`
   * bytes were encoded.
   */
  def getLongPart(n: Int)(implicit byteOrder: ByteOrder): Long = {
    if (byteOrder == ByteOrder.BIG_ENDIAN) {
      var x = 0L
      (1 to n) foreach (_ ⇒ x = (x << 8) | (next() & 0xff))
      x
    } else if (byteOrder == ByteOrder.LITTLE_ENDIAN) {
      var x = 0L
      (0 until n) foreach (i ⇒ x |= (next() & 0xff) << 8 * i)
      x
    } else throw new IllegalArgumentException("Unknown byte order " + byteOrder)
  }

  def getFloat(implicit byteOrder: ByteOrder): Float =
    java.lang.Float.intBitsToFloat(getInt(byteOrder))

  def getDouble(implicit byteOrder: ByteOrder): Double =
    java.lang.Double.longBitsToDouble(getLong(byteOrder))

  /**
   * Get a specific number of Bytes from this iterator. In contrast to
   * copyToArray, this method will fail if this.len < xs.length.
   */
  def getBytes(xs: Array[Byte]): this.type = getBytes(xs, 0, xs.length)

  /**
   * Get a specific number of Bytes from this iterator. In contrast to
   * copyToArray, this method will fail if length < n or if (xs.length - offset) < n.
   */
  def getBytes(xs: Array[Byte], offset: Int, n: Int): this.type

  /**
   * Get a number of Shorts from this iterator.
   */
  def getShorts(xs: Array[Short])(implicit byteOrder: ByteOrder): this.type =
    getShorts(xs, 0, xs.length)(byteOrder)

  /**
   * Get a number of Shorts from this iterator.
   */
  def getShorts(xs: Array[Short], offset: Int, n: Int)(implicit byteOrder: ByteOrder): this.type

  /**
   * Get a number of Ints from this iterator.
   */
  def getInts(xs: Array[Int])(implicit byteOrder: ByteOrder): this.type =
    getInts(xs, 0, xs.length)(byteOrder)

  /**
   * Get a number of Ints from this iterator.
   */
  def getInts(xs: Array[Int], offset: Int, n: Int)(implicit byteOrder: ByteOrder): this.type

  /**
   * Get a number of Longs from this iterator.
   */
  def getLongs(xs: Array[Long])(implicit byteOrder: ByteOrder): this.type =
    getLongs(xs, 0, xs.length)(byteOrder)

  /**
   * Get a number of Longs from this iterator.
   */
  def getLongs(xs: Array[Long], offset: Int, n: Int)(implicit byteOrder: ByteOrder): this.type

  /**
   * Get a number of Floats from this iterator.
   */
  def getFloats(xs: Array[Float])(implicit byteOrder: ByteOrder): this.type =
    getFloats(xs, 0, xs.length)(byteOrder)

  /**
   * Get a number of Floats from this iterator.
   */
  def getFloats(xs: Array[Float], offset: Int, n: Int)(implicit byteOrder: ByteOrder): this.type

  /**
   * Get a number of Doubles from this iterator.
   */
  def getDoubles(xs: Array[Double])(implicit byteOrder: ByteOrder): this.type =
    getDoubles(xs, 0, xs.length)(byteOrder)

  /**
   * Get a number of Doubles from this iterator.
   */
  def getDoubles(xs: Array[Double], offset: Int, n: Int)(implicit byteOrder: ByteOrder): this.type

  /**
   * Copy as many bytes as possible to a ByteBuffer, starting from it's
   * current position. This method will not overflow the buffer.
   *
   * @param buffer a ByteBuffer to copy bytes to
   * @return the number of bytes actually copied
   */
  def copyToBuffer(buffer: ByteBuffer): Int

  /**
   * Directly wraps this ByteIterator in an InputStream without copying.
   * Read and skip operations on the stream will advance the iterator
   * accordingly.
   */
  def asInputStream: java.io.InputStream
}

Other Akka source code examples

Here is a short list of links related to this Akka ByteIterator.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.