|
Play Framework/Scala example source code file (NingWS.scala)
The NingWS.scala Play Framework example source code/* * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com> */ package play.api.libs.ws.ning import com.ning.http.client.{ Response => AHCResponse, ProxyServer => AHCProxyServer, _ } import com.ning.http.client.cookie.{ Cookie => AHCCookie } import com.ning.http.client.Realm.{ RealmBuilder, AuthScheme } import com.ning.http.util.AsyncHttpProviderUtils import collection.immutable.TreeMap import scala.concurrent.{ Future, Promise } import java.util.concurrent.atomic.AtomicReference import play.api.libs.ws._ import play.api.libs.ws.ssl._ import play.api.libs.iteratee._ import play.api.{ Mode, Application, Play } import play.core.utils.CaseInsensitiveOrdered import play.api.libs.ws.DefaultWSResponseHeaders import play.api.libs.iteratee.Input.El import play.api.libs.ws.ssl.debug._ import scala.collection.JavaConverters._ /** * A WS client backed by a Ning AsyncHttpClient. * * If you need to debug Ning, set logger.com.ning.http.client=DEBUG in your application.conf file. * * @param config a client configuration object */ class NingWSClient(config: AsyncHttpClientConfig) extends WSClient { private val asyncHttpClient = new AsyncHttpClient(config) def underlying[T] = asyncHttpClient.asInstanceOf[T] private[libs] def executeRequest[T](request: Request, handler: AsyncHandler[T]): ListenableFuture[T] = asyncHttpClient.executeRequest(request, handler) def close() = asyncHttpClient.close() def url(url: String): WSRequestHolder = NingWSRequestHolder(this, url, "GET", EmptyBody, Map(), Map(), None, None, None, None, None, None) } /** * A WS Request. */ case class NingWSRequest(client: NingWSClient, method: String, private val auth: Option[(String, String, WSAuthScheme)], private val calc: Option[WSSignatureCalculator], private val headers: Map[String, Seq[String]], private val body: WSBody, builder: RequestBuilder) extends WSRequest { //this will do a java mutable set hence the {} response auth.foreach(data => auth(data._1, data._2, authScheme(data._3))) /** * Return the current headers of the request being constructed */ def allHeaders: Map[String, Seq[String]] = headers /** * Return the current query string parameters */ def queryString: Map[String, Seq[String]] = { val request = builder.build() val params = request.getParams require(params != null) mapAsScalaMapConverter(params).asScala.map { e => e._1 -> e._2.asScala.toSeq }.toMap } /** * Retrieve an HTTP header. */ def header(name: String): Option[String] = headers.get(name).flatMap(_.headOption) def url: String = { val request = builder.build() request.getUrl } def getBody: Option[Array[Byte]] = { body match { case InMemoryBody(bytes) => Some(bytes) case _ => None } } /** * Set an HTTP header. */ @scala.deprecated("This will be a protected method, please use WSRequestHolder", "2.3.0") override def setHeader(name: String, value: String): NingWSRequest = { this.copy(builder = builder.setHeader(name, value)) } /** * Add an HTTP header (used for headers with multiple values). */ @scala.deprecated("This will be a protected method, please use WSRequestHolder", "2.3.0") override def addHeader(name: String, value: String): NingWSRequest = { this.copy(builder = builder.addHeader(name, value)) } /** * Defines the request headers. */ @scala.deprecated("This will be a protected method, please use WSRequestHolder", "2.3.0") def setHeaders(hdrs: FluentCaseInsensitiveStringsMap): NingWSRequest = { this.copy(builder = builder.setHeaders(hdrs)) } /** * Defines the request headers. */ @scala.deprecated("This will be a protected method, please use WSRequestHolder", "2.3.0") def setHeaders(hdrs: java.util.Map[String, java.util.Collection[String]]): NingWSRequest = { this.copy(builder = builder.setHeaders(hdrs)) } /** * Defines the request headers. */ @scala.deprecated("This will be a protected method, please use WSRequestHolder", "2.3.0") def setHeaders(hdrs: Map[String, Seq[String]]): NingWSRequest = { // roll up the builders using two foldlefts... val newBuilder = hdrs.foldLeft(builder) { (b, header) => header._2.foldLeft(b) { (b2, value) => b2.addHeader(header._1, value) } } this.copy(builder = newBuilder) } /** * Defines the query string. */ @scala.deprecated("This will be a protected method, please use WSRequestHolder", "2.3.0") def setQueryString(queryString: Map[String, Seq[String]]): NingWSRequest = { val newBuilder = queryString.foldLeft(builder) { (b, entry) => val (key, values) = entry values.foldLeft(b) { (b2, value) => b2.addQueryParameter(key, value) } } this.copy(builder = newBuilder) } /** * Defines the URL. */ @scala.deprecated("This will be a protected method, please use WSRequestHolder", "2.3.0") def setUrl(url: String): NingWSRequest = { this.copy(builder = builder.setUrl(url)) } @scala.deprecated("This will be a protected method, please use WSRequestHolder", "2.3.0") def setPerRequestConfig(config: PerRequestConfig): NingWSRequest = { this.copy(builder = builder.setPerRequestConfig(config)) } @scala.deprecated("This will be a protected method, please use WSRequestHolder", "2.3.0") def setFollowRedirects(followRedirects: Boolean): NingWSRequest = { this.copy(builder = builder.setFollowRedirects(followRedirects)) } @scala.deprecated("This will be a protected method, please use WSRequestHolder", "2.3.0") def setVirtualHost(virtualHost: String): NingWSRequest = { this.copy(builder = builder.setVirtualHost(virtualHost)) } @scala.deprecated("This will be a protected method, please use WSRequestHolder", "2.3.0") def setProxyServer(proxyServer: AHCProxyServer): NingWSRequest = { this.copy(builder = builder.setProxyServer(proxyServer)) } @scala.deprecated("This will be a protected method, please use WSRequestHolder", "2.3.0") def setBody(s: String): NingWSRequest = { this.copy(builder = builder.setBody(s)) } @scala.deprecated("This will be a protected method, please use WSRequestHolder", "2.3.0") def setBody(bodyGenerator: BodyGenerator): NingWSRequest = { this.copy(builder = builder.setBody(bodyGenerator)) } @scala.deprecated("This will be a protected method, please use WSRequestHolder", "2.3.0") def setBody(byteArray: Array[Byte]): NingWSRequest = { this.copy(builder = builder.setBody(byteArray)) } @scala.deprecated("This will be a protected method, please use WSRequestHolder", "2.3.0") def build: com.ning.http.client.Request = { builder.build() } private def authScheme(scheme: WSAuthScheme): AuthScheme = scheme match { case WSAuthScheme.DIGEST => AuthScheme.DIGEST case WSAuthScheme.BASIC => AuthScheme.BASIC case WSAuthScheme.NTLM => AuthScheme.NTLM case WSAuthScheme.SPNEGO => AuthScheme.SPNEGO case WSAuthScheme.KERBEROS => AuthScheme.KERBEROS case WSAuthScheme.NONE => AuthScheme.NONE case _ => throw new RuntimeException("Unknown scheme " + scheme) } /** * Add http auth headers. Defaults to HTTP Basic. */ private def auth(username: String, password: String, scheme: AuthScheme = AuthScheme.BASIC): Unit = { builder.setRealm((new RealmBuilder) .setScheme(scheme) .setPrincipal(username) .setPassword(password) .setUsePreemptiveAuth(true) .build()) } private def ningHeadersToMap(headers: java.util.Map[String, java.util.Collection[String]]) = mapAsScalaMapConverter(headers).asScala.map(e => e._1 -> e._2.asScala.toSeq).toMap private def ningHeadersToMap(headers: FluentCaseInsensitiveStringsMap) = { val res = mapAsScalaMapConverter(headers).asScala.map(e => e._1 -> e._2.asScala.toSeq).toMap //todo: wrap the case insensitive ning map instead of creating a new one (unless perhaps immutabilty is important) TreeMap(res.toSeq: _*)(CaseInsensitiveOrdered) } private[libs] def execute: Future[NingWSResponse] = { import com.ning.http.client.AsyncCompletionHandler val result = Promise[NingWSResponse]() calc.foreach(_.sign(this)) client.executeRequest(builder.build(), new AsyncCompletionHandler[AHCResponse]() { override def onCompleted(response: AHCResponse) = { result.success(NingWSResponse(response)) response } override def onThrowable(t: Throwable) = { result.failure(t) } }) result.future } private[libs] def executeStream(): Future[(WSResponseHeaders, Enumerator[Array[Byte]])] = { import com.ning.http.client.AsyncHandler import play.api.libs.concurrent.Execution.Implicits.defaultContext val result = Promise[(WSResponseHeaders, Enumerator[Array[Byte]])]() val errorInStream = Promise[Unit]() calc.foreach(_.sign(this)) val promisedIteratee = Promise[Iteratee[Array[Byte], Unit]]() @volatile var doneOrError = false @volatile var statusCode = 0 @volatile var current: Iteratee[Array[Byte], Unit] = Iteratee.flatten(promisedIteratee.future) client.executeRequest(builder.build(), new AsyncHandler[Unit]() { import com.ning.http.client.AsyncHandler.STATE override def onStatusReceived(status: HttpResponseStatus) = { statusCode = status.getStatusCode STATE.CONTINUE } override def onHeadersReceived(h: HttpResponseHeaders) = { val headers = h.getHeaders val responseHeader = DefaultWSResponseHeaders(statusCode, ningHeadersToMap(headers)) val enumerator = new Enumerator[Array[Byte]]() { def apply[A](i: Iteratee[Array[Byte], A]) = { val doneIteratee = Promise[Iteratee[Array[Byte], A]]() // Map it so that we can complete the iteratee when it returns val mapped = i.map { a => doneIteratee.trySuccess(Done(a)) () }.recover { // but if an error happens, we want to propogate that case e => doneIteratee.tryFailure(e) throw e } // Redeem the iteratee that we promised to the AsyncHandler promisedIteratee.trySuccess(mapped) // If there's an error in the stream from upstream, then fail this returned future with that errorInStream.future.onFailure { case e => doneIteratee.tryFailure(e) } doneIteratee.future } } result.trySuccess((responseHeader, enumerator)) STATE.CONTINUE } override def onBodyPartReceived(bodyPart: HttpResponseBodyPart) = { if (!doneOrError) { current = current.pureFlatFold { case Step.Done(a, e) => doneOrError = true Done(a, e) case Step.Cont(k) => k(El(bodyPart.getBodyPartBytes)) case Step.Error(e, input) => doneOrError = true Error(e, input) } STATE.CONTINUE } else { current = null // Must close underlying connection, otherwise async http client will drain the stream bodyPart.markUnderlyingConnectionAsClosed() STATE.ABORT } } override def onCompleted() = { Option(current).foreach(_.run) } override def onThrowable(t: Throwable) = { result.tryFailure(t) errorInStream.tryFailure(t) } }) result.future } } /** * A WS Request builder. */ case class NingWSRequestHolder(client: NingWSClient, url: String, method: String, body: WSBody, headers: Map[String, Seq[String]], queryString: Map[String, Seq[String]], calc: Option[WSSignatureCalculator], auth: Option[(String, String, WSAuthScheme)], followRedirects: Option[Boolean], requestTimeout: Option[Int], virtualHost: Option[String], proxyServer: Option[WSProxyServer]) extends WSRequestHolder { def sign(calc: WSSignatureCalculator): WSRequestHolder = copy(calc = Some(calc)) def withAuth(username: String, password: String, scheme: WSAuthScheme) = copy(auth = Some((username, password, scheme))) def withHeaders(hdrs: (String, String)*) = { val headers = hdrs.foldLeft(this.headers)((m, hdr) => if (m.contains(hdr._1)) m.updated(hdr._1, m(hdr._1) :+ hdr._2) else m + (hdr._1 -> Seq(hdr._2)) ) copy(headers = headers) } def withQueryString(parameters: (String, String)*) = copy(queryString = parameters.foldLeft(queryString) { case (m, (k, v)) => m + (k -> (v +: m.get(k).getOrElse(Nil))) }) def withFollowRedirects(follow: Boolean) = copy(followRedirects = Some(follow)) def withRequestTimeout(timeout: Int) = copy(requestTimeout = Some(timeout)) def withVirtualHost(vh: String) = copy(virtualHost = Some(vh)) def withProxyServer(proxyServer: WSProxyServer) = copy(proxyServer = Some(proxyServer)) def withBody(body: WSBody) = copy(body = body) def withMethod(method: String) = copy(method = method) def execute(): Future[WSResponse] = { prepare().execute } def stream(): Future[(WSResponseHeaders, Enumerator[Array[Byte]])] = { prepare().executeStream() } private[ning] def prepare(): NingWSRequest = { val builder = createBuilder() val builderWithBody = body match { case EmptyBody => builder case FileBody(file) => import com.ning.http.client.generators.FileBodyGenerator val bodyGenerator = new FileBodyGenerator(file) builder.setBody(bodyGenerator) case InMemoryBody(bytes) => builder.setBody(bytes) case StreamedBody(bytes) => builder } new NingWSRequest(client, method, auth, calc, headers, body, builderWithBody) } private def createBuilder() = { val builder = new RequestBuilder(method).setUrl(url) for { header <- headers value <- header._2 } builder.addHeader(header._1, value) for { (key, values) <- queryString value <- values } builder.addQueryParameter(key, value) virtualHost.foreach(builder.setVirtualHost) followRedirects.foreach(builder.setFollowRedirects) proxyServer.foreach { p => builder.setProxyServer(createProxy(p)) } requestTimeout.foreach { t => val config = new PerRequestConfig() config.setRequestTimeoutInMs(t) builder.setPerRequestConfig(config) } builder } private[play] def createProxy(wsServer: WSProxyServer) = { import com.ning.http.client.ProxyServer.Protocol val protocol: Protocol = wsServer.protocol.getOrElse("http").toLowerCase match { case "http" => Protocol.HTTP case "https" => Protocol.HTTPS case "kerberos" => Protocol.KERBEROS case "ntlm" => Protocol.NTLM case "spnego" => Protocol.SPNEGO case _ => scala.sys.error("Unrecognized protocol!") } val ningServer = new AHCProxyServer( protocol, wsServer.host, wsServer.port, wsServer.principal.getOrElse(null), wsServer.password.getOrElse(null)) wsServer.encoding.foreach(ningServer.setEncoding) wsServer.ntlmDomain.foreach(ningServer.setNtlmDomain) for { hosts <- wsServer.nonProxyHosts host <- hosts } ningServer.addNonProxyHost(host) ningServer } } /** * WSPlugin implementation hook. */ class NingWSPlugin(app: Application) extends WSPlugin { @volatile var loaded = false override lazy val enabled = true private val config = new DefaultWSConfigParser(app.configuration, app.classloader).parse() private lazy val ningAPI = new NingWSAPI(app, config) override def onStart() { loaded = true } override def onStop() { if (loaded) { ningAPI.resetClient() loaded = false } } def api = ningAPI } class NingWSAPI(app: Application, clientConfig: WSClientConfig) extends WSAPI { private val clientHolder: AtomicReference[Option[NingWSClient]] = new AtomicReference(None) private[play] def newClient(): NingWSClient = { val asyncClientConfig = buildAsyncClientConfig(clientConfig) new SystemConfiguration().configure(clientConfig) clientConfig.ssl.foreach { _.debug.foreach { debugConfig => app.mode match { case Mode.Prod => Play.logger.warn("NingWSAPI: ws.ssl.debug settings enabled in production mode!") case _ => // do nothing } new DebugConfiguration().configure(debugConfig) } } new NingWSClient(asyncClientConfig) } def client: NingWSClient = { clientHolder.get.getOrElse({ // A critical section of code. Only one caller has the opportunity of creating a default client. synchronized { clientHolder.get match { case None => val client = newClient() clientHolder.set(Some(client)) client case Some(client) => client } } }) } def url(url: String) = client.url(url) /** * resets the underlying AsyncHttpClient */ private[play] def resetClient(): Unit = { clientHolder.getAndSet(None).foreach(oldClient => oldClient.close()) } private[play] def buildAsyncClientConfig(wsClientConfig: WSClientConfig): AsyncHttpClientConfig = { new NingAsyncHttpClientConfigBuilder(wsClientConfig).build() } } /** * The Ning implementation of a WS cookie. */ private class NingWSCookie(ahcCookie: AHCCookie) extends WSCookie { private def noneIfEmpty(value: String): Option[String] = { if (value.isEmpty) None else Some(value) } /** * The underlying cookie object for the client. */ def underlying[T] = ahcCookie.asInstanceOf[T] /** * The domain. */ def domain: String = ahcCookie.getDomain /** * The cookie name. */ def name: Option[String] = noneIfEmpty(ahcCookie.getName) /** * The cookie value. */ def value: Option[String] = noneIfEmpty(ahcCookie.getValue) /** * The path. */ def path: String = ahcCookie.getPath /** * The expiry date. */ def expires: Option[Long] = if (ahcCookie.getExpires == -1) None else Some(ahcCookie.getExpires) /** * The maximum age. */ def maxAge: Option[Int] = if (ahcCookie.getMaxAge == -1) None else Some(ahcCookie.getMaxAge) /** * If the cookie is secure. */ def secure: Boolean = ahcCookie.isSecure /* * Cookie ports should not be used; cookies for a given host are shared across * all the ports on that host. */ override def toString: String = ahcCookie.toString } /** * A WS HTTP response. */ case class NingWSResponse(ahcResponse: AHCResponse) extends WSResponse { import scala.xml._ import play.api.libs.json._ /** * Get the underlying response object. */ @deprecated("Use underlying", "2.3.0") def getAHCResponse = ahcResponse /** * Return the headers of the response as a case-insensitive map */ lazy val allHeaders: Map[String, Seq[String]] = { TreeMap[String, Seq[String]]()(CaseInsensitiveOrdered) ++ mapAsScalaMapConverter(ahcResponse.getHeaders).asScala.mapValues(_.asScala) } /** * @return The underlying response object. */ def underlying[T] = ahcResponse.asInstanceOf[T] /** * The response status code. */ def status: Int = ahcResponse.getStatusCode /** * The response status message. */ def statusText: String = ahcResponse.getStatusText /** * Get a response header. */ def header(key: String): Option[String] = Option(ahcResponse.getHeader(key)) /** * Get all the cookies. */ def cookies: Seq[WSCookie] = { ahcResponse.getCookies.asScala.map(new NingWSCookie(_)) } /** * Get only one cookie, using the cookie name. */ def cookie(name: String): Option[WSCookie] = cookies.find(_.name == Option(name)) /** * The response body as String. */ lazy val body: String = { // RFC-2616#3.7.1 states that any text/* mime type should default to ISO-8859-1 charset if not // explicitly set, while Plays default encoding is UTF-8. So, use UTF-8 if charset is not explicitly // set and content type is not text/*, otherwise default to ISO-8859-1 val contentType = Option(ahcResponse.getContentType).getOrElse("application/octet-stream") val charset = Option(AsyncHttpProviderUtils.parseCharset(contentType)).getOrElse { if (contentType.startsWith("text/")) AsyncHttpProviderUtils.DEFAULT_CHARSET else "utf-8" } ahcResponse.getResponseBody(charset) } /** * The response body as Xml. */ lazy val xml: Elem = Play.XML.loadString(body) /** * The response body as Json. */ lazy val json: JsValue = Json.parse(ahcResponse.getResponseBodyAsBytes) } Other Play Framework source code examplesHere is a short list of links related to this Play Framework NingWS.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.