alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (AbstractTransportAdapter.scala)

This example Akka source code file (AbstractTransportAdapter.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

address, akka, collection, concurrent, future, listenerregistered, listenunderlying, promise, remote, string, time, transport, transportadapterprovider, transportadapters, transportoperation

The AbstractTransportAdapter.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.remote.transport

import akka.actor._
import akka.pattern.{ ask, pipe, gracefulStop }
import akka.remote.Remoting.RegisterTransportActor
import akka.remote.transport.Transport._
import akka.remote.RARP
import akka.util.Timeout
import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.{ ExecutionContext, Promise, Future }
import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue }
import akka.remote.transport.AssociationHandle.DisassociateInfo

/**
 * Factory for transport adapters.
 *
 * Implementations are instantiated reflectively by [[TransportAdapters]] from their fully
 * qualified class name, passing an empty constructor-argument list.
 */
trait TransportAdapterProvider {
  /**
   * Create the transport adapter that wraps an underlying transport.
   *
   * @param wrappedTransport the transport the new adapter will wrap
   * @param system the extended actor system made available to the adapter being created
   * @return the adapter, itself exposed as a [[Transport]]
   */
  def create(wrappedTransport: Transport, system: ExtendedActorSystem): Transport
}

/**
 * Extension holding the transport adapter providers configured for an actor system.
 *
 * Each entry of the remote settings' `Adapters` table (adapter name to fully qualified class name)
 * is instantiated when the extension is constructed.
 *
 * @param system the actor system whose remote settings and dynamic access are used
 */
class TransportAdapters(system: ExtendedActorSystem) extends Extension {
  val settings = RARP(system).provider.remoteSettings

  // Built eagerly: a provider class that cannot be instantiated makes construction of this
  // extension fail with an IllegalArgumentException that names the offending class.
  private val adaptersTable: Map[String, TransportAdapterProvider] =
    settings.Adapters.map {
      case (name, fqn) ⇒
        val instantiation =
          system.dynamicAccess.createInstanceFor[TransportAdapterProvider](fqn, immutable.Seq.empty)
        val provider = instantiation.recover {
          case e ⇒ throw new IllegalArgumentException(s"Cannot instantiate transport adapter [${fqn}]", e)
        }.get
        name -> provider
    }

  /**
   * Looks up the provider registered under the given adapter name.
   *
   * @param name the adapter name, as used as a key in the `Adapters` settings table
   * @return the provider instantiated for that name
   * @throws IllegalArgumentException if no provider is registered under `name`
   */
  def getAdapterProvider(name: String): TransportAdapterProvider =
    adaptersTable.getOrElse(
      name,
      throw new IllegalArgumentException(s"There is no registered transport adapter provider with name: [${name}]"))
}

/**
 * Extension id through which the [[TransportAdapters]] extension of an actor system is obtained.
 */
object TransportAdaptersExtension extends ExtensionId[TransportAdapters] with ExtensionIdProvider {

  /** Delegates to the inherited lookup of the extension instance for `system`. */
  override def get(system: ActorSystem): TransportAdapters = super.get(system)

  /** This object is its own extension id. */
  override def lookup = this

  /** Creates the extension instance for the given system. */
  override def createExtension(system: ExtendedActorSystem): TransportAdapters = {
    new TransportAdapters(system)
  }
}

/**
 * Helpers for adding and removing a scheme prefix of the form `<identifier>.` on protocol strings
 * and on the protocol part of addresses.
 */
trait SchemeAugmenter {
  /** The identifier that is prepended, followed by a dot, to augmented schemes. */
  protected def addedSchemeIdentifier: String

  /** Prefixes `originalScheme` with the added identifier and a dot. */
  protected def augmentScheme(originalScheme: String): String =
    s"$addedSchemeIdentifier.$originalScheme"

  /** Returns a copy of `address` whose protocol has been augmented. */
  protected def augmentScheme(address: Address): Address = {
    val augmentedProtocol = augmentScheme(address.protocol)
    address.copy(protocol = augmentedProtocol)
  }

  /**
   * Strips the leading `<identifier>.` from `scheme` when present; any other scheme is returned
   * unchanged.
   */
  protected def removeScheme(scheme: String): String =
    scheme.stripPrefix(s"$addedSchemeIdentifier.")

  /** Returns a copy of `address` whose protocol has had the added prefix removed. */
  protected def removeScheme(address: Address): Address = {
    val strippedProtocol = removeScheme(address.protocol)
    address.copy(protocol = strippedProtocol)
  }
}

/**
 * Base class for a transport that wraps another transport and intercepts its listen and associate
 * operations. Addresses handed upstream carry the augmented scheme; addresses handed to the
 * interception hooks have the added scheme prefix removed.
 *
 * @param wrappedTransport the transport being wrapped
 * @param ec execution context used to compose the futures in this class
 */
