alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Play Framework/Scala example source code file (NettyResultStreamer.scala)

This example Play Framework source code file (NettyResultStreamer.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Play Framework (and Scala) source code examples by using tags.

All credit for the original source code belongs to Play Framework; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Play Framework tags/keywords

a, api, boolean, channelhandlercontext, channelstatus, concurrent, core, done, httpversion, int, iteratee, netty, orderedupstreammessageevent, play, play framework, result

The NettyResultStreamer.scala Play Framework example source code

/*
 * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
 */
package play.core.server.netty

import play.api.mvc._
import play.api.libs.iteratee._
import play.api._

import org.jboss.netty.handler.codec.http.HttpHeaders.Names._
import org.jboss.netty.buffer.ChannelBuffers
import org.jboss.netty.channel._
import org.jboss.netty.handler.codec.http._
import org.jboss.netty.handler.codec.http.HttpHeaders.Values._

import com.typesafe.netty.http.pipelining.{ OrderedDownstreamChannelEvent, OrderedUpstreamMessageEvent }

import scala.concurrent.Future
import scala.util.{ Failure, Success }

/**
 * Streams Play results to Netty
 */
object NettyResultStreamer {

  import NettyFuture._

  /**
   * Status of a channel once a response has been streamed to it.
   *
   * @param closeConnection whether the connection must be closed
   * @param lastSubsequence the last subsequence sent
   */
  class ChannelStatus(
    val closeConnection: Boolean,
    val lastSubsequence: Int)

  /**
   * Send the result to netty.
   *
   * The streaming strategy is picked by the first matching case:
   *  - a header with a `null` value: the body is terminated and a 500 response is sent, closing the connection;
   *  - a result using a transfer encoding on an HTTP 1.0 request: the body is terminated and a 505 response is sent;
   *  - a result asking for the connection to be closed: the body is streamed and the connection closed afterwards;
   *  - a result whose end of body is specified by the protocol: the body is streamed as is;
   *  - anything else: the body goes through [[bufferingIteratee]].
   *
   * Once the response has been sent, the channel is closed in order if the resulting status asks for it. If sending
   * failed, the failure is logged and the channel is closed straight away.
   *
   * @param result the result to send
   * @param closeConnection whether the connection should be closed once the response has been sent
   * @param httpVersion the HTTP version used to build the Netty response
   * @param startSequence the subsequence number of the first downstream event sent for this response
   * @return A Future that will be redeemed when the result is completely sent
   */
  def sendResult(result: Result, closeConnection: Boolean, httpVersion: HttpVersion, startSequence: Int)(implicit ctx: ChannelHandlerContext, oue: OrderedUpstreamMessageEvent): Future[_] = {
    // Result of this iteratee is a completion status
    val sentResponse: Future[ChannelStatus] = result match {

      case res if res.header.headers.exists(_._2 == null) => {
        // Make sure enumerator knows it's done, so that any resources it uses can be cleaned up
        result.body |>> Done(())

        Play.logger.debug("Response with headers set to null, sending 500 response.")
        val error = Results.InternalServerError("")
        error.body |>>> nettyStreamIteratee(createNettyResponse(error.header, true, httpVersion), startSequence, true)
      }

      // Sanitisation: ensure that we don't send chunked responses to HTTP 1.0 requests
      case UsesTransferEncoding() if httpVersion == HttpVersion.HTTP_1_0 => {
        // Make sure enumerator knows it's done, so that any resources it uses can be cleaned up
        result.body |>> Done(())

        Play.logger.debug("Chunked response to HTTP/1.0 request, sending 505 response.")
        val error = Results.HttpVersionNotSupported("The response to this request is chunked and hence requires HTTP 1.1 to be sent, but this is a HTTP 1.0 request.")
        error.body |>>> nettyStreamIteratee(createNettyResponse(error.header, closeConnection, httpVersion), startSequence, closeConnection)
      }

      // The result itself asks for the connection to be closed, whatever the caller requested
      case CloseConnection() => {
        result.body |>>> nettyStreamIteratee(createNettyResponse(result.header, true, httpVersion), startSequence, true)
      }

      // The headers already tell the client where the body ends, so it can be streamed as is
      case EndOfBodyInProtocol() => {
        result.body |>>> nettyStreamIteratee(createNettyResponse(result.header, closeConnection, httpVersion), startSequence, closeConnection)
      }

      case _ => {
        result.body |>>> bufferingIteratee(createNettyResponse(result.header, closeConnection, httpVersion), startSequence, closeConnection, httpVersion)
      }

    }

    // Clean up
    import play.api.libs.iteratee.Execution.Implicits.trampoline

    sentResponse.onComplete {
      case Success(cs) =>
        if (cs.closeConnection) {
          // Close in an orderly fashion: the close event is queued right after the last subsequence sent
          val channel = oue.getChannel
          val closeEvent = new DownstreamChannelStateEvent(
            channel, channel.getCloseFuture, ChannelState.OPEN, java.lang.Boolean.FALSE)
          val ode = new OrderedDownstreamChannelEvent(oue, cs.lastSubsequence + 1, true, closeEvent)
          ctx.sendDownstream(ode)
        }
      case Failure(ex) =>
        // Pass the throwable along so that the stack trace is not lost
        Play.logger.debug("Error while sending response, closing channel: " + ex, ex)
        Channels.close(oue.getChannel)
    }
    sentResponse
  }

  /**
   * An iteratee that buffers the first element from the enumerator, and if it then receives EOF, sends the result
   * immediately with the body and a content length.
   *
   * If there is more than one element from the enumerator, it sends the response either as chunked or as a stream that
   * gets closed, depending on whether the protocol is HTTP 1.0 or HTTP 1.1.
   *
   * Note that `nettyResponse` is mutated: depending on the path taken, its content and its `Content-Length`,
   * `Transfer-Encoding` or `Connection` headers are set.
   *
   * @param nettyResponse the response (status line and headers) to send
   * @param startSequence the subsequence number of the first downstream event sent for this response
   * @param closeConnection whether the connection should be closed once the response has been sent
   * @param httpVersion the HTTP version, which decides between chunking and closing for multi-chunk bodies
   * @return an iteratee consuming the body and yielding the resulting channel status
   */
  def bufferingIteratee(nettyResponse: HttpResponse, startSequence: Int, closeConnection: Boolean, httpVersion: HttpVersion)(implicit ctx: ChannelHandlerContext, e: OrderedUpstreamMessageEvent): Iteratee[Array[Byte], ChannelStatus] = {

    // Left is the first chunk if there was more than one chunk, right is the zero or one and only chunk
    def takeUpToOneChunk(chunk: Option[Array[Byte]]): Iteratee[Array[Byte], Either[Array[Byte], Option[Array[Byte]]]] = Cont {
      case in @ Input.El(data) =>
        chunk match {
          // We have a second chunk, fail with left and hand the unconsumed input back
          case Some(first) => Done(Left(first), in)
          // This is the first chunk
          case None => takeUpToOneChunk(Some(data))
        }
      case Input.Empty => takeUpToOneChunk(chunk)
      // We reached EOF, which means we either have one or zero chunks
      case Input.EOF => Done(Right(chunk))
    }

    import play.api.libs.iteratee.Execution.Implicits.trampoline

    takeUpToOneChunk(None).flatMap {
      case Right(chunk) => {
        val buffer = chunk.map(ChannelBuffers.wrappedBuffer).getOrElse(ChannelBuffers.EMPTY_BUFFER)
        // We successfully buffered it, so set the content length and send the whole thing as one buffer
        nettyResponse.headers().set(CONTENT_LENGTH, buffer.readableBytes)
        nettyResponse.setContent(buffer)
        val promise = sendDownstream(startSequence, !closeConnection, nettyResponse).toScala
        val done = Done[Array[Byte], ChannelStatus](new ChannelStatus(closeConnection, startSequence))
        // Whether or not the send succeeded, finish with the same status
        Iteratee.flatten(promise.map(_ => done).recover {
          case _ => done
        })
      }
      case Left(chunk) => {
        val bufferedAsEnumerator = Enumerator(chunk)

        // Get the iteratee, maybe chunked or maybe not according to the HTTP version
        val bodyIteratee = if (httpVersion == HttpVersion.HTTP_1_0) {
          // The body is streamed and delimited by closing the connection, so the response must not keep
          // advertising a Keep-Alive connection that may have been set when it was created
          nettyResponse.headers().set(CONNECTION, CLOSE)
          nettyStreamIteratee(nettyResponse, startSequence, true)
        } else {
          // Chunk it
          nettyResponse.headers().set(TRANSFER_ENCODING, CHUNKED)
          Results.chunk &>> nettyStreamIteratee(nettyResponse, startSequence, closeConnection)
        }

        // Feed the buffered content into the iteratee, and return the iteratee so that future content can continue
        // to be fed directly in as normal
        Iteratee.flatten(bufferedAsEnumerator |>> bodyIteratee)
      }
    }

  }

  /**
   * Construct an iteratee for the purposes of streaming responses to a downstream handler.
   *
   * The response itself is sent first, using `startSequence`; every body chunk is then sent with the next
   * subsequence number, and EOF is signalled with an empty buffer.
   *
   * @param nettyResponse the response to send before the body
   * @param startSequence the subsequence number used for the response
   * @param closeConnection carried into the resulting status; when true, the EOF event is not flagged as last
   * @return an iteratee consuming the body and yielding the resulting channel status
   */
  def nettyStreamIteratee(nettyResponse: HttpResponse, startSequence: Int, closeConnection: Boolean)(implicit ctx: ChannelHandlerContext, e: OrderedUpstreamMessageEvent): Iteratee[Array[Byte], ChannelStatus] = {

    // Status to report when streaming stops at the given subsequence
    def statusAt(subsequence: Int): ChannelStatus = new ChannelStatus(closeConnection, subsequence)

    def step(subsequence: Int)(in: Input[Array[Byte]]): Iteratee[Array[Byte], ChannelStatus] = in match {
      case Input.EOF =>
        // End of body: send an empty buffer and finish without waiting for it to be written
        sendDownstream(subsequence, !closeConnection, ChannelBuffers.EMPTY_BUFFER)
        Done(statusAt(subsequence))
      case Input.Empty =>
        // Nothing to send, keep waiting at the same subsequence
        Cont(step(subsequence))
      case Input.El(bytes) =>
        val buffer = ChannelBuffers.wrappedBuffer(bytes)
        val pending = sendDownstream(subsequence, false, buffer)
        nextWhenComplete(pending, step(subsequence + 1), statusAt(subsequence))
    }

    val responseSent = sendDownstream(startSequence, false, nettyResponse)
    nextWhenComplete(responseSent, step(startSequence + 1), statusAt(startSequence))
  }

  /**
   * Build the Netty response for the given response header.
   *
   * All headers are copied over, and a `Connection` header is set when the HTTP version requires one for the
   * requested connection handling.
   *
   * @param header the status and headers of the response
   * @param closeConnection whether the connection is going to be closed after the response
   * @param httpVersion the HTTP version of the response
   * @return the Netty response, without any content
   */
  def createNettyResponse(header: ResponseHeader, closeConnection: Boolean, httpVersion: HttpVersion) = {
    import scala.collection.JavaConverters._

    val status = HttpResponseStatus.valueOf(header.status)
    val response = new DefaultHttpResponse(httpVersion, status)
    def responseHeaders = response.headers()

    // Copy the response headers
    header.headers.foreach {
      case (name, value) =>
        if (name == play.api.http.HeaderNames.SET_COOKIE) {
          // Multiple cookies could be merged in a single Set-Cookie header, but that's not properly
          // supported by some browsers, so every cookie is sent as a value of its own
          val perCookie = Cookies.decode(value).map(cookie => Cookies.encode(Seq(cookie)))
          responseHeaders.set(name, perCookie.asJava)
        } else {
          responseHeaders.set(name, value)
        }
    }

    closeConnection match {
      // Response header Connection: Keep-Alive is needed for HTTP 1.0
      case false if httpVersion == HttpVersion.HTTP_1_0 => responseHeaders.set(CONNECTION, KEEP_ALIVE)
      case true if httpVersion == HttpVersion.HTTP_1_1 => responseHeaders.set(CONNECTION, CLOSE)
      case _ => // no Connection header is set in the other cases
    }

    response
  }

  /**
   * Wrap the message in an ordered downstream event, send it downstream through the context and return the
   * future of that event.
   *
   * @param subSequence the subsequence number given to the event
   * @param last the `last` flag given to the event
   * @param message the message carried by the event
   */
  def sendDownstream(subSequence: Int, last: Boolean, message: Object)(implicit ctx: ChannelHandlerContext, oue: OrderedUpstreamMessageEvent) = {
    val orderedEvent = new OrderedDownstreamChannelEvent(oue, subSequence, last, message)
    ctx.sendDownstream(orderedEvent)
    orderedEvent.getFuture
  }

  /**
   * Continue with `step` once the given channel future has completed, or finish with `done` if the channel is no
   * longer connected or the future failed.
   *
   * @param future the channel future to wait for
   * @param step the continuation to use once the future has completed successfully
   * @param done the result to finish with when the iteratee can't continue
   */
  def nextWhenComplete[E, A](future: ChannelFuture, step: (Input[E]) => Iteratee[E, A], done: A)(implicit ctx: ChannelHandlerContext): Iteratee[E, A] = {
    import play.api.libs.iteratee.Execution.Implicits.trampoline

    def finished: Iteratee[E, A] = Done(done)
    def connected: Boolean = ctx.getChannel.isConnected

    // If the channel isn't currently connected, then this future will never be redeemed.  This is racy, and impossible
    // to 100% detect, but it's better to fail fast if possible than to sit there waiting forever
    if (connected) {
      val next = future.toScala.map[Iteratee[E, A]] { _ =>
        // Check again, the channel may have been disconnected in the meantime
        if (connected) Cont(step) else finished
      }.recover[Iteratee[E, A]] {
        case _ => finished
      }
      Iteratee.flatten(next)
    } else {
      finished
    }
  }

  /**
   * Extractor object matching the results whose headers contain either the `TRANSFER_ENCODING` or the
   * `CONTENT_LENGTH` key, meaning that the end of the body is specified by the protocol
   */
  object EndOfBodyInProtocol {
    def unapply(result: Result): Boolean = {
      val present = result.header.headers
      Seq(TRANSFER_ENCODING, CONTENT_LENGTH).exists(present.contains)
    }
  }

  /**
   * Extractor object matching the results whose connection is `HttpConnection.Close`, i.e. the results that
   * specify that the connection should be closed
   */
  object CloseConnection {
    def unapply(result: Result): Boolean = {
      val requested = result.connection
      requested == HttpConnection.Close
    }
  }

  /**
   * Extractor object that determines whether the result uses a transfer encoding, by delegating to the extractor
   * of the same name in `play.core.actions`
   */
  object UsesTransferEncoding {
    def unapply(result: Result): Boolean = {
      import play.core.actions.{ UsesTransferEncoding => Delegate }
      Delegate.unapply(result)
    }
  }

}

Other Play Framework source code examples

Here is a short list of links related to this Play Framework NettyResultStreamer.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.