alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (Remoting.scala)

This example Akka source code file (Remoting.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actorref, address, akka, boolean, concurrent, event, none, option, promise, remote, remoting, remotingcommand, some, transport, unit, utilities

The Remoting.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.remote

import akka.actor.SupervisorStrategy._
import akka.actor._
import akka.event.{ Logging, LoggingAdapter }
import akka.pattern.{ gracefulStop, pipe, ask }
import akka.remote.EndpointManager._
import akka.remote.Remoting.TransportSupervisor
import akka.remote.transport.Transport.{ ActorAssociationEventListener, AssociationEventListener, InboundAssociation }
import akka.remote.transport._
import com.typesafe.config.Config
import java.net.URLEncoder
import java.util.concurrent.TimeoutException
import scala.collection.immutable.{ Seq, HashMap }
import scala.concurrent.duration._
import scala.concurrent.{ Promise, Await, Future }
import scala.util.control.NonFatal
import scala.util.{ Failure, Success }
import akka.remote.transport.AkkaPduCodec.Message
import java.util.concurrent.ConcurrentHashMap
import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
import akka.event.AddressTerminatedTopic

/**
 * INTERNAL API
 */
private[remote] object AddressUrlEncoder {
  /**
   * Renders the address with `toString` and URL-encodes the result using UTF-8.
   *
   * @param address the address to encode
   * @return the URL-encoded string form of the address
   */
  def apply(address: Address): String = {
    val rendered = address.toString
    URLEncoder.encode(rendered, "utf-8")
  }
}

/**
 * INTERNAL API
 */
private[remote] final case class RARP(provider: RemoteActorRefProvider) extends Extension {
  /**
   * Delegates to the `configureDispatcher` of the provider's remote settings.
   *
   * @param props the props to configure
   * @return the props returned by the remote settings
   */
  def configureDispatcher(props: Props): Props = {
    val remoteSettings = provider.remoteSettings
    remoteSettings.configureDispatcher(props)
  }
}
/**
 * INTERNAL API
 */
private[remote] object RARP extends ExtensionId[RARP] with ExtensionIdProvider {

  /** This object is its own extension id. */
  override def lookup() = RARP

  /**
   * Creates the extension from the provider of the given system.
   * The provider is cast to [[RemoteActorRefProvider]]; any other provider type fails the cast.
   */
  override def createExtension(system: ExtendedActorSystem): RARP = {
    val remoteProvider = system.provider.asInstanceOf[RemoteActorRefProvider]
    RARP(remoteProvider)
  }
}

/**
 * INTERNAL API
 *
 * Marker trait (it declares no members). Messages marked with this trait will be sent before
 * other messages when buffering is active, which means that they do not obey normal message
 * ordering. It is used for failure detector heartbeat messages.
 */
private[akka] trait PriorityMessage

/**
 * INTERNAL API
 */
private[remote] object Remoting {

  /** Name used for the EndpointManager actor. */
  final val EndpointManagerName = "endpointManager"

  /**
   * Selects the local address to use for reaching `remote`.
   *
   * The transports registered for the protocol of `remote` are narrowed down to those that report
   * being responsible for the address; exactly one must remain.
   *
   * @param transportMapping transports paired with their local addresses, keyed by protocol
   * @param remote the remote address to be reached
   * @return the local address paired with the single responsible transport
   * @throws RemoteTransportException if no transport is loaded for the protocol, or if zero or more
   *                                  than one transport is responsible for the address
   */
  def localAddressForRemote(transportMapping: Map[String, Set[(AkkaProtocolTransport, Address)]], remote: Address): Address = {
    // The default is by-name, so the exception is only constructed when the protocol is unknown
    val candidates = transportMapping.getOrElse(
      remote.protocol,
      throw new RemoteTransportException(
        s"No transport is loaded for protocol: [${remote.protocol}], available protocols: [${transportMapping.keys.mkString(", ")}]", null))

    val responsibleTransports = candidates.filter { case (transport, _) ⇒ transport.isResponsibleFor(remote) }

    if (responsibleTransports.isEmpty)
      throw new RemoteTransportException(
        s"No transport is responsible for address: [$remote] although protocol [${remote.protocol}] is available." +
          " Make sure at least one transport is configured to be responsible for the address.",
        null)
    else if (responsibleTransports.size == 1) {
      val (_, localAddress) = responsibleTransports.head
      localAddress
    } else
      throw new RemoteTransportException(
        s"Multiple transports are available for [$remote]: [${responsibleTransports.mkString(",")}]. " +
          "Remoting cannot decide which transport to use to reach the remote system. Change your configuration " +
          "so that only one transport is responsible for the address.",
        null)
  }

  /** Request to create a child actor of the transport supervisor from `props` under `name`. */
  final case class RegisterTransportActor(props: Props, name: String) extends NoSerializationVerificationNeeded

  /**
   * Parent of transport actors: creates children on request and restarts them on non-fatal failures.
   */
  private[Remoting] class TransportSupervisor extends Actor with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
    override def supervisorStrategy = OneForOneStrategy() {
      case NonFatal(_) ⇒ Restart
    }

    def receive = {
      case RegisterTransportActor(props, name) ⇒
        // The child is always deployed locally and gets the dispatcher chosen by the remote settings
        val localProps = RARP(context.system).configureDispatcher(props.withDeploy(Deploy.local))
        val child = context.actorOf(localProps, name)
        // Reply with the reference of the newly created child
        sender() ! child
    }
  }

}

/**
 * INTERNAL API
 */
