alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (Endpoint.scala)

This example Akka source code file (Endpoint.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actor, address, akka, concurrent, endpointexception, none, option, receive, remote, resendstate, send, some, throwable, time, transport, unit

The Endpoint.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.remote

import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy._
import akka.actor.Terminated
import akka.actor._
import akka.dispatch.sysmsg.SystemMessage
import akka.event.{ Logging, LoggingAdapter }
import akka.pattern.pipe
import akka.remote.EndpointManager.{ ResendState, Link, Send }
import akka.remote.EndpointWriter.{ StoppedReading, FlushAndStop }
import akka.remote.WireFormats.SerializedMessage
import akka.remote.transport.AkkaPduCodec.Message
import akka.remote.transport.AssociationHandle.{ DisassociateInfo, ActorHandleEventListener, Disassociated, InboundPayload }
import akka.remote.transport.Transport.InvalidAssociationException
import akka.remote.transport._
import akka.serialization.Serialization
import akka.util.ByteString
import akka.{ OnlyCauseStackTrace, AkkaException }
import java.io.NotSerializableException
import java.util.concurrent.ConcurrentHashMap
import scala.annotation.tailrec
import scala.concurrent.duration.{ Duration, Deadline }
import scala.util.control.NonFatal
import java.util.concurrent.locks.LockSupport
import scala.concurrent.Future
import scala.concurrent.blocking

/**
 * INTERNAL API
 */
private[remote] trait InboundMessageDispatcher {
  def dispatch(recipient: InternalActorRef,
               recipientAddress: Address,
               serializedMessage: SerializedMessage,
               senderOption: Option[ActorRef]): Unit
}

/**
 * INTERNAL API
 */
private[remote] class DefaultMessageDispatcher(private val system: ExtendedActorSystem,
                                               private val provider: RemoteActorRefProvider,
                                               private val log: LoggingAdapter) extends InboundMessageDispatcher {

  private val remoteDaemon = provider.remoteDaemon

  override def dispatch(recipient: InternalActorRef,
                        recipientAddress: Address,
                        serializedMessage: SerializedMessage,
                        senderOption: Option[ActorRef]): Unit = {

    import provider.remoteSettings._

    lazy val payload: AnyRef = MessageSerializer.deserialize(system, serializedMessage)
    def payloadClass: Class[_] = if (payload eq null) null else payload.getClass
    val sender: ActorRef = senderOption.getOrElse(system.deadLetters)
    val originalReceiver = recipient.path

    def msgLog = s"RemoteMessage: [$payload] to [$recipient]<+[$originalReceiver] from [$sender()]"

    recipient match {

      case `remoteDaemon` ⇒
        if (UntrustedMode) log.debug("dropping daemon message in untrusted mode")
        else {
          if (LogReceive) log.debug("received daemon message {}", msgLog)
          remoteDaemon ! payload
        }

      case l @ (_: LocalRef | _: RepointableRef) if l.isLocal ⇒
        if (LogReceive) log.debug("received local message {}", msgLog)
        payload match {
          case sel: ActorSelectionMessage ⇒
            if (UntrustedMode && (!TrustedSelectionPaths.contains(sel.elements.mkString("/", "/", "")) ||
              sel.msg.isInstanceOf[PossiblyHarmful] || l != provider.rootGuardian))
              log.debug("operating in UntrustedMode, dropping inbound actor selection to [{}], " +
                "allow it by adding the path to 'akka.remote.trusted-selection-paths' configuration",
                sel.elements.mkString("/", "/", ""))
            else
              // run the receive logic for ActorSelectionMessage here to make sure it is not stuck on busy user actor
              ActorSelection.deliverSelection(l, sender, sel)
          case msg: PossiblyHarmful if UntrustedMode ⇒
            log.debug("operating in UntrustedMode, dropping inbound PossiblyHarmful message of type [{}]", msg.getClass.getName)
          case msg: SystemMessage ⇒ l.sendSystemMessage(msg)
          case msg                ⇒ l.!(msg)(sender)
        }

      case r @ (_: RemoteRef | _: RepointableRef) if !r.isLocal && !UntrustedMode ⇒
        if (LogReceive) log.debug("received remote-destined message {}", msgLog)
        if (provider.transport.addresses(recipientAddress))
          // if it was originally addressed to us but is in fact remote from our point of view (i.e. remote-deployed)
          r.!(payload)(sender)
        else
          log.error("dropping message [{}] for non-local recipient [{}] arriving at [{}] inbound addresses are [{}]",
            payloadClass, r, recipientAddress, provider.transport.addresses.mkString(", "))

      case r ⇒ log.error("dropping message [{}] for unknown recipient [{}] arriving at [{}] inbound addresses are [{}]",
        payloadClass, r, recipientAddress, provider.transport.addresses.mkString(", "))

    }
  }

}

/**
 * INTERNAL API
 */
@SerialVersionUID(1L)
private[remote] class EndpointException(msg: String, cause: Throwable) extends AkkaException(msg, cause) with OnlyCauseStackTrace {
  def this(msg: String) = this(msg, null)
}

/**
 * INTERNAL API
 */
private[remote] trait AssociationProblem

/**
 * INTERNAL API
 */
@SerialVersionUID(1L)
private[remote] final case class ShutDownAssociation(localAddress: Address, remoteAddress: Address, cause: Throwable)
  extends EndpointException("Shut down address: " + remoteAddress, cause) with AssociationProblem

/**
 * INTERNAL API
 */
@SerialVersionUID(1L)
private[remote] final case class InvalidAssociation(localAddress: Address, remoteAddress: Address, cause: Throwable)
  extends EndpointException("Invalid address: " + remoteAddress, cause) with AssociationProblem

/**
 * INTERNAL API
 */
@SerialVersionUID(1L)
private[remote] final case class HopelessAssociation(localAddress: Address, remoteAddress: Address, uid: Option[Int], cause: Throwable)
  extends EndpointException("Catastrophic association error.") with AssociationProblem

/**
 * INTERNAL API
 */
@SerialVersionUID(1L)
private[remote] class EndpointDisassociatedException(msg: String) extends EndpointException(msg)

/**
 * INTERNAL API
 */
@SerialVersionUID(1L)
private[remote] class EndpointAssociationException(msg: String, cause: Throwable) extends EndpointException(msg, cause)

/**
 * INTERNAL API
 */
@SerialVersionUID(1L)
private[remote] class OversizedPayloadException(msg: String) extends EndpointException(msg)

/**
 * INTERNAL API
 */
private[remote] object ReliableDeliverySupervisor {
  case object Ungate
  case object AttemptSysMsgRedelivery
  final case class GotUid(uid: Int, remoteAddres: Address)

  def props(
    handleOrActive: Option[AkkaProtocolHandle],
    localAddress: Address,
    remoteAddress: Address,
    refuseUid: Option[Int],
    transport: AkkaProtocolTransport,
    settings: RemoteSettings,
    codec: AkkaPduCodec,
    receiveBuffers: ConcurrentHashMap[Link, ResendState]): Props =
    Props(classOf[ReliableDeliverySupervisor], handleOrActive, localAddress, remoteAddress, refuseUid, transport, settings,
      codec, receiveBuffers)
}

/**
 * INTERNAL API
 */
private[remote] class ReliableDeliverySupervisor(
  handleOrActive: Option[AkkaProtocolHandle],
  val localAddress: Address,
  val remoteAddress: Address,
  val refuseUid: Option[Int],
  val transport: AkkaProtocolTransport,
  val settings: RemoteSettings,
  val codec: AkkaPduCodec,
  val receiveBuffers: ConcurrentHashMap[Link, ResendState]) extends Actor with ActorLogging {
  import ReliableDeliverySupervisor._
  import context.dispatcher

  var autoResendTimer: Option[Cancellable] = None

  def scheduleAutoResend(): Unit = if (resendBuffer.nacked.nonEmpty || resendBuffer.nonAcked.nonEmpty) {
    if (autoResendTimer.isEmpty)
      autoResendTimer = Some(context.system.scheduler.scheduleOnce(settings.SysResendTimeout, self, AttemptSysMsgRedelivery))
  }

  def rescheduleAutoResend(): Unit = {
    autoResendTimer.foreach(_.cancel())
    autoResendTimer = None
    scheduleAutoResend()
  }

  override val supervisorStrategy = OneForOneStrategy(loggingEnabled = false) {
    case e @ (_: AssociationProblem) ⇒ Escalate
    case NonFatal(e) ⇒
      log.warning("Association with remote system [{}] has failed, address is now gated for [{}] ms. Reason is: [{}].",
        remoteAddress, settings.RetryGateClosedFor.toMillis, e.getMessage)
      uidConfirmed = false // Need confirmation of UID again
      context.become(gated)
      currentHandle = None
      context.parent ! StoppedReading(self)
      Stop
  }

  var currentHandle: Option[AkkaProtocolHandle] = handleOrActive

  var resendBuffer: AckedSendBuffer[Send] = _
  var lastCumulativeAck: SeqNo = _
  var seqCounter: Long = _
  var pendingAcks = Vector.empty[Ack]

  def reset() {
    resendBuffer = new AckedSendBuffer[Send](settings.SysMsgBufferSize)
    scheduleAutoResend()
    lastCumulativeAck = SeqNo(-1)
    seqCounter = 0L
    pendingAcks = Vector.empty
  }

  reset()

  def nextSeq(): SeqNo = {
    val tmp = seqCounter
    seqCounter += 1
    SeqNo(tmp)
  }

  var writer: ActorRef = createWriter()
  var uid: Option[Int] = handleOrActive map { _.handshakeInfo.uid }
  val bailoutAt: Deadline = Deadline.now + settings.InitialSysMsgDeliveryTimeout
  // Processing of Acks has to be delayed until the UID after a reconnect is discovered. Depending whether the
  // UID matches the expected one, pending Acks can be processed, or must be dropped. It is guaranteed that for
  // any inbound connections (calling createWriter()) the first message from that connection is GotUid() therefore
  // it serves a separator.
  // If we already have an inbound handle then UID is initially confirmed.
  // (This actor is never restarted)
  var uidConfirmed: Boolean = uid.isDefined

  def unstashAcks(): Unit = {
    pendingAcks foreach (self ! _)
    pendingAcks = Vector.empty
  }

  override def postStop(): Unit = {
    // All remaining messages in the buffer has to be delivered to dead letters. It is important to clear the sequence
    // number otherwise deadLetters will ignore it to avoid reporting system messages as dead letters while they are
    // still possibly retransmitted.
    // Such a situation may arise when the EndpointWriter is shut down, and all of its mailbox contents are delivered
    // to dead letters. These messages should be ignored, as they still live in resendBuffer and might be delivered to
    // the remote system later.
    (resendBuffer.nacked ++ resendBuffer.nonAcked) foreach { s ⇒ context.system.deadLetters ! s.copy(seqOpt = None) }
    receiveBuffers.remove(Link(localAddress, remoteAddress))
  }

  override def postRestart(reason: Throwable): Unit = {
    throw new IllegalStateException(
      "BUG: ReliableDeliverySupervisor has been attempted to be restarted. This must not happen.")
  }

  override def receive: Receive = {
    case FlushAndStop ⇒
      // Trying to serve until our last breath
      resendAll()
      writer ! FlushAndStop
      context.become(flushWait)
    case s: Send ⇒
      handleSend(s)
    case ack: Ack ⇒
      if (!uidConfirmed) pendingAcks = pendingAcks :+ ack
      else {
        try resendBuffer = resendBuffer.acknowledge(ack)
        catch {
          case NonFatal(e) ⇒
            throw new InvalidAssociationException(s"Error encountered while processing system message acknowledgement $resendBuffer $ack", e)
        }

        if (lastCumulativeAck < ack.cumulativeAck) {
          lastCumulativeAck = ack.cumulativeAck
          // Cumulative ack is progressing, we might not need to resend non-acked messages yet.
          // If this progression stops, the timer will eventually kick in, since scheduleAutoResend
          // does not cancel existing timers (see the "else" case).
          rescheduleAutoResend()
        } else scheduleAutoResend()

        resendNacked()
      }
    case AttemptSysMsgRedelivery ⇒
      if (uidConfirmed) resendAll()
    case Terminated(_) ⇒
      currentHandle = None
      context.parent ! StoppedReading(self)
      if (resendBuffer.nonAcked.nonEmpty || resendBuffer.nacked.nonEmpty)
        context.system.scheduler.scheduleOnce(settings.SysResendTimeout, self, AttemptSysMsgRedelivery)
      context.become(idle)
    case g @ GotUid(receivedUid, _) ⇒
      context.parent ! g
      // New system that has the same address as the old - need to start from fresh state
      uidConfirmed = true
      if (uid.exists(_ != receivedUid)) reset()
      else unstashAcks()
      uid = Some(receivedUid)
      resendAll()

    case s: EndpointWriter.StopReading ⇒
      writer forward s
  }

  def gated: Receive = {
    case Terminated(_) ⇒
      context.system.scheduler.scheduleOnce(settings.RetryGateClosedFor, self, Ungate)
    case Ungate ⇒
      if (resendBuffer.nonAcked.nonEmpty || resendBuffer.nacked.nonEmpty) {
        // If we talk to a system we have not talked to before (or has given up talking to in the past) stop
        // system delivery attempts after the specified time. This act will drop the pending system messages and gate the
        // remote address at the EndpointManager level stopping this actor. In case the remote system becomes reachable
        // again it will be immediately quarantined due to out-of-sync system message buffer and becomes quarantined.
        // In other words, this action is safe.
        if (!uidConfirmed && bailoutAt.isOverdue())
          throw new InvalidAssociation(localAddress, remoteAddress,
            new java.util.concurrent.TimeoutException("Delivery of system messages timed out and they were dropped."))
        writer = createWriter()
        // Resending will be triggered by the incoming GotUid message after the connection finished
        context.become(receive)
      } else context.become(idle)
    case s @ Send(msg: SystemMessage, _, _, _) ⇒ tryBuffer(s.copy(seqOpt = Some(nextSeq())))
    case s: Send                               ⇒ context.system.deadLetters ! s
    case EndpointWriter.FlushAndStop           ⇒ context.stop(self)
    case EndpointWriter.StopReading(w, replyTo) ⇒
      replyTo ! EndpointWriter.StoppedReading(w)
      sender() ! EndpointWriter.StoppedReading(w)
  }

  def idle: Receive = {
    case s: Send ⇒
      writer = createWriter()
      // Resending will be triggered by the incoming GotUid message after the connection finished
      handleSend(s)
      context.become(receive)
    case AttemptSysMsgRedelivery ⇒
      writer = createWriter()
      // Resending will be triggered by the incoming GotUid message after the connection finished
      context.become(receive)
    case EndpointWriter.FlushAndStop ⇒ context.stop(self)
    case EndpointWriter.StopReading(w, replyTo) ⇒
      replyTo ! EndpointWriter.StoppedReading(w)
  }

  def flushWait: Receive = {
    case Terminated(_) ⇒
      // Clear buffer to prevent sending system messages to dead letters -- at this point we are shutting down
      // and don't really know if they were properly delivered or not.
      resendBuffer = new AckedSendBuffer[Send](0)
      context.stop(self)
    case _ ⇒ // Ignore
  }

  private def handleSend(send: Send): Unit =
    if (send.message.isInstanceOf[SystemMessage]) {
      val sequencedSend = send.copy(seqOpt = Some(nextSeq()))
      tryBuffer(sequencedSend)
      // If we have not confirmed the remote UID we cannot transfer the system message at this point just buffer it.
      // GotUid will kick resendAll() causing the messages to be properly written
      if (uidConfirmed) writer ! sequencedSend
    } else writer ! send

  private def resendNacked(): Unit = resendBuffer.nacked foreach { writer ! _ }

  private def resendAll(): Unit = {
    resendNacked()
    resendBuffer.nonAcked foreach { writer ! _ }
    rescheduleAutoResend()
  }

  private def tryBuffer(s: Send): Unit =
    try {
      resendBuffer = resendBuffer buffer s
    } catch {
      case NonFatal(e) ⇒ throw new HopelessAssociation(localAddress, remoteAddress, uid, e)
    }

  private def createWriter(): ActorRef = {
    context.watch(context.actorOf(RARP(context.system).configureDispatcher(EndpointWriter.props(
      handleOrActive = currentHandle,
      localAddress = localAddress,
      remoteAddress = remoteAddress,
      refuseUid,
      transport = transport,
      settings = settings,
      AkkaPduProtobufCodec,
      receiveBuffers = receiveBuffers,
      reliableDeliverySupervisor = Some(self))).withDeploy(Deploy.local), "endpointWriter"))
  }
}

/**
 * INTERNAL API
 */
private[remote] abstract class EndpointActor(
  val localAddress: Address,
  val remoteAddress: Address,
  val transport: Transport,
  val settings: RemoteSettings,
  val codec: AkkaPduCodec) extends Actor with ActorLogging {

  def inbound: Boolean

  val eventPublisher = new EventPublisher(context.system, log, settings.RemoteLifecycleEventsLogLevel)

  def publishError(reason: Throwable, logLevel: Logging.LogLevel): Unit =
    tryPublish(AssociationErrorEvent(reason, localAddress, remoteAddress, inbound, logLevel))

  def publishDisassociated(): Unit = tryPublish(DisassociatedEvent(localAddress, remoteAddress, inbound))

  private def tryPublish(ev: AssociationEvent): Unit = try
    eventPublisher.notifyListeners(ev)
  catch { case NonFatal(e) ⇒ log.error(e, "Unable to publish error event to EventStream.") }
}

/**
 * INTERNAL API
 */
private[remote] object EndpointWriter {

  def props(
    handleOrActive: Option[AkkaProtocolHandle],
    localAddress: Address,
    remoteAddress: Address,
    refuseUid: Option[Int],
    transport: AkkaProtocolTransport,
    settings: RemoteSettings,
    codec: AkkaPduCodec,
    receiveBuffers: ConcurrentHashMap[Link, ResendState],
    reliableDeliverySupervisor: Option[ActorRef]): Props =
    Props(classOf[EndpointWriter], handleOrActive, localAddress, remoteAddress, refuseUid, transport, settings, codec,
      receiveBuffers, reliableDeliverySupervisor)

  /**
   * This message signals that the current association maintained by the local EndpointWriter and EndpointReader is
   * to be overridden by a new inbound association. This is needed to avoid parallel inbound associations from the
   * same remote endpoint: when a parallel inbound association is detected, the old one is removed and the new one is
   * used instead.
   * @param handle Handle of the new inbound association.
   */
  final case class TakeOver(handle: AkkaProtocolHandle, replyTo: ActorRef) extends NoSerializationVerificationNeeded
  final case class TookOver(writer: ActorRef, handle: AkkaProtocolHandle) extends NoSerializationVerificationNeeded
  case object BackoffTimer
  case object FlushAndStop
  private case object FlushAndStopTimeout
  case object AckIdleCheckTimer
  final case class StopReading(writer: ActorRef, replyTo: ActorRef)
  final case class StoppedReading(writer: ActorRef)

  final case class Handle(handle: AkkaProtocolHandle) extends NoSerializationVerificationNeeded

  final case class OutboundAck(ack: Ack)

  // These settings are not configurable because wrong configuration will break the auto-tuning 
  private val SendBufferBatchSize = 5
  private val MinAdaptiveBackoffNanos = 300000L // 0.3 ms
  private val MaxAdaptiveBackoffNanos = 2000000L // 2 ms
  private val LogBufferSizeInterval = 5000000000L // 5 s, in nanoseconds
  private val MaxWriteCount = 50

}

/**
 * INTERNAL API
 */
private[remote] class EndpointWriter(
  handleOrActive: Option[AkkaProtocolHandle],
  localAddress: Address,
  remoteAddress: Address,
  refuseUid: Option[Int],
  transport: AkkaProtocolTransport,
  settings: RemoteSettings,
  codec: AkkaPduCodec,
  val receiveBuffers: ConcurrentHashMap[Link, ResendState],
  val reliableDeliverySupervisor: Option[ActorRef])
  extends EndpointActor(localAddress, remoteAddress, transport, settings, codec) {

  import EndpointWriter._
  import context.dispatcher

  val extendedSystem: ExtendedActorSystem = context.system.asInstanceOf[ExtendedActorSystem]
  val remoteMetrics = RemoteMetricsExtension(extendedSystem)
  val backoffDispatcher = context.system.dispatchers.lookup("akka.remote.backoff-remote-dispatcher")

  var reader: Option[ActorRef] = None
  var handle: Option[AkkaProtocolHandle] = handleOrActive
  val readerId = Iterator from 0

  def newAckDeadline: Deadline = Deadline.now + settings.SysMsgAckTimeout
  var ackDeadline: Deadline = newAckDeadline

  var lastAck: Option[Ack] = None

  override val supervisorStrategy = OneForOneStrategy(loggingEnabled = false) {
    case NonFatal(e) ⇒ publishAndThrow(e, Logging.ErrorLevel)
  }

  val provider = RARP(extendedSystem).provider
  val msgDispatch = new DefaultMessageDispatcher(extendedSystem, provider, log)

  val inbound = handle.isDefined
  var stopReason: DisassociateInfo = AssociationHandle.Unknown

  // Use an internal buffer instead of Stash for efficiency
  // stash/unstashAll is slow when many messages are stashed
  // IMPORTANT: sender is not stored, so sender() and forward must not be used in EndpointWriter
  val buffer = new java.util.LinkedList[AnyRef]
  val prioBuffer = new java.util.LinkedList[Send]
  var largeBufferLogTimestamp = System.nanoTime()

  private def publishAndThrow(reason: Throwable, logLevel: Logging.LogLevel): Nothing = {
    reason match {
      case _: EndpointDisassociatedException ⇒ publishDisassociated()
      case _                                 ⇒ publishError(reason, logLevel)
    }
    throw reason
  }

  val ackIdleTimer = {
    val interval = settings.SysMsgAckTimeout / 2
    context.system.scheduler.schedule(interval, interval, self, AckIdleCheckTimer)
  }

  override def preStart(): Unit = {
    handle match {
      case Some(h) ⇒
        reader = startReadEndpoint(h)
      case None ⇒
        transport.associate(remoteAddress, refuseUid).map(Handle(_)) pipeTo self
    }
  }

  override def postRestart(reason: Throwable): Unit =
    throw new IllegalStateException("EndpointWriter must not be restarted")

  override def postStop(): Unit = {
    ackIdleTimer.cancel()
    while (!prioBuffer.isEmpty)
      extendedSystem.deadLetters ! prioBuffer.poll
    while (!buffer.isEmpty)
      extendedSystem.deadLetters ! buffer.poll
    handle foreach { _.disassociate(stopReason) }
    eventPublisher.notifyListeners(DisassociatedEvent(localAddress, remoteAddress, inbound))
  }

  def receive = if (handle.isEmpty) initializing else writing

  def initializing: Receive = {
    case s: Send ⇒
      enqueueInBuffer(s)
    case Status.Failure(e: InvalidAssociationException) ⇒
      publishAndThrow(new InvalidAssociation(localAddress, remoteAddress, e), Logging.WarningLevel)
    case Status.Failure(e) ⇒
      publishAndThrow(new EndpointAssociationException(s"Association failed with [$remoteAddress]", e), Logging.DebugLevel)
    case Handle(inboundHandle) ⇒
      // Assert handle == None?
      context.parent ! ReliableDeliverySupervisor.GotUid(inboundHandle.handshakeInfo.uid, remoteAddress)
      handle = Some(inboundHandle)
      reader = startReadEndpoint(inboundHandle)
      eventPublisher.notifyListeners(AssociatedEvent(localAddress, remoteAddress, inbound))
      becomeWritingOrSendBufferedMessages()
  }

  def enqueueInBuffer(msg: AnyRef): Unit = msg match {
    case s @ Send(_: PriorityMessage, _, _, _) ⇒ prioBuffer offer s
    case s @ Send(ActorSelectionMessage(_: PriorityMessage, _, _), _, _, _) ⇒ prioBuffer offer s
    case _ ⇒ buffer offer msg
  }

  val buffering: Receive = {
    case s: Send      ⇒ enqueueInBuffer(s)
    case BackoffTimer ⇒ sendBufferedMessages()
    case FlushAndStop ⇒
      // Flushing is postponed after the pending writes
      buffer offer FlushAndStop
      context.system.scheduler.scheduleOnce(settings.FlushWait, self, FlushAndStopTimeout)
    case FlushAndStopTimeout ⇒
      // enough
      flushAndStop()
  }

  def becomeWritingOrSendBufferedMessages(): Unit =
    if (buffer.isEmpty)
      context.become(writing)
    else {
      context.become(buffering)
      sendBufferedMessages()
    }

  var writeCount = 0
  var maxWriteCount = MaxWriteCount
  var adaptiveBackoffNanos = 1000000L // 1 ms
  var fullBackoff = false

  // FIXME remove these counters when tuning/testing is completed
  var fullBackoffCount = 1
  var smallBackoffCount = 0
  var noBackoffCount = 0

  def adjustAdaptiveBackup(): Unit = {
    maxWriteCount = math.max(writeCount, maxWriteCount)
    if (writeCount <= SendBufferBatchSize) {
      fullBackoff = true
      adaptiveBackoffNanos = math.min((adaptiveBackoffNanos * 1.2).toLong, MaxAdaptiveBackoffNanos)
    } else if (writeCount >= maxWriteCount * 0.6)
      adaptiveBackoffNanos = math.max((adaptiveBackoffNanos * 0.9).toLong, MinAdaptiveBackoffNanos)
    else if (writeCount <= maxWriteCount * 0.2)
      adaptiveBackoffNanos = math.min((adaptiveBackoffNanos * 1.1).toLong, MaxAdaptiveBackoffNanos)

    writeCount = 0
  }

  def sendBufferedMessages(): Unit = {

    def delegate(msg: Any): Boolean = msg match {
      case s: Send ⇒
        writeSend(s)
      case FlushAndStop ⇒
        flushAndStop()
        false
      case s @ StopReading(_, replyTo) ⇒
        reader.foreach(_.tell(s, replyTo))
        true
    }

    @tailrec def writeLoop(count: Int): Boolean =
      if (count > 0 && !buffer.isEmpty)
        if (delegate(buffer.peek)) {
          buffer.removeFirst()
          writeCount += 1
          writeLoop(count - 1)
        } else false
      else true

    @tailrec def writePrioLoop(): Boolean =
      if (prioBuffer.isEmpty) true
      else writeSend(prioBuffer.peek) && { prioBuffer.removeFirst(); writePrioLoop() }

    val size = buffer.size

    val ok = writePrioLoop() && writeLoop(SendBufferBatchSize)
    if (buffer.isEmpty && prioBuffer.isEmpty) {
      // FIXME remove this when testing/tuning is completed
      if (log.isDebugEnabled)
        log.debug(s"Drained buffer with maxWriteCount: $maxWriteCount, fullBackoffCount: $fullBackoffCount" +
          s", smallBackoffCount: $smallBackoffCount, noBackoffCount: $noBackoffCount " +
          s", adaptiveBackoff: ${adaptiveBackoffNanos / 1000}")
      fullBackoffCount = 1
      smallBackoffCount = 0
      noBackoffCount = 0

      writeCount = 0
      maxWriteCount = MaxWriteCount
      context.become(writing)
    } else if (ok) {
      noBackoffCount += 1
      self ! BackoffTimer
    } else {

      if (size > settings.LogBufferSizeExceeding) {
        val now = System.nanoTime()
        if (now - largeBufferLogTimestamp >= LogBufferSizeInterval) {
          log.warning("[{}] buffered messages in EndpointWriter for [{}]. " +
            "You should probably implement flow control to avoid flooding the remote connection.",
            size, remoteAddress)
          largeBufferLogTimestamp = now
        }
      }

      adjustAdaptiveBackup()
      scheduleBackoffTimer()
    }

  }

  def scheduleBackoffTimer(): Unit = {
    if (fullBackoff) {
      fullBackoffCount += 1
      fullBackoff = false
      context.system.scheduler.scheduleOnce(settings.BackoffPeriod, self, BackoffTimer)
    } else {
      smallBackoffCount += 1
      val s = self
      val backoffDeadlinelineNanoTime = System.nanoTime + adaptiveBackoffNanos
      Future {
        @tailrec def backoff(): Unit = {
          val backoffNanos = backoffDeadlinelineNanoTime - System.nanoTime
          if (backoffNanos > 0) {
            LockSupport.parkNanos(backoffNanos)
            // parkNanos allows for spurious wakeup, check again
            backoff()
          }
        }
        backoff()
        s.tell(BackoffTimer, ActorRef.noSender)
      }(backoffDispatcher)
    }
  }

  val writing: Receive = {
    case s: Send ⇒
      if (!writeSend(s)) {
        if (s.seqOpt.isEmpty) enqueueInBuffer(s)
        scheduleBackoffTimer()
        context.become(buffering)
      }

    // We are in Writing state, so buffer is empty, safe to stop here
    case FlushAndStop ⇒
      flushAndStop()

    case AckIdleCheckTimer if ackDeadline.isOverdue() ⇒
      trySendPureAck()
  }

  def writeSend(s: Send): Boolean = try {
    handle match {
      case Some(h) ⇒
        if (provider.remoteSettings.LogSend) {
          def msgLog = s"RemoteMessage: [${s.message}] to [${s.recipient}]<+[${s.recipient.path}] from [${s.senderOption.getOrElse(extendedSystem.deadLetters)}]"
          log.debug("sending message {}", msgLog)
        }

        val pdu = codec.constructMessage(
          s.recipient.localAddressToUse,
          s.recipient,
          serializeMessage(s.message),
          s.senderOption,
          seqOption = s.seqOpt,
          ackOption = lastAck)

        val pduSize = pdu.size
        remoteMetrics.logPayloadBytes(s.message, pduSize)

        if (pduSize > transport.maximumPayloadBytes) {
          val reason = new OversizedPayloadException(s"Discarding oversized payload sent to ${s.recipient}: max allowed size ${transport.maximumPayloadBytes} bytes, actual size of encoded ${s.message.getClass} was ${pdu.size} bytes.")
          log.error(reason, "Transient association error (association remains live)")
          true
        } else {
          val ok = h.write(pdu)
          if (ok) {
            ackDeadline = newAckDeadline
            lastAck = None
          }
          ok
        }

      case None ⇒
        throw new EndpointException("Internal error: Endpoint is in state Writing, but no association handle is present.")
    }
  } catch {
    case e: NotSerializableException ⇒
      log.error(e, "Transient association error (association remains live)")
      true
    case e: EndpointException ⇒
      publishAndThrow(e, Logging.ErrorLevel)
    case NonFatal(e) ⇒
      publishAndThrow(new EndpointException("Failed to write message to the transport", e), Logging.ErrorLevel)
  }

  def handoff: Receive = {
    case Terminated(_) ⇒
      reader = startReadEndpoint(handle.get)
      becomeWritingOrSendBufferedMessages()

    case s: Send ⇒
      enqueueInBuffer(s)
  }

  override def unhandled(message: Any): Unit = message match {
    case Terminated(r) if r == reader.orNull ⇒
      publishAndThrow(new EndpointDisassociatedException("Disassociated"), Logging.DebugLevel)
    case s @ StopReading(_, replyTo) ⇒
      reader match {
        case Some(r) ⇒
          r.tell(s, replyTo)
        case None ⇒
          // initalizing, buffer and take care of it later when buffer is sent
          enqueueInBuffer(s)
      }
    case TakeOver(newHandle, replyTo) ⇒
      // Shutdown old reader
      handle foreach { _.disassociate() }
      handle = Some(newHandle)
      replyTo ! TookOver(self, newHandle)
      context.become(handoff)
    case FlushAndStop ⇒
      stopReason = AssociationHandle.Shutdown
      context.stop(self)
    case OutboundAck(ack) ⇒
      lastAck = Some(ack)
    case AckIdleCheckTimer   ⇒ // Ignore
    case FlushAndStopTimeout ⇒ // ignore
    case BackoffTimer        ⇒ // ignore
    case other               ⇒ super.unhandled(other)
  }

  def flushAndStop(): Unit = {
    // Try to send a last Ack message
    trySendPureAck()
    stopReason = AssociationHandle.Shutdown
    context.stop(self)
  }

  private def trySendPureAck(): Unit =
    for (h ← handle; ack ← lastAck)
      if (h.write(codec.constructPureAck(ack))) {
        ackDeadline = newAckDeadline
        lastAck = None
      }

  private def startReadEndpoint(handle: AkkaProtocolHandle): Some[ActorRef] = {
    val newReader =
      context.watch(context.actorOf(
        RARP(context.system).configureDispatcher(EndpointReader.props(localAddress, remoteAddress, transport, settings, codec,
          msgDispatch, inbound, handle.handshakeInfo.uid, reliableDeliverySupervisor, receiveBuffers)).withDeploy(Deploy.local),
        "endpointReader-" + AddressUrlEncoder(remoteAddress) + "-" + readerId.next()))
    handle.readHandlerPromise.success(ActorHandleEventListener(newReader))
    Some(newReader)
  }

  private def serializeMessage(msg: Any): SerializedMessage = handle match {
    case Some(h) ⇒
      Serialization.currentTransportInformation.withValue(Serialization.Information(h.localAddress, extendedSystem)) {
        (MessageSerializer.serialize(extendedSystem, msg.asInstanceOf[AnyRef]))
      }
    case None ⇒
      throw new EndpointException("Internal error: No handle was present during serialization of outbound message.")
  }

}

/**
 * INTERNAL API
 */
private[remote] object EndpointReader {

  def props(
    localAddress: Address,
    remoteAddress: Address,
    transport: Transport,
    settings: RemoteSettings,
    codec: AkkaPduCodec,
    msgDispatch: InboundMessageDispatcher,
    inbound: Boolean,
    uid: Int,
    reliableDeliverySupervisor: Option[ActorRef],
    receiveBuffers: ConcurrentHashMap[Link, ResendState]): Props =
    Props(classOf[EndpointReader], localAddress, remoteAddress, transport, settings, codec, msgDispatch, inbound,
      uid, reliableDeliverySupervisor, receiveBuffers)

}

/**
 * INTERNAL API
 */
private[remote] class EndpointReader(
  localAddress: Address,
  remoteAddress: Address,
  transport: Transport,
  settings: RemoteSettings,
  codec: AkkaPduCodec,
  msgDispatch: InboundMessageDispatcher,
  val inbound: Boolean,
  val uid: Int,
  val reliableDeliverySupervisor: Option[ActorRef],
  val receiveBuffers: ConcurrentHashMap[Link, ResendState]) extends EndpointActor(localAddress, remoteAddress, transport, settings, codec) {

  import EndpointWriter.{ OutboundAck, StopReading, StoppedReading }

  val provider = RARP(context.system).provider
  var ackedReceiveBuffer = new AckedReceiveBuffer[Message]

  override def preStart(): Unit = {
    receiveBuffers.get(Link(localAddress, remoteAddress)) match {
      case null ⇒
      case ResendState(`uid`, buffer) ⇒
        ackedReceiveBuffer = buffer
        deliverAndAck()
      case _ ⇒
    }
  }

  override def postStop(): Unit = saveState()

  def saveState(): Unit = {
    def merge(currentState: ResendState, oldState: ResendState): ResendState =
      if (currentState.uid == oldState.uid) ResendState(uid, oldState.buffer.mergeFrom(currentState.buffer))
      else currentState

    @tailrec
    def updateSavedState(key: Link, expectedState: ResendState): Unit = {
      if (expectedState eq null) {
        if (receiveBuffers.putIfAbsent(key, ResendState(uid, ackedReceiveBuffer)) ne null)
          updateSavedState(key, receiveBuffers.get(key))
      } else if (!receiveBuffers.replace(key, expectedState, merge(ResendState(uid, ackedReceiveBuffer), expectedState)))
        updateSavedState(key, receiveBuffers.get(key))
    }

    val key = Link(localAddress, remoteAddress)
    updateSavedState(key, receiveBuffers.get(key))
  }

  override def receive: Receive = {
    case Disassociated(info) ⇒ handleDisassociated(info)

    case InboundPayload(p) if p.size <= transport.maximumPayloadBytes ⇒
      val (ackOption, msgOption) = tryDecodeMessageAndAck(p)

      for (ack ← ackOption; reliableDelivery ← reliableDeliverySupervisor) reliableDelivery ! ack

      msgOption match {
        case Some(msg) ⇒
          if (msg.reliableDeliveryEnabled) {
            ackedReceiveBuffer = ackedReceiveBuffer.receive(msg)
            deliverAndAck()
          } else msgDispatch.dispatch(msg.recipient, msg.recipientAddress, msg.serializedMessage, msg.senderOption)

        case None ⇒
      }

    case InboundPayload(oversized) ⇒
      log.error(new OversizedPayloadException(s"Discarding oversized payload received: " +
        s"max allowed size [${transport.maximumPayloadBytes}] bytes, actual size [${oversized.size}] bytes."),
        "Transient error while reading from association (association remains live)")

    case StopReading(writer, replyTo) ⇒
      saveState()
      context.become(notReading)
      replyTo ! StoppedReading(writer)

  }

  def notReading: Receive = {
    case Disassociated(info) ⇒ handleDisassociated(info)

    case StopReading(writer, replyTo) ⇒
      replyTo ! StoppedReading(writer)

    case InboundPayload(p) ⇒
      val (ackOption, _) = tryDecodeMessageAndAck(p)
      for (ack ← ackOption; reliableDelivery ← reliableDeliverySupervisor) reliableDelivery ! ack

    case _ ⇒
  }

  private def handleDisassociated(info: DisassociateInfo): Unit = info match {
    case AssociationHandle.Unknown ⇒
      context.stop(self)
    case AssociationHandle.Shutdown ⇒
      throw ShutDownAssociation(
        localAddress,
        remoteAddress,
        InvalidAssociationException("The remote system terminated the association because it is shutting down."))
    case AssociationHandle.Quarantined ⇒
      throw InvalidAssociation(
        localAddress,
        remoteAddress,
        InvalidAssociationException("The remote system has quarantined this system. No further associations " +
          "to the remote system are possible until this system is restarted."))
  }

  private def deliverAndAck(): Unit = {
    val (updatedBuffer, deliver, ack) = ackedReceiveBuffer.extractDeliverable
    ackedReceiveBuffer = updatedBuffer

    // Notify writer that some messages can be acked
    context.parent ! OutboundAck(ack)
    deliver foreach { m ⇒
      msgDispatch.dispatch(m.recipient, m.recipientAddress, m.serializedMessage, m.senderOption)
    }
  }

  private def tryDecodeMessageAndAck(pdu: ByteString): (Option[Ack], Option[Message]) = try {
    codec.decodeMessage(pdu, provider, localAddress)
  } catch {
    case NonFatal(e) ⇒ throw new EndpointException("Error while decoding incoming Akka PDU", e)
  }
}

Other Akka source code examples

Here is a short list of links related to this Akka Endpoint.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.