Play Framework/Scala example source code file (EnumeratorsSpec.scala)

This example Play Framework source code file (EnumeratorsSpec.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Play Framework (and Scala) source code examples by using tags.

All credit for the original source code belongs to Play Framework; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Play Framework tags/keywords

The EnumeratorsSpec.scala Play Framework example source code

 * Copyright (C) 2009-2013 Typesafe Inc. <>
package play.api.libs.iteratee

import org.specs2.mutable._
import{ ByteArrayInputStream, File, FileOutputStream, OutputStream }
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import play.api.libs.iteratee.Execution.Implicits.{ defaultExecutionContext => dec }
import scala.concurrent.{Promise, Future, Await}
import scala.concurrent.duration.Duration

object EnumeratorsSpec extends Specification
  with IterateeSpecification with ExecutionSpecification {

  "Enumerator's interleave" should {

    "mix it with another enumerator into one" in {
      mustExecute(8) { foldEC =>
        val e1 = Enumerator(List(1), List(3), List(5), List(7))
        val e2 = Enumerator(List(2), List(4), List(6), List(8))
        val e = e1 interleave e2
        val kk = e |>>> Iteratee.fold(List.empty[Int])((r, e: List[Int]) => r ++ e)(foldEC)
        val result = Await.result(kk, Duration.Inf)
        println("interleaved enumerators result is: " + result)
        result.diff(Seq(1, 2, 3, 4, 5, 6, 7, 8)) must equalTo(Seq())

    "yield when both enumerators EOF" in {
      mustExecute(8) { foldEC =>
        val e1 = Enumerator(List(1), List(3), List(5), List(7)) >>> Enumerator.enumInput(Input.EOF)
        val e2 = Enumerator(List(2), List(4), List(6), List(8)) >>> Enumerator.enumInput(Input.EOF)
        val e = e1 interleave e2
        val kk = e |>>> Iteratee.fold(List.empty[Int])((r, e: List[Int]) => r ++ e)(foldEC)
        val result = Await.result(kk, Duration.Inf)
        result.diff(Seq(1, 2, 3, 4, 5, 6, 7, 8)) must equalTo(Seq())

    "yield when iteratee is done!" in {
      mustExecute(7) { foldEC =>
        val e1 = Enumerator(List(1), List(3), List(5), List(7))
        val e2 = Enumerator(List(2), List(4), List(6), List(8))
        val e = e1 interleave e2
        val kk = e |>>> Enumeratee.take(7) &>> Iteratee.fold(List.empty[Int])((r, e: List[Int]) => r ++ e)(foldEC)
        val result = Await.result(kk, Duration.Inf)
        result.length must equalTo(7)

    "not necessarily go alternatively between two enumerators" in {
      mustExecute(1, 2) { (onDoneEC, unfoldEC) =>
        val firstDone = Promise[Unit]
        val e1 = Enumerator(1, 2, 3, 4).onDoneEnumerating(firstDone.success(Unit))(onDoneEC)
        val e2 = Enumerator.unfoldM[Boolean, Int](true) { first => if (first) => Some((false, 5))) else Future.successful(None) }(unfoldEC)
        val result = Await.result((e1 interleave e2) |>>> Iteratee.getChunks[Int], Duration.Inf)
        result must_== Seq(1, 2, 3, 4, 5)


  "Enumerator.enumerate " should {
    "generate an Enumerator from a singleton Iterator" in {
      mustExecute(1) { foldEC =>
        val iterator = scala.collection.Iterator.single[Int](3)
        val futureOfResult = Enumerator.enumerate(iterator) |>>>
          Enumeratee.take(1) &>>
          Iteratee.fold(List.empty[Int])((r, e: Int) => e :: r)(foldEC)
        val result = Await.result(futureOfResult, Duration.Inf)
        result(0) must equalTo(3)
        result.length must equalTo(1)

    "take as much element as in the iterator in the right order" in {
      mustExecute(50) { foldEC =>
        val iterator = scala.collection.Iterator.range(0, 50)
        val futureOfResult = Enumerator.enumerate(iterator) |>>>
          Enumeratee.take(100) &>>
          Iteratee.fold(Seq.empty[Int])((r, e: Int) => r :+ e)(foldEC)
        val result = Await.result(futureOfResult, Duration.Inf)
        result.length must equalTo(50)
        result(0) must equalTo(0)
        result(49) must equalTo(49)
    "work with Seq too" in {
      mustExecute(6) { foldEC =>
        val seq = List(1, 2, 3, 7, 42, 666)
        val futureOfResult = Enumerator.enumerate(seq) |>>>
          Enumeratee.take(100) &>>
          Iteratee.fold(Seq.empty[Int])((r, e: Int) => r :+ e)(foldEC)
        val result = Await.result(futureOfResult, Duration.Inf)
        result.length must equalTo(6)
        result(0) must equalTo(1)
        result(4) must equalTo(42)

  /*"Enumerator's PatchPanel" should {

  "allow to patch in different Enumerators" in {
      import play.api.libs.concurrent.Promise
    val pp = Promise[Concurrent.PatchPanel[Int]]()
    val e = Concurrent.patchPanel[Int](p => pp.redeem(p))
    val i1 = Iteratee.fold[Int,Int](0){(s,i) => println(i);s+i}
    val sum = e |>> i1
    val p = pp.future.await.get
    sum.flatMap( must equalTo(10)


  "Enumerator.apply" should {
    "enumerate zero args" in {
    "enumerate 1 arg" in {
    "enumerate more than 1 arg" in {
      mustEnumerateTo(1, 2)(Enumerator(1, 2))
      mustEnumerateTo(1, 2, 3)(Enumerator(1, 2, 3))

  "Enumerator" should {
    "call onDoneEnumerating callback" in {
      mustExecute(1) { onDoneEC =>
        val count = new java.util.concurrent.atomic.AtomicInteger()
        mustEnumerateTo(1, 2, 3)(Enumerator(1, 2, 3).onDoneEnumerating(count.incrementAndGet())(onDoneEC))
        count.get() must equalTo(1)

    "call onDoneEnumerating callback when an error is encountered" in {
      mustExecute(1) { onDoneEC =>
        val count = new java.util.concurrent.atomic.AtomicInteger()
          Enumerator(1, 2, 3).onDoneEnumerating(count.incrementAndGet())(onDoneEC)
        count.get() must_== 1
    "transform input elements with map" in {
      mustExecute(3) { mapEC =>
        mustEnumerateTo(2, 4, 6)(Enumerator(1, 2, 3).map(_ * 2)(mapEC))
    "transform input with map" in {
      mustExecute(3) { mapEC =>
        mustEnumerateTo(2, 4, 6)(Enumerator(1, 2, 3).mapInput( * 2))(mapEC))
    "be transformed to another Enumerator using flatMap" in {
      mustExecute(3, 30) { (flatMapEC, foldEC) =>
        val e = Enumerator(10, 20, 30).flatMap(i => Enumerator((i until i + 10): _*))(flatMapEC)
        val it = Iteratee.fold[Int, Int](0)((sum, x) => sum + x)(foldEC)
        Await.result(e |>>> it, Duration.Inf) must equalTo((10 until 40).sum)


  "Enumerator.generateM" should {
    "generate a stream of values until the expression is None" in {
      mustExecute(12, 11) { (generateEC, foldEC) =>
        val a = (0 to 10).toList
        val it = a.iterator

        val enumerator = Enumerator.generateM(Future(if (it.hasNext) Some( else None))(generateEC)

        Await.result(enumerator |>>> Iteratee.fold[Int, String]("")(_ + _)(foldEC), Duration.Inf) must equalTo("012345678910")

    "Can be composed with another enumerator (doesn't send EOF)" in {
      mustExecute(12, 12) { (generateEC, foldEC) =>
        val a = (0 to 10).toList
        val it = a.iterator

        val enumerator = Enumerator.generateM(Future(if (it.hasNext) Some( else None))(generateEC) >>> Enumerator(12)

        Await.result(enumerator |>>> Iteratee.fold[Int, String]("")(_ + _)(foldEC), Duration.Inf) must equalTo("01234567891012")


  "Enumerator.callback1" should {
    "Call onError on iteratee's error state" in {
      val it = Error[String]("foo", Input.Empty)
      val errorCount = new AtomicInteger(0)

      val enum = Enumerator.fromCallback1[String](
        b => Future.successful(None),
        () => (),
        (msg, input) =>

      val result = enum |>>> it

      Await.ready(result, Duration(30, TimeUnit.SECONDS))
      errorCount.get() must equalTo(1)

    "Call onError on future failure" in {
      val it1 = Iteratee.fold1[String, String](Future.successful(""))((_, _) => Future.failed(new RuntimeException()))
      val it2 = Iteratee.fold1[String, String](Future.failed(new RuntimeException()))((_, _) => Future.failed(new RuntimeException()))
      val errorCount = new AtomicInteger(0)

      val enum = Enumerator.fromCallback1[String](
        b => Future.successful(Some("")),
        () => (),
        (msg, input) =>

      val result1 = enum |>>> it1
      val result2 = enum |>>> it2
      Await.ready(, Duration(2, TimeUnit.SECONDS))

      errorCount.get() must equalTo(2)

    "generate a stream of values until the expression is None" in {
      mustExecute(5) { callbackEC =>
        val it = (1 to 3).iterator // FIXME: Probably not thread-safe
        val completeCount = new AtomicInteger(0)
        val completeDone = new CountDownLatch(1)
        val errorCount = new AtomicInteger(0)
        val enumerator = Enumerator.fromCallback1(
            b => Future(if (it.hasNext) Some((b, else None),
            () => {
            (_: String, _: Input[(Boolean, Int)]) => errorCount.incrementAndGet())(callbackEC)
        mustEnumerateTo((true, 1), (false, 2), (false, 3))(enumerator)
        completeDone.await(30, TimeUnit.SECONDS) must beTrue
        completeCount.get() must equalTo(1)
        errorCount.get() must equalTo(0)

  "Enumerator.fromStream" should {
    "read bytes from a stream" in {
      mustExecute(3) { fromStreamEC =>
        val s = "hello"
        val enumerator = Enumerator.fromStream(new ByteArrayInputStream(s.getBytes))(fromStreamEC).map(new String(_))
    "close the stream" in {
      class CloseableByteArrayInputStream(bytes: Array[Byte]) extends ByteArrayInputStream(bytes) {
        @volatile var closed = false

        override def close() = {
          closed = true

      "when done normally" in {
        val stream = new CloseableByteArrayInputStream(Array.empty)
        mustExecute(2) { fromStreamEC =>
          Await.result(Enumerator.fromStream(stream)(fromStreamEC)(Iteratee.ignore), Duration.Inf)
          stream.closed must beTrue
      "when completed abnormally" in {
        val stream = new CloseableByteArrayInputStream("hello".getBytes)
        mustExecute(2) { fromStreamEC =>
          stream.closed must beTrue

  "Enumerator.fromFile" should {
    "read bytes from a file" in {
      mustExecute(3) { fromFileEC =>
        val f = File.createTempFile("EnumeratorSpec", "fromFile")
        try {
          val s = "hello"
          val out = new FileOutputStream(f)
          val enumerator = Enumerator.fromFile(f)(fromFileEC).map(new String(_))
        } finally {

  "Enumerator.unfoldM" should {
    "Can be composed with another enumerator (doesn't send EOF)" in {
      mustExecute(12, 12) { (foldEC, unfoldEC) =>
        val enumerator = Enumerator.unfoldM[Int, Int](0)(s => Future(if (s > 10) None else Some((s + 1, s + 1))))(unfoldEC) >>> Enumerator(12)

        Await.result(enumerator |>>> Iteratee.fold[Int, String]("")(_ + _)(foldEC), Duration.Inf) must equalTo("123456789101112")

  "Enumerator.unfold" should {
    "unfolds a value into input for an enumerator" in {
      mustExecute(5) { unfoldEC =>
        val enumerator = Enumerator.unfold[Int, Int](0)(s => if (s > 3) None else Some((s + 1, s)))(unfoldEC)
        mustEnumerateTo(0, 1, 2, 3)(enumerator)

  "Enumerator.repeat" should {
    "supply input from a by-name arg" in {
      mustExecute(3) { repeatEC =>
        val count = new AtomicInteger(0)
        val fut = Enumerator.repeat(count.incrementAndGet())(repeatEC) |>>> (Enumeratee.take(3) &>> Iteratee.getChunks[Int])
        Await.result(fut, Duration.Inf) must equalTo(List(1, 2, 3))

  "Enumerator.repeatM" should {
    "supply input from a by-name arg" in {
      mustExecute(3) { repeatEC =>
        val count = new AtomicInteger(0)
        val fut = Enumerator.repeatM(Future.successful(count.incrementAndGet()))(repeatEC) |>>> (Enumeratee.take(3) &>> Iteratee.getChunks[Int])
        Await.result(fut, Duration.Inf) must equalTo(List(1, 2, 3))

  "Enumerator.outputStream" should {
    "produce the same value written in the OutputStream" in {
      mustExecute(1, 2) { (outputEC, foldEC) =>
        val a = "FOO"
        val b = "bar"
        val enumerator = Enumerator.outputStream { outputStream =>
        val promise = (enumerator |>>> Iteratee.fold[Array[Byte], Array[Byte]](Array[Byte]())(_ ++ _)(foldEC))
        Await.result(promise, Duration.Inf).map(_.toChar).foldLeft("")(_ + _) must equalTo(a + b)

    "not block" in {
      mustExecute(1) { outputEC =>
        var os: OutputStream = null
        val osReady = new CountDownLatch(1)
        val enumerator = Enumerator.outputStream { o => os = o; osReady.countDown() }(outputEC)
        val promiseIteratee = Promise[Iteratee[Array[Byte], Array[Byte]]]
        val future = enumerator |>>> Iteratee.flatten(promiseIteratee.future)
        osReady.await(30, TimeUnit.SECONDS) must beTrue
        // os should now be set
        os.write(" ".getBytes)
        Await.result(future, Duration("10s")) must_== "hello world".getBytes


