alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Scala example source code file (BatchingExecutor.scala)

This example Scala source code file (BatchingExecutor.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Scala source code examples by using tags.

All credit for the original source code belongs to scala-lang.org; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Scala tags/keywords

annotation, batch, blockcontext, boolean, concurrent, executor, if, list, nil, runnable, t, unit

The BatchingExecutor.scala Scala example source code

/*                     __                                               *\
**     ________ ___   / /  ___     Scala API                            **
**    / __/ __// _ | / /  / _ |    (c) 2003-2013, LAMP/EPFL             **
**  __\ \/ /__/ __ |/ /__/ __ |    http://scala-lang.org/               **
** /____/\___/_/ |_/____/_/ | |                                         **
**                          |/                                          **
\*                                                                      */

package scala.concurrent

import java.util.concurrent.Executor
import scala.annotation.tailrec

/**
 * Mixin trait for an Executor
 * which groups multiple nested `Runnable.run()` calls
 * into a single Runnable passed to the original
 * Executor. This can be a useful optimization
 * because it bypasses the original context's task
 * queue and keeps related (nested) code on a single
 * thread which may improve CPU affinity. However,
 * if tasks passed to the Executor are blocking
 * or expensive, this optimization can prevent work-stealing
 * and make performance worse. Also, some ExecutionContext
 * may be fast enough natively that this optimization just
 * adds overhead.
 * The default ExecutionContext.global is already batching
 * or fast enough not to benefit from it; while
 * `fromExecutor` and `fromExecutorService` do NOT add
 * this optimization since they don't know whether the underlying
 * executor will benefit from it.
 * A batching executor can create deadlocks if code does
 * not use `scala.concurrent.blocking` when it should,
 * because tasks created within other tasks will block
 * on the outer task completing.
 * This executor may run tasks in any order, including LIFO order.
 * There are no ordering guarantees.
 *
 * Failure handling: when a batched task throws, the tasks still queued
 * in the current batch (if any) are handed to `unbatchedExecute` as a new
 * batch and the task's throwable is rethrown to the invoking executor.
 * If that hand-off itself fails with a non-fatal exception, the failure is
 * attached to the task's throwable as a suppressed exception instead of
 * replacing it.
 *
 * WARNING: The underlying Executor's execute-method must not execute the submitted Runnable
 * in the calling thread synchronously. It must enqueue/handoff the Runnable.
 */
private[concurrent] trait BatchingExecutor extends Executor {
  import scala.util.control.NonFatal

  // invariant: if "_tasksLocal.get ne null" then we are inside Batch.run; if it is null, we are outside
  private val _tasksLocal = new ThreadLocal[List[Runnable]]()

  /**
   * A group of runnables executed back-to-back on one thread. While it runs it is
   * also installed as the thread's `BlockContext`, so that a blocking call made by
   * one of its tasks first pushes the still-pending tasks back to the executor.
   *
   * @param initial the tasks this batch starts with; tasks may add more via `execute`
   */
  private class Batch(val initial: List[Runnable]) extends Runnable with BlockContext {
    // the BlockContext that was current before this batch installed itself;
    // only non-null while `run` is executing
    private var parentBlockContext: BlockContext = _

    // this method runs in the delegate ExecutionContext's thread
    override def run(): Unit = {
      // a non-null list here means we were started from inside another running batch,
      // i.e. the underlying executor ran us synchronously (see the WARNING on the trait)
      require(_tasksLocal.get eq null)

      val prevBlockContext = BlockContext.current
      BlockContext.withBlockContext(this) {
        try {
          parentBlockContext = prevBlockContext

          @tailrec def processBatch(batch: List[Runnable]): Unit = batch match {
            case Nil => ()
            case head :: tail =>
              // publish the pending tasks so that `execute` and `blockOn` can see/modify them
              _tasksLocal set tail
              try {
                head.run()
              } catch {
                case t: Throwable =>
                  // if one task throws, move the
                  // remaining tasks to another thread
                  // so we can throw the exception
                  // up to the invoking executor
                  val remaining = _tasksLocal.get
                  _tasksLocal set Nil
                  // nothing pending => nothing to hand off (avoids scheduling an empty batch)
                  if ((remaining ne null) && remaining.nonEmpty) {
                    try unbatchedExecute(new Batch(remaining))
                    catch {
                      // A failed hand-off must not hide the task's own failure: keep `t`
                      // as the primary throwable and record the submission problem on it.
                      // Fatal errors are not matched by NonFatal and still propagate.
                      // (Throwable.addSuppressed requires Java 7+.)
                      case NonFatal(submitFailure) =>
                        if (submitFailure ne t) t.addSuppressed(submitFailure)
                    }
                  }
                  throw t // rethrow
              }
              processBatch(_tasksLocal.get) // since head.run() can add entries, always do _tasksLocal.get here
          }

          processBatch(initial)
        } finally {
          // restore the "outside of a batch" state for this thread
          _tasksLocal.remove()
          parentBlockContext = null
        }
      }
    }

    /**
     * Called when a task of this batch is about to block. Hands the still-pending
     * tasks to the executor as a separate batch, then delegates the actual blocking
     * to the `BlockContext` that was current before this batch started.
     */
    override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = {
      // if we know there will be blocking, we don't want to keep tasks queued up because it could deadlock.
      {
        val tasks = _tasksLocal.get
        // Only reset the list when one is installed: writing Nil while the thread-local
        // is null would break the invariant documented on `_tasksLocal` and make later
        // `execute` calls on this thread queue into a list that nothing drains.
        if (tasks ne null) {
          _tasksLocal set Nil
          if (tasks.nonEmpty)
            unbatchedExecute(new Batch(tasks))
        }
      }

      // now delegate the blocking to the previous BC
      require(parentBlockContext ne null)
      parentBlockContext.blockOn(thunk)
    }
  }

  /**
   * Submits `r` directly to the underlying executor, bypassing batching.
   * Per the WARNING on this trait, implementations must enqueue/hand off `r`
   * rather than run it synchronously in the calling thread.
   */
  protected def unbatchedExecute(r: Runnable): Unit

  /**
   * Executes `runnable`. A batchable runnable is prepended to the current batch when
   * called from inside a running batch on this thread, otherwise it is submitted as a
   * new single-element batch. Non-batchable runnables go straight to `unbatchedExecute`.
   */
  override def execute(runnable: Runnable): Unit = {
    if (batchable(runnable)) { // If we can batch the runnable
      _tasksLocal.get match {
        case null => unbatchedExecute(new Batch(List(runnable))) // If we aren't in batching mode yet, enqueue batch
        case some => _tasksLocal.set(runnable :: some) // If we are already in batching mode, add to batch
      }
    } else unbatchedExecute(runnable) // If not batchable, just delegate to underlying
  }

  /** Override this to define which runnables will be batched. */
  def batchable(runnable: Runnable): Boolean = runnable match {
    case _: OnCompleteRunnable => true
    case _                     => false
  }
}

Other Scala source code examples

Here is a short list of links related to this Scala BatchingExecutor.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.