alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (TypedActor.scala)

This example Akka source code file (TypedActor.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

akka, anyref, array, atomvar, class, collection, concurrent, duration, option, props, r, t, time, typedprops, unit, utilities

The TypedActor.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.actor

import language.existentials

import scala.util.control.NonFatal
import scala.util.{ Try, Success, Failure }
import scala.collection.immutable
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration.Duration
import scala.reflect.ClassTag
import scala.concurrent.{ Await, Future }
import akka.japi.{ Creator, Option ⇒ JOption }
import akka.japi.Util.{ immutableSeq, immutableSingletonSeq }
import akka.util.Timeout
import akka.util.Reflect.instantiator
import akka.serialization.{ JavaSerializer, SerializationExtension }
import akka.dispatch._
import java.util.concurrent.atomic.{ AtomicReference ⇒ AtomVar }
import java.util.concurrent.TimeoutException
import java.util.concurrent.TimeUnit.MILLISECONDS
import java.io.ObjectStreamException
import java.lang.reflect.{ InvocationTargetException, Method, InvocationHandler, Proxy }
import akka.pattern.AskTimeoutException

/**
 * A TypedActorFactory is something that can created TypedActor instances.
 */
trait TypedActorFactory {

  /**
   * Underlying dependency is to be able to create normal Actors
   */
  protected def actorFactory: ActorRefFactory

  /**
   * Underlying dependency to a TypedActorExtension, which can either be contextual or ActorSystem "global"
   */
  protected def typedActor: TypedActorExtension

  /**
   * Stops the underlying ActorRef for the supplied TypedActor proxy,
   * if any, returns whether it could find the find the ActorRef or not
   */
  def stop(proxy: AnyRef): Boolean = getActorRefFor(proxy) match {
    case null ⇒ false
    case ref  ⇒ ref.asInstanceOf[InternalActorRef].stop; true
  }

  /**
   * Sends a PoisonPill the underlying ActorRef for the supplied TypedActor proxy,
   * if any, returns whether it could find the find the ActorRef or not
   */
  def poisonPill(proxy: AnyRef): Boolean = getActorRefFor(proxy) match {
    case null ⇒ false
    case ref  ⇒ ref ! PoisonPill; true
  }

  /**
   * Returns wether the supplied AnyRef is a TypedActor proxy or not
   */
  def isTypedActor(proxyOrNot: AnyRef): Boolean

  /**
   * Retrieves the underlying ActorRef for the supplied TypedActor proxy, or null if none found
   */
  def getActorRefFor(proxy: AnyRef): ActorRef

  /**
   * Creates a new TypedActor with the specified properties
   */
  def typedActorOf[R <: AnyRef, T <: R](props: TypedProps[T]): R = {
    val proxyVar = new AtomVar[R] //Chicken'n'egg-resolver
    val c = props.creator //Cache this to avoid closing over the Props
    val i = props.interfaces //Cache this to avoid closing over the Props
    val ap = Props(new TypedActor.TypedActor[R, T](proxyVar, c(), i)).withDeploy(props.actorProps.deploy)
    typedActor.createActorRefProxy(props, proxyVar, actorFactory.actorOf(ap))
  }

  /**
   * Creates a new TypedActor with the specified properties
   */
  def typedActorOf[R <: AnyRef, T <: R](props: TypedProps[T], name: String): R = {
    val proxyVar = new AtomVar[R] //Chicken'n'egg-resolver
    val c = props.creator //Cache this to avoid closing over the Props
    val i = props.interfaces //Cache this to avoid closing over the Props
    val ap = Props(new akka.actor.TypedActor.TypedActor[R, T](proxyVar, c(), i)).withDeploy(props.actorProps.deploy)
    typedActor.createActorRefProxy(props, proxyVar, actorFactory.actorOf(ap, name))
  }

  /**
   * Creates a TypedActor that intercepts the calls and forwards them as [[akka.actor.TypedActor.MethodCall]]
   * to the provided ActorRef.
   */
  def typedActorOf[R <: AnyRef, T <: R](props: TypedProps[T], actorRef: ActorRef): R =
    typedActor.createActorRefProxy(props, null: AtomVar[R], actorRef)

}

/**
 * This represents the TypedActor Akka Extension, access to the functionality is done through a given ActorSystem.
 */
object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvider {
  override def get(system: ActorSystem): TypedActorExtension = super.get(system)

  def lookup() = this
  def createExtension(system: ExtendedActorSystem): TypedActorExtension = new TypedActorExtension(system)

  /**
   * Returns a contextual TypedActorFactory of this extension, this means that any TypedActors created by this TypedActorExtension
   * will be children to the specified context, this allows for creating hierarchies of TypedActors.
   * Do _not_ let this instance escape the TypedActor since that will not be thread-safe.
   */
  def apply(context: ActorContext): TypedActorFactory = ContextualTypedActorFactory(apply(context.system), context)

  /**
   * Returns a contextual TypedActorFactory of this extension, this means that any TypedActors created by this TypedActorExtension
   * will be children to the specified context, this allows for creating hierarchies of TypedActors.
   * Do _not_ let this instance escape the TypedActor since that will not be thread-safe.
   *
   * Java API
   */
  def get(context: ActorContext): TypedActorFactory = apply(context)

  /**
   * This class represents a Method call, and has a reference to the Method to be called and the parameters to supply
   * It's sent to the ActorRef backing the TypedActor and can be serialized and deserialized
   */
  final case class MethodCall(method: Method, parameters: Array[AnyRef]) {

    def isOneWay = method.getReturnType == java.lang.Void.TYPE
    def returnsFuture = classOf[Future[_]] isAssignableFrom method.getReturnType
    def returnsJOption = classOf[akka.japi.Option[_]] isAssignableFrom method.getReturnType
    def returnsOption = classOf[scala.Option[_]] isAssignableFrom method.getReturnType

    /**
     * Invokes the Method on the supplied instance
     *
     * @throws the underlying exception if there's an InvocationTargetException thrown on the invocation
     */
    def apply(instance: AnyRef): AnyRef = try {
      parameters match {
        case null                     ⇒ method.invoke(instance)
        case args if args.length == 0 ⇒ method.invoke(instance)
        case args                     ⇒ method.invoke(instance, args: _*)
      }
    } catch { case i: InvocationTargetException ⇒ throw i.getTargetException }

    @throws(classOf[ObjectStreamException]) private def writeReplace(): AnyRef = parameters match {
      case null                 ⇒ SerializedMethodCall(method.getDeclaringClass, method.getName, method.getParameterTypes, null)
      case ps if ps.length == 0 ⇒ SerializedMethodCall(method.getDeclaringClass, method.getName, method.getParameterTypes, Array())
      case ps ⇒
        val serialization = SerializationExtension(akka.serialization.JavaSerializer.currentSystem.value)
        val serializedParameters = Array.ofDim[(Int, Class[_], Array[Byte])](ps.length)
        for (i ← 0 until ps.length) {
          val p = ps(i)
          val s = serialization.findSerializerFor(p)
          val m = if (s.includeManifest) p.getClass else null
          serializedParameters(i) = (s.identifier, m, s toBinary parameters(i)) //Mutable for the sake of sanity
        }

        SerializedMethodCall(method.getDeclaringClass, method.getName, method.getParameterTypes, serializedParameters)
    }
  }

  /**
   * INTERNAL API
   *
   * Represents the serialized form of a MethodCall, uses readResolve and writeReplace to marshall the call
   */
  private[akka] final case class SerializedMethodCall(ownerType: Class[_], methodName: String, parameterTypes: Array[Class[_]], serializedParameters: Array[(Int, Class[_], Array[Byte])]) {

    //TODO implement writeObject and readObject to serialize
    //TODO Possible optimization is to special encode the parameter-types to conserve space
    @throws(classOf[ObjectStreamException]) private def readResolve(): AnyRef = {
      val system = akka.serialization.JavaSerializer.currentSystem.value
      if (system eq null) throw new IllegalStateException(
        "Trying to deserialize a SerializedMethodCall without an ActorSystem in scope." +
          " Use akka.serialization.Serialization.currentSystem.withValue(system) { ... }")
      val serialization = SerializationExtension(system)
      MethodCall(ownerType.getDeclaredMethod(methodName, parameterTypes: _*), serializedParameters match {
        case null               ⇒ null
        case a if a.length == 0 ⇒ Array[AnyRef]()
        case a ⇒
          val deserializedParameters: Array[AnyRef] = Array.ofDim[AnyRef](a.length) //Mutable for the sake of sanity
          for (i ← 0 until a.length) {
            val (sId, manifest, bytes) = a(i)
            deserializedParameters(i) =
              serialization.serializerByIdentity(sId).fromBinary(bytes, Option(manifest))
          }

          deserializedParameters
      })
    }
  }

  private val selfReference = new ThreadLocal[AnyRef]
  private val currentContext = new ThreadLocal[ActorContext]

  @SerialVersionUID(1L)
  private case object NullResponse

  /**
   * Returns the reference to the proxy when called inside a method call in a TypedActor
   *
   * Example:
   * <p/>
   * class FooImpl extends Foo {
   *   def doFoo {
   *     val myself = TypedActor.self[Foo]
   *   }
   * }
   *
   * Useful when you want to send a reference to this TypedActor to someone else.
   *
   * NEVER EXPOSE "this" to someone else, always use "self[TypeOfInterface(s)]"
   *
   * @throws IllegalStateException if called outside of the scope of a method on this TypedActor
   * @throws ClassCastException if the supplied type T isn't the type of the proxy associated with this TypedActor
   */
  def self[T <: AnyRef] = selfReference.get.asInstanceOf[T] match {
    case null ⇒ throw new IllegalStateException("Calling TypedActor.self outside of a TypedActor implementation method!")
    case some ⇒ some
  }

  /**
   * Returns the ActorContext (for a TypedActor) when inside a method call in a TypedActor.
   */
  def context: ActorContext = currentContext.get match {
    case null ⇒ throw new IllegalStateException("Calling TypedActor.context outside of a TypedActor implementation method!")
    case some ⇒ some
  }

  /**
   * Returns the default dispatcher (for a TypedActor) when inside a method call in a TypedActor.
   */
  implicit def dispatcher = context.dispatcher

  /**
   * INTERNAL API
   *
   * Implementation of TypedActor as an Actor
   */
  private[akka] class TypedActor[R <: AnyRef, T <: R](val proxyVar: AtomVar[R], createInstance: ⇒ T, interfaces: immutable.Seq[Class[_]]) extends Actor {
    // if we were remote deployed we need to create a local proxy
    if (!context.parent.asInstanceOf[InternalActorRef].isLocal)
      TypedActor.get(context.system).createActorRefProxy(
        TypedProps(interfaces, createInstance), proxyVar, context.self)

    private val me = withContext[T](createInstance)

    override def supervisorStrategy: SupervisorStrategy = me match {
      case l: Supervisor ⇒ l.supervisorStrategy
      case _             ⇒ super.supervisorStrategy
    }

    override def preStart(): Unit = withContext {
      me match {
        case l: PreStart ⇒ l.preStart()
        case _           ⇒ super.preStart()
      }
    }

    override def postStop(): Unit = try {
      withContext {
        me match {
          case l: PostStop ⇒ l.postStop()
          case _           ⇒ super.postStop()
        }
      }
    } finally {
      TypedActor(context.system).invocationHandlerFor(proxyVar.get) match {
        case null ⇒
        case some ⇒
          some.actorVar.set(context.system.deadLetters) //Point it to the DLQ
          proxyVar.set(null.asInstanceOf[R])
      }
    }

    override def preRestart(reason: Throwable, message: Option[Any]): Unit = withContext {
      me match {
        case l: PreRestart ⇒ l.preRestart(reason, message)
        case _             ⇒ context.children foreach context.stop //Can't be super.preRestart(reason, message) since that would invoke postStop which would set the actorVar to DL and proxyVar to null
      }
    }

    override def postRestart(reason: Throwable): Unit = withContext {
      me match {
        case l: PostRestart ⇒ l.postRestart(reason)
        case _              ⇒ super.postRestart(reason)
      }
    }

    protected def withContext[T](unitOfWork: ⇒ T): T = {
      TypedActor.selfReference set proxyVar.get
      TypedActor.currentContext set context
      try unitOfWork finally {
        TypedActor.selfReference set null
        TypedActor.currentContext set null
      }
    }

    def receive = {
      case m: MethodCall ⇒ withContext {
        if (m.isOneWay) m(me)
        else {
          try {
            val s = sender()
            m(me) match {
              case f: Future[_] if m.returnsFuture ⇒
                implicit val dispatcher = context.dispatcher
                f onComplete {
                  case Success(null)   ⇒ s ! NullResponse
                  case Success(result) ⇒ s ! result
                  case Failure(f)      ⇒ s ! Status.Failure(f)
                }
              case null   ⇒ s ! NullResponse
              case result ⇒ s ! result
            }
          } catch {
            case NonFatal(e) ⇒
              sender() ! Status.Failure(e)
              throw e
          }
        }
      }

      case msg if me.isInstanceOf[Receiver] ⇒ withContext {
        me.asInstanceOf[Receiver].onReceive(msg, sender())
      }
    }
  }

  /**
   * Mix this into your TypedActor to be able to define supervisor strategy
   */
  trait Supervisor {
    /**
     * User overridable definition the strategy to use for supervising
     * child actors.
     */
    def supervisorStrategy(): SupervisorStrategy
  }

  /**
   * Mix this into your TypedActor to be able to intercept Terminated messages
   */
  trait Receiver {
    def onReceive(message: Any, sender: ActorRef): Unit
  }

  /**
   * Mix this into your TypedActor to be able to hook into its lifecycle
   */
  trait PreStart {
    /**
     * User overridable callback.
     * <p/>
     * Is called when an Actor is started by invoking 'actor'.
     */
    def preStart(): Unit
  }

  /**
   * Mix this into your TypedActor to be able to hook into its lifecycle
   */
  trait PostStop {
    /**
     * User overridable callback.
     * <p/>
     * Is called when 'actor.stop()' is invoked.
     */
    def postStop(): Unit
  }

  /**
   * Mix this into your TypedActor to be able to hook into its lifecycle
   */
  trait PreRestart {
    /**
     * User overridable callback: '''By default it disposes of all children and then calls `postStop()`.'''
     * @param reason the Throwable that caused the restart to happen
     * @param message optionally the current message the actor processed when failing, if applicable
     * <p/>
     * Is called on a crashed Actor right BEFORE it is restarted to allow clean
     * up of resources before Actor is terminated.
     * By default it terminates all children and calls postStop()
     */
    def preRestart(reason: Throwable, message: Option[Any]): Unit
  }

  trait PostRestart {
    /**
     * User overridable callback: By default it calls `preStart()`.
     * @param reason the Throwable that caused the restart to happen
     * <p/>
     * Is called right AFTER restart on the newly created Actor to allow reinitialization after an Actor crash.
     */
    def postRestart(reason: Throwable): Unit
  }

  /**
   * INTERNAL API
   */
  private[akka] class TypedActorInvocationHandler(@transient val extension: TypedActorExtension, @transient val actorVar: AtomVar[ActorRef], @transient val timeout: Timeout) extends InvocationHandler with Serializable {

    def actor = actorVar.get
    @throws(classOf[Throwable])
    def invoke(proxy: AnyRef, method: Method, args: Array[AnyRef]): AnyRef = method.getName match {
      case "toString" ⇒ actor.toString
      case "equals"   ⇒ (args.length == 1 && (proxy eq args(0)) || actor == extension.getActorRefFor(args(0))).asInstanceOf[AnyRef] //Force boxing of the boolean
      case "hashCode" ⇒ actor.hashCode.asInstanceOf[AnyRef]
      case _ ⇒
        implicit val dispatcher = extension.system.dispatcher
        import akka.pattern.ask
        MethodCall(method, args) match {
          case m if m.isOneWay ⇒
            actor ! m; null //Null return value
          case m if m.returnsFuture ⇒ ask(actor, m)(timeout) map {
            case NullResponse ⇒ null
            case other        ⇒ other
          }
          case m if m.returnsJOption || m.returnsOption ⇒
            val f = ask(actor, m)(timeout)
            (try { Await.ready(f, timeout.duration).value } catch { case _: TimeoutException ⇒ None }) match {
              case None | Some(Success(NullResponse)) | Some(Failure(_: AskTimeoutException)) ⇒
                if (m.returnsJOption) JOption.none[Any] else None
              case Some(t: Try[_]) ⇒
                t.get.asInstanceOf[AnyRef]
            }
          case m ⇒ Await.result(ask(actor, m)(timeout), timeout.duration) match {
            case NullResponse ⇒ null
            case other        ⇒ other.asInstanceOf[AnyRef]
          }
        }
    }
    @throws(classOf[ObjectStreamException]) private def writeReplace(): AnyRef = SerializedTypedActorInvocationHandler(actor, timeout.duration)
  }

  /**
   * INTERNAL API
   */
  private[akka] final case class SerializedTypedActorInvocationHandler(val actor: ActorRef, val timeout: FiniteDuration) {
    @throws(classOf[ObjectStreamException]) private def readResolve(): AnyRef = JavaSerializer.currentSystem.value match {
      case null ⇒ throw new IllegalStateException("SerializedTypedActorInvocationHandler.readResolve requires that JavaSerializer.currentSystem.value is set to a non-null value")
      case some ⇒ toTypedActorInvocationHandler(some)
    }

    def toTypedActorInvocationHandler(system: ActorSystem): TypedActorInvocationHandler =
      new TypedActorInvocationHandler(TypedActor(system), new AtomVar[ActorRef](actor), new Timeout(timeout))
  }
}

/**
 * TypedProps is a TypedActor configuration object, that is thread safe and fully sharable.
 * It's used in TypedActorFactory.typedActorOf to configure a TypedActor instance.
 */
object TypedProps {

  val defaultDispatcherId: String = Dispatchers.DefaultDispatcherId
  val defaultTimeout: Option[Timeout] = None
  val defaultLoader: Option[ClassLoader] = None

  /**
   * @return a sequence of interfaces that the specified class implements,
   * or a sequence containing only itself, if itself is an interface.
   */
  def extractInterfaces(clazz: Class[_]): immutable.Seq[Class[_]] =
    if (clazz.isInterface) immutableSingletonSeq(clazz) else immutableSeq(clazz.getInterfaces)

  /**
   * Uses the supplied class as the factory for the TypedActor implementation,
   * proxying all the interfaces it implements.
   *
   * Scala API
   */
  def apply[T <: AnyRef](implementation: Class[T]): TypedProps[T] =
    new TypedProps[T](implementation)

  /**
   * Uses the supplied class as the factory for the TypedActor implementation,
   * and that has the specified interface,
   * or if the interface class is not an interface, all the interfaces it implements,
   * appended in the sequence of interfaces.
   *
   * Scala API
   */
  def apply[T <: AnyRef](interface: Class[_ >: T], implementation: Class[T]): TypedProps[T] =
    new TypedProps[T](extractInterfaces(interface), instantiator(implementation))

  /**
   * Uses the supplied thunk as the factory for the TypedActor implementation,
   * and that has the specified interface,
   * or if the interface class is not an interface, all the interfaces it implements,
   * appended in the sequence of interfaces.
   *
   * Scala API
   */
  def apply[T <: AnyRef](interface: Class[_ >: T], creator: ⇒ T): TypedProps[T] =
    new TypedProps[T](extractInterfaces(interface), () ⇒ creator)

  /**
   * Uses the supplied class as the factory for the TypedActor implementation,
   * proxying all the interfaces it implements.
   *
   * Scala API
   */
  def apply[T <: AnyRef: ClassTag](): TypedProps[T] =
    new TypedProps[T](implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]])

  /**
   * INTERNAL API
   */
  private[akka] def apply[T <: AnyRef](interfaces: immutable.Seq[Class[_]], creator: ⇒ T): TypedProps[T] =
    new TypedProps[T](interfaces, () ⇒ creator)
}

