alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (ConsumerIntegrationTest.scala)

This example Akka source code file (ConsumerIntegrationTest.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

ack, akka, camelmessage, concurrent, consumer, duration, error, failure, manualackconsumer, string, test, testactor, testexception, testing, throwable, time

The ConsumerIntegrationTest.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */

package akka.camel

import internal.ActorActivationException
import language.postfixOps
import language.existentials

import akka.actor._
import org.scalatest.Matchers
import org.scalatest.WordSpec
import akka.camel.TestSupport._
import org.apache.camel.model.{ ProcessorDefinition, RouteDefinition }
import org.apache.camel.builder.Builder
import org.apache.camel.{ FailedToCreateRouteException, CamelExecutionException }
import java.util.concurrent.{ ExecutionException, TimeUnit, TimeoutException }
import akka.actor.Status.Failure
import scala.concurrent.duration._
import scala.concurrent.{ ExecutionContext, Await }
import akka.testkit._
import akka.util.Timeout

class ConsumerIntegrationTest extends WordSpec with Matchers with NonSharedCamelSystem {
  "ConsumerIntegrationTest" must {
    val defaultTimeoutDuration = 10 seconds
    implicit val defaultTimeout = Timeout(defaultTimeoutDuration)
    implicit def ec: ExecutionContext = system.dispatcher

    "Consumer must throw FailedToCreateRouteException, while awaiting activation, if endpoint is invalid" in {
      filterEvents(EventFilter[FailedToCreateRouteException](pattern = "failed to activate.*", occurrences = 1)) {
        val actorRef = system.actorOf(Props(new TestActor(uri = "some invalid uri")), "invalidActor")
        intercept[FailedToCreateRouteException] {
          Await.result(camel.activationFutureFor(actorRef), defaultTimeoutDuration)
        }
      }
    }

    "Consumer must support in-out messaging" in {
      start(new Consumer {
        def endpointUri = "direct:a1"
        def receive = {
          case m: CamelMessage ⇒ sender() ! "received " + m.bodyAs[String]
        }
      }, name = "direct-a1")
      camel.sendTo("direct:a1", msg = "some message") should be("received some message")
    }

    "Consumer must time-out if consumer is slow" taggedAs TimingTest in {
      val SHORT_TIMEOUT = 10 millis
      val LONG_WAIT = 1 second

      val ref = start(new Consumer {
        override def replyTimeout = SHORT_TIMEOUT
        def endpointUri = "direct:a3"
        def receive = { case _ ⇒ { Thread.sleep(LONG_WAIT.toMillis); sender() ! "done" } }
      }, name = "ignore-this-deadletter-timeout-consumer-reply")

      intercept[CamelExecutionException] {
        camel.sendTo("direct:a3", msg = "some msg 3")
      }.getCause.getClass should be(classOf[TimeoutException])

      stop(ref)
    }

    "Consumer must process messages even after actor restart" in {
      val restarted = TestLatch()
      val consumer = start(new Consumer {
        def endpointUri = "direct:a2"

        def receive = {
          case "throw"         ⇒ throw new TestException("")
          case m: CamelMessage ⇒ sender() ! "received " + m.bodyAs[String]
        }

        override def postRestart(reason: Throwable) {
          restarted.countDown()
        }
      }, "direct-a2")
      filterEvents(EventFilter[TestException](occurrences = 1)) {
        consumer ! "throw"
        Await.ready(restarted, defaultTimeoutDuration)

        camel.sendTo("direct:a2", msg = "xyz") should be("received xyz")
      }
      stop(consumer)
    }

    "Consumer must unregister itself when stopped" in {
      val consumer = start(new TestActor(), name = "test-actor-unregister")
      Await.result(camel.activationFutureFor(consumer), defaultTimeoutDuration)

      camel.routeCount should be > (0)

      system.stop(consumer)
      Await.result(camel.deactivationFutureFor(consumer), defaultTimeoutDuration)

      camel.routeCount should be(0)
    }

    "Consumer must register on uri passed in through constructor" in {
      val consumer = start(new TestActor("direct://test"), name = "direct-test")
      Await.result(camel.activationFutureFor(consumer), defaultTimeoutDuration)

      camel.routeCount should be > (0)
      camel.routes.get(0).getEndpoint.getEndpointUri should be("direct://test")
      system.stop(consumer)
      Await.result(camel.deactivationFutureFor(consumer), defaultTimeoutDuration)
      camel.routeCount should be(0)
      stop(consumer)
    }

    "Error passing consumer supports error handling through route modification" in {
      val ref = start(new ErrorThrowingConsumer("direct:error-handler-test") {
        override def onRouteDefinition = (rd: RouteDefinition) ⇒ {
          rd.onException(classOf[TestException]).handled(true).transform(Builder.exceptionMessage).end
        }
      }, name = "direct-error-handler-test")
      filterEvents(EventFilter[TestException](occurrences = 1)) {
        camel.sendTo("direct:error-handler-test", msg = "hello") should be("error: hello")
      }
      stop(ref)
    }

    "Error passing consumer supports redelivery through route modification" in {
      val ref = start(new FailingOnceConsumer("direct:failing-once-concumer") {
        override def onRouteDefinition = (rd: RouteDefinition) ⇒ {
          rd.onException(classOf[TestException]).maximumRedeliveries(1).end
        }
      }, name = "direct-failing-once-consumer")
      filterEvents(EventFilter[TestException](occurrences = 1)) {
        camel.sendTo("direct:failing-once-concumer", msg = "hello") should be("accepted: hello")
      }
      stop(ref)
    }

    "Consumer supports manual Ack" in {
      val ref = start(new ManualAckConsumer() {
        def endpointUri = "direct:manual-ack"
        def receive = { case _ ⇒ sender() ! Ack }
      }, name = "direct-manual-ack-1")
      camel.template.asyncSendBody("direct:manual-ack", "some message").get(defaultTimeoutDuration.toSeconds, TimeUnit.SECONDS) should be(null) //should not timeout
      stop(ref)
    }

    "Consumer handles manual Ack failure" in {
      val someException = new Exception("e1")
      val ref = start(new ManualAckConsumer() {
        def endpointUri = "direct:manual-ack"
        def receive = { case _ ⇒ sender() ! Failure(someException) }
      }, name = "direct-manual-ack-2")

      intercept[ExecutionException] {
        camel.template.asyncSendBody("direct:manual-ack", "some message").get(defaultTimeoutDuration.toSeconds, TimeUnit.SECONDS)
      }.getCause.getCause should be(someException)
      stop(ref)
    }

    "Consumer should time-out, if manual Ack not received within replyTimeout and should give a human readable error message" in {
      val ref = start(new ManualAckConsumer() {
        override def replyTimeout = 10 millis
        def endpointUri = "direct:manual-ack"
        def receive = { case _ ⇒ }
      }, name = "direct-manual-ack-3")

      intercept[ExecutionException] {
        camel.template.asyncSendBody("direct:manual-ack", "some message").get(defaultTimeoutDuration.toSeconds, TimeUnit.SECONDS)
      }.getCause.getCause.getMessage should include("Failed to get Ack")
      stop(ref)
    }
    "respond to onRouteDefinition" in {
      val ref = start(new ErrorRespondingConsumer("direct:error-responding-consumer-1"), "error-responding-consumer")
      filterEvents(EventFilter[TestException](occurrences = 1)) {
        val response = camel.sendTo("direct:error-responding-consumer-1", "some body")
        response should be("some body has an error")
      }
      stop(ref)
    }
  }
}

class ErrorThrowingConsumer(override val endpointUri: String) extends Consumer {
  def receive = {
    case msg: CamelMessage ⇒ throw new TestException("error: %s" format msg.body)
  }
  override def preRestart(reason: Throwable, message: Option[Any]) {
    super.preRestart(reason, message)
    sender() ! Failure(reason)
  }
}

class ErrorRespondingConsumer(override val endpointUri: String) extends Consumer {
  def receive = {
    case msg: CamelMessage ⇒ throw new TestException("Error!")
  }
  override def onRouteDefinition = (rd: RouteDefinition) ⇒ {
    // Catch TestException and handle it by returning a modified version of the in message
    rd.onException(classOf[TestException]).handled(true).transform(Builder.body.append(" has an error")).end
  }

  final override def preRestart(reason: Throwable, message: Option[Any]) {
    super.preRestart(reason, message)
    sender() ! Failure(reason)
  }
}

class FailingOnceConsumer(override val endpointUri: String) extends Consumer {

  def receive = {
    case msg: CamelMessage ⇒
      if (msg.headerAs[Boolean]("CamelRedelivered").getOrElse(false))
        sender() ! ("accepted: %s" format msg.body)
      else
        throw new TestException("rejected: %s" format msg.body)
  }

  final override def preRestart(reason: Throwable, message: Option[Any]) {
    super.preRestart(reason, message)
    sender() ! Failure(reason)
  }
}

class TestActor(uri: String = "file://target/abcde") extends Consumer {
  def endpointUri = uri
  def receive = { case _ ⇒ /* do nothing */ }
}

trait ManualAckConsumer extends Consumer {
  override def autoAck = false
}

class TestException(msg: String) extends Exception(msg)

Other Akka source code examples

Here is a short list of links related to this Akka ConsumerIntegrationTest.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.