alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Play Framework/Scala example source code file (RunQueue.scala)

This example Play Framework source code file (RunQueue.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Play Framework (and Scala) source code examples by using tags.

All credit for the original source code belongs to Play Framework; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Play Framework tags/keywords

api, concurrent, executioncontext, future, inactive, iterate, library, op, play, play framework, running, runqueue, state, try, unit, vector

The RunQueue.scala Play Framework example source code

/*
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package play.api.libs.iteratee

import scala.annotation.tailrec
import scala.concurrent.{ ExecutionContext, Future }
import java.util.concurrent.atomic.AtomicReference

/**
 * Runs asynchronous operations in order. Operations are queued until
 * they can be run. Each item is added to a schedule, then each item
 * in the schedule is executed in order.
 *
 * {{{
 * val runQueue = new RunQueue()
 *
 * // This operation will run first. It completes when
 * // the future it returns is completed.
 * runQueue.schedule {
 *   Future { ... do some stuff ... }
 * }
 *
 * // This operation will run second. It will start running
 * // when the previous operation's futures complete.
 * runQueue.schedule {
 *   future1.flatMap(x => future2.map(y => x + y))
 * }
 *
 * // This operation will run when the second operation's
 * // future finishes. It's a simple synchronous operation.
 * runQueue.scheduleSimple {
 *   25
 * }
 * }}}
 *
 * Unlike solutions built around a standard concurrent queue, there is no
 * need to use a separate thread to read from the queue and execute each
 * operation. The RunQueue runner performs both scheduling and
 * executions of operations internally without the need for a separate
 * thread. This means the RunQueue doesn't consume any resources
 * when it isn't being used.
 *
 * No locks are held by this class, only atomic operations are used.
 */
private[play] final class RunQueue {

  import RunQueue._

  /**
   * The state of the RunQueue, either Inactive or Runnning.
   */
  private val state = new AtomicReference[State](Inactive)

  /**
   * Schedule an operation to be run. The operation is considered
   * complete when the Future that it returns is completed. In other words,
   * the next operation will not be started until the future is completed.
   *
   * Successive calls to the `run` and `runSynchronous` methods use an
   * atomic value to guarantee ordering (a *happens-before* relationship).
   *
   * The operation will execute in the given ExecutionContext.
   */
  def schedule[A](body: => Future[A])(implicit ec: ExecutionContext): Unit = {
    schedule(Op(() => body.asInstanceOf[Future[Unit]], ec.prepare))
  }

  /**
   * Schedule a simple synchronous operation to be run. The operation is considered
   * complete when it finishes executing. In other words, the next operation will begin
   * execution immediately when this operation finishes execution.
   *
   * This method is equivalent to
   * {{{
   * schedule {
   *   body
   *   Future.successful(())
   * }
   * }}}
   *
   * Successive calls to the `run` and `runSynchronous` methods use an
   * atomic value to guarantee ordering (a *happens-before* relationship).
   *
   * The operation will execute in the given ExecutionContext.
   */
  def scheduleSimple(body: => Unit)(implicit ec: ExecutionContext): Unit = {
    schedule {
      body
      Future.successful(())
    }
  }

  /**
   * Schedule a reified operation for execution. If no other operations
   * are currently executing then this operation will be started immediately.
   * But if there are other operations currently running then this operation
   * be added to the pending queue of operations awaiting execution.
   *
   * This method encapsulates an atomic compare-and-set operation, therefore
   * it may be retried.
   */
  @tailrec
  private def schedule(op: Op): Unit = {
    val prevState = state.get
    val newState = prevState match {
      case Inactive => Running(Vector.empty)
      case Running(pending) => Running(pending :+ op)
    }
    val updated = state.compareAndSet(prevState, newState)
    if (updated) {
      prevState match {
        case Inactive =>
          // We've update the state to say that we're running an op,
          // so we need to actually start it running.
          execute(op)
        case _ =>
      }
    } else schedule(op) // Try again
  }

  private def execute(op: Op): Unit = {
    val f1: Future[Future[Unit]] = Future(op.thunk())(op.ec)
    val f2: Future[Unit] = f1.flatMap(identity)(Execution.trampoline)
    f2.onComplete(_ => deschedule())(Execution.trampoline)
  }

  /**
   * *De*schedule a reified operation for execution. If no other operations
   * are pending, then the RunQueue will enter an inactive state.
   * Otherwise, the first pending item will be scheduled for execution.
   */
  @tailrec
  private def deschedule(): Unit = {
    val prevState = state.get
    val newState = prevState match {
      case Inactive => throw new IllegalStateException("Can't be inactive, must be Running or Pending")
      case Running(pending) if !pending.isEmpty => Running(pending.tail)
      case Running(pending) => Inactive
    }
    val updated = state.compareAndSet(prevState, newState)
    if (updated) {
      prevState match {
        // We have a pending operation to execute
        case Running(pending) if !pending.isEmpty => execute(pending.head)
        case _ =>
      }
    } else deschedule() // Try again
  }

}

private object RunQueue {

  /**
   * A reified operation to be executed.
   *
   * @param thunk The logic to execute.
   * @param ec The ExecutionContext to use for execution. Already prepared.
   */
  final case class Op(thunk: () => Future[Unit], ec: ExecutionContext)

  sealed trait State
  /**
   * The state of a RunQueue that is neither running an operation nor
   * has any pending operations.
   */
  final case object Inactive extends State
  /**
   * The state of a RunQueue that is currently running a task and has
   * zero or more pending operations.
   */
  final case class Running(pending: Vector[Op]) extends State

}

Other Play Framework source code examples

Here is a short list of links related to this Play Framework RunQueue.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.