/**
 * TypedProps is a TypedActor configuration object, that is thread safe and fully sharable.
 * It's used in TypedActorFactory.typedActorOf to configure a TypedActor instance.
 */
@SerialVersionUID(1L)
final case class TypedProps[T <: AnyRef] protected[TypedProps] (
  interfaces: immutable.Seq[Class[_]],
  creator: () ⇒ T,
  dispatcher: String = TypedProps.defaultDispatcherId,
  deploy: Deploy = Props.defaultDeploy,
  timeout: Option[Timeout] = TypedProps.defaultTimeout,
  loader: Option[ClassLoader] = TypedProps.defaultLoader) {

  /**
   * Uses the supplied class as the factory for the TypedActor implementation,
   * and that has the specified interface,
   * or if the interface class is not an interface, all the interfaces it implements,
   * appended in the sequence of interfaces.
   */
  def this(implementation: Class[T]) =
    this(interfaces = TypedProps.extractInterfaces(implementation),
      creator = instantiator(implementation))

  /**
   * Java API: Uses the supplied Creator as the factory for the TypedActor implementation,
   * and that has the specified interface,
   * or if the interface class is not an interface, all the interfaces it implements,
   * appended in the sequence of interfaces.
   */
  def this(interface: Class[_ >: T], implementation: Creator[T]) =
    this(interfaces = TypedProps.extractInterfaces(interface),
      creator = implementation.create _)

  /**
   * Java API: Uses the supplied class as the factory for the TypedActor implementation,
   * and that has the specified interface,
   * or if the interface class is not an interface, all the interfaces it implements,
   * appended in the sequence of interfaces.
   */
  def this(interface: Class[_ >: T], implementation: Class[T]) =
    this(interfaces = TypedProps.extractInterfaces(interface),
      creator = instantiator(implementation))

  /**
   * Returns a new TypedProps with the specified dispatcher set.
   */
  def withDispatcher(d: String): TypedProps[T] = copy(dispatcher = d)

  /**
   * Returns a new TypedProps with the specified deployment configuration.
   */
  def withDeploy(d: Deploy): TypedProps[T] = copy(deploy = d)

  /**
   * Java API: return a new TypedProps that will use the specified ClassLoader to create its proxy class in
   * If loader is null, it will use the bootstrap classloader.
   */
  def withLoader(loader: ClassLoader): TypedProps[T] = withLoader(Option(loader))

  /**
   * Scala API: return a new TypedProps that will use the specified ClassLoader to create its proxy class in
   * If loader is null, it will use the bootstrap classloader.
   *
   * Scala API
   */
  def withLoader(loader: Option[ClassLoader]): TypedProps[T] = this.copy(loader = loader)

  /**
   * Java API: return a new TypedProps that will use the specified Timeout for its non-void-returning methods,
   * if null is specified, it will use the default timeout as specified in the configuration.
   */
  def withTimeout(timeout: Timeout): TypedProps[T] = this.copy(timeout = Option(timeout))

  /**
   * Scala API: return a new TypedProps that will use the specified Timeout for its non-void-returning methods,
   * if None is specified, it will use the default timeout as specified in the configuration.
   *
   *
   */
  def withTimeout(timeout: Option[Timeout]): TypedProps[T] = this.copy(timeout = timeout)

  /**
   * Returns a new TypedProps that has the specified interface,
   * or if the interface class is not an interface, all the interfaces it implements,
   * appended in the sequence of interfaces.
   */
  def withInterface(interface: Class[_ >: T]): TypedProps[T] =
    this.copy(interfaces = interfaces ++ TypedProps.extractInterfaces(interface))

  /**
   * Returns a new TypedProps without the specified interface,
   * or if the interface class is not an interface, all the interfaces it implements.
   */
  def withoutInterface(interface: Class[_ >: T]): TypedProps[T] =
    this.copy(interfaces = interfaces diff TypedProps.extractInterfaces(interface))

  /**
   * Returns the akka.actor.Props representation of this TypedProps
   */
  def actorProps(): Props =
    if (dispatcher == Props.default.dispatcher)
      Props.default.withDeploy(deploy)
    else Props.default.withDispatcher(dispatcher).withDeploy(deploy)
}

