alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Scala example source code file (NetKernel.scala)

This example Scala source code file (NetKernel.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Scala source code examples by using tags.

All credit for the original source code belongs to scala-lang.org; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Scala tags/keywords

actor, anyref, collection, function2, locator, node, none, outputchannel, proxy, some, symbol, unit

The NetKernel.scala Scala example source code

/*                     __                                               *\
**     ________ ___   / /  ___     Scala API                            **
**    / __/ __// _ | / /  / _ |    (c) 2005-2013, LAMP/EPFL             **
**  __\ \/ /__/ __ |/ /__/ __ |    http://scala-lang.org/               **
** /____/\___/_/ |_/____/_/ | |                                         **
**                          |/                                          **
\*                                                                      */


package scala.actors
package remote

import scala.collection.mutable

/** Addresses an actor by the node it lives on and the name it is known by there. */
case class Locator(node: Node, name: Symbol)

/**
 * Wire message carrying an already-serialized payload from one named
 * actor to another.
 *
 * @param senderLoc   locator of the sending actor
 * @param receiverLoc locator of the intended recipient
 * @param data        the message, serialized to bytes by the sending kernel
 * @param session     session tag passed along with the message
 */
case class NamedSend(
  senderLoc: Locator,
  receiverLoc: Locator,
  data: Array[Byte],
  session: Symbol
)

/**
 * Wire message asking the receiving node to apply `rfun` to the actor
 * registered under `receiverLoc.name`.
 */
case class RemoteApply0(
  senderLoc: Locator,
  receiverLoc: Locator,
  rfun: (AbstractActor, Proxy) => Unit
)

/**
 * Local counterpart of [[RemoteApply0]]: handed to the sender's proxy once
 * the target actor `a` has been looked up on the receiving node.
 */
case class LocalApply0(rfun: (AbstractActor, Proxy) => Unit, a: AbstractActor)

/** Instructs a proxy to deliver `msg` to channel `a` under the given `session`. */
case class SendTo(a: OutputChannel[Any], msg: Any, session: Symbol)

/** Sent to every known proxy when the kernel terminates. */
case object Terminate

/**
 * Routes messages between actors on this node and actors on remote nodes,
 * using the given `service` for serialization and transport.
 *
 * The kernel keeps two registries:
 *  - `actors` / `names`: a two-way mapping between local channels and the
 *    names they are reachable under, guarded by this kernel's monitor;
 *  - `proxies`: one [[Proxy]] per remote `(node, name)` pair, guarded by the
 *    monitor of the `proxies` map itself.
 *
 * @version 0.9.17
 * @author Philipp Haller
 */
private[remote] class NetKernel(service: Service) {

  /** Serializes `msg` and hands the bytes to the service for delivery to `node`. */
  def sendToNode(node: Node, msg: AnyRef) = {
    val bytes = service.serializer.serialize(msg)
    service.send(node, bytes)
  }

  /**
   * Serializes `msg`, wraps it in a [[NamedSend]] envelope and sends the
   * envelope to the receiver's node.
   */
  def namedSend(senderLoc: Locator, receiverLoc: Locator,
                msg: AnyRef, session: Symbol): Unit = {
    val bytes = service.serializer.serialize(msg)
    sendToNode(receiverLoc.node, NamedSend(senderLoc, receiverLoc, bytes, session))
  }

  private val actors = new mutable.HashMap[Symbol, OutputChannel[Any]]
  private val names = new mutable.HashMap[OutputChannel[Any], Symbol]

  /** Registers channel `a` under `name`, updating both lookup directions. */
  def register(name: Symbol, a: OutputChannel[Any]): Unit = synchronized {
    actors(name) = a
    names(a) = name
  }

  /**
   * Returns the name `from` is registered under, registering it under a
   * fresh "remotesender" name first if it has none.
   *
   * The lookup and the registration happen under the same monitor that
   * `register` uses, so concurrent callers cannot observe a half-updated
   * map or mint two different names for the same channel.
   */
  def getOrCreateName(from: OutputChannel[Any]): Symbol = synchronized {
    names.get(from) match {
      case Some(name) =>
        name
      case None =>
        val freshName = FreshNameCreator.newName("remotesender")
        register(freshName, from) // re-enters this monitor
        freshName
    }
  }

  /** Sends `msg` to the actor `name` on `node` without a session. */
  def send(node: Node, name: Symbol, msg: AnyRef): Unit =
    send(node, name, msg, 'nosession)

  /** Sends `msg` to the actor `name` on `node`, with the current actor as sender. */
  def send(node: Node, name: Symbol, msg: AnyRef, session: Symbol): Unit = {
    val senderLoc = Locator(service.node, getOrCreateName(Actor.self(Scheduler)))
    val receiverLoc = Locator(node, name)
    namedSend(senderLoc, receiverLoc, msg, session)
  }

  /** Sends `msg` to the actor `name` on `node`, with `from` recorded as the sender. */
  def forward(from: OutputChannel[Any], node: Node, name: Symbol, msg: AnyRef, session: Symbol): Unit = {
    val senderLoc = Locator(service.node, getOrCreateName(from))
    val receiverLoc = Locator(node, name)
    namedSend(senderLoc, receiverLoc, msg, session)
  }

  /** Asks `node` to apply `rfun` to its actor registered as `name`, on behalf of `from`. */
  def remoteApply(node: Node, name: Symbol, from: OutputChannel[Any], rfun: Function2[AbstractActor, Proxy, Unit]): Unit = {
    val senderLoc = Locator(service.node, getOrCreateName(from))
    val receiverLoc = Locator(node, name)
    sendToNode(receiverLoc.node, RemoteApply0(senderLoc, receiverLoc, rfun))
  }

  /**
   * Creates a new proxy for `(node, sym)` and stores it, replacing any proxy
   * previously stored under that key.
   *
   * Takes the `proxies` lock itself so that direct callers are as safe as
   * callers going through `getOrCreateProxy` (the monitor is re-entrant).
   */
  def createProxy(node: Node, sym: Symbol): Proxy = proxies.synchronized {
    val p = new Proxy(node, sym, this)
    proxies((node, sym)) = p
    p
  }

  val proxies = new mutable.HashMap[(Node, Symbol), Proxy]

  /** Returns the proxy stored for `(senderNode, senderName)`, creating one if absent. */
  def getOrCreateProxy(senderNode: Node, senderName: Symbol): Proxy =
    proxies.synchronized {
      proxies.get((senderNode, senderName)) match {
        case Some(senderProxy) => senderProxy
        case None              => createProxy(senderNode, senderName)
      }
    }

  /** Registers proxy `p` if no other proxy has been registered for the same key. */
  def registerProxy(senderNode: Node, senderName: Symbol, p: Proxy): Unit =
    proxies.synchronized {
      proxies.get((senderNode, senderName)) match {
        case Some(_) => // an earlier registration wins; do nothing
        case None    => proxies((senderNode, senderName)) = p
      }
    }

  /**
   * Dispatches a message received from `senderNode`.
   *
   * Only [[RemoteApply0]] and [[NamedSend]] are handled. In both cases the
   * receiver is looked up by name; if it is not registered the message is
   * dropped and a debug line is logged. Otherwise the work is handed to the
   * proxy that represents the remote sender.
   */
  def processMsg(senderNode: Node, msg: AnyRef): Unit = synchronized {
    msg match {
      case cmd@RemoteApply0(senderLoc, receiverLoc, rfun) =>
        Debug.info(this+": processing "+cmd)
        actors.get(receiverLoc.name) match {
          case Some(a) =>
            val senderProxy = getOrCreateProxy(senderLoc.node, senderLoc.name)
            // NOTE(review): the cast fails if a non-AbstractActor channel was
            // registered under this name; kept as in the original contract.
            senderProxy.send(LocalApply0(rfun, a.asInstanceOf[AbstractActor]), null)

          case None =>
            // message is lost
            Debug.info(this+": lost message")
        }

      case cmd@NamedSend(senderLoc, receiverLoc, data, session) =>
        Debug.info(this+": processing "+cmd)
        actors.get(receiverLoc.name) match {
          case Some(a) =>
            try {
              // named `payload` so it does not shadow the `msg` parameter
              val payload = service.serializer.deserialize(data)
              val senderProxy = getOrCreateProxy(senderLoc.node, senderLoc.name)
              senderProxy.send(SendTo(a, payload, session), null)
            } catch {
              case e: Exception =>
                Debug.error(this+": caught "+e)
            }

          case None =>
            // message is lost
            Debug.info(this+": lost message")
        }
    }
  }

  /** Tells every known proxy to terminate, then terminates the service. */
  def terminate(): Unit = {
    // Snapshot under the lock so concurrent proxy creation cannot disturb
    // the iteration; send outside the lock so it is not held during sends.
    val knownProxies = proxies.synchronized { proxies.values.toList }
    knownProxies foreach { _.send(Terminate, null) }

    // tell service to terminate
    service.terminate()
  }
}

Other Scala source code examples

Here is a short list of links related to this Scala NetKernel.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.