|
Akka/Scala example source code file (SerializedSuspendableExecutionContext.scala)
The SerializedSuspendableExecutionContext.scala Akka example source code/** * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> */ package akka.util import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.atomic.AtomicInteger import scala.concurrent.ExecutionContext import scala.util.control.NonFatal import scala.annotation.{ tailrec, switch } import akka.dispatch.AbstractNodeQueue private[akka] object SerializedSuspendableExecutionContext { final val Off = 0 final val On = 1 final val Suspended = 2 def apply(throughput: Int)(implicit context: ExecutionContext): SerializedSuspendableExecutionContext = new SerializedSuspendableExecutionContext(throughput)(context match { case s: SerializedSuspendableExecutionContext ⇒ s.context case other ⇒ other }) } /** * This `ExecutionContext` allows to wrap an underlying `ExecutionContext` and provide guaranteed serial execution * of tasks submitted to it. On top of that it also allows for *suspending* and *resuming* processing of tasks. * * WARNING: This type must never leak into User code as anything but `ExecutionContext` * * @param throughput maximum number of tasks to be executed in serial before relinquishing the executing thread. * @param context the underlying context which will be used to actually execute the submitted tasks */ private[akka] final class SerializedSuspendableExecutionContext(throughput: Int)(val context: ExecutionContext) extends AbstractNodeQueue[Runnable] with Runnable with ExecutionContext { import SerializedSuspendableExecutionContext._ require(throughput > 0, s"SerializedSuspendableExecutionContext.throughput must be greater than 0 but was $throughput") private final val state = new AtomicInteger(Off) @tailrec private final def addState(newState: Int): Boolean = { val c = state.get state.compareAndSet(c, c | newState) || addState(newState) } @tailrec private final def remState(oldState: Int) { val c = state.get if (state.compareAndSet(c, c & ~oldState)) attach() else remState(oldState) } /** * Resumes execution of tasks until `suspend` is called, * if it isn't currently suspended, it is a no-op. * This operation is idempotent. */ final def resume(): Unit = remState(Suspended) /** * Suspends execution of tasks until `resume` is called, * this operation is idempotent. */ final def suspend(): Unit = addState(Suspended) final def run(): Unit = { @tailrec def run(done: Int): Unit = if (done < throughput && state.get == On) { poll() match { case null ⇒ () case some ⇒ try some.run() catch { case NonFatal(t) ⇒ context reportFailure t } run(done + 1) } } try run(0) finally remState(On) } final def attach(): Unit = if (!isEmpty() && state.compareAndSet(Off, On)) context execute this override final def execute(task: Runnable): Unit = try add(task) finally attach() override final def reportFailure(t: Throwable): Unit = context reportFailure t /** * O(N) * @return the number of Runnable's currently enqueued */ final def size(): Int = count() override final def toString: String = (state.get: @switch) match { case 0 ⇒ "Off" case 1 ⇒ "On" case 2 ⇒ "Off & Suspended" case 3 ⇒ "On & Suspended" } } Other Akka source code examplesHere is a short list of links related to this Akka SerializedSuspendableExecutionContext.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.