alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Akka/Scala example source code file (ClusterClient.scala)

This example Akka source code file (ClusterClient.scala) is included in my "Source Code Warehouse" project. The intent of this project is to help you more easily find Akka and Scala source code examples by using tags.

All credit for the original source code belongs to akka.io; I'm just trying to make examples easier to find. (For my Scala work, see my Scala examples and tutorials.)

Akka tags/keywords

actor, actorref, akka, cluster, clusterreceptionistextension, finiteduration, getcontacts, none, props, route, router, routing, serialversionuid, some, string, unit

The ClusterClient.scala Akka example source code

/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.contrib.pattern

import java.net.URLEncoder
import scala.collection.immutable
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.ActorIdentity
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.ActorSelection
import akka.actor.ActorSystem
import akka.actor.Address
import akka.actor.ExtendedActorSystem
import akka.actor.Extension
import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider
import akka.actor.Identify
import akka.actor.Props
import akka.actor.ReceiveTimeout
import akka.actor.Terminated
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import akka.cluster.Member
import akka.cluster.MemberStatus
import akka.routing.ConsistentHash
import akka.routing.MurmurHash
import akka.actor.Stash
import akka.actor.Cancellable

object ClusterClient {

  /**
   * Scala API: Factory method for `ClusterClient` [[akka.actor.Props]].
   */
  def props(
    initialContacts: Set[ActorSelection],
    establishingGetContactsInterval: FiniteDuration = 3.second,
    refreshContactsInterval: FiniteDuration = 1.minute): Props =
    Props(classOf[ClusterClient], initialContacts, establishingGetContactsInterval, refreshContactsInterval).
      withMailbox("akka.contrib.cluster.client.mailbox")

  /**
   * Java API: Factory method for `ClusterClient` [[akka.actor.Props]].
   */
  def props(
    initialContacts: java.util.Set[ActorSelection],
    establishingGetContactsInterval: FiniteDuration,
    refreshContactsInterval: FiniteDuration): Props = {
    import scala.collection.JavaConverters._
    props(initialContacts.asScala.toSet, establishingGetContactsInterval, refreshContactsInterval)
  }

  /**
   * Java API: Factory method for `ClusterClient` [[akka.actor.Props]] with
   * default values.
   */
  def defaultProps(initialContacts: java.util.Set[ActorSelection]): Props = {
    import scala.collection.JavaConverters._
    props(initialContacts.asScala.toSet)
  }

  @SerialVersionUID(1L)
  final case class Send(path: String, msg: Any, localAffinity: Boolean) {
    /**
     * Convenience constructor with `localAffinity` false
     */
    def this(path: String, msg: Any) = this(path, msg, localAffinity = false)
  }
  @SerialVersionUID(1L)
  final case class SendToAll(path: String, msg: Any)
  @SerialVersionUID(1L)
  final case class Publish(topic: String, msg: Any)

  /**
   * INTERNAL API
   */
  private[pattern] object Internal {
    case object RefreshContactsTick
  }
}

/**
 * This actor is intended to be used on an external node that is not member
 * of the cluster. It acts like a gateway for sending messages to actors
 * somewhere in the cluster. From the initial contact points it will establish
 * a connection to a [[ClusterReceptionist]] somewhere in the cluster. It will
 * monitor the connection to the receptionist and establish a new connection if
 * the link goes down. When looking for a new receptionist it uses fresh contact
 * points retrieved from previous establishment, or periodically refreshed
 * contacts, i.e. not necessarily the initial contact points.
 *
 * You can send messages via the `ClusterClient` to any actor in the cluster
 * that is registered in the [[ClusterReceptionist]].
 * Messages are wrapped in [[ClusterClient.Send]], [[ClusterClient.SendToAll]]
 * or [[ClusterClient.Publish]].
 *
 * 1. [[ClusterClient.Send]] -
 * The message will be delivered to one recipient with a matching path, if any such
 * exists. If several entries match the path the message will be delivered
 * to one random destination. The sender of the message can specify that local
 * affinity is preferred, i.e. the message is sent to an actor in the same local actor
 * system as the used receptionist actor, if any such exists, otherwise random to any other
 * matching entry.
 *
 * 2. [[ClusterClient.SendToAll]] -
 * The message will be delivered to all recipients with a matching path.
 *
 * 3. [[ClusterClient.Publish]] -
 * The message will be delivered to all recipients Actors that have been registered as subscribers to
 * to the named topic.
 *
 *  Use the factory method [[ClusterClient#props]]) to create the
 * [[akka.actor.Props]] for the actor.
 */
