alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (Tcp.scala)

This example Akka source code file (Tcp.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actorref, akka, boolean, collection, command, concurrent, connectionclosed, duration, event, inetsocketaddress, int, io, noack, simplewritecommand, time, writecommand

The Tcp.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */

package akka.io

import java.net.InetSocketAddress
import java.net.Socket
import akka.io.Inet._
import com.typesafe.config.Config
import scala.concurrent.duration._
import scala.collection.immutable
import scala.collection.JavaConverters._
import akka.util.ByteString
import akka.util.Helpers.Requiring
import akka.actor._
import java.lang.{ Iterable ⇒ JIterable }

/**
 * TCP Extension for Akka’s IO layer.
 *
 * For a full description of the design and philosophy behind this IO
 * implementation please refer to <a href="http://doc.akka.io/">the Akka online documentation</a>.
 *
 * In order to open an outbound connection send a [[Tcp.Connect]] message
 * to the [[TcpExt#manager]].
 *
 * In order to start listening for inbound connetions send a [[Tcp.Bind]]
 * message to the [[TcpExt#manager]].
 *
 * The Java API for generating TCP commands is available at [[TcpMessage]].
 */
object Tcp extends ExtensionId[TcpExt] with ExtensionIdProvider {

  override def lookup = Tcp

  override def createExtension(system: ExtendedActorSystem): TcpExt = new TcpExt(system)

  /**
   * Java API: retrieve the Tcp extension for the given system.
   */
  override def get(system: ActorSystem): TcpExt = super.get(system)

  /**
   * Scala API: this object contains all applicable socket options for TCP.
   *
   * For the Java API see [[TcpSO]].
   */
  object SO extends Inet.SoForwarders {

    // general socket options

    /**
     * [[akka.io.Inet.SocketOption]] to enable or disable SO_KEEPALIVE
     *
     * For more information see [[java.net.Socket.setKeepAlive]]
     */
    final case class KeepAlive(on: Boolean) extends SocketOption {
      override def afterConnect(s: Socket): Unit = s.setKeepAlive(on)
    }

    /**
     * [[akka.io.Inet.SocketOption]] to enable or disable OOBINLINE (receipt
     * of TCP urgent data) By default, this option is disabled and TCP urgent
     * data is silently discarded.
     *
     * For more information see [[java.net.Socket.setOOBInline]]
     */
    final case class OOBInline(on: Boolean) extends SocketOption {
      override def afterConnect(s: Socket): Unit = s.setOOBInline(on)
    }

    // SO_LINGER is handled by the Close code

    /**
     * [[akka.io.Inet.SocketOption]] to enable or disable TCP_NODELAY
     * (disable or enable Nagle's algorithm)
     *
     * Please note, that TCP_NODELAY is enabled by default.
     *
     * For more information see [[java.net.Socket.setTcpNoDelay]]
     */
    final case class TcpNoDelay(on: Boolean) extends SocketOption {
      override def afterConnect(s: Socket): Unit = s.setTcpNoDelay(on)
    }

  }

  /**
   * The common interface for [[Command]] and [[Event]].
   */
  sealed trait Message extends NoSerializationVerificationNeeded

  /// COMMANDS

  /**
   * This is the common trait for all commands understood by TCP actors.
   */
  trait Command extends Message with SelectionHandler.HasFailureMessage {
    def failureMessage = CommandFailed(this)
  }

  /**
   * The Connect message is sent to the TCP manager actor, which is obtained via
   * [[TcpExt#manager]]. Either the manager replies with a [[CommandFailed]]
   * or the actor handling the new connection replies with a [[Connected]]
   * message.
   *
   * @param remoteAddress is the address to connect to
   * @param localAddress optionally specifies a specific address to bind to
   * @param options Please refer to the [[SO]] object for a list of all supported options.
   */
  final case class Connect(remoteAddress: InetSocketAddress,
                           localAddress: Option[InetSocketAddress] = None,
                           options: immutable.Traversable[SocketOption] = Nil,
                           timeout: Option[FiniteDuration] = None,
                           pullMode: Boolean = false) extends Command

  /**
   * The Bind message is send to the TCP manager actor, which is obtained via
   * [[TcpExt#manager]] in order to bind to a listening socket. The manager
   * replies either with a [[CommandFailed]] or the actor handling the listen
   * socket replies with a [[Bound]] message. If the local port is set to 0 in
   * the Bind message, then the [[Bound]] message should be inspected to find
   * the actual port which was bound to.
   *
   * @param handler The actor which will receive all incoming connection requests
   *                in the form of [[Connected]] messages.
   *
   * @param localAddress The socket address to bind to; use port zero for
   *                automatic assignment (i.e. an ephemeral port, see [[Bound]])
   *
   * @param backlog This specifies the number of unaccepted connections the O/S
   *                kernel will hold for this port before refusing connections.
   *
   * @param options Please refer to the [[SO]] object for a list of all supported options.
   */
  final case class Bind(handler: ActorRef,
                        localAddress: InetSocketAddress,
                        backlog: Int = 100,
                        options: immutable.Traversable[SocketOption] = Nil,
                        pullMode: Boolean = false) extends Command

  /**
   * This message must be sent to a TCP connection actor after receiving the
   * [[Connected]] message. The connection will not read any data from the
   * socket until this message is received, because this message defines the
   * actor which will receive all inbound data.
   *
   * @param handler The actor which will receive all incoming data and which
   *                will be informed when the connection is closed.
   *
   * @param keepOpenOnPeerClosed If this is set to true then the connection
   *                is not automatically closed when the peer closes its half,
   *                requiring an explicit [[Closed]] from our side when finished.
   *
   * @param useResumeWriting If this is set to true then the connection actor
   *                will refuse all further writes after issuing a [[CommandFailed]]
   *                notification until [[ResumeWriting]] is received. This can
   *                be used to implement NACK-based write backpressure.
   */
  final case class Register(handler: ActorRef, keepOpenOnPeerClosed: Boolean = false, useResumeWriting: Boolean = true) extends Command

  /**
   * In order to close down a listening socket, send this message to that socket’s
   * actor (that is the actor which previously had sent the [[Bound]] message). The
   * listener socket actor will reply with a [[Unbound]] message.
   */
  case object Unbind extends Command

  /**
   * Common interface for all commands which aim to close down an open connection.
   */
  sealed trait CloseCommand extends Command {
    /**
     * The corresponding event which is sent as an acknowledgment once the
     * close operation is finished.
     */
    def event: ConnectionClosed
  }

  /**
   * A normal close operation will first flush pending writes and then close the
   * socket. The sender of this command and the registered handler for incoming
   * data will both be notified once the socket is closed using a [[Closed]]
   * message.
   */
  case object Close extends CloseCommand {
    /**
     * The corresponding event which is sent as an acknowledgment once the
     * close operation is finished.
     */
    override def event = Closed
  }

  /**
   * A confirmed close operation will flush pending writes and half-close the
   * connection, waiting for the peer to close the other half. The sender of this
   * command and the registered handler for incoming data will both be notified
   * once the socket is closed using a [[ConfirmedClosed]] message.
   */
  case object ConfirmedClose extends CloseCommand {
    /**
     * The corresponding event which is sent as an acknowledgment once the
     * close operation is finished.
     */
    override def event = ConfirmedClosed
  }

  /**
   * An abort operation will not flush pending writes and will issue a TCP ABORT
   * command to the O/S kernel which should result in a TCP_RST packet being sent
   * to the peer. The sender of this command and the registered handler for
   * incoming data will both be notified once the socket is closed using a
   * [[Aborted]] message.
   */
  case object Abort extends CloseCommand {
    /**
     * The corresponding event which is sent as an acknowledgment once the
     * close operation is finished.
     */
    override def event = Aborted
  }

  /**
   * Each [[WriteCommand]] can optionally request a positive acknowledgment to be sent
   * to the commanding actor. If such notification is not desired the [[WriteCommand#ack]]
   * must be set to an instance of this class. The token contained within can be used
   * to recognize which write failed when receiving a [[CommandFailed]] message.
   */
  case class NoAck(token: Any) extends Event

  /**
   * Default [[NoAck]] instance which is used when no acknowledgment information is
   * explicitly provided. Its “token” is `null`.
   */
  object NoAck extends NoAck(null)

  /**
   * Common interface for all write commands, currently [[Write]], [[WriteFile]] and [[CompoundWrite]].
   */
  sealed abstract class WriteCommand extends Command {
    /**
     * Prepends this command with another `Write` or `WriteFile` to form
     * a `CompoundWrite`.
     */
    def +:(other: SimpleWriteCommand): CompoundWrite = CompoundWrite(other, this)

    /**
     * Prepends this command with a number of other writes.
     * The first element of the given Iterable becomes the first sub write of a potentially
     * created `CompoundWrite`.
     */
    def ++:(writes: Iterable[WriteCommand]): WriteCommand =
      writes.foldRight(this) {
        case (a: SimpleWriteCommand, b) ⇒ a +: b
        case (a: CompoundWrite, b)      ⇒ a ++: b
      }

    /**
     * Java API: prepends this command with another `Write` or `WriteFile` to form
     * a `CompoundWrite`.
     */
    def prepend(that: SimpleWriteCommand): CompoundWrite = that +: this

    /**
     * Java API: prepends this command with a number of other writes.
     * The first element of the given Iterable becomes the first sub write of a potentially
     * created `CompoundWrite`.
     */
    def prepend(writes: JIterable[WriteCommand]): WriteCommand = writes.asScala ++: this
  }

  object WriteCommand {
    /**
     * Combines the given number of write commands into one atomic `WriteCommand`.
     */
    def apply(writes: Iterable[WriteCommand]): WriteCommand = writes ++: Write.empty

    /**
     * Java API: combines the given number of write commands into one atomic `WriteCommand`.
     */
    def create(writes: JIterable[WriteCommand]): WriteCommand = apply(writes.asScala)
  }

  /**
   * Common supertype of [[Write]] and [[WriteFile]].
   */
  sealed abstract class SimpleWriteCommand extends WriteCommand {
    require(ack != null, "ack must be non-null. Use NoAck if you don't want acks.")

    /**
     * The acknowledgment token associated with this write command.
     */
    def ack: Event

    /**
     * An acknowledgment is only sent if this write command “wants an ack”, which is
     * equivalent to the [[#ack]] token not being a of type [[NoAck]].
     */
    def wantsAck: Boolean = !ack.isInstanceOf[NoAck]

    /**
     * Java API: appends this command with another `WriteCommand` to form a `CompoundWrite`.
     */
    def append(that: WriteCommand): CompoundWrite = this +: that
  }

  /**
   * Write data to the TCP connection. If no ack is needed use the special
   * `NoAck` object. The connection actor will reply with a [[CommandFailed]]
   * message if the write could not be enqueued. If [[WriteCommand#wantsAck]]
   * returns true, the connection actor will reply with the supplied [[WriteCommand#ack]]
   * token once the write has been successfully enqueued to the O/S kernel.
   * <b>Note that this does not in any way guarantee that the data will be
   * or have been sent!</b> Unfortunately there is no way to determine whether
   * a particular write has been sent by the O/S.
   */
  final case class Write(data: ByteString, ack: Event) extends SimpleWriteCommand
  object Write {
    /**
     * The empty Write doesn't write anything and isn't acknowledged.
     * It will, however, be denied and sent back with `CommandFailed` if the
     * connection isn't currently ready to send any data (because another WriteCommand
     * is still pending).
     */
    val empty: Write = Write(ByteString.empty, NoAck)

    /**
     * Create a new unacknowledged Write command with the given data.
     */
    def apply(data: ByteString): Write =
      if (data.isEmpty) empty else Write(data, NoAck)
  }

  /**
   * Write `count` bytes starting at `position` from file at `filePath` to the connection.
   * The count must be > 0. The connection actor will reply with a [[CommandFailed]]
   * message if the write could not be enqueued. If [[WriteCommand#wantsAck]]
   * returns true, the connection actor will reply with the supplied [[WriteCommand#ack]]
   * token once the write has been successfully enqueued to the O/S kernel.
   * <b>Note that this does not in any way guarantee that the data will be
   * or have been sent!</b> Unfortunately there is no way to determine whether
   * a particular write has been sent by the O/S.
   */
  final case class WriteFile(filePath: String, position: Long, count: Long, ack: Event) extends SimpleWriteCommand {
    require(position >= 0, "WriteFile.position must be >= 0")
    require(count > 0, "WriteFile.count must be > 0")
  }

  /**
   * A write command which aggregates two other write commands. Using this construct
   * you can chain a number of [[Write]] and/or [[WriteFile]] commands together in a way
   * that allows them to be handled as a single write which gets written out to the
   * network as quickly as possible.
   * If the sub commands contain `ack` requests they will be honored as soon as the
   * respective write has been written completely.
   */
  final case class CompoundWrite(override val head: SimpleWriteCommand, tailCommand: WriteCommand) extends WriteCommand
    with immutable.Iterable[SimpleWriteCommand] {

    def iterator: Iterator[SimpleWriteCommand] =
      new Iterator[SimpleWriteCommand] {
        private[this] var current: WriteCommand = CompoundWrite.this
        def hasNext: Boolean = current ne null
        def next(): SimpleWriteCommand =
          current match {
            case null                  ⇒ Iterator.empty.next()
            case CompoundWrite(h, t)   ⇒ { current = t; h }
            case x: SimpleWriteCommand ⇒ { current = null; x }
          }
      }
  }

  /**
   * When `useResumeWriting` is in effect as was indicated in the [[Register]] message
   * then this command needs to be sent to the connection actor in order to re-enable
   * writing after a [[CommandFailed]] event. All [[WriteCommand]] processed by the
   * connection actor between the first [[CommandFailed]] and subsequent reception of
   * this message will also be rejected with [[CommandFailed]].
   */
  case object ResumeWriting extends Command

  /**
   * Sending this command to the connection actor will disable reading from the TCP
   * socket. TCP flow-control will then propagate backpressure to the sender side
   * as buffers fill up on either end. To re-enable reading send [[ResumeReading]].
   */
  case object SuspendReading extends Command

  /**
   * This command needs to be sent to the connection actor after a [[SuspendReading]]
   * command in order to resume reading from the socket.
   */
  case object ResumeReading extends Command

  /**
   * This message enables the accepting of the next connection if read throttling is enabled
   * for connection actors.
   * @param batchSize The number of connections to accept before waiting for the next resume command
   */
  final case class ResumeAccepting(batchSize: Int) extends Command

  /// EVENTS
  /**
   * Common interface for all events generated by the TCP layer actors.
   */
  trait Event extends Message

  /**
   * Whenever data are read from a socket they will be transferred within this
   * class to the handler actor which was designated in the [[Register]] message.
   */
  final case class Received(data: ByteString) extends Event

  /**
   * The connection actor sends this message either to the sender of a [[Connect]]
   * command (for outbound) or to the handler for incoming connections designated
   * in the [[Bind]] message. The connection is characterized by the `remoteAddress`
   * and `localAddress` TCP endpoints.
   */
  final case class Connected(remoteAddress: InetSocketAddress, localAddress: InetSocketAddress) extends Event

  /**
   * Whenever a command cannot be completed, the queried actor will reply with
   * this message, wrapping the original command which failed.
   */
  final case class CommandFailed(cmd: Command) extends Event

  /**
   * When `useResumeWriting` is in effect as indicated in the [[Register]] message,
   * the [[ResumeWriting]] command will be acknowledged by this message type, upon
   * which it is safe to send at least one write. This means that all writes preceding
   * the first [[CommandFailed]] message have been enqueued to the O/S kernel at this
   * point.
   */
  sealed trait WritingResumed extends Event
  case object WritingResumed extends WritingResumed

  /**
   * The sender of a [[Bind]] command will—in case of success—receive confirmation
   * in this form. If the bind address indicated a 0 port number, then the contained
   * `localAddress` can be used to find out which port was automatically assigned.
   */
  final case class Bound(localAddress: InetSocketAddress) extends Event

  /**
   * The sender of an [[Unbind]] command will receive confirmation through this
   * message once the listening socket has been closed.
   */
  sealed trait Unbound extends Event
  case object Unbound extends Unbound

  /**
   * This is the common interface for all events which indicate that a connection
   * has been closed or half-closed.
   */
  sealed trait ConnectionClosed extends Event {
    /**
     * `true` iff the connection has been closed in response to an [[Abort]] command.
     */
    def isAborted: Boolean = false
    /**
     * `true` iff the connection has been fully closed in response to a
     * [[ConfirmedClose]] command.
     */
    def isConfirmed: Boolean = false
    /**
     * `true` iff the connection has been closed by the peer; in case
     * `keepOpenOnPeerClosed` is in effect as per the [[Register]] command,
     * this connection’s reading half is now closed.
     */
    def isPeerClosed: Boolean = false
    /**
     * `true` iff the connection has been closed due to an IO error.
     */
    def isErrorClosed: Boolean = false
    /**
     * If `isErrorClosed` returns true, then the error condition can be
     * retrieved by this method.
     */
    def getErrorCause: String = null
  }
  /**
   * The connection has been closed normally in response to a [[Close]] command.
   */
  case object Closed extends ConnectionClosed
  /**
   * The connection has been aborted in response to an [[Abort]] command.
   */
  case object Aborted extends ConnectionClosed {
    override def isAborted = true
  }
  /**
   * The connection has been half-closed by us and then half-close by the peer
   * in response to a [[ConfirmedClose]] command.
   */
  case object ConfirmedClosed extends ConnectionClosed {
    override def isConfirmed = true
  }
  /**
   * The peer has closed its writing half of the connection.
   */
  case object PeerClosed extends ConnectionClosed {
    override def isPeerClosed = true
  }
  /**
   * The connection has been closed due to an IO error.
   */
  final case class ErrorClosed(cause: String) extends ConnectionClosed {
    override def isErrorClosed = true
    override def getErrorCause = cause
  }
}

class TcpExt(system: ExtendedActorSystem) extends IO.Extension {

  val Settings = new Settings(system.settings.config.getConfig("akka.io.tcp"))
  class Settings private[TcpExt] (_config: Config) extends SelectionHandlerSettings(_config) {
    import akka.util.Helpers.ConfigOps
    import _config._

    val NrOfSelectors: Int = getInt("nr-of-selectors") requiring (_ > 0, "nr-of-selectors must be > 0")

    val BatchAcceptLimit: Int = getInt("batch-accept-limit") requiring (_ > 0, "batch-accept-limit must be > 0")
    val DirectBufferSize: Int = getIntBytes("direct-buffer-size")
    val MaxDirectBufferPoolSize: Int = getInt("direct-buffer-pool-limit")
    val RegisterTimeout: Duration = getString("register-timeout") match {
      case "infinite" ⇒ Duration.Undefined
      case x          ⇒ _config.getMillisDuration("register-timeout")
    }
    val ReceivedMessageSizeLimit: Int = getString("max-received-message-size") match {
      case "unlimited" ⇒ Int.MaxValue
      case x           ⇒ getIntBytes("received-message-size-limit")
    }
    val ManagementDispatcher: String = getString("management-dispatcher")
    val FileIODispatcher: String = getString("file-io-dispatcher")
    val TransferToLimit: Int = getString("file-io-transferTo-limit") match {
      case "unlimited" ⇒ Int.MaxValue
      case _           ⇒ getIntBytes("file-io-transferTo-limit")
    }

    val MaxChannelsPerSelector: Int = if (MaxChannels == -1) -1 else math.max(MaxChannels / NrOfSelectors, 1)
    val FinishConnectRetries: Int = getInt("finish-connect-retries") requiring (_ > 0,
      "finish-connect-retries must be > 0")

    private[this] def getIntBytes(path: String): Int = {
      val size = getBytes(path)
      require(size < Int.MaxValue, s"$path must be < 2 GiB")
      require(size >= 0, s"$path must be non-negative")
      size.toInt
    }
  }

  /**
   *
   */
  val manager: ActorRef = {
    system.systemActorOf(
      props = Props(classOf[TcpManager], this).withDispatcher(Settings.ManagementDispatcher).withDeploy(Deploy.local),
      name = "IO-TCP")
  }

  /**
   * Java API: retrieve a reference to the manager actor.
   */
  def getManager: ActorRef = manager

  val bufferPool: BufferPool = new DirectByteBufferPool(Settings.DirectBufferSize, Settings.MaxDirectBufferPoolSize)
  val fileIoDispatcher = system.dispatchers.lookup(Settings.FileIODispatcher)
}

/**
 * Java API for accessing socket options.
 */
object TcpSO extends SoJavaFactories {
  import Tcp.SO._

  /**
   * [[akka.io.Inet.SocketOption]] to enable or disable SO_KEEPALIVE
   *
   * For more information see [[java.net.Socket.setKeepAlive]]
   */
  def keepAlive(on: Boolean) = KeepAlive(on)

  /**
   * [[akka.io.Inet.SocketOption]] to enable or disable OOBINLINE (receipt
   * of TCP urgent data) By default, this option is disabled and TCP urgent
   * data is silently discarded.
   *
   * For more information see [[java.net.Socket.setOOBInline]]
   */
  def oobInline(on: Boolean) = OOBInline(on)

  /**
   * [[akka.io.Inet.SocketOption]] to enable or disable TCP_NODELAY
   * (disable or enable Nagle's algorithm)
   *
   * Please note, that TCP_NODELAY is enabled by default.
   *
   * For more information see [[java.net.Socket.setTcpNoDelay]]
   */
  def tcpNoDelay(on: Boolean) = TcpNoDelay(on)
}

object TcpMessage {
  import language.implicitConversions
  import Tcp._

  /**
   * The Connect message is sent to the TCP manager actor, which is obtained via
   * [[TcpExt#getManager]]. Either the manager replies with a [[Tcp.CommandFailed]]
   * or the actor handling the new connection replies with a [[Tcp.Connected]]
   * message.
   *
   * @param remoteAddress is the address to connect to
   * @param localAddress optionally specifies a specific address to bind to
   * @param options Please refer to [[TcpSO]] for a list of all supported options.
   * @param timeout is the desired connection timeout, `null` means "no timeout"
   * @param pullMode enables pull based reading from the connection
   */
  def connect(remoteAddress: InetSocketAddress,
              localAddress: InetSocketAddress,
              options: JIterable[SocketOption],
              timeout: FiniteDuration,
              pullMode: Boolean): Command = Connect(remoteAddress, Option(localAddress), options, Option(timeout), pullMode)

  /**
   * Connect to the given `remoteAddress` without binding to a local address and without
   * specifying options.
   */
  def connect(remoteAddress: InetSocketAddress): Command = Connect(remoteAddress, None, Nil, None, pullMode = false)

  /**
   * The Bind message is send to the TCP manager actor, which is obtained via
   * [[TcpExt#getManager]] in order to bind to a listening socket. The manager
   * replies either with a [[Tcp.CommandFailed]] or the actor handling the listen
   * socket replies with a [[Tcp.Bound]] message. If the local port is set to 0 in
   * the Bind message, then the [[Tcp.Bound]] message should be inspected to find
   * the actual port which was bound to.
   *
   * @param handler The actor which will receive all incoming connection requests
   *                in the form of [[Tcp.Connected]] messages.
   *
   * @param localAddress The socket address to bind to; use port zero for
   *                automatic assignment (i.e. an ephemeral port, see [[Bound]])
   *
   * @param backlog This specifies the number of unaccepted connections the O/S
   *                kernel will hold for this port before refusing connections.
   *
   * @param options Please refer to [[TcpSO]] for a list of all supported options.
   *
   * @param pullMode enables pull based accepting and of connections and pull
   *                 based reading from the accepted connections.
   */
  def bind(handler: ActorRef,
           endpoint: InetSocketAddress,
           backlog: Int,
           options: JIterable[SocketOption],
           pullMode: Boolean): Command = Bind(handler, endpoint, backlog, options, pullMode)
  /**
   * Open a listening socket without specifying options.
   */
  def bind(handler: ActorRef,
           endpoint: InetSocketAddress,
           backlog: Int): Command = Bind(handler, endpoint, backlog, Nil)

  /**
   * This message must be sent to a TCP connection actor after receiving the
   * [[Tcp.Connected]] message. The connection will not read any data from the
   * socket until this message is received, because this message defines the
   * actor which will receive all inbound data.
   *
   * @param handler The actor which will receive all incoming data and which
   *                will be informed when the connection is closed.
   *
   * @param keepOpenOnPeerClosed If this is set to true then the connection
   *                is not automatically closed when the peer closes its half,
   *                requiring an explicit [[Tcp.Closed]] from our side when finished.
   *
   * @param useResumeWriting If this is set to true then the connection actor
   *                will refuse all further writes after issuing a [[Tcp.CommandFailed]]
   *                notification until [[Tcp.ResumeWriting]] is received. This can
   *                be used to implement NACK-based write backpressure.
   */
  def register(handler: ActorRef, keepOpenOnPeerClosed: Boolean, useResumeWriting: Boolean): Command =
    Register(handler, keepOpenOnPeerClosed, useResumeWriting)
  /**
   * The same as `register(handler, false, false)`.
   */
  def register(handler: ActorRef): Command = Register(handler)

  /**
   * In order to close down a listening socket, send this message to that socket’s
   * actor (that is the actor which previously had sent the [[Tcp.Bound]] message). The
   * listener socket actor will reply with a [[Tcp.Unbound]] message.
   */
  def unbind: Command = Unbind

  /**
   * A normal close operation will first flush pending writes and then close the
   * socket. The sender of this command and the registered handler for incoming
   * data will both be notified once the socket is closed using a [[Tcp.Closed]]
   * message.
   */
  def close: Command = Close

  /**
   * A confirmed close operation will flush pending writes and half-close the
   * connection, waiting for the peer to close the other half. The sender of this
   * command and the registered handler for incoming data will both be notified
   * once the socket is closed using a [[Tcp.ConfirmedClosed]] message.
   */
  def confirmedClose: Command = ConfirmedClose

  /**
   * An abort operation will not flush pending writes and will issue a TCP ABORT
   * command to the O/S kernel which should result in a TCP_RST packet being sent
   * to the peer. The sender of this command and the registered handler for
   * incoming data will both be notified once the socket is closed using a
   * [[Tcp.Aborted]] message.
   */
  def abort: Command = Abort

  /**
   * Each [[Tcp.WriteCommand]] can optionally request a positive acknowledgment to be sent
   * to the commanding actor. If such notification is not desired the [[Tcp.WriteCommand#ack]]
   * must be set to an instance of this class. The token contained within can be used
   * to recognize which write failed when receiving a [[Tcp.CommandFailed]] message.
   */
  def noAck(token: AnyRef): NoAck = NoAck(token)
  /**
   * Default [[Tcp.NoAck]] instance which is used when no acknowledgment information is
   * explicitly provided. Its “token” is `null`.
   */
  def noAck: NoAck = NoAck

  /**
   * Write data to the TCP connection. If no ack is needed use the special
   * `NoAck` object. The connection actor will reply with a [[Tcp.CommandFailed]]
   * message if the write could not be enqueued. If [[Tcp.WriteCommand#wantsAck]]
   * returns true, the connection actor will reply with the supplied [[Tcp.WriteCommand#ack]]
   * token once the write has been successfully enqueued to the O/S kernel.
   * <b>Note that this does not in any way guarantee that the data will be
   * or have been sent!</b> Unfortunately there is no way to determine whether
   * a particular write has been sent by the O/S.
   */
  def write(data: ByteString, ack: Event): Command = Write(data, ack)
  /**
   * The same as `write(data, noAck())`.
   */
  def write(data: ByteString): Command = Write(data)

  /**
   * Write `count` bytes starting at `position` from file at `filePath` to the connection.
   * The count must be > 0. The connection actor will reply with a [[Tcp.CommandFailed]]
   * message if the write could not be enqueued. If [[Tcp.WriteCommand#wantsAck]]
   * returns true, the connection actor will reply with the supplied [[Tcp.WriteCommand#ack]]
   * token once the write has been successfully enqueued to the O/S kernel.
   * <b>Note that this does not in any way guarantee that the data will be
   * or have been sent!</b> Unfortunately there is no way to determine whether
   * a particular write has been sent by the O/S.
   */
  def writeFile(filePath: String, position: Long, count: Long, ack: Event): Command =
    WriteFile(filePath, position, count, ack)

  /**
   * When `useResumeWriting` is in effect as was indicated in the [[Tcp.Register]] message
   * then this command needs to be sent to the connection actor in order to re-enable
   * writing after a [[Tcp.CommandFailed]] event. All [[Tcp.WriteCommand]] processed by the
   * connection actor between the first [[Tcp.CommandFailed]] and subsequent reception of
   * this message will also be rejected with [[Tcp.CommandFailed]].
   */
  def resumeWriting: Command = ResumeWriting

  /**
   * Sending this command to the connection actor will disable reading from the TCP
   * socket. TCP flow-control will then propagate backpressure to the sender side
   * as buffers fill up on either end. To re-enable reading send [[Tcp.ResumeReading]].
   */
  def suspendReading: Command = SuspendReading

  /**
   * This command needs to be sent to the connection actor after a [[Tcp.SuspendReading]]
   * command in order to resume reading from the socket.
   */
  def resumeReading: Command = ResumeReading

  /**
   * This message enables the accepting of the next connection if pull reading is enabled
   * for connection actors.
   * @param batchSize The number of connections to accept before waiting for the next resume command
   */
  def resumeAccepting(batchSize: Int): Command = ResumeAccepting(batchSize)

  implicit private def fromJava[T](coll: JIterable[T]): immutable.Traversable[T] = {
    akka.japi.Util.immutableSeq(coll)
  }
}

Other Akka source code examples

Here is a short list of links related to this Akka Tcp.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2024 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.