alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (ZeroMQExtension.scala)

This example Akka source code file (ZeroMQExtension.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actorref, actorsystem, akka, array, collection, concurrent, dispatch, duration, int, newsockettimeout, props, socketoption, time, zeromqextension, zeromqversion, zmqexception

The ZeroMQExtension.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.zeromq

import org.zeromq.{ ZMQ ⇒ JZMQ }
import org.zeromq.ZMQ.Poller
import akka.actor._
import akka.pattern.ask
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import java.util.concurrent.TimeUnit
import akka.util.Timeout
import akka.util.Helpers.ConfigOps
import org.zeromq.ZMQException
import scala.concurrent.duration.FiniteDuration
import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue }

/**
 * A Model to represent a version of the zeromq library
 * @param major
 * @param minor
 * @param patch
 */
final case class ZeroMQVersion(major: Int, minor: Int, patch: Int) {
  override def toString: String = "%d.%d.%d".format(major, minor, patch)
}

/**
 * The [[akka.actor.ExtensionId]] and [[akka.actor.ExtensionIdProvider]] for the ZeroMQ module
 */
object ZeroMQExtension extends ExtensionId[ZeroMQExtension] with ExtensionIdProvider {
  override def get(system: ActorSystem): ZeroMQExtension = super.get(system)
  def lookup(): this.type = this
  override def createExtension(system: ExtendedActorSystem): ZeroMQExtension = new ZeroMQExtension(system)

  private val minVersionString = "2.1.0"
  private val minVersion = JZMQ.makeVersion(2, 1, 0)
}

/**
 * The extension for the ZeroMQ module
 *
 * @param system The ActorSystem this extension belongs to.
 */
class ZeroMQExtension(system: ActorSystem) extends Extension {

  val DefaultPollTimeout: FiniteDuration = system.settings.config.getMillisDuration("akka.zeromq.poll-timeout")
  val NewSocketTimeout: Timeout = Timeout(system.settings.config.getMillisDuration("akka.zeromq.new-socket-timeout"))

  val pollTimeUnit = if (version.major >= 3) TimeUnit.MILLISECONDS else TimeUnit.MICROSECONDS

  /**
   * The version of the ZeroMQ library
   * @return a [[akka.zeromq.ZeroMQVersion]]
   */
  def version: ZeroMQVersion = ZeroMQVersion(JZMQ.getMajorVersion, JZMQ.getMinorVersion, JZMQ.getPatchVersion)

  /**
   * Factory method to create the [[akka.actor.Props]] to build the ZeroMQ socket actor.
   *
   * @param socketParameters a varargs list of [[akka.zeromq.SocketOption]] to configure the socket
   * @return the [[akka.actor.Props]]
   */
  def newSocketProps(socketParameters: SocketOption*): Props = {
    verifyZeroMQVersion()
    require(socketParameters exists {
      case s: SocketType.ZMQSocketType ⇒ true
      case _                           ⇒ false
    }, "A socket type is required")
    val params = socketParameters.to[immutable.Seq]
    Props(classOf[ConcurrentSocketActor], params).withDispatcher("akka.zeromq.socket-dispatcher")
  }

  /**
   * Java API: Factory method to create the [[akka.actor.Props]] to build a ZeroMQ Publisher socket actor.
   *
   * @param socketParameters a varargs list of [[akka.zeromq.SocketOption]] to configure the socket
   * @return the [[akka.actor.Props]]
   */
  def newPubSocketProps(socketParameters: SocketOption*): Props = newSocketProps((SocketType.Pub +: socketParameters): _*)

  /**
   * Java API: Factory method to create the [[akka.actor.Props]] to build a ZeroMQ Subscriber socket actor.
   *
   * @param socketParameters a varargs list of [[akka.zeromq.SocketOption]] to configure the socket
   * @return the [[akka.actor.Props]]
   */
  def newSubSocketProps(socketParameters: SocketOption*): Props = newSocketProps((SocketType.Sub +: socketParameters): _*)

  /**
   * Java API: Factory method to create the [[akka.actor.Props]] to build a ZeroMQ Dealer socket actor.
   *
   * @param socketParameters a varargs list of [[akka.zeromq.SocketOption]] to configure the socket
   * @return the [[akka.actor.Props]]
   */
  def newDealerSocketProps(socketParameters: SocketOption*): Props = newSocketProps((SocketType.Dealer +: socketParameters): _*)

