alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (SocketOption.scala)

This example Akka source code file (SocketOption.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actor, akka, bytestring, collection, concurrent, duration, linger, long, request, socketmeta, socketoption, socketoptionquery, string, subscribe, time, zmqmessage

The SocketOption.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.zeromq

import com.google.protobuf.Message
import akka.actor.ActorRef
import scala.concurrent.duration._
import scala.collection.immutable
import org.zeromq.{ ZMQ ⇒ JZMQ }
import org.zeromq.ZMQ.{ Poller, Socket }
import akka.japi.Util.immutableSeq
import akka.util.ByteString
import akka.util.Collections.EmptyImmutableSeq
import annotation.varargs

/**
 * Marker trait representing request messages for zeromq
 */
sealed trait Request

/**
 * Marker trait representing the base for all socket options
 */
sealed trait SocketOption extends Request

/**
 * Marker trait representing the base for all meta operations for a socket
 * such as the context, listener, socket type and poller dispatcher
 */
sealed trait SocketMeta extends SocketOption

/**
 * A base trait for connection options for a ZeroMQ socket
 */
sealed trait SocketConnectOption extends SocketOption {
  def endpoint: String
}

/**
 * A base trait for pubsub options for the ZeroMQ socket
 */
sealed trait PubSubOption extends SocketOption {
  def payload: ByteString
}

/**
 * A marker trait to group option queries together
 */
sealed trait SocketOptionQuery extends Request

/**
 * This socket should be a client socket and connect to the specified endpoint
 *
 * @param endpoint URI (ex. tcp://127.0.0.1:5432)
 */
final case class Connect(endpoint: String) extends SocketConnectOption

/**
 * Companion object for a ZeroMQ I/O thread pool
 */
object Context {
  def apply(numIoThreads: Int = 1): Context = new Context(numIoThreads)
}

/**
 * Represents an I/O thread pool for ZeroMQ sockets.
 * By default the ZeroMQ module uses an I/O thread pool with 1 thread.
 * For most applications that should be sufficient
 *
 * @param numIoThreads
 */
class Context(numIoThreads: Int) extends SocketMeta {
  private val context = JZMQ.context(numIoThreads)

  def socket(socketType: SocketType.ZMQSocketType): Socket = context.socket(socketType.id)

  def poller: Poller = context.poller

  def term(): Unit = context.term()
}

/**
 * A base trait for message deserializers
 */
trait Deserializer extends SocketOption {
  def apply(frames: immutable.Seq[ByteString]): Any
}

/**
 * The different socket types you can create with zeromq
 */
object SocketType {

  abstract class ZMQSocketType(val id: Int) extends SocketMeta

  /**
   * A Publisher socket
   */
  object Pub extends ZMQSocketType(JZMQ.PUB)

  /**
   * A subscriber socket
   */
  object Sub extends ZMQSocketType(JZMQ.SUB)

  /**
   * A dealer socket
   */
  object Dealer extends ZMQSocketType(JZMQ.DEALER)

  /**
   * A router socket
   */
  object Router extends ZMQSocketType(JZMQ.ROUTER)

  /**
   * A request socket
   */
  object Req extends ZMQSocketType(JZMQ.REQ)

  /**
   * A reply socket
   */
  object Rep extends ZMQSocketType(JZMQ.REP)

  /**
   * A push socket
   */
  object Push extends ZMQSocketType(JZMQ.PUSH)

  /**
   * A pull socket
   */
  object Pull extends ZMQSocketType(JZMQ.PULL)

  /**
   * A Pair socket
   */
  object Pair extends ZMQSocketType(JZMQ.PAIR)
}

/**
 * An option containing the listener for the socket
 * @param listener
 */
final case class Listener(listener: ActorRef) extends SocketMeta

/**
 * An option containing the configuration key for the poller loop dispatcher
 * @param name
 */
final case class PollDispatcher(name: String) extends SocketMeta

/**
 * An option containing the duration a poll cycle should wait for a message before it loops
 * @param duration
 */
final case class PollTimeoutDuration(duration: FiniteDuration = 100 millis) extends SocketMeta

/**
 * Start listening with this server socket on the specified address
 *
 * @param endpoint
 */
final case class Bind(endpoint: String) extends SocketConnectOption

/**
 * The [[akka.zeromq.Subscribe]] option establishes a new message filter on a [[akka.zeromq.SocketType.Pub]] socket.
 * Newly created [[akka.zeromq.SocketType.Sub]] sockets filter out all incoming messages,
 * therefore you should send this option to establish an initial message filter.
 *
 * An empty payload of length zero will subscribe to all incoming messages.
 * A non-empty payload will subscribe to all messages beginning with the specified prefix.
 * Multiple filters may be attached to a single [[akka.zeromq.SocketType.Sub]] socket,
 * in which case a message will be accepted if it matches at least one filter.
 *
 * @param payload the topic to subscribe to
 */
final case class Subscribe(payload: ByteString) extends PubSubOption {
  def this(topic: String) = this(ByteString(topic))
}
object Subscribe {
  val all = Subscribe(ByteString.empty)
  def apply(topic: String): Subscribe = topic match {
    case null | "" ⇒ all
    case t         ⇒ new Subscribe(t)
  }
}

/**
 * The [[akka.zeromq.Unsubscribe]] option shall remove an existing message filter
 * on a [[akka.zeromq.SocketType.Sub]] socket. The filter specified must match an existing filter
 * previously established with the [[akka.zeromq.Subscribe]] option. If the socket has several instances of the
 * same filter attached the [[akka.zeromq.Unsubscribe]] option shall remove only one instance, leaving the rest
 * in place and functional.
 *
 * @param payload
 */
final case class Unsubscribe(payload: ByteString) extends PubSubOption {
  def this(topic: String) = this(ByteString(topic))
}
object Unsubscribe {
  def apply(topic: String): Unsubscribe = new Unsubscribe(topic)
}

/**
 * Send a message over the zeromq socket
 * @param frames
 */
final case class Send(frames: immutable.Seq[ByteString]) extends Request

/**
 * A message received over the zeromq socket
 * @param frames
 */
final case class ZMQMessage(frames: immutable.Seq[ByteString]) {
  def frame(frameIndex: Int): ByteString = frames(frameIndex)
}
object ZMQMessage {
  val empty = new ZMQMessage(EmptyImmutableSeq)

  /**
   * Scala API
   * @param frames the frames of the returned ZMQMessage
   * @return a ZMQMessage with the given frames
   */
  def apply(frames: ByteString*): ZMQMessage =
    if ((frames eq null) || frames.length == 0) empty else new ZMQMessage(frames.to[immutable.Seq])

  /**
   * Java API: create a message from the given frames
   *
   * @param frames the frames of the returned ZMQMessage
   * @return a ZMQMessage with the given frames
   */
  @varargs def withFrames(frames: ByteString*): ZMQMessage = apply(frames: _*)

  def apply[T](frames: T*)(implicit converter: T ⇒ ByteString): ZMQMessage = apply(frames map converter: _*)
}

/**
 * Configure this socket to have a linger of the specified value
 *
 * The linger period determines how long pending messages which have yet to be sent to a peer shall linger
 * in memory after a socket is closed, and further affects the termination of the socket's context.
 *
 * The following outlines the different behaviours:
 * <ul>
 *   <li>The default value of -1 specifies an infinite linger period.
 *     Pending messages shall not be discarded after the socket is closed;
 *     attempting to terminate the socket's context shall block until all pending messages
 *     have been sent to a peer.</li>
 *   <li>The value of 0 specifies no linger period. Pending messages shall be discarded immediately when the socket is closed.</li>
 *   <li>Positive values specify an upper bound for the linger period in milliseconds.
 *     Pending messages shall not be discarded after the socket is closed;
 *     attempting to terminate the socket's context shall block until either all pending messages have been sent to a peer,
 *     or the linger period expires, after which any pending messages shall be discarded.</li>
 * </ul>
 *
 * @param value The value in milliseconds for the linger option
 */
final case class Linger(value: Long) extends SocketOption

/**
 * Gets the linger option @see [[akka.zeromq.Linger]]
 */
object Linger extends SocketOptionQuery {
  val no: Linger = Linger(0)
}

/**
 * Sets the recovery interval for multicast transports using the specified socket.
 * The recovery interval determines the maximum time in seconds that a receiver can be absent from a multicast group
 * before unrecoverable data loss will occur.
 *
 * Exercise care when setting large recovery intervals as the data needed for recovery will be held in memory.
 * For example, a 1 minute recovery interval at a data rate of 1Gbps requires a 7GB in-memory buffer.
 *
 * @param value The interval in seconds
 */
final case class ReconnectIVL(value: Long) extends SocketOption

/**
 * Gets the recover interval @see [[akka.zeromq.ReconnectIVL]]
 */
object ReconnectIVL extends SocketOptionQuery

/**
 * The [[akka.zeromq.ReconnectIVLMax]] option shall set the maximum reconnection interval for the specified socket.
 * This is the maximum period ØMQ shall wait between attempts to reconnect. On each reconnect attempt,
 * the previous interval shall be doubled untill [[akka.zeromq.ReconnectIVLMax]] is reached.
 * This allows for exponential backoff strategy. Default value means no exponential backoff is performed
 * and reconnect interval calculations are only based on [[akka.zeromq.ReconnectIVL]].
 *
 * @see [[akka.zeromq.ReconnectIVL]]
 *
 * This is a ZeroMQ 3.0 option
 *
 * @param value
 */
final case class ReconnectIVLMax(value: Long) extends SocketOption
/**
 * Gets the max reconnect IVL
 * @see [[akka.zeromq.ReconnectIVLMax]]
 */
object ReconnectIVLMax extends SocketOptionQuery

/**
 * The [[akka.zeromq.Backlog]] option shall set the maximum length of the queue of outstanding peer connections
 * for the specified socket; this only applies to connection-oriented transports. For details refer to your
 * operating system documentation for the listen function.
 *
 * @param value
 */
final case class Backlog(value: Long) extends SocketOption
/**
 * Gets the backlog
 * @see [[akka.zeromq.Backlog]]
 */
object Backlog extends SocketOptionQuery

/**
 * Limits the size of the inbound message.
 * If a peer sends a message larger than [[akka.zeromq.MaxMsgSize]] it is disconnected.
 * Value of -1 means no limit.
 *
 * This is a ZeroMQ 3.0 option
 *
 * @param value
 */
final case class MaxMsgSize(value: Long) extends SocketOption
object MaxMsgSize extends SocketOptionQuery

/**
 * The [[akka.zeromq.SendHighWatermark]] option shall set the high water mark for outbound messages on the specified socket.
 * The high water mark is a hard limit on the maximum number of outstanding messages ØMQ shall queue in memory
 * for any single peer that the specified socket is communicating with.
 *
 * If this limit has been reached the socket shall enter an exceptional state and depending on the socket type,
 * ØMQ shall take appropriate action such as blocking or dropping sent messages.
 *
 * This is a ZeroMQ 3.0 option
 *
 * @param value
 */
final case class SendHighWatermark(value: Long) extends SocketOption

/**
 * Gets the SendHWM
 * @see [[akka.zeromq.SendHighWatermark]]
 */
object SendHighWatermark extends SocketOptionQuery

/**
 * The [[akka.zeromq.ReceiveHighWatermark]] option shall set the high water mark for inbound messages on the specified socket.
 * The high water mark is a hard limit on the maximum number of outstanding messages ØMQ shall queue
 * in memory for any single peer that the specified socket is communicating with.
 *
 * If this limit has been reached the socket shall enter an exceptional state and depending on the socket type,
 * ØMQ shall take appropriate action such as blocking or dropping sent messages.
 *
 * This is a ZeroMQ 3.0 option
 *
 * @param value
 */
final case class ReceiveHighWatermark(value: Long) extends SocketOption

/**
 * Gets the ReceiveHighWatermark
 * @see [[akka.zeromq.ReceiveHighWatermark]]
 */
object ReceiveHighWatermark extends SocketOptionQuery

/**
 * The [[akka.zeromq.HighWatermark]] option shall set the high water mark for the specified socket.
 * The high water mark is a hard limit on the maximum number of outstanding messages ØMQ shall queue in memory for
 * any single peer that the specified socket is communicating with.
 *
 * If this limit has been reached the socket shall enter an exceptional state and depending on the socket type,
 * ØMQ shall take appropriate action such as blocking or dropping sent messages.
 * The default [[akka.zeromq.HighWatermark]] value of zero means "no limit".
 *
 * @param value
 */
final case class HighWatermark(value: Long) extends SocketOption

/**
 * The [[akka.zeromq.Swap]] option shall set the disk offload (swap) size for the specified socket.
 * A socket which has [[akka.zeromq.Swap]] set to a non-zero value may exceed its high water mark;
 * in this case outstanding messages shall be offloaded to storage on disk rather than held in memory.
 *
 * The value of [[akka.zeromq.Swap]] defines the maximum size of the swap space in bytes.
 *
 * @param value
 */
final case class Swap(value: Long) extends SocketOption

/**
 * Gets the [[akka.zeromq.Swap]]
 *
 * @see [[akka.zeromq.Swap]]
 */
object Swap extends SocketOptionQuery

/**
 * The [[akka.zeromq.Affinity]] option shall set the I/O thread affinity for newly created connections on the specified socket.
 *
 * Affinity determines which threads from the ØMQ I/O thread pool associated with the socket's context shall handle
 * newly created connections. A value of zero specifies no affinity, meaning that work shall be distributed fairly
 * among all ØMQ I/O threads in the thread pool. For non-zero values, the lowest bit corresponds to thread 1,
 * second lowest bit to thread 2 and so on. For example, a value of 3 specifies that subsequent connections
 * on socket shall be handled exclusively by I/O threads 1 and 2.
 *
 * @param value
 */
final case class Affinity(value: Long) extends SocketOption

/**
 * Gets the [[akka.zeromq.Affinity]] value
 */
object Affinity extends SocketOptionQuery

/**
 * Sets the identity of the specified socket. Socket identity determines if existing ØMQ infrastructure
 * (message queues, forwarding devices) shall be identified with a specific application and persist across multiple
 * runs of the application.
 *
 * If the socket has no identity, each run of an application is completely separate from other runs.
 * However, with identity set the socket shall re-use any existing ØMQ infrastructure configured by the previous run(s).
 * Thus the application may receive messages that were sent in the meantime, message queue limits shall be shared
 * with previous run(s) and so on.
 *
 * Identity should be at least one byte and at most 255 bytes long.
 * Identities starting with binary zero are reserved for use by ØMQ infrastructure.
 *
 * @param value The identity string for this socket
 */
final case class Identity(value: Array[Byte]) extends SocketOption

/**
 * Gets the [[akka.zeromq.Identity]] value
 */
object Identity extends SocketOptionQuery

/**
 * Sets the maximum send or receive data rate for multicast transports such as pgm using the specified socket.
 *
 * @param value The kilobits per second
 */
final case class Rate(value: Long) extends SocketOption

/**
 * Gets the send or receive rate for the socket
 */
object Rate extends SocketOptionQuery

/**
 * Sets the recovery interval for multicast transports using the specified socket.
 * The recovery interval determines the maximum time in seconds that a receiver can be absent from a multicast group
 * before unrecoverable data loss will occur.
 *
 * Exercise care when setting large recovery intervals as the data needed for recovery will be held in memory.
 * For example, a 1 minute recovery interval at a data rate of 1Gbps requires a 7GB in-memory buffer.
 *
 * @param value The interval in seconds
 */
final case class RecoveryInterval(value: Long) extends SocketOption

/**
 * Gets the [[akka.zeromq.RecoveryInterval]]
 */
object RecoveryInterval extends SocketOptionQuery

/**
 * Controls whether data sent via multicast transports using the specified socket can also be received by the sending
 * host via loop-back. A value of zero disables the loop-back functionality, while the default value of 1 enables the
 * loop-back functionality. Leaving multicast loop-back enabled when it is not required can have a negative impact
 * on performance. Where possible, disable McastLoop in production environments.
 *
 * @param value Flag indicating whether or not loopback multicast is enabled
 */
final case class MulticastLoop(value: Boolean) extends SocketOption

/**
 * Gets the [[akka.zeromq.MulticastLoop]]
 */
object MulticastLoop extends SocketOptionQuery

/**
 * Sets the time-to-live field in every multicast packet sent from this socket.
 * The default is 1 which means that the multicast packets don't leave the local network.
 *
 * This is za ZeroMQ 3.0 option
 *
 * @param value
 */
final case class MulticastHops(value: Long) extends SocketOption

/**
 * Gets the [[akka.zeromq.MulticastHops]]
 */
object MulticastHops extends SocketOptionQuery

/**
 * The [[akka.zeromq.SendBufferSize]] option shall set the underlying kernel transmit buffer size for the socket to
 * the specified size in bytes. A value of zero means leave the OS default unchanged.
 * For details please refer to your operating system documentation for the SO_SNDBUF socket option.
 *
 * This is a ZeroMQ 2.x only option
 *
 * @param value
 */
final case class SendBufferSize(value: Long) extends SocketOption

/**
 * Gets the [[akka.zeromq.SendBufferSize]]
 */
object SendBufferSize extends SocketOptionQuery

/**
 * The [[akka.zeromq.ReceiveBufferSize]] option shall set the underlying kernel receive buffer size for the socket to
 * the specified size in bytes. A value of zero means leave the OS default unchanged.
 * For details refer to your operating system documentation for the SO_RCVBUF socket option.
 * @param value
 */
final case class ReceiveBufferSize(value: Long) extends SocketOption

/**
 * Gets the [[akka.zeromq.ReceiveBufferSize]]
 */
object ReceiveBufferSize extends SocketOptionQuery

/**
 * Gets the file descriptor associated with the ZeroMQ socket
 */
object FileDescriptor extends SocketOptionQuery

Other Akka source code examples

Here is a short list of links related to this Akka SocketOption.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.