|
Play Framework/Scala example source code file (RequestBodyHandler.scala)
The RequestBodyHandler.scala Play Framework example source code/* * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com> */ package play.core.server.netty import org.jboss.netty.channel._ import org.jboss.netty.handler.codec.http._ import play.api._ import play.api.libs.iteratee._ import play.api.libs.iteratee.Input._ import scala.concurrent.{ Future, Promise } import scala.util.{ Try, Success } private[server] trait RequestBodyHandler { /** * Creates a new upstream handler for the purposes of receiving chunked requests. Requests are buffered as an * optimization. * * @param bodyHandler the iteratee used to handle the body. * @param replaceHandler a function to handle the registration of a new handler. A handler is passed as a param. * @param handlerFinished a function to handle the de-registration of the handler i.e. when the chunked request is complete. * @return a future of an iteratee that will return the result. */ def newRequestBodyUpstreamHandler[A](bodyHandler: Iteratee[Array[Byte], A], replaceHandler: ChannelUpstreamHandler => Unit, handlerFinished: => Unit): Future[A] = { implicit val internalContext = play.core.Execution.internalContext import scala.concurrent.stm._ // A promise for the result of the body handler. This will be returned to the caller immediately. val bodyHandlerResult = Promise[Iteratee[Array[Byte], A]]() // Constants for how many messages we're prepared to allow to be in flight. val MaxMessages = 10 val MinMessages = 10 // How messages we currently have "in flight" that the bodyHandler hasn't acknowledged. val counter = Ref(0) // An STM reference to the body handler. // I don't think STM is needed here, since Netty won't give us two chunks at once. // But I don't want to remove it, in case I've misunderstood. // TODO: Work out whether STM is needed. val iteratee: Ref[Iteratee[Array[Byte], A]] = Ref(bodyHandler) def pushChunk(ctx: ChannelHandlerContext, chunk: Input[Array[Byte]]) { // If we have more messages in flight than the maximum, ensure that we have told upstream that // we don't want to received more. But only if the channel is still open and we haven't finished handling it. if (counter.single.transformAndGet { _ + 1 } > MaxMessages && ctx.getChannel.isOpen && !bodyHandlerResult.isCompleted) ctx.getChannel.setReadable(false) // Promise for the next iteratee val itPromise = Promise[Iteratee[Array[Byte], A]]() // Update our body handler iteratee to be the next iteratee, and get the current one atomically val current = atomic { implicit txn => if (!bodyHandlerResult.isCompleted) { Some(iteratee.single.swap(Iteratee.flatten(itPromise.future))) } else { // We already have a result, but we're not at the end of the stream yet. Replace the handler with one that // will simply ignore the rest of the body. // This means we can free up resources that this handler holds, namely, the promise for the parsed body, // which could be large, while the rest of the body comes. if (chunk != Input.EOF) { replaceHandler(new IgnoreBodyHandler(handlerFinished)) } None } } current.foreach { currentIteratee => // Feed the chunk currentIteratee.feed(chunk).flatMap(_.unflatten).onComplete { case Success(c @ Step.Cont(k)) => continue(c.it) case done => finish(done.map(_.it)) } } def continue(it: Iteratee[Array[Byte], A]) { // If we have less messages in flight than the minimum, ensure that we have told upstream that // we are ready to receive more. if (counter.single.transformAndGet { _ - 1 } <= MinMessages && ctx.getChannel.isOpen) ctx.getChannel.setReadable(true) itPromise.success(it) } def finish(result: Try[Iteratee[Array[Byte], A]]) { // Redeem the body handler result if (!bodyHandlerResult.tryComplete(result)) { if (ctx.getChannel.isOpen) ctx.getChannel.setReadable(true) } itPromise.complete(result) } } replaceHandler(new SimpleChannelUpstreamHandler { override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) { e.getMessage match { case chunk: HttpChunk if !chunk.isLast => val cBuffer = chunk.getContent val bytes = new Array[Byte](cBuffer.readableBytes()) cBuffer.readBytes(bytes) pushChunk(ctx, El(bytes)) case chunk: HttpChunk if chunk.isLast => { pushChunk(ctx, EOF) handlerFinished } case unexpected => Play.logger.error("Oops, unexpected message received in NettyServer/RequestBodyHandler" + " (please report this problem): " + unexpected) } } override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) { Play.logger.error("Exception caught in RequestBodyHandler", e.getCause) e.getChannel.close() } override def channelDisconnected(ctx: ChannelHandlerContext, e: ChannelStateEvent) { pushChunk(ctx, EOF) } }) bodyHandlerResult.future.flatMap(_.run) } /** * Ignores the body, but calls finish when finished. */ private class IgnoreBodyHandler(handlerFinished: => Unit) extends SimpleChannelUpstreamHandler { override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) { e.getMessage match { case chunk: HttpChunk => { // Ignore, and invoke the callback if it's the last chunk. if (chunk.isLast) handlerFinished } // Even though this handler essentially ignores everything it receives, it should only be handling HTTP chunks, // so if it gets something else log it so that we can know there's a bug. case unexpected => Play.logger.error("Oops, unexpected message received in NettyServer/IgnoreBodyHandler" + " (please report this problem): " + unexpected) } } override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) { Play.logger.error("Exception caught in IgnoreBodyHandler", e.getCause) e.getChannel.close() } } } Other Play Framework source code examplesHere is a short list of links related to this Play Framework RequestBodyHandler.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.