|
Play Framework/Scala example source code file (RunQueue.scala)
The RunQueue.scala Play Framework example source code
/*
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package play.api.libs.iteratee
import scala.annotation.tailrec
import scala.concurrent.{ ExecutionContext, Future }
import java.util.concurrent.atomic.AtomicReference
/**
* Runs asynchronous operations in order. Operations are queued until
* they can be run. Each item is added to a schedule, then each item
* in the schedule is executed in order.
*
* {{{
* val runQueue = new RunQueue()
*
* // This operation will run first. It completes when
* // the future it returns is completed.
* runQueue.schedule {
* Future { ... do some stuff ... }
* }
*
* // This operation will run second. It will start running
* // when the previous operation's futures complete.
* runQueue.schedule {
* future1.flatMap(x => future2.map(y => x + y))
* }
*
* // This operation will run when the second operation's
* // future finishes. It's a simple synchronous operation.
* runQueue.scheduleSimple {
* 25
* }
* }}}
*
* Unlike solutions built around a standard concurrent queue, there is no
* need to use a separate thread to read from the queue and execute each
* operation. The RunQueue runner performs both scheduling and
* executions of operations internally without the need for a separate
* thread. This means the RunQueue doesn't consume any resources
* when it isn't being used.
*
* No locks are held by this class, only atomic operations are used.
*/
private[play] final class RunQueue {
import RunQueue._
/**
* The state of the RunQueue, either Inactive or Runnning.
*/
private val state = new AtomicReference[State](Inactive)
/**
* Schedule an operation to be run. The operation is considered
* complete when the Future that it returns is completed. In other words,
* the next operation will not be started until the future is completed.
*
* Successive calls to the `run` and `runSynchronous` methods use an
* atomic value to guarantee ordering (a *happens-before* relationship).
*
* The operation will execute in the given ExecutionContext.
*/
def schedule[A](body: => Future[A])(implicit ec: ExecutionContext): Unit = {
schedule(Op(() => body.asInstanceOf[Future[Unit]], ec.prepare))
}
/**
* Schedule a simple synchronous operation to be run. The operation is considered
* complete when it finishes executing. In other words, the next operation will begin
* execution immediately when this operation finishes execution.
*
* This method is equivalent to
* {{{
* schedule {
* body
* Future.successful(())
* }
* }}}
*
* Successive calls to the `run` and `runSynchronous` methods use an
* atomic value to guarantee ordering (a *happens-before* relationship).
*
* The operation will execute in the given ExecutionContext.
*/
def scheduleSimple(body: => Unit)(implicit ec: ExecutionContext): Unit = {
schedule {
body
Future.successful(())
}
}
/**
* Schedule a reified operation for execution. If no other operations
* are currently executing then this operation will be started immediately.
* But if there are other operations currently running then this operation
* be added to the pending queue of operations awaiting execution.
*
* This method encapsulates an atomic compare-and-set operation, therefore
* it may be retried.
*/
@tailrec
private def schedule(op: Op): Unit = {
val prevState = state.get
val newState = prevState match {
case Inactive => Running(Vector.empty)
case Running(pending) => Running(pending :+ op)
}
val updated = state.compareAndSet(prevState, newState)
if (updated) {
prevState match {
case Inactive =>
// We've update the state to say that we're running an op,
// so we need to actually start it running.
execute(op)
case _ =>
}
} else schedule(op) // Try again
}
private def execute(op: Op): Unit = {
val f1: Future[Future[Unit]] = Future(op.thunk())(op.ec)
val f2: Future[Unit] = f1.flatMap(identity)(Execution.trampoline)
f2.onComplete(_ => deschedule())(Execution.trampoline)
}
/**
* *De*schedule a reified operation for execution. If no other operations
* are pending, then the RunQueue will enter an inactive state.
* Otherwise, the first pending item will be scheduled for execution.
*/
@tailrec
private def deschedule(): Unit = {
val prevState = state.get
val newState = prevState match {
case Inactive => throw new IllegalStateException("Can't be inactive, must be Running or Pending")
case Running(pending) if !pending.isEmpty => Running(pending.tail)
case Running(pending) => Inactive
}
val updated = state.compareAndSet(prevState, newState)
if (updated) {
prevState match {
// We have a pending operation to execute
case Running(pending) if !pending.isEmpty => execute(pending.head)
case _ =>
}
} else deschedule() // Try again
}
}
private object RunQueue {
/**
* A reified operation to be executed.
*
* @param thunk The logic to execute.
* @param ec The ExecutionContext to use for execution. Already prepared.
*/
final case class Op(thunk: () => Future[Unit], ec: ExecutionContext)
sealed trait State
/**
* The state of a RunQueue that is neither running an operation nor
* has any pending operations.
*/
final case object Inactive extends State
/**
* The state of a RunQueue that is currently running a task and has
* zero or more pending operations.
*/
final case class Running(pending: Vector[Op]) extends State
}
Other Play Framework source code examplesHere is a short list of links related to this Play Framework RunQueue.scala source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.