|
Scala example source code file (Proxy.scala)
The Proxy.scala Scala example source code
/*                     __                                               *\
**     ________ ___   / /  ___     Scala API                            **
**    / __/ __// _ | / /  / _ |    (c) 2005-2013, LAMP/EPFL             **
**  __\ \/ /__/ __ |/ /__/ __ |    http://scala-lang.org/               **
** /____/\___/_/ |_/____/_/ | |                                         **
**                          |/                                          **
\*                                                                      */

package scala.actors
package remote

import scala.collection.mutable

/**
 * An `AbstractActor` whose messaging operations are all handed to an
 * internal `DelegateActor` (`del`) that is created and started together
 * with the proxy.
 *
 * Both `kernel` and `del` are `@transient`: they are not written by Java
 * serialization and are rebuilt in `readObject` after deserialization.
 *
 * @param node   node designation passed on to the delegate and to the kernel
 * @param name   actor name passed on to the delegate and to the kernel
 * @param kernel kernel handed to the delegate; replaced by
 *               `RemoteActor.someNetKernel` after deserialization
 *
 * @author Philipp Haller
 */
private[remote] class Proxy(node: Node, name: Symbol, @transient var kernel: NetKernel)
    extends AbstractActor with Serializable {

  import java.io.{IOException, ObjectOutputStream, ObjectInputStream}

  type Future[+P] = scala.actors.Future[P]

  /** Actor that does the actual work; assigned by `startDelegate()`. */
  @transient
  private[remote] var del: Actor = null

  // Must run after the `del` field initializer above, otherwise the
  // initializer would overwrite the freshly created delegate with null.
  startDelegate()

  /** Java serialization hook: writes only the default (non-transient) state. */
  @throws(classOf[IOException])
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
  }

  /**
   * Java serialization hook: restores the default state, then re-acquires a
   * kernel, registers this proxy with it and starts a new delegate.
   */
  @throws(classOf[ClassNotFoundException])
  @throws(classOf[IOException])
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    setupKernel()
    startDelegate()
  }

  /** Creates a new delegate for this proxy and starts it. */
  private def startDelegate(): Unit = {
    del = new DelegateActor(this, node, name, kernel)
    del.start()
  }

  /** Picks up `RemoteActor.someNetKernel` and registers this proxy under (node, name). */
  private def setupKernel(): Unit = {
    kernel = RemoteActor.someNetKernel
    kernel.registerProxy(node, name, this)
  }

  // ---- plain delegation to `del` -------------------------------------------

  def !(msg: Any): Unit = del.!(msg)

  def send(msg: Any, replyCh: OutputChannel[Any]): Unit = del.send(msg, replyCh)

  def forward(msg: Any): Unit = del.forward(msg)

  def receiver: Actor = del

  def !?(msg: Any): Any = del.!?(msg)

  def !?(msec: Long, msg: Any): Option[Any] = del.!?(msec, msg)

  def !!(msg: Any): Future[Any] = del.!!(msg)

  def !![A](msg: Any, f: PartialFunction[Any, A]): Future[A] = del.!!(msg, f)

  // ---- link/exit operations --------------------------------------------------
  // Each one wraps a function object in `Apply0` and sends it to the delegate.
  // The `to` / `from` arguments are not used by these implementations.

  def linkTo(to: AbstractActor): Unit = del.!(Apply0(new LinkToFun))

  def unlinkFrom(from: AbstractActor): Unit = del.!(Apply0(new UnlinkFromFun))

  def exit(from: AbstractActor, reason: AnyRef): Unit = del.!(Apply0(new ExitFun(reason)))

  /** Renders as `name@node`. */
  override def toString() = name + "@" + node
}

// Proxy is private[remote], but these classes are public and use it in a public
// method signature. That makes the only method they have non-overridable.
// So I made them final, which seems appropriate anyway.

/** Function object that calls `target.linkTo(creator)`. */
final class LinkToFun extends Function2[AbstractActor, Proxy, Unit] with Serializable {
  def apply(target: AbstractActor, creator: Proxy): Unit = {
    target.linkTo(creator)
  }