private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider) extends RemoteTransport(_system, _provider) {

  // The EndpointManager actor: set by start(), cleared again when a shutdown() has completed.
  @volatile private var endpointManager: Option[ActorRef] = None
  // Transports with their listen addresses, grouped by scheme identifier; assigned in start().
  @volatile private var transportMapping: Map[String, Set[(AkkaProtocolTransport, Address)]] = _
  // This is effectively a write-once variable similar to a lazy val. The reason for not using a lazy val is exception
  // handling.
  @volatile var addresses: Set[Address] = _
  // This variable has the same semantics as the addresses variable, in the sense it is written once, and emulates
  // a lazy val
  @volatile var defaultAddress: Address = _

  import provider.remoteSettings._

  // System actor named "transports" that acts as the parent of transport actors
  val transportSupervisor = system.systemActorOf(
    configureDispatcher(Props[TransportSupervisor]),
    "transports")

  /** Delegates to [[Remoting.localAddressForRemote]] using the transport mapping established by start(). */
  override def localAddressForRemote(remote: Address): Address = Remoting.localAddressForRemote(transportMapping, remote)

  val log: LoggingAdapter = Logging(system.eventStream, "Remoting")
  val eventPublisher = new EventPublisher(system, log, RemoteLifecycleEventsLogLevel)

  /** Publishes a RemotingErrorEvent that wraps `msg` and `cause` in a RemoteTransportException. */
  private def notifyError(msg: String, cause: Throwable): Unit =
    eventPublisher.notifyListeners(RemotingErrorEvent(new RemoteTransportException(msg, cause)))

  /**
   * Asks the endpoint manager to flush and shut down. When the reply arrives, or the ask fails, a
   * RemotingShutdownEvent is published and the reference to the manager is cleared.
   *
   * @return a future completed once the manager has answered; it fails if the ask fails.
   *         If remoting is not running, a warning is logged and a completed future is returned.
   */
  override def shutdown(): Future[Unit] = {
    endpointManager match {
      case Some(manager) ⇒
        // Run the completion callbacks on the actor system's dispatcher, as managementCommand does,
        // instead of on the JVM-wide global execution context.
        import system.dispatcher
        implicit val timeout = ShutdownTimeout

        def finalize(): Unit = {
          eventPublisher.notifyListeners(RemotingShutdownEvent)
          endpointManager = None
        }

        (manager ? ShutdownAndFlush).mapTo[Boolean].andThen {
          case Success(flushSuccessful) ⇒
            if (!flushSuccessful)
              log.warning("Shutdown finished, but flushing might not have been successful and some messages might have been dropped. " +
                "Increase akka.remote.flush-wait-on-shutdown to a larger value to avoid this.")
            finalize()

          case Failure(e) ⇒
            notifyError("Failure during shutdown of remoting.", e)
            finalize()
        } map { _ ⇒ () } // RARP needs only type Unit, not a boolean
      case None ⇒
        log.warning("Remoting is not running. Ignoring shutdown attempt.")
        Future successful (())
    }
  }

  /**
   * Creates the endpoint manager, asks it to listen and blocks (up to StartupTimeout) until the
   * listen addresses are known. On success the address fields are populated, the manager is told
   * that startup has finished and a RemotingListenEvent is published.
   *
   * Failures are published as error events and rethrown. A second call without an intervening
   * shutdown() only logs a warning.
   */
  // Start assumes that it cannot be followed by another start() without having a shutdown() first
  override def start(): Unit = {
    endpointManager match {
      case None ⇒
        log.info("Starting remoting")
        val manager: ActorRef = system.systemActorOf(
          configureDispatcher(Props(classOf[EndpointManager], provider.remoteSettings.config, log)).withDeploy(Deploy.local),
          Remoting.EndpointManagerName)
        endpointManager = Some(manager)

        try {
          // The manager completes this promise with the transports and the addresses they listen on
          val addressesPromise: Promise[Seq[(AkkaProtocolTransport, Address)]] = Promise()
          manager ! Listen(addressesPromise)

          val transports: Seq[(AkkaProtocolTransport, Address)] = Await.result(addressesPromise.future,
            StartupTimeout.duration)
          if (transports.isEmpty) throw new RemoteTransportException("No transport drivers were loaded.", null)

          transportMapping = transports.groupBy {
            case (transport, _) ⇒ transport.schemeIdentifier
          } map { case (k, v) ⇒ k -> v.toSet }

          // The address of the first transport becomes the default address
          defaultAddress = transports.head._2
          addresses = transports.map { _._2 }.toSet

          log.info("Remoting started; listening on addresses :" + addresses.mkString("[", ", ", "]"))

          manager ! StartupFinished
          eventPublisher.notifyListeners(RemotingListenEvent(addresses))

        } catch {
          case e: TimeoutException ⇒
            notifyError("Startup timed out", e)
            throw e
          case NonFatal(e) ⇒
            notifyError("Startup failed", e)
            throw e
        }

      case Some(_) ⇒
        log.warning("Remoting was already started. Ignoring start attempt.")
    }
  }

  /**
   * Hands the message to the endpoint manager wrapped in a Send command.
   *
   * @throws RemoteTransportExceptionNoStackTrace if remoting is not running
   */
  override def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit = endpointManager match {
    case Some(manager) ⇒ manager.tell(Send(message, senderOption, recipient), sender = senderOption getOrElse Actor.noSender)
    case None          ⇒ throw new RemoteTransportExceptionNoStackTrace("Attempted to send remote message but Remoting is not running.", null)
  }

  /**
   * Sends a management command to the endpoint manager.
   *
   * @return a future holding the status carried by the manager's ManagementCommandAck reply
   * @throws RemoteTransportExceptionNoStackTrace if remoting is not running
   */
  override def managementCommand(cmd: Any): Future[Boolean] = endpointManager match {
    case Some(manager) ⇒
      import system.dispatcher
      implicit val timeout = CommandAckTimeout
      manager ? ManagementCommand(cmd) map { case ManagementCommandAck(status) ⇒ status }
    case None ⇒ throw new RemoteTransportExceptionNoStackTrace("Attempted to send management command but Remoting is not running.", null)
  }

  /**
   * Tells the endpoint manager to quarantine the given address (and UID, if known).
   *
   * @throws RemoteTransportExceptionNoStackTrace if remoting is not running
   */
  override def quarantine(remoteAddress: Address, uid: Option[Int]): Unit = endpointManager match {
    case Some(manager) ⇒ manager ! Quarantine(remoteAddress, uid)
    case _ ⇒ throw new RemoteTransportExceptionNoStackTrace(
      s"Attempted to quarantine address [$remoteAddress] with uid [$uid] but Remoting is not running", null)
  }

  // Not used anywhere; only kept for compatibility with the RemoteTransport interface
  protected def useUntrustedMode: Boolean = provider.remoteSettings.UntrustedMode

}

