|
Akka/Scala example source code file (ActorProducerTest.scala)
The ActorProducerTest.scala Akka example source code
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.camel.internal.component
import language.postfixOps
import org.scalatest.mock.MockitoSugar
import org.mockito.Matchers.any
import org.mockito.Mockito._
import org.apache.camel.{ CamelContext, ProducerTemplate, AsyncCallback }
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.duration._
import java.lang.String
import akka.camel._
import internal.{ DefaultCamel, CamelExchangeAdapter }
import org.scalatest.{ Suite, WordSpecLike, BeforeAndAfterAll, BeforeAndAfterEach }
import akka.camel.TestSupport._
import java.util.concurrent.{ TimeoutException, CountDownLatch, TimeUnit }
import org.mockito.{ ArgumentMatcher, Matchers ⇒ MMatchers, Mockito }
import org.scalatest.Matchers
import akka.actor.Status.{ Success, Failure }
import com.typesafe.config.ConfigFactory
import akka.actor.ActorSystem.Settings
import akka.event.LoggingAdapter
import akka.testkit.{ TestLatch, TimingTest, TestKit, TestProbe }
import org.apache.camel.impl.DefaultCamelContext
import scala.concurrent.{ Await, Promise, Future }
import akka.util.Timeout
import akka.actor._
import akka.testkit._
/**
 * Behavioural specification for [[ActorProducer]].
 *
 * Every test obtains a producer through `given(...)` from [[ActorProducerFixture]], drives it
 * with the mocked `exchange` and then checks, via Mockito `verify`, which of `setResponse` /
 * `setFailure` was invoked on that exchange. The consumer side is played by the fixture's
 * `probe` unless a test supplies another actor.
 *
 * The spec is split into the "synchronous" entry point (`processExchangeAdapter(exchange)`)
 * and the "asynchronous" one (`processExchangeAdapter(exchange, callback)`).
 */
class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpecLike with Matchers with ActorProducerFixture {

  // Explicitly typed: implicit definitions should not rely on type inference.
  implicit val timeout: Timeout = Timeout(10 seconds)