/**
 * ContextualTypedActorFactory allows TypedActors to create children, effectively forming the same Actor Supervision Hierarchies
 * as normal Actors can.
 */
final case class ContextualTypedActorFactory(typedActor: TypedActorExtension, actorFactory: ActorContext) extends TypedActorFactory {
  override def getActorRefFor(proxy: AnyRef): ActorRef = typedActor.getActorRefFor(proxy)
  override def isTypedActor(proxyOrNot: AnyRef): Boolean = typedActor.isTypedActor(proxyOrNot)
}

class TypedActorExtension(val system: ExtendedActorSystem) extends TypedActorFactory with Extension {
  import TypedActor._ //Import the goodies from the companion object
  protected def actorFactory: ActorRefFactory = system
  protected def typedActor = this

  import system.settings
  import akka.util.Helpers.ConfigOps

  /**
   * Default timeout for typed actor methods with non-void return type
   */
  final val DefaultReturnTimeout = Timeout(settings.config.getMillisDuration("akka.actor.typed.timeout"))

  /**
   * Retrieves the underlying ActorRef for the supplied TypedActor proxy, or null if none found
   */
  def getActorRefFor(proxy: AnyRef): ActorRef = invocationHandlerFor(proxy) match {
    case null    ⇒ null
    case handler ⇒ handler.actor
  }

