alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (ActorComponent.scala)

This example Akka source code file (ActorComponent.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actorendpoint, actorendpointpath, actorref, akka, bean, boolean, camel, camelexchangeadapter, concurrent, failure, string, success, time, timeoutexception, utilities

The ActorComponent.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */

package akka.camel.internal.component

import language.postfixOps
import java.util.{ Map ⇒ JMap }
import org.apache.camel._
import org.apache.camel.impl.{ DefaultProducer, DefaultEndpoint, DefaultComponent }
import akka.actor._
import akka.pattern._
import scala.beans.BeanProperty
import scala.concurrent.duration._
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.control.NonFatal
import java.util.concurrent.{ TimeUnit, TimeoutException, CountDownLatch }
import akka.util.Timeout
import akka.camel.internal.CamelExchangeAdapter
import akka.camel.{ ActorNotRegisteredException, Camel, Ack, FailureResult, CamelMessage }
import support.TypeConverterSupport
import scala.util.{ Failure, Success, Try }

/**
 * INTERNAL API
 * Creates Camel [[org.apache.camel.Endpoint]]s that send messages to [[akka.camel.Consumer]] actors through an [[akka.camel.internal.component.ActorProducer]].
 * The `ActorComponent` is a Camel [[org.apache.camel.Component]].
 *
 * Camel integrates with Akka through this component. The [[akka.camel.internal.DefaultCamel]] module adds the
 * `ActorComponent` to the [[org.apache.camel.CamelContext]] under the 'actor' component name.
 * Messages are sent to [[akka.camel.Consumer]] actors through a [[akka.camel.internal.component.ActorEndpoint]] that
 * this component provides.
 *
 *
 */
private[camel] class ActorComponent(camel: Camel, system: ActorSystem) extends DefaultComponent {
  /**
   * @see org.apache.camel.Component
   */
  def createEndpoint(uri: String, remaining: String, parameters: JMap[String, Object]): ActorEndpoint =
    new ActorEndpoint(uri, this, ActorEndpointPath.fromCamelPath(uri), camel)
}

/**
 * INTERNAL API
 * Does what an endpoint does, creates consumers and producers for the component. The `ActorEndpoint` is a Camel [[org.apache.camel.Endpoint]] that is used to
 * receive messages from Camel. Sending messages from the `ActorComponent` is not supported, a [[akka.camel.Producer]] actor should be used instead.
 *
 * The `ActorEndpoint`s are created by the [[akka.camel.internal.component.ActorComponent]].
 *
 * Actors are referenced using actor endpoint URIs of the following format:
 * <code>[actorPath]?[options]%s</code>,
 * where <code>[actorPath]</code> refers to the actor path to the actor.
 *
 *
 */
private[camel] class ActorEndpoint(uri: String,
                                   comp: ActorComponent,
                                   val path: ActorEndpointPath,
                                   val camel: Camel) extends DefaultEndpoint(uri, comp) with ActorEndpointConfig {

  /**
   * The ActorEndpoint only supports receiving messages from Camel.
   * The createProducer method (not to be confused with a producer actor) is used to send messages into the endpoint.
   * The ActorComponent is only there to send to actors registered through an actor endpoint URI.
   * You can use an actor as an endpoint to send to in a camel route (as in, a Camel Consumer Actor). so from(someuri) to (actoruri), but not 'the other way around'.
   * Supporting createConsumer would mean that messages are consumed from an Actor endpoint in a route, and an Actor is not necessarily a producer of messages.
   * [[akka.camel.Producer]] Actors can be used for sending messages to some other uri/ component type registered in Camel.
   * @throws UnsupportedOperationException this method is not supported
   */
  def createConsumer(processor: Processor): org.apache.camel.Consumer =
    throw new UnsupportedOperationException("actor consumer not supported yet")

  /**
   * Creates a new producer which is used send messages into the endpoint.
   *
   * @return a newly created producer
   */
  def createProducer: ActorProducer = new ActorProducer(this, camel)

  /**
   * Returns true.
   */
  def isSingleton: Boolean = true
}

/**
 * INTERNAL API
 * Configures the `ActorEndpoint`. This needs to be a `bean` for Camel purposes.
 *
 */
private[camel] trait ActorEndpointConfig {
  def path: ActorEndpointPath
  def camel: Camel

  @BeanProperty var replyTimeout: FiniteDuration = camel.settings.ReplyTimeout

  @BeanProperty var autoAck: Boolean = camel.settings.AutoAck
}

/**
 * Sends the in-message of an exchange to an untyped actor, identified by an [[akka.camel.internal.component.ActorEndPoint]]
 *
 * @see akka.camel.component.ActorComponent
 * @see akka.camel.component.ActorEndpoint
 *
 *
 */
private[camel] class ActorProducer(val endpoint: ActorEndpoint, camel: Camel) extends DefaultProducer(endpoint) with AsyncProcessor {
  /**
   * Processes the exchange.
   * Calls the synchronous version of the method and waits for the result (blocking).
   * @param exchange the exchange to process
   */
  def process(exchange: Exchange): Unit = processExchangeAdapter(new CamelExchangeAdapter(exchange))

  /**
   * Processes the message exchange. the caller supports having the exchange asynchronously processed.
   * If there was a failure processing then the caused Exception would be set on the Exchange.
   *
   * @param exchange the message exchange
   * @param callback the AsyncCallback will be invoked when the processing of the exchange is completed.
   *        If the exchange is completed synchronously, then the callback is also invoked synchronously.
   *        The callback should therefore be careful of starting recursive loop.
   * @return (doneSync) true to continue execute synchronously, false to continue being executed asynchronously
   */
  def process(exchange: Exchange, callback: AsyncCallback): Boolean = processExchangeAdapter(new CamelExchangeAdapter(exchange), callback)

  /**
   * INTERNAL API
   * Processes the [[akka.camel.internal.CamelExchangeAdapter]]
   * @param exchange the [[akka.camel.internal.CamelExchangeAdapter]]
   */
  private[camel] def processExchangeAdapter(exchange: CamelExchangeAdapter): Unit = {
    val isDone = new CountDownLatch(1)
    processExchangeAdapter(exchange, new AsyncCallback { def done(doneSync: Boolean) { isDone.countDown() } })
    isDone.await(endpoint.replyTimeout.length, endpoint.replyTimeout.unit)
  }

  /**
   * INTERNAL API
   * Processes the [[akka.camel.internal.CamelExchangeAdapter]].
   * This method is blocking when the exchange is inOnly. The method returns true if it executed synchronously/blocking.
   * @param exchange the [[akka.camel.internal.CamelExchangeAdapter]]
   * @param callback the callback
   * @return (doneSync) true to continue execute synchronously, false to continue being executed asynchronously
   */
  private[camel] def processExchangeAdapter(exchange: CamelExchangeAdapter, callback: AsyncCallback): Boolean = {
    if (!exchange.isOutCapable && endpoint.autoAck) {
      fireAndForget(messageFor(exchange), exchange)
      callback.done(true)
      true // done sync
    } else {
      val action: PartialFunction[Try[Any], Unit] =
        if (exchange.isOutCapable) {
          case Success(failure: FailureResult) ⇒ exchange.setFailure(failure)
          case Success(msg)                    ⇒ exchange.setResponse(CamelMessage.canonicalize(msg))
          case Failure(e: TimeoutException)    ⇒ exchange.setFailure(FailureResult(new TimeoutException("Failed to get response from the actor [%s] within timeout [%s]. Check replyTimeout and blocking settings [%s]" format (endpoint.path, endpoint.replyTimeout, endpoint))))
          case Failure(throwable)              ⇒ exchange.setFailure(FailureResult(throwable))
        } else {
          case Success(Ack)                    ⇒ () /* no response message to set */
          case Success(failure: FailureResult) ⇒ exchange.setFailure(failure)
          case Success(msg)                    ⇒ exchange.setFailure(FailureResult(new IllegalArgumentException("Expected Ack or Failure message, but got: [%s] from actor [%s]" format (msg, endpoint.path))))
          case Failure(e: TimeoutException)    ⇒ exchange.setFailure(FailureResult(new TimeoutException("Failed to get Ack or Failure response from the actor [%s] within timeout [%s]. Check replyTimeout and blocking settings [%s]" format (endpoint.path, endpoint.replyTimeout, endpoint))))
          case Failure(throwable)              ⇒ exchange.setFailure(FailureResult(throwable))
        }

      // FIXME #3074 how do we solve this with actorSelection?
      val async = try actorFor(endpoint.path).ask(messageFor(exchange))(Timeout(endpoint.replyTimeout)) catch { case NonFatal(e) ⇒ Future.failed(e) }
      implicit val ec = camel.system.dispatcher // FIXME which ExecutionContext should be used here?
      async.onComplete(action andThen { _ ⇒ callback.done(false) })
      false
    }

  }

  // FIXME #3074 how do we solve this with actorSelection?
  private def fireAndForget(message: CamelMessage, exchange: CamelExchangeAdapter): Unit =
    try { actorFor(endpoint.path) ! message } catch { case NonFatal(e) ⇒ exchange.setFailure(new FailureResult(e)) }

  private[this] def actorFor(path: ActorEndpointPath): ActorRef =
    path.findActorIn(camel.system) getOrElse (throw new ActorNotRegisteredException(path.actorPath))

  private[this] def messageFor(exchange: CamelExchangeAdapter) =
    exchange.toRequestMessage(Map(CamelMessage.MessageExchangeId -> exchange.getExchangeId))
}

/**
 * INTERNAL API
 * Converts Strings to [[scala.concurrent.duration.Duration]]
 */
private[camel] object DurationTypeConverter extends TypeConverterSupport {

  @throws(classOf[TypeConversionException])
  def convertTo[T](valueType: Class[T], exchange: Exchange, value: AnyRef): T = valueType.cast(try {
    val d = Duration(value.toString)
    if (valueType.isInstance(d)) d else null
  } catch {
    case NonFatal(throwable) ⇒ throw new TypeConversionException(value, valueType, throwable)
  })
}

/**
 * INTERNAL API
 * An endpoint to an [[akka.actor.ActorRef]]
 * @param actorPath the String representation of the path to the actor
 */
private[camel] case class ActorEndpointPath private (actorPath: String) {
  import ActorEndpointPath._
  require(actorPath != null)
  require(actorPath.length() > 0)
  require(actorPath.startsWith("akka://"))

  def findActorIn(system: ActorSystem): Option[ActorRef] = {
    // FIXME #3074 how do we solve this with actorSelection?
    val ref = system.actorFor(actorPath)
    if (ref.isTerminated) None else Some(ref)
  }
}

/**
 * Converts ActorRefs and actorPaths to URI's that point to the actor through the Camel Actor Component.
 * Can also be used in the Java API as a helper for custom route builders. the Scala API has an implicit conversion in the camel package to
 * directly use `to(actorRef)`. In java you could use `to(CamelPath.toUri(actorRef)`.
 * The URI to the actor is exactly the same as the string representation of the ActorPath, except that it can also have optional URI parameters to configure the Consumer Actor.
 */
object CamelPath {
  /**
   * Converts the actorRef to a Camel URI (string) which can be used in custom routes.
   * The created URI will have no parameters, it is purely the string representation of the actor's path.
   * @param actorRef the actorRef
   * @return the Camel URI to the actor.
   */
  def toUri(actorRef: ActorRef): String = actorRef.path.toString

  /**
   * Converts the actorRef to a Camel URI (string) which can be used in custom routes.
   * Use this version of toUri when you know that the actorRef points to a Consumer Actor and you would like to
   * set autoAck and replyTimeout parameters to non-default values.
   *
   * @param actorRef the actorRef
   * @param autoAck parameter for a Consumer Actor, see [[akka.camel.ConsumerConfig]]
   * @param replyTimeout parameter for a Consumer Actor, see [[akka.camel.ConsumerConfig]]
   * @return the Camel URI to the Consumer Actor, including the parameters for auto acknowledgement and replyTimeout.
   */
  def toUri(actorRef: ActorRef, autoAck: Boolean, replyTimeout: Duration): String = "%s?autoAck=%s&replyTimeout=%s".format(actorRef.path.toString, autoAck, replyTimeout.toString)
}

/**
 * INTERNAL API
 * Companion of `ActorEndpointPath`
 */
private[camel] case object ActorEndpointPath {

  def apply(actorRef: ActorRef): ActorEndpointPath = new ActorEndpointPath(actorRef.path.toString)

  /**
   * Creates an [[akka.camel.internal.component.ActorEndpointPath]] from the uri
   * Expects the uri in the akka [[akka.actor.ActorPath]] format, i.e 'akka://system/user/someactor'.
   * parameters can be optionally added to the actor path to indicate auto-acknowledgement and replyTimeout for a [[akka.camel.Consumer]] actor.
   */
  def fromCamelPath(camelPath: String): ActorEndpointPath = camelPath match {
    case id if id startsWith "akka://" ⇒ new ActorEndpointPath(id.split('?')(0))
    case _                             ⇒ throw new IllegalArgumentException("Invalid path: [%s] - should be an actorPath starting with 'akka://', optionally followed by options" format camelPath)
  }
}

Other Akka source code examples

Here is a short list of links related to this Akka ActorComponent.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.