  /**
   * Java API: Factory method to create the [[akka.actor.Props]] to build a ZeroMQ Router socket actor.
   *
   * @param socketParameters a varargs list of [[akka.zeromq.SocketOption]] to configure the socket
   * @return the [[akka.actor.Props]]
   */
  def newRouterSocketProps(socketParameters: SocketOption*): Props = newSocketProps((SocketType.Router +: socketParameters): _*)

  /**
   * Java API: Factory method to create the [[akka.actor.Props]] to build a ZeroMQ Push socket actor.
   *
   * @param socketParameters a varargs list of [[akka.zeromq.SocketOption]] to configure the socket
   * @return the [[akka.actor.Props]]
   */
  def newPushSocketProps(socketParameters: SocketOption*): Props = newSocketProps((SocketType.Push +: socketParameters): _*)

  /**
   * Java API: Factory method to create the [[akka.actor.Props]] to build a ZeroMQ Pull socket actor.
   *
   * @param socketParameters a varargs list of [[akka.zeromq.SocketOption]] to configure the socket
   * @return the [[akka.actor.Props]]
   */
  def newPullSocketProps(socketParameters: SocketOption*): Props = newSocketProps((SocketType.Pull +: socketParameters): _*)

  /**
   * Java API: Factory method to create the [[akka.actor.Props]] to build a ZeroMQ Req socket actor.
   *
   * @param socketParameters a varargs list of [[akka.zeromq.SocketOption]] to configure the socket
   * @return the [[akka.actor.Props]]
   */
  def newReqSocketProps(socketParameters: SocketOption*): Props = newSocketProps((SocketType.Rep +: socketParameters): _*)

  /**
   * Java API: Factory method to create the [[akka.actor.Props]] to build a ZeroMQ Rep socket actor.
   *
   * @param socketParameters a varargs list of [[akka.zeromq.SocketOption]] to configure the socket
   * @return the [[akka.actor.Props]]
   */
  def newRepSocketProps(socketParameters: SocketOption*): Props = newSocketProps((SocketType.Req +: socketParameters): _*)

  /**
   * Factory method to create the actor representing the ZeroMQ socket.
   * You can pass in as many configuration options as you want and the order of the configuration options doesn't matter
   * They are matched on type and the first one found wins.
   *
   * @param socketParameters a varargs list of [[akka.zeromq.SocketOption]] to configure the socke
   * @return the [[akka.actor.ActorRef]]
   */
  def newSocket(socketParameters: SocketOption*): ActorRef = {
    implicit val timeout = NewSocketTimeout
    Await.result((zeromqGuardian ? newSocketProps(socketParameters: _*)).mapTo[ActorRef], timeout.duration)
  }

  /**
   * Java API factory method to create the actor representing the ZeroMQ Publisher socket.
   * You can pass in as many configuration options as you want and the order of the configuration options doesn't matter
   * They are matched on type and the first one found wins.
   *
   * @param socketParameters array of [[akka.zeromq.SocketOption]] to configure the socket
   * @return the [[akka.actor.ActorRef]]
   */
  def newPubSocket(socketParameters: Array[SocketOption]): ActorRef = newSocket((SocketType.Pub +: socketParameters): _*)

  /**
   * Convenience for creating a publisher socket.
   */
  def newPubSocket(bind: Bind): ActorRef = newSocket(SocketType.Pub, bind)

  /**
   * Java API factory method to create the actor representing the ZeroMQ Subscriber socket.
   * You can pass in as many configuration options as you want and the order of the configuration options doesn't matter
   * They are matched on type and the first one found wins.
   *
   * @param socketParameters array of [[akka.zeromq.SocketOption]] to configure the socket
   * @return the [[akka.actor.ActorRef]]
   */
  def newSubSocket(socketParameters: Array[SocketOption]): ActorRef = newSocket((SocketType.Sub +: socketParameters): _*)

  /**
   * Convenience for creating a subscriber socket.
   */
  def newSubSocket(connect: Connect, listener: Listener, subscribe: Subscribe): ActorRef = newSocket(SocketType.Sub, connect, listener, subscribe)

