alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Play Framework/Scala example source code file (Enumeratee.scala)

This example Play Framework source code file (Enumeratee.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Play Framework (and Scala) source code examples by using tags.

All credit for the original source code belongs to Play Framework; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Play Framework tags/keywords

a, api, checkdone, concurrent, cont, e, enumeratee, executioncontext, input, iterate, iteratee, k, library, play, play framework, to

The Enumeratee.scala Play Framework example source code

/*
 * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
 */
package play.api.libs.iteratee

import play.api.libs.iteratee.Execution.Implicits.{ defaultExecutionContext => dec }
import play.api.libs.iteratee.internal.{ executeIteratee, executeFuture }
import scala.language.reflectiveCalls
import scala.util.control.NonFatal
import scala.concurrent.{ ExecutionContext, Future }

/**
 * Combines the roles of an Iteratee[From] and a Enumerator[To].  This allows adapting of streams to that modify input
 * produced by an Enumerator, or to be consumed by a Iteratee.
 */
trait Enumeratee[From, To] {
  parent =>

  /**
   * Create a new Iteratee that feeds its input, potentially modifying it along the way, into the inner Iteratee, and
   * produces that Iteratee as its result.
   */
  def applyOn[A](inner: Iteratee[To, A]): Iteratee[From, Iteratee[To, A]]

  /**
   * Alias for `applyOn`
   */
  def apply[A](inner: Iteratee[To, A]): Iteratee[From, Iteratee[To, A]] = applyOn[A](inner)

  /**
   * Transform the given iteratee into an iteratee that accepts the input type that this enumeratee maps.
   */
  def transform[A](inner: Iteratee[To, A]): Iteratee[From, A] = apply(inner).joinI

  /**
   * Alias for `transform`
   */
  def &>>[A](inner: Iteratee[To, A]): Iteratee[From, A] = transform(inner)

  /**
   * Alias for `apply`
   */
  def &>[A](inner: Iteratee[To, A]): Iteratee[From, Iteratee[To, A]] = apply(inner)

  /**
   * Compose this Enumeratee with another Enumeratee
   */
  def compose[To2](other: Enumeratee[To, To2]): Enumeratee[From, To2] = {
    new Enumeratee[From, To2] {
      def applyOn[A](iteratee: Iteratee[To2, A]): Iteratee[From, Iteratee[To2, A]] = {
        parent.applyOn(other.applyOn(iteratee)).joinI
      }
    }
  }

  /**
   * Compose this Enumeratee with another Enumeratee
   */
  def ><>[To2](other: Enumeratee[To, To2]): Enumeratee[From, To2] = compose(other)

  /**
   * Compose this Enumeratee with another Enumeratee, concatenating any input left by both Enumeratees when they
   * are done.
   */
  def composeConcat[X](other: Enumeratee[To, To])(implicit p: To => scala.collection.TraversableLike[X, To], bf: scala.collection.generic.CanBuildFrom[To, X, To]): Enumeratee[From, To] = {
    new Enumeratee[From, To] {
      def applyOn[A](iteratee: Iteratee[To, A]): Iteratee[From, Iteratee[To, A]] = {
        parent.applyOn(other.applyOn(iteratee).joinConcatI)
      }
    }
  }

  /**
   * Alias for `composeConcat`
   */
  def >+>[X](other: Enumeratee[To, To])(implicit p: To => scala.collection.TraversableLike[X, To], bf: scala.collection.generic.CanBuildFrom[To, X, To]): Enumeratee[From, To] = composeConcat[X](other)

}

/**
 * @define paramEcSingle @param ec The context to execute the supplied function with. The context is prepared on the calling thread before being used.
 * @define paramEcMultiple @param ec The context to execute the supplied functions with. The context is prepared on the calling thread before being used.
 */
object Enumeratee {

  /**
   * An Enumeratee that checks to ensure that the passed in Iteratee is not done before doing any work.
   */
  trait CheckDone[From, To] extends Enumeratee[From, To] {

    def continue[A](k: Input[To] => Iteratee[To, A]): Iteratee[From, Iteratee[To, A]]

    def applyOn[A](it: Iteratee[To, A]): Iteratee[From, Iteratee[To, A]] =
      it.pureFlatFold[From, Iteratee[To, A]] {
        case Step.Cont(k) => continue(k)
        case _ => Done(it, Input.Empty)
      }(dec)
  }

  /**
   * flatten a [[scala.concurrent.Future]] of [[play.api.libs.iteratee.Enumeratee]]] into an Enumeratee
   *
   * @param futureOfEnumeratee a future of enumeratee
   */
  def flatten[From, To](futureOfEnumeratee: Future[Enumeratee[From, To]]) = new Enumeratee[From, To] {
    def applyOn[A](it: Iteratee[To, A]): Iteratee[From, Iteratee[To, A]] =
      Iteratee.flatten(futureOfEnumeratee.map(_.applyOn[A](it))(dec))
  }

  /**
   * Create an Enumeratee that zips two Iteratees together.
   *
   * Each input gets passed to each Iteratee, and the result is a tuple of both of their results.
   *
   * If either Iteratee encounters an error, the result will be an error.
   *
   * The Enumeratee will continue consuming input until both inner Iteratees are done.  If one inner Iteratee finishes
   * before the other, the result of that Iteratee is held, and the one continues by itself, until it too is finished.
   */
  def zip[E, A, B](inner1: Iteratee[E, A], inner2: Iteratee[E, B]): Iteratee[E, (A, B)] = zipWith(inner1, inner2)((_, _))(dec)

  /**
   * Create an Enumeratee that zips two Iteratees together, using the passed in zipper function to combine the results
   * of the two.
   *
   * @param inner1 The first Iteratee to combine.
   * @param inner2 The second Iteratee to combine.
   * @param zipper Used to combine the results of each Iteratee.
   * $paramEcSingle
   */
  def zipWith[E, A, B, C](inner1: Iteratee[E, A], inner2: Iteratee[E, B])(zipper: (A, B) => C)(implicit ec: ExecutionContext): Iteratee[E, C] = {
    val pec = ec.prepare()
    import Execution.Implicits.{ defaultExecutionContext => ec } // Shadow ec to make this the only implicit EC in scope

    def getNext(it1: Iteratee[E, A], it2: Iteratee[E, B]): Iteratee[E, C] = {
      val eventuallyIter =
        for (
          (a1, it1_) <- getInside(it1);
          (a2, it2_) <- getInside(it2)
        ) yield checkDone(a1, a2) match {
          case Left((msg, in)) => Error(msg, in)
          case Right(None) => Cont(step(it1_, it2_))
          case Right(Some(Left(Left(a)))) => it2_.map(b => zipper(a, b))(pec)
          case Right(Some(Left(Right(b)))) => it1_.map(a => zipper(a, b))(pec)
          case Right(Some(Right(((a, b), e)))) => executeIteratee(Done(zipper(a, b), e))(pec)
        }

      Iteratee.flatten(eventuallyIter)
    }

    def step(it1: Iteratee[E, A], it2: Iteratee[E, B])(in: Input[E]) = {
      Iteratee.flatten(
        for (
          it1_ <- it1.feed(in);
          it2_ <- it2.feed(in)
        ) yield getNext(it1_, it2_))

    }

    def getInside[T](it: Iteratee[E, T]): Future[(Option[Either[(String, Input[E]), (T, Input[E])]], Iteratee[E, T])] = {
      it.pureFold {
        case Step.Done(a, e) => Some(Right((a, e)))
        case Step.Cont(k) => None
        case Step.Error(msg, e) => Some(Left((msg, e)))
      }(dec).map(r => (r, it))(dec)

    }

    def checkDone(x: Option[Either[(String, Input[E]), (A, Input[E])]], y: Option[Either[(String, Input[E]), (B, Input[E])]]): Either[(String, Input[E]), Option[Either[Either[A, B], ((A, B), Input[E])]]] =
      (x, y) match {
        case (Some(Right((a, e1))), Some(Right((b, e2)))) => Right(Some(Right(((a, b), e1 /* FIXME: should calculate smalled here*/ ))))
        case (Some(Left((msg, e))), _) => Left((msg, e))
        case (_, Some(Left((msg, e)))) => Left((msg, e))
        case (Some(Right((a, _))), None) => Right(Some(Left(Left(a))))
        case (None, Some(Right((b, _)))) => Right(Some(Left(Right(b))))
        case (None, None) => Right(None)

      }
    getNext(inner1, inner2)

  }

  /**
   * A partially-applied function returned by the `mapInput` method.
   */
  trait MapInput[From] {
    /**
     * @param f Used to transform each input element.
     * $paramEcSingle
     */
    def apply[To](f: Input[From] => Input[To])(implicit ec: ExecutionContext): Enumeratee[From, To]
  }

  /**
   * Create an Enumeratee that transforms its input using the given function.
   *
   * This is like the `map` function, except that it allows the Enumeratee to, for example, send EOF to the inner
   * iteratee before EOF is encountered.
   */
  def mapInput[From] = new MapInput[From] {
    def apply[To](f: Input[From] => Input[To])(implicit ec: ExecutionContext) = new CheckDone[From, To] {
      val pec = ec.prepare()

      def step[A](k: K[To, A]): K[From, Iteratee[To, A]] = {
        case in @ (Input.El(_) | Input.Empty) => new CheckDone[From, To] {
          def continue[A](k: K[To, A]) = Cont(step(k))
        } &> Iteratee.flatten(Future(f(in))(pec).map(in => k(in))(dec))

        case Input.EOF => Done(Cont(k), Input.EOF)
      }

      def continue[A](k: K[To, A]) = Cont(step(k))
    }
  }

  /**
   * A partially-applied function returned by the `mapConcatInput` method.
   */
  trait MapConcatInput[From] {
    /**
     * @param f Used to transform each input element into a sequence of inputs.
     * $paramEcSingle
     */
    def apply[To](f: From => Seq[Input[To]])(implicit ec: ExecutionContext): Enumeratee[From, To]
  }

  /**
   * Create an enumeratee that transforms its input into a sequence of inputs for the target iteratee.
   */
  def mapConcatInput[From] = new MapConcatInput[From] {
    def apply[To](f: From => Seq[Input[To]])(implicit ec: ExecutionContext) = mapFlatten[From](in => Enumerator.enumerateSeq2(f(in)))(ec)
  }

  /**
   * A partially-applied function returned by the `mapConcat` method.
   */
  trait MapConcat[From] {
    /**
     * @param f Used to transform each input element into a sequence of input elements.
     * $paramEcSingle
     */
    def apply[To](f: From => Seq[To])(implicit ec: ExecutionContext): Enumeratee[From, To]
  }

  /**
   * Create an Enumeratee that transforms its input elements into a sequence of input elements for the target Iteratee.
   */
  def mapConcat[From] = new MapConcat[From] {
    def apply[To](f: From => Seq[To])(implicit ec: ExecutionContext) = mapFlatten[From](in => Enumerator.enumerateSeq1(f(in)))(ec)
  }

  /**
   * A partially-applied function returned by the `mapFlatten` method.
   */
  trait MapFlatten[From] {
    /**
     * @param f Used to transform each input element into an Enumerator.
     * $paramEcSingle
     */
    def apply[To](f: From => Enumerator[To])(implicit ec: ExecutionContext): Enumeratee[From, To]
  }

  /**
   * Create an Enumeratee that transforms its input elements into an Enumerator that is fed into the target Iteratee.
   */
  def mapFlatten[From] = new MapFlatten[From] {
    def apply[To](f: From => Enumerator[To])(implicit ec: ExecutionContext) = new CheckDone[From, To] {
      val pec = ec.prepare()

      def step[A](k: K[To, A]): K[From, Iteratee[To, A]] = {
        case Input.El(e) =>
          new CheckDone[From, To] { def continue[A](k: K[To, A]) = Cont(step(k)) } &> Iteratee.flatten(Future(f(e))(pec).flatMap(_.apply(Cont(k)))(dec))

        case Input.Empty =>
          new CheckDone[From, To] { def continue[A](k: K[To, A]) = Cont(step(k)) } &> k(Input.Empty)

        case Input.EOF => Done(Cont(k), Input.EOF)
      }

      def continue[A](k: K[To, A]) = Cont(step(k))
    }
  }

  /**
   * A partially-applied function returned by the `mapInputFlatten` method.
   */
  trait MapInputFlatten[From] {
    /**
     * @param f Used to transform each input into an Enumerator.
     * $paramEcSingle
     */
    def apply[To](f: Input[From] => Enumerator[To])(implicit ec: ExecutionContext): Enumeratee[From, To]
  }

  /**
   * Create an Enumeratee that transforms its input into an Enumerator that is fed into the target Iteratee.
   */
  def mapInputFlatten[From] = new MapInputFlatten[From] {
    def apply[To](f: Input[From] => Enumerator[To])(implicit ec: ExecutionContext) = new CheckDone[From, To] {
      val pec = ec.prepare()

      def step[A](k: K[To, A]): K[From, Iteratee[To, A]] = {
        case in =>
          new CheckDone[From, To] { def continue[A](k: K[To, A]) = Cont(step(k)) } &> Iteratee.flatten(Future(f(in))(pec).flatMap(_.apply(Cont(k)))(dec))
      }

      def continue[A](k: K[To, A]) = Cont(step(k))
    }
  }

  /**
   * A partially-applied function returned by the `mapInputM` method.
   */
  trait MapInputM[From] {
    /**
     * @param f Used to transform each input.
     * $paramEcSingle
     */
    def apply[To](f: Input[From] => Future[Input[To]])(implicit ec: ExecutionContext): Enumeratee[From, To]
  }

  /**
   * Like `mapInput`, but allows the map function to asynchronously return the mapped input.
   */
  def mapInputM[From] = new MapInputM[From] {
    def apply[To](f: Input[From] => Future[Input[To]])(implicit ec: ExecutionContext) = new CheckDone[From, To] {
      val pec = ec.prepare()

      def step[A](k: K[To, A]): K[From, Iteratee[To, A]] = {
        case in @ (Input.El(_) | Input.Empty) =>
          new CheckDone[From, To] { def continue[A](k: K[To, A]) = Cont(step(k)) } &> Iteratee.flatten(executeFuture(f(in))(pec).map(k(_))(dec))

        case Input.EOF => Done(Cont(k), Input.EOF)
      }

      def continue[A](k: K[To, A]) = Cont(step(k))
    }
  }

  /**
   * A partially-applied function returned by the `mapM` method.
   */
  trait MapM[E] {
    /**
     * @param f Used to transform each input element.
     * $paramEcSingle
     */
    def apply[NE](f: E => Future[NE])(implicit ec: ExecutionContext): Enumeratee[E, NE]
  }

  /**
   * Like `map`, but allows the map function to asynchronously return the mapped element.
   */
  def mapM[E] = new MapM[E] {
    def apply[NE](f: E => Future[NE])(implicit ec: ExecutionContext): Enumeratee[E, NE] = mapInputM[E] {
      case Input.Empty => Future.successful(Input.Empty)
      case Input.EOF => Future.successful(Input.EOF)
      case Input.El(e) => f(e).map(Input.El(_))(dec)
    }(ec)
  }

  /**
   * A partially-applied function returned by the `map` method.
   */
  trait Map[E] {
    /**
     * @param f A function to transform input elements.
     * $paramEcSingle
     */
    def apply[NE](f: E => NE)(implicit ec: ExecutionContext): Enumeratee[E, NE]
  }

  /**
   * Create an Enumeratee which transforms its input using a given function
   */
  def map[E] = new Map[E] {
    def apply[NE](f: E => NE)(implicit ec: ExecutionContext): Enumeratee[E, NE] = mapInput[E](in => in.map(f))(ec)
  }

  /**
   * Create an Enumeratee that will take `count` input elements to pass to the target Iteratee, and then be done
   *
   * @param count The number of elements to take
   */
  def take[E](count: Int): Enumeratee[E, E] = new CheckDone[E, E] {

    def step[A](remaining: Int)(k: K[E, A]): K[E, Iteratee[E, A]] = {

      case in @ Input.El(_) if remaining == 1 => Done(k(in), Input.Empty)

      case in @ Input.El(_) if remaining > 1 =>
        new CheckDone[E, E] { def continue[A](k: K[E, A]) = Cont(step(remaining - 1)(k)) } &> k(in)

      case Input.Empty if remaining > 0 =>
        new CheckDone[E, E] { def continue[A](k: K[E, A]) = Cont(step(remaining)(k)) } &> k(Input.Empty)

      case Input.EOF => Done(Cont(k), Input.EOF)

      case in => Done(Cont(k), in)
    }

    def continue[A](k: K[E, A]) = if (count <= 0) Done(Cont(k), Input.EOF) else Cont(step(count)(k))

  }

  /**
   * A partially-applied function returned by the `scanLeft` method.
   */
  trait ScanLeft[From] {
    def apply[To](seed: To)(f: (To, From) => To): Enumeratee[From, To]
  }

  def scanLeft[From] = new ScanLeft[From] {

    def apply[To](seed: To)(f: (To, From) => To): Enumeratee[From, To] = new CheckDone[From, To] {

      def step[A](lastTo: To)(k: K[To, A]): K[From, Iteratee[To, A]] = {

        case in @ Input.El(e) =>
          val next = f(lastTo, e)
          new CheckDone[From, To] { def continue[A](k: K[To, A]) = Cont(step(next)(k)) } &> k(Input.El(next))

        case Input.Empty =>
          new CheckDone[From, To] { def continue[A](k: K[To, A]) = Cont(step(lastTo)(k)) } &> k(Input.Empty)

        case Input.EOF => Done(Cont(k), Input.EOF)

      }

      def continue[A](k: K[To, A]) = Cont(step(seed)(k))
    }
  }

  /**
   * A partially-applied function returned by the `grouped` method.
   */
  trait Grouped[From] {
    def apply[To](folder: Iteratee[From, To]): Enumeratee[From, To]
  }

  /**
   * Create an Enumeratee that groups input using the given Iteratee.
   *
   * This will apply that Iteratee over and over, passing the result each time as the input for the target Iteratee,
   * until EOF is reached.  For example, let's say you had an Iteratee that took a stream of characters and parsed a
   * single line:
   *
   * {{{
   * def takeLine = for {
   *   line <- Enumeratee.takeWhile[Char](_ != '\n') &>> Iteratee.getChunks
   *   _    <- Enumeratee.take(1) &>> Iteratee.ignore[Char]
   * } yield line.mkString
   * }}}
   *
   * This could be used to build an Enumeratee that converts a stream of characters into a stream of lines:
   *
   * {{{
   * def asLines = Enumeratee.grouped(takeLine)
   * }}}
   */
  def grouped[From] = new Grouped[From] {

    def apply[To](folder: Iteratee[From, To]): Enumeratee[From, To] = new CheckDone[From, To] {

      def step[A](f: Iteratee[From, To])(k: K[To, A]): K[From, Iteratee[To, A]] = {

        case in @ (Input.El(_) | Input.Empty) =>

          Iteratee.flatten(f.feed(in)).pureFlatFold {
            case Step.Done(a, left) => new CheckDone[From, To] {
              def continue[A](k: K[To, A]) =
                (left match {
                  case Input.El(_) => step(folder)(k)(left)
                  case _ => Cont(step(folder)(k))
                })
            } &> k(Input.El(a))
            case Step.Cont(kF) => Cont(step(Cont(kF))(k))
            case Step.Error(msg, e) => Error(msg, in)
          }(dec)

        case Input.EOF => Iteratee.flatten(f.run.map[Iteratee[From, Iteratee[To, A]]]((c: To) => Done(k(Input.El(c)), Input.EOF))(dec))

      }

      def continue[A](k: K[To, A]) = Cont(step(folder)(k))
    }
  }

  /**
   * Create an Enumeratee that filters the inputs using the given predicate
   *
   * @param predicate A function to filter the input elements.
   * $paramEcSingle
   */
  def filter[E](predicate: E => Boolean)(implicit ec: ExecutionContext): Enumeratee[E, E] = new CheckDone[E, E] {
    val pec = ec.prepare()

    def step[A](k: K[E, A]): K[E, Iteratee[E, A]] = {

      case in @ Input.El(e) => Iteratee.flatten(Future(predicate(e))(pec).map { b =>
        if (b) (new CheckDone[E, E] { def continue[A](k: K[E, A]) = Cont(step(k)) } &> k(in)) else Cont(step(k))
      }(dec))

      case Input.Empty =>
        new CheckDone[E, E] { def continue[A](k: K[E, A]) = Cont(step(k)) } &> k(Input.Empty)

      case Input.EOF => Done(Cont(k), Input.EOF)

    }

    def continue[A](k: K[E, A]) = Cont(step(k))

  }

  /**
   * Create an Enumeratee that filters the inputs using the negation of the given predicate
   *
   * @param predicate A function to filter the input elements.
   * $paramEcSingle
   */
  def filterNot[E](predicate: E => Boolean)(implicit ec: ExecutionContext): Enumeratee[E, E] = filter[E](e => !predicate(e))(ec)

  /**
   * A partially-applied function returned by the `collect` method.
   */
  trait Collect[From] {
    /**
     * @param transformer A function to transform and filter the input elements with.
     * $paramSingleEc
     */
    def apply[To](transformer: PartialFunction[From, To])(implicit ec: ExecutionContext): Enumeratee[From, To]
  }

  /**
   * Create an Enumeratee that both filters and transforms its input. The input is transformed by the given
   * PartialFunction. If the PartialFunction isn't defined for an input element then that element is discarded.
   */
  def collect[From] = new Collect[From] {
    def apply[To](transformer: PartialFunction[From, To])(implicit ec: ExecutionContext): Enumeratee[From, To] = new CheckDone[From, To] {
      val pec = ec.prepare()

      def step[A](k: K[To, A]): K[From, Iteratee[To, A]] = {

        case in @ Input.El(e) => Iteratee.flatten(Future {
          if (transformer.isDefinedAt(e)) {
            new CheckDone[From, To] { def continue[A](k: K[To, A]) = Cont(step(k)) } &> k(Input.El(transformer(e)))
          } else {
            Cont(step(k))
          }
        }(pec))

        case Input.Empty =>
          new CheckDone[From, To] { def continue[A](k: K[To, A]) = Cont(step(k)) } &> k(Input.Empty)

        case Input.EOF => Done(Cont(k), Input.EOF)

      }

      def continue[A](k: K[To, A]) = Cont(step(k))

    }
  }

  def drop[E](count: Int): Enumeratee[E, E] = new CheckDone[E, E] {

    def step[A](remaining: Int)(k: K[E, A]): K[E, Iteratee[E, A]] = {

      case in @ Input.El(_) if remaining == 1 => passAlong[E](Cont(k))

      case in @ Input.El(_) if remaining > 1 => Cont(step(remaining - 1)(k))

      case Input.Empty if remaining > 0 => Cont(step(remaining)(k))

      case Input.EOF => Done(Cont(k), Input.EOF)

      case in => passAlong[E] &> k(in)

    }

    def continue[A](k: K[E, A]) = Cont(step(count)(k))

  }

  /**
   * Create an Enumeratee that drops input until a predicate is satisfied.
   *
   * @param f A predicate to test the input with.
   * $paramEcSingle
   */
  def dropWhile[E](p: E => Boolean)(implicit ec: ExecutionContext): Enumeratee[E, E] = {
    val pec = ec.prepare()
    new CheckDone[E, E] {

      def step[A](k: K[E, A]): K[E, Iteratee[E, A]] = {

        case in @ Input.El(e) => Iteratee.flatten(Future(p(e))(pec).map {
          b => if (b) Cont(step(k)) else (passAlong[E] &> k(in))
        }(dec))

        case Input.Empty => Cont(step(k))

        case Input.EOF => Done(Cont(k), Input.EOF)

      }

      def continue[A](k: K[E, A]) = Cont(step(k))

    }
  }

  /**
   * Create an Enumeratee that passes input through while a predicate is satisfied. Once the predicate
   * fails, no more input is passed through.
   *
   * @param f A predicate to test the input with.
   * $paramEcSingle
   */
  def takeWhile[E](p: E => Boolean)(implicit ec: ExecutionContext): Enumeratee[E, E] = {
    val pec = ec.prepare()
    new CheckDone[E, E] {

      def step[A](k: K[E, A]): K[E, Iteratee[E, A]] = {

        case in @ Input.El(e) => Iteratee.flatten(Future(p(e))(pec).map {
          b => if (b) (new CheckDone[E, E] { def continue[A](k: K[E, A]) = Cont(step(k)) } &> k(in)) else Done(Cont(k), in)
        }(dec))

        case Input.Empty =>
          new CheckDone[E, E] { def continue[A](k: K[E, A]) = Cont(step(k)) } &> k(Input.Empty)

        case Input.EOF => Done(Cont(k), Input.EOF)
      }

      def continue[A](k: K[E, A]) = Cont(step(k))

    }
  }

  /**
   * Create an Enumeratee that passes input through until a predicate is satisfied. Once the predicate
   * is satisfied, no more input is passed through.
   *
   * @param f A predicate to test the input with.
   * $paramEcSingle
   */
  def breakE[E](p: E => Boolean)(implicit ec: ExecutionContext) = new Enumeratee[E, E] {
    val pec = ec.prepare()
    def applyOn[A](inner: Iteratee[E, A]): Iteratee[E, Iteratee[E, A]] = {
      def step(inner: Iteratee[E, A])(in: Input[E]): Iteratee[E, Iteratee[E, A]] = in match {
        case Input.El(e) => Iteratee.flatten(Future(p(e))(pec).map(b => if (b) Done(inner, in) else stepNoBreak(inner)(in))(dec))
        case _ => stepNoBreak(inner)(in)
      }
      def stepNoBreak(inner: Iteratee[E, A])(in: Input[E]): Iteratee[E, Iteratee[E, A]] =
        inner.pureFlatFold {
          case Step.Cont(k) => {
            val next = k(in)
            next.pureFlatFold {
              case Step.Cont(k) => Cont(step(next))
              case _ => Done(inner, in)
            }(dec)
          }
          case _ => Done(inner, in)
        }(dec)
      Cont(step(inner))
    }
  }

  /**
   * An enumeratee that passes all input through until EOF is reached, redeeming the final iteratee with EOF as the
   * left over input.
   */
  def passAlong[M] = new Enumeratee.CheckDone[M, M] {

    def step[A](k: K[M, A]): K[M, Iteratee[M, A]] = {

      case in @ (Input.El(_) | Input.Empty) => new Enumeratee.CheckDone[M, M] { def continue[A](k: K[M, A]) = Cont(step(k)) } &> k(in)

      case Input.EOF => Done(Cont(k), Input.EOF)
    }
    def continue[A](k: K[M, A]) = Cont(step(k))
  }

  def heading[E](es: Enumerator[E]) = new Enumeratee[E, E] {

    def applyOn[A](it: Iteratee[E, A]): Iteratee[E, Iteratee[E, A]] = passAlong[E] &> Iteratee.flatten(es(it))

  }

  def trailing[M](es: Enumerator[M]) = new Enumeratee.CheckDone[M, M] {

    def step[A](k: K[M, A]): K[M, Iteratee[M, A]] = {

      case in @ (Input.El(_) | Input.Empty) => new Enumeratee.CheckDone[M, M] { def continue[A](k: K[M, A]) = Cont(step(k)) } &> k(in)

      case Input.EOF => Iteratee.flatten((es |>> Cont(k)).map[Iteratee[M, Iteratee[M, A]]](it => Done(it, Input.EOF))(dec))
    }
    def continue[A](k: K[M, A]) = Cont(step(k))
  }

  /**
   * Create an Enumeratee that performs an action when its Iteratee is done.
   *
   * @param action The action to perform.
   * $paramEcSingle
   */
  def onIterateeDone[E](action: () => Unit)(implicit ec: ExecutionContext): Enumeratee[E, E] = new Enumeratee[E, E] {
    val pec = ec.prepare()

    def applyOn[A](iteratee: Iteratee[E, A]): Iteratee[E, Iteratee[E, A]] = passAlong[E](iteratee).map(_.map { a => action(); a }(pec))(dec)

  }

  /**
   * Create an Enumeratee that performs an action on EOF.
   *
   * @param action The action to perform.
   * $paramEcSingle
   */
  def onEOF[E](action: () => Unit)(implicit ec: ExecutionContext): Enumeratee[E, E] = new CheckDone[E, E] {
    val pec = ec.prepare()

    def step[A](k: K[E, A]): K[E, Iteratee[E, A]] = {

      case Input.EOF =>
        Iteratee.flatten(Future(action())(pec).map(_ => Done[E, Iteratee[E, A]](Cont(k), Input.EOF))(dec))

      case in =>
        new CheckDone[E, E] { def continue[A](k: K[E, A]) = Cont(step(k)) } &> k(in)
    }

    def continue[A](k: K[E, A]) = Cont(step(k))

  }

  /**
   * Create an Enumeratee that recovers an iteratee in Error state.
   *
   * This will ignore the input that caused the iteratee's error state
   * and use the previous state of the iteratee to handle the next input.
   *
   * {{{
   *  Enumerator(0, 2, 4) &> Enumeratee.recover { (error, input) =>
   *    Logger.error(f"oops failure occurred with input: $input", error)
   *  } &> Enumeratee.map { i =>
   *    8 / i
   *  } |>>> Iteratee.getChunks // => List(4, 2)
   * }}}
   *
   * @param f Called when an error occurs with the cause of the error and the input associated with the error.
   * $paramEcSingle
   */
  def recover[E](f: (Throwable, Input[E]) => Unit = (_: Throwable, _: Input[E]) => ())(implicit ec: ExecutionContext): Enumeratee[E, E] = {
    val pec = ec.prepare()
    new Enumeratee[E, E] {

      def applyOn[A](it: Iteratee[E, A]): Iteratee[E, Iteratee[E, A]] = {

        def step(it: Iteratee[E, A])(input: Input[E]): Iteratee[E, Iteratee[E, A]] = input match {
          case in @ (Input.El(_) | Input.Empty) =>
            val next: Future[Iteratee[E, Iteratee[E, A]]] = it.pureFlatFold[E, Iteratee[E, A]] {
              case Step.Cont(k) =>
                val n = k(in)
                n.pureFlatFold[E, Iteratee[E, A]] {
                  case Step.Cont(k) => Cont(step(n))
                  case _ => Done(n, Input.Empty)
                }(dec)
              case other => Done(other.it, in)
            }(dec).unflatten.map({ s =>
              s.it
            })(dec).recover({
              case NonFatal(e) =>
                f(e, in)
                Cont(step(it))
            })(pec)
            Iteratee.flatten(next)
          case Input.EOF =>
            Done(it, Input.Empty)
        }

        Cont(step(it))

      }
    }
  }

}

Other Play Framework source code examples

Here is a short list of links related to this Play Framework Enumeratee.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.