Akka/Scala example source code file (Cluster.scala)
This page shows the Cluster.scala source code from the Akka project, presented as a Scala example.
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import language.implicitConversions
import akka.actor._
import akka.actor.Status._
import akka.ConfigurationException
import akka.dispatch.MonitorableThreadFactory
import akka.event.Logging
import akka.pattern._
import akka.remote._
import akka.routing._
import akka.util._
import scala.concurrent.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.annotation.tailrec
import scala.collection.immutable
import java.io.Closeable
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{ ExecutionContext, Await }
import com.typesafe.config.ConfigFactory
import akka.remote.DefaultFailureDetectorRegistry
import akka.remote.FailureDetector
import com.typesafe.config.Config
import akka.event.LoggingAdapter
import java.util.concurrent.ThreadFactory
import scala.util.control.NonFatal
import scala.annotation.varargs
/**
* Cluster Extension Id and factory for creating Cluster extension.
*/
object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider {
override def get(system: ActorSystem): Cluster = super.get(system)
override def lookup = Cluster
override def createExtension(system: ExtendedActorSystem): Cluster = new Cluster(system)
/**
* INTERNAL API
*/
private[cluster] final val isAssertInvariantsEnabled: Boolean =
System.getProperty("akka.cluster.assert", "off").toLowerCase match {
case "on" | "true" ⇒ true
case _ ⇒ false
}
}
/**
* This module is responsible for cluster membership information. Changes to the cluster
* information are retrieved through [[#subscribe]]. Commands to operate the cluster are
* available through methods in this class, such as [[#join]], [[#down]] and [[#leave]].
*
* Each cluster [[Member]] is identified by its [[akka.actor.Address]], and
* the cluster address of this actor system is [[#selfAddress]]. A member also has a status;
* initially [[MemberStatus.Joining]] followed by [[MemberStatus.Up]].
*/
class Cluster(val system: ExtendedActorSystem) extends Extension {
import ClusterEvent._
val settings = new ClusterSettings(system.settings.config, system.name)
import settings._
import InfoLogger._
/**
* The address including a `uid` of this cluster member.
* The `uid` is needed to be able to distinguish different
* incarnations of a member with same hostname and port.
*/
val selfUniqueAddress: UniqueAddress = system.provider match {
case c: ClusterActorRefProvider ⇒
UniqueAddress(c.transport.defaultAddress, AddressUidExtension(system).addressUid)
case other ⇒ throw new ConfigurationException(
s"ActorSystem [${system}] needs to have a 'ClusterActorRefProvider' enabled in the configuration, currently uses [${other.getClass.getName}]")
}
/**
* The address of this cluster member.
*/
def selfAddress: Address = selfUniqueAddress.address
/**
* roles that this member has
*/
def selfRoles: Set[String] = settings.Roles
/**
* Java API: roles that this member has
*/
def getSelfRoles: java.util.Set[String] =
scala.collection.JavaConverters.setAsJavaSetConverter(selfRoles).asJava
private val _isTerminated = new AtomicBoolean(false)
private val log = Logging(system, "Cluster")
// ClusterJmx is initialized as the last thing in the constructor
private var clusterJmx: Option[ClusterJmx] = None
logInfo("Starting up...")
val failureDetector: FailureDetectorRegistry[Address] = {
def createFailureDetector(): FailureDetector =
FailureDetectorLoader.load(settings.FailureDetectorImplementationClass, settings.FailureDetectorConfig, system)
new DefaultFailureDetectorRegistry(() ⇒ createFailureDetector())
}
// ========================================================
// ===================== WORK DAEMONS =====================
// ========================================================
/**
* INTERNAL API
*/
private[cluster] val scheduler: Scheduler = {
if (system.scheduler.maxFrequency < 1.second / SchedulerTickDuration) {
import scala.collection.JavaConverters._
logInfo("Using a dedicated scheduler for cluster. Default scheduler can be used if configured " +
"with 'akka.scheduler.tick-duration' [{} ms] <= 'akka.cluster.scheduler.tick-duration' [{} ms].",
(1000 / system.scheduler.maxFrequency).toInt, SchedulerTickDuration.toMillis)
val cfg = ConfigFactory.parseString(
s"akka.scheduler.tick-duration=${SchedulerTickDuration.toMillis}ms").withFallback(
system.settings.config)
val threadFactory = system.threadFactory match {
case tf: MonitorableThreadFactory ⇒ tf.withName(tf.name + "-cluster-scheduler")
case tf ⇒ tf
}
system.dynamicAccess.createInstanceFor[Scheduler](system.settings.SchedulerClass, immutable.Seq(
classOf[Config] -> cfg,
classOf[LoggingAdapter] -> log,
classOf[ThreadFactory] -> threadFactory)).get
} else {
// delegate to system.scheduler, but don't close over system
val systemScheduler = system.scheduler
new Scheduler with Closeable {
override def close(): Unit = () // we are using system.scheduler, which we are not responsible for closing
override def maxFrequency: Double = systemScheduler.maxFrequency
override def schedule(initialDelay: FiniteDuration, interval: FiniteDuration,
runnable: Runnable)(implicit executor: ExecutionContext): Cancellable =
systemScheduler.schedule(initialDelay, interval, runnable)
override def scheduleOnce(delay: FiniteDuration,
runnable: Runnable)(implicit executor: ExecutionContext): Cancellable =
systemScheduler.scheduleOnce(delay, runnable)
}
}
}
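// Note: the dedicated scheduler above is only created when the system scheduler cannot tick as
// often as the cluster requires, for example (illustrative values, not necessarily the defaults)
// akka.scheduler.tick-duration = 100ms combined with akka.cluster.scheduler.tick-duration = 33ms;
// otherwise the else-branch simply delegates to system.scheduler without closing it.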
// create supervisor for daemons under path "/system/cluster"
private val clusterDaemons: ActorRef = {
system.systemActorOf(Props(classOf[ClusterDaemon], settings).
withDispatcher(UseDispatcher).withDeploy(Deploy.local), name = "cluster")
}
/**
* INTERNAL API
*/
private[cluster] val clusterCore: ActorRef = {
implicit val timeout = system.settings.CreationTimeout
try {
Await.result((clusterDaemons ? InternalClusterAction.GetClusterCoreRef).mapTo[ActorRef], timeout.duration)
} catch {
case NonFatal(e) ⇒
log.error(e, "Failed to startup Cluster. You can try to increase 'akka.actor.creation-timeout'.")
shutdown()
// don't re-throw, that would cause the extension to be re-created
// from shutdown() or other places, which may result in
// InvalidActorNameException: actor name [cluster] is not unique
system.deadLetters
}
}
private[cluster] val readView: ClusterReadView = new ClusterReadView(this)
system.registerOnTermination(shutdown())
if (JmxEnabled)
clusterJmx = {
val jmx = new ClusterJmx(this, log)
jmx.createMBean()
Some(jmx)
}
logInfo("Started up successfully")
// ======================================================
// ===================== PUBLIC API =====================
// ======================================================
/**
* Returns true if this cluster instance has been shut down.
*/
def isTerminated: Boolean = _isTerminated.get
/**
* Current snapshot state of the cluster.
*/
def state: CurrentClusterState = readView.state
/**
* Subscribe to one or more cluster domain events.
* The `to` classes can be [[akka.cluster.ClusterEvent.ClusterDomainEvent]]
* or subclasses.
*
* A snapshot of [[akka.cluster.ClusterEvent.CurrentClusterState]]
* will be sent to the subscriber as the first message.
*/
@varargs def subscribe(subscriber: ActorRef, to: Class[_]*): Unit =
clusterCore ! InternalClusterAction.Subscribe(subscriber, initialStateMode = InitialStateAsSnapshot, to.toSet)
/**
* Subscribe to one or more cluster domain events.
* The `to` classes can be [[akka.cluster.ClusterEvent.ClusterDomainEvent]]
* or subclasses.
*
* If `initialStateMode` is [[ClusterEvent.InitialStateAsEvents]] the events corresponding
* to the current state will be sent to the subscriber to mimic what you would
* have seen if you were listening to the events when they occurred in the past.
*
* If `initialStateMode` is [[ClusterEvent.InitialStateAsSnapshot]] a snapshot of
* [[akka.cluster.ClusterEvent.CurrentClusterState]] will be sent to the subscriber as the
* first message.
*
* Note that for large clusters it is more efficient to use `InitialStateAsSnapshot`.
*/
@varargs def subscribe(subscriber: ActorRef, initialStateMode: SubscriptionInitialStateMode, to: Class[_]*): Unit =
clusterCore ! InternalClusterAction.Subscribe(subscriber, initialStateMode, to.toSet)
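// Usage sketch (the listener ActorRef `subscriber` is an assumption, not defined in this file):
//   val cluster = Cluster(system)
//   cluster.subscribe(subscriber, ClusterEvent.InitialStateAsEvents,
//     classOf[ClusterEvent.MemberUp], classOf[ClusterEvent.UnreachableMember])
//   // or, with the one-argument overload, a CurrentClusterState snapshot is sent first:
//   cluster.subscribe(subscriber, classOf[ClusterEvent.MemberEvent])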
/**
* Unsubscribe from all cluster domain events.
*/
def unsubscribe(subscriber: ActorRef): Unit =
clusterCore ! InternalClusterAction.Unsubscribe(subscriber, None)
/**
* Unsubscribe from a specific type of cluster domain event,
* matching previous `subscribe` registration.
*/
def unsubscribe(subscriber: ActorRef, to: Class[_]): Unit =
clusterCore ! InternalClusterAction.Unsubscribe(subscriber, Some(to))
/**
* Send current (full) state of the cluster to the specified
* receiver. If you want this to happen periodically you need to schedule
* a call to this method yourself. Note that you can also retrieve the current
* state with [[#state]].
*/
def sendCurrentClusterState(receiver: ActorRef): Unit =
clusterCore ! InternalClusterAction.SendCurrentClusterState(receiver)
/**
* Try to join this cluster node with the node specified by 'address'.
* A 'Join(selfAddress)' command is sent to the node to join.
*
* An actor system can only join a cluster once. Additional attempts will be ignored.
* When it has successfully joined it must be restarted to be able to join another
* cluster or to join the same cluster again.
*/
def join(address: Address): Unit =
clusterCore ! ClusterUserAction.JoinTo(address)
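// Usage sketch (host and port are assumptions; "akka.tcp" matches the default remote transport):
//   Cluster(system).join(Address("akka.tcp", system.name, "127.0.0.1", 2551))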
/**
* Join the specified seed nodes without defining them in config.
* Especially useful from tests when Addresses are unknown before startup time.
*
* An actor system can only join a cluster once. Additional attempts will be ignored.
* When it has successfully joined it must be restarted to be able to join another
* cluster or to join the same cluster again.
*
* JAVA API: Use akka.japi.Util.immutableSeq to convert a java.lang.Iterable
* to the type needed for the seedNodes parameter.
*/
def joinSeedNodes(seedNodes: immutable.Seq[Address]): Unit =
clusterCore ! InternalClusterAction.JoinSeedNodes(seedNodes.toVector)
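// Usage sketch, e.g. from a test where the seed addresses are only known at runtime
// (the host and ports below are assumed, illustrative values):
//   Cluster(system).joinSeedNodes(immutable.Seq(
//     Address("akka.tcp", system.name, "127.0.0.1", 2551),
//     Address("akka.tcp", system.name, "127.0.0.1", 2552)))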
/**
* Send command to issue state transition to LEAVING for the node specified by 'address'.
* The member will go through the status changes [[MemberStatus.Leaving]] (not published to
* subscribers) followed by [[MemberStatus.Exiting]] and finally [[MemberStatus.Removed]].
*
* Note that this command can be issued to any member in the cluster, not necessarily the
* one that is leaving. The cluster extension, but not the actor system or JVM, of the
* leaving member will be shut down after the leader has changed the status of the member to
* Exiting. Thereafter the member will be removed from the cluster. Normally this is
* handled automatically, but in case of network failures during this process it might
* still be necessary to set the node’s status to Down in order to complete the removal.
*/
def leave(address: Address): Unit =
clusterCore ! ClusterUserAction.Leave(address)
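// Usage sketch: a node can, for example, remove itself gracefully before terminating:
//   val cluster = Cluster(system)
//   cluster.leave(cluster.selfAddress)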
/**
* Send command to DOWN the node specified by 'address'.
*
* When a member is considered by the failure detector to be unreachable, the leader is not
* allowed to perform its duties, such as changing the status of new joining members to 'Up'.
* The status of the unreachable member must be changed to 'Down', which can be done with
* this method.
*/
def down(address: Address): Unit =
clusterCore ! ClusterUserAction.Down(address)
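// Usage sketch (the address is an assumption, typically taken from an UnreachableMember event):
//   case UnreachableMember(member) ⇒ Cluster(system).down(member.address)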
/**
* The supplied thunk will be run, once, when current cluster member is `Up`.
* Typically used together with configuration option `akka.cluster.min-nr-of-members`
* to defer some action, such as starting actors, until the cluster has reached
* a certain size.
*/
def registerOnMemberUp[T](code: ⇒ T): Unit =
registerOnMemberUp(new Runnable { def run = code })
/**
* Java API: The supplied callback will be run, once, when current cluster member is `Up`.
* Typically used together with configuration option `akka.cluster.min-nr-of-members`
* to defer some action, such as starting actors, until the cluster has reached
* a certain size.
*/
def registerOnMemberUp(callback: Runnable): Unit = clusterDaemons ! InternalClusterAction.AddOnMemberUpListener(callback)
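// Usage sketch (the ClusterAwareService actor is an assumption): defer starting application
// actors until this member has been moved to Up, e.g. once min-nr-of-members is reached:
//   Cluster(system).registerOnMemberUp {
//     system.actorOf(Props[ClusterAwareService], "service")
//   }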
// ========================================================
// ===================== INTERNAL API =====================
// ========================================================
/**
* INTERNAL API.
*
* Shuts down all connections to other members, the cluster daemon and the periodic gossip and cleanup tasks.
*
* Should not be called by the user. The user can issue a LEAVE command which will tell the node
* to go through graceful handoff process `LEAVE -> EXITING -> REMOVED -> SHUTDOWN`.
*/
private[cluster] def shutdown(): Unit = {
if (_isTerminated.compareAndSet(false, true)) {
logInfo("Shutting down...")
system.stop(clusterDaemons)
readView.close()
closeScheduler()
clusterJmx foreach { _.unregisterMBean() }
logInfo("Successfully shut down")
}
}
private def closeScheduler(): Unit = scheduler match {
case x: Closeable ⇒ x.close()
case _ ⇒
}
/**
* INTERNAL API
*/
private[cluster] object InfoLogger {
def logInfo(message: String): Unit =
if (LogInfo) log.info("Cluster Node [{}] - {}", selfAddress, message)
def logInfo(template: String, arg1: Any): Unit =
if (LogInfo) log.info("Cluster Node [{}] - " + template, selfAddress, arg1)
def logInfo(template: String, arg1: Any, arg2: Any): Unit =
if (LogInfo) log.info("Cluster Node [{}] - " + template, selfAddress, arg1, arg2)
}
}
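To show how the public API above is typically used, here is a minimal, self-contained usage sketch. It is only an illustration: the system name, the ClusterListener actor, and the assumption that application.conf enables akka.cluster.ClusterActorRefProvider and lists seed nodes are not part of Cluster.scala.

import akka.actor.{ Actor, ActorLogging, ActorSystem, Props }
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._

// A minimal listener that logs membership changes (illustrative only).
class ClusterListener extends Actor with ActorLogging {
  val cluster = Cluster(context.system)

  // Subscribe on start, replaying the current state as events; unsubscribe on stop.
  override def preStart(): Unit =
    cluster.subscribe(self, initialStateMode = InitialStateAsEvents,
      classOf[MemberEvent], classOf[UnreachableMember])
  override def postStop(): Unit = cluster.unsubscribe(self)

  def receive = {
    case MemberUp(member)          ⇒ log.info("Member is Up: {}", member.address)
    case UnreachableMember(member) ⇒ log.info("Member detected as unreachable: {}", member)
    case MemberRemoved(member, _)  ⇒ log.info("Member is Removed: {}", member.address)
    case _: MemberEvent            ⇒ // ignore other membership events
  }
}

object ClusterListenerApp extends App {
  // Assumes a configuration that enables clustering and defines seed nodes;
  // the system and actor names are arbitrary.
  val system = ActorSystem("ClusterSystem")
  system.actorOf(Props[ClusterListener], "clusterListener")
}

Subscribing from preStart and unsubscribing from postStop is the usual pattern, so the listener also re-subscribes correctly if it is restarted by its supervisor.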