  override def toString = "<LinkToFun>"
}

/** Function object that calls `target.unlinkFrom(creator)`. */
final class UnlinkFromFun extends Function2[AbstractActor, Proxy, Unit] with Serializable {
  def apply(target: AbstractActor, creator: Proxy): Unit = {
    target.unlinkFrom(creator)
  }

  override def toString = "<UnlinkFromFun>"
}

/**
 * Function object that calls `target.exit(creator, reason)`.
 *
 * @param reason exit reason handed to `target.exit`
 */
final class ExitFun(reason: AnyRef) extends Function2[AbstractActor, Proxy, Unit] with Serializable {
  def apply(target: AbstractActor, creator: Proxy): Unit = {
    target.exit(creator, reason)
  }

  override def toString = "<ExitFun>(" + reason.toString + ")"
}

/** Message asking a `DelegateActor` to pass `rfun` to `kernel.remoteApply`. */
private[remote] case class Apply0(rfun: Function2[AbstractActor, Proxy, Unit])

/**
 * Actor that performs the work on behalf of a `Proxy`.
 *
 * It reacts in a loop to control messages (`Apply0`, `LocalApply0`,
 * `SendTo`, `Terminate`), to responses arriving on reply channels it
 * created itself, and to any other message, which it forwards through
 * the kernel.
 *
 * @param creator proxy that created this delegate; passed to functions
 *                received in `LocalApply0`
 * @param node    node designation used in all kernel calls
 * @param name    actor name used in all kernel calls
 * @param kernel  kernel used for `remoteApply` and `forward`
 *
 * @author Philipp Haller
 */
private[remote] class DelegateActor(creator: Proxy, node: Node, name: Symbol, kernel: NetKernel) extends Actor {

  /** Session ID -> sender of the request that opened the session. */
  var channelMap = new mutable.HashMap[Symbol, OutputChannel[Any]]

  /** Reply channel created by this actor -> session ID it answers. */
  var sessionMap = new mutable.HashMap[OutputChannel[_], Symbol]

  def act(): Unit = {
    Actor.loop {
      react {
        case Apply0(rfun) =>
          kernel.remoteApply(node, name, sender, rfun)

        case LocalApply0(rfun, target) =>
          rfun(target, creator)

        // Request from remote proxy.
        // `this` is local proxy.
        case SendTo(out, msg, session) =>
          if (session.name != "nosession") {
            // Removing the entry doubles as the "is this an active session?" test.
            channelMap.remove(session) match {
              // Known session: this message finishes the request-reply cycle.
              case Some(replyCh) =>
                replyCh ! msg

              // Unknown session: do a local send with a fresh reply channel
              // and remember which session that channel belongs to.
              case None =>
                val replyCh = new Channel[Any](this)
                sessionMap(replyCh) = session
                out.send(msg, replyCh)
            }
          } else {
            // No session involved: plain local send.
            out.send(msg, this)
          }

        case Terminate =>
          exit()

        // Local proxy receives a response on one of its reply channels.
        case ch ! resp =>
          // Look up the session ID and drop the mapping in one step.
          sessionMap.remove(ch) match {
            case Some(sid) =>
              val msg = resp.asInstanceOf[AnyRef]
              // send back response
              kernel.forward(sender, node, name, msg, sid)

            case None =>
              Debug.info(this + ": cannot find session for " + ch)
          }

        // Remote proxy receives a request.
        case msg: AnyRef =>
          val from = sender
          // A sender whose class name mentions "Channel" is treated as a
          // synchronous send that expects a reply.
          val session =
            if (from.getClass.toString.contains("Channel")) {
              // Fresh session ID that maps to the reply channel.
              val fresh = FreshNameCreator.newName(node + "@" + name)
              channelMap(fresh) = from
              fresh
            } else {
              'nosession
            }
          kernel.forward(from, node, name, msg, session)
      }
    }
  }
}
Other Scala source code examplesHere is a short list of links related to this Scala Proxy.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.