  /**
   * Returns wether the supplied AnyRef is a TypedActor proxy or not
   */
  def isTypedActor(proxyOrNot: AnyRef): Boolean = invocationHandlerFor(proxyOrNot) ne null

  // Private API
  /**
   * INTERNAL API
   */
  private[akka] def createActorRefProxy[R <: AnyRef, T <: R](props: TypedProps[T], proxyVar: AtomVar[R], actorRef: ⇒ ActorRef): R = {
    //Warning, do not change order of the following statements, it's some elaborate chicken-n-egg handling
    val actorVar = new AtomVar[ActorRef](null)
    val proxy = Proxy.newProxyInstance(
      (props.loader orElse props.interfaces.collectFirst { case any ⇒ any.getClassLoader }).orNull, //If we have no loader, we arbitrarily take the loader of the first interface
      props.interfaces.toArray,
      new TypedActorInvocationHandler(this, actorVar, props.timeout getOrElse DefaultReturnTimeout)).asInstanceOf[R]

    if (proxyVar eq null) {
      actorVar set actorRef
      proxy
    } else {
      proxyVar set proxy // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive
      actorVar set actorRef //Make sure the InvocationHandler gets ahold of the actor reference, this is not a problem since the proxy hasn't escaped this method yet
      proxyVar.get
    }
  }

  /**
   * INTERNAL API
   */
  private[akka] def invocationHandlerFor(typedActor: AnyRef): TypedActorInvocationHandler =
    if ((typedActor ne null) && classOf[Proxy].isAssignableFrom(typedActor.getClass) && Proxy.isProxyClass(typedActor.getClass)) typedActor match {
      case null ⇒ null
      case other ⇒ Proxy.getInvocationHandler(other) match {
        case null                                 ⇒ null
        case handler: TypedActorInvocationHandler ⇒ handler
        case _                                    ⇒ null
      }
    }
    else null
}

Other Akka source code examples

Here is a short list of links related to this Akka TypedActor.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.