|
Akka/Scala example source code file (ConsumerIntegrationTest.scala)
The ConsumerIntegrationTest.scala Akka example source code
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.camel
import internal.ActorActivationException
import language.postfixOps
import language.existentials
import akka.actor._
import org.scalatest.Matchers
import org.scalatest.WordSpec
import akka.camel.TestSupport._
import org.apache.camel.model.{ ProcessorDefinition, RouteDefinition }
import org.apache.camel.builder.Builder
import org.apache.camel.{ FailedToCreateRouteException, CamelExecutionException }
import java.util.concurrent.{ ExecutionException, TimeUnit, TimeoutException }
import akka.actor.Status.Failure
import scala.concurrent.duration._
import scala.concurrent.{ ExecutionContext, Await }
import akka.testkit._
import akka.util.Timeout
/**
 * Integration tests for Camel consumer actors.
 *
 * Covers route activation and deactivation, in-out messaging, reply time-outs,
 * message processing after an actor restart, route customisation through
 * `onRouteDefinition`, and manual acknowledgement.
 */
class ConsumerIntegrationTest extends WordSpec with Matchers with NonSharedCamelSystem {

  "ConsumerIntegrationTest" must {
    // Upper bound applied to every blocking wait in this suite.
    val defaultTimeoutDuration = 10.seconds
    implicit val defaultTimeout: Timeout = Timeout(defaultTimeoutDuration)
    implicit def ec: ExecutionContext = system.dispatcher

    "Consumer must throw FailedToCreateRouteException, while awaiting activation, if endpoint is invalid" in {
      val activationFailure =
        EventFilter[FailedToCreateRouteException](pattern = "failed to activate.*", occurrences = 1)
      filterEvents(activationFailure) {
        val invalidConsumer = system.actorOf(Props(new TestActor(uri = "some invalid uri")), "invalidActor")
        intercept[FailedToCreateRouteException] {
          Await.result(camel.activationFutureFor(invalidConsumer), defaultTimeoutDuration)
        }
      }
    }

    "Consumer must support in-out messaging" in {
      start(new Consumer {
        def endpointUri = "direct:a1"
        def receive = {
          case in: CamelMessage ⇒ sender() ! ("received " + in.bodyAs[String])
        }
      }, name = "direct-a1")
      val reply = camel.sendTo("direct:a1", msg = "some message")
      reply should be("received some message")
    }

    "Consumer must time-out if consumer is slow" taggedAs TimingTest in {
      val shortTimeout = 10.millis
      val longWait = 1.second
      val slowConsumer = start(new Consumer {
        override def replyTimeout = shortTimeout
        def endpointUri = "direct:a3"
        def receive = {
          case _ ⇒
            // Sleep well past the reply timeout before answering.
            Thread.sleep(longWait.toMillis)
            sender() ! "done"
        }
      }, name = "ignore-this-deadletter-timeout-consumer-reply")
      val thrown = intercept[CamelExecutionException] {
        camel.sendTo("direct:a3", msg = "some msg 3")
      }
      thrown.getCause.getClass should be(classOf[TimeoutException])
      stop(slowConsumer)
    }

    "Consumer must process messages even after actor restart" in {
      val restartLatch = TestLatch()
      val restartingConsumer = start(new Consumer {
        def endpointUri = "direct:a2"
        def receive = {
          case "throw"         ⇒ throw new TestException("")
          case in: CamelMessage ⇒ sender() ! ("received " + in.bodyAs[String])
        }
        // Signals the test once the restart has happened.
        override def postRestart(reason: Throwable): Unit = restartLatch.countDown()
      }, "direct-a2")
      filterEvents(EventFilter[TestException](occurrences = 1)) {
        restartingConsumer ! "throw"
        Await.ready(restartLatch, defaultTimeoutDuration)
        val reply = camel.sendTo("direct:a2", msg = "xyz")
        reply should be("received xyz")
      }
      stop(restartingConsumer)
    }

    "Consumer must unregister itself when stopped" in {
      val registered = start(new TestActor(), name = "test-actor-unregister")
      Await.result(camel.activationFutureFor(registered), defaultTimeoutDuration)
      camel.routeCount should be > (0)

      system.stop(registered)
      Await.result(camel.deactivationFutureFor(registered), defaultTimeoutDuration)
      camel.routeCount should be(0)
    }

    "Consumer must register on uri passed in through constructor" in {
      val registered = start(new TestActor("direct://test"), name = "direct-test")
      Await.result(camel.activationFutureFor(registered), defaultTimeoutDuration)
      camel.routeCount should be > (0)
      val firstRouteUri = camel.routes.get(0).getEndpoint.getEndpointUri
      firstRouteUri should be("direct://test")

      system.stop(registered)
      Await.result(camel.deactivationFutureFor(registered), defaultTimeoutDuration)
      camel.routeCount should be(0)
      stop(registered)
    }

    "Error passing consumer supports error handling through route modification" in {
      val errorConsumer = start(new ErrorThrowingConsumer("direct:error-handler-test") {
        // Mark the TestException as handled and reply with its message.
        override def onRouteDefinition = (route: RouteDefinition) ⇒ {
          route.onException(classOf[TestException]).handled(true).transform(Builder.exceptionMessage).end
        }
      }, name = "direct-error-handler-test")
      filterEvents(EventFilter[TestException](occurrences = 1)) {
        val reply = camel.sendTo("direct:error-handler-test", msg = "hello")
        reply should be("error: hello")
      }
      stop(errorConsumer)
    }

    "Error passing consumer supports redelivery through route modification" in {
      val retriedConsumer = start(new FailingOnceConsumer("direct:failing-once-concumer") {
        // Allow exactly one redelivery when a TestException is raised.
        override def onRouteDefinition = (route: RouteDefinition) ⇒ {
          route.onException(classOf[TestException]).maximumRedeliveries(1).end
        }
      }, name = "direct-failing-once-consumer")
      filterEvents(EventFilter[TestException](occurrences = 1)) {
        val reply = camel.sendTo("direct:failing-once-concumer", msg = "hello")
        reply should be("accepted: hello")
      }
      stop(retriedConsumer)
    }

    "Consumer supports manual Ack" in {
      val ackingConsumer = start(new ManualAckConsumer() {
        def endpointUri = "direct:manual-ack"
        def receive = { case _ ⇒ sender() ! Ack }
      }, name = "direct-manual-ack-1")
      val sendResult = camel.template
        .asyncSendBody("direct:manual-ack", "some message")
        .get(defaultTimeoutDuration.toSeconds, TimeUnit.SECONDS)
      sendResult should be(null) //should not timeout
      stop(ackingConsumer)
    }

    "Consumer handles manual Ack failure" in {
      val someException = new Exception("e1")
      val failingConsumer = start(new ManualAckConsumer() {
        def endpointUri = "direct:manual-ack"
        def receive = { case _ ⇒ sender() ! Failure(someException) }
      }, name = "direct-manual-ack-2")
      val thrown = intercept[ExecutionException] {
        camel.template
          .asyncSendBody("direct:manual-ack", "some message")
          .get(defaultTimeoutDuration.toSeconds, TimeUnit.SECONDS)
      }
      thrown.getCause.getCause should be(someException)
      stop(failingConsumer)
    }

    "Consumer should time-out, if manual Ack not received within replyTimeout and should give a human readable error message" in {
      val silentConsumer = start(new ManualAckConsumer() {
        override def replyTimeout = 10.millis
        def endpointUri = "direct:manual-ack"
        // Never replies, so no Ack is ever sent.
        def receive = { case _ ⇒ }
      }, name = "direct-manual-ack-3")
      val thrown = intercept[ExecutionException] {
        camel.template
          .asyncSendBody("direct:manual-ack", "some message")
          .get(defaultTimeoutDuration.toSeconds, TimeUnit.SECONDS)
      }
      thrown.getCause.getCause.getMessage should include("Failed to get Ack")
      stop(silentConsumer)
    }

    "respond to onRouteDefinition" in {
      val respondingConsumer =
        start(new ErrorRespondingConsumer("direct:error-responding-consumer-1"), "error-responding-consumer")
      filterEvents(EventFilter[TestException](occurrences = 1)) {
        val response = camel.sendTo("direct:error-responding-consumer-1", "some body")
        response should be("some body has an error")
      }
      stop(respondingConsumer)
    }
  }
}
/**
 * Consumer that fails on every [[CamelMessage]] by throwing a [[TestException]]
 * whose message is `"error: "` followed by the message body.
 *
 * Before being restarted it reports the failure reason back to the current sender.
 *
 * @param endpointUri the Camel endpoint this consumer listens on
 */
