|
Scala example source code file (ResizableThreadPoolScheduler.scala)
This example Scala source code file (ResizableThreadPoolScheduler.scala) is included in the DevDaily.com
"Java Source Code
Warehouse" project. The intent of this project is to help you "Learn Java by Example" TM.
The Scala ResizableThreadPoolScheduler.scala source code
/* __ *\
** ________ ___ / / ___ Scala API **
** / __/ __// _ | / / / _ | (c) 2005-2011, LAMP/EPFL **
** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
** /____/\___/_/ |_/____/_/ | | **
** |/ **
\* */
package scala.actors.scheduler
import scala.actors.threadpool.{ThreadPoolExecutor, TimeUnit, LinkedBlockingQueue,
ThreadFactory}
import scala.actors.{Debug, IScheduler}
import scala.concurrent.ManagedBlocker
/**
* This scheduler class uses a <code>ThreadPoolExecutor
* to execute <code>Actors.
*
* The scheduler attempts to shut down itself and the underlying
* <code>ThreadPoolExecutor only if terminate
* is set to true. Otherwise, the scheduler must be shut down
* explicitly.
*
* @author Philipp Haller
*/
class ResizableThreadPoolScheduler(protected val terminate: Boolean,
protected val daemon: Boolean)
extends Thread with IScheduler with TerminationMonitor {
setDaemon(daemon)
// guarded by this
private var terminating = false
// guarded by this
private var suspending = false
// this has to be a java.util.Collection, since this is what
// the ForkJoinPool returns.
@volatile
private var drainedTasks: java.util.List[_] = null
// guarded by this
private var coreSize = ThreadPoolConfig.corePoolSize
private val maxSize = ThreadPoolConfig.maxPoolSize
private val numCores = Runtime.getRuntime().availableProcessors()
protected val CHECK_FREQ = 10
private class DaemonThreadFactory extends ThreadFactory {
def newThread(r: Runnable): Thread = {
val t = new Thread(r)
t.setDaemon(daemon)
t
}
}
private val threadFac = new DaemonThreadFactory
private def makeNewPool(): ThreadPoolExecutor = {
val workQueue = new LinkedBlockingQueue
new ThreadPoolExecutor(coreSize,
maxSize,
60000L,
TimeUnit.MILLISECONDS,
workQueue,
threadFac,
new ThreadPoolExecutor.CallerRunsPolicy)
}
// guarded by this
private var executor = makeNewPool()
Debug.info(this+": corePoolSize = "+coreSize+", maxPoolSize = "+maxSize)
def this(d: Boolean) {
this(true, d)
}
def this() {
this(false)
}
private def numWorkersBlocked = {
executor.mainLock.lock()
val iter = executor.workers.iterator()
var numBlocked = 0
while (iter.hasNext()) {
val w = iter.next().asInstanceOf[ThreadPoolExecutor#Worker]
if (w.tryLock()) {
// worker is idle
w.unlock()
} else {
val s = w.thread.getState()
if (s == Thread.State.WAITING || s == Thread.State.TIMED_WAITING)
numBlocked += 1
}
}
executor.mainLock.unlock()
numBlocked
}
override def run() {
try {
while (true) {
this.synchronized {
try {
wait(CHECK_FREQ)
} catch {
case _: InterruptedException =>
}
if (terminating)
throw new QuitControl
if (!suspending) {
gc()
// check if we need more worker threads
val activeBlocked = numWorkersBlocked
if (coreSize - activeBlocked < numCores && coreSize < maxSize) {
coreSize = numCores + activeBlocked
executor.setCorePoolSize(coreSize)
} else if (terminate && allActorsTerminated) {
// if all worker threads idle terminate
if (executor.getActiveCount() == 0) {
Debug.info(this+": initiating shutdown...")
Debug.info(this+": corePoolSize = "+coreSize+", maxPoolSize = "+maxSize)
terminating = true
throw new QuitControl
}
}
} else {
drainedTasks = executor.shutdownNow()
Debug.info(this+": drained "+drainedTasks.size()+" tasks")
terminating = true
throw new QuitControl
}
} // sync
}
} catch {
case _: QuitControl =>
executor.shutdown()
// allow thread to exit
}
}
def execute(task: Runnable): Unit =
executor execute task
def execute(fun: => Unit): Unit =
executor.execute(new Runnable {
def run() { fun }
})
/** Shuts down the scheduler.
*/
def shutdown(): Unit = synchronized {
terminating = true
}
def isActive = synchronized {
!terminating && (executor ne null) && !executor.isShutdown()
}
def managedBlock(blocker: ManagedBlocker) {
blocker.block()
}
/** Suspends the scheduler. All threads that were in use by the
* scheduler and its internal thread pool are terminated.
*/
def snapshot() = synchronized {
suspending = true
}
/** Resumes the execution of the scheduler if it was previously
* suspended using <code>snapshot.
*/
def restart() {
synchronized {
if (!suspending)
sys.error("snapshot has not been invoked")
else if (isActive)
sys.error("scheduler is still active")
else
suspending = false
executor = makeNewPool()
}
val iter = drainedTasks.iterator()
while (iter.hasNext()) {
executor.execute(iter.next().asInstanceOf[Runnable])
}
start()
}
}
Other Scala examples (source code examples)
Here is a short list of links related to this Scala ResizableThreadPoolScheduler.scala source code file:
|