Akka/Scala example source code file (Remoting.scala)

This example Akka source code file (Remoting.scala) is included in my "Source Code Warehouse" project.

All credit for the original source code belongs to; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actorref, address, akka, boolean, concurrent, event, none, option, promise, remote, remoting, remotingcommand, some, transport, unit, utilities

The Remoting.scala Akka example source code

 * Copyright (C) 2009-2014 Typesafe Inc. <>
package akka.remote

import akka.event.{ Logging, LoggingAdapter }
import akka.pattern.{ gracefulStop, pipe, ask }
import akka.remote.EndpointManager._
import akka.remote.Remoting.TransportSupervisor
import akka.remote.transport.Transport.{ ActorAssociationEventListener, AssociationEventListener, InboundAssociation }
import akka.remote.transport._
import com.typesafe.config.Config
import java.util.concurrent.TimeoutException
import scala.collection.immutable.{ Seq, HashMap }
import scala.concurrent.duration._
import scala.concurrent.{ Promise, Await, Future }
import scala.util.control.NonFatal
import scala.util.{ Failure, Success }
import akka.remote.transport.AkkaPduCodec.Message
import java.util.concurrent.ConcurrentHashMap
import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
import akka.event.AddressTerminatedTopic

private[remote] object AddressUrlEncoder {
  def apply(address: Address): String = URLEncoder.encode(address.toString, "utf-8")

private[remote] final case class RARP(provider: RemoteActorRefProvider) extends Extension {
  def configureDispatcher(props: Props): Props = provider.remoteSettings.configureDispatcher(props)
private[remote] object RARP extends ExtensionId[RARP] with ExtensionIdProvider {

  override def lookup() = RARP

  override def createExtension(system: ExtendedActorSystem) = RARP(system.provider.asInstanceOf[RemoteActorRefProvider])

 * Messages marked with this trait will be sent before other messages when buffering is active.
 * This means that these messages don't obey normal message ordering.
 * It is used for failure detector heartbeat messages.
private[akka] trait PriorityMessage

private[remote] object Remoting {

  final val EndpointManagerName = "endpointManager"

  def localAddressForRemote(transportMapping: Map[String, Set[(AkkaProtocolTransport, Address)]], remote: Address): Address = {

    transportMapping.get(remote.protocol) match {
      case Some(transports) ⇒
        val responsibleTransports = transports.filter { case (t, _) ⇒ t.isResponsibleFor(remote) }

        responsibleTransports.size match {
          case 0 ⇒
            throw new RemoteTransportException(
              s"No transport is responsible for address: [$remote] although protocol [${remote.protocol}] is available." +
                " Make sure at least one transport is configured to be responsible for the address.",

          case 1 ⇒

          case _ ⇒
            throw new RemoteTransportException(
              s"Multiple transports are available for [$remote]: [${responsibleTransports.mkString(",")}]. " +
                "Remoting cannot decide which transport to use to reach the remote system. Change your configuration " +
                "so that only one transport is responsible for the address.",
      case None ⇒ throw new RemoteTransportException(
        s"No transport is loaded for protocol: [${remote.protocol}], available protocols: [${transportMapping.keys.mkString(", ")}]", null)

  final case class RegisterTransportActor(props: Props, name: String) extends NoSerializationVerificationNeeded

  private[Remoting] class TransportSupervisor extends Actor with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
    override def supervisorStrategy = OneForOneStrategy() {
      case NonFatal(e) ⇒ Restart

    def receive = {
      case RegisterTransportActor(props, name) ⇒
        sender() ! context.actorOf(


private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider) extends RemoteTransport(_system, _provider) {

  @volatile private var endpointManager: Option[ActorRef] = None
  @volatile private var transportMapping: Map[String, Set[(AkkaProtocolTransport, Address)]] = _
  // This is effectively a write-once variable similar to a lazy val. The reason for not using a lazy val is exception
  // handling.
  @volatile var addresses: Set[Address] = _
  // This variable has the same semantics as the addresses variable, in the sense it is written once, and emulates
  // a lazy val
  @volatile var defaultAddress: Address = _

  import provider.remoteSettings._

  val transportSupervisor = system.systemActorOf(

  override def localAddressForRemote(remote: Address): Address = Remoting.localAddressForRemote(transportMapping, remote)

  val log: LoggingAdapter = Logging(system.eventStream, "Remoting")
  val eventPublisher = new EventPublisher(system, log, RemoteLifecycleEventsLogLevel)

  private def notifyError(msg: String, cause: Throwable): Unit =
    eventPublisher.notifyListeners(RemotingErrorEvent(new RemoteTransportException(msg, cause)))

  override def shutdown(): Future[Unit] = {
    endpointManager match {
      case Some(manager) ⇒
        implicit val timeout = ShutdownTimeout

        def finalize(): Unit = {
          endpointManager = None

        (manager ? ShutdownAndFlush).mapTo[Boolean].andThen {
          case Success(flushSuccessful) ⇒
            if (!flushSuccessful)
              log.warning("Shutdown finished, but flushing might not have been successful and some messages might have been dropped. " +
                "Increase akka.remote.flush-wait-on-shutdown to a larger value to avoid this.")

          case Failure(e) ⇒
            notifyError("Failure during shutdown of remoting.", e)
        } map { _ ⇒ () } // RARP needs only type Unit, not a boolean
      case None ⇒
        log.warning("Remoting is not running. Ignoring shutdown attempt.")
        Future successful (())

  // Start assumes that it cannot be followed by another start() without having a shutdown() first
  override def start(): Unit = {
    endpointManager match {
      case None ⇒"Starting remoting")
        val manager: ActorRef = system.systemActorOf(
          configureDispatcher(Props(classOf[EndpointManager], provider.remoteSettings.config, log)).withDeploy(Deploy.local),
        endpointManager = Some(manager)

        try {
          val addressesPromise: Promise[Seq[(AkkaProtocolTransport, Address)]] = Promise()
          manager ! Listen(addressesPromise)

          val transports: Seq[(AkkaProtocolTransport, Address)] = Await.result(addressesPromise.future,
          if (transports.isEmpty) throw new RemoteTransportException("No transport drivers were loaded.", null)

          transportMapping = transports.groupBy {
            case (transport, _) ⇒ transport.schemeIdentifier
          } map { case (k, v) ⇒ k -> v.toSet }

          defaultAddress = transports.head._2
          addresses = { _._2 }.toSet

"Remoting started; listening on addresses :" + addresses.mkString("[", ", ", "]"))

          manager ! StartupFinished

        } catch {
          case e: TimeoutException ⇒
            notifyError("Startup timed out", e)
            throw e
          case NonFatal(e) ⇒
            notifyError("Startup failed", e)
            throw e

      case Some(_) ⇒
        log.warning("Remoting was already started. Ignoring start attempt.")

  override def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit = endpointManager match {
    case Some(manager) ⇒ manager.tell(Send(message, senderOption, recipient), sender = senderOption getOrElse Actor.noSender)
    case None          ⇒ throw new RemoteTransportExceptionNoStackTrace("Attempted to send remote message but Remoting is not running.", null)

  override def managementCommand(cmd: Any): Future[Boolean] = endpointManager match {
    case Some(manager) ⇒
      import system.dispatcher
      implicit val timeout = CommandAckTimeout
      manager ? ManagementCommand(cmd) map { case ManagementCommandAck(status) ⇒ status }
    case None ⇒ throw new RemoteTransportExceptionNoStackTrace("Attempted to send management command but Remoting is not running.", null)

  override def quarantine(remoteAddress: Address, uid: Option[Int]): Unit = endpointManager match {
    case Some(manager) ⇒ manager ! Quarantine(remoteAddress, uid)
    case _ ⇒ throw new RemoteTransportExceptionNoStackTrace(
      s"Attempted to quarantine address [$remoteAddress] with uid [$uid] but Remoting is not running", null)

  // Not used anywhere only to keep compatibility with RemoteTransport interface
  protected def useUntrustedMode: Boolean = provider.remoteSettings.UntrustedMode


private[remote] object EndpointManager {

  // Messages between Remoting and EndpointManager
  sealed trait RemotingCommand extends NoSerializationVerificationNeeded
  final case class Listen(addressesPromise: Promise[Seq[(AkkaProtocolTransport, Address)]]) extends RemotingCommand
  case object StartupFinished extends RemotingCommand
  case object ShutdownAndFlush extends RemotingCommand
  final case class Send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef, seqOpt: Option[SeqNo] = None)
    extends RemotingCommand with HasSequenceNumber {
    override def toString = s"Remote message $senderOption -> $recipient"

    // This MUST throw an exception to indicate that we attempted to put a nonsequenced message in one of the
    // acknowledged delivery buffers
    def seq = seqOpt.get
  final case class Quarantine(remoteAddress: Address, uid: Option[Int]) extends RemotingCommand
  final case class ManagementCommand(cmd: Any) extends RemotingCommand
  final case class ManagementCommandAck(status: Boolean)

  // Messages internal to EndpointManager
  case object Prune extends NoSerializationVerificationNeeded
  final case class ListensResult(addressesPromise: Promise[Seq[(AkkaProtocolTransport, Address)]],
                                 results: Seq[(AkkaProtocolTransport, Address, Promise[AssociationEventListener])])
    extends NoSerializationVerificationNeeded
  final case class ListensFailure(addressesPromise: Promise[Seq[(AkkaProtocolTransport, Address)]], cause: Throwable)
    extends NoSerializationVerificationNeeded

  // Helper class to store address pairs
  final case class Link(localAddress: Address, remoteAddress: Address)

  final case class ResendState(uid: Int, buffer: AckedReceiveBuffer[Message])

  sealed trait EndpointPolicy {

     * Indicates that the policy does not contain an active endpoint, but it is a tombstone of a previous failure
    def isTombstone: Boolean
  final case class Pass(endpoint: ActorRef, uid: Option[Int], refuseUid: Option[Int]) extends EndpointPolicy {
    override def isTombstone: Boolean = false
  final case class Gated(timeOfRelease: Deadline) extends EndpointPolicy {
    override def isTombstone: Boolean = true
  final case class Quarantined(uid: Int, timeOfRelease: Deadline) extends EndpointPolicy {
    override def isTombstone: Boolean = true

  // Not threadsafe -- only to be used in HeadActor
  class EndpointRegistry {
    private var addressToWritable = HashMap[Address, EndpointPolicy]()
    private var writableToAddress = HashMap[ActorRef, Address]()
    private var addressToReadonly = HashMap[Address, ActorRef]()
    private var readonlyToAddress = HashMap[ActorRef, Address]()

    def registerWritableEndpoint(address: Address, uid: Option[Int], refuseUid: Option[Int], endpoint: ActorRef): ActorRef =
      addressToWritable.get(address) match {
        case Some(Pass(e, _, _)) ⇒
          throw new IllegalArgumentException(s"Attempting to overwrite existing endpoint [$e] with [$endpoint]")
        case _ ⇒
          addressToWritable += address -> Pass(endpoint, uid, refuseUid)
          writableToAddress += endpoint -> address

    def registerWritableEndpointUid(remoteAddress: Address, uid: Int): Unit = {
      addressToWritable.get(remoteAddress) match {
        case Some(Pass(ep, _, refuseUid)) ⇒ addressToWritable += remoteAddress -> Pass(ep, Some(uid), refuseUid)
        case other                        ⇒ // the GotUid might have lost the race with some failure

    def registerReadOnlyEndpoint(address: Address, endpoint: ActorRef): ActorRef = {
      addressToReadonly += address -> endpoint
      readonlyToAddress += endpoint -> address

    def unregisterEndpoint(endpoint: ActorRef): Unit =
      if (isWritable(endpoint)) {
        val address = writableToAddress(endpoint)
        addressToWritable.get(address) match {
          case Some(policy) if policy.isTombstone ⇒ // There is already a tombstone directive, leave it there
          case _                                  ⇒ addressToWritable -= address
        writableToAddress -= endpoint
      } else if (isReadOnly(endpoint)) {
        addressToReadonly -= readonlyToAddress(endpoint)
        readonlyToAddress -= endpoint

    def writableEndpointWithPolicyFor(address: Address): Option[EndpointPolicy] = addressToWritable.get(address)

    def hasWritableEndpointFor(address: Address): Boolean = writableEndpointWithPolicyFor(address) match {
      case Some(Pass(_, _, _)) ⇒ true
      case _                   ⇒ false

    def readOnlyEndpointFor(address: Address): Option[ActorRef] = addressToReadonly.get(address)

    def isWritable(endpoint: ActorRef): Boolean = writableToAddress contains endpoint

    def isReadOnly(endpoint: ActorRef): Boolean = readonlyToAddress contains endpoint

    def isQuarantined(address: Address, uid: Int): Boolean = writableEndpointWithPolicyFor(address) match {
      // timeOfRelease is only used for garbage collection. If an address is still probed, we should report the
      // known fact that it is quarantined.
      case Some(Quarantined(`uid`, _)) ⇒ true
      case _                           ⇒ false

    def refuseUid(address: Address): Option[Int] = writableEndpointWithPolicyFor(address) match {
      // timeOfRelease is only used for garbage collection. If an address is still probed, we should report the
      // known fact that it is quarantined.
      case Some(Quarantined(uid, _))   ⇒ Some(uid)
      case Some(Pass(_, _, refuseUid)) ⇒ refuseUid
      case _                           ⇒ None

     * Marking an endpoint as failed means that we will not try to connect to the remote system within
     * the gated period but it is ok for the remote system to try to connect to us.
    def markAsFailed(endpoint: ActorRef, timeOfRelease: Deadline): Unit =
      if (isWritable(endpoint)) {
        addressToWritable += writableToAddress(endpoint) -> Gated(timeOfRelease)
        writableToAddress -= endpoint
      } else if (isReadOnly(endpoint)) {
        addressToReadonly -= readonlyToAddress(endpoint)
        readonlyToAddress -= endpoint

    def markAsQuarantined(address: Address, uid: Int, timeOfRelease: Deadline): Unit =
      addressToWritable += address -> Quarantined(uid, timeOfRelease)

    def removePolicy(address: Address): Unit =
      addressToWritable -= address

    def allEndpoints: collection.Iterable[ActorRef] = writableToAddress.keys ++ readonlyToAddress.keys

    def prune(): Unit = {
      addressToWritable = addressToWritable.filter {
        case (_, Gated(timeOfRelease))          ⇒ timeOfRelease.hasTimeLeft
        case (_, Quarantined(_, timeOfRelease)) ⇒ timeOfRelease.hasTimeLeft
        case _                                  ⇒ true

private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends Actor
  with RequiresMessageQueue[UnboundedMessageQueueSemantics] {

  import EndpointManager._
  import context.dispatcher

  val settings = new RemoteSettings(conf)
  val extendedSystem = context.system.asInstanceOf[ExtendedActorSystem]
  val endpointId: Iterator[Int] = Iterator from 0

  val eventPublisher = new EventPublisher(context.system, log, settings.RemoteLifecycleEventsLogLevel)

  // Mapping between addresses and endpoint actors. If passive connections are turned off, incoming connections
  // will be not part of this map!
  val endpoints = new EndpointRegistry
  // Mapping between transports and the local addresses they listen to
  var transportMapping: Map[Address, AkkaProtocolTransport] = Map()

  def retryGateEnabled = settings.RetryGateClosedFor > Duration.Zero
  val pruneInterval: FiniteDuration = if (retryGateEnabled) settings.RetryGateClosedFor * 2 else Duration.Zero
  val pruneTimerCancellable: Option[Cancellable] = if (retryGateEnabled)
    Some(context.system.scheduler.schedule(pruneInterval, pruneInterval, self, Prune))
  else None

  var pendingReadHandoffs = Map[ActorRef, AkkaProtocolHandle]()
  var stashedInbound = Map[ActorRef, Vector[InboundAssociation]]()

  def handleStashedInbound(endpoint: ActorRef) {
    val stashed = stashedInbound.getOrElse(endpoint, Vector.empty)
    stashedInbound -= endpoint
    stashed foreach (handleInboundAssociation _)

  def keepQuarantinedOr(remoteAddress: Address)(body: ⇒ Unit): Unit = endpoints.refuseUid(remoteAddress) match {
    case Some(uid) ⇒"Quarantined address [{}] is still unreachable or has not been restarted. Keeping it quarantined.", remoteAddress)
      // Restoring Quarantine marker overwritten by a Pass(endpoint, refuseUid) pair while probing remote system.
      endpoints.markAsQuarantined(remoteAddress, uid, + settings.QuarantineDuration)
    case None ⇒ body

  override val supervisorStrategy =
    OneForOneStrategy(loggingEnabled = false) {
      case e @ InvalidAssociation(localAddress, remoteAddress, reason) ⇒
        keepQuarantinedOr(remoteAddress) {
          log.warning("Tried to associate with unreachable remote address [{}]. " +
            "Address is now gated for {} ms, all messages to this address will be delivered to dead letters. Reason: {}",
            remoteAddress, settings.RetryGateClosedFor.toMillis, reason.getMessage)
          endpoints.markAsFailed(sender(), + settings.RetryGateClosedFor)

      case ShutDownAssociation(localAddress, remoteAddress, _) ⇒
        keepQuarantinedOr(remoteAddress) {
          log.debug("Remote system with address [{}] has shut down. " +
            "Address is now gated for {} ms, all messages to this address will be delivered to dead letters.",
            remoteAddress, settings.RetryGateClosedFor.toMillis)
          endpoints.markAsFailed(sender(), + settings.RetryGateClosedFor)

      case HopelessAssociation(localAddress, remoteAddress, Some(uid), _) ⇒
        settings.QuarantineDuration match {
          case d: FiniteDuration ⇒
            endpoints.markAsQuarantined(remoteAddress, uid, + d)
            eventPublisher.notifyListeners(QuarantinedEvent(remoteAddress, uid))
          case _ ⇒ // disabled

      case HopelessAssociation(localAddress, remoteAddress, None, _) ⇒
        keepQuarantinedOr(remoteAddress) {
          log.warning("Association to [{}] with unknown UID is irrecoverably failed. " +
            "Address cannot be quarantined without knowing the UID, gating instead for {} ms.",
            remoteAddress, settings.RetryGateClosedFor.toMillis)
          endpoints.markAsFailed(sender(), + settings.RetryGateClosedFor)

      case NonFatal(e) ⇒
        // logging
        e match {
          case _: EndpointDisassociatedException | _: EndpointAssociationException ⇒ // no logging
          case _ ⇒ log.error(e, e.getMessage)

  // Structure for saving reliable delivery state across restarts of Endpoints
  val receiveBuffers = new ConcurrentHashMap[Link, ResendState]()

  def receive = {
    case Listen(addressesPromise) ⇒
      listens map { ListensResult(addressesPromise, _) } recover {
        case NonFatal(e) ⇒ ListensFailure(addressesPromise, e)
      } pipeTo self
    case ListensResult(addressesPromise, results) ⇒
      transportMapping = results.groupBy {
        case (_, transportAddress, _) ⇒ transportAddress
      } map {
        case (a, t) if t.size > 1 ⇒
          throw new RemoteTransportException(s"There are more than one transports listening on local address [$a]", null)
        case (a, t) ⇒ a -> t.head._1
      // Register to each transport as listener and collect mapping to addresses
      val transportsAndAddresses = results map {
        case (transport, address, promise) ⇒
          transport -> address
    case ListensFailure(addressesPromise, cause) ⇒
    case ia: InboundAssociation ⇒
      context.system.scheduler.scheduleOnce(10.milliseconds, self, ia)
    case ManagementCommand(_) ⇒
      sender() ! ManagementCommandAck(status = false)
    case StartupFinished ⇒
    case ShutdownAndFlush ⇒
      sender() ! true
      context.stop(self) // Nothing to flush at this point

  val accepting: Receive = {
    case ManagementCommand(cmd) ⇒
      val allStatuses = transportMapping.values map { transport ⇒
      Future.fold(allStatuses)(true)(_ && _) map ManagementCommandAck pipeTo sender()

    case Quarantine(address, uidOption) ⇒
      // Stop writers
      endpoints.writableEndpointWithPolicyFor(address) match {
        case Some(Pass(endpoint, _, _)) ⇒
          if (uidOption.isEmpty) {
            log.warning("Association to [{}] with unknown UID is reported as quarantined, but " +
              "address cannot be quarantined without knowing the UID, gating instead for {} ms.",
              address, settings.RetryGateClosedFor.toMillis)
            endpoints.markAsFailed(endpoint, + settings.RetryGateClosedFor)
        case _ ⇒ // nothing to stop
      // Stop inbound read-only associations
      endpoints.readOnlyEndpointFor(address) match {
        case Some(endpoint) ⇒ context.stop(endpoint)
        case _              ⇒ // nothing to stop
      uidOption foreach { uid ⇒
        endpoints.markAsQuarantined(address, uid, + settings.QuarantineDuration)
        eventPublisher.notifyListeners(QuarantinedEvent(address, uid))

    case s @ Send(message, senderOption, recipientRef, _) ⇒
      val recipientAddress = recipientRef.path.address

      def createAndRegisterWritingEndpoint(refuseUid: Option[Int]): ActorRef =
          uid = None,
            handleOption = None,
            writing = true,

      endpoints.writableEndpointWithPolicyFor(recipientAddress) match {
        case Some(Pass(endpoint, _, _)) ⇒
          endpoint ! s
        case Some(Gated(timeOfRelease)) ⇒
          if (timeOfRelease.isOverdue()) createAndRegisterWritingEndpoint(refuseUid = None) ! s
          else extendedSystem.deadLetters ! s
        case Some(Quarantined(uid, _)) ⇒
          // timeOfRelease is only used for garbage collection reasons, therefore it is ignored here. We still have
          // the Quarantined tombstone and we know what UID we don't want to accept, so use it.
          createAndRegisterWritingEndpoint(refuseUid = Some(uid)) ! s
        case None ⇒
          createAndRegisterWritingEndpoint(refuseUid = None) ! s


    case ia @ InboundAssociation(handle: AkkaProtocolHandle) ⇒
    case EndpointWriter.StoppedReading(endpoint) ⇒
      acceptPendingReader(takingOverFrom = endpoint)
    case Terminated(endpoint) ⇒
      acceptPendingReader(takingOverFrom = endpoint)
    case EndpointWriter.TookOver(endpoint, handle) ⇒
      removePendingReader(takingOverFrom = endpoint, withHandle = handle)
    case ReliableDeliverySupervisor.GotUid(uid, remoteAddress) ⇒
      endpoints.registerWritableEndpointUid(remoteAddress, uid)
    case Prune ⇒
    case ShutdownAndFlush ⇒
      // Shutdown all endpoints and signal to sender() when ready (and whether all endpoints were shut down gracefully)

      def shutdownAll[T](resources: TraversableOnce[T])(shutdown: T ⇒ Future[Boolean]): Future[Boolean] = {
        (Future sequence map { _.forall(identity) } recover {
          case NonFatal(_) ⇒ false

      (for {
        // The construction of the future for shutdownStatus has to happen after the flushStatus future has been finished
        // so that endpoints are shut down before transports.
        flushStatus ← shutdownAll(endpoints.allEndpoints)(gracefulStop(_, settings.FlushWait, EndpointWriter.FlushAndStop))
        shutdownStatus ← shutdownAll(transportMapping.values)(_.shutdown())
      } yield flushStatus && shutdownStatus) pipeTo sender()

      pendingReadHandoffs.valuesIterator foreach (_.disassociate(AssociationHandle.Shutdown))

      // Ignore all other writes

  def flushing: Receive = {
    case s: Send                                   ⇒ extendedSystem.deadLetters ! s
    case InboundAssociation(h: AkkaProtocolHandle) ⇒ h.disassociate(AssociationHandle.Shutdown)
    case Terminated(_)                             ⇒ // why should we care now?

  def handleInboundAssociation(ia: InboundAssociation): Unit = ia match {
    case ia @ InboundAssociation(handle: AkkaProtocolHandle) ⇒ endpoints.readOnlyEndpointFor(handle.remoteAddress) match {
      case Some(endpoint) ⇒
        pendingReadHandoffs.get(endpoint) foreach (_.disassociate())
        pendingReadHandoffs += endpoint -> handle
        endpoint ! EndpointWriter.TakeOver(handle, self)
      case None ⇒
        if (endpoints.isQuarantined(handle.remoteAddress, handle.handshakeInfo.uid))
        else endpoints.writableEndpointWithPolicyFor(handle.remoteAddress) match {
          case Some(Pass(ep, None, _)) ⇒
            stashedInbound += ep -> (stashedInbound.getOrElse(ep, Vector.empty) :+ ia)
          case Some(Pass(ep, Some(uid), _)) ⇒
            if (handle.handshakeInfo.uid == uid) {
              pendingReadHandoffs.get(ep) foreach (_.disassociate())
              pendingReadHandoffs += ep -> handle
              ep ! EndpointWriter.StopReading(ep, self)
            } else {
              pendingReadHandoffs -= ep
              createAndRegisterEndpoint(handle, refuseUid = Some(uid))
          case state ⇒
            createAndRegisterEndpoint(handle, refuseUid = endpoints.refuseUid(handle.remoteAddress))

  private def createAndRegisterEndpoint(handle: AkkaProtocolHandle, refuseUid: Option[Int]): Unit = {
    val writing = settings.UsePassiveConnections && !endpoints.hasWritableEndpointFor(handle.remoteAddress)
    eventPublisher.notifyListeners(AssociatedEvent(handle.localAddress, handle.remoteAddress, inbound = true))
    val endpoint = createEndpoint(
      refuseUid = refuseUid)
    if (writing)
      endpoints.registerWritableEndpoint(handle.remoteAddress, Some(handle.handshakeInfo.uid), refuseUid, endpoint)
    else {
      endpoints.registerReadOnlyEndpoint(handle.remoteAddress, endpoint)

  private def listens: Future[Seq[(AkkaProtocolTransport, Address, Promise[AssociationEventListener])]] = {
     * Constructs chains of adapters on top of each driver as given in configuration. The resulting structure looks
     * like the following:
     *   AkkaProtocolTransport <- Adapter <- ... <- Adapter <- Driver
     * The transports variable contains only the heads of each chains (the AkkaProtocolTransport instances).
    val transports: Seq[AkkaProtocolTransport] = for ((fqn, adapters, config) ← settings.Transports) yield {

      val args = Seq(classOf[ExtendedActorSystem] -> context.system, classOf[Config] -> config)

      // Loads the driver -- the bottom element of the chain.
      // The chain at this point:
      //   Driver
      val driver = extendedSystem.dynamicAccess
        .createInstanceFor[Transport](fqn, args).recover({

          case exception ⇒ throw new IllegalArgumentException(
            s"Cannot instantiate transport [$fqn]. " +
              "Make sure it extends [akka.remote.transport.Transport] and has constructor with " +
              "[] and [com.typesafe.config.Config] parameters", exception)


      // Iteratively decorates the bottom level driver with a list of adapters.
      // The chain at this point:
      //   Adapter <- ... <- Adapter <- Driver
      val wrappedTransport = { TransportAdaptersExtension.get(context.system).getAdapterProvider }.foldLeft(driver) {
          (t: Transport, provider: TransportAdapterProvider) ⇒
            // The TransportAdapterProvider will wrap the given Transport and returns with a wrapped one
            provider.create(t, context.system.asInstanceOf[ExtendedActorSystem])

      // Apply AkkaProtocolTransport wrapper to the end of the chain
      // The chain at this point:
      //   AkkaProtocolTransport <- Adapter <- ... <- Adapter <- Driver
      new AkkaProtocolTransport(wrappedTransport, context.system, new AkkaProtocolSettings(conf), AkkaPduProtobufCodec)

    // Collect all transports, listen addresses and listener promises in one future
    Future.sequence( { transport ⇒
      transport.listen map { case (address, listenerPromise) ⇒ (transport, address, listenerPromise) }

  private def acceptPendingReader(takingOverFrom: ActorRef): Unit = {
    if (pendingReadHandoffs.contains(takingOverFrom)) {
      val handle = pendingReadHandoffs(takingOverFrom)
      pendingReadHandoffs -= takingOverFrom
      eventPublisher.notifyListeners(AssociatedEvent(handle.localAddress, handle.remoteAddress, inbound = true))
      val endpoint = createEndpoint(
        writing = false,
        refuseUid = None)
      endpoints.registerReadOnlyEndpoint(handle.remoteAddress, endpoint)

  private def removePendingReader(takingOverFrom: ActorRef, withHandle: AkkaProtocolHandle): Unit = {
    if (pendingReadHandoffs.get(takingOverFrom).exists(handle ⇒ handle == withHandle))
      pendingReadHandoffs -= takingOverFrom

  private def createEndpoint(remoteAddress: Address,
                             localAddress: Address,
                             transport: AkkaProtocolTransport,
                             endpointSettings: RemoteSettings,
                             handleOption: Option[AkkaProtocolHandle],
                             writing: Boolean,
                             refuseUid: Option[Int]): ActorRef = {
    assert(transportMapping contains localAddress)
    assert(writing || refuseUid.isEmpty)

    if (writing)
      "reliableEndpointWriter-" + AddressUrlEncoder(remoteAddress) + "-" +
      reliableDeliverySupervisor = None)).withDeploy(Deploy.local),
      "endpointWriter-" + AddressUrlEncoder(remoteAddress) + "-" +

  override def postStop(): Unit = {
    pruneTimerCancellable.foreach { _.cancel() }


