alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (ProducerFeatureTest.scala)

This example Akka source code file (ProducerFeatureTest.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actor, akka, akkacamelexception, any, camelmessage, concurrent, duration, failure, map, producer, string, test, testforwarder, testing, testproducer, time

The ProducerFeatureTest.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */

package akka.camel

import language.postfixOps

import org.apache.camel.{ Exchange, Processor }
import org.apache.camel.builder.RouteBuilder
import org.apache.camel.component.mock.MockEndpoint
import scala.concurrent.Await
import akka.camel.TestSupport.SharedCamelSystem
import akka.actor.SupervisorStrategy.Stop
import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll, WordSpecLike }
import akka.actor._
import akka.pattern._
import scala.concurrent.duration._
import akka.util.Timeout
import org.scalatest.Matchers
import akka.testkit._
import akka.actor.Status.Failure

/**
 * Tests the features of the Camel Producer.
 */
class ProducerFeatureTest extends TestKit(ActorSystem("ProducerFeatureTest", AkkaSpec.testConf)) with WordSpecLike with BeforeAndAfterAll with BeforeAndAfterEach with Matchers {

  import ProducerFeatureTest._
  implicit def camel = CamelExtension(system)

  override protected def afterAll() {
    super.afterAll()
    shutdown()
  }

  val camelContext = camel.context
  // to make testing equality of messages easier, otherwise the breadcrumb shows up in the result.
  camelContext.setUseBreadcrumb(false)
  val timeoutDuration = 1 second
  implicit val timeout = Timeout(timeoutDuration)
  override protected def beforeAll { camelContext.addRoutes(new TestRoute(system)) }

  override protected def afterEach { mockEndpoint.reset() }