/**
 * INTERNAL API
 */
private[remote] object EndpointManager {

  // ---- Commands sent from Remoting to the EndpointManager ----

  sealed trait RemotingCommand extends NoSerializationVerificationNeeded

  /** Start listening; the promise receives the transports together with their listen addresses. */
  final case class Listen(addressesPromise: Promise[Seq[(AkkaProtocolTransport, Address)]]) extends RemotingCommand

  case object StartupFinished extends RemotingCommand

  case object ShutdownAndFlush extends RemotingCommand

  /** A message to be delivered to a remote recipient, optionally carrying a sequence number. */
  final case class Send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef, seqOpt: Option[SeqNo] = None)
    extends RemotingCommand with HasSequenceNumber {

    override def toString = s"Remote message $senderOption -> $recipient"

    // This MUST throw an exception to indicate that we attempted to put a nonsequenced message in one of the
    // acknowledged delivery buffers
    def seq = seqOpt.get
  }

  final case class Quarantine(remoteAddress: Address, uid: Option[Int]) extends RemotingCommand

  final case class ManagementCommand(cmd: Any) extends RemotingCommand

  final case class ManagementCommandAck(status: Boolean)

  // ---- Messages the EndpointManager sends to itself ----

  case object Prune extends NoSerializationVerificationNeeded

  final case class ListensResult(
    addressesPromise: Promise[Seq[(AkkaProtocolTransport, Address)]],
    results: Seq[(AkkaProtocolTransport, Address, Promise[AssociationEventListener])])
    extends NoSerializationVerificationNeeded

  final case class ListensFailure(addressesPromise: Promise[Seq[(AkkaProtocolTransport, Address)]], cause: Throwable)
    extends NoSerializationVerificationNeeded

  /** A pair of a local and a remote address. */
  final case class Link(localAddress: Address, remoteAddress: Address)

  final case class ResendState(uid: Int, buffer: AckedReceiveBuffer[Message])

  /** What the registry knows about the writable side of a remote address. */
  sealed trait EndpointPolicy {

    /**
     * True when the policy holds no active endpoint and merely records a previous failure.
     */
    def isTombstone: Boolean
  }

  /** An active writable endpoint, with the UID of the remote system once known and a UID to refuse, if any. */
  final case class Pass(endpoint: ActorRef, uid: Option[Int], refuseUid: Option[Int]) extends EndpointPolicy {
    override def isTombstone: Boolean = false
  }

  /** Tombstone for a failed endpoint, carrying a release deadline. */
  final case class Gated(timeOfRelease: Deadline) extends EndpointPolicy {
    override def isTombstone: Boolean = true
  }

  /** Tombstone for a quarantined UID, carrying a release deadline. */
  final case class Quarantined(uid: Int, timeOfRelease: Deadline) extends EndpointPolicy {
    override def isTombstone: Boolean = true
  }

  /**
   * Bookkeeping of writable endpoints (with their policies) and read-only endpoints, indexed both by
   * address and by endpoint reference.
   *
   * Not threadsafe -- only to be used in HeadActor
   */
  class EndpointRegistry {
    private var addressToWritable = HashMap[Address, EndpointPolicy]()
    private var writableToAddress = HashMap[ActorRef, Address]()
    private var addressToReadonly = HashMap[Address, ActorRef]()
    private var readonlyToAddress = HashMap[ActorRef, Address]()

    // Removes the endpoint from both read-only indexes; does nothing if it is not registered as read-only
    private def dropReadOnly(endpoint: ActorRef): Unit =
      readonlyToAddress.get(endpoint) foreach { address ⇒
        addressToReadonly -= address
        readonlyToAddress -= endpoint
      }

    /**
     * Registers `endpoint` as the writable endpoint of `address`, replacing any tombstone.
     *
     * @return the registered endpoint
     * @throws IllegalArgumentException if an active writable endpoint is already registered for the address
     */
    def registerWritableEndpoint(address: Address, uid: Option[Int], refuseUid: Option[Int], endpoint: ActorRef): ActorRef = {
      addressToWritable.get(address) foreach {
        case Pass(existing, _, _) ⇒
          throw new IllegalArgumentException(s"Attempting to overwrite existing endpoint [$existing] with [$endpoint]")
        case _ ⇒ // tombstones may be overwritten
      }
      addressToWritable = addressToWritable.updated(address, Pass(endpoint, uid, refuseUid))
      writableToAddress = writableToAddress.updated(endpoint, address)
      endpoint
    }

    /** Records the UID on the active writable endpoint of `remoteAddress`, if there is one. */
    def registerWritableEndpointUid(remoteAddress: Address, uid: Int): Unit =
      addressToWritable.get(remoteAddress) match {
        case Some(pass: Pass) ⇒ addressToWritable += remoteAddress -> pass.copy(uid = Some(uid))
        case _                ⇒ // the GotUid might have lost the race with some failure
      }

    /** Registers `endpoint` as the read-only endpoint of `address` and returns it. */
    def registerReadOnlyEndpoint(address: Address, endpoint: ActorRef): ActorRef = {
      addressToReadonly = addressToReadonly.updated(address, endpoint)
      readonlyToAddress = readonlyToAddress.updated(endpoint, address)
      endpoint
    }

    /**
     * Forgets the endpoint. For a writable endpoint the address policy is removed as well, unless
     * it is already a tombstone.
     */
    def unregisterEndpoint(endpoint: ActorRef): Unit =
      writableToAddress.get(endpoint) match {
        case Some(address) ⇒
          // There may already be a tombstone directive for the address; leave it there
          val tombstoned = addressToWritable.get(address).exists(_.isTombstone)
          if (!tombstoned) addressToWritable -= address
          writableToAddress -= endpoint
        case None ⇒
          dropReadOnly(endpoint)
      }

    def writableEndpointWithPolicyFor(address: Address): Option[EndpointPolicy] = addressToWritable.get(address)

    /** True only when the policy for the address is an active endpoint (Pass). */
    def hasWritableEndpointFor(address: Address): Boolean =
      writableEndpointWithPolicyFor(address) exists {
        case _: Pass ⇒ true
        case _       ⇒ false
      }

    def readOnlyEndpointFor(address: Address): Option[ActorRef] = addressToReadonly.get(address)

    def isWritable(endpoint: ActorRef): Boolean = writableToAddress.contains(endpoint)

    def isReadOnly(endpoint: ActorRef): Boolean = readonlyToAddress.contains(endpoint)

    /** True when the address carries a Quarantined marker for exactly this UID. */
    def isQuarantined(address: Address, uid: Int): Boolean =
      writableEndpointWithPolicyFor(address) match {
        // timeOfRelease is only used for garbage collection. If an address is still probed, we should report the
        // known fact that it is quarantined.
        case Some(Quarantined(quarantinedUid, _)) ⇒ quarantinedUid == uid
        case _                                    ⇒ false
      }

    /** The UID to refuse for the address: the quarantined UID, or the one carried by an active endpoint. */
    def refuseUid(address: Address): Option[Int] =
      writableEndpointWithPolicyFor(address) flatMap {
        // timeOfRelease is only used for garbage collection and is therefore ignored here
        case Quarantined(quarantinedUid, _) ⇒ Some(quarantinedUid)
        case Pass(_, _, refused)            ⇒ refused
        case _                              ⇒ None
      }

    /**
     * Marking an endpoint as failed means that we will not try to connect to the remote system within
     * the gated period but it is ok for the remote system to try to connect to us.
     */
    def markAsFailed(endpoint: ActorRef, timeOfRelease: Deadline): Unit =
      writableToAddress.get(endpoint) match {
        case Some(address) ⇒
          addressToWritable += address -> Gated(timeOfRelease)
          writableToAddress -= endpoint
        case None ⇒
          dropReadOnly(endpoint)
      }

    /** Replaces whatever policy the address has with a Quarantined marker for `uid`. */
    def markAsQuarantined(address: Address, uid: Int, timeOfRelease: Deadline): Unit =
      addressToWritable = addressToWritable.updated(address, Quarantined(uid, timeOfRelease))

    /** Drops the policy of the address, whatever it is. */
    def removePolicy(address: Address): Unit =
      addressToWritable -= address

    /** All registered endpoints, writable and read-only. */
    def allEndpoints: collection.Iterable[ActorRef] = writableToAddress.keys ++ readonlyToAddress.keys

    /** Removes the Gated and Quarantined markers whose deadline has no time left. */
    def prune(): Unit = {
      addressToWritable = addressToWritable filter {
        case (_, policy) ⇒
          policy match {
            case Gated(deadline)          ⇒ deadline.hasTimeLeft
            case Quarantined(_, deadline) ⇒ deadline.hasTimeLeft
            case _                        ⇒ true
          }
      }
    }
  }
}