class ErrorThrowingConsumer(override val endpointUri: String) extends Consumer {

  def receive: Receive = {
    case in: CamelMessage ⇒
      throw new TestException("error: %s".format(in.body))
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    super.preRestart(reason, message)
    // Hand the failure to whoever sent the message that caused the restart.
    sender() ! Failure(reason)
  }
}
/**
 * Consumer that throws a [[TestException]] for every [[CamelMessage]] and customises its
 * route so that the exception is handled by replying with the in-message body plus
 * `" has an error"`.
 *
 * @param endpointUri the Camel endpoint this consumer listens on
 */
class ErrorRespondingConsumer(override val endpointUri: String) extends Consumer {

  def receive = {
    case _: CamelMessage ⇒ throw new TestException("Error!")
  }

  // Catch TestException and handle it by returning a modified version of the in message
  override def onRouteDefinition = (route: RouteDefinition) ⇒ {
    route
      .onException(classOf[TestException])
      .handled(true)
      .transform(Builder.body.append(" has an error"))
      .end
  }

  final override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    super.preRestart(reason, message)
    // Report the failure reason to the current sender before restarting.
    sender() ! Failure(reason)
  }
}
/**
 * Consumer that accepts a [[CamelMessage]] only when its `CamelRedelivered` header is `true`.
 *
 * A message without that header (or with it set to `false`) is rejected by throwing a
 * [[TestException]]; an accepted message is answered with `"accepted: "` plus the body.
 *
 * @param endpointUri the Camel endpoint this consumer listens on
 */
