|
Play Framework/Scala example source code file (NettyResultStreamer.scala)
The NettyResultStreamer.scala Play Framework example source code
/*
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package play.core.server.netty
import play.api.mvc._
import play.api.libs.iteratee._
import play.api._
import org.jboss.netty.handler.codec.http.HttpHeaders.Names._
import org.jboss.netty.buffer.ChannelBuffers
import org.jboss.netty.channel._
import org.jboss.netty.handler.codec.http._
import org.jboss.netty.handler.codec.http.HttpHeaders.Values._
import com.typesafe.netty.http.pipelining.{ OrderedDownstreamChannelEvent, OrderedUpstreamMessageEvent }
import scala.concurrent.Future
import scala.util.{ Failure, Success }
/**
* Streams Play results to Netty
*/
object NettyResultStreamer {
import NettyFuture._
// A channel status holds whether the connection must be closed and the last subsequence sent
class ChannelStatus(val closeConnection: Boolean, val lastSubsequence: Int)
/**
* Send the result to netty
*
* @return A Future that will be redeemed when the result is completely sent
*/
def sendResult(result: Result, closeConnection: Boolean, httpVersion: HttpVersion, startSequence: Int)(implicit ctx: ChannelHandlerContext, oue: OrderedUpstreamMessageEvent): Future[_] = {
// Result of this iteratee is a completion status
val sentResponse: Future[ChannelStatus] = result match {
case res if res.header.headers.exists(_._2 == null) => {
// Make sure enumerator knows it's done, so that any resources it uses can be cleaned up
result.body |>> Done(())
Play.logger.debug("Response with headers set to null, sending 500 response.")
val error = Results.InternalServerError("")
error.body |>>> nettyStreamIteratee(createNettyResponse(error.header, true, httpVersion), startSequence, true)
}
// Sanitisation: ensure that we don't send chunked responses to HTTP 1.0 requests
case UsesTransferEncoding() if httpVersion == HttpVersion.HTTP_1_0 => {
// Make sure enumerator knows it's done, so that any resources it uses can be cleaned up
result.body |>> Done(())
Play.logger.debug("Chunked response to HTTP/1.0 request, sending 505 response.")
val error = Results.HttpVersionNotSupported("The response to this request is chunked and hence requires HTTP 1.1 to be sent, but this is a HTTP 1.0 request.")
error.body |>>> nettyStreamIteratee(createNettyResponse(error.header, closeConnection, httpVersion), startSequence, closeConnection)
}
case CloseConnection() => {
result.body |>>> nettyStreamIteratee(createNettyResponse(result.header, true, httpVersion), startSequence, true)
}
case EndOfBodyInProtocol() => {
result.body |>>> nettyStreamIteratee(createNettyResponse(result.header, closeConnection, httpVersion), startSequence, closeConnection)
}
case _ => {
result.body |>>> bufferingIteratee(createNettyResponse(result.header, closeConnection, httpVersion), startSequence, closeConnection, httpVersion)
}
}
// Clean up
import play.api.libs.iteratee.Execution.Implicits.trampoline
sentResponse.onComplete {
case Success(cs: ChannelStatus) =>
if (cs.closeConnection) {
// Close in an orderely fashion.
val channel = oue.getChannel;
val closeEvent = new DownstreamChannelStateEvent(
channel, channel.getCloseFuture, ChannelState.OPEN, java.lang.Boolean.FALSE);
val ode = new OrderedDownstreamChannelEvent(oue, cs.lastSubsequence + 1, true, closeEvent)
ctx.sendDownstream(ode)
}
case Failure(ex) =>
Play.logger.debug(ex.toString)
Channels.close(oue.getChannel)
}
sentResponse
}
/**
* An iteratee that buffers the first element from the enumerator, and if it then receives EOF, sends the a result
* immediately with the body and a content length.
*
* If there is more than one element from the enumerator, it sends the response either as chunked or as a stream that
* gets closed, depending on whether the protocol is HTTP 1.0 or HTTP 1.1.
*/
def bufferingIteratee(nettyResponse: HttpResponse, startSequence: Int, closeConnection: Boolean, httpVersion: HttpVersion)(implicit ctx: ChannelHandlerContext, e: OrderedUpstreamMessageEvent): Iteratee[Array[Byte], ChannelStatus] = {
// Left is the first chunk if there was more than one chunk, right is the zero or one and only chunk
def takeUpToOneChunk(chunk: Option[Array[Byte]]): Iteratee[Array[Byte], Either[Array[Byte], Option[Array[Byte]]]] = Cont {
// We have a second chunk, fail with left
case in @ Input.El(data) if chunk.isDefined => Done(Left(chunk.get), in)
// This is the first chunk
case Input.El(data) => takeUpToOneChunk(Some(data))
case Input.Empty => takeUpToOneChunk(chunk)
// We reached EOF, which means we either have one or zero chunks
case Input.EOF => Done(Right(chunk))
}
import play.api.libs.iteratee.Execution.Implicits.trampoline
takeUpToOneChunk(None).flatMap {
case Right(chunk) => {
val buffer = chunk.map(ChannelBuffers.wrappedBuffer).getOrElse(ChannelBuffers.EMPTY_BUFFER)
// We successfully buffered it, so set the content length and send the whole thing as one buffer
nettyResponse.headers().set(CONTENT_LENGTH, buffer.readableBytes)
nettyResponse.setContent(buffer)
val promise = sendDownstream(startSequence, !closeConnection, nettyResponse).toScala
val done = Done[Array[Byte], ChannelStatus](new ChannelStatus(closeConnection, startSequence))
Iteratee.flatten(promise.map(_ => done).recover {
case _ => done
})
}
case Left(chunk) => {
val bufferedAsEnumerator = Enumerator(chunk)
// Get the iteratee, maybe chunked or maybe not according HTTP version
val bodyIteratee = if (httpVersion == HttpVersion.HTTP_1_0) {
nettyStreamIteratee(nettyResponse, startSequence, true)
} else {
// Chunk it
nettyResponse.headers().set(TRANSFER_ENCODING, CHUNKED)
Results.chunk &>> nettyStreamIteratee(nettyResponse, startSequence, closeConnection)
}
// Feed the buffered content into the iteratee, and return the iteratee so that future content can continue
// to be fed directly in as normal
Iteratee.flatten(bufferedAsEnumerator |>> bodyIteratee)
}
}
}
// Construct an iteratee for the purposes of streaming responses to a downstream handler.
def nettyStreamIteratee(nettyResponse: HttpResponse, startSequence: Int, closeConnection: Boolean)(implicit ctx: ChannelHandlerContext, e: OrderedUpstreamMessageEvent): Iteratee[Array[Byte], ChannelStatus] = {
def step(subsequence: Int)(in: Input[Array[Byte]]): Iteratee[Array[Byte], ChannelStatus] = in match {
case Input.El(x) =>
val b = ChannelBuffers.wrappedBuffer(x)
nextWhenComplete(sendDownstream(subsequence, false, b), step(subsequence + 1), new ChannelStatus(closeConnection, subsequence))
case Input.Empty =>
Cont(step(subsequence))
case Input.EOF =>
sendDownstream(subsequence, !closeConnection, ChannelBuffers.EMPTY_BUFFER)
Done(new ChannelStatus(closeConnection, subsequence))
}
nextWhenComplete(sendDownstream(startSequence, false, nettyResponse), step(startSequence + 1), new ChannelStatus(closeConnection, startSequence))
}
def createNettyResponse(header: ResponseHeader, closeConnection: Boolean, httpVersion: HttpVersion) = {
val nettyResponse = new DefaultHttpResponse(httpVersion, HttpResponseStatus.valueOf(header.status))
import scala.collection.JavaConverters._
// Set response headers
header.headers.foreach {
// Fix a bug for Set-Cookie header.
// Multiple cookies could be merged in a single header
// but it's not properly supported by some browsers
case (name @ play.api.http.HeaderNames.SET_COOKIE, value) => {
val cookieValues = Cookies.decode(value).map {
c: play.api.mvc.Cookie => Cookies.encode(Seq(c))
}.asJava
nettyResponse.headers().set(name, cookieValues)
}
case (name, value) => nettyResponse.headers().set(name, value)
}
// Response header Connection: Keep-Alive is needed for HTTP 1.0
if (!closeConnection && httpVersion == HttpVersion.HTTP_1_0) {
nettyResponse.headers().set(CONNECTION, KEEP_ALIVE)
} else if (closeConnection && httpVersion == HttpVersion.HTTP_1_1) {
nettyResponse.headers().set(CONNECTION, CLOSE)
}
nettyResponse
}
def sendDownstream(subSequence: Int, last: Boolean, message: Object)(implicit ctx: ChannelHandlerContext, oue: OrderedUpstreamMessageEvent) = {
val ode = new OrderedDownstreamChannelEvent(oue, subSequence, last, message)
ctx.sendDownstream(ode)
ode.getFuture
}
def nextWhenComplete[E, A](future: ChannelFuture, step: (Input[E]) => Iteratee[E, A], done: A)(implicit ctx: ChannelHandlerContext): Iteratee[E, A] = {
// If the channel isn't currently connected, then this future will never be redeemed. This is racey, and impossible
// to 100% detect, but it's better to fail fast if possible than to sit there waiting forever
import play.api.libs.iteratee.Execution.Implicits.trampoline
if (!ctx.getChannel.isConnected) {
Done(done)
} else {
Iteratee.flatten(
future.toScala.map[Iteratee[E, A]] {
_ => if (ctx.getChannel.isConnected()) Cont(step) else Done(done)
}.recover[Iteratee[E, A]] {
case _ => Done(done)
}
)
}
}
/**
* Extractor object that determines whether the end of the body is specified by the protocol
*/
object EndOfBodyInProtocol {
def unapply(result: Result): Boolean = {
import result.header.headers
headers.contains(TRANSFER_ENCODING) || headers.contains(CONTENT_LENGTH)
}
}
/**
* Extractor object that determines whether the result specifies that the connection should be closed
*/
object CloseConnection {
def unapply(result: Result): Boolean = result.connection == HttpConnection.Close
}
/**
* Extractor object that determines whether the result uses a transfer encoding
*/
object UsesTransferEncoding {
def unapply(result: Result): Boolean = play.core.actions.UsesTransferEncoding.unapply(result)
}
}
Other Play Framework source code examplesHere is a short list of links related to this Play Framework NettyResultStreamer.scala source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.