Play Framework/Scala example source code file (WebSocketActor.scala)
The WebSocketActor.scala Play Framework example source code
/*
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package play.core.actors
import akka.actor._
import play.api.libs.iteratee._
import akka.actor.Terminated
import scala.reflect.ClassTag
/**
* Integration between Play WebSockets and actors
*/
private[play] object WebSocketActor {
object WebSocketActorSupervisor {

  /**
   * Builds the Props used to start a supervisor for one WebSocket connection.
   *
   * The actor instance is constructed inside the by-name argument of `Props(...)`, so it is only created when
   * Akka actually starts the actor.
   *
   * @param enumerator Passed through unchanged to the supervisor's constructor.
   * @param iteratee Passed through unchanged to the supervisor's constructor.
   * @param createHandler Passed through unchanged to the supervisor's constructor.
   */
  def props[In, Out: ClassTag](enumerator: Enumerator[In], iteratee: Iteratee[Out, Unit],
    createHandler: ActorRef => Props) = {
    Props(new WebSocketActorSupervisor[In, Out](enumerator, iteratee, createHandler)(implicitly[ClassTag[Out]]))
  }
}
/**
 * Supervisor that sits between a WebSocket and the handler actor serving it.
 *
 * It starts the handler as a watched child, sends every message read from `enumerator` to that child, and pushes
 * every message matching `messageType` that it receives itself into `iteratee`.
 *
 * @param enumerator Source of the messages that are sent on to the handler actor.
 * @param iteratee Sink that the messages received by this supervisor are pushed into.
 * @param createHandler Builds the Props of the handler actor; it is called with this supervisor's own ActorRef.
 * @param messageType Class tag used in `receive` to recognise messages that should be pushed to `iteratee`.
 */
private class WebSocketActorSupervisor[In, Out](enumerator: Enumerator[In], iteratee: Iteratee[Out, Unit],
  createHandler: ActorRef => Props)(implicit messageType: ClassTag[Out]) extends Actor {

  // Supplies the implicit ExecutionContext for the onComplete callback registered below.
  import context.dispatcher

  /*
   * Shutdown can start from either side: the child may stop itself (for example with a PoisonPill), or the
   * WebSocket may close, in which case the completion callback below sends the child a PoisonPill.
   *
   * When the child goes first, the WebSocket is closed as a consequence, and the completion callback cannot be
   * cancelled. Without a guard it would send a PoisonPill to an actor that is already stopped, and that message
   * would land in dead letters on every child-initiated shutdown. This flag is that guard.
   *
   * A race remains: the client can close the WebSocket at the very moment the child decides to stop, so two
   * PoisonPills may still occasionally be sent. The flag only stops that from being the normal case.
   *
   * It is @volatile because it is written by the actor and read from the future callback.
   */
  @volatile var shutdown = false

  // The child actor that handles the WebSocket; watched so that its termination reaches receive.
  val webSocketActor = context.watch(context.actorOf(createHandler(self), "handler"))

  // Broadcast enumerator whose channel lets this actor push messages to the WebSocket imperatively.
  val channel = {
    val (outgoing, pushChannel) = Concurrent.broadcast[Out]
    // Run the enumerator into the iteratee, feeding EOF when it is done so that the WebSocket gets closed.
    outgoing |>>> iteratee
    pushChannel
  }

  // Foreach iteratee that consumes the WebSocket and hands each message to the handler actor.
  // The trampoline execution context is deliberately used here. NOTE(review): the original comment explaining
  // why was cut off mid-sentence; presumably it keeps messages from reaching the actor out of order - confirm.
  val consumer = Iteratee.foreach[In] { incoming =>
    webSocketActor ! incoming
  }(play.api.libs.iteratee.Execution.trampoline)

  // Once the WebSocket has completed, successfully or with an error, stop the handler unless shutdown has
  // already begun.
  (enumerator |>> consumer).onComplete { _ =>
    if (!shutdown) {
      webSocketActor ! PoisonPill
    }
  }

  def receive = {
    case _: Terminated =>
      // The child has terminated: raise the flag first, then close the WebSocket and stop this supervisor.
      shutdown = true
      channel.end()
      context.stop(self)

    // A message of the type this WebSocket carries: push it out.
    case messageType(outbound) => channel.push(outbound)
  }

  override def postStop() = {
    shutdown = true
    // Covers the cases where this supervisor crashes or is stopped externally. In the normal shutdown path
    // channel.end() has already been called from receive; calling it again is a no-op.
    channel.end()
  }

  // Any failure in the handler stops it rather than restarting it.
  override def supervisorStrategy = OneForOneStrategy() {
    case _ => SupervisorStrategy.Stop
  }
}
object WebSocketsActor {

  // Props for the actor that starts one supervisor per Connect message.
  val props = Props(new WebSocketsActor())

  /**
   * Request to connect a handler actor to the WebSocket behind the given enumerator/iteratee pair.
   *
   * @param requestId The request id; its string form is used as the name of the supervisor actor.
   * @param enumerator Produces the messages that are sent on to the handler actor.
   * @param iteratee Receives the messages that the handler sends back.
   * @param createHandler Given the actor to send messages to, returns the Props of the actor that handles the
   *                      WebSocket.
   * @param messageType Class tag of the message type that is pushed into the iteratee.
   */
  case class Connect[In, Out](
    requestId: Long,
    enumerator: Enumerator[In],
    iteratee: Iteratee[Out, Unit],
    createHandler: ActorRef => Props)(implicit val messageType: ClassTag[Out])
}
/**
 * Parent actor of all WebSocket supervisors: each Connect message it receives starts one supervisor child.
 */
private class WebSocketsActor extends Actor {

  import WebSocketsActor._

  def receive = {
    case connect @ Connect(requestId, enumerator, iteratee, createHandler) =>
      // Bring the class tag carried by the message into implicit scope for props' context bound.
      implicit val outTag = connect.messageType
      val supervisorProps = WebSocketActorSupervisor.props(enumerator, iteratee, createHandler)
      // The child is named after the request id.
      context.actorOf(supervisorProps, requestId.toString)
  }
}
/**
 * Extension id for the WebSockets extension.
 *
 * Creating the extension starts the WebSockets actor as a system actor named "websockets".
 */
object WebSocketsExtension extends ExtensionId[WebSocketsExtension] {
  def createExtension(system: ExtendedActorSystem) = {
    val webSocketsActor = system.systemActorOf(WebSocketsActor.props, "websockets")
    new WebSocketsExtension(webSocketsActor)
  }
}

/**
 * The extension instance; it only holds a reference to the WebSockets actor.
 *
 * @param actor The actor created by the extension id, to which Connect messages are addressed.
 */
class WebSocketsExtension(val actor: ActorRef) extends Extension
}
Other Play Framework source code examples

Here is a short list of links related to this Play Framework WebSocketActor.scala source code file:
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.