  /**
   * Java API factory method to create the actor representing the ZeroMQ Dealer socket.
   * You can pass in as many configuration options as you want and the order of the configuration options doesn't matter
   * They are matched on type and the first one found wins.
   *
   * @param socketParameters array of [[akka.zeromq.SocketOption]] to configure the socket
   * @return the [[akka.actor.ActorRef]]
   */
  def newDealerSocket(socketParameters: Array[SocketOption]): ActorRef = newSocket((SocketType.Dealer +: socketParameters): _*)

  /**
   * Java API factory method to create the actor representing the ZeroMQ Router socket.
   * You can pass in as many configuration options as you want and the order of the configuration options doesn't matter
   * They are matched on type and the first one found wins.
   *
   * @param socketParameters array of [[akka.zeromq.SocketOption]] to configure the socket
   * @return the [[akka.actor.ActorRef]]
   */
  def newRouterSocket(socketParameters: Array[SocketOption]): ActorRef = newSocket((SocketType.Router +: socketParameters): _*)

  /**
   * Java API factory method to create the actor representing the ZeroMQ Push socket.
   * You can pass in as many configuration options as you want and the order of the configuration options doesn't matter
   * They are matched on type and the first one found wins.
   *
   * @param socketParameters array of [[akka.zeromq.SocketOption]] to configure the socket
   * @return the [[akka.actor.ActorRef]]
   */
  def newPushSocket(socketParameters: Array[SocketOption]): ActorRef = newSocket((SocketType.Push +: socketParameters): _*)

  /**
   * Java API factory method to create the actor representing the ZeroMQ Pull socket.
   * You can pass in as many configuration options as you want and the order of the configuration options doesn't matter
   * They are matched on type and the first one found wins.
   *
   * @param socketParameters array of [[akka.zeromq.SocketOption]] to configure the socket
   * @return the [[akka.actor.ActorRef]]
   */
  def newPullSocket(socketParameters: Array[SocketOption]): ActorRef = newSocket((SocketType.Pull +: socketParameters): _*)

  /**
   * Java API factory method to create the actor representing the ZeroMQ Req socket.
   * You can pass in as many configuration options as you want and the order of the configuration options doesn't matter
   * They are matched on type and the first one found wins.
   *
   * @param socketParameters array of [[akka.zeromq.SocketOption]] to configure the socket
   * @return the [[akka.actor.ActorRef]]
   */
  def newReqSocket(socketParameters: Array[SocketOption]): ActorRef = newSocket((SocketType.Req +: socketParameters): _*)

  /**
   * Java API factory method to create the actor representing the ZeroMQ Rep socket.
   * You can pass in as many configuration options as you want and the order of the configuration options doesn't matter
   * They are matched on type and the first one found wins.
   *
   * @param socketParameters array of [[akka.zeromq.SocketOption]] to configure the socke
   * @return the [[akka.actor.ActorRef]]
   */
  def newRepSocket(socketParameters: Array[SocketOption]): ActorRef = newSocket((SocketType.Rep +: socketParameters): _*)

  private val zeromqGuardian: ActorRef = {
    verifyZeroMQVersion()

    system.actorOf(Props(new Actor with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
      import SupervisorStrategy._
      override def supervisorStrategy = OneForOneStrategy() {
        case ex: ZMQException if nonfatal(ex) ⇒ Resume
        case _                                ⇒ Stop
      }

      private def nonfatal(ex: ZMQException) = ex.getErrorCode match {
        case org.zeromq.ZeroMQ.EFSM | 45 /* ENOTSUP */ ⇒ true
        case _                                         ⇒ false
      }

      def receive = { case p: Props ⇒ sender() ! context.actorOf(p) }
    }), "zeromq")
  }

  private def verifyZeroMQVersion(): Unit = {
    require(
      JZMQ.getFullVersion > ZeroMQExtension.minVersion,
      "Unsupported ZeroMQ version: %s, akka needs at least: %s".format(JZMQ.getVersionString, ZeroMQExtension.minVersionString))
  }
}

Other Akka source code examples

Here is a short list of links related to this Akka ZeroMQExtension.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.