abstract class AbstractTransportAdapter(protected val wrappedTransport: Transport)(implicit val ec: ExecutionContext)
  extends Transport with SchemeAugmenter {

  /** Number of bytes subtracted from the wrapped transport's maximum payload size. */
  protected def maximumOverhead: Int

  /**
   * Hook invoked from `listen` once the wrapped transport has reported its listen address.
   *
   * @param listenAddress the address reported by the wrapped transport (scheme not augmented)
   * @param listenerFuture future of the listener that upstream will register
   * @return future of the listener used to complete the wrapped transport's listener promise
   */
  protected def interceptListen(listenAddress: Address,
                                listenerFuture: Future[AssociationEventListener]): Future[AssociationEventListener]

  /**
   * Hook invoked from `associate`.
   *
   * @param remoteAddress the target address with the added scheme prefix removed
   * @param statusPromise promise backing the future that `associate` returned to its caller
   */
  protected def interceptAssociate(remoteAddress: Address, statusPromise: Promise[AssociationHandle]): Unit

  override def schemeIdentifier: String = {
    val wrappedScheme = wrappedTransport.schemeIdentifier
    augmentScheme(wrappedScheme)
  }

  override def isResponsibleFor(address: Address): Boolean =
    wrappedTransport.isResponsibleFor(address)

  override def maximumPayloadBytes: Int = {
    val wrappedCapacity = wrappedTransport.maximumPayloadBytes
    wrappedCapacity - maximumOverhead
  }

  /**
   * Starts listening on the wrapped transport.
   *
   * @return future of the augmented listen address paired with the promise through which upstream
   *         registers its listener
   */
  override def listen: Future[(Address, Promise[AssociationEventListener])] = {
    val upstreamPromise = Promise[AssociationEventListener]()

    for {
      (wrappedAddress, wrappedListenerPromise) ← wrappedTransport.listen
      // The result is only yielded after the wrapped transport's listener promise has been
      // completed, so the initialization done in interceptListen happens before upstream is
      // told that listening is ready.
      _ ← wrappedListenerPromise.tryCompleteWith(interceptListen(wrappedAddress, upstreamPromise.future)).future
    } yield (augmentScheme(wrappedAddress), upstreamPromise)
  }

  /**
   * Hands the association request to `interceptAssociate` together with a fresh promise.
   *
   * @return the future of that promise
   */
  override def associate(remoteAddress: Address): Future[AssociationHandle] = {
    val associationPromise = Promise[AssociationHandle]()
    interceptAssociate(removeScheme(remoteAddress), associationPromise)
    associationPromise.future
  }

  /** Shuts down the wrapped transport. */
  override def shutdown(): Future[Boolean] = wrappedTransport.shutdown()

}

/**
 * Base class for association handles produced by a transport adapter. The exposed local and
 * remote addresses are the original ones with the adapter's scheme prefix added.
 *
 * @param originalLocalAddress local address before scheme augmentation
 * @param originalRemoteAddress remote address before scheme augmentation
 * @param wrappedHandle the handle being wrapped
 * @param addedSchemeIdentifier the identifier used for scheme augmentation
 */
abstract class AbstractTransportAdapterHandle(val originalLocalAddress: Address,
                                              val originalRemoteAddress: Address,
                                              val wrappedHandle: AssociationHandle,
                                              val addedSchemeIdentifier: String) extends AssociationHandle
  with SchemeAugmenter {

  /** Convenience constructor taking both original addresses from the wrapped handle. */
  def this(wrappedHandle: AssociationHandle, addedSchemeIdentifier: String) =
    this(
      originalLocalAddress = wrappedHandle.localAddress,
      originalRemoteAddress = wrappedHandle.remoteAddress,
      wrappedHandle = wrappedHandle,
      addedSchemeIdentifier = addedSchemeIdentifier)

  override val localAddress: Address = augmentScheme(originalLocalAddress)
  override val remoteAddress: Address = augmentScheme(originalRemoteAddress)

}

/**
 * Messages exchanged between an [[ActorTransportAdapter]] and its manager actor, plus the implicit
 * timeout used when the adapter asks for its manager to be registered.
 */
object ActorTransportAdapter {
  /** Common parent of the messages below; they are exempt from serialization verification. */
  sealed trait TransportOperation extends NoSerializationVerificationNeeded

  /** Carries the upstream listener once its future has completed. */
  final case class ListenerRegistered(listener: AssociationEventListener) extends TransportOperation

  /** Request to associate with `remoteAddress`; the outcome is reported through `statusPromise`. */
  final case class AssociateUnderlying(remoteAddress: Address, statusPromise: Promise[AssociationHandle]) extends TransportOperation

  /** Announces the listen address together with the future of the upstream listener. */
  final case class ListenUnderlying(listenAddress: Address,
                                    upstreamListener: Future[AssociationEventListener]) extends TransportOperation

  /** Disassociation request; `info` defaults to `AssociationHandle.Unknown`. */
  final case class DisassociateUnderlying(info: DisassociateInfo = AssociationHandle.Unknown) extends TransportOperation

