|
Akka/Scala example source code file (ActivationTracker.scala)
The ActivationTracker.scala Akka example source code/** * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> */ package akka.camel.internal import akka.actor._ import collection.mutable.WeakHashMap import akka.camel.internal.ActivationProtocol._ /** * INTERNAL API * An actor that tracks activation and de-activation of endpoints. */ private[camel] class ActivationTracker extends Actor with ActorLogging { val activations = new WeakHashMap[ActorRef, ActivationStateMachine] /** * A state machine that keeps track of the endpoint activation status of an actor. */ class ActivationStateMachine { type State = PartialFunction[ActivationMessage, Unit] var receive: State = notActivated() /** * Not activated state * @return a partial function that handles messages in the 'not activated' state */ def notActivated(): State = { var awaitingActivation = List[ActorRef]() var awaitingDeActivation = List[ActorRef]() { case AwaitActivation(ref) ⇒ awaitingActivation ::= sender() case AwaitDeActivation(ref) ⇒ awaitingDeActivation ::= sender() case msg @ EndpointActivated(ref) ⇒ awaitingActivation.foreach(_ ! msg) receive = activated(awaitingDeActivation) case EndpointFailedToActivate(ref, cause) ⇒ awaitingActivation.foreach(_ ! EndpointFailedToActivate(ref, cause)) receive = failedToActivate(cause) } } /** * Activated state. * @param currentAwaitingDeActivation the current <code>ActorRef</code>s awaiting de-activation * @return a partial function that handles messages in the 'activated' state */ def activated(currentAwaitingDeActivation: List[ActorRef]): State = { var awaitingDeActivation = currentAwaitingDeActivation { case AwaitActivation(ref) ⇒ sender() ! EndpointActivated(ref) case AwaitDeActivation(ref) ⇒ awaitingDeActivation ::= sender() case msg @ EndpointDeActivated(ref) ⇒ awaitingDeActivation foreach (_ ! msg) receive = deactivated case msg @ EndpointFailedToDeActivate(ref, cause) ⇒ awaitingDeActivation foreach (_ ! msg) receive = failedToDeActivate(cause) } } /** * De-activated state * @return a partial function that handles messages in the 'de-activated' state */ def deactivated: State = { // deactivated means it was activated at some point, so tell sender() it was activated case AwaitActivation(ref) ⇒ sender() ! EndpointActivated(ref) case AwaitDeActivation(ref) ⇒ sender() ! EndpointDeActivated(ref) //resurrected at restart. case msg @ EndpointActivated(ref) ⇒ receive = activated(Nil) } /** * Failed to activate state * @param cause the cause for the failure * @return a partial function that handles messages in 'failed to activate' state */ def failedToActivate(cause: Throwable): State = { case AwaitActivation(ref) ⇒ sender() ! EndpointFailedToActivate(ref, cause) case AwaitDeActivation(ref) ⇒ sender() ! EndpointFailedToActivate(ref, cause) case EndpointDeActivated(_) ⇒ // the de-register at termination always sends a de-activated when the cleanup is done. ignoring. } /** * Failed to de-activate state * @param cause the cause for the failure * @return a partial function that handles messages in 'failed to de-activate' state */ def failedToDeActivate(cause: Throwable): State = { case AwaitActivation(ref) ⇒ sender() ! EndpointActivated(ref) case AwaitDeActivation(ref) ⇒ sender() ! EndpointFailedToDeActivate(ref, cause) case EndpointDeActivated(_) ⇒ // the de-register at termination always sends a de-activated when the cleanup is done. ignoring. } } override def receive = { case msg @ ActivationMessage(ref) ⇒ (activations.getOrElseUpdate(ref, new ActivationStateMachine).receive orElse logStateWarning(ref))(msg) } private[this] def logStateWarning(actorRef: ActorRef): Receive = { case msg ⇒ log.warning("Message [{}] not expected in current state of actor [{}]", msg, actorRef) } } /** * INTERNAL API * A request message to the ActivationTracker for the status of activation. * @param ref the actorRef */ private[camel] final case class AwaitActivation(ref: ActorRef) extends ActivationMessage(ref) /** * INTERNAL API * A request message to the ActivationTracker for the status of de-activation. * @param ref the actorRef */ private[camel] final case class AwaitDeActivation(ref: ActorRef) extends ActivationMessage(ref) Other Akka source code examplesHere is a short list of links related to this Akka ActivationTracker.scala source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.