/**
 * INTERNAL API
 */
private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends Actor
  with RequiresMessageQueue[UnboundedMessageQueueSemantics] {

  import EndpointManager._
  import context.dispatcher

  val settings = new RemoteSettings(conf)
  // The actor system, cast once so that callers below can use the extended API
  val extendedSystem = context.system.asInstanceOf[ExtendedActorSystem]
  // Source of the numeric suffix appended to endpoint actor names
  val endpointId: Iterator[Int] = Iterator from 0

  val eventPublisher = new EventPublisher(context.system, log, settings.RemoteLifecycleEventsLogLevel)

  // Mapping between addresses and endpoint actors. If passive connections are turned off, incoming connections
  // will not be part of this map!
  val endpoints = new EndpointRegistry
  // Mapping between transports and the local addresses they listen to
  var transportMapping: Map[Address, AkkaProtocolTransport] = Map()

  // Gating is enabled only for a positive RetryGateClosedFor; in that case a Prune message is
  // scheduled to self periodically, at twice the gate duration.
  def retryGateEnabled = settings.RetryGateClosedFor > Duration.Zero
  val pruneInterval: FiniteDuration = if (retryGateEnabled) settings.RetryGateClosedFor * 2 else Duration.Zero
  val pruneTimerCancellable: Option[Cancellable] = if (retryGateEnabled)
    Some(context.system.scheduler.schedule(pruneInterval, pruneInterval, self, Prune))
  else None

  // Inbound handles waiting to replace the reader of an existing endpoint, keyed by that endpoint
  var pendingReadHandoffs = Map[ActorRef, AkkaProtocolHandle]()
  // Inbound associations held back per endpoint until handleStashedInbound replays them
  var stashedInbound = Map[ActorRef, Vector[InboundAssociation]]()

  /**
   * Replays the inbound associations that were stashed for `endpoint`, if any.
   *
   * The stash entry is removed before replaying, because handling an association may stash it again.
   *
   * @param endpoint the endpoint whose stashed inbound associations are to be processed
   */
  def handleStashedInbound(endpoint: ActorRef): Unit = {
    val stashed = stashedInbound.getOrElse(endpoint, Vector.empty)
    stashedInbound -= endpoint
    stashed foreach handleInboundAssociation
  }

  /**
   * Runs `body` unless a UID has to be refused for `remoteAddress`; in that case the address is
   * (re-)marked as quarantined for that UID and `body` is not evaluated.
   *
   * @param remoteAddress the address whose refuse-UID is looked up
   * @param body the action to run when there is no UID to refuse
   */
  def keepQuarantinedOr(remoteAddress: Address)(body: ⇒ Unit): Unit =
    endpoints.refuseUid(remoteAddress).fold(body) { refusedUid ⇒
      log.info("Quarantined address [{}] is still unreachable or has not been restarted. Keeping it quarantined.", remoteAddress)
      // Restoring Quarantine marker overwritten by a Pass(endpoint, refuseUid) pair while probing remote system.
      endpoints.markAsQuarantined(remoteAddress, refusedUid, Deadline.now + settings.QuarantineDuration)
    }

  // Supervision of the endpoint actors. Every failure ends with the failed endpoint being stopped;
  // the failure type decides which tombstone is recorded for the remote address and what is published.
  override val supervisorStrategy =
    OneForOneStrategy(loggingEnabled = false) {
      // Association could not be established: gate the address, unless a quarantine marker must be kept
      case e @ InvalidAssociation(localAddress, remoteAddress, reason) ⇒
        keepQuarantinedOr(remoteAddress) {
          log.warning("Tried to associate with unreachable remote address [{}]. " +
            "Address is now gated for {} ms, all messages to this address will be delivered to dead letters. Reason: {}",
            remoteAddress, settings.RetryGateClosedFor.toMillis, reason.getMessage)
          endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor)
        }
        AddressTerminatedTopic(context.system).publish(AddressTerminated(remoteAddress))
        Stop

      // Remote system shut down: same handling as above, but only logged at debug level
      case ShutDownAssociation(localAddress, remoteAddress, _) ⇒
        keepQuarantinedOr(remoteAddress) {
          log.debug("Remote system with address [{}] has shut down. " +
            "Address is now gated for {} ms, all messages to this address will be delivered to dead letters.",
            remoteAddress, settings.RetryGateClosedFor.toMillis)
          endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor)
        }
        AddressTerminatedTopic(context.system).publish(AddressTerminated(remoteAddress))
        Stop

      // Irrecoverable failure with a known UID: quarantine that UID when a finite quarantine duration is configured
      case HopelessAssociation(localAddress, remoteAddress, Some(uid), _) ⇒
        settings.QuarantineDuration match {
          case d: FiniteDuration ⇒
            endpoints.markAsQuarantined(remoteAddress, uid, Deadline.now + d)
            eventPublisher.notifyListeners(QuarantinedEvent(remoteAddress, uid))
          case _ ⇒ // disabled
        }
        AddressTerminatedTopic(context.system).publish(AddressTerminated(remoteAddress))
        Stop

      // Irrecoverable failure without a UID: quarantining is impossible, so the address is gated instead
      case HopelessAssociation(localAddress, remoteAddress, None, _) ⇒
        keepQuarantinedOr(remoteAddress) {
          log.warning("Association to [{}] with unknown UID is irrecoverably failed. " +
            "Address cannot be quarantined without knowing the UID, gating instead for {} ms.",
            remoteAddress, settings.RetryGateClosedFor.toMillis)
          endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor)
        }
        AddressTerminatedTopic(context.system).publish(AddressTerminated(remoteAddress))
        Stop

      // Any other non-fatal failure: stop the endpoint without recording a tombstone
      case NonFatal(e) ⇒
        // logging
        e match {
          case _: EndpointDisassociatedException | _: EndpointAssociationException ⇒ // no logging
          case _ ⇒ log.error(e, e.getMessage)
        }
        Stop
    }

  // Structure for saving reliable delivery state across restarts of Endpoints.
  // Keyed by the (local, remote) address pair; the same instance is handed to every endpoint created here.
  val receiveBuffers = new ConcurrentHashMap[Link, ResendState]()

  // Initial behavior, used until StartupFinished arrives: starts the transports and reports their
  // listen addresses through the promise carried by Listen.
  def receive = {
    // Start listening on all transports; the outcome comes back to this actor as ListensResult or ListensFailure
    case Listen(addressesPromise) ⇒
      listens map { ListensResult(addressesPromise, _) } recover {
        case NonFatal(e) ⇒ ListensFailure(addressesPromise, e)
      } pipeTo self
    case ListensResult(addressesPromise, results) ⇒
      // Index the transports by local address; two transports on the same address are rejected
      transportMapping = results.groupBy {
        case (_, transportAddress, _) ⇒ transportAddress
      } map {
        case (a, t) if t.size > 1 ⇒
          throw new RemoteTransportException(s"There are more than one transports listening on local address [$a]", null)
        case (a, t) ⇒ a -> t.head._1
      }
      // Register to each transport as listener and collect mapping to addresses
      val transportsAndAddresses = results map {
        case (transport, address, promise) ⇒
          promise.success(ActorAssociationEventListener(self))
          transport -> address
      }
      addressesPromise.success(transportsAndAddresses)
    case ListensFailure(addressesPromise, cause) ⇒
      addressesPromise.failure(cause)
    // Inbound associations are not handled in this state; the message is re-sent to self after 10 ms
    case ia: InboundAssociation ⇒
      context.system.scheduler.scheduleOnce(10.milliseconds, self, ia)
    // Management commands are answered negatively in this state
    case ManagementCommand(_) ⇒
      sender() ! ManagementCommandAck(status = false)
    case StartupFinished ⇒
      context.become(accepting)
    case ShutdownAndFlush ⇒
      sender() ! true
      context.stop(self) // Nothing to flush at this point
  }

  // Behavior after startup has finished: routes outbound messages to endpoint writers, accepts
  // inbound associations and maintains the endpoint registry.
  val accepting: Receive = {
    // Forward the command to every transport and acknowledge with the conjunction of all results
    case ManagementCommand(cmd) ⇒
      val allStatuses = transportMapping.values map { transport ⇒
        transport.managementCommand(cmd)
      }
      Future.fold(allStatuses)(true)(_ && _) map ManagementCommandAck pipeTo sender()

    case Quarantine(address, uidOption) ⇒
      // Stop writers
      endpoints.writableEndpointWithPolicyFor(address) match {
        case Some(Pass(endpoint, _, _)) ⇒
          context.stop(endpoint)
          // Without a UID the address can only be gated, not quarantined
          if (uidOption.isEmpty) {
            log.warning("Association to [{}] with unknown UID is reported as quarantined, but " +
              "address cannot be quarantined without knowing the UID, gating instead for {} ms.",
              address, settings.RetryGateClosedFor.toMillis)
            endpoints.markAsFailed(endpoint, Deadline.now + settings.RetryGateClosedFor)
          }
        case _ ⇒ // nothing to stop
      }
      // Stop inbound read-only associations
      endpoints.readOnlyEndpointFor(address) match {
        case Some(endpoint) ⇒ context.stop(endpoint)
        case _              ⇒ // nothing to stop
      }
      // With a known UID, record the quarantine marker and publish the event
      uidOption foreach { uid ⇒
        endpoints.markAsQuarantined(address, uid, Deadline.now + settings.QuarantineDuration)
        eventPublisher.notifyListeners(QuarantinedEvent(address, uid))
      }

    case s @ Send(message, senderOption, recipientRef, _) ⇒
      val recipientAddress = recipientRef.path.address

      // Creates a writing endpoint for the recipient's address and registers it as the writable endpoint
      def createAndRegisterWritingEndpoint(refuseUid: Option[Int]): ActorRef =
        endpoints.registerWritableEndpoint(
          recipientAddress,
          uid = None,
          refuseUid,
          createEndpoint(
            recipientAddress,
            recipientRef.localAddressToUse,
            transportMapping(recipientRef.localAddressToUse),
            settings,
            handleOption = None,
            writing = true,
            refuseUid))

      endpoints.writableEndpointWithPolicyFor(recipientAddress) match {
        case Some(Pass(endpoint, _, _)) ⇒
          endpoint ! s
        // While the gate is closed the message goes to dead letters; once it is overdue a new endpoint is created
        case Some(Gated(timeOfRelease)) ⇒
          if (timeOfRelease.isOverdue()) createAndRegisterWritingEndpoint(refuseUid = None) ! s
          else extendedSystem.deadLetters ! s
        case Some(Quarantined(uid, _)) ⇒
          // timeOfRelease is only used for garbage collection reasons, therefore it is ignored here. We still have
          // the Quarantined tombstone and we know what UID we don't want to accept, so use it.
          createAndRegisterWritingEndpoint(refuseUid = Some(uid)) ! s
        case None ⇒
          createAndRegisterWritingEndpoint(refuseUid = None) ! s

      }

    case ia @ InboundAssociation(handle: AkkaProtocolHandle) ⇒
      handleInboundAssociation(ia)
    // The writer gave up reading: a pending inbound handle (if any) gets its own read-only endpoint
    case EndpointWriter.StoppedReading(endpoint) ⇒
      acceptPendingReader(takingOverFrom = endpoint)
    case Terminated(endpoint) ⇒
      acceptPendingReader(takingOverFrom = endpoint)
      endpoints.unregisterEndpoint(endpoint)
      handleStashedInbound(endpoint)
    // The existing endpoint took over the handle itself, so the handoff is no longer pending
    case EndpointWriter.TookOver(endpoint, handle) ⇒
      removePendingReader(takingOverFrom = endpoint, withHandle = handle)
    // The remote UID is now known: record it and replay the associations stashed for the sender
    case ReliableDeliverySupervisor.GotUid(uid, remoteAddress) ⇒
      endpoints.registerWritableEndpointUid(remoteAddress, uid)
      handleStashedInbound(sender)
    case Prune ⇒
      endpoints.prune()
    case ShutdownAndFlush ⇒
      // Shutdown all endpoints and signal to sender() when ready (and whether all endpoints were shut down gracefully)

      // Applies `shutdown` to every resource; the result is true only if all of them report true,
      // and false if any of the futures fails with a non-fatal error.
      def shutdownAll[T](resources: TraversableOnce[T])(shutdown: T ⇒ Future[Boolean]): Future[Boolean] = {
        (Future sequence resources.map(shutdown)) map { _.forall(identity) } recover {
          case NonFatal(_) ⇒ false
        }
      }

      (for {
        // The construction of the future for shutdownStatus has to happen after the flushStatus future has been finished
        // so that endpoints are shut down before transports.
        flushStatus ← shutdownAll(endpoints.allEndpoints)(gracefulStop(_, settings.FlushWait, EndpointWriter.FlushAndStop))
        shutdownStatus ← shutdownAll(transportMapping.values)(_.shutdown())
      } yield flushStatus && shutdownStatus) pipeTo sender()

      // Handles that were still waiting for a handoff are disassociated right away
      pendingReadHandoffs.valuesIterator foreach (_.disassociate(AssociationHandle.Shutdown))

      // Ignore all other writes
      context.become(flushing)
  }

  /**
   * Behavior while shutting down: outbound messages go to dead letters, new inbound associations
   * are disassociated immediately and endpoint terminations are ignored.
   */
  def flushing: Receive = {
    case undeliverable: Send ⇒
      extendedSystem.deadLetters ! undeliverable
    case InboundAssociation(handle: AkkaProtocolHandle) ⇒
      handle.disassociate(AssociationHandle.Shutdown)
    case _: Terminated ⇒
    // deliberately ignored while flushing
  }

  // Decides what to do with an inbound association, based on what the registry knows about the
  // remote address. Only associations carrying an AkkaProtocolHandle are matched.
  def handleInboundAssociation(ia: InboundAssociation): Unit = ia match {
    case ia @ InboundAssociation(handle: AkkaProtocolHandle) ⇒ endpoints.readOnlyEndpointFor(handle.remoteAddress) match {
      // A read-only endpoint already exists: ask it to take over the new handle, replacing any
      // handle that was still waiting for a handoff to that endpoint
      case Some(endpoint) ⇒
        pendingReadHandoffs.get(endpoint) foreach (_.disassociate())
        pendingReadHandoffs += endpoint -> handle
        endpoint ! EndpointWriter.TakeOver(handle, self)
      case None ⇒
        // Refuse connections from a UID that is quarantined for this address
        if (endpoints.isQuarantined(handle.remoteAddress, handle.handshakeInfo.uid))
          handle.disassociate(AssociationHandle.Quarantined)
        else endpoints.writableEndpointWithPolicyFor(handle.remoteAddress) match {
          // Writer exists but its UID is not known yet: hold the association back until it can be replayed
          case Some(Pass(ep, None, _)) ⇒
            stashedInbound += ep -> (stashedInbound.getOrElse(ep, Vector.empty) :+ ia)
          case Some(Pass(ep, Some(uid), _)) ⇒
            if (handle.handshakeInfo.uid == uid) {
              // Same UID as the existing writer: ask the writer to stop reading and keep the handle pending
              pendingReadHandoffs.get(ep) foreach (_.disassociate())
              pendingReadHandoffs += ep -> handle
              ep ! EndpointWriter.StopReading(ep, self)
            } else {
              // Different UID: replace the existing writer and refuse its old UID from now on
              context.stop(ep)
              endpoints.unregisterEndpoint(ep)
              pendingReadHandoffs -= ep
              createAndRegisterEndpoint(handle, refuseUid = Some(uid))
            }
          // No active writer (no policy, or a tombstone): create an endpoint for the handle
          case state ⇒
            createAndRegisterEndpoint(handle, refuseUid = endpoints.refuseUid(handle.remoteAddress))
        }
    }
  }

  /**
   * Publishes an inbound AssociatedEvent for the handle, creates an endpoint for it and registers
   * that endpoint.
   *
   * The endpoint is a writer only when passive connections are enabled and no active writable
   * endpoint exists for the remote address; otherwise it is registered as read-only and the policy
   * of the address is removed.
   *
   * @param handle the inbound association handle
   * @param refuseUid UID to be refused by the new endpoint, if any
   */
  private def createAndRegisterEndpoint(handle: AkkaProtocolHandle, refuseUid: Option[Int]): Unit = {
    val remote = handle.remoteAddress
    val local = handle.localAddress
    val asWriter = settings.UsePassiveConnections && !endpoints.hasWritableEndpointFor(remote)

    eventPublisher.notifyListeners(AssociatedEvent(local, remote, inbound = true))

    val endpoint = createEndpoint(
      remoteAddress = remote,
      localAddress = local,
      transport = transportMapping(local),
      endpointSettings = settings,
      handleOption = Some(handle),
      writing = asWriter,
      refuseUid = refuseUid)

    if (!asWriter) {
      endpoints.registerReadOnlyEndpoint(remote, endpoint)
      endpoints.removePolicy(remote)
    } else
      endpoints.registerWritableEndpoint(remote, Some(handle.handshakeInfo.uid), refuseUid, endpoint)
  }

  /**
   * Instantiates every configured transport, wraps it in its adapters and in an
   * AkkaProtocolTransport, and starts listening on all of them.
   *
   * The resulting chain per configured driver looks like this:
   *   AkkaProtocolTransport <- Adapter <- ... <- Adapter <- Driver
   *
   * @return a future holding, for each transport, the transport itself (the head of its chain), the
   *         address it listens on and the promise through which its listener is to be supplied
   * @throws IllegalArgumentException if a configured transport class cannot be instantiated
   */
  private def listens: Future[Seq[(AkkaProtocolTransport, Address, Promise[AssociationEventListener])]] = {
    val transports: Seq[AkkaProtocolTransport] = settings.Transports map {
      case (fqn, adapters, config) ⇒
        val args = Seq(classOf[ExtendedActorSystem] -> context.system, classOf[Config] -> config)

        // Bottom of the chain: the driver, loaded reflectively by its fully qualified class name
        val driver = extendedSystem.dynamicAccess.createInstanceFor[Transport](fqn, args) match {
          case Success(instance) ⇒ instance
          case Failure(exception) ⇒ throw new IllegalArgumentException(
            s"Cannot instantiate transport [$fqn]. " +
              "Make sure it extends [akka.remote.transport.Transport] and has constructor with " +
              "[akka.actor.ExtendedActorSystem] and [com.typesafe.config.Config] parameters", exception)
        }

        // Middle of the chain: each adapter provider wraps the transport built so far
        val adapterProviders = adapters.map { TransportAdaptersExtension.get(context.system).getAdapterProvider }
        val wrappedTransport = adapterProviders.foldLeft(driver) {
          (inner: Transport, provider: TransportAdapterProvider) ⇒ provider.create(inner, extendedSystem)
        }

        // Head of the chain: the AkkaProtocolTransport wrapper
        new AkkaProtocolTransport(wrappedTransport, context.system, new AkkaProtocolSettings(conf), AkkaPduProtobufCodec)
    }

    // Collect all transports, listen addresses and listener promises in one future
    val listening = transports map { transport ⇒
      transport.listen map { case (address, listenerPromise) ⇒ (transport, address, listenerPromise) }
    }
    Future.sequence(listening)
  }

  /**
   * If an inbound handle is waiting to take over from the given endpoint, removes it from the
   * pending handoffs, publishes an inbound AssociatedEvent and registers a new read-only endpoint
   * for it. Does nothing when no handle is pending.
   *
   * @param takingOverFrom the endpoint whose pending handoff, if any, is to be accepted
   */
  private def acceptPendingReader(takingOverFrom: ActorRef): Unit =
    pendingReadHandoffs.get(takingOverFrom) foreach { handle ⇒
      pendingReadHandoffs -= takingOverFrom
      eventPublisher.notifyListeners(AssociatedEvent(handle.localAddress, handle.remoteAddress, inbound = true))
      val reader = createEndpoint(
        remoteAddress = handle.remoteAddress,
        localAddress = handle.localAddress,
        transport = transportMapping(handle.localAddress),
        endpointSettings = settings,
        handleOption = Some(handle),
        writing = false,
        refuseUid = None)
      endpoints.registerReadOnlyEndpoint(handle.remoteAddress, reader)
    }

  /**
   * Drops the pending handoff for the given endpoint, but only if the pending handle equals
   * `withHandle`.
   */
  private def removePendingReader(takingOverFrom: ActorRef, withHandle: AkkaProtocolHandle): Unit =
    pendingReadHandoffs.get(takingOverFrom) match {
      case Some(pending) if pending == withHandle ⇒ pendingReadHandoffs -= takingOverFrom
      case _                                      ⇒ // nothing pending, or a different handle is pending
    }

  /**
   * Creates and watches a child actor for the given association: a ReliableDeliverySupervisor when
   * `writing` is true, a plain EndpointWriter otherwise.
   *
   * The child is deployed locally, uses the dispatcher chosen by the remote settings and is named
   * after the URL-encoded remote address followed by a running number.
   *
   * @param remoteAddress address of the remote system
   * @param localAddress local address to use; a transport must be registered for it
   * @param transport transport used by the endpoint
   * @param endpointSettings settings passed on to the endpoint
   * @param handleOption an already established association handle, if any
   * @param writing whether the endpoint is a writer
   * @param refuseUid UID to be refused; must be empty unless `writing` is true
   * @return the watched reference of the new endpoint actor
   */
  private def createEndpoint(remoteAddress: Address,
                             localAddress: Address,
                             transport: AkkaProtocolTransport,
                             endpointSettings: RemoteSettings,
                             handleOption: Option[AkkaProtocolHandle],
                             writing: Boolean,
                             refuseUid: Option[Int]): ActorRef = {
    assert(transportMapping contains localAddress)
    assert(writing || refuseUid.isEmpty)

    val dispatcherSettings = RARP(extendedSystem)

    // The two kinds of endpoint differ only in their props and in the prefix of their name
    val (endpointProps, namePrefix) =
      if (writing)
        (ReliableDeliverySupervisor.props(
          handleOption,
          localAddress,
          remoteAddress,
          refuseUid,
          transport,
          endpointSettings,
          AkkaPduProtobufCodec,
          receiveBuffers), "reliableEndpointWriter-")
      else
        (EndpointWriter.props(
          handleOption,
          localAddress,
          remoteAddress,
          refuseUid,
          transport,
          endpointSettings,
          AkkaPduProtobufCodec,
          receiveBuffers,
          reliableDeliverySupervisor = None), "endpointWriter-")

    val localProps = dispatcherSettings.configureDispatcher(endpointProps).withDeploy(Deploy.local)
    val actorName = namePrefix + AddressUrlEncoder(remoteAddress) + "-" + endpointId.next()
    context.watch(context.actorOf(localProps, actorName))
  }

  /** Cancels the periodic prune timer, if one was scheduled. */
  override def postStop(): Unit =
    for (timer ← pruneTimerCancellable) timer.cancel()

}

Other Akka source code examples

Here is a short list of links related to this Akka Remoting.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.