|
Akka/Scala example source code file (ConcurrentActivationTest.scala)
The ConcurrentActivationTest.scala Akka example source code/** * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> */ package akka.camel import language.postfixOps import org.scalatest.WordSpec import org.scalatest.Matchers import scala.concurrent.{ Promise, Await, Future } import scala.collection.immutable import akka.camel.TestSupport.NonSharedCamelSystem import akka.actor.{ ActorRef, Props, Actor } import akka.routing.BroadcastGroup import scala.concurrent.duration._ import akka.testkit._ import akka.util.Timeout import org.apache.camel.model.RouteDefinition import org.apache.camel.builder.Builder import akka.actor.ActorLogging /** * A test to concurrently register and de-register consumer and producer endpoints */ class ConcurrentActivationTest extends WordSpec with Matchers with NonSharedCamelSystem { "Activation" must { "support concurrent registrations and de-registrations" in { implicit val ec = system.dispatcher val number = 10 val eventFilter = EventFilter.warning(pattern = "received dead letter from .*producerRegistrar.*") system.eventStream.publish(TestEvent.Mute(eventFilter)) try { // A ConsumerBroadcast creates 'number' amount of ConsumerRegistrars, which will register 'number' amount of endpoints, // in total number*number endpoints, activating and deactivating every endpoint. // a promise to the list of registrars, which have a list of actorRefs each. A tuple of a list of activated refs and a list of deactivated refs val promiseRegistrarLists = Promise[(Future[List[List[ActorRef]]], Future[List[List[ActorRef]]])]() // future to all the futures of activation and deactivation val futureRegistrarLists = promiseRegistrarLists.future val ref = system.actorOf(Props(classOf[ConsumerBroadcast], promiseRegistrarLists), name = "broadcaster") // create the registrars ref ! CreateRegistrars(number) // send a broadcast to all registrars, so that number * number messages are sent // every Register registers a consumer and a producer (1 to number).map(i ⇒ ref ! RegisterConsumersAndProducers("direct:concurrent-")) // de-register all consumers and producers ref ! DeRegisterConsumersAndProducers() val promiseAllRefs = Promise[(List[ActorRef], List[ActorRef])]() val allRefsFuture = promiseAllRefs.future // map over all futures, put all futures in one list of activated and deactivated actor refs. futureRegistrarLists.map { case (futureActivations, futureDeactivations) ⇒ futureActivations zip futureDeactivations map { case (activations, deactivations) ⇒ promiseAllRefs.success((activations.flatten, deactivations.flatten)) } } val (activations, deactivations) = Await.result(allRefsFuture, 10.seconds.dilated) // should be the size of the activated activated producers and consumers activations.size should be(2 * number * number) // should be the size of the activated activated producers and consumers deactivations.size should be(2 * number * number) def partitionNames(refs: immutable.Seq[ActorRef]) = refs.map(_.path.name).partition(_.startsWith("concurrent-test-echo-consumer")) def assertContainsSameElements(lists: (Seq[_], Seq[_])) { val (a, b) = lists a.intersect(b).size should be(a.size) } val (activatedConsumerNames, activatedProducerNames) = partitionNames(activations) val (deactivatedConsumerNames, deactivatedProducerNames) = partitionNames(deactivations) assertContainsSameElements(activatedConsumerNames -> deactivatedConsumerNames) assertContainsSameElements(activatedProducerNames -> deactivatedProducerNames) } finally { system.eventStream.publish(TestEvent.UnMute(eventFilter)) } } } } class ConsumerBroadcast(promise: Promise[(Future[List[List[ActorRef]]], Future[List[List[ActorRef]]])]) extends Actor { private var broadcaster: Option[ActorRef] = None private implicit val ec = context.dispatcher def receive = { case CreateRegistrars(number) ⇒ var allActivationFutures = List[Future[List[ActorRef]]]() var allDeactivationFutures = List[Future[List[ActorRef]]]() val routeePaths = (1 to number).map { i ⇒ val activationListPromise = Promise[List[ActorRef]]() val deactivationListPromise = Promise[List[ActorRef]]() val activationListFuture = activationListPromise.future val deactivationListFuture = deactivationListPromise.future allActivationFutures = allActivationFutures :+ activationListFuture allDeactivationFutures = allDeactivationFutures :+ deactivationListFuture val routee = context.actorOf(Props(classOf[Registrar], i, number, activationListPromise, deactivationListPromise), "registrar-" + i) routee.path.toString } promise.success(Future.sequence(allActivationFutures) -> Future.sequence(allDeactivationFutures)) broadcaster = Some(context.actorOf(BroadcastGroup(routeePaths).props(), "registrarRouter")) case reg: Any ⇒ broadcaster.foreach(_.forward(reg)) } } final case class CreateRegistrars(number: Int) final case class RegisterConsumersAndProducers(endpointUri: String) final case class DeRegisterConsumersAndProducers() final case class Activations() final case class DeActivations() class Registrar(val start: Int, val number: Int, activationsPromise: Promise[List[ActorRef]], deActivationsPromise: Promise[List[ActorRef]]) extends Actor with ActorLogging { private var actorRefs = Set[ActorRef]() private var activations = Set[Future[ActorRef]]() private var deActivations = Set[Future[ActorRef]]() private var index = 0 private val camel = CamelExtension(context.system) private implicit val ec = context.dispatcher private implicit val timeout = Timeout(10.seconds.dilated(context.system)) def receive = { case reg: RegisterConsumersAndProducers ⇒ val i = index val endpoint = reg.endpointUri + start + "-" + i add(new EchoConsumer(endpoint), "concurrent-test-echo-consumer-" + start + "-" + i) add(new TestProducer(endpoint), "concurrent-test-producer-" + start + "-" + i) index = index + 1 if (activations.size == number * 2) { Future.sequence(activations.toList) map activationsPromise.success } case reg: DeRegisterConsumersAndProducers ⇒ actorRefs.foreach { aref ⇒ context.stop(aref) val result = camel.deactivationFutureFor(aref) result.onFailure { case e ⇒ log.error("deactivationFutureFor {} failed: {}", aref, e.getMessage) } deActivations += result if (deActivations.size == number * 2) { Future.sequence(deActivations.toList) map deActivationsPromise.success } } } def add(actor: ⇒ Actor, name: String) { val ref = context.actorOf(Props(actor), name) actorRefs = actorRefs + ref val result = camel.activationFutureFor(ref) result.onFailure { case e ⇒ log.error("activationFutureFor {} failed: {}", ref, e.getMessage) } activations += result } } class EchoConsumer(endpoint: String) extends Actor with Consumer { def endpointUri = endpoint def receive = { case msg: CamelMessage ⇒ sender() ! msg } /** * Returns the route definition handler for creating a custom route to this consumer. * By default it returns an identity function, override this method to * return a custom route definition handler. */ override def onRouteDefinition = (rd: RouteDefinition) ⇒ rd.onException(classOf[Exception]).handled(true).transform(Builder.exceptionMessage).end } class TestProducer(uri: String) extends Actor with Producer { def endpointUri = uri } Other Akka source code examplesHere is a short list of links related to this Akka ConcurrentActivationTest.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.