alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (Producer.scala)

This example Akka source code file (Producer.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actor, akka, any, boolean, camel, endpoint, failureresult, messageresult, noserializationverificationneeded, producer, producersupport, unit

The Producer.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */

package akka.camel

import akka.actor.{ Props, NoSerializationVerificationNeeded, ActorRef, Actor }
import internal.CamelSupervisor.{ CamelProducerObjects, Register }
import internal.CamelExchangeAdapter
import akka.actor.Status.Failure
import org.apache.camel.{ Endpoint, ExchangePattern, AsyncCallback }
import org.apache.camel.processor.SendProcessor

/**
 * Support trait for producing messages to Camel endpoints.
 */
trait ProducerSupport extends Actor with CamelSupport {
  private[this] var messages = Vector.empty[(ActorRef, Any)]
  private[this] var producerChild: Option[ActorRef] = None

  override def preStart() {
    super.preStart()
    register()
  }

  private[this] def register() { camel.supervisor ! Register(self, endpointUri) }

  /**
   * CamelMessage headers to copy by default from request message to response-message.
   */
  private val headersToCopyDefault: Set[String] = Set(CamelMessage.MessageExchangeId)

  /**
   * If set to false (default), this producer expects a response message from the Camel endpoint.
   * If set to true, this producer initiates an in-only message exchange with the Camel endpoint
   * (fire and forget).
   */
  def oneway: Boolean = false

  /**
   * Returns the Camel endpoint URI to produce messages to.
   */
  def endpointUri: String

  /**
   * Returns the names of message headers to copy from a request message to a response message.
   * By default only the CamelMessage.MessageExchangeId is copied. Applications may override this to
   * define an application-specific set of message headers to copy.
   */
  def headersToCopy: Set[String] = headersToCopyDefault

  /**
   * Produces <code>msg</code> to the endpoint specified by <code>endpointUri</code>. Before the message is
   * actually sent it is pre-processed by calling <code>receiveBeforeProduce</code>. If <code>oneway</code>
   * is <code>true</code>, an in-only message exchange is initiated, otherwise an in-out message exchange.
   *
   * @see Producer#produce(Any, ExchangePattern)
   */
  protected def produce: Receive = {
    case CamelProducerObjects(endpoint, processor) ⇒
      if (producerChild.isEmpty) {
        producerChild = Some(context.actorOf(Props(new ProducerChild(endpoint, processor))))
        messages = {
          for (
            child ← producerChild;
            (snd, msg) ← messages
          ) child.tell(transformOutgoingMessage(msg), snd)
          Vector.empty
        }
      }
    case res: MessageResult ⇒ routeResponse(res.message)
    case res: FailureResult ⇒
      val e = new AkkaCamelException(res.cause, res.headers)
      routeResponse(Failure(e))
      throw e

    case msg ⇒
      producerChild match {
        case Some(child) ⇒ child forward transformOutgoingMessage(msg)
        case None        ⇒ messages :+= ((sender(), msg))
      }
  }

  /**
   * Called before the message is sent to the endpoint specified by <code>endpointUri</code>. The original
   * message is passed as argument. By default, this method simply returns the argument but may be overridden
   * by subtraits or subclasses.
   */
  protected def transformOutgoingMessage(msg: Any): Any = msg

  /**
   * Called before the response message is sent to the original sender. The original
   * message is passed as argument. By default, this method simply returns the argument but may be overridden
   * by subtraits or subclasses.
   */
  protected def transformResponse(msg: Any): Any = msg

  /**
   * Called after a response was received from the endpoint specified by <code>endpointUri</code>. The
   * response is passed as argument. By default, this method sends the response back to the original sender
   * if <code>oneway</code> is <code>false</code>. If <code>oneway</code> is <code>true</code>, nothing is
   * done. This method may be overridden by subtraits or subclasses (e.g. to forward responses to another
   * actor).
   */

  protected def routeResponse(msg: Any): Unit = if (!oneway) sender() ! transformResponse(msg)

  private class ProducerChild(endpoint: Endpoint, processor: SendProcessor) extends Actor {
    def receive = {
      case msg @ (_: FailureResult | _: MessageResult) ⇒ context.parent forward msg
      case msg                                         ⇒ produce(endpoint, processor, msg, if (oneway) ExchangePattern.InOnly else ExchangePattern.InOut)
    }
    /**
     * Initiates a message exchange of given <code>pattern</code> with the endpoint specified by
     * <code>endpointUri</code>. The in-message of the initiated exchange is the canonical form
     * of <code>msg</code>. After sending the in-message, the processing result (response) is passed
     * as argument to <code>receiveAfterProduce</code>. If the response is received synchronously from
     * the endpoint then <code>receiveAfterProduce</code> is called synchronously as well. If the
     * response is received asynchronously, the <code>receiveAfterProduce</code> is called
     * asynchronously. The original sender is preserved.
     *
     * @see CamelMessage#canonicalize(Any)
     * @param endpoint the endpoint
     * @param processor the processor
     * @param msg message to produce
     * @param pattern exchange pattern
     */
    protected def produce(endpoint: Endpoint, processor: SendProcessor, msg: Any, pattern: ExchangePattern): Unit = {
      // Need copies of sender reference here since the callback could be done
      // later by another thread.
      val producer = self
      val originalSender = sender()
      val xchg = new CamelExchangeAdapter(endpoint.createExchange(pattern))
      val cmsg = CamelMessage.canonicalize(msg)
      xchg.setRequest(cmsg)

      processor.process(xchg.exchange, new AsyncCallback {
        // Ignoring doneSync, sending back async uniformly.
        def done(doneSync: Boolean): Unit = producer.tell(
          if (xchg.exchange.isFailed) xchg.toFailureResult(cmsg.headers(headersToCopy))
          else MessageResult(xchg.toResponseMessage(cmsg.headers(headersToCopy))), originalSender)
      })
    }
  }
}
/**
 * Mixed in by Actor implementations to produce messages to Camel endpoints.
 */
trait Producer extends ProducerSupport { this: Actor ⇒

  /**
   * Implementation of Actor.receive. Any messages received by this actor
   * will be produced to the endpoint specified by <code>endpointUri</code>.
   */
  final def receive: Actor.Receive = produce
}

/**
 * INTERNAL API
 */
private final case class MessageResult(message: CamelMessage) extends NoSerializationVerificationNeeded

/**
 * INTERNAL API
 */
private final case class FailureResult(cause: Throwable, headers: Map[String, Any] = Map.empty) extends NoSerializationVerificationNeeded

/**
 * A one-way producer.
 *
 *
 */
trait Oneway extends Producer { this: Actor ⇒
  override def oneway: Boolean = true
}

Other Akka source code examples

Here is a short list of links related to this Akka Producer.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.