  "ActorProducer" when {

    "synchronous" when {

      "consumer actor doesnt exist" must {
        "set failure message on exchange" in {
          // A null actor makes the fixture stub `findActorIn` with an empty Option.
          producer = given(actor = null)
          producer.processExchangeAdapter(exchange)

          verify(exchange).setFailure(any[FailureResult])
        }
      }

      "in-only" must {
        // Shadows the fixture's `producer` var; being a `def`, every reference
        // re-runs `given` and yields a fresh in-only producer.
        def producer = given(outCapable = false)

        "pass the message to the consumer" taggedAs TimingTest in {
          producer.processExchangeAdapter(exchange)
          within(1 second)(probe.expectMsg(message))
        }

        "not expect response and not block" taggedAs TimingTest in {
          time(producer.processExchangeAdapter(exchange)) should be < (200 millis)
        }
      }

      "manualAck" when {

        "response is Ack" must {
          "process the exchange" in {
            producer = given(outCapable = false, autoAck = false)
            import system.dispatcher
            // Run the processing in a Future so this thread can answer as the consumer.
            val future = Future { producer.processExchangeAdapter(exchange) }
            within(1 second) {
              probe.expectMsgType[CamelMessage]
              info("message sent to consumer")
              probe.sender() ! Ack
            }
            // Wait for processing to finish before verifying: a `never()` check made
            // while the producer is still running could miss a late `setResponse`.
            Await.ready(future, timeout.duration)
            verify(exchange, never()).setResponse(any[CamelMessage])
            info("no response forwarded to exchange")
          }
        }

        "the consumer does not respond wit Ack" must {
          "not block forever" in {
            producer = given(outCapable = false, autoAck = false)
            import system.dispatcher
            val future = Future {
              producer.processExchangeAdapter(exchange)
            }
            within(1 second) {
              probe.expectMsgType[CamelMessage]
              info("message sent to consumer")
            }
            verify(exchange, never()).setResponse(any[CamelMessage])
            info("no response forwarded to exchange")

            // No Ack was sent, so the future must still be pending when this wait expires.
            intercept[TimeoutException] {
              Await.ready(future, camel.settings.ReplyTimeout - (1 seconds))
            }
          }
        }
      }

      "out capable" when {

        "response is sent back by actor" must {
          "get a response" in {
            producer = given(actor = echoActor, outCapable = true)
            producer.processExchangeAdapter(exchange)

            verify(exchange).setResponse(msg("received " + message))
          }
        }

        "response is not sent by actor" must {
          // Latch shared by the tests below; `process()` resets it after each use.
          val latch = TestLatch(1)
          val callback = new AsyncCallback {
            def done(doneSync: Boolean): Unit = {
              latch.countDown()
            }
          }

          // Runs the exchange against a consumer that never replies and returns
          // the measured time until the callback fired.
          def process() = {
            producer = given(outCapable = true, replyTimeout = 100 millis)
            val duration = time {
              producer.processExchangeAdapter(exchange, callback)
              // wait for the actor to complete the callback
              Await.ready(latch, 1.seconds.dilated)
            }
            latch.reset()
            duration
          }

          "timeout after replyTimeout" taggedAs TimingTest in {
            val duration = process()
            duration should (be >= (100 millis) and be < (300 millis))
          }

          "never set the response on exchange" in {
            process()
            verify(exchange, Mockito.never()).setResponse(any[CamelMessage])
          }

          "set failure message to timeout" in {
            process()
            verify(exchange).setFailure(any[FailureResult])
          }
        }
      }
    }

    "asynchronous" when {

      // Processes the exchange with the fixture callback, waits for the callback
      // and checks that a failure was recorded on the exchange.
      def verifyFailureIsSet(): Unit = {
        producer.processExchangeAdapter(exchange, asyncCallback)
        asyncCallback.awaitCalled()
        verify(exchange).setFailure(any[FailureResult])
      }

      "out-capable" when {

        "consumer actor doesnt exist" must {
          "set failure message on exchange" in {
            producer = given(actor = null, outCapable = true)
            verifyFailureIsSet()
          }
        }

        "response is ok" must {
          "get a response and async callback as soon as it gets the response (but not before)" in {
            producer = given(outCapable = true)

            val doneSync = producer.processExchangeAdapter(exchange, asyncCallback)

            // The consumer has not replied yet, so the callback must stay silent.
            asyncCallback.expectNoCallWithin(100 millis)
            info("no async callback before response")

            within(1 second) {
              probe.expectMsgType[CamelMessage]
              probe.sender() ! "some message"
            }
            doneSync should be(false)
            info("done async")

            asyncCallback.expectDoneAsyncWithin(1 second)
            info("async callback received")
            verify(exchange).setResponse(msg("some message"))
            info("response as expected")
          }
        }

        "response is Failure" must {
          "set an exception on exchange" in {
            val exception = new RuntimeException("some failure")
            val failure = Failure(exception)

            producer = given(outCapable = true)

            producer.processExchangeAdapter(exchange, asyncCallback)

            within(1 second) {
              probe.expectMsgType[CamelMessage]
              probe.sender() ! failure
              asyncCallback.awaitCalled(remaining)
            }

            verify(exchange).setFailure(FailureResult(exception))
          }
        }

        "no response is sent within timeout" must {
          "set TimeoutException on exchange" in {
            producer = given(outCapable = true, replyTimeout = 10 millis)
            producer.processExchangeAdapter(exchange, asyncCallback)
            asyncCallback.awaitCalled(100 millis)
            // The matcher asserts on the cause itself and then accepts the argument.
            verify(exchange).setFailure(MMatchers.argThat(new ArgumentMatcher[FailureResult] {
              def matches(failure: AnyRef) = {
                failure.asInstanceOf[FailureResult].cause should be(anInstanceOf[TimeoutException])
                true
              }
            }))
          }
        }
      }

      "in-only" when {

        "consumer actor doesnt exist" must {
          "set failure message on exchange" in {
            producer = given(actor = null, outCapable = false)
            verifyFailureIsSet()
          }
        }

        "autoAck" must {
          "get sync callback as soon as it sends a message" in {
            producer = given(outCapable = false, autoAck = true)
            val doneSync = producer.processExchangeAdapter(exchange, asyncCallback)

            doneSync should be(true)
            info("done sync")
            asyncCallback.expectDoneSyncWithin(1 second)
            info("async callback called")
            verify(exchange, never()).setResponse(any[CamelMessage])
            info("no response forwarded to exchange")
          }
        }

        "manualAck" when {

          "response is Ack" must {
            "get async callback" in {
              producer = given(outCapable = false, autoAck = false)

              val doneSync = producer.processExchangeAdapter(exchange, asyncCallback)

              doneSync should be(false)
              within(1 second) {
                probe.expectMsgType[CamelMessage]
                info("message sent to consumer")
                probe.sender() ! Ack
                asyncCallback.expectDoneAsyncWithin(remaining)
                info("async callback called")
              }
              verify(exchange, never()).setResponse(any[CamelMessage])
              info("no response forwarded to exchange")
            }
          }

          "expecting Ack or Failure message and some other message is sent as a response" must {
            "fail" in {
              producer = given(outCapable = false, autoAck = false)

              producer.processExchangeAdapter(exchange, asyncCallback)

              within(1 second) {
                probe.expectMsgType[CamelMessage]
                info("message sent to consumer")
                probe.sender() ! "some neither Ack nor Failure response"
                asyncCallback.expectDoneAsyncWithin(remaining)
                info("async callback called")
              }
              verify(exchange, never()).setResponse(any[CamelMessage])
              info("no response forwarded to exchange")
              verify(exchange).setFailure(any[FailureResult])
              info("failure set")
            }
          }

          "no Ack is sent within timeout" must {
            "set failure on exchange" in {
              producer = given(outCapable = false, replyTimeout = 10 millis, autoAck = false)

              producer.processExchangeAdapter(exchange, asyncCallback)
              asyncCallback.awaitCalled(100 millis)
              verify(exchange).setFailure(any[FailureResult])
            }
          }

          "response is Failure" must {
            "set an exception on exchange" in {
              producer = given(outCapable = false, autoAck = false)

              val doneSync = producer.processExchangeAdapter(exchange, asyncCallback)

              doneSync should be(false)
              within(1 second) {
                probe.expectMsgType[CamelMessage]
                info("message sent to consumer")
                probe.sender() ! Failure(new Exception)
                asyncCallback.awaitCalled(remaining)
              }
              verify(exchange, never()).setResponse(any[CamelMessage])
              info("no response forwarded to exchange")
              verify(exchange).setFailure(any[FailureResult])
              info("failure set")
            }
          }
        }
      }
    }
  }
}
/**
 * Shared fixture for [[ActorProducer]] specs.
 *
 * Before every test it rebuilds all mutable members below: a `DefaultCamel` whose log,
 * template and context are mocks, a mocked exchange/callback/endpoint path/component,
 * a fresh `TestProbe` standing in for the consumer actor and a fresh `TestAsyncCallback`.
 * After the whole suite it shuts the test kit down.
 */