class ClusterClient(
  initialContacts: Set[ActorSelection],
  establishingGetContactsInterval: FiniteDuration,
  refreshContactsInterval: FiniteDuration)
  extends Actor with Stash with ActorLogging {

  import ClusterClient._
  import ClusterClient.Internal._
  import ClusterReceptionist.Internal._

  var contacts: immutable.IndexedSeq[ActorSelection] = initialContacts.toVector
  sendGetContacts()

  import context.dispatcher
  var refreshContactsTask: Option[Cancellable] = None
  scheduleRefreshContactsTick(establishingGetContactsInterval)
  self ! RefreshContactsTick

  def scheduleRefreshContactsTick(interval: FiniteDuration): Unit = {
    refreshContactsTask foreach { _.cancel() }
    refreshContactsTask = Some(context.system.scheduler.schedule(
      interval, interval, self, RefreshContactsTick))
  }

  override def postStop(): Unit = {
    super.postStop()
    refreshContactsTask foreach { _.cancel() }
  }

  def receive = establishing

  def establishing: Actor.Receive = {
    case Contacts(contactPoints) ⇒
      if (contactPoints.nonEmpty) {
        contacts = contactPoints
        contacts foreach { _ ! Identify(None) }
      }
    case ActorIdentity(_, Some(receptionist)) ⇒
      context watch receptionist
      log.info("Connected to [{}]", receptionist.path)
      context.watch(receptionist)
      scheduleRefreshContactsTick(refreshContactsInterval)
      unstashAll()
      context.become(active(receptionist))
    case ActorIdentity(_, None) ⇒ // ok, use another instead
    case RefreshContactsTick    ⇒ sendGetContacts()
    case msg                    ⇒ stash()
  }

  def active(receptionist: ActorRef): Actor.Receive = {
    case Send(path, msg, localAffinity) ⇒
      receptionist forward DistributedPubSubMediator.Send(path, msg, localAffinity)
    case SendToAll(path, msg) ⇒
      receptionist forward DistributedPubSubMediator.SendToAll(path, msg)
    case Publish(topic, msg) ⇒
      receptionist forward DistributedPubSubMediator.Publish(topic, msg)
    case RefreshContactsTick ⇒
      receptionist ! GetContacts
    case Contacts(contactPoints) ⇒
      // refresh of contacts
      if (contactPoints.nonEmpty)
        contacts = contactPoints
    case Terminated(`receptionist`) ⇒
      log.info("Lost contact with [{}], restablishing connection", receptionist)
      sendGetContacts()
      scheduleRefreshContactsTick(establishingGetContactsInterval)
      context.become(establishing)
    case _: ActorIdentity ⇒ // ok, from previous establish, already handled
  }

  def sendGetContacts(): Unit = {
    if (contacts.isEmpty) initialContacts foreach { _ ! GetContacts }
    else if (contacts.size == 1) (initialContacts ++ contacts) foreach { _ ! GetContacts }
    else contacts foreach { _ ! GetContacts }
  }
}

/**
 * Extension that starts [[ClusterReceptionist]] and accompanying [[DistributedPubSubMediator]]
 * with settings defined in config section `akka.contrib.cluster.receptionist`.
 * The [[DistributedPubSubMediator]] is started by the [[DistributedPubSubExtension]].
 */
object ClusterReceptionistExtension extends ExtensionId[ClusterReceptionistExtension] with ExtensionIdProvider {
  override def get(system: ActorSystem): ClusterReceptionistExtension = super.get(system)

  override def lookup = ClusterReceptionistExtension

  override def createExtension(system: ExtendedActorSystem): ClusterReceptionistExtension =
    new ClusterReceptionistExtension(system)
}

class ClusterReceptionistExtension(system: ExtendedActorSystem) extends Extension {

  private val config = system.settings.config.getConfig("akka.contrib.cluster.receptionist")
  private val role: Option[String] = config.getString("role") match {
    case "" ⇒ None
    case r  ⇒ Some(r)
  }

  /**
   * Returns true if this member is not tagged with the role configured for the
   * receptionist.
   */
  def isTerminated: Boolean = Cluster(system).isTerminated || !role.forall(Cluster(system).selfRoles.contains)

  /**
   * Register the actors that should be reachable for the clients in this [[DistributedPubSubMediator]].
   */
  private def pubSubMediator: ActorRef = DistributedPubSubExtension(system).mediator

  /**
   * Register an actor that should be reachable for the clients.
   * The clients can send messages to this actor with `Send` or `SendToAll` using
   * the path elements of the `ActorRef`, e.g. `"/user/myservice"`.
   */
  def registerService(actor: ActorRef): Unit =
    pubSubMediator ! DistributedPubSubMediator.Put(actor)

