alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Scala example source code file (ProcessImpl.scala)

This example Scala source code file (ProcessImpl.scala) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example" TM.

Java - Scala tags/keywords

basicio, io, none, processbuilder, processbuilder, processio, sequentialprocess, some, syncvar, syncvar, t, thread, thread, unit, unit

The Scala ProcessImpl.scala source code

/*                     __                                               *\
**     ________ ___   / /  ___     Scala API                            **
**    / __/ __// _ | / /  / _ |    (c) 2003-2011, LAMP/EPFL             **
**  __\ \/ /__/ __ |/ /__/ __ |    http://scala-lang.org/               **
** /____/\___/_/ |_/____/_/ | |                                         **
**                          |/                                          **
\*                                                                      */

package scala.sys
package process

import processInternal._
import java.io.{ PipedInputStream, PipedOutputStream }

private[process] trait ProcessImpl {
  self: Process.type =>
  
  /** Runs provided code in a new Thread and returns the Thread instance. */
  private[process] object Spawn {
    def apply(f: => Unit): Thread = apply(f, false)
    def apply(f: => Unit, daemon: Boolean): Thread = {
      val thread = new Thread() { override def run() = { f } }
      thread.setDaemon(daemon)
      thread.start()
      thread
    }
  }
  private[process] object Future {
    def apply[T](f: => T): () => T = {
      val result = new SyncVar[Either[Throwable, T]]
      def run(): Unit =
        try result set Right(f)
        catch { case e: Exception => result set Left(e) }
      
      Spawn(run)

      () => result.get match {
        case Right(value)    => value
        case Left(exception) => throw exception      
      }
    }
  }

  private[process] class AndProcess(
    a: ProcessBuilder,
    b: ProcessBuilder,
    io: ProcessIO
  ) extends SequentialProcess(a, b, io, _ == 0)

  private[process] class OrProcess(
    a: ProcessBuilder,
    b: ProcessBuilder,
    io: ProcessIO
  ) extends SequentialProcess(a, b, io, _ != 0)

  private[process] class ProcessSequence(
    a: ProcessBuilder,
    b: ProcessBuilder,
    io: ProcessIO
  ) extends SequentialProcess(a, b, io, _ => true)

  private[process] class SequentialProcess(
    a: ProcessBuilder,
    b: ProcessBuilder,
    io: ProcessIO,
    evaluateSecondProcess: Int => Boolean
  ) extends CompoundProcess {
  
    protected[this] override def runAndExitValue() = {
      val first = a.run(io)
      runInterruptible(first.exitValue)(first.destroy()) flatMap { codeA =>
        if (evaluateSecondProcess(codeA)) {
          val second = b.run(io)
          runInterruptible(second.exitValue)(second.destroy())
        }
        else Some(codeA)
      }
    }
  }

  private[process] abstract class BasicProcess extends Process {
    def start(): Unit
  }

  private[process] abstract class CompoundProcess extends BasicProcess {
    def destroy()   = destroyer()
    def exitValue() = getExitValue() getOrElse sys.error("No exit code: process destroyed.")
    def start()     = getExitValue

    protected lazy val (getExitValue, destroyer) = {
      val code = new SyncVar[Option[Int]]()
      code set None
      val thread = Spawn(code set runAndExitValue())

      (
        Future { thread.join(); code.get },
        () => thread.interrupt()
      )
    }

    /** Start and block until the exit value is available and then return it in Some.  Return None if destroyed (use 'run')*/
    protected[this] def runAndExitValue(): Option[Int]

    protected[this] def runInterruptible[T](action: => T)(destroyImpl: => Unit): Option[T] = {
      try   Some(action)
      catch onInterrupt { destroyImpl; None }
    }
  }

  private[process] class PipedProcesses(a: ProcessBuilder, b: ProcessBuilder, defaultIO: ProcessIO, toError: Boolean) extends CompoundProcess {
    protected[this] override def runAndExitValue() = {
      val currentSource = new SyncVar[Option[InputStream]]
      val pipeOut       = new PipedOutputStream
      val source        = new PipeSource(currentSource, pipeOut, a.toString)
      source.start()
    
      val pipeIn      = new PipedInputStream(pipeOut)
      val currentSink = new SyncVar[Option[OutputStream]]
      val sink        = new PipeSink(pipeIn, currentSink, b.toString)
      sink.start()

      def handleOutOrError(fromOutput: InputStream) = currentSource put Some(fromOutput)

      val firstIO =
        if (toError)
          defaultIO.withError(handleOutOrError)
        else
          defaultIO.withOutput(handleOutOrError)
      val secondIO = defaultIO.withInput(toInput => currentSink put Some(toInput))
    
      val second = b.run(secondIO)
      val first = a.run(firstIO)
      try {
        runInterruptible {
          val exit1 = first.exitValue
          currentSource put None
          currentSink put None
          val exit2 = second.exitValue
          // Since file redirection (e.g. #>) is implemented as a piped process,
          // we ignore its exit value so cmd #> file doesn't always return 0.
          if (b.hasExitValue) exit2 else exit1
        } {
          first.destroy()
          second.destroy()
        }
      }
      finally {
        BasicIO close pipeIn
        BasicIO close pipeOut
      }
    }
  }

  private[process] abstract class PipeThread(isSink: Boolean, labelFn: () => String) extends Thread {
    def run(): Unit
  
    private[process] def runloop(src: InputStream, dst: OutputStream): Unit = {
      try     BasicIO.transferFully(src, dst)
      catch   ioFailure(ioHandler)
      finally BasicIO close {
        if (isSink) dst else src
      }
    }
    private def ioHandler(e: IOException) {
      println("I/O error " + e.getMessage + " for process: " + labelFn())
      e.printStackTrace()
    }
  }

  private[process] class PipeSource(
    currentSource: SyncVar[Option[InputStream]],
    pipe: PipedOutputStream,
    label: => String
  ) extends PipeThread(false, () => label) {

    final override def run(): Unit = currentSource.get match {
      case Some(source) =>
        try runloop(source, pipe)
        finally currentSource.unset()

        run()
      case None =>
        currentSource.unset()
        BasicIO close pipe
    }
  }
  private[process] class PipeSink(
    pipe: PipedInputStream,
    currentSink: SyncVar[Option[OutputStream]],
    label: => String
  ) extends PipeThread(true, () => label) {

    final override def run(): Unit = currentSink.get match {
      case Some(sink) =>
        try runloop(pipe, sink)
        finally currentSink.unset()
      
        run()
      case None =>
        currentSink.unset()
    }
  }

  /** A thin wrapper around a java.lang.Process.  `ioThreads` are the Threads created to do I/O.
  * The implementation of `exitValue` waits until these threads die before returning. */
  private[process] class DummyProcess(action: => Int) extends Process {
    private[this] val exitCode = Future(action)
    override def exitValue() = exitCode()
    override def destroy() { }
  }
  /** A thin wrapper around a java.lang.Process.  `outputThreads` are the Threads created to read from the
  * output and error streams of the process.  `inputThread` is the Thread created to write to the input stream of
  * the process.
  * The implementation of `exitValue` interrupts `inputThread` and then waits until all I/O threads die before
  * returning. */
  private[process] class SimpleProcess(p: JProcess, inputThread: Thread, outputThreads: List[Thread]) extends Process {
    override def exitValue() = {
      try p.waitFor()                   // wait for the process to terminate
      finally inputThread.interrupt()   // we interrupt the input thread to notify it that it can terminate
      outputThreads foreach (_.join())  // this ensures that all output is complete before returning (waitFor does not ensure this)

      p.exitValue()
    }
    override def destroy() = {
      try p.destroy()
      finally inputThread.interrupt()
    }
  }
  private[process] final class ThreadProcess(thread: Thread, success: SyncVar[Boolean]) extends Process {
    override def exitValue() = {
      thread.join()
      if (success.get) 0 else 1
    }
    override def destroy() { thread.interrupt() }
  }
}

Other Scala examples (source code examples)

Here is a short list of links related to this Scala ProcessImpl.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.