|
Play Framework/Scala example source code file (PlayDefaultUpstreamHandler.scala)
The PlayDefaultUpstreamHandler.scala Play Framework example source code/* * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com> */ package play.core.server.netty import org.jboss.netty.buffer.ChannelBuffers import org.jboss.netty.channel._ import org.jboss.netty.handler.codec.http._ import org.jboss.netty.handler.codec.http.HttpHeaders._ import org.jboss.netty.handler.codec.http.HttpHeaders.Names._ import org.jboss.netty.handler.codec.http.websocketx.{ WebSocketFrame, TextWebSocketFrame, BinaryWebSocketFrame } import org.jboss.netty.handler.codec.frame.TooLongFrameException import org.jboss.netty.handler.ssl._ import org.jboss.netty.channel.group._ import play.api._ import play.api.mvc._ import play.api.http.HeaderNames.{ X_FORWARDED_FOR, X_FORWARDED_PROTO } import play.api.libs.concurrent.Execution import play.api.libs.iteratee._ import play.api.libs.iteratee.Input._ import play.core._ import play.core.server.Server import play.core.websocket._ import scala.collection.JavaConverters._ import scala.util.control.Exception import com.typesafe.netty.http.pipelining.{ OrderedDownstreamChannelEvent, OrderedUpstreamMessageEvent } import scala.concurrent.Future import java.net.URI import java.io.IOException import org.jboss.netty.handler.codec.http.websocketx.CloseWebSocketFrame private[play] class PlayDefaultUpstreamHandler(server: Server, allChannels: DefaultChannelGroup) extends SimpleChannelUpstreamHandler with WebSocketHandler with RequestBodyHandler { private val requestIDs = new java.util.concurrent.atomic.AtomicLong(0) /** * We don't know what the consequence of changing logging exceptions from trace to error will be. We hope that it * won't have any impact, but in case it turns out that there are many exceptions that are normal occurrences, we * want to give people the opportunity to turn it off. */ val nettyExceptionLogger = Logger("play.nettyException") override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) { event.getCause match { // IO exceptions happen all the time, it usually just means that the client has closed the connection before fully // sending/receiving the response. case e: IOException => nettyExceptionLogger.trace("Benign IO exception caught in Netty", e) event.getChannel.close() case e: TooLongFrameException => nettyExceptionLogger.warn("Handling TooLongFrameException", e) val response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.REQUEST_URI_TOO_LONG) response.headers().set(Names.CONNECTION, "close") ctx.getChannel.write(response).addListener(ChannelFutureListener.CLOSE) case e => nettyExceptionLogger.error("Exception caught in Netty", e) event.getChannel.close() } } override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent) { Option(ctx.getPipeline.get(classOf[SslHandler])).map { sslHandler => sslHandler.handshake() } } override def channelDisconnected(ctx: ChannelHandlerContext, e: ChannelStateEvent) { val cleanup = ctx.getAttachment if (cleanup != null) cleanup.asInstanceOf[() => Unit]() ctx.setAttachment(null) } override def channelOpen(ctx: ChannelHandlerContext, e: ChannelStateEvent) { allChannels.add(e.getChannel) } override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) { e.getMessage match { case nettyHttpRequest: HttpRequest => Play.logger.trace("Http request received by netty: " + nettyHttpRequest) val keepAlive = isKeepAlive(nettyHttpRequest) val websocketableRequest = websocketable(nettyHttpRequest) var nettyVersion = nettyHttpRequest.getProtocolVersion val nettyUri = new QueryStringDecoder(nettyHttpRequest.getUri) val rHeaders = getHeaders(nettyHttpRequest) def rRemoteAddress = e.getRemoteAddress match { case ra: java.net.InetSocketAddress => val remoteAddress = ra.getAddress.getHostAddress forwardedHeader(remoteAddress, X_FORWARDED_FOR).getOrElse(remoteAddress) } def rSecure = e.getRemoteAddress match { case ra: java.net.InetSocketAddress => val remoteAddress = ra.getAddress.getHostAddress val fh = forwardedHeader(remoteAddress, X_FORWARDED_PROTO) fh.map(_ == "https").getOrElse(ctx.getPipeline.get(classOf[SslHandler]) != null) } /** * Gets the value of a header, if the remote address is localhost or * if the trustxforwarded configuration property is true */ def forwardedHeader(remoteAddress: String, headerName: String) = for { headerValue <- rHeaders.get(headerName) app <- server.applicationProvider.get.toOption trustxforwarded <- app.configuration.getBoolean("trustxforwarded").orElse(Some(false)) if remoteAddress == "127.0.0.1" || trustxforwarded } yield headerValue def tryToCreateRequest = { val parameters = Map.empty[String, Seq[String]] ++ nettyUri.getParameters.asScala.mapValues(_.asScala) createRequestHeader(parameters) } def createRequestHeader(parameters: Map[String, Seq[String]] = Map.empty[String, Seq[String]]) = { //mapping netty request to Play's val untaggedRequestHeader = new RequestHeader { val id = requestIDs.incrementAndGet val tags = Map.empty[String, String] def uri = nettyHttpRequest.getUri def path = new URI(nettyUri.getPath).getRawPath //wrapping into URI to handle absoluteURI def method = nettyHttpRequest.getMethod.getName def version = nettyVersion.getText def queryString = parameters def headers = rHeaders lazy val remoteAddress = rRemoteAddress lazy val secure = rSecure def username = None } untaggedRequestHeader } val (requestHeader, handler: Either[Future[Result], (Handler, Application)]) = Exception .allCatch[RequestHeader].either { val rh = tryToCreateRequest // Force parsing of uri rh.path rh }.fold( e => { val rh = createRequestHeader() val global = server.applicationProvider.get .map(_.global) .getOrElse(DefaultGlobal) val result = Future .successful(()) // Create a dummy future .flatMap { _ => // Call errorHandler in another context, don't block here global.onBadRequest(rh, e.getMessage) }(play.api.libs.iteratee.Execution.trampoline) (rh, Left(result)) }, rh => server.getHandlerFor(rh) match { case directResult @ Left(_) => (rh, directResult) case Right((taggedRequestHeader, handler, application)) => (taggedRequestHeader, Right((handler, application))) } ) // Call onRequestCompletion after all request processing is done. Protected with an AtomicBoolean to ensure can't be executed more than once. val alreadyClean = new java.util.concurrent.atomic.AtomicBoolean(false) def cleanup() { if (!alreadyClean.getAndSet(true)) { play.api.Play.maybeApplication.foreach(_.global.onRequestCompletion(requestHeader)) } } // attach the cleanup function to the channel context for after cleaning ctx.setAttachment(cleanup _) // It is a pre-requesite that we're using the http pipelining capabilities provided and that we have a // handler downstream from this one that produces these events. implicit val msgCtx = ctx implicit val oue = e.asInstanceOf[OrderedUpstreamMessageEvent] def cleanFlashCookie(result: Result): Result = { val header = result.header val flashCookie = { header.headers.get(SET_COOKIE) .map(Cookies.decode(_)) .flatMap(_.find(_.name == Flash.COOKIE_NAME)).orElse { Option(requestHeader.flash).filterNot(_.isEmpty).map { _ => Flash.discard.toCookie } } } flashCookie.map { newCookie => result.withHeaders(SET_COOKIE -> Cookies.merge(header.headers.get(SET_COOKIE).getOrElse(""), Seq(newCookie))) }.getOrElse(result) } handler match { //execute normal action case Right((action: EssentialAction, app)) => val a = EssentialAction { rh => import play.api.libs.iteratee.Execution.Implicits.trampoline Iteratee.flatten(action(rh).unflatten.map(_.it).recover { case error => Iteratee.flatten( app.handleError(requestHeader, error).map(result => Done(result, Input.Empty)) ): Iteratee[Array[Byte], Result] }) } handleAction(a, Some(app)) case Right((ws @ WebSocket(f), app)) if websocketableRequest.check => Play.logger.trace("Serving this request with: " + ws) val executed = Future(f(requestHeader))(play.api.libs.concurrent.Execution.defaultContext) import play.api.libs.iteratee.Execution.Implicits.trampoline executed.flatMap(identity).map { case Left(result) => // WebSocket was rejected, send result val a = EssentialAction(_ => Done(result, Input.Empty)) handleAction(a, Some(app)) case Right(socket) => val bufferLimit = app.configuration.getBytes("play.websocket.buffer.limit").getOrElse(65536L) val enumerator = websocketHandshake(ctx, nettyHttpRequest, e, bufferLimit)(ws.inFormatter) socket(enumerator, socketOut(ctx)(ws.outFormatter)) }.recover { case error => app.handleError(requestHeader, error).map { result => val a = EssentialAction(_ => Done(result, Input.Empty)) handleAction(a, Some(app)) } } //handle bad websocket request case Right((WebSocket(_), app)) => Play.logger.trace("Bad websocket request") val a = EssentialAction(_ => Done(Results.BadRequest, Input.Empty)) handleAction(a, Some(app)) case Left(e) => Play.logger.trace("No handler, got direct result: " + e) import play.api.libs.iteratee.Execution.Implicits.trampoline val a = EssentialAction(_ => Iteratee.flatten(e.map(result => Done(result, Input.Empty)))) handleAction(a, None) } def handleAction(action: EssentialAction, app: Option[Application]) { Play.logger.trace("Serving this request with: " + action) val bodyParser = Iteratee.flatten( scala.concurrent.Future(action(requestHeader))(play.api.libs.concurrent.Execution.defaultContext) ) import play.api.libs.iteratee.Execution.Implicits.trampoline val expectContinue: Option[_] = requestHeader.headers.get("Expect").filter(_.equalsIgnoreCase("100-continue")) // Regardless of whether the client is expecting 100 continue or not, we need to feed the body here in the // Netty thread, so that the handler is replaced in this thread, so that if the client does start sending // body chunks (which it might according to the HTTP spec if we're slow to respond), we can handle them. val eventuallyResult: Future[Result] = if (nettyHttpRequest.isChunked) { val pipeline = ctx.getChannel.getPipeline val result = newRequestBodyUpstreamHandler(bodyParser, { handler => pipeline.replace("handler", "handler", handler) }, { pipeline.replace("handler", "handler", this) }) result } else { val bodyEnumerator = { val body = { val cBuffer = nettyHttpRequest.getContent val bytes = new Array[Byte](cBuffer.readableBytes()) cBuffer.readBytes(bytes) bytes } Enumerator(body).andThen(Enumerator.enumInput(EOF)) } bodyEnumerator |>>> bodyParser } // An iteratee containing the result and the sequence number. // Sequence number will be 1 if a 100 continue response has been sent, otherwise 0. val eventuallyResultWithSequence: Future[(Result, Int)] = expectContinue match { case Some(_) => { bodyParser.unflatten.flatMap { case Step.Cont(k) => sendDownstream(0, false, new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE)) eventuallyResult.map((_, 1)) case Step.Done(result, _) => { // Return the result immediately, and ensure that the connection is set to close // Connection must be set to close because whatever comes next in the stream is either the request // body, because the client waited too long for our response, or the next request, and there's no way // for us to know which. See RFC2616 Section 8.2.3. Future.successful((result.copy(connection = HttpConnection.Close), 0)) } case Step.Error(msg, _) => { e.getChannel.setReadable(true) val error = new RuntimeException("Body parser iteratee in error: " + msg) val result = app.map(_.handleError(requestHeader, error)).getOrElse(DefaultGlobal.onError(requestHeader, error)) result.map(r => (r.copy(connection = HttpConnection.Close), 0)) } } } case None => eventuallyResult.map((_, 0)) } val sent = eventuallyResultWithSequence.recoverWith { case error => Play.logger.error("Cannot invoke the action, eventually got an error: " + error) e.getChannel.setReadable(true) app.map(_.handleError(requestHeader, error)) .getOrElse(DefaultGlobal.onError(requestHeader, error)) .map((_, 0)) }.flatMap { case (result, sequence) => NettyResultStreamer.sendResult(cleanFlashCookie(result), !keepAlive, nettyVersion, sequence) } // Finally, clean up sent.map { _ => cleanup() ctx.setAttachment(null) } } case unexpected => Play.logger.error("Oops, unexpected message received in NettyServer (please report this problem): " + unexpected) } } def socketOut[A](ctx: ChannelHandlerContext)(frameFormatter: play.api.mvc.WebSocket.FrameFormatter[A]): Iteratee[A, Unit] = { import play.api.libs.iteratee.Execution.Implicits.trampoline val channel = ctx.getChannel val basicFrameFormatter = frameFormatter.asInstanceOf[BasicFrameFormatter[A]] import NettyFuture._ def iteratee: Iteratee[A, _] = Cont { case El(e) => val basicFrame: BasicFrame = basicFrameFormatter.toFrame(e) val nettyFrame: WebSocketFrame = basicFrame match { case TextFrame(text) => new TextWebSocketFrame(true, 0, text) case BinaryFrame(bytes) => new BinaryWebSocketFrame(true, 0, ChannelBuffers.wrappedBuffer(bytes)) } Iteratee.flatten(channel.write(nettyFrame).toScala.map(_ => iteratee)) case e @ EOF => if (channel.isOpen) { Iteratee.flatten(for { _ <- channel.write(new CloseWebSocketFrame(WebSocketNormalClose, "")).toScala _ <- channel.close().toScala } yield Done((), e)) } else Done((), e) case Empty => iteratee } iteratee.map(_ => ()) } def getHeaders(nettyRequest: HttpRequest): Headers = { val pairs = nettyRequest.headers().entries().asScala.groupBy(_.getKey).mapValues(_.map(_.getValue)) new Headers { val data = pairs.toSeq } } def sendDownstream(subSequence: Int, last: Boolean, message: Object)(implicit ctx: ChannelHandlerContext, oue: OrderedUpstreamMessageEvent) = { val ode = new OrderedDownstreamChannelEvent(oue, subSequence, last, message) ctx.sendDownstream(ode) ode.getFuture } } Other Play Framework source code examplesHere is a short list of links related to this Play Framework PlayDefaultUpstreamHandler.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.