  "A Producer on a sync Camel route" must {

    "01 produce a message and receive normal response" in {
      val producer = system.actorOf(Props(new TestProducer("direct:producer-test-2", true)), name = "01-direct-producer-2")
      val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123"))
      producer.tell(message, testActor)
      expectMsg(CamelMessage("received TEST", Map(CamelMessage.MessageExchangeId -> "123")))
    }

    "02 produce a message and receive failure response" in {
      val latch = TestLatch()
      var deadActor: Option[ActorRef] = None
      val supervisor = system.actorOf(Props(new Actor {
        def receive = {
          case p: Props ⇒ {
            val producer = context.actorOf(p)
            context.watch(producer)
            sender() ! producer
          }
          case Terminated(actorRef) ⇒ {
            deadActor = Some(actorRef)
            latch.countDown()
          }
        }
        override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
          case _: AkkaCamelException ⇒ Stop
        }
      }), name = "02-prod-anonymous-supervisor")

      supervisor.tell(Props(new TestProducer("direct:producer-test-2")), testActor)
      val producer = receiveOne(timeoutDuration).asInstanceOf[ActorRef]
      val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
      filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
        producer.tell(message, testActor)
        expectMsgPF(timeoutDuration) {
          case Failure(e: AkkaCamelException) ⇒
            e.getMessage should be("failure")
            e.headers should be(Map(CamelMessage.MessageExchangeId -> "123"))
        }
      }
      Await.ready(latch, timeoutDuration)
      deadActor should be(Some(producer))
    }

    "03 produce a message oneway" in {
      val producer = system.actorOf(Props(new TestProducer("direct:producer-test-1", true) with Oneway), name = "03-direct-producer-1-oneway")
      mockEndpoint.expectedBodiesReceived("TEST")
      producer ! CamelMessage("test", Map())
      mockEndpoint.assertIsSatisfied()
    }

    "04 produces message twoway without sender reference" in {
      // this test causes a dead letter which can be ignored. The producer is two-way but a oneway tell is used
      // to communicate with it and the response is ignored, which ends up in a dead letter
      val producer = system.actorOf(Props(new TestProducer("direct:producer-test-1")), name = "04-ignore-this-deadletter-direct-producer-test-no-sender")
      mockEndpoint.expectedBodiesReceived("test")
      producer ! CamelMessage("test", Map())
      mockEndpoint.assertIsSatisfied()
    }
  }

  "A Producer on an async Camel route" must {

    "10 produce message to direct:producer-test-3 and receive normal response" in {
      val producer = system.actorOf(Props(new TestProducer("direct:producer-test-3")), name = "10-direct-producer-test-3")
      val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123"))
      producer.tell(message, testActor)
      expectMsg(CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123")))
    }

    "11 produce message to direct:producer-test-3 and receive failure response" in {
      val producer = system.actorOf(Props(new TestProducer("direct:producer-test-3")), name = "11-direct-producer-test-3-receive-failure")
      val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))

      filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
        producer.tell(message, testActor)
        expectMsgPF(timeoutDuration) {
          case Failure(e: AkkaCamelException) ⇒
            e.getMessage should be("failure")
            e.headers should be(Map(CamelMessage.MessageExchangeId -> "123"))
        }
      }
    }

    "12 produce message, forward normal response of direct:producer-test-2 to a replying target actor and receive response" in {
      val target = system.actorOf(Props[ReplyingForwardTarget], name = "12-reply-forwarding-target")
      val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)), name = "12-direct-producer-test-2-forwarder")
      val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123"))
      producer.tell(message, testActor)
      expectMsg(CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123", "test" -> "result")))
    }

    "13 produce message, forward failure response of direct:producer-test-2 to a replying target actor and receive response" in {
      val target = system.actorOf(Props[ReplyingForwardTarget], name = "13-reply-forwarding-target")
      val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)), name = "13-direct-producer-test-2-forwarder-failure")
      val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))

      filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
        producer.tell(message, testActor)
        expectMsgPF(timeoutDuration) {
          case Failure(e: AkkaCamelException) ⇒
            e.getMessage should be("failure")
            e.headers should be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure"))
        }
      }
    }

    "14 produce message, forward normal response to a producing target actor and produce response to direct:forward-test-1" in {
      val target = system.actorOf(Props[ProducingForwardTarget], name = "14-producer-forwarding-target")
      val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)), name = "14-direct-producer-test-2-forwarder-to-producing-target")
      mockEndpoint.expectedBodiesReceived("received test")
      producer.tell(CamelMessage("test", Map()), producer)
      mockEndpoint.assertIsSatisfied()
    }

    "15 produce message, forward failure response to a producing target actor and produce response to direct:forward-test-1" in {
      val target = system.actorOf(Props[ProducingForwardTarget], name = "15-producer-forwarding-target-failure")
      val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)), name = "15-direct-producer-test-2-forward-failure")
      filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
        mockEndpoint.expectedMessageCount(1)
        mockEndpoint.message(0).body().isInstanceOf(classOf[akka.actor.Status.Failure])
        producer.tell(CamelMessage("fail", Map()), producer)
        mockEndpoint.assertIsSatisfied()
      }
    }

    "16 produce message, forward normal response from direct:producer-test-3 to a replying target actor and receive response" in {
      val target = system.actorOf(Props[ReplyingForwardTarget], name = "16-reply-forwarding-target")
      val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)), name = "16-direct-producer-test-3-to-replying-actor")
      val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123"))

      producer.tell(message, testActor)
      expectMsg(CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123", "test" -> "result")))
    }

    "17 produce message, forward failure response from direct:producer-test-3 to a replying target actor and receive response" in {
      val target = system.actorOf(Props[ReplyingForwardTarget], name = "17-reply-forwarding-target")
      val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)), name = "17-direct-producer-test-3-forward-failure")

      val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123"))
      filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
        producer.tell(message, testActor)
        expectMsgPF(timeoutDuration) {
          case Failure(e: AkkaCamelException) ⇒
            e.getMessage should be("failure")
            e.headers should be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure"))
        }
      }
    }

    "18 produce message, forward normal response from direct:producer-test-3 to a producing target actor and produce response to direct:forward-test-1" in {
      val target = system.actorOf(Props[ProducingForwardTarget], "18-producing-forward-target-normal")
      val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)), name = "18-direct-producer-test-3-forward-normal")
      mockEndpoint.expectedBodiesReceived("received test")
      producer.tell(CamelMessage("test", Map()), producer)
      mockEndpoint.assertIsSatisfied()
    }

    "19 produce message, forward failure response from direct:producer-test-3 to a producing target actor and produce response to direct:forward-test-1" in {
      val target = system.actorOf(Props[ProducingForwardTarget], "19-producing-forward-target-failure")
      val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)), name = "19-direct-producer-test-3-forward-failure-producing-target")
      filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
        mockEndpoint.expectedMessageCount(1)
        mockEndpoint.message(0).body().isInstanceOf(classOf[akka.actor.Status.Failure])
        producer.tell(CamelMessage("fail", Map()), producer)
        mockEndpoint.assertIsSatisfied()
      }
    }

    "20 keep producing messages after error" in {
      import TestSupport._
      val consumer = start(new IntermittentErrorConsumer("direct:intermittentTest-1"), "20-intermittentTest-error-consumer")
      val producer = start(new SimpleProducer("direct:intermittentTest-1"), "20-intermittentTest-producer")
      filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) {
        val futureFailed = producer.tell("fail", testActor)
        expectMsgPF(timeoutDuration) {
          case Failure(e) ⇒
            e.getMessage should be("fail")
        }
        producer.tell("OK", testActor)
        expectMsg("OK")
      }
      stop(consumer)
      stop(producer)
    }

    "21 be able to transform outgoing messages and have a valid sender reference" in {
      import TestSupport._
      filterEvents(EventFilter[Exception](occurrences = 1)) {
        val producerSupervisor = system.actorOf(Props(new ProducerSupervisor(Props(new ChildProducer("mock:mock", true)))), "21-ignore-deadletter-sender-ref-test")
        mockEndpoint.reset()
        producerSupervisor.tell(CamelMessage("test", Map()), testActor)
        producerSupervisor.tell(CamelMessage("err", Map()), testActor)
        mockEndpoint.expectedMessageCount(1)
        mockEndpoint.expectedBodiesReceived("TEST")
        expectMsg("TEST")
      }
    }
  }

  private def mockEndpoint = camel.context.getEndpoint("mock:mock", classOf[MockEndpoint])
}

