|
Akka/Scala example source code file (ActorProducerTest.scala)
The ActorProducerTest.scala Akka example source code/** * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> */ package akka.camel.internal.component import language.postfixOps import org.scalatest.mock.MockitoSugar import org.mockito.Matchers.any import org.mockito.Mockito._ import org.apache.camel.{ CamelContext, ProducerTemplate, AsyncCallback } import java.util.concurrent.atomic.AtomicBoolean import scala.concurrent.duration._ import java.lang.String import akka.camel._ import internal.{ DefaultCamel, CamelExchangeAdapter } import org.scalatest.{ Suite, WordSpecLike, BeforeAndAfterAll, BeforeAndAfterEach } import akka.camel.TestSupport._ import java.util.concurrent.{ TimeoutException, CountDownLatch, TimeUnit } import org.mockito.{ ArgumentMatcher, Matchers ⇒ MMatchers, Mockito } import org.scalatest.Matchers import akka.actor.Status.{ Success, Failure } import com.typesafe.config.ConfigFactory import akka.actor.ActorSystem.Settings import akka.event.LoggingAdapter import akka.testkit.{ TestLatch, TimingTest, TestKit, TestProbe } import org.apache.camel.impl.DefaultCamelContext import scala.concurrent.{ Await, Promise, Future } import akka.util.Timeout import akka.actor._ import akka.testkit._ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpecLike with Matchers with ActorProducerFixture { implicit val timeout = Timeout(10 seconds) "ActorProducer" when { "synchronous" when { "consumer actor doesnt exist" must { "set failure message on exchange" in { producer = given(actor = null) producer.processExchangeAdapter(exchange) verify(exchange).setFailure(any[FailureResult]) } } "in-only" must { def producer = given(outCapable = false) "pass the message to the consumer" taggedAs TimingTest in { producer.processExchangeAdapter(exchange) within(1 second)(probe.expectMsg(message)) } "not expect response and not block" taggedAs TimingTest in { time(producer.processExchangeAdapter(exchange)) should be < (200 millis) } } "manualAck" when { "response is Ack" must { "process the exchange" in { producer = given(outCapable = false, autoAck = false) import system.dispatcher val future = Future { producer.processExchangeAdapter(exchange) } within(1 second) { probe.expectMsgType[CamelMessage] info("message sent to consumer") probe.sender() ! Ack } verify(exchange, never()).setResponse(any[CamelMessage]) info("no response forwarded to exchange") Await.ready(future, timeout.duration) } } "the consumer does not respond wit Ack" must { "not block forever" in { producer = given(outCapable = false, autoAck = false) import system.dispatcher val future = Future { producer.processExchangeAdapter(exchange) } within(1 second) { probe.expectMsgType[CamelMessage] info("message sent to consumer") } verify(exchange, never()).setResponse(any[CamelMessage]) info("no response forwarded to exchange") intercept[TimeoutException] { Await.ready(future, camel.settings.ReplyTimeout - (1 seconds)) } } } } "out capable" when { "response is sent back by actor" must { "get a response" in { producer = given(actor = echoActor, outCapable = true) producer.processExchangeAdapter(exchange) verify(exchange).setResponse(msg("received " + message)) } } "response is not sent by actor" must { val latch = TestLatch(1) val callback = new AsyncCallback { def done(doneSync: Boolean) { latch.countDown() } } def process() = { producer = given(outCapable = true, replyTimeout = 100 millis) val duration = time { producer.processExchangeAdapter(exchange, callback) // wait for the actor to complete the callback Await.ready(latch, 1.seconds.dilated) } latch.reset() duration } "timeout after replyTimeout" taggedAs TimingTest in { val duration = process() duration should (be >= (100 millis) and be < (300 millis)) } "never set the response on exchange" in { process() verify(exchange, Mockito.never()).setResponse(any[CamelMessage]) } "set failure message to timeout" in { process() verify(exchange).setFailure(any[FailureResult]) } } } } "asynchronous" when { def verifyFailureIsSet(): Unit = { producer.processExchangeAdapter(exchange, asyncCallback) asyncCallback.awaitCalled() verify(exchange).setFailure(any[FailureResult]) } "out-capable" when { "consumer actor doesnt exist" must { "set failure message on exchange" in { producer = given(actor = null, outCapable = true) verifyFailureIsSet() } } "response is ok" must { "get a response and async callback as soon as it gets the response (but not before)" in { producer = given(outCapable = true) val doneSync = producer.processExchangeAdapter(exchange, asyncCallback) asyncCallback.expectNoCallWithin(100 millis) info("no async callback before response") within(1 second) { probe.expectMsgType[CamelMessage] probe.sender() ! "some message" } doneSync should be(false) info("done async") asyncCallback.expectDoneAsyncWithin(1 second) info("async callback received") verify(exchange).setResponse(msg("some message")) info("response as expected") } } "response is Failure" must { "set an exception on exchange" in { val exception = new RuntimeException("some failure") val failure = Failure(exception) producer = given(outCapable = true) producer.processExchangeAdapter(exchange, asyncCallback) within(1 second) { probe.expectMsgType[CamelMessage] probe.sender() ! failure asyncCallback.awaitCalled(remaining) } verify(exchange).setFailure(FailureResult(exception)) } } "no response is sent within timeout" must { "set TimeoutException on exchange" in { producer = given(outCapable = true, replyTimeout = 10 millis) producer.processExchangeAdapter(exchange, asyncCallback) asyncCallback.awaitCalled(100 millis) verify(exchange).setFailure(MMatchers.argThat(new ArgumentMatcher[FailureResult] { def matches(failure: AnyRef) = { failure.asInstanceOf[FailureResult].cause should be(anInstanceOf[TimeoutException]) true } })) } } } "in-only" when { "consumer actor doesnt exist" must { "set failure message on exchange" in { producer = given(actor = null, outCapable = false) verifyFailureIsSet() } } "autoAck" must { "get sync callback as soon as it sends a message" in { producer = given(outCapable = false, autoAck = true) val doneSync = producer.processExchangeAdapter(exchange, asyncCallback) doneSync should be(true) info("done sync") asyncCallback.expectDoneSyncWithin(1 second) info("async callback called") verify(exchange, never()).setResponse(any[CamelMessage]) info("no response forwarded to exchange") } } "manualAck" when { "response is Ack" must { "get async callback" in { producer = given(outCapable = false, autoAck = false) val doneSync = producer.processExchangeAdapter(exchange, asyncCallback) doneSync should be(false) within(1 second) { probe.expectMsgType[CamelMessage] info("message sent to consumer") probe.sender() ! Ack asyncCallback.expectDoneAsyncWithin(remaining) info("async callback called") } verify(exchange, never()).setResponse(any[CamelMessage]) info("no response forwarded to exchange") } } "expecting Ack or Failure message and some other message is sent as a response" must { "fail" in { producer = given(outCapable = false, autoAck = false) producer.processExchangeAdapter(exchange, asyncCallback) within(1 second) { probe.expectMsgType[CamelMessage] info("message sent to consumer") probe.sender() ! "some neither Ack nor Failure response" asyncCallback.expectDoneAsyncWithin(remaining) info("async callback called") } verify(exchange, never()).setResponse(any[CamelMessage]) info("no response forwarded to exchange") verify(exchange).setFailure(any[FailureResult]) info("failure set") } } "no Ack is sent within timeout" must { "set failure on exchange" in { producer = given(outCapable = false, replyTimeout = 10 millis, autoAck = false) producer.processExchangeAdapter(exchange, asyncCallback) asyncCallback.awaitCalled(100 millis) verify(exchange).setFailure(any[FailureResult]) } } "response is Failure" must { "set an exception on exchange" in { producer = given(outCapable = false, autoAck = false) val doneSync = producer.processExchangeAdapter(exchange, asyncCallback) doneSync should be(false) within(1 second) { probe.expectMsgType[CamelMessage] info("message sent to consumer") probe.sender() ! Failure(new Exception) asyncCallback.awaitCalled(remaining) } verify(exchange, never()).setResponse(any[CamelMessage]) info("no response forwarded to exchange") verify(exchange).setFailure(any[FailureResult]) info("failure set") } } } } } } } private[camel] trait ActorProducerFixture extends MockitoSugar with BeforeAndAfterAll with BeforeAndAfterEach { self: TestKit with Matchers with Suite ⇒ var camel: Camel = _ var exchange: CamelExchangeAdapter = _ var callback: AsyncCallback = _ var producer: ActorProducer = _ var message: CamelMessage = _ var probe: TestProbe = _ var asyncCallback: TestAsyncCallback = _ var actorEndpointPath: ActorEndpointPath = _ var actorComponent: ActorComponent = _ override protected def beforeEach() { asyncCallback = createAsyncCallback probe = TestProbe() val sys = mock[ExtendedActorSystem] val config = ConfigFactory.defaultReference() when(sys.dispatcher) thenReturn system.dispatcher when(sys.dynamicAccess) thenReturn system.asInstanceOf[ExtendedActorSystem].dynamicAccess when(sys.settings) thenReturn (new Settings(this.getClass.getClassLoader, config, "mocksystem")) when(sys.name) thenReturn ("mocksystem") def camelWithMocks = new DefaultCamel(sys) { override val log = mock[LoggingAdapter] override lazy val template = mock[ProducerTemplate] override lazy val context = mock[DefaultCamelContext] override val settings = new CamelSettings(ConfigFactory.parseString( """ akka { camel { jmx = off streamingCache = on consumer { auto-ack = on reply-timeout = 2s activation-timeout = 10s } } } """).withFallback(config), sys.dynamicAccess) } camel = camelWithMocks exchange = mock[CamelExchangeAdapter] callback = mock[AsyncCallback] actorEndpointPath = mock[ActorEndpointPath] actorComponent = mock[ActorComponent] producer = new ActorProducer(configure(), camel) message = CamelMessage(null, null) } override protected def afterAll() { shutdown() } def msg(s: String) = CamelMessage(s, Map.empty) def given(actor: ActorRef = probe.ref, outCapable: Boolean = true, autoAck: Boolean = true, replyTimeout: FiniteDuration = 20 seconds) = { prepareMocks(actor, outCapable = outCapable) new ActorProducer(configure(isAutoAck = autoAck, _replyTimeout = replyTimeout), camel) } def createAsyncCallback = new TestAsyncCallback class TestAsyncCallback extends AsyncCallback { def expectNoCallWithin(duration: Duration): Unit = if (callbackReceived.await(duration.length, duration.unit)) fail("NOT expected callback, but received one!") def awaitCalled(timeout: Duration = 1 second) { valueWithin(1 second) } val callbackReceived = new CountDownLatch(1) val callbackValue = new AtomicBoolean() def done(doneSync: Boolean) { callbackValue set doneSync callbackReceived.countDown() } private[this] def valueWithin(implicit timeout: FiniteDuration) = if (!callbackReceived.await(timeout.length, timeout.unit)) fail("Callback not received!") else callbackValue.get def expectDoneSyncWithin(implicit timeout: FiniteDuration): Unit = if (!valueWithin(timeout)) fail("Expected to be done Synchronously") def expectDoneAsyncWithin(implicit timeout: FiniteDuration): Unit = if (valueWithin(timeout)) fail("Expected to be done Asynchronously") } def configure(endpointUri: String = "test-uri", isAutoAck: Boolean = true, _replyTimeout: FiniteDuration = 20 seconds) = { val endpoint = new ActorEndpoint(endpointUri, actorComponent, actorEndpointPath, camel) endpoint.autoAck = isAutoAck endpoint.replyTimeout = _replyTimeout endpoint } def prepareMocks(actor: ActorRef, message: CamelMessage = message, outCapable: Boolean) { when(actorEndpointPath.findActorIn(any[ActorSystem])) thenReturn Option(actor) when(exchange.toRequestMessage(any[Map[String, Any]])) thenReturn message when(exchange.isOutCapable) thenReturn outCapable } def echoActor = system.actorOf(Props(new Actor { def receive = { case msg ⇒ sender() ! "received " + msg } }), name = "echoActor") } Other Akka source code examplesHere is a short list of links related to this Akka ActorProducerTest.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.