  /**
   * A registered actor will be automatically unregistered when terminated,
   * but it can also be explicitly unregistered before termination.
   */
  def unregisterService(actor: ActorRef): Unit =
    pubSubMediator ! DistributedPubSubMediator.Remove(actor.path.toStringWithoutAddress)

  /**
   * Register an actor that should be reachable for the clients to a named topic.
   * Several actors can be registered to the same topic name, and all will receive
   * published messages.
   * The client can publish messages to this topic with `Publish`.
   */
  def registerSubscriber(topic: String, actor: ActorRef): Unit =
    pubSubMediator ! DistributedPubSubMediator.Subscribe(topic, actor)

  /**
   * A registered subscriber will be automatically unregistered when terminated,
   * but it can also be explicitly unregistered before termination.
   */
  def unregisterSubscriber(topic: String, actor: ActorRef): Unit =
    pubSubMediator ! DistributedPubSubMediator.Unsubscribe(topic, actor)

  /**
   * The [[ClusterReceptionist]] actor
   */
  private val receptionist: ActorRef = {
    if (isTerminated)
      system.deadLetters
    else {
      val numberOfContacts: Int = config.getInt("number-of-contacts")
      val responseTunnelReceiveTimeout =
        config.getDuration("response-tunnel-receive-timeout", MILLISECONDS).millis
      val name = config.getString("name")
      // important to use val mediator here to activate it outside of ClusterReceptionist constructor
      val mediator = pubSubMediator
      system.actorOf(ClusterReceptionist.props(mediator, role, numberOfContacts,
        responseTunnelReceiveTimeout), name)
    }
  }
}

object ClusterReceptionist {

  /**
   * Scala API: Factory method for `ClusterReceptionist` [[akka.actor.Props]].
   */
  def props(
    pubSubMediator: ActorRef,
    role: Option[String],
    numberOfContacts: Int = 3,
    responseTunnelReceiveTimeout: FiniteDuration = 30.seconds): Props =
    Props(classOf[ClusterReceptionist], pubSubMediator, role, numberOfContacts, responseTunnelReceiveTimeout)

  /**
   * Java API: Factory method for `ClusterReceptionist` [[akka.actor.Props]].
   */
  def props(
    pubSubMediator: ActorRef,
    role: String,
    numberOfContacts: Int,
    responseTunnelReceiveTimeout: FiniteDuration): Props =
    props(pubSubMediator, Internal.roleOption(role), numberOfContacts, responseTunnelReceiveTimeout)

  /**
   * Java API: Factory method for `ClusterReceptionist` [[akka.actor.Props]]
   * with default values.
   */
  def props(
    pubSubMediator: ActorRef,
    role: String): Props =
    props(pubSubMediator, Internal.roleOption(role))

  /**
   * INTERNAL API
   */
  private[pattern] object Internal {
    @SerialVersionUID(1L)
    case object GetContacts
    @SerialVersionUID(1L)
    final case class Contacts(contactPoints: immutable.IndexedSeq[ActorSelection])
    @SerialVersionUID(1L)
    case object Ping

    def roleOption(role: String): Option[String] = role match {
      case null | "" ⇒ None
      case _         ⇒ Some(role)
    }

    /**
     * Replies are tunneled via this actor, child of the receptionist, to avoid
     * inbound connections from other cluster nodes to the client.
     */
    class ClientResponseTunnel(client: ActorRef, timeout: FiniteDuration) extends Actor {
      context.setReceiveTimeout(timeout)
      context.watch(client)
      def receive = {
        case Ping                 ⇒ // keep alive from client
        case ReceiveTimeout       ⇒ context stop self
        case Terminated(`client`) ⇒ context stop self
        case msg                  ⇒ client forward msg
      }
    }
  }

}

/**
 * [[ClusterClient]] connects to this actor to retrieve. The `ClusterReceptionist` is
 * supposed to be started on all nodes, or all nodes with specified role, in the cluster.
 * The receptionist can be started with the [[ClusterReceptionistExtension]] or as an
 * ordinary actor (use the factory method [[ClusterReceptionist#props]]).
 *
 * The receptionist forwards messages from the client to the associated [[DistributedPubSubMediator]],
 * i.e. the client can send messages to any actor in the cluster that is registered in the
 * `DistributedPubSubMediator`. Messages from the client are wrapped in
 * [[DistributedPubSubMediator.Send]], [[DistributedPubSubMediator.SendToAll]]
 * or [[DistributedPubSubMediator.Publish]] with the semantics described in
 * [[DistributedPubSubMediator]].
 *
 * Response messages from the destination actor are tunneled via the receptionist
 * to avoid inbound connections from other cluster nodes to the client, i.e.
 * the `sender`, as seen by the destination actor, is not the client itself.
 * The `sender` of the response messages, as seen by the client, is preserved
 * as the original sender, so the client can choose to send subsequent messages
 * directly to the actor in the cluster.
 */