object ProducerFeatureTest {

  class ProducerSupervisor(childProps: Props) extends Actor {
    override def supervisorStrategy = SupervisorStrategy.stoppingStrategy
    val child = context.actorOf(childProps, "producer-supervisor-child")
    val duration = 10 seconds
    implicit val timeout = Timeout(duration)
    implicit val ec = context.system.dispatcher
    Await.ready(CamelExtension(context.system).activationFutureFor(child), timeout.duration)
    def receive = {
      case msg: CamelMessage ⇒
        child forward (msg)
      case (aref: ActorRef, msg: String) ⇒
        aref ! msg
    }
  }

  class ChildProducer(uri: String, upper: Boolean = false) extends Actor with Producer {
    override def oneway = true

    var lastSender: Option[ActorRef] = None
    var lastMessage: Option[String] = None
    def endpointUri = uri

    override def transformOutgoingMessage(msg: Any) = msg match {
      case msg: CamelMessage ⇒ if (upper) msg.mapBody {
        body: String ⇒
          if (body == "err") throw new Exception("Crash!")
          val upperMsg = body.toUpperCase
          lastSender = Some(sender())
          lastMessage = Some(upperMsg)
      }
      else msg
    }

    override def postStop() {
      for (msg ← lastMessage; aref ← lastSender) context.parent ! ((aref, msg))
      super.postStop()
    }
  }

  class TestProducer(uri: String, upper: Boolean = false) extends Actor with Producer {
    def endpointUri = uri

    override def preRestart(reason: Throwable, message: Option[Any]) {
      //overriding on purpose so it doesn't try to deRegister and reRegister at restart,
      // which would cause a deadletter message in the test output.
    }

    override protected def transformOutgoingMessage(msg: Any) = msg match {
      case msg: CamelMessage ⇒ if (upper) msg.mapBody {
        body: String ⇒ body.toUpperCase
      }
      else msg
    }
  }

  class TestForwarder(uri: String, target: ActorRef) extends Actor with Producer {
    def endpointUri = uri

    override def headersToCopy = Set(CamelMessage.MessageExchangeId, "test")

    override def routeResponse(msg: Any): Unit = target forward msg
  }

  class TestResponder extends Actor {
    def receive = {
      case msg: CamelMessage ⇒ msg.body match {
        case "fail" ⇒ context.sender() ! akka.actor.Status.Failure(new AkkaCamelException(new Exception("failure"), msg.headers))
        case _ ⇒
          context.sender() ! (msg.mapBody {
            body: String ⇒ "received %s" format body
          })
      }
    }
  }

  class ReplyingForwardTarget extends Actor {
    def receive = {
      case msg: CamelMessage ⇒
        context.sender() ! (msg.copy(headers = msg.headers + ("test" -> "result")))
      case msg: akka.actor.Status.Failure ⇒
        msg.cause match {
          case e: AkkaCamelException ⇒ context.sender() ! Status.Failure(new AkkaCamelException(e, e.headers + ("test" -> "failure")))
        }
    }
  }

  class ProducingForwardTarget extends Actor with Producer with Oneway {
    def endpointUri = "direct:forward-test-1"
  }

  class TestRoute(system: ActorSystem) extends RouteBuilder {
    val responder = system.actorOf(Props[TestResponder], name = "TestResponder")

    def configure {
      from("direct:forward-test-1").to("mock:mock")
      // for one-way messaging tests
      from("direct:producer-test-1").to("mock:mock")
      // for two-way messaging tests (async)
      from("direct:producer-test-3").to(responder)
      // for two-way messaging tests (sync)
      from("direct:producer-test-2").process(new Processor() {
        def process(exchange: Exchange) = {
          exchange.getIn.getBody match {
            case "fail" ⇒ throw new Exception("failure")
            case body   ⇒ exchange.getOut.setBody("received %s" format body)
          }
        }
      })
    }
  }

  class SimpleProducer(override val endpointUri: String) extends Producer {
    override protected def transformResponse(msg: Any) = msg match {
      case m: CamelMessage ⇒ m.bodyAs[String]
      case m: Any          ⇒ m
    }
  }

  class IntermittentErrorConsumer(override val endpointUri: String) extends Consumer {
    def receive = {
      case msg: CamelMessage if msg.bodyAs[String] == "fail" ⇒ sender() ! Failure(new Exception("fail"))
      case msg: CamelMessage                                 ⇒ sender() ! msg
    }
  }

}

Other Akka source code examples

Here is a short list of links related to this Akka ProducerFeatureTest.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.