class FailingOnceConsumer(override val endpointUri: String) extends Consumer {

  def receive = {
    case in: CamelMessage ⇒
      // A missing or unreadable header counts as "not redelivered".
      val redelivered = in.headerAs[Boolean]("CamelRedelivered").getOrElse(false)
      if (redelivered) {
        sender() ! "accepted: %s".format(in.body)
      } else {
        throw new TestException("rejected: %s".format(in.body))
      }
  }

  final override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    super.preRestart(reason, message)
    // Report the failure reason to the current sender before restarting.
    sender() ! Failure(reason)
  }
}
/**
 * Minimal consumer whose endpoint URI is supplied through the constructor and which
 * silently ignores every message it receives.
 *
 * @param uri the endpoint URI to consume from; defaults to `file://target/abcde`
 */
class TestActor(uri: String = "file://target/abcde") extends Consumer {

  def endpointUri: String = uri

  def receive: Receive = {
    case _ ⇒ // do nothing
  }
}
/**
 * Consumer mix-in that switches `autoAck` off.
 */
trait ManualAckConsumer extends Consumer {

  override def autoAck: Boolean = false
}
/**
 * Exception type thrown on purpose by the test consumers in this file.
 *
 * @param msg the exception message, passed straight to [[java.lang.Exception]]
 */
class TestException(msg: String) extends Exception(msg)
Other Akka source code examples
Here is a short list of links related to this Akka ConsumerIntegrationTest.scala source code file:
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.