alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Scala example source code file (ReplyReactor.scala)

This example Scala source code file (ReplyReactor.scala) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example" TM.

Java - Scala tags/keywords

any, any, boolean, list, list, mqueue, none, nothing, outputchannel, outputchannel, partialfunction, partialfunction, reactorcanreply, unit, util

The Scala ReplyReactor.scala source code

/*                     __                                               *\
**     ________ ___   / /  ___     Scala API                            **
**    / __/ __// _ | / /  / _ |    (c) 2005-2011, LAMP/EPFL             **
**  __\ \/ /__/ __ |/ /__/ __ |    http://scala-lang.org/               **
** /____/\___/_/ |_/____/_/ | |                                         **
**                          |/                                          **
\*                                                                      */


package scala.actors

import java.util.{Timer, TimerTask}

/** 
 * Extends the [[scala.actors.Reactor]]
 *    trait with methods to reply to the sender of a message.
 *    Sending a message to a <code>ReplyReactor implicitly
 *    passes a reference to the sender together with the message.
 *
 *  @author Philipp Haller
 *
 *  @define actor `ReplyReactor`
 */
trait ReplyReactor extends Reactor[Any] with ReactorCanReply {

  /* A list of the current senders. The head of the list is
   * the sender of the message that was received last.
   */
  @volatile
  private[actors] var senders: List[OutputChannel[Any]] = List()

  /* This option holds a TimerTask when the actor waits in a
   * reactWithin. The TimerTask is cancelled when the actor
   * resumes.
   *
   * guarded by this
   */
  private[actors] var onTimeout: Option[TimerTask] = None

  /**
   * Returns the $actor which sent the last received message.
   */
  protected[actors] def sender: OutputChannel[Any] = senders.head

  /**
   * Replies with <code>msg to the sender.
   */
  protected[actors] def reply(msg: Any) {
    sender ! msg
  }

  override def !(msg: Any) {
    send(msg, Actor.rawSelf(scheduler))
  }

  override def forward(msg: Any) {
    send(msg, Actor.sender)
  }

  private[actors] override def resumeReceiver(item: (Any, OutputChannel[Any]), handler: PartialFunction[Any, Any], onSameThread: Boolean) {
    synchronized {
      if (!onTimeout.isEmpty) {
        onTimeout.get.cancel()
        onTimeout = None
      }
    }
    senders = List(item._2)
    super.resumeReceiver(item, handler, onSameThread)
  }

  private[actors] override def searchMailbox(startMbox: MQueue[Any],
                                             handler: PartialFunction[Any, Any],
                                             resumeOnSameThread: Boolean) {
    var tmpMbox = startMbox
    var done = false
    while (!done) {
      val qel = tmpMbox.extractFirst((msg: Any, replyTo: OutputChannel[Any]) => {
        senders = List(replyTo)
        handler.isDefinedAt(msg)
      })
      if (tmpMbox ne mailbox)
        tmpMbox.foreach((m, s) => mailbox.append(m, s))
      if (null eq qel) {
        synchronized {
          // in mean time new stuff might have arrived
          if (!sendBuffer.isEmpty) {
            tmpMbox = new MQueue[Any]("Temp")
            drainSendBuffer(tmpMbox)
            // keep going
          } else {
            waitingFor = handler
            // see Reactor.searchMailbox
            throw Actor.suspendException
          }
        }
      } else {
        resumeReceiver((qel.msg, qel.session), handler, resumeOnSameThread)
        done = true
      }
    }
  }

  private[actors] override def makeReaction(fun: () => Unit, handler: PartialFunction[Any, Any], msg: Any): Runnable =
    new ReplyReactorTask(this, fun, handler, msg)

  protected[actors] override def react(handler: PartialFunction[Any, Unit]): Nothing = {
    assert(Actor.rawSelf(scheduler) == this, "react on channel belonging to other actor")
    super.react(handler)
  }

  /**
   * Receives a message from this $actor's mailbox within a certain
   * time span.
   *
   * This method never returns. Therefore, the rest of the computation
   * has to be contained in the actions of the partial function.
   *
   * @param  msec     the time span before timeout
   * @param  handler  a partial function with message patterns and actions
   */
  protected[actors] def reactWithin(msec: Long)(handler: PartialFunction[Any, Unit]): Nothing = {
    assert(Actor.rawSelf(scheduler) == this, "react on channel belonging to other actor")

    synchronized { drainSendBuffer(mailbox) }

    // first, remove spurious TIMEOUT message from mailbox if any
    mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => m == TIMEOUT)

    while (true) {
      val qel = mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => {
        senders = List(replyTo)
        handler isDefinedAt m
      })
      if (null eq qel) {
        synchronized {
          // in mean time new messages might have arrived
          if (!sendBuffer.isEmpty) {
            drainSendBuffer(mailbox)
            // keep going
          } else if (msec == 0L) {
            // throws Actor.suspendException
            resumeReceiver((TIMEOUT, this), handler, false)
          } else {
            waitingFor = handler
            val thisActor = this
            onTimeout = Some(new TimerTask {
              def run() { thisActor.send(TIMEOUT, thisActor) }
            })
            Actor.timer.schedule(onTimeout.get, msec)
            throw Actor.suspendException
          }
        }
      } else
        resumeReceiver((qel.msg, qel.session), handler, false)
    }
    throw Actor.suspendException
  }

  override def getState: Actor.State.Value = synchronized {
    if (waitingFor ne Reactor.waitingForNone) {
      if (onTimeout.isEmpty)
        Actor.State.Suspended
      else
        Actor.State.TimedSuspended
    } else
      _state
  }

}

Other Scala examples (source code examples)

Here is a short list of links related to this Scala ReplyReactor.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.