  // Explicitly typed: a public implicit should not rely on type inference, which keeps implicit
  // resolution predictable and the declared interface stable.
  implicit val AskTimeout: Timeout = Timeout(5.seconds)
}

/**
 * Transport adapter whose interception logic is delegated to a manager actor. The manager is
 * registered during `interceptListen` and afterwards receives the listen and associate requests
 * as messages.
 *
 * @param wrappedTransport the transport being wrapped
 * @param system the actor system supplying the dispatcher and the remote settings
 */
abstract class ActorTransportAdapter(wrappedTransport: Transport, system: ActorSystem)
  extends AbstractTransportAdapter(wrappedTransport)(system.dispatcher) {

  import ActorTransportAdapter._

  /** Name under which the manager actor is registered. */
  protected def managerName: String

  /** Props used to create the manager actor. */
  protected def managerProps: Props

  // Write-once: assigned inside interceptListen and left unset (null) until then.
  @volatile protected var manager: ActorRef = _

  // Asks the actor at "/system/transports" to register the manager and expects its ActorRef back.
  private def registerManager(): Future[ActorRef] = {
    val transports = system.actorSelection("/system/transports")
    val registration = RegisterTransportActor(managerProps, managerName)
    (transports ? registration).mapTo[ActorRef]
  }

  /**
   * Registers the manager, stores it, forwards the listen request to it and returns a listener
   * backed by the manager.
   */
  override def interceptListen(listenAddress: Address,
                               listenerPromise: Future[AssociationEventListener]): Future[AssociationEventListener] =
    for (registeredManager ← registerManager()) yield {
      // Side effect: the volatile var is written here, once, while the protocol stack is being
      // initialized. It is not read before listen has been called.
      manager = registeredManager
      manager ! ListenUnderlying(listenAddress, listenerPromise)
      ActorAssociationEventListener(manager)
    }

  /** Forwards the association request to the manager actor. */
  override def interceptAssociate(remoteAddress: Address, statusPromise: Promise[AssociationHandle]): Unit = {
    manager ! AssociateUnderlying(remoteAddress, statusPromise)
  }

  /**
   * Stops the manager first (bounded by the FlushWait remote setting) and then shuts down the
   * wrapped transport.
   *
   * @return future that is true only when both steps reported true
   */
  override def shutdown(): Future[Boolean] = {
    val flushWait = RARP(system).provider.remoteSettings.FlushWait
    gracefulStop(manager, flushWait).flatMap { stopResult ⇒
      wrappedTransport.shutdown().map { wrappedStopResult ⇒
        stopResult && wrappedStopResult
      }
    }
  }
}

/**
 * Base actor for transport adapter managers.
 *
 * In its initial behavior the actor records the listen address, waits for the upstream listener
 * and buffers every other message. When a ListenerRegistered message arrives, the buffered
 * messages are re-sent to `self` and the actor switches to the subclass-provided `ready` behavior.
 */
abstract class ActorTransportAdapterManager extends Actor
  with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
  import ActorTransportAdapter.{ ListenUnderlying, ListenerRegistered }

  // Messages received before the listener was registered, kept in arrival order.
  private var delayedEvents = immutable.Queue.empty[Any]

  // Both stay unset (null) until the corresponding message has been handled in `receive`.
  protected var associationListener: AssociationEventListener = _
  protected var localAddress: Address = _
  private var uniqueId = 0L

  /** Returns the next value of a per-actor counter; the first call yields 1. */
  protected def nextId(): Long = {
    uniqueId += 1
    uniqueId
  }

  import context.dispatcher

  def receive: Receive = {
    case ListenUnderlying(listenAddress, upstreamListenerFuture) ⇒
      localAddress = listenAddress
      // `upstreamListenerFuture` is already a Future, so it is mapped directly instead of going
      // through a redundant `.future` call that only resolved via the implicit pipe conversion.
      // NOTE(review): if this future fails, the piped failure presumably arrives as an ordinary
      // message and ends up buffered by the catch-all case below -- confirm this is intended.
      upstreamListenerFuture.map { ListenerRegistered(_) } pipeTo self

    case ListenerRegistered(listener) ⇒
      associationListener = listener
      // Replay the buffered messages to ourselves (without a sender) and clear the buffer.
      delayedEvents foreach { self.tell(_, Actor.noSender) }
      delayedEvents = immutable.Queue.empty[Any]
      context.become(ready)

    /* Simple imitation of Stash. It is more lightweight as it does not need any specific dispatchers or additional
     * queue. The difference is that these messages will not survive a restart -- which is not needed here.
     * These messages will be processed in the ready state.
     */
    case otherEvent ⇒ delayedEvents = delayedEvents enqueue otherEvent

  }

  /** Behavior installed once the upstream listener has been registered. */
  protected def ready: Receive
}

Other Akka source code examples

Here is a short list of links related to this Akka AbstractTransportAdapter.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.