class ClusterReceptionist(
  pubSubMediator: ActorRef,
  role: Option[String],
  numberOfContacts: Int,
  responseTunnelReceiveTimeout: FiniteDuration)
  extends Actor with ActorLogging {

  import DistributedPubSubMediator.{ Send, SendToAll, Publish }

  import ClusterReceptionist.Internal._

  val cluster = Cluster(context.system)
  import cluster.selfAddress

  require(role.forall(cluster.selfRoles.contains),
    s"This cluster member [${selfAddress}] doesn't have the role [$role]")

  var nodes: immutable.SortedSet[Address] = {
    def hashFor(node: Address): Int = node match {
      // cluster node identifier is the host and port of the address; protocol and system is assumed to be the same
      case Address(_, _, Some(host), Some(port)) ⇒ MurmurHash.stringHash(s"${host}:${port}")
      case _ ⇒
        throw new IllegalStateException(s"Unexpected address without host/port: [$node]")
    }
    implicit val ringOrdering: Ordering[Address] = Ordering.fromLessThan[Address] { (a, b) ⇒
      val ha = hashFor(a)
      val hb = hashFor(b)
      ha < hb || (ha == hb && Member.addressOrdering.compare(a, b) < 0)
    }
    immutable.SortedSet()
  }
  val virtualNodesFactor = 10
  var consistentHash: ConsistentHash[Address] = ConsistentHash(nodes, virtualNodesFactor)

  override def preStart(): Unit = {
    super.preStart()
    require(!cluster.isTerminated, "Cluster node must not be terminated")
    cluster.subscribe(self, classOf[MemberEvent])
  }

  override def postStop(): Unit = {
    super.postStop()
    cluster unsubscribe self
  }

  def matchingRole(m: Member): Boolean = role.forall(m.hasRole)

  def responseTunnel(client: ActorRef): ActorRef = {
    val encName = URLEncoder.encode(client.path.toSerializationFormat, "utf-8")
    context.child(encName) match {
      case Some(tunnel) ⇒ tunnel
      case None ⇒
        context.actorOf(Props(classOf[ClientResponseTunnel], client, responseTunnelReceiveTimeout), encName)
    }
  }

  def receive = {
    case msg @ (_: Send | _: SendToAll | _: Publish) ⇒
      val tunnel = responseTunnel(sender())
      tunnel ! Ping // keep alive
      pubSubMediator.tell(msg, tunnel)

    case GetContacts ⇒
      // Consistent hashing is used to ensure that the reply to GetContacts
      // is the same from all nodes (most of the time) and it also
      // load balances the client connections among the nodes in the cluster.
      if (numberOfContacts >= nodes.size) {
        sender() ! Contacts(nodes.map(a ⇒ context.actorSelection(self.path.toStringWithAddress(a)))(collection.breakOut))
      } else {
        // using toStringWithAddress in case the client is local, normally it is not, and
        // toStringWithAddress will use the remote address of the client
        val a = consistentHash.nodeFor(sender().path.toStringWithAddress(cluster.selfAddress))
        val slice = {
          val first = nodes.from(a).tail.take(numberOfContacts)
          if (first.size == numberOfContacts) first
          else first ++ nodes.take(numberOfContacts - first.size)
        }
        sender() ! Contacts(slice.map(a ⇒ context.actorSelection(self.path.toStringWithAddress(a)))(collection.breakOut))
      }

    case state: CurrentClusterState ⇒
      nodes = nodes.empty ++ state.members.collect { case m if m.status != MemberStatus.Joining && matchingRole(m) ⇒ m.address }
      consistentHash = ConsistentHash(nodes, virtualNodesFactor)

    case MemberUp(m) ⇒
      if (matchingRole(m)) {
        nodes += m.address
        consistentHash = ConsistentHash(nodes, virtualNodesFactor)
      }

    case MemberRemoved(m, _) ⇒
      if (m.address == selfAddress)
        context stop self
      else if (matchingRole(m)) {
        nodes -= m.address
        consistentHash = ConsistentHash(nodes, virtualNodesFactor)
      }

    case _: MemberEvent ⇒ // not of interest
  }

}

Other Akka source code examples

Here is a short list of links related to this Akka ClusterClient.scala source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.