|
Akka/Scala example source code file (ProducerFeatureTest.scala)
The ProducerFeatureTest.scala Akka example source code/** * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> */ package akka.camel import language.postfixOps import org.apache.camel.{ Exchange, Processor } import org.apache.camel.builder.RouteBuilder import org.apache.camel.component.mock.MockEndpoint import scala.concurrent.Await import akka.camel.TestSupport.SharedCamelSystem import akka.actor.SupervisorStrategy.Stop import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll, WordSpecLike } import akka.actor._ import akka.pattern._ import scala.concurrent.duration._ import akka.util.Timeout import org.scalatest.Matchers import akka.testkit._ import akka.actor.Status.Failure /** * Tests the features of the Camel Producer. */ class ProducerFeatureTest extends TestKit(ActorSystem("ProducerFeatureTest", AkkaSpec.testConf)) with WordSpecLike with BeforeAndAfterAll with BeforeAndAfterEach with Matchers { import ProducerFeatureTest._ implicit def camel = CamelExtension(system) override protected def afterAll() { super.afterAll() shutdown() } val camelContext = camel.context // to make testing equality of messages easier, otherwise the breadcrumb shows up in the result. camelContext.setUseBreadcrumb(false) val timeoutDuration = 1 second implicit val timeout = Timeout(timeoutDuration) override protected def beforeAll { camelContext.addRoutes(new TestRoute(system)) } override protected def afterEach { mockEndpoint.reset() } "A Producer on a sync Camel route" must { "01 produce a message and receive normal response" in { val producer = system.actorOf(Props(new TestProducer("direct:producer-test-2", true)), name = "01-direct-producer-2") val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123")) producer.tell(message, testActor) expectMsg(CamelMessage("received TEST", Map(CamelMessage.MessageExchangeId -> "123"))) } "02 produce a message and receive failure response" in { val latch = TestLatch() var deadActor: Option[ActorRef] = None val supervisor = system.actorOf(Props(new Actor { def receive = { case p: Props ⇒ { val producer = context.actorOf(p) context.watch(producer) sender() ! producer } case Terminated(actorRef) ⇒ { deadActor = Some(actorRef) latch.countDown() } } override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) { case _: AkkaCamelException ⇒ Stop } }), name = "02-prod-anonymous-supervisor") supervisor.tell(Props(new TestProducer("direct:producer-test-2")), testActor) val producer = receiveOne(timeoutDuration).asInstanceOf[ActorRef] val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) { producer.tell(message, testActor) expectMsgPF(timeoutDuration) { case Failure(e: AkkaCamelException) ⇒ e.getMessage should be("failure") e.headers should be(Map(CamelMessage.MessageExchangeId -> "123")) } } Await.ready(latch, timeoutDuration) deadActor should be(Some(producer)) } "03 produce a message oneway" in { val producer = system.actorOf(Props(new TestProducer("direct:producer-test-1", true) with Oneway), name = "03-direct-producer-1-oneway") mockEndpoint.expectedBodiesReceived("TEST") producer ! CamelMessage("test", Map()) mockEndpoint.assertIsSatisfied() } "04 produces message twoway without sender reference" in { // this test causes a dead letter which can be ignored. The producer is two-way but a oneway tell is used // to communicate with it and the response is ignored, which ends up in a dead letter val producer = system.actorOf(Props(new TestProducer("direct:producer-test-1")), name = "04-ignore-this-deadletter-direct-producer-test-no-sender") mockEndpoint.expectedBodiesReceived("test") producer ! CamelMessage("test", Map()) mockEndpoint.assertIsSatisfied() } } "A Producer on an async Camel route" must { "10 produce message to direct:producer-test-3 and receive normal response" in { val producer = system.actorOf(Props(new TestProducer("direct:producer-test-3")), name = "10-direct-producer-test-3") val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123")) producer.tell(message, testActor) expectMsg(CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123"))) } "11 produce message to direct:producer-test-3 and receive failure response" in { val producer = system.actorOf(Props(new TestProducer("direct:producer-test-3")), name = "11-direct-producer-test-3-receive-failure") val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) { producer.tell(message, testActor) expectMsgPF(timeoutDuration) { case Failure(e: AkkaCamelException) ⇒ e.getMessage should be("failure") e.headers should be(Map(CamelMessage.MessageExchangeId -> "123")) } } } "12 produce message, forward normal response of direct:producer-test-2 to a replying target actor and receive response" in { val target = system.actorOf(Props[ReplyingForwardTarget], name = "12-reply-forwarding-target") val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)), name = "12-direct-producer-test-2-forwarder") val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123")) producer.tell(message, testActor) expectMsg(CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123", "test" -> "result"))) } "13 produce message, forward failure response of direct:producer-test-2 to a replying target actor and receive response" in { val target = system.actorOf(Props[ReplyingForwardTarget], name = "13-reply-forwarding-target") val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)), name = "13-direct-producer-test-2-forwarder-failure") val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) { producer.tell(message, testActor) expectMsgPF(timeoutDuration) { case Failure(e: AkkaCamelException) ⇒ e.getMessage should be("failure") e.headers should be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure")) } } } "14 produce message, forward normal response to a producing target actor and produce response to direct:forward-test-1" in { val target = system.actorOf(Props[ProducingForwardTarget], name = "14-producer-forwarding-target") val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)), name = "14-direct-producer-test-2-forwarder-to-producing-target") mockEndpoint.expectedBodiesReceived("received test") producer.tell(CamelMessage("test", Map()), producer) mockEndpoint.assertIsSatisfied() } "15 produce message, forward failure response to a producing target actor and produce response to direct:forward-test-1" in { val target = system.actorOf(Props[ProducingForwardTarget], name = "15-producer-forwarding-target-failure") val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-2", target)), name = "15-direct-producer-test-2-forward-failure") filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) { mockEndpoint.expectedMessageCount(1) mockEndpoint.message(0).body().isInstanceOf(classOf[akka.actor.Status.Failure]) producer.tell(CamelMessage("fail", Map()), producer) mockEndpoint.assertIsSatisfied() } } "16 produce message, forward normal response from direct:producer-test-3 to a replying target actor and receive response" in { val target = system.actorOf(Props[ReplyingForwardTarget], name = "16-reply-forwarding-target") val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)), name = "16-direct-producer-test-3-to-replying-actor") val message = CamelMessage("test", Map(CamelMessage.MessageExchangeId -> "123")) producer.tell(message, testActor) expectMsg(CamelMessage("received test", Map(CamelMessage.MessageExchangeId -> "123", "test" -> "result"))) } "17 produce message, forward failure response from direct:producer-test-3 to a replying target actor and receive response" in { val target = system.actorOf(Props[ReplyingForwardTarget], name = "17-reply-forwarding-target") val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)), name = "17-direct-producer-test-3-forward-failure") val message = CamelMessage("fail", Map(CamelMessage.MessageExchangeId -> "123")) filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) { producer.tell(message, testActor) expectMsgPF(timeoutDuration) { case Failure(e: AkkaCamelException) ⇒ e.getMessage should be("failure") e.headers should be(Map(CamelMessage.MessageExchangeId -> "123", "test" -> "failure")) } } } "18 produce message, forward normal response from direct:producer-test-3 to a producing target actor and produce response to direct:forward-test-1" in { val target = system.actorOf(Props[ProducingForwardTarget], "18-producing-forward-target-normal") val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)), name = "18-direct-producer-test-3-forward-normal") mockEndpoint.expectedBodiesReceived("received test") producer.tell(CamelMessage("test", Map()), producer) mockEndpoint.assertIsSatisfied() } "19 produce message, forward failure response from direct:producer-test-3 to a producing target actor and produce response to direct:forward-test-1" in { val target = system.actorOf(Props[ProducingForwardTarget], "19-producing-forward-target-failure") val producer = system.actorOf(Props(new TestForwarder("direct:producer-test-3", target)), name = "19-direct-producer-test-3-forward-failure-producing-target") filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) { mockEndpoint.expectedMessageCount(1) mockEndpoint.message(0).body().isInstanceOf(classOf[akka.actor.Status.Failure]) producer.tell(CamelMessage("fail", Map()), producer) mockEndpoint.assertIsSatisfied() } } "20 keep producing messages after error" in { import TestSupport._ val consumer = start(new IntermittentErrorConsumer("direct:intermittentTest-1"), "20-intermittentTest-error-consumer") val producer = start(new SimpleProducer("direct:intermittentTest-1"), "20-intermittentTest-producer") filterEvents(EventFilter[AkkaCamelException](occurrences = 1)) { val futureFailed = producer.tell("fail", testActor) expectMsgPF(timeoutDuration) { case Failure(e) ⇒ e.getMessage should be("fail") } producer.tell("OK", testActor) expectMsg("OK") } stop(consumer) stop(producer) } "21 be able to transform outgoing messages and have a valid sender reference" in { import TestSupport._ filterEvents(EventFilter[Exception](occurrences = 1)) { val producerSupervisor = system.actorOf(Props(new ProducerSupervisor(Props(new ChildProducer("mock:mock", true)))), "21-ignore-deadletter-sender-ref-test") mockEndpoint.reset() producerSupervisor.tell(CamelMessage("test", Map()), testActor) producerSupervisor.tell(CamelMessage("err", Map()), testActor) mockEndpoint.expectedMessageCount(1) mockEndpoint.expectedBodiesReceived("TEST") expectMsg("TEST") } } } private def mockEndpoint = camel.context.getEndpoint("mock:mock", classOf[MockEndpoint]) } object ProducerFeatureTest { class ProducerSupervisor(childProps: Props) extends Actor { override def supervisorStrategy = SupervisorStrategy.stoppingStrategy val child = context.actorOf(childProps, "producer-supervisor-child") val duration = 10 seconds implicit val timeout = Timeout(duration) implicit val ec = context.system.dispatcher Await.ready(CamelExtension(context.system).activationFutureFor(child), timeout.duration) def receive = { case msg: CamelMessage ⇒ child forward (msg) case (aref: ActorRef, msg: String) ⇒ aref ! msg } } class ChildProducer(uri: String, upper: Boolean = false) extends Actor with Producer { override def oneway = true var lastSender: Option[ActorRef] = None var lastMessage: Option[String] = None def endpointUri = uri override def transformOutgoingMessage(msg: Any) = msg match { case msg: CamelMessage ⇒ if (upper) msg.mapBody { body: String ⇒ if (body == "err") throw new Exception("Crash!") val upperMsg = body.toUpperCase lastSender = Some(sender()) lastMessage = Some(upperMsg) } else msg } override def postStop() { for (msg ← lastMessage; aref ← lastSender) context.parent ! ((aref, msg)) super.postStop() } } class TestProducer(uri: String, upper: Boolean = false) extends Actor with Producer { def endpointUri = uri override def preRestart(reason: Throwable, message: Option[Any]) { //overriding on purpose so it doesn't try to deRegister and reRegister at restart, // which would cause a deadletter message in the test output. } override protected def transformOutgoingMessage(msg: Any) = msg match { case msg: CamelMessage ⇒ if (upper) msg.mapBody { body: String ⇒ body.toUpperCase } else msg } } class TestForwarder(uri: String, target: ActorRef) extends Actor with Producer { def endpointUri = uri override def headersToCopy = Set(CamelMessage.MessageExchangeId, "test") override def routeResponse(msg: Any): Unit = target forward msg } class TestResponder extends Actor { def receive = { case msg: CamelMessage ⇒ msg.body match { case "fail" ⇒ context.sender() ! akka.actor.Status.Failure(new AkkaCamelException(new Exception("failure"), msg.headers)) case _ ⇒ context.sender() ! (msg.mapBody { body: String ⇒ "received %s" format body }) } } } class ReplyingForwardTarget extends Actor { def receive = { case msg: CamelMessage ⇒ context.sender() ! (msg.copy(headers = msg.headers + ("test" -> "result"))) case msg: akka.actor.Status.Failure ⇒ msg.cause match { case e: AkkaCamelException ⇒ context.sender() ! Status.Failure(new AkkaCamelException(e, e.headers + ("test" -> "failure"))) } } } class ProducingForwardTarget extends Actor with Producer with Oneway { def endpointUri = "direct:forward-test-1" } class TestRoute(system: ActorSystem) extends RouteBuilder { val responder = system.actorOf(Props[TestResponder], name = "TestResponder") def configure { from("direct:forward-test-1").to("mock:mock") // for one-way messaging tests from("direct:producer-test-1").to("mock:mock") // for two-way messaging tests (async) from("direct:producer-test-3").to(responder) // for two-way messaging tests (sync) from("direct:producer-test-2").process(new Processor() { def process(exchange: Exchange) = { exchange.getIn.getBody match { case "fail" ⇒ throw new Exception("failure") case body ⇒ exchange.getOut.setBody("received %s" format body) } } }) } } class SimpleProducer(override val endpointUri: String) extends Producer { override protected def transformResponse(msg: Any) = msg match { case m: CamelMessage ⇒ m.bodyAs[String] case m: Any ⇒ m } } class IntermittentErrorConsumer(override val endpointUri: String) extends Consumer { def receive = { case msg: CamelMessage if msg.bodyAs[String] == "fail" ⇒ sender() ! Failure(new Exception("fail")) case msg: CamelMessage ⇒ sender() ! msg } } } Other Akka source code examplesHere is a short list of links related to this Akka ProducerFeatureTest.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.