private[camel] trait ActorProducerFixture extends MockitoSugar with BeforeAndAfterAll with BeforeAndAfterEach { self: TestKit with Matchers with Suite ⇒
  // All members are re-assigned in `beforeEach`, hence `var`.
  var camel: Camel = _
  var exchange: CamelExchangeAdapter = _
  var callback: AsyncCallback = _

  var producer: ActorProducer = _
  var message: CamelMessage = _
  var probe: TestProbe = _
  var asyncCallback: TestAsyncCallback = _
  var actorEndpointPath: ActorEndpointPath = _
  var actorComponent: ActorComponent = _

  /** Re-creates every fixture member so that tests do not share mock state. */
  override protected def beforeEach(): Unit = {
    asyncCallback = createAsyncCallback

    probe = TestProbe()

    // Mocked actor system that hands out the real system's dispatcher and dynamic access.
    val sys = mock[ExtendedActorSystem]
    val config = ConfigFactory.defaultReference()
    when(sys.dispatcher) thenReturn system.dispatcher
    when(sys.dynamicAccess) thenReturn system.asInstanceOf[ExtendedActorSystem].dynamicAccess
    when(sys.settings) thenReturn (new Settings(this.getClass.getClassLoader, config, "mocksystem"))
    when(sys.name) thenReturn ("mocksystem")

    def camelWithMocks = new DefaultCamel(sys) {
      override val log = mock[LoggingAdapter]
      override lazy val template = mock[ProducerTemplate]
      override lazy val context = mock[DefaultCamelContext]
      // The HOCON text below is kept verbatim (flush-left) so the literal is unchanged.
      override val settings = new CamelSettings(ConfigFactory.parseString(
        """
akka {
camel {
jmx = off
streamingCache = on
consumer {
auto-ack = on
reply-timeout = 2s
activation-timeout = 10s
}
}
}
""").withFallback(config), sys.dynamicAccess)
    }
    camel = camelWithMocks

    exchange = mock[CamelExchangeAdapter]
    callback = mock[AsyncCallback]
    actorEndpointPath = mock[ActorEndpointPath]
    actorComponent = mock[ActorComponent]
    // `configure()` reads the endpoint mocks, so they must be assigned before this line.
    producer = new ActorProducer(configure(), camel)
    message = CamelMessage(null, null)
  }

  /** Shuts down the test kit once all tests have run. */
  override protected def afterAll(): Unit = {
    shutdown()
  }

  /** Builds a [[CamelMessage]] with body `s` and no headers. */
  def msg(s: String): CamelMessage = CamelMessage(s, Map.empty)

  /**
   * Stubs the mocks for the given scenario and returns a new producer for it.
   *
   * @param actor        consumer the endpoint path should resolve to; `null` stubs an empty lookup
   * @param outCapable   value the mocked exchange reports from `isOutCapable`
   * @param autoAck      `autoAck` setting of the endpoint the producer is created for
   * @param replyTimeout `replyTimeout` setting of that endpoint
   */
  def given(actor: ActorRef = probe.ref, outCapable: Boolean = true, autoAck: Boolean = true, replyTimeout: FiniteDuration = 20 seconds): ActorProducer = {
    prepareMocks(actor, outCapable = outCapable)
    new ActorProducer(configure(isAutoAck = autoAck, _replyTimeout = replyTimeout), camel)
  }

  /** Returns a new, not yet completed, [[TestAsyncCallback]]. */
  def createAsyncCallback: TestAsyncCallback = new TestAsyncCallback

  /**
   * `AsyncCallback` that records the `doneSync` flag it was completed with and lets a
   * test wait for, or rule out, the completion.
   */
  class TestAsyncCallback extends AsyncCallback {
    // Opened by the first `done` call.
    val callbackReceived = new CountDownLatch(1)
    // `doneSync` argument of the most recent `done` call.
    val callbackValue = new AtomicBoolean()

    /** Fails the test if `done` is invoked within `duration`. */
    def expectNoCallWithin(duration: Duration): Unit =
      if (callbackReceived.await(duration.length, duration.unit)) fail("NOT expected callback, but received one!")

    /**
     * Fails the test unless `done` is invoked within `timeout`.
     *
     * The wait honours the `timeout` argument (it used to be ignored in favour of a
     * hard-coded one second). As in `expectNoCallWithin`, `timeout` is expected to be
     * finite because its `length` and `unit` are read.
     */
    def awaitCalled(timeout: Duration = 1 second): Unit =
      if (!callbackReceived.await(timeout.length, timeout.unit)) fail("Callback not received!")

    /** Records `doneSync` and releases everyone waiting on `callbackReceived`. */
    def done(doneSync: Boolean): Unit = {
      callbackValue set doneSync
      callbackReceived.countDown()
    }

    // Returns the recorded `doneSync` flag, failing the test if no callback arrives in time.
    private[this] def valueWithin(implicit timeout: FiniteDuration): Boolean =
      if (!callbackReceived.await(timeout.length, timeout.unit)) fail("Callback not received!")
      else callbackValue.get

    /** Fails unless the callback arrives within `timeout` with `doneSync == true`. */
    def expectDoneSyncWithin(implicit timeout: FiniteDuration): Unit = if (!valueWithin(timeout)) fail("Expected to be done Synchronously")

    /** Fails unless the callback arrives within `timeout` with `doneSync == false`. */
    def expectDoneAsyncWithin(implicit timeout: FiniteDuration): Unit = if (valueWithin(timeout)) fail("Expected to be done Asynchronously")
  }

  /**
   * Creates an `ActorEndpoint` over the mocked component and endpoint path and applies
   * the given `autoAck` and `replyTimeout` settings to it.
   */
  def configure(endpointUri: String = "test-uri", isAutoAck: Boolean = true, _replyTimeout: FiniteDuration = 20 seconds): ActorEndpoint = {
    val endpoint = new ActorEndpoint(endpointUri, actorComponent, actorEndpointPath, camel)
    endpoint.autoAck = isAutoAck
    endpoint.replyTimeout = _replyTimeout
    endpoint
  }

  /**
   * Stubs the endpoint path to resolve to `actor` (an empty Option when `actor` is null)
   * and the exchange to return `message` as request and `outCapable` from `isOutCapable`.
   * The default for `message` is the fixture's current `message` member.
   */
  def prepareMocks(actor: ActorRef, message: CamelMessage = message, outCapable: Boolean): Unit = {
    when(actorEndpointPath.findActorIn(any[ActorSystem])) thenReturn Option(actor)
    when(exchange.toRequestMessage(any[Map[String, Any]])) thenReturn message
    when(exchange.isOutCapable) thenReturn outCapable
  }

  /**
   * Creates an actor named "echoActor" that answers every message with
   * `"received " + msg`. Each call creates a new actor under the same fixed name.
   */
  def echoActor: ActorRef = system.actorOf(Props(new Actor {
    def receive = { case msg ⇒ sender() ! "received " + msg }
  }), name = "echoActor")
}
Other Akka source code examples
Here is a short list of links related to this Akka ActorProducerTest